In [1]:
# import required libraries
from kafka import KafkaConsumer, KafkaProducer
import avro.schema
import avro.io
import io
import hashlib, json

In [2]:
def serialize(schema, obj):
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer = avro.io.DatumWriter(schema)
    writer.write(obj, encoder)
    return bytes_writer.getvalue()

In [3]:
def deserialize(schema, raw_bytes):
    bytes_reader = io.BytesIO(raw_bytes)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    return reader.read(decoder)

In [4]:
schema_file = 'transaction.avsc'
txschema = avro.schema.parse(open(schema_file).read())
schema_file = 'submit.avsc'
submitschema = avro.schema.parse(open(schema_file).read())
schema_file = 'result.avsc'
resultschema = avro.schema.parse(open(schema_file).read())

In [5]:
# Connect to kafka broker running in your local host (docker). Change this to your kafka broker if needed
kafka_broker = 'lab.aimet.tech:9092'

In [6]:
producer = KafkaProducer(bootstrap_servers=[kafka_broker])

In [7]:
txconsumer = KafkaConsumer(
    'transaction',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(txschema, x))
resultconsumer = KafkaConsumer(
    'result',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(resultschema, x))

In [8]:
def gen_signature(txid, payer, payee, amount, token):
    o = {'txid': txid, 'payer': payer, 'payee': payee, 'amount': amount, 'token': token}
    return hashlib.md5(json.dumps(o, sort_keys=True).encode('utf-8')).hexdigest()

In [19]:
vid = 'V380180'
token = 'c62ae5ae250fda712f22f36a42206712'

answer = None
for message in txconsumer:
    tx = message.value
    print("Received transaction: ", tx)
    submit = {
        'vid': vid,
        'txid': tx['txid'],
        'signature': gen_signature(tx['txid'], tx['payer'], tx['payee'], tx['amount'], token)
    }
    print("Submitting transaction: ", submit)
    producer.send('submit', serialize(submitschema, submit))
    for message in resultconsumer:
        result = message.value
        if result['txid'] == tx['txid']:
            answer = result
            break
    if answer:
        print("Received result: ", answer)
        break
    else:
        print("No result received for txid: ", tx['txid'])

Received transaction:  {'txid': 'TX08993', 'payer': 'A58045', 'payee': 'A05683', 'amount': 1153}
Submitting transaction:  {'vid': 'V380180', 'txid': 'TX08993', 'signature': 'b38db7e6064aea03114779f228c3e73c'}
Received result:  {'timestamp': 1711122173, 'vid': 'V380180', 'txid': 'TX08993', 'checksum': 'b1610e853e37bab16c775def4b294034', 'code': 200, 'message': 'Confirm'}
