In [1]:
# import required libraries
import hashlib, json
import io
import os
import time

import avro.io
import avro.schema
from kafka import KafkaConsumer, KafkaProducer

In [2]:
def serialize(schema, obj):
    """Encode ``obj`` with the Avro binary encoding for ``schema``.

    The datum is written into an in-memory buffer and the buffer's full
    contents are returned as ``bytes``.
    """
    buffer = io.BytesIO()
    avro.io.DatumWriter(schema).write(obj, avro.io.BinaryEncoder(buffer))
    return buffer.getvalue()

In [3]:
def deserialize(schema, raw_bytes):
    """Decode one Avro binary datum from ``raw_bytes`` using ``schema``.

    Returns whatever the Avro datum reader produces for the payload.
    """
    decoder = avro.io.BinaryDecoder(io.BytesIO(raw_bytes))
    return avro.io.DatumReader(schema).read(decoder)

In [4]:
def _load_schema(path):
    """Read the Avro schema definition stored at ``path`` and parse it.

    The file is opened in a ``with`` block so the handle is closed as soon as
    its contents have been read, instead of being left open for the lifetime
    of the kernel.
    """
    with open(path) as schema_fh:
        return avro.schema.parse(schema_fh.read())


# One schema per topic used below: incoming transactions, outgoing
# submissions, and the results read back.
txschema = _load_schema('transaction.avsc')
submitschema = _load_schema('submit.avsc')
resultschema = _load_schema('result.avsc')

In [5]:
# Address (host:port) of the Kafka broker to connect to.
# Set the KAFKA_BROKER environment variable to point at a different broker
# (for example a local docker instance); otherwise the address below is used.
kafka_broker = os.environ.get('KAFKA_BROKER', '34.87.2.170:9092')

In [6]:
# Producer used to publish messages to the broker configured above.
producer = KafkaProducer(
    bootstrap_servers=[kafka_broker],
)

In [7]:
def _avro_consumer(topic, schema):
    """Build a KafkaConsumer on ``topic`` whose values are decoded with ``schema``."""
    return KafkaConsumer(
        topic,
        bootstrap_servers=[kafka_broker],
        enable_auto_commit=True,
        value_deserializer=lambda raw: deserialize(schema, raw),
    )


# Consumer for the 'transaction' topic (decoded with txschema).
txconsumer = _avro_consumer('transaction', txschema)

# Consumer for the 'result' topic (decoded with resultschema).
resultconsumer = _avro_consumer('result', resultschema)

In [8]:
def gen_signature(txid, payer, payee, amount, token):
    """Return the hex MD5 digest that signs a transaction.

    The five fields are placed in a dict, dumped as JSON with keys sorted,
    encoded as UTF-8 and hashed with MD5; the hexadecimal digest is returned.
    """
    fields = dict(txid=txid, payer=payer, payee=payee, amount=amount, token=token)
    canonical = json.dumps(fields, sort_keys=True)
    digest = hashlib.md5(canonical.encode('utf-8'))
    return digest.hexdigest()

In [9]:
# Sign incoming transactions and publish one submission per transaction.
my_vid = 'V847760'
# NOTE(review): hard-coded token; consider loading it from the environment or
# a secrets store before sharing this notebook.
my_token = 'bb4de316af518f4884b5287575da7c27'
max_submissions = 5  # how many transactions to sign and submit in this run

i = 0
for mesg in txconsumer:
    tx = mesg.value
    print(tx)

    signature = gen_signature(tx['txid'], tx['payer'], tx['payee'], tx['amount'], my_token)
    obj = {'txid': tx['txid'], 'vid': my_vid, 'signature': signature}
    data = serialize(submitschema, obj)
    producer.send('submit', data)

    print(i)
    i += 1
    if i >= max_submissions:
        # Stop right after the last wanted message. Checking the limit at the
        # top of the next iteration would first wait for one more message
        # from the consumer and then throw it away.
        break
    time.sleep(2)

# send() only queues the record; flush so every submission is actually
# delivered before this cell finishes.
producer.flush()

{'txid': 'TX01788', 'payer': 'A66683', 'payee': 'A04423', 'amount': 102}
0
{'txid': 'TX02853', 'payer': 'A73359', 'payee': 'A05997', 'amount': 1869}
1
{'txid': 'TX04247', 'payer': 'A13950', 'payee': 'A48914', 'amount': 1545}
2
{'txid': 'TX02711', 'payer': 'A13303', 'payee': 'A54263', 'amount': 465}
3
{'txid': 'TX00056', 'payer': 'A09313', 'payee': 'A70510', 'amount': 1052}
4


In [10]:
# Print the first few messages from the 'result' topic.
# NOTE(review): iterating the consumer waits for messages; if fewer than
# max_results ever arrive this cell keeps waiting unless the consumer was
# created with a timeout (consumer_timeout_ms) -- confirm the desired behaviour.
max_results = 5  # number of result messages to display

i = 0
for mesg in resultconsumer:
    print(mesg.value)
    time.sleep(2)
    i += 1
    print(i)
    if i >= max_results:
        # Leave as soon as the last wanted result has been shown; testing the
        # limit on the next iteration would block waiting for an extra
        # message that is then discarded.
        break

    

KeyboardInterrupt: 