In [1]:
# import required libraries
from kafka import KafkaConsumer, KafkaProducer
import avro.schema
import avro.io
import io
import hashlib, json

In [2]:
def serialize(schema, obj):
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer = avro.io.DatumWriter(schema)
    writer.write(obj, encoder)
    return bytes_writer.getvalue()

In [3]:
def deserialize(schema, raw_bytes):
    bytes_reader = io.BytesIO(raw_bytes)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    return reader.read(decoder)

In [4]:
schema_file = 'transaction.avsc'
txschema = avro.schema.parse(open(schema_file).read())
schema_file = 'submit.avsc'
submitschema = avro.schema.parse(open(schema_file).read())
schema_file = 'result.avsc'
resultschema = avro.schema.parse(open(schema_file).read())

In [5]:
# Connect to kafka broker running in your local host (docker). Change this to your kafka broker if needed
kafka_broker = 'lab.aimet.tech:9092'

In [6]:
producer = KafkaProducer(bootstrap_servers=[kafka_broker])

In [7]:
txconsumer = KafkaConsumer(
    'transaction',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(txschema, x))
resultconsumer = KafkaConsumer(
    'result',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(resultschema, x))

In [8]:
def gen_signature(txid, payer, payee, amount, token):
    o = {'txid': txid, 'payer': payer, 'payee': payee, 'amount': amount, 'token': token}
    return hashlib.md5(json.dumps(o, sort_keys=True).encode('utf-8')).hexdigest()

In [9]:
vid = 'V570882'
token = '1df91d828519e519b420639f24c9b42e'

In [10]:
transaction = None
#Get 1 transaction
for message in txconsumer:
    transaction = message
    break
print(transaction)

ConsumerRecord(topic='transaction', partition=0, offset=6792, timestamp=1731472368961, timestamp_type=0, key=None, value={'txid': 'TX08942', 'payer': 'A53105', 'payee': 'A03336', 'amount': 350}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=24, serialized_header_size=-1)


In [11]:
#Generate signature based on given data
signature = gen_signature(transaction.value['txid'],transaction.value['payer'], transaction.value['payee'], transaction.value['amount'], token)
print(signature)

defb4946625d4f5ec2c0d994b44ca2b2


In [12]:
#Prepare data to be serialize
to_submit = {'vid': vid, 'txid': transaction.value['txid'], 'signature': signature}
print(to_submit)

#Serialize data
data = serialize(submitschema, to_submit)
print(data)

#Submit data
producer.send('result', data)

{'vid': 'V570882', 'txid': 'TX08942', 'signature': 'defb4946625d4f5ec2c0d994b44ca2b2'}
b'\x0eV570882\x0eTX08942@defb4946625d4f5ec2c0d994b44ca2b2'


<kafka.producer.future.FutureRecordMetadata at 0x203b663d7d0>

In [13]:
result = None
#Get a result that matches with vid and txid
for message in resultconsumer:
    print(message)
    if message.value['vid'] == vid and message.value['txid'] == transaction.value['txid']:
        result = message
    break
print(result)

ConsumerRecord(topic='result', partition=0, offset=17, timestamp=1731472369485, timestamp_type=0, key=None, value={'timestamp': 1731472368, 'vid': 'V281741', 'txid': 'TX08725', 'checksum': '6db3ba967b872b2a58b961b2b24549e8', 'code': 200, 'message': 'Confirm'}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=64, serialized_header_size=-1)
None


In [14]:
for message in resultconsumer:
    print(message.value)

KeyboardInterrupt: 