In [4]:
!pip install kafka-python
!pip install avro

Collecting avro
  Downloading avro-1.12.0-py2.py3-none-any.whl.metadata (1.7 kB)
Downloading avro-1.12.0-py2.py3-none-any.whl (124 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m124.2/124.2 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: avro
Successfully installed avro-1.12.0


In [37]:
# import required libraries
from kafka import KafkaConsumer, KafkaProducer
import avro.schema
import avro.io
import io
import hashlib, json

In [6]:
def serialize(schema, obj):
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer = avro.io.DatumWriter(schema)
    writer.write(obj, encoder)
    return bytes_writer.getvalue()

In [7]:
def deserialize(schema, raw_bytes):
    bytes_reader = io.BytesIO(raw_bytes)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    return reader.read(decoder)

In [10]:
schema_file = 'transaction.avsc'
txSchema = avro.schema.parse(open(schema_file).read())
schema_file = 'submit.avsc'
submitSchema = avro.schema.parse(open(schema_file).read())
schema_file = 'result.avsc'
resultSchema = avro.schema.parse(open(schema_file).read())

In [11]:
# Connect to kafka broker running in your local host (docker). Change this to your kafka broker if needed
kafka_broker = 'lab.aimet.tech:9092'

In [12]:
producer = KafkaProducer(bootstrap_servers=[kafka_broker])

In [13]:
txConsumer = KafkaConsumer(
    'transaction',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(txSchema, x))

resultConsumer = KafkaConsumer(
    'result',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(resultSchema, x))



In [14]:
def gen_signature(txid, payer, payee, amount, token):
    o = {'txid': txid, 'payer': payer, 'payee': payee, 'amount': amount, 'token': token}
    return hashlib.md5(json.dumps(o, sort_keys=True).encode('utf-8')).hexdigest()

In [26]:
VID = 'V859524'
TOKEN = 'aff6d129a66606c853f5bbfa74a3f796'

In [54]:
def verify():
    # get message
    msg = next(txConsumer)
    print(f"message = {msg.value}")

    # gen signature
    signature = gen_signature(
        msg.value["txid"],
        msg.value["payer"],
        msg.value["payee"],
        msg.value["amount"],
        TOKEN,
    )
    print(f"signature = {signature}")

    # send submit
    submit = {"vid": VID, "txid": msg.value["txid"], "signature": signature}
    print(f"submit = {submit}")

    data = serialize(submitSchema, submit)

    producer.send("submit", data)

    # get result
    for result_msg in resultConsumer:
        print("=================== result ===================")

        if (
            result_msg.value["txid"] == msg.value["txid"]
            and result_msg.value["vid"] == VID
        ):
            print("matched")
            for k, v in result_msg.value.items():
                print(f"{k} : {v}")
            print("==============================================")
            break
        else:
            print("not match")


verify()

message = {'txid': 'TX05014', 'payer': 'A51275', 'payee': 'A66378', 'amount': 1479}
signature = 21812b0db0878035ba7316fed7ffc435
submit = {'vid': 'V859524', 'txid': 'TX05014', 'signature': '21812b0db0878035ba7316fed7ffc435'}
matched
timestamp : 1743913057
vid : V859524
txid : TX05014
checksum : e020a7af84bd56b2fa5070191f28d965
code : 200
message : Confirm
