In [22]:
!pip install kafka-python
!pip install avro-python3



In [23]:
# import required libraries
from kafka import KafkaConsumer, KafkaProducer
import avro.schema
import avro.io
import io
import hashlib, json

In [24]:
def serialize(schema, obj):
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer = avro.io.DatumWriter(schema)
    writer.write(obj, encoder)
    return bytes_writer.getvalue()

In [25]:
def deserialize(schema, raw_bytes):
    bytes_reader = io.BytesIO(raw_bytes)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    return reader.read(decoder)

In [26]:
#note: you must have the following file in the same directory

schema_file = 'transaction.avsc'
txschema = avro.schema.parse(open(schema_file).read())
schema_file = 'submit.avsc'
submitschema = avro.schema.parse(open(schema_file).read())
schema_file = 'result.avsc'
resultschema = avro.schema.parse(open(schema_file).read())

In [27]:
# Connect to kafka broker running in your local host (docker). Change this to your kafka broker if needed
kafka_broker = 'lab.aimet.tech :9092'

In [28]:
producer = KafkaProducer(bootstrap_servers=[kafka_broker])

In [29]:
txconsumer = KafkaConsumer(
    'transaction',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(txschema, x))
resultconsumer = KafkaConsumer(
    'result',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(resultschema, x))



In [30]:
def gen_signature(txid, payer, payee, amount, token):
    o = {'txid': txid, 'payer': payer, 'payee': payee, 'amount': amount, 'token': token}
    return hashlib.md5(json.dumps(o, sort_keys=True).encode('utf-8')).hexdigest()

In [32]:
# Main loop for processing transactions

message_count_limit = 5
message_count = 0
result_message_count_limit = 5
result_message_count = 0

for message in txconsumer:
    # Extract transaction information
    tx_data = message.value
    txid = tx_data['txid']
    payer = tx_data['payer']
    payee = tx_data['payee']
    amount = tx_data['amount']
    print(tx_data,txid,payer,payee,amount)
    # Generate signature
    signature = gen_signature(txid, payer, payee, amount, token="50e71bd809146c6c3b944ed9341346f9")

    # Prepare verification data
    verification_data = {'vid': 'V298128', 'txid': txid, 'signature': signature}

    # Serialize verification data
    serialized_verification_data = serialize(submitschema, verification_data)

    # Send verification data to 'submit' topic
    producer.send('submit', value=serialized_verification_data)

    message_count += 1
    # Check if message count limit exceeded
    if message_count >= message_count_limit:
        break


# Function to handle result messages
def handle_result_message(result_message):
    # Extract result information
    result_data = result_message.value
    timestamp = result_data['timestamp']
    vid = result_data['vid']
    txid = result_data['txid']
    checksum = result_data['checksum']

    # Print the result information
    print("Timestamp:", timestamp)
    print("VID:", vid)
    print("TXID:", txid)
    print("Checksum:", checksum)

# Main loop for processing result messages
for result_message in resultconsumer:
    # Handle result message
    handle_result_message(result_message)

    result_message_count += 1
    # Check if result_message count limit exceeded
    if result_message_count >= result_message_count_limit:
        break

{'txid': 'TX08288', 'payer': 'A60228', 'payee': 'A35093', 'amount': 1552} TX08288 A60228 A35093 1552
{'txid': 'TX01542', 'payer': 'A10275', 'payee': 'A17602', 'amount': 758} TX01542 A10275 A17602 758
{'txid': 'TX01663', 'payer': 'A82882', 'payee': 'A41303', 'amount': 1877} TX01663 A82882 A41303 1877
{'txid': 'TX07258', 'payer': 'A59857', 'payee': 'A49700', 'amount': 620} TX07258 A59857 A49700 620
{'txid': 'TX03072', 'payer': 'A45162', 'payee': 'A66657', 'amount': 174} TX03072 A45162 A66657 174
Timestamp: 1711116394
VID: V298128
TXID: TX08288
Checksum: b9ab4879746a9fff4509c89572dd659f
Timestamp: 1711116395
VID: V298128
TXID: TX01542
Checksum: 6df33a9d8716ec47329f857d74fcd392
Timestamp: 1711116396
VID: V298128
TXID: TX01663
Checksum: cf82bdad448564962098dae1f9c54dcf
Timestamp: 1711116397
VID: V298128
TXID: TX07258
Checksum: c9f6a5092b688a2e17fe607850d90cf1
Timestamp: 1711116398
VID: V298128
TXID: TX03072
Checksum: b48e26284b39e6410ffff93378f592f3
