In [40]:
# import required libraries
from kafka import KafkaConsumer, KafkaProducer
import avro.schema
import avro.io
import io
import hashlib, json

In [41]:
def serialize(schema, obj):
    """Encode ``obj`` as Avro binary according to ``schema``.

    Args:
        schema: Parsed Avro schema handed to ``avro.io.DatumWriter``.
        obj: The datum to encode.

    Returns:
        The encoded datum as ``bytes``.
    """
    with io.BytesIO() as buffer:
        # The writer pushes the encoded datum through the encoder into ``buffer``.
        avro.io.DatumWriter(schema).write(obj, avro.io.BinaryEncoder(buffer))
        return buffer.getvalue()

In [42]:
def deserialize(schema, raw_bytes):
    """Decode Avro binary ``raw_bytes`` according to ``schema``.

    Args:
        schema: Parsed Avro schema handed to ``avro.io.DatumReader``.
        raw_bytes: The encoded datum as ``bytes``.

    Returns:
        Whatever ``DatumReader.read`` yields for the decoded datum.
    """
    source = avro.io.BinaryDecoder(io.BytesIO(raw_bytes))
    return avro.io.DatumReader(schema).read(source)

In [43]:
def _load_schema(path):
    """Read the Avro schema definition stored at ``path`` and return it parsed.

    The file is opened in a ``with`` block so the handle is closed as soon as
    the text has been read.
    """
    with open(path, encoding='utf-8') as schema_fh:
        return avro.schema.parse(schema_fh.read())


# One parsed schema per topic used in this notebook.
txschema = _load_schema('transaction.avsc')
submitschema = _load_schema('submit.avsc')
resultschema = _load_schema('result.avsc')

In [44]:
# Kafka broker address (host:port) used by the producer and consumers below.
# The value set here is a fixed IP address, not localhost; change it to point at
# your own broker (e.g. one running locally in docker) if needed.
kafka_broker = '35.240.149.229:9092'

In [45]:
# Producer used to publish messages to the broker at `kafka_broker`.
producer = KafkaProducer(
    bootstrap_servers=[kafka_broker],
)

In [46]:
def _avro_consumer(topic, schema):
    """Create a KafkaConsumer on ``topic`` whose values are decoded with ``schema``.

    Args:
        topic: Name of the Kafka topic to subscribe to.
        schema: Parsed Avro schema passed to ``deserialize`` for every message value.

    Returns:
        A ``KafkaConsumer`` connected to ``kafka_broker``.
    """
    # NOTE(review): no group_id is set; per the kafka-python docs offset commits
    # are disabled without one, so enable_auto_commit likely has no effect — confirm.
    return KafkaConsumer(
        topic,
        bootstrap_servers=[kafka_broker],
        enable_auto_commit=True,
        value_deserializer=lambda raw: deserialize(schema, raw),
    )


txconsumer = _avro_consumer('transaction', txschema)
resultconsumer = _avro_consumer('result', resultschema)

In [47]:
def gen_signature(txid, payer, payee, amount, token):
    """Return the hex MD5 digest that signs a transaction.

    The five fields are placed in a dict, serialized to JSON with keys sorted,
    encoded as UTF-8 and hashed with MD5.

    Args:
        txid: Transaction identifier.
        payer: Paying account.
        payee: Receiving account.
        amount: Transaction amount.
        token: Token mixed into the signature.

    Returns:
        The digest as a 32-character hexadecimal string.
    """
    fields = dict(txid=txid, payer=payer, payee=payee, amount=amount, token=token)
    canonical = json.dumps(fields, sort_keys=True)
    digest = hashlib.md5(canonical.encode('utf-8'))
    return digest.hexdigest()

In [48]:
# For every message on the "transaction" topic (decoded with transaction.avsc):
#   1. extract txid, payer, payee and amount,
#   2. generate a signature from those fields plus the token,
#   3. publish {vid, txid, signature} to the "submit" topic (encoded with submit.avsc).

# These values are the same for every message, so they are set once before the loop.
# NOTE(review): the token is a hard-coded credential; consider loading it from an
# environment variable or a secrets store before sharing this notebook.
token = 'ab101f7dca72a6cebd6ab56b55aa2810'
vid = 'V226190'

try:
    # Presumably runs until the kernel is interrupted: no consumer timeout is
    # configured on txconsumer — confirm.
    for msg in txconsumer:
        tx = msg.value
        print(tx)
        txid = tx['txid']
        payer = tx['payer']
        payee = tx['payee']
        amount = tx['amount']
        signature = gen_signature(txid, payer, payee, amount, token)
        submit = {'vid': vid, 'txid': txid, 'signature': signature}
        producer.send('submit', serialize(submitschema, submit))
finally:
    # send() is asynchronous in kafka-python; flush so submissions that are still
    # queued get delivered even when the loop is interrupted or raises.
    producer.flush()

{'txid': 'TX01490', 'payer': 'A90971', 'payee': 'A80362', 'amount': 1571}
{'txid': 'TX09639', 'payer': 'A39845', 'payee': 'A32096', 'amount': 1714}
{'txid': 'TX04233', 'payer': 'A51792', 'payee': 'A33353', 'amount': 1181}
{'txid': 'TX02470', 'payer': 'A79946', 'payee': 'A26374', 'amount': 803}
{'txid': 'TX09525', 'payer': 'A26666', 'payee': 'A51111', 'amount': 306}
{'txid': 'TX08848', 'payer': 'A44969', 'payee': 'A86476', 'amount': 1466}
{'txid': 'TX04876', 'payer': 'A96742', 'payee': 'A47458', 'amount': 1758}
{'txid': 'TX08273', 'payer': 'A77390', 'payee': 'A80996', 'amount': 1286}
{'txid': 'TX07201', 'payer': 'A56843', 'payee': 'A47741', 'amount': 1186}
{'txid': 'TX07931', 'payer': 'A82611', 'payee': 'A77536', 'amount': 1004}
{'txid': 'TX06497', 'payer': 'A88577', 'payee': 'A16385', 'amount': 496}
{'txid': 'TX07241', 'payer': 'A51109', 'payee': 'A24267', 'amount': 204}
{'txid': 'TX01584', 'payer': 'A45932', 'payee': 'A46986', 'amount': 932}
{'txid': 'TX00979', 'payer': 'A24235', 'pay