In [2]:
# import required libraries
from kafka import KafkaConsumer, KafkaProducer
import avro.schema
import avro.io
import io
import hashlib, json

In [3]:
def serialize(schema, obj):
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer = avro.io.DatumWriter(schema)
    writer.write(obj, encoder)
    return bytes_writer.getvalue()

In [4]:
def deserialize(schema, raw_bytes):
    bytes_reader = io.BytesIO(raw_bytes)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    return reader.read(decoder)

In [5]:
schema_file = 'transaction.avsc'
txschema = avro.schema.parse(open(schema_file).read())
schema_file = 'submit.avsc'
submitschema = avro.schema.parse(open(schema_file).read())
schema_file = 'result.avsc'
resultschema = avro.schema.parse(open(schema_file).read())

In [6]:
# Connect to kafka broker running in your local host (docker). Change this to your kafka broker if needed
kafka_broker = 'lab.aimet.tech:9092'

In [7]:
producer = KafkaProducer(bootstrap_servers=[kafka_broker])

In [8]:
txconsumer = KafkaConsumer(
    'transaction',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(txschema, x))
resultconsumer = KafkaConsumer(
    'result', 
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(resultschema, x))

In [9]:
def gen_signature(txid, payer, payee, amount, token):
    o = {'txid': txid, 'payer': payer, 'payee': payee, 'amount': amount, 'token': token}
    return hashlib.md5(json.dumps(o, sort_keys=True).encode('utf-8')).hexdigest()

# Assignment

In [11]:
def submit_verification(vid, txid, signature):
    submit_data = {
        'vid': vid,
        'txid': txid,
        'signature': signature
    }
    
    serialized_data = serialize(submitschema, submit_data)
    producer.send('submit', serialized_data)
    producer.flush()
    return True

In [12]:
VID = 'V878613'
TOKEN = 'c6f697f7d57d6cc9bed218454300ddcd'

In [13]:
for tx_message in txconsumer:
    tx_data = tx_message.value
    txid = tx_data['txid']
    payer = tx_data['payer']
    payee = tx_data['payee']
    amount = tx_data['amount']

    signature = gen_signature(txid, payer, payee, amount, TOKEN)
    submit_verification(VID, txid, signature)

    transaction_verification = False
    for message in resultconsumer:
        result = message.value
        if result['vid'] == VID and result['txid'] == txid:
            if result['code'] == '200' or result['code'] == 200:
                print(f"Transaction {txid} is OK")
                print(result)
                print('-'*50)
            else:
                print(f"Transaction {txid} is NOT OK with {result['code']}")
            break

Transaction TX05672 is OK
{'timestamp': 1743922885, 'vid': 'V878613', 'txid': 'TX05672', 'checksum': '1c7ba407fad45ca154e6a1a4e0230c4d', 'code': 200, 'message': 'Confirm'}
--------------------------------------------------
Transaction TX05119 is OK
{'timestamp': 1743922892, 'vid': 'V878613', 'txid': 'TX05119', 'checksum': '3a7c62ea2d9ae770e5ebfa12f5221793', 'code': 200, 'message': 'Confirm'}
--------------------------------------------------
Transaction TX04519 is OK
{'timestamp': 1743922898, 'vid': 'V878613', 'txid': 'TX04519', 'checksum': '42ff7ad89d3fb187aea3bc43af1c0792', 'code': 200, 'message': 'Confirm'}
--------------------------------------------------
Transaction TX08806 is OK
{'timestamp': 1743922908, 'vid': 'V878613', 'txid': 'TX08806', 'checksum': '9b96ae6ecc9ec874a055a4982fba5dba', 'code': 200, 'message': 'Confirm'}
--------------------------------------------------
Transaction TX02491 is OK
{'timestamp': 1743922915, 'vid': 'V878613', 'txid': 'TX02491', 'checksum': 'fbdefb

KeyboardInterrupt: 