### 1. Environment Setup

Using Docker Compose file we set up a local Kafka development environment with three main services:

- **Kafka** (`apache/kafka:4.0.0`): A single-node Kafka broker using KRaft mode (no external Zookeeper), exposing port **9092** for client communication. Data is persisted using a named volume `kafka_data`. A basic healthcheck ensures Kafka is reachable.
  
- **Kafdrop** (`obsidiandynamics/kafdrop:latest`): A web-based UI for browsing Kafka topics and messages, accessible at **http://localhost:9000**. It connects to the Kafka broker internally.

- **Darooghe Pulse**: A custom service that generates event data and sends it to Kafka, simulating realistic transaction streams with configurable parameters like event rate, fraud rate, and merchant/customer counts.

### 2. Data Ingestion Layer

After having above services running...

In [None]:
import time
import json
from datetime import datetime, timedelta
from confluent_kafka import Consumer, Producer, KafkaError
import uuid


# Kafka Consumer Implementation
broker = "localhost:29092"
topic = "darooghe.transactions"
error_topic = "darooghe.error_logs"
group_id = f"darooghe-demo-{uuid.uuid4()}"

c_config = {
    "bootstrap.servers": broker,
    "group.id": group_id,
    "auto.offset.reset": "earliest"
}

# Schema Management
class TransactionEvent:
    def __init__(self, data):
        self.transaction_id = data.get('transaction_id')
        self.merchant_id = data.get('merchant_id')
        self.timestamp = data.get('timestamp')
        self.amount = float(data.get('amount', 0))
        self.vat_amount = float(data.get('vat_amount', 0))
        self.commission_amount = float(data.get('commission_amount', 0))
        self.total_amount = float(data.get('total_amount', 0))
        self.payment_method = data.get('payment_method')
        self.device_info = data.get('device_info', {})
        self.ingestion_time = datetime.now().timestamp()
    
    def validate(self):
        errors = []
        
        # Rule 1: Amount Consistency
        if abs(self.total_amount - (self.amount + self.vat_amount + self.commission_amount)) > 0.01:
            errors.append({
                "error_code": "ERR_AMOUNT",
                "message": "Total amount does not match sum of components",
                "transaction_id": self.transaction_id,
                "expected_total": self.amount + self.vat_amount + self.commission_amount,
                "actual_total": self.total_amount
            })
        
        # Rule 2: Time Warping
        try:
            tx_time = datetime.fromtimestamp(self.timestamp)
            now = datetime.fromtimestamp(self.ingestion_time)
            
            if tx_time > now or tx_time < now - timedelta(days=1):
                errors.append({
                    "error_code": "ERR_TIME",
                    "message": "Transaction timestamp is invalid",
                    "transaction_id": self.transaction_id,
                    "tx_timestamp": self.timestamp,
                    "ingestion_timestamp": self.ingestion_time
                })
        except (TypeError, ValueError):
            errors.append({
                "error_code": "ERR_TIME",
                "message": "Invalid timestamp format",
                "transaction_id": self.transaction_id
            })
        
        # Rule 3: Device Mismatch
        if self.payment_method == "mobile":
            device_os = self.device_info.get('os', '') if isinstance(self.device_info, dict) else ''
            if device_os not in ["iOS", "Android"]:
                errors.append({
                    "error_code": "ERR_DEVICE",
                    "message": "Mobile payment method requires iOS or Android OS",
                    "transaction_id": self.transaction_id,
                    "payment_method": self.payment_method,
                    "device_os": device_os
                })
        
        return errors

# Error logging producer
def setup_producer():
    p_config = {
        "bootstrap.servers": broker,
        "client.id": f"darooghe-producer-{uuid.uuid4()}"
    }
    return Producer(p_config)

def log_error(producer, error):
    producer.produce(
        error_topic,
        key=error.get('transaction_id', str(uuid.uuid4())),
        value=json.dumps(error).encode('utf-8')
    )
    producer.flush(timeout=10)

# Main consumer loop with validation
def process_transactions():
    consumer = Consumer(c_config)
    consumer.subscribe([topic])
    
    producer = setup_producer()
    
    timeout_end = time.time() + 30
    msg_count = 0
    displayed = False
    
    while time.time() < timeout_end and msg_count <= 10:
        msg = consumer.poll(1.0)
        
        if msg and not msg.error():
            try:
                # Deserialize the message
                transaction_data = json.loads(msg.value().decode("utf-8"))
                
                # Create and validate transaction event
                transaction = TransactionEvent(transaction_data)
                validation_errors = transaction.validate()
                
                # Log any errors
                for error in validation_errors:
                    log_error(producer, error)
                
                # Display one message as required
                if not displayed:
                    print("Sample Transaction Message:")
                    print(json.dumps(transaction_data, indent=2))
                    print("\nValidation Result:")
                    if validation_errors:
                        print(f"Found {len(validation_errors)} validation errors:")
                        for err in validation_errors:
                            print(f"  - {err['error_code']}: {err['message']}")
                    else:
                        print("Transaction is valid.")
                    displayed = True
                
                msg_count += 1
                
            except json.JSONDecodeError:
                print(f"Error: Could not parse message as JSON: {msg.value().decode('utf-8')[:100]}...")
            except Exception as e:
                print(f"Error processing message: {str(e)}")
    
    consumer.close()
    print(f"Processed {msg_count} messages.")


process_transactions()

Sample Transaction Message:
{
  "transaction_id": "99df9fd3-5866-4137-a94c-74f396ed1783",
  "timestamp": "2025-04-26T12:19:10.304293Z",
  "customer_id": "cust_303",
  "merchant_id": "merch_41",
  "merchant_category": "food_service",
  "payment_method": "pos",
  "amount": 351671,
  "location": {
    "lat": 35.670073724546555,
    "lng": 51.345939574762355
  },
  "device_info": {},
  "status": "approved",
  "commission_type": "progressive",
  "commission_amount": 7033,
  "vat_amount": 31650,
  "total_amount": 390354,
  "customer_type": "CIP",
  "risk_level": 3,
  "failure_reason": null
}

Validation Result:
Found 1 validation errors:
  - ERR_TIME: Invalid timestamp format
Processed 11 messages.


In [45]:
import pandas as pd

def consume_batch(topic_name='darooghe.transactions', max_messages=100, timeout=1.0):
    consumer = Consumer(c_config)
    consumer.subscribe([topic_name])

    messages = []
    count = 0
    start_time = time.time()

    try:
        while count < max_messages:
            msg = consumer.poll(timeout)

            if msg is None:
                if time.time() - start_time > 10:
                    print("No more messages received in the last 10 seconds.")
                    break
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print(f"Error: {msg.error()}")
                    break

            transaction = json.loads(msg.value().decode('utf-8'))
            messages.append(transaction)
            count += 1
            start_time = time.time()

            if count % 10 == 0:
                print(f"Received {count} messages")

    except Exception as e:
        print(f"Error consuming messages: {e}")
    finally:
        consumer.close()

    return messages

def analyze_transactions(transactions):
    if not transactions:
        print("No transactions to analyze")
        return

    df = pd.DataFrame(transactions)

    print(f"\nAnalyzed {len(transactions)} transactions:")
    print(f"Average amount: {df['amount'].mean():.2f}")
    print(f"Total value: {df['amount'].sum():.2f}")

    print("\nTransactions by status:")
    display(df['status'].value_counts())

    print("\nTransactions by merchant category:")
    display(df['merchant_category'].value_counts())

    print("\nTransactions by payment method:")
    display(df['payment_method'].value_counts())

    print("\nSample transactions:")
    display(df.head())

    return df

In [None]:
transactions = consume_batch(max_messages=50)


Received 10 messages
Received 20 messages
Received 30 messages
Received 40 messages
Received 50 messages


In [41]:
df = analyze_transactions(transactions)


Analyzed 50 transactions:
Average amount: 993896.32
Total value: 49694816.00

Transactions by status:


status
approved    48
declined     2
Name: count, dtype: int64


Transactions by merchant category:


merchant_category
government        17
food_service      11
entertainment      9
retail             8
transportation     5
Name: count, dtype: int64


Transactions by payment method:


payment_method
nfc       18
mobile    11
pos       11
online    10
Name: count, dtype: int64


Sample transactions:


Unnamed: 0,transaction_id,timestamp,customer_id,merchant_id,merchant_category,payment_method,amount,location,device_info,status,commission_type,commission_amount,vat_amount,total_amount,customer_type,risk_level,failure_reason
0,4ac9c146-dfa8-497d-9a5a-076b87cf180b,2025-04-25T22:53:27.034126Z,cust_437,merch_20,food_service,mobile,747269,"{'lat': 35.732249568878416, 'lng': 51.30363376...","{'os': 'iOS', 'app_version': '3.1.0', 'device_...",approved,flat,14945,67254,829468,CIP,1,
1,5c977ab5-326f-43a8-a82b-c4aaf8a7d9c1,2025-04-24T12:48:13.736649Z,cust_310,merch_10,entertainment,pos,1287631,"{'lat': 35.72371335657222, 'lng': 51.371968096...",{},approved,progressive,25752,115886,1429269,CIP,1,
2,1345d733-04e0-47d2-852c-7c86cf251771,2025-04-23T05:12:30.870679Z,cust_966,merch_25,food_service,mobile,806994,"{'lat': 35.7163967763196, 'lng': 51.3536577923...","{'os': 'iOS', 'app_version': '3.1.0', 'device_...",approved,flat,16139,72629,895762,business,3,
3,bf08c5f6-04ac-4496-9229-a26628366d29,2025-04-26T06:41:08.696505Z,cust_679,merch_31,food_service,mobile,131107,"{'lat': 35.74185115831934, 'lng': 51.416097137...","{'os': 'iOS', 'app_version': '3.1.0', 'device_...",declined,flat,2622,11799,145528,business,5,system_error
4,5e595db0-60e0-448e-addb-4155a0925436,2025-04-26T15:50:24.194332Z,cust_57,merch_6,transportation,nfc,508993,"{'lat': 35.73485392032605, 'lng': 51.244449328...",{},approved,tiered,10179,45809,564981,business,1,


In [None]:
from IPython.display import display, clear_output

def consume_continuous(topic_name='darooghe.transactions', timeout=1.0, max_display=10):
    consumer = Consumer(c_config)
    consumer.subscribe([topic_name])

    recent_txns = []

    try:
        print("Starting continuous consumption... (Press Ctrl+C to stop)")
        while True:
            msg = consumer.poll(timeout)

            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print(f"Error: {msg.error()}")
                    break

            transaction = json.loads(msg.value().decode('utf-8'))

            recent_txns.append(transaction)
            if len(recent_txns) > max_display:
                recent_txns = recent_txns[-max_display:]

            clear_output(wait=True)
            print(f"Consuming messages from {topic_name}... (Press Ctrl+C to stop)")
            display(pd.DataFrame(recent_txns))

    except KeyboardInterrupt:
        print("\nConsumption stopped by user")
    finally:
        consumer.close()

In [48]:
consume_continuous()

Consuming messages from darooghe.transactions... (Press Ctrl+C to stop)

Consumption stopped by user
