In [4]:
{
	"namespace": "tranx.avro",
	"type": "record",
	"name": "TransactionRecord",
	"fields": [
		{"name": "ID", "type": "string"},
		{"name": "TRANSACTION_TYPE",  "type": "string"},
		{"name": "AMOUNT", "type": "int"},
        {"name": "DATE", "type": "string"},
        {"name": "CURRENCY", "type": "string"}

	]
}

3


In [None]:
# app.py
from kafka import KafkaProducer
import time
import io
import avro.schema
from avro.io import DatumWriter
from create_data import generate_transaction_data

TOPIC_NAME = 'transaction'
BROKERS = ['localhost:9092', 'localhost:9093', 'localhost:9094']
SCHEMA_PATH = "tranx.avsc"

# Parse the Avro schema used to encode outgoing transaction messages.
# The context manager closes the file handle deterministically instead of
# leaving an open file for the garbage collector.
with open(SCHEMA_PATH, encoding="utf-8") as schema_file:
    SCHEMA = avro.schema.parse(schema_file.read())

def avro_serializer(value: dict, schema: avro.schema.Schema=SCHEMA) -> bytes:
    """Encode one record dict into Avro binary bytes.

    The datum is written straight through a BinaryEncoder into an in-memory
    buffer, so the result carries no embedded schema; readers must decode it
    with a compatible schema.

    Args:
        value: The record to encode.
        schema: Avro schema describing ``value``; defaults to the module SCHEMA.

    Returns:
        The encoded bytes.
    """
    buffer = io.BytesIO()
    DatumWriter(schema).write(value, avro.io.BinaryEncoder(buffer))
    return buffer.getvalue()

# Seconds to wait between generated transactions.
SEND_INTERVAL_SECONDS = 3

# The serializer is passed directly; wrapping it in a lambda added nothing.
producer = KafkaProducer(bootstrap_servers=BROKERS,
                         value_serializer=avro_serializer)

try:
    # Publish one synthetic transaction per interval until interrupted.
    while True:
        trax_data = generate_transaction_data()

        producer.send(TOPIC_NAME, trax_data)
        print(trax_data)

        time.sleep(SEND_INTERVAL_SECONDS)
finally:
    # On Ctrl-C (or any error) push out records the producer still has
    # buffered, then release its network resources.
    producer.flush()
    producer.close()


In [None]:
# transaction_detector.py
from kafka import KafkaConsumer, KafkaProducer
import io
import avro.schema
import avro.io
from avro.io import DatumWriter

BROKERS = ['localhost:9092', 'localhost:9093', 'localhost:9094']
TRANSACTION_TOPIC_NAME = 'transaction'
LEGIT_TOPIC_NAME = 'legit'
FRAUD_TOPIC_NAME = 'fraud'
SCHEMA_PATH = "tranx.avsc"

# Parse the Avro schema used both to decode incoming transactions and to
# re-encode them for the legit/fraud topics. The context manager closes the
# file handle deterministically.
with open(SCHEMA_PATH, encoding="utf-8") as schema_file:
    SCHEMA = avro.schema.parse(schema_file.read())

def is_suspicious(trax_msg: dict, suspicious_type: str = 'BITCOIN',
                  amount_threshold: int = 80) -> bool:
    """Decide whether a transaction should be routed to the fraud topic.

    A transaction is flagged when its TRANSACTION_TYPE equals
    ``suspicious_type`` and its AMOUNT is at least ``amount_threshold``.

    Args:
        trax_msg: Decoded transaction record. 'TRANSACTION_TYPE' is always read.
            'AMOUNT' is read only when the type matches.
        suspicious_type: Transaction type considered risky (default 'BITCOIN').
        amount_threshold: Inclusive minimum AMOUNT that triggers the flag
            (default 80).

    Returns:
        True if both conditions hold, otherwise False.

    Raises:
        KeyError: if a key that is needed for the decision is missing.
    """
    # Check the type first so AMOUNT is only looked up for matching types.
    return (trax_msg['TRANSACTION_TYPE'] == suspicious_type
            and trax_msg['AMOUNT'] >= amount_threshold)

def avro_serializer(value: dict, schema: avro.schema.Schema=SCHEMA) -> bytes:
    """Serialize a record dict to Avro binary for publishing to Kafka.

    The datum is written with a BinaryEncoder into an in-memory buffer, so the
    output holds only the encoded record (no embedded schema).

    Args:
        value: The record to encode.
        schema: Avro schema describing ``value``; defaults to the module SCHEMA.

    Returns:
        The encoded bytes.
    """
    out = io.BytesIO()
    datum_writer = DatumWriter(schema)
    datum_writer.write(value, avro.io.BinaryEncoder(out))
    return out.getvalue()

def avro_deserializer(value: bytes, schema: avro.schema.Schema=SCHEMA) -> dict:
    """Decode Avro binary bytes (as produced by ``avro_serializer``) into a record.

    Args:
        value: Raw Avro-encoded message bytes.
        schema: Schema used to decode ``value``; defaults to the module SCHEMA.

    Returns:
        The decoded record.
    """
    decoder = avro.io.BinaryDecoder(io.BytesIO(value))

    # Honour the caller-supplied schema instead of always reading with the
    # module-level SCHEMA.
    reader = avro.io.DatumReader(schema)
    return reader.read(decoder)

# (De)serializers are passed directly; wrapping them in lambdas added nothing.
consumer = KafkaConsumer(TRANSACTION_TOPIC_NAME,
                         bootstrap_servers=BROKERS,
                         value_deserializer=avro_deserializer)

producer = KafkaProducer(bootstrap_servers=BROKERS,
                         value_serializer=avro_serializer)

try:
    # Route every incoming transaction to either the fraud or the legit topic.
    for msg in consumer:
        message = msg.value

        target_topic = FRAUD_TOPIC_NAME if is_suspicious(message) else LEGIT_TOPIC_NAME

        producer.send(target_topic, message)

        print(message)
finally:
    # When the loop stops (e.g. Ctrl-C), push out routed records still
    # buffered in the producer, then release both clients. The consumer is
    # closed even if flushing fails.
    try:
        producer.flush()
        producer.close()
    finally:
        consumer.close()


In [None]:
# fraud_processor.py
from kafka import KafkaConsumer
import io
import avro.schema
import avro.io

BROKERS = ['localhost:9092', 'localhost:9093', 'localhost:9094']
FRAUD_TOPIC_NAME = 'fraud'
SCHEMA_PATH = "tranx.avsc"

# Parse the Avro schema used to decode fraud-topic messages. The context
# manager closes the file handle deterministically.
with open(SCHEMA_PATH, encoding="utf-8") as schema_file:
    SCHEMA = avro.schema.parse(schema_file.read())

def avro_deserializer(value: bytes, schema: avro.schema.Schema=SCHEMA) -> dict:
    """Decode Avro binary bytes into a record.

    Args:
        value: Raw Avro-encoded message bytes.
        schema: Schema used to decode ``value``; defaults to the module SCHEMA.

    Returns:
        The decoded record.
    """
    decoder = avro.io.BinaryDecoder(io.BytesIO(value))

    # Honour the caller-supplied schema instead of always reading with the
    # module-level SCHEMA.
    reader = avro.io.DatumReader(schema)
    return reader.read(decoder)
    
# The deserializer is passed directly; wrapping it in a lambda added nothing.
consumer = KafkaConsumer(FRAUD_TOPIC_NAME,
                         bootstrap_servers=BROKERS,
                         value_deserializer=avro_deserializer)

try:
    # Print every record published to the fraud topic.
    for msg in consumer:
        message = msg.value

        print(f'Fraud Data!! : {message}')
finally:
    # Release the consumer's connections when the loop is interrupted.
    consumer.close()

In [None]:
#legit_processor.py
from kafka import KafkaConsumer
import io
import avro.schema
import avro.io

BROKERS = ['localhost:9092', 'localhost:9093', 'localhost:9094']
LEGIT_TOPIC_NAME = 'legit'
SCHEMA_PATH = "tranx.avsc"

# Parse the Avro schema used to decode legit-topic messages. The context
# manager closes the file handle deterministically.
with open(SCHEMA_PATH, encoding="utf-8") as schema_file:
    SCHEMA = avro.schema.parse(schema_file.read())

def avro_deserializer(value: bytes, schema: avro.schema.Schema=SCHEMA) -> dict:
    """Decode Avro binary bytes into a record.

    Args:
        value: Raw Avro-encoded message bytes.
        schema: Schema used to decode ``value``; defaults to the module SCHEMA.

    Returns:
        The decoded record.
    """
    decoder = avro.io.BinaryDecoder(io.BytesIO(value))

    # Honour the caller-supplied schema instead of always reading with the
    # module-level SCHEMA.
    reader = avro.io.DatumReader(schema)
    return reader.read(decoder)
    
# The deserializer is passed directly; wrapping it in a lambda added nothing.
consumer = KafkaConsumer(LEGIT_TOPIC_NAME,
                         bootstrap_servers=BROKERS,
                         value_deserializer=avro_deserializer)

try:
    # Print every record published to the legit topic.
    for msg in consumer:
        message = msg.value

        print(f'Legit Data : {message}')
finally:
    # Release the consumer's connections when the loop is interrupted.
    consumer.close()