# <b><font size=7 color='orange'> Computer Assignment 2 </font></b>

<b><font color="orange" size="6">Collaborators:</font></b><br>
<font color="orange" size="5">
  <i>Golboo Rashidi - 810100148<br>
     Mehrad Liviyan - 810101501<br>
     Pooria Mahdian - 810101530
  </i>
</font>


In this project, we design and implement a real-time payment data pipeline for Darooghe, a payment service provider facilitating transactions through an online gateway, POS devices, mobile applications, and NFC (contactless) payments. The primary objectives are to ingest transaction events at scale, validate and process them in real time, detect and flag fraudulent activity, compute commission metrics, and store both raw and aggregated data for historical analysis and visualization. This architecture ensures high-throughput, low-latency handling of payment events, enabling Darooghe to monitor transaction flows and business insights continuously.

## Core Concepts Behind Building a Real-Time Data Analytics Pipeline
Building a real-time data analytics pipeline means designing a system that can ingest, process, and analyze data as soon as it arrives, without delay. These systems are used in finance, fraud detection, social media, recommendation engines, and more. Here are the core concepts used in this project:

1.   ### Data Ingestion (Streaming Input):
Real-time systems need to capture data as it happens. In this project, Apache Kafka is used to continuously ingest transaction events.

2.  ### Stream Processing:
  Instead of processing all the data at once (batch), stream processors handle micro-batches or individual events as they arrive. This enables timely detection of patterns, anomalies, or business insights.


3.   ### Real-Time Processing Engine (Why PySpark?):
  Apache Spark Structured Streaming enables scalable, fault-tolerant, distributed stream processing. It supports windowed operations (e.g., "last 5 minutes") and joins with historical data. PySpark exposes Spark through Python, which makes it accessible to data-science workflows. In this project, PySpark is used for both batch processing (analyzing historical MongoDB data) and stream processing (real-time Kafka data), which makes it a unified tool across the pipeline.

4.   ### Fault Tolerance:
  Real-time systems must be resilient to failures. Kafka allows replaying events by using consumer offsets, and Spark Streaming uses checkpointing to recover the last state after a crash.


5.   ### Data Storage and Access:
  Cleaned and validated data must be persisted efficiently. NoSQL databases like MongoDB are well suited to storing semi-structured, JSON-like data, and the stored results can later be queried or visualized easily.

6.   ### Event-Driven Architecture:
  Each transaction can trigger further actions:

  - Fraud detection,

  - Notifications,

  - Dashboard updates.

  This is done through dedicated Kafka topics such as darooghe.fraud_alerts and darooghe.error_logs.


7.   ### Low Latency & High Throughput:
  The pipeline is designed to process millions of transactions per day with minimal delay.
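The replay behaviour described under Fault Tolerance can be illustrated with a toy model. The sketch below is not Kafka's API: `ToyPartition` and its methods are hypothetical and only mimic an append-only partition whose consumer groups commit the offset of the next event to read.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToyPartition:
    """Hypothetical stand-in for one Kafka partition: an append-only list of events."""
    events: List[str] = field(default_factory=list)
    # Committed position per consumer group: the offset of the next event to read
    committed: Dict[str, int] = field(default_factory=dict)

    def append(self, event: str) -> None:
        self.events.append(event)

    def read(self, group: str) -> List[str]:
        # A group with no commit starts at offset 0 (like auto.offset.reset=earliest)
        return self.events[self.committed.get(group, 0):]

    def commit(self, group: str, next_offset: int) -> None:
        self.committed[group] = next_offset


log = ToyPartition()
for tx_id in ["tx1", "tx2", "tx3", "tx4"]:
    log.append(tx_id)

# The consumer handles two events and commits before "crashing"
processed = log.read("spark-job")[:2]
log.commit("spark-job", len(processed))

# After a restart it resumes at the committed offset; nothing is lost or re-read
resumed = log.read("spark-job")
print(processed, resumed)  # ['tx1', 'tx2'] ['tx3', 'tx4']
```

Committing only after events are processed, as here, gives at-least-once behaviour: a crash between processing and committing leads to re-reading, not loss. Spark Structured Streaming's Kafka source works slightly differently, keeping its offsets in the query's checkpoint directory rather than committing them to a consumer group.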




### Apache Kafka
Apache Kafka is an open-source distributed event streaming platform, used here as the ingestion layer. It provides a durable, fault-tolerant backbone for capturing high volumes of transaction events from a synthetic transaction generator. Transactions are published to the Kafka topic darooghe.transactions, from which downstream systems consume them. Kafka's scalability and partitioning model let the pipeline absorb spikes in transaction rates with minimal latency.

<div style="text-align: center;">
    <img src="additional_files\image1.png" alt="Apache Kafka diagram" style="width: 50%;">
</div>
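Kafka's ordering and scaling guarantees hinge on how messages are assigned to partitions. A minimal sketch of stable key-based assignment, assuming transactions are keyed by `customer_id` and the topic has 6 partitions (both are assumptions; the generator's actual key and partition count are not shown here):

```python
import zlib

NUM_PARTITIONS = 6  # hypothetical partition count for darooghe.transactions


def choose_partition(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a message key to a partition with a stable hash (simplified illustration)."""
    # CRC32 gives the same value in every process, unlike Python's salted hash() for str
    return zlib.crc32(key.encode("utf-8")) % num_partitions


for customer in ["cust_249", "cust_266", "cust_249"]:
    print(customer, "->", choose_partition(customer))
# The two cust_249 lines always print the same partition number
```

This is a simplified illustration: real clients use their own partitioners (the Java client defaults to murmur2, librdkafka to a CRC32-based `consistent_random`), so actual partition numbers may differ. The property that matters is the same: one key maps to one partition, so events for a given customer stay in order.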

## Why Kafka?
* ### Kafka as a Real-Time Data Ingestion Layer:
  Kafka acts as a buffer between data producers and consumers. The transaction generator continuously produces synthetic payment events, and Kafka stores and streams them in real time to any consumer that subscribes.


* ### Scalable Message Queuing:
  Kafka efficiently handles high-throughput workloads (thousands of messages per second). Because topics are split into partitions, multiple consumers in the same consumer group can read from a topic in parallel.

* ### Fault Tolerance and Replay:
   Kafka provides durable storage and replayability. If the Spark Streaming job crashes or restarts, it can resume reading from where it left off using offset tracking.


* ### Decoupling Validation and Storage
  Kafka enables a validation-first design. The system validates transactions on the fly: valid ones go into MongoDB, and invalid ones go into a separate topic (darooghe.error_logs). This modularity would be difficult if all data were dumped into a database up front.

* ###  Real-Time Fraud Alerts:
  Kafka also powers real-time alerts: transactions flagged as suspicious (e.g., by velocity checks or geographic anomalies) are sent to the darooghe.fraud_alerts topic.
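The validation-first split can be sketched as a pure routing function. The names `route` and `amount_rule` and the `"sink"`/`"payload"` fields are hypothetical; in the real pipeline the error branch corresponds to producing to darooghe.error_logs and the valid branch to a MongoDB insert.

```python
import json
from typing import Any, Callable, Dict, List

Validator = Callable[[Dict[str, Any]], List[str]]


def route(tx: Dict[str, Any], validate: Validator) -> Dict[str, Any]:
    """Send valid transactions to storage and invalid ones to the error topic."""
    errors = validate(tx)
    if errors:
        # Error-log message: transaction ID plus the codes of the failed rules
        message = {"transaction_id": tx["transaction_id"], "errors": errors}
        return {"sink": "darooghe.error_logs", "payload": json.dumps(message)}
    # Valid transactions become MongoDB documents unchanged
    return {"sink": "mongodb", "payload": tx}


def amount_rule(tx: Dict[str, Any]) -> List[str]:
    # Illustrative single rule: total must equal amount + VAT + commission
    expected = tx["amount"] + tx["vat_amount"] + tx["commission_amount"]
    return [] if tx["total_amount"] == expected else ["ERR_AMOUNT"]


good = {"transaction_id": "t1", "amount": 100, "vat_amount": 9,
        "commission_amount": 2, "total_amount": 111}
bad = dict(good, transaction_id="t2", total_amount=150)

print(route(good, amount_rule)["sink"])  # mongodb
print(route(bad, amount_rule)["sink"])   # darooghe.error_logs
```

Keeping the routing decision free of I/O makes it easy to test; the Kafka producer and MongoDB client are only needed at the edges.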


### If a database were used instead of Kafka for real-time ingestion:
Real-time processing would become harder, since databases are built for querying and storage rather than streaming. Scalability and fault tolerance would also decrease.



### Apache Spark
Apache Spark is a unified analytics engine for large-scale data processing. In this project, we employ both Spark Streaming for real-time (micro-batch) processing of incoming Kafka events and PySpark for batch analytics on stored data. Spark Streaming reads from Kafka, performs windowed aggregations, applies fraud-detection rules, and calculates commission metrics on the fly. PySpark batch jobs analyze historical transaction data to generate reports on commission efficiency, temporal patterns, and customer segmentation.


<div style="text-align: center;">
    <img src="additional_files\Picture5-2.png" alt="Apache Spark diagram" style="width: 50%;">
</div>
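The windowed aggregations mentioned above group events by the time bucket they fall into. A plain-Python sketch of 5-minute tumbling windows over commission amounts, with epoch-aligned window boundaries (the sample values are taken from transactions printed later in this notebook; the helper names are hypothetical):

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, List, Tuple

WINDOW_SECONDS = 5 * 60  # 5-minute tumbling windows


def window_start(ts: datetime) -> datetime:
    """Floor an event time to the start of its 5-minute window (aligned to the epoch)."""
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % WINDOW_SECONDS, tz=timezone.utc)


def commission_per_window(events: List[Tuple[str, int]]) -> Dict[datetime, int]:
    """Sum commission per window from (ISO timestamp, commission) pairs."""
    totals: Dict[datetime, int] = defaultdict(int)
    for iso_ts, commission in events:
        ts = datetime.fromisoformat(iso_ts.replace("Z", "+00:00"))
        totals[window_start(ts)] += commission
    return dict(totals)


events = [
    ("2025-05-01T10:13:59.419427Z", 10357),
    ("2025-05-01T10:20:50.023082Z", 10429),
    ("2025-05-01T10:20:50.030484Z", 21491),
]
for start, total in sorted(commission_per_window(events).items()):
    print(start.isoformat(), total)
# 2025-05-01T10:10:00+00:00 10357
# 2025-05-01T10:20:00+00:00 31920
```

In Spark the same grouping would be written with `pyspark.sql.functions.window` on the event-time column, typically combined with `withWatermark` so that state for old windows can be dropped.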

### MongoDB
MongoDB is a NoSQL, document-oriented database chosen for its flexible schema and horizontal scalability. We use MongoDB to store validated transactions and aggregated insights. A partitioning strategy (e.g., by date or merchant) is employed to optimize query performance and data retention policies. Aggregated collections (daily, weekly, monthly summaries) support efficient historical analysis and dashboard visualizations without the overhead of re-processing raw event streams.
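A minimal sketch of building date- and merchant-partitioned daily summaries before writing them to MongoDB. The document layout and the `_id` format (`"<date>:<merchant_id>"`) are assumptions, and the second `merch_5` row is made up for illustration.

```python
from typing import Any, Dict, List, Tuple


def daily_merchant_summaries(txs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Aggregate transactions into one summary document per (UTC day, merchant)."""
    groups: Dict[Tuple[str, str], Dict[str, Any]] = {}
    for tx in txs:
        day = tx["timestamp"][:10]  # "YYYY-MM-DD" prefix of the ISO-8601 timestamp
        merchant = tx["merchant_id"]
        doc = groups.setdefault((day, merchant), {
            "_id": f"{day}:{merchant}",  # deterministic ID so re-runs overwrite, not duplicate
            "date": day,
            "merchant_id": merchant,
            "tx_count": 0,
            "total_amount": 0,
            "total_commission": 0,
        })
        doc["tx_count"] += 1
        doc["total_amount"] += tx["amount"]
        doc["total_commission"] += tx["commission_amount"]
    return list(groups.values())


sample = [
    {"timestamp": "2025-05-01T10:13:59.419427Z", "merchant_id": "merch_5",
     "amount": 517891, "commission_amount": 10357},
    {"timestamp": "2025-05-01T18:02:11Z", "merchant_id": "merch_5",
     "amount": 100000, "commission_amount": 2000},
    {"timestamp": "2025-05-01T10:20:50.030484Z", "merchant_id": "merch_30",
     "amount": 1074582, "commission_amount": 21491},
]
for doc in daily_merchant_summaries(sample):
    print(doc["_id"], doc["tx_count"], doc["total_amount"], doc["total_commission"])
```

With pymongo, each document could be written with `collection.replace_one({"_id": doc["_id"]}, doc, upsert=True)`; the deterministic `_id` makes re-running a batch job idempotent.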

## Environment Setup

### Goals
| Component | Why we need it | Proof of success |
|-----------|----------------|------------------|
| **Kafka + ZooKeeper** | Buffer high-volume events & allow replay | `kafka-topics.sh --list` shows project topics |
| **Kafdrop** | Visual monitoring of topics, offsets, payloads | Open `http://localhost:9000` |
| **Spark 3.5.5** | Distributed batch + stream analytics | `spark-submit --version` prints cleanly |
| **MongoDB 4.4** | Durable storage & fast aggregates | `mongo --eval 'db.stats()'` returns without error |
| **Python venv** | Isolated, reproducible dependencies | `(venv)` prompt active |


### Host Specification
| Item | Value |
|------|-------|
| OS | Windows 11  |
| Java | OpenJDK 21 |
| Kafka | 3.7.0 (Scala 2.13) |
| Spark | 3.5.5 |
| MongoDB | 4.4 |
| Python | 3.12 |




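Commands used to start and stop the services:

* MongoDB (start): `sudo docker run -d --name mongodb -p 27017:27017 -v mongodb-data:/data/db mongo:4.4`
* MongoDB (stop and remove the container from the previous session): `sudo docker rm -f mongodb`
* ZooKeeper (run from `C:\kafka`): `.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties`
* Kafka broker (run from `C:\kafka`): `.\bin\windows\kafka-server-start.bat .\config\server.properties`
* Kafka fix (if the broker fails to start): `rm \tmp`, which deletes the `\tmp` directory where the default `server.properties` (`log.dirs`) and `zookeeper.properties` (`dataDir`) keep their data; note that this wipes all topic data
* Kafdrop (run from `D:\onedrive\desktop`): `java -jar kafdrop-4.1.0.jar --kafka.brokerConnect=localhost:9092 --server.port=9000`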
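Before running the notebook, it can help to confirm that each service is listening. This sketch only checks TCP ports with the standard library; the ports are the ones used above plus ZooKeeper's default `clientPort` of 2181 (an assumption based on the stock `zookeeper.properties`). An open port shows that something is listening, not that the service is healthy.

```python
import socket

# Local ports from the setup above; 2181 is ZooKeeper's default clientPort
SERVICES = {"ZooKeeper": 2181, "Kafka broker": 9092, "Kafdrop": 9000, "MongoDB": 27017}


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


if __name__ == "__main__":
    for name, port in SERVICES.items():
        status = "up" if is_port_open("localhost", port) else "DOWN"
        print(f"{name:<13} localhost:{port:<6} {status}")
```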

## Data Ingestion Layer

### Kafka Consumer Implementation

#### I. Implement a Kafka consumer to connect to the Kafka server and read from the darooghe.transactions topic.

##### First, import the needed libraries

In [None]:
from confluent_kafka import Consumer, KafkaException, KafkaError
import json

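This section initializes the Kafka consumer configuration using the confluent_kafka library. It specifies the Kafka server (localhost:9092) and the consumer group ID (darooghe-consumer-group). With `auto.offset.reset` set to `earliest`, a group that has no committed offset starts from the oldest retained message, and `enable.auto.commit` makes the client commit consumed offsets periodically in the background. Because the later cells reuse the same group ID, they continue from the committed position rather than rereading the topic from the start. The consumer is then subscribed to the darooghe.transactions topic, which streams real-time transaction events.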

In [None]:
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'darooghe-consumer-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True
}

consumer = Consumer(conf)
consumer.subscribe(['darooghe.transactions'])

This part performs a test read from Kafka, showing one transaction from the stream to ensure that Kafka is producing data correctly and to verify the structure of the data before implementing validation or storage logic.

In [None]:

print("Connected to Kafka. Waiting for messages...")

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())

        transaction = json.loads(msg.value().decode('utf-8'))
        print("Sample Transaction:")
        print(json.dumps(transaction, indent=4))
        break

except KeyboardInterrupt:
    print("Interrupted by user")

finally:
    consumer.close()

Connected to Kafka. Waiting for messages...
Sample Transaction:
{
    "transaction_id": "217ab5d9-2fc4-40ad-ab95-9c6ec0b88fe1",
    "timestamp": "2025-05-01T10:13:59.419427Z",
    "customer_id": "cust_249",
    "merchant_id": "merch_5",
    "merchant_category": "retail",
    "payment_method": "online",
    "amount": 517891,
    "location": {
        "lat": 35.70778534881133,
        "lng": 51.431322572644795
    },
    "device_info": {
        "os": "Android",
        "app_version": "2.4.1",
        "device_model": "Samsung Galaxy S25"
    },
    "status": "approved",
    "commission_type": "progressive",
    "commission_amount": 10357,
    "vat_amount": 46610,
    "total_amount": 574858,
    "customer_type": "CIP",
    "risk_level": 5,
    "failure_reason": null
}


#### II. Implement proper deserialization and data validation. Validate each transaction against the basic business rules below. Write all transactions determined to be invalid into the topic darooghe.error_logs. Each message written to darooghe.error_logs must include the error code, the relevant data, and the ID of the transaction.

This part defines three validation functions. Each one returns True when the transaction violates its rule:

The **AmountConsistency** function verifies that the total_amount equals the sum of the base amount, VAT, and commission.

**TimeWarping** ensures the transaction timestamp is not in the future or more than a day old.

**DeviceMismatch** checks that if the payment method is "mobile", the operating system is either iOS or Android. These checks are then used inside the validate_transaction function, which returns a list of error codes for any rules the transaction fails.
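As a worked check of the AmountConsistency rule, the sample transaction printed earlier satisfies 517891 + 46610 + 10357 = 574858, so it passes. The helper `amount_mismatch` below is hypothetical and simply returns the difference that the rule compares against a tolerance of 1:

```python
# Values copied from the sample transaction printed above
sample = {"amount": 517891, "vat_amount": 46610, "commission_amount": 10357, "total_amount": 574858}


def amount_mismatch(tx: dict) -> int:
    """Difference between the reported total and amount + VAT + commission."""
    return tx["total_amount"] - (tx["amount"] + tx["vat_amount"] + tx["commission_amount"])


print(amount_mismatch(sample))  # 0 -> passes the ERR_AMOUNT rule

# A tampered copy with the total inflated by 100 would be flagged
tampered = dict(sample, total_amount=574958)
print(amount_mismatch(tampered))  # 100 -> abs(100) >= 1, so ERR_AMOUNT
```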

In [None]:
from datetime import datetime, timedelta, timezone

In [None]:
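def AmountConsistency(tx):
    # Violated when total_amount differs from amount + VAT + commission by 1 or more
    return abs(tx['total_amount'] - (tx['amount'] + tx['vat_amount'] + tx['commission_amount'])) >= 1

def TimeWarping(tx):
    # Replace the trailing "Z" so fromisoformat returns a timezone-aware UTC datetime
    tx_time = datetime.fromisoformat(tx['timestamp'].replace("Z", "+00:00"))
    now = datetime.now(timezone.utc)
    # Violated when the event is in the future or more than one day old
    return tx_time > now or tx_time < (now - timedelta(days=1))

def DeviceMismatch(tx):
    # Mobile payments must report an iOS or Android device
    if tx['payment_method'] == "mobile":
        # `or {}` also covers device_info being null in the JSON
        os_name = (tx.get("device_info") or {}).get("os", "")
        if os_name not in ["iOS", "Android"]:
            return True

    return False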

def validate_transaction(tx):
    errors = []

    if AmountConsistency(tx):
        errors.append("ERR_AMOUNT")

    if TimeWarping(tx):
        errors.append("ERR_TIME")

    if DeviceMismatch(tx):
        errors.append("ERR_DEVICE")

    return errors
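Because TimeWarping compares against the wall clock, the same stored events (timestamped 2025-05-01) will be flagged with ERR_TIME once they are more than a day old, so results change between runs. A sketch of the same rule with an injectable `now`, assuming deterministic tests are wanted (`is_time_warped` is a hypothetical name):

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_time_warped(iso_ts: str, now: Optional[datetime] = None,
                   max_age: timedelta = timedelta(days=1)) -> bool:
    """Same rule as TimeWarping, with an injectable clock for reproducible checks."""
    tx_time = datetime.fromisoformat(iso_ts.replace("Z", "+00:00"))
    if now is None:
        now = datetime.now(timezone.utc)
    # Flag events from the future or older than max_age
    return tx_time > now or tx_time < now - max_age


fixed_now = datetime(2025, 5, 1, 12, 0, tzinfo=timezone.utc)
print(is_time_warped("2025-05-01T10:13:59.419427Z", fixed_now))  # False: about 2 hours old
print(is_time_warped("2025-05-02T09:00:00Z", fixed_now))         # True: in the future
print(is_time_warped("2025-04-29T12:00:00Z", fixed_now))         # True: older than one day
```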

##### Now we validate each message as it arrives and publish invalid ones to the darooghe.error_logs topic with a producer:

###### Import the needed libraries

In [None]:
from confluent_kafka import Consumer, KafkaException, KafkaError, Producer
import json

Here we configure and initialize both the Kafka consumer and producer. The consumer reads transaction data in real time from the darooghe.transactions topic, and the producer writes error logs to darooghe.error_logs.

In [None]:
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'darooghe-consumer-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True
}

consumer = Consumer(conf)
producer = Producer({'bootstrap.servers': 'localhost:9092'})

consumer.subscribe(['darooghe.transactions'])

This block is the main loop that continuously consumes messages from Kafka, decodes each transaction, and runs it through the validation functions. If a transaction fails any validation rule, it is wrapped in an error log with the transaction ID and specific error codes, then sent to the darooghe.error_logs Kafka topic via the producer. Valid transactions are simply logged as valid.

In [None]:
print("Connected to Kafka. Waiting for messages...")

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        transaction = json.loads(msg.value().decode('utf-8'))
        print(f"Processing transaction {transaction['transaction_id']}...")
        errors = validate_transaction(transaction)
        if errors:
            # Error log: transaction ID, failed rule codes, and the raw event as context
            error_message = {
                "transaction_id": transaction["transaction_id"],
                "errors": errors,
                "raw_data": transaction
            }
            producer.produce('darooghe.error_logs', value=json.dumps(error_message).encode('utf-8'))
            # Serve delivery events without blocking; a per-message flush() would stall the loop
            producer.poll(0)
            print(f"Invalid transaction: {transaction['transaction_id']} - Errors: {errors}")
        else:
            print(f"Valid transaction: {transaction['transaction_id']}")

except KeyboardInterrupt:
    print("Interrupted by user")

finally:
    # Deliver any error logs still queued in the producer, then leave the group cleanly
    producer.flush()
    consumer.close()


Connected to Kafka. Waiting for messages...
Processing transaction cf7f3120-74f2-4c70-81ce-82399bef2448...
Valid transaction: cf7f3120-74f2-4c70-81ce-82399bef2448
Processing transaction 52b4262b-9322-4e57-a40c-e589983dd425...
Valid transaction: 52b4262b-9322-4e57-a40c-e589983dd425
Processing transaction 0ddc0e4b-40e2-4dc4-89dd-745ceda5b44d...
Valid transaction: 0ddc0e4b-40e2-4dc4-89dd-745ceda5b44d
Processing transaction 9015fbcd-1369-47f6-a9ef-01bce29a01db...
Valid transaction: 9015fbcd-1369-47f6-a9ef-01bce29a01db
Processing transaction b92c36a1-9c68-47ee-b4ec-87545dd1fa41...
Valid transaction: b92c36a1-9c68-47ee-b4ec-87545dd1fa41
Processing transaction 24b98242-8650-4e84-9b9c-29bdb2d7e49f...
Valid transaction: 24b98242-8650-4e84-9b9c-29bdb2d7e49f
Processing transaction 755efe95-97bb-4558-8fce-d0aae7e48a60...
Valid transaction: 755efe95-97bb-4558-8fce-d0aae7e48a60
Processing transaction dbd2eb2b-39a5-474e-9e20-52876f9f3d74...
Valid transaction: dbd2eb2b-39a5-474e-9e20-52876f9f3d74

#### III. Display one message

In [None]:
from confluent_kafka import Consumer, KafkaException, KafkaError
import json
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'darooghe-consumer-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True
}

consumer = Consumer(conf)

consumer.subscribe(['darooghe.transactions'])
print("Connected to Kafka. Waiting for messages...")

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        transaction = json.loads(msg.value().decode('utf-8'))
        print(f"Processing transaction {transaction['transaction_id']}...")
        errors = validate_transaction(transaction)
        if errors:
            error_message = {
                "transaction_id": transaction["transaction_id"],
                "errors": errors,
                "raw_data": transaction
            }
            print(f"Invalid transaction: {transaction['transaction_id']} - Errors: {errors}")
            print(json.dumps(transaction, indent=4))
        else:
            print(f"Valid transaction: {transaction['transaction_id']}")
            print(json.dumps(transaction, indent=4))
        break

except KeyboardInterrupt:
    print("Interrupted by user")

finally:
    consumer.close()


Connected to Kafka. Waiting for messages...
Processing transaction 76564da1-354d-4d31-a36b-44e98e6e41e8...
Valid transaction: 76564da1-354d-4d31-a36b-44e98e6e41e8
{
    "transaction_id": "76564da1-354d-4d31-a36b-44e98e6e41e8",
    "timestamp": "2025-05-01T10:20:50.023082Z",
    "customer_id": "cust_266",
    "merchant_id": "merch_14",
    "merchant_category": "transportation",
    "payment_method": "mobile",
    "amount": 521464,
    "location": {
        "lat": 35.6842317694648,
        "lng": 51.267827449531104
    },
    "device_info": {
        "os": "iOS",
        "app_version": "3.1.0",
        "device_model": "iPhone 15"
    },
    "status": "approved",
    "commission_type": "flat",
    "commission_amount": 10429,
    "vat_amount": 46931,
    "total_amount": 578824,
    "customer_type": "individual",
    "risk_level": 2,
    "failure_reason": null
}


### Schema Management

#### I. Create appropriate data structures for transaction events.

This part defines a TransactionEvent data class that encapsulates the structure of a payment transaction using Python's dataclasses module. Each attribute of the transaction, such as transaction_id, timestamp, amount, commission_amount, and device_info, is explicitly typed, and the nested location object is flattened into location_lat and location_lng. This structured representation simplifies validation, parsing, and transformation tasks across the pipeline.

In [None]:
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class TransactionEvent:
    transaction_id: str
    timestamp: datetime
    customer_id: str
    merchant_id: str
    merchant_category: str
    payment_method: str
    amount: int
    location_lat: float
    location_lng: float
    device_info: Optional[dict]
    status: str
    commission_type: str
    commission_amount: int
    vat_amount: int
    total_amount: int
    customer_type: str
    risk_level: int
    failure_reason: Optional[str]


#### II. Implement type conversion and validation.
The **parse_transaction** function takes a raw dictionary (typically decoded from a Kafka message) and converts it into a **TransactionEvent** object, casting each field to its declared type. If a required field is missing or malformed, it catches the resulting KeyError, ValueError, or TypeError and returns None.

In [None]:
def parse_transaction(raw_event):
    try:
        return TransactionEvent(
            transaction_id=str(raw_event['transaction_id']),
            timestamp=datetime.fromisoformat(raw_event['timestamp'].replace('Z', '')),
            customer_id=str(raw_event['customer_id']),
            merchant_id=str(raw_event['merchant_id']),
            merchant_category=str(raw_event['merchant_category']),
            payment_method=str(raw_event['payment_method']),
            amount=int(raw_event['amount']),
            location_lat=float(raw_event['location']['lat']),
            location_lng=float(raw_event['location']['lng']),
            device_info=raw_event.get('device_info', None),
            status=str(raw_event['status']),
            commission_type=str(raw_event['commission_type']),
            commission_amount=int(raw_event['commission_amount']),
            vat_amount=int(raw_event['vat_amount']),
            total_amount=int(raw_event['total_amount']),
            customer_type=str(raw_event['customer_type']),
            risk_level=int(raw_event['risk_level']),
            failure_reason=raw_event.get('failure_reason', None)
        )
    except (KeyError, ValueError, TypeError) as e:
        print(f"Error parsing transaction: {e}")
        return None
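parse_transaction returns None for malformed events and the consumer loops skip them, while the assignment asks for every invalid transaction to reach darooghe.error_logs. One option, sketched here with a hypothetical `ERR_SCHEMA` code that is not one of the assignment's rules, is to report which required fields are missing so the message can be produced to the error topic like the others:

```python
from typing import Any, Dict, List, Optional

# Top-level keys that parse_transaction reads with [] (a missing one raises KeyError)
REQUIRED_FIELDS: List[str] = [
    "transaction_id", "timestamp", "customer_id", "merchant_id", "merchant_category",
    "payment_method", "amount", "location", "status", "commission_type",
    "commission_amount", "vat_amount", "total_amount", "customer_type", "risk_level",
]


def schema_error(raw: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Build an error-log message listing missing required fields, or None if none are missing."""
    missing = [name for name in REQUIRED_FIELDS if name not in raw]
    if not missing:
        return None
    return {
        "transaction_id": raw.get("transaction_id"),  # None if the ID itself is missing
        "errors": ["ERR_SCHEMA"],                       # hypothetical code, not an assignment rule
        "missing_fields": missing,
        "raw_data": raw,
    }


partial = {"transaction_id": "tx-123", "timestamp": "2025-05-01T10:13:59Z", "amount": 517891}
print(schema_error(partial)["missing_fields"][:3])  # ['customer_id', 'merchant_id', 'merchant_category']
```

This only catches missing keys; values of the wrong type (for example a non-numeric amount) are still caught by the try/except in parse_transaction.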

Here we implement rule-based transaction validation functions (AmountConsistency, TimeWarping, and DeviceMismatch) that operate on the structured TransactionEvent objects.

In [None]:
from datetime import datetime, timezone, timedelta
from typing import List

def AmountConsistency(tx: TransactionEvent) -> bool:
    expected_total = tx.amount + tx.vat_amount + tx.commission_amount
    return abs(tx.total_amount - expected_total) >= 1

def TimeWarping(tx: TransactionEvent) -> bool:
    tx_time = tx.timestamp if tx.timestamp.tzinfo else tx.timestamp.replace(tzinfo=timezone.utc)
    now = datetime.now(timezone.utc)
    return tx_time > now or tx_time < (now - timedelta(days=1))

def DeviceMismatch(tx: TransactionEvent) -> bool:
    if tx.payment_method == "mobile":
        os = tx.device_info.get("os") if tx.device_info else None
        return os not in ["iOS", "Android"]
    return False

def validate_transaction(tx: TransactionEvent) -> List[str]:
    errors = []
    if AmountConsistency(tx):
        errors.append("ERR_AMOUNT")
    if TimeWarping(tx):
        errors.append("ERR_TIME")
    if DeviceMismatch(tx):
        errors.append("ERR_DEVICE")
    return errors


In [None]:
from confluent_kafka import Consumer, KafkaException, KafkaError
import json
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'darooghe-consumer-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True
}

consumer = Consumer(conf)
consumer.subscribe(['darooghe.transactions'])
print("Connected to Kafka. Waiting for messages...")

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue

        if msg.error():
            raise KafkaException(msg.error())

        transaction_raw_data = json.loads(msg.value().decode('utf-8'))
        transaction = parse_transaction(transaction_raw_data)
        if transaction is None:
            print("Skipping invalid transaction data.")
            continue

        errors = validate_transaction(transaction)
        if errors:
            error_message = {
                "transaction_id": transaction.transaction_id,
                "errors": errors,
                "raw_data": transaction_raw_data
            }
            print(f"Invalid transaction: {transaction.transaction_id} - Errors: {errors}")
            print(json.dumps(error_message, indent=4))
        else:
            print(f"Valid transaction: {transaction.transaction_id}")
            print(json.dumps(transaction_raw_data, indent=4))

        break

except KeyboardInterrupt:
    print("Interrupted by user")

finally:
    consumer.close()

Connected to Kafka. Waiting for messages...
Valid transaction: 94a6dc8e-4756-4263-9ac0-7ccf0a985ac3
{
    "transaction_id": "94a6dc8e-4756-4263-9ac0-7ccf0a985ac3",
    "timestamp": "2025-05-01T10:20:50.030484Z",
    "customer_id": "cust_423",
    "merchant_id": "merch_30",
    "merchant_category": "retail",
    "payment_method": "nfc",
    "amount": 1074582,
    "location": {
        "lat": 35.73491902790553,
        "lng": 51.39724902573528
    },
    "device_info": {},
    "status": "approved",
    "commission_type": "flat",
    "commission_amount": 21491,
    "vat_amount": 96712,
    "total_amount": 1192785,
    "customer_type": "CIP",
    "risk_level": 3,
    "failure_reason": null
}


## Batch Processing Layer

### Store samples in a JSON file:

In [None]:
from confluent_kafka import Consumer
import json

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'batch-job-consumer',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['darooghe.transactions'])

NUM_OF_SAMPLES = 1000


with open('transactions_data.json', 'w', encoding='utf-8') as f:
    count = 0
    while count < NUM_OF_SAMPLES:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Kafka error: {msg.error()}")
            continue

        # Decode first so a malformed message is skipped instead of crashing the loop
        try:
            transaction_raw_data = json.loads(msg.value().decode('utf-8'))
        except json.JSONDecodeError as e:
            print(f"Invalid JSON message: {e}")
            continue

        transaction = parse_transaction(transaction_raw_data)
        if transaction is None:
            print("Skipping invalid transaction data.")
            continue

        # Only transactions that pass every business rule are kept for batch analysis
        errors = validate_transaction(transaction)
        if errors:
            print(f"Invalid transaction: {transaction.transaction_id} - Errors: {errors}")
            continue

        # JSON Lines: one compact object per line, the layout spark.read.json expects by default
        f.write(json.dumps(transaction_raw_data) + '\n')
        count += 1
        if count % 100 == 0:
            print(f"{count} transactions written")

consumer.close()
print(f"Done writing {NUM_OF_SAMPLES} messages to transactions_data.json")



### Commission Analysis Batch Job

#### I. Develop batch processing jobs that query aggregated data to generate reports on commission efficiency by merchant category. For example, calculate total commissions, average commissions per transaction, and commission-to-transaction ratios.

##### Set up Spark

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CommissionAnalysisJob") \
    .getOrCreate()


Load the JSON file written in the previous step:

In [None]:
df = spark.read.json("transactions_data.json")
df.printSchema()
df.show(5)
df.select("device_info").show(5, truncate=False)


root
 |-- amount: long (nullable = true)
 |-- commission_amount: long (nullable = true)
 |-- commission_type: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_type: string (nullable = true)
 |-- device_info: struct (nullable = true)
 |    |-- app_version: string (nullable = true)
 |    |-- device_model: string (nullable = true)
 |    |-- os: string (nullable = true)
 |-- failure_reason: string (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- lng: double (nullable = true)
 |-- merchant_category: string (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- risk_level: long (nullable = true)
 |-- status: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- total_amount: long (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- vat_amount: long (nullable = true)


This part analyzes a batch of data grouped by merchant_category. For each category it calculates the total and average commission, the number of transactions, and the ratio of total commission to total transaction amount, rounded to 4 decimal places.

In [None]:
from pyspark.sql.functions import col, sum as _sum, avg, count, round

commission_stats = df.groupBy("merchant_category").agg(
    _sum("commission_amount").alias("total_commission"),
    avg("commission_amount").alias("avg_commission"),
    count("*").alias("transaction_count"),
    round((_sum("commission_amount") / _sum("amount")), 4).alias("commission_to_amount_ratio")
)
commission_stats.show()

+-----------------+----------------+------------------+-----------------+--------------------------+
|merchant_category|total_commission|    avg_commission|transaction_count|commission_to_amount_ratio|
+-----------------+----------------+------------------+-----------------+--------------------------+
|           retail|         3978145| 20400.74358974359|              195|                      0.02|
|    entertainment|         4053783|           21335.7|              190|                      0.02|
|     food_service|         4660409|20712.928888888888|              225|                      0.02|
|       government|         4238092|20980.653465346535|              202|                      0.02|
|   transportation|         3915657|20827.962765957447|              188|                      0.02|
+-----------------+----------------+------------------+-----------------+--------------------------+



#### II. Identify optimal commission structures, use historical commission data to analyze trends, and simulate different commission models. The job should identify optimal structures by comparing historical performance and profitability across various merchant categories.


This section creates a new column called simulated_commission_2_5_percent in the DataFrame by applying a flat 2.5% rate on each transaction's amount.

In [None]:
from pyspark.sql.functions import expr

df_with_simulated = df.withColumn(
    "simulated_commission_2_5_percent",
    expr("amount * 0.025")
)


This part aggregates and compares actual commission earnings against the simulated 2.5% model, grouped by merchant_category.

In [None]:
comparison = df_with_simulated.groupBy("merchant_category").agg(
    _sum("commission_amount").alias("total_actual_commission"),
    _sum("simulated_commission_2_5_percent").alias("total_simulated_commission"),
    count("*").alias("transaction_count"),
    round((_sum("commission_amount") / _sum("amount")), 4).alias("actual_commission_ratio"),
    round((_sum("simulated_commission_2_5_percent") / _sum("amount")), 4).alias("simulated_commission_ratio")
)

comparison.show(truncate=False)


+-----------------+-----------------------+--------------------------+-----------------+-----------------------+--------------------------+
|merchant_category|total_actual_commission|total_simulated_commission|transaction_count|actual_commission_ratio|simulated_commission_ratio|
+-----------------+-----------------------+--------------------------+-----------------+-----------------------+--------------------------+
|retail           |3978145                |4972802.925               |195              |0.02                   |0.0250                    |
|entertainment    |4053783                |5067351.475               |190              |0.02                   |0.0250                    |
|food_service     |4660409                |5825643.625               |225              |0.02                   |0.0250                    |
|government       |4238092                |5297737.900               |202              |0.02                   |0.0250                    |
|transportation   |3

As expected, raising the rate from the current effective rate of about 2% to a flat 2.5% increases total commission in every category, by roughly 25%.
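The size of the increase can be checked from the retail rows of the two tables above: since the simulated column is `sum(amount * 0.025)`, dividing it by 0.025 recovers the category's total amount, and with it the current effective rate.

```python
# Retail figures copied from the commission tables above
actual_commission = 3978145       # sum(commission_amount)
simulated_2_5 = 4972802.925       # sum(amount * 0.025)

total_amount = simulated_2_5 / 0.025              # implied retail sum(amount)
current_rate = actual_commission / total_amount   # effective current commission rate
uplift = simulated_2_5 / actual_commission - 1    # relative increase under 2.5%

print(round(total_amount), round(current_rate, 4), round(uplift, 4))  # 198912117 0.02 0.25
```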

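Here, three alternative commission models are compared with the actual commission:
* flat_commission: 3% of the amount
* progressive_commission: 5% of the whole amount if it exceeds 1000, otherwise 2%
* tiered_commission: 8% of the whole amount if it exceeds 2000, otherwise 4%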



In [None]:
from pyspark.sql.functions import expr

# Three alternative commission models applied to every transaction's amount
simulated = df.withColumn("flat_commission", expr("amount * 0.03")) \
              .withColumn("progressive_commission", expr("CASE WHEN amount > 1000 THEN amount * 0.05 ELSE amount * 0.02 END")) \
              .withColumn("tiered_commission", expr("CASE WHEN amount > 2000 THEN amount * 0.08 ELSE amount * 0.04 END"))



In [None]:
results = simulated.groupBy("merchant_category").agg(
    _sum("commission_amount").alias("actual_total_commission"),
    _sum("flat_commission").alias("sim_flat_total"),
    _sum("progressive_commission").alias("sim_progressive_total"),
    _sum("tiered_commission").alias("sim_tiered_total")
)

results.show(truncate=False)


+-----------------+-----------------------+--------------+---------------------+----------------+
|merchant_category|actual_total_commission|sim_flat_total|sim_progressive_total|sim_tiered_total|
+-----------------+-----------------------+--------------+---------------------+----------------+
|retail           |3978145                |5967363.51    |9945605.85           |15912969.36     |
|entertainment    |4053783                |6080821.77    |10134702.95          |16215524.72     |
|food_service     |4660409                |6990772.35    |11651287.25          |18642059.60     |
|government       |4238092                |6357285.48    |10595475.80          |16952761.28     |
|transportation   |3915657                |5873622.57    |9789370.95           |15662993.52     |
+-----------------+-----------------------+--------------+---------------------+----------------+



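The ***tiered*** model yields the highest total commission in every category. However, its totals are exactly 8/3 of the flat 3% totals, and the progressive totals are exactly 5/3 of them. This means every transaction in this sample exceeds both thresholds. The 1,000 and 2,000 thresholds are far below typical transaction amounts, so in practice the progressive and tiered models behave like flat 5% and 8% fees.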

### Transaction Pattern Analysis

#### I. Discover and report on temporal patterns in transaction data

Let's check whether transactions are concentrated at particular times:

This section prepares the DataFrame for temporal analysis by casting the timestamp column into a Spark Timestamp type and extracting time-based features like hour, day_of_week, month, and day_of_month. It then groups transactions by each of these temporal dimensions and aggregates the total transaction amount to uncover patterns in when users spend money.
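Spark's time functions follow specific conventions. In particular, `dayofweek` returns 1 for Sunday through 7 for Saturday, unlike Python's `isoweekday()`, which numbers Monday as 1. The plain-Python sketch below reproduces the same feature extraction and makes that mapping explicit; the helper name is hypothetical. Note that Spark evaluates these functions in the session time zone (`spark.sql.session.timeZone`), whereas this sketch uses a naive `datetime` as given.

```python
from datetime import datetime


def temporal_features(ts: datetime) -> dict:
    """Mimic the Spark columns hour, day_of_week, month and day_of_month."""
    return {
        "hour": ts.hour,
        # Spark dayofweek: Sunday=1 ... Saturday=7; isoweekday: Monday=1 ... Sunday=7
        "day_of_week": ts.isoweekday() % 7 + 1,
        "month": ts.month,
        "day_of_month": ts.day,
    }


print(temporal_features(datetime(2025, 5, 1, 13, 37)))
```

For 1 May 2025, a Thursday, this yields `day_of_week` 5, which is consistent with the Spark output for this dataset.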

In [None]:
from pyspark.sql.functions import col, hour, dayofweek, month, dayofmonth

# Make sure timestamp is a real Spark timestamp before extracting time parts
df = df.withColumn("timestamp", col("timestamp").cast("timestamp"))

df = df.withColumn("hour", hour(col("timestamp"))) \
                 .withColumn("day_of_week", dayofweek(col("timestamp"))) \
                 .withColumn("month", month(col("timestamp"))) \
                 .withColumn("day_of_month", dayofmonth(col("timestamp")))  # integer, sorts numerically

df.show(truncate=False)


+-------+-----------------+---------------+-----------+-------------+------------------------------------+------------------+----------------------------------------+-----------------+-----------+--------------+----------+--------+--------------------------+------------+------------------------------------+----------+----+-----------+-----+------------+
|amount |commission_amount|commission_type|customer_id|customer_type|device_info                         |failure_reason    |location                                |merchant_category|merchant_id|payment_method|risk_level|status  |timestamp                 |total_amount|transaction_id                      |vat_amount|hour|day_of_week|month|day_of_month|
+-------+-----------------+---------------+-----------+-------------+------------------------------------+------------------+----------------------------------------+-----------------+-----------+--------------+----------+--------+--------------------------+------------+-----------------

Total transaction amount per hour. Note that the `total_transactions` column holds the summed `amount`, not a count:

In [None]:
temporal_patterns = df.groupBy("hour").agg({"amount": "sum"}).withColumnRenamed("sum(amount)", "total_transactions")
temporal_patterns.show()

+----+------------------+
|hour|total_transactions|
+----+------------------+
|  13|        1042328856|
+----+------------------+



In [None]:
temporal_patterns = df.groupBy("day_of_week").agg({"amount": "sum"}).withColumnRenamed("sum(amount)", "total_transactions")
temporal_patterns.show()

+-----------+------------------+
|day_of_week|total_transactions|
+-----------+------------------+
|          5|        1042328856|
+-----------+------------------+



In [None]:
temporal_patterns = df.groupBy("day_of_month").agg({"amount": "sum"}).withColumnRenamed("sum(amount)", "total_transactions")
temporal_patterns.show()

+------------+------------------+
|day_of_month|total_transactions|
+------------+------------------+
|           1|        1042328856|
+------------+------------------+



In [None]:
temporal_patterns = df.groupBy("month").agg({"amount": "sum"}).withColumnRenamed("sum(amount)", "total_transactions")
temporal_patterns.show()

+-----+------------------+
|month|total_transactions|
+-----+------------------+
|    5|        1042328856|
+-----+------------------+



#### II. Identify peak transaction times based on historical data.

We can run queries like those in the previous part, but this time we count transactions instead of summing their amounts.

We do this for the hour of the day and the day of the week:

This cell counts transactions (`transaction_count`) per `hour` and per `day_of_week`, each sorted by its time dimension, so the busiest time slots stand out.

In [None]:
hourly_transactions = df.groupBy("hour")\
    .agg(count("transaction_id") \
    .alias("transaction_count")) \
    .orderBy("hour")

daily_transactions = df.groupBy("day_of_week")\
    .agg(count("transaction_id")\
    .alias("transaction_count")) \
    .orderBy("day_of_week")


hourly_transactions.show()
daily_transactions.show()

+----+-----------------+
|hour|transaction_count|
+----+-----------------+
|  13|             1000|
+----+-----------------+

+-----------+-----------------+
|day_of_week|transaction_count|
+-----------+-----------------+
|          5|             1000|
+-----------+-----------------+
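In this sample, every transaction falls in hour 13 and on day 5 (Thursday), so the peak is trivial. With more varied data, the peak bucket is the one with the highest count. The plain-Python sketch below works on collected `(bucket, count)` pairs, for example built from `hourly_transactions.collect()`. The helper name and the input are illustrative.

```python
def peak_buckets(counts: dict) -> list:
    """Return every bucket whose count equals the maximum (ties are kept)."""
    if not counts:
        return []
    top = max(counts.values())
    return sorted(bucket for bucket, n in counts.items() if n == top)


# e.g. {row["hour"]: row["transaction_count"] for row in hourly_transactions.collect()}
print(peak_buckets({13: 1000}))
```

Keeping ties matters once the data spans several hours, since two hours with equal counts are both peaks.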



In [None]:
import matplotlib.pyplot as plt

# Collect the small hourly aggregate to the driver for plotting
hour_data = hourly_transactions.collect()
hours = [row["hour"] for row in hour_data]
transaction_counts = [row["transaction_count"] for row in hour_data]  # column produced by the previous cell

# Plot the number of transactions per hour
plt.figure(figsize=(10, 6))
plt.plot(hours, transaction_counts, marker="o")
plt.title("Peak Transaction Times by Hour")
plt.xlabel("Hour of the Day")
plt.ylabel("Number of Transactions")
plt.xticks(range(24))  # 24 hours
plt.grid(True)
plt.show()


#### III. Segment customers based on spending frequency and patterns.

First, we compute two metrics per customer:

1. How many transactions each customer has made.
2. How much each customer has spent in total.

We group the data by `customer_id`:

In [None]:
customer_stats = df.groupBy("customer_id") \
    .agg(
        count("transaction_id").alias("transaction_count"),
        _sum("amount").alias("total_spend")
    )

customer_stats.show()

+-----------+-----------------+-----------+
|customer_id|transaction_count|total_spend|
+-----------+-----------------+-----------+
|     cust_2|                1|    1517474|
|   cust_655|                1|    1494397|
|   cust_445|                5|    6506917|
|   cust_197|                1|     186485|
|   cust_478|                1|    1545273|
|   cust_246|                1|     207122|
|   cust_349|                2|    2201181|
|   cust_337|                2|    1779002|
|    cust_43|                1|    1566307|
|   cust_501|                1|    1083271|
|   cust_862|                1|    1358890|
|   cust_705|                4|    4797441|
|   cust_866|                1|     978616|
|    cust_47|                1|    1268715|
|   cust_852|                2|    2631002|
|   cust_578|                3|    4185562|
|   cust_903|                1|     367370|
|   cust_460|                1|      64654|
|   cust_811|                1|     850424|
|   cust_563|                1| 

In this part, we categorize customers into different segments based on two key factors:

* Spending Level: How much money a customer has spent in total.

* Shopping Frequency: How often the customer has made a transaction.



1. Spending Level (High Spend vs Low Spend): we classify customers into two groups based on how much they have spent.

    * High Spend: customers who have spent more than 1,000,000 Toman.
    * Low Spend: customers who have spent 1,000,000 Toman or less.

2. Shopping Frequency (Frequent Shopper vs Infrequent Shopper): we look at how often each customer shops.

    * Frequent Shopper: customers who have made more than 10 transactions.
    * Infrequent Shopper: customers who have made 10 or fewer transactions.
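Both rules are simple threshold checks with a strict `>`, just as in the Spark `when` calls. The plain-Python version below, whose function and constant names are ours, is handy for unit-testing the boundaries, for example that exactly 1,000,000 Toman counts as Low Spend.

```python
HIGH_SPEND_THRESHOLD = 1_000_000   # Toman, same value as in the Spark cell
FREQUENT_SHOPPER_THRESHOLD = 10    # transactions


def segment(total_spend: float, transaction_count: int) -> tuple:
    """Return (spending_category, shopping_frequency) using strict '>' like the Spark code."""
    spending = "High Spend" if total_spend > HIGH_SPEND_THRESHOLD else "Low Spend"
    frequency = ("Frequent Shopper" if transaction_count > FREQUENT_SHOPPER_THRESHOLD
                 else "Infrequent Shopper")
    return spending, frequency


print(segment(4_797_441, 4))  # cust_705 from the customer table above
```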

In [None]:
from pyspark.sql.functions import when

high_spend_threshold = 1000000
frequent_shopper_threshold = 10

segmented_customers = customer_stats.withColumn(
    "spending_category",

    when(col("total_spend") > high_spend_threshold, "High Spend")
     .otherwise("Low Spend")
)

segmented_customers = segmented_customers.withColumn(
    "shopping_frequency",
    when(col("transaction_count") > frequent_shopper_threshold, "Frequent Shopper")
     .otherwise("Infrequent Shopper")
)

segmented_customers.show()

+-----------+-----------------+-----------+-----------------+------------------+
|customer_id|transaction_count|total_spend|spending_category|shopping_frequency|
+-----------+-----------------+-----------+-----------------+------------------+
|     cust_2|                1|    1517474|       High Spend|Infrequent Shopper|
|   cust_655|                1|    1494397|       High Spend|Infrequent Shopper|
|   cust_445|                5|    6506917|       High Spend|Infrequent Shopper|
|   cust_197|                1|     186485|        Low Spend|Infrequent Shopper|
|   cust_478|                1|    1545273|       High Spend|Infrequent Shopper|
|   cust_246|                1|     207122|        Low Spend|Infrequent Shopper|
|   cust_349|                2|    2201181|       High Spend|Infrequent Shopper|
|   cust_337|                2|    1779002|       High Spend|Infrequent Shopper|
|    cust_43|                1|    1566307|       High Spend|Infrequent Shopper|
|   cust_501|               

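For example, cust_197 spent 186,485 Toman in a single transaction and is labeled Low Spend, so it could receive fewer marketing messages. By contrast, cust_705 spent 4,797,441 Toman over 4 transactions and is labeled High Spend, though still an Infrequent Shopper. In the rows shown, no customer has more than 10 transactions, so the frequency threshold may be too high for a sample of this size.

After applying these rules, each customer is labeled with:

* a spending category (High Spend / Low Spend);
* a shopping frequency (Frequent Shopper / Infrequent Shopper).

This helps businesses see which customers spend more and shop more often, so they can target marketing efforts more effectively.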

#### IV. Compare transaction behavior across different merchant categories.

Here we compare merchant categories by number of transactions, total spend, and average transaction value. A high average transaction value suggests that the category's products or services are more expensive.

In [None]:
merchant_category_behavior = df.groupBy("merchant_category") \
    .agg(
        _sum("amount").alias("total_spend"),
        count("transaction_id").alias("transaction_count"),
        avg("amount").alias("avg_transaction_value")
    )

merchant_category_behavior.show()

+-----------------+-----------+-----------------+---------------------+
|merchant_category|total_spend|transaction_count|avg_transaction_value|
+-----------------+-----------+-----------------+---------------------+
|           retail|  198912117|              195|   1020062.1384615385|
|    entertainment|  202694059|              190|   1066810.8368421053|
|     food_service|  233025745|              225|   1035669.9777777778|
|       government|  211909516|              202|   1049057.0099009902|
|   transportation|  195787419|              188|   1041422.4414893617|
+-----------------+-----------+-----------------+---------------------+



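Food service has the most transactions (225) and the highest total spend (233,025,745). Entertainment has the highest average transaction value, about 1,066,811. The averages differ by less than 5% across categories, so no category stands out as much more expensive.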
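To put the counts in perspective, each category's share of the 1,000 transactions can be computed from the `transaction_count` column of the table above. This is a small sketch, and the dictionary simply copies those values:

```python
transaction_counts = {
    "retail": 195,
    "entertainment": 190,
    "food_service": 225,
    "government": 202,
    "transportation": 188,
}

total = sum(transaction_counts.values())
shares = {category: n / total for category, n in transaction_counts.items()}

# Print categories from largest to smallest share
for category, share in sorted(shares.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{category:15s} {share:.1%}")
```

Food service leads with 22.5%, but the spread between the largest and smallest category is only 3.7 percentage points.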

#### V. Identify when most transactions happen (e.g., morning, evening).

We define the time periods as follows: 5 to 11 as morning, 12 to 17 as afternoon, 18 to 23 as evening, and 0 to 4 as night.
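Spark's `between` is inclusive on both ends, so these hour ranges cover 0 to 23 with no gaps or overlaps. The plain-Python equivalent below makes the boundaries easy to check; the function name is ours. Hour 13, the only hour present in this sample, maps to the afternoon bucket.

```python
def day_part(hour: int) -> str:
    """Map an hour (0-23) to the same buckets as the Spark `when` chain."""
    if not 0 <= hour <= 23:
        raise ValueError(f"hour out of range: {hour}")
    if 5 <= hour <= 11:
        return "morning"
    if 12 <= hour <= 17:
        return "afternoon"
    if 18 <= hour <= 23:
        return "evening"
    return "night"  # hours 0-4


print([day_part(h) for h in (4, 5, 13, 18)])
```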

In [None]:
df = df.withColumn("hour", hour("timestamp")) \
       .withColumn("day_part",
           when(col("hour").between(5, 11), "morning")
          .when(col("hour").between(12, 17), "afternoon")
          .when(col("hour").between(18, 23), "evening")
          .when(col("hour").between(0, 4), "night")
          .otherwise("night")
       )

day_part_stats = df.groupBy("day_part").agg(
    count("*").alias("num_transactions"),
    avg("amount").alias("avg_amount")
)

day_part_stats.orderBy("day_part").show()

+---------+----------------+-----------+
| day_part|num_transactions| avg_amount|
+---------+----------------+-----------+
|afternoon|            1000|1042328.856|
+---------+----------------+-----------+



#### VI. Notice if people are spending more or less over time.

We consider two different approaches for this part:

1. Yearly and monthly changes
2. Daily changes

##### Yearly and monthly changes

By grouping the data by date (such as year and month), we can observe the number of transactions, as well as the average and total transaction amounts for each period.

In [None]:
from pyspark.sql.functions import month, year

df = df.withColumn("month", month("timestamp")) \
                 .withColumn("year", year("timestamp"))

spending_over_time = df.groupBy("year", "month") \
    .agg(
        _sum("amount").alias("total_amount"),
        count("*").alias("num_transactions"),
        avg("amount").alias("avg_amount")
    ) \
    .orderBy("year", "month")

spending_over_time.show()


+----+-----+------------+----------------+-----------+
|year|month|total_amount|num_transactions| avg_amount|
+----+-----+------------+----------------+-----------+
|2025|    5|  1042328856|            1000|1042328.856|
+----+-----+------------+----------------+-----------+



##### Daily changes

Grouping the data by date (year, month, and day) gives the number of transactions and the average and total amounts for each day.

In [None]:
from pyspark.sql.functions import to_date

df = df.withColumn("date", to_date("timestamp"))

daily_trend = df.groupBy("date").agg(
    _sum("amount").alias("total_amount"),
    count("*").alias("num_transactions"),
    avg("amount").alias("avg_amount")
)

daily_trend.orderBy("date").show(truncate=False)

+----------+------------+----------------+-----------+
|date      |total_amount|num_transactions|avg_amount |
+----------+------------+----------------+-----------+
|2025-05-01|1042328856  |1000            |1042328.856|
+----------+------------+----------------+-----------+
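Both groupings contain a single period (May 2025 and the single day 2025-05-01), so this sample cannot show whether spending is rising or falling. Once several periods are available, the trend can be read from the period-over-period change of the aggregated totals. The sketch below computes that change; the function name and the input values are illustrative, not taken from the dataset.

```python
def period_over_period(totals: list) -> list:
    """Percent change between consecutive periods; None where the previous total is 0."""
    changes = []
    for prev, curr in zip(totals, totals[1:]):
        changes.append(None if prev == 0 else (curr - prev) / prev * 100)
    return changes


# Illustrative totals for three consecutive periods (not real data)
print(period_over_period([100.0, 120.0, 90.0]))
```

In Spark, the same calculation could be expressed with `lag("total_amount").over(Window.orderBy("date"))` on the `daily_trend` DataFrame.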



### Data Storage Implementation

#### I. Load the data into MongoDB. Use a proper partitioning strategy—by date, merchant, or another logical key—to ensure efficient querying and scalability.

First, we connect to MongoDB:

In [None]:
from pyspark.sql import SparkSession
from pymongo import MongoClient
client = MongoClient("mongodb://localhost:27017/")
db = client['finance']
transactions_collection = db['transactions']

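Next, we partition the data by transaction date. Adding a `transaction_date` column lets us filter for specific periods (e.g., last week or last month) when querying later. Note that a single MongoDB instance does not physically partition a collection. Here the date acts as a logical partition key to filter and index on, and physical partitioning would require sharding on that key.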
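Each document stores `transaction_date` as midnight of the transaction day, so a query for one day or a range of days becomes a simple range filter on that field. An index on the field, e.g. `transactions_collection.create_index("transaction_date")`, is what would make such filters efficient. The helper below is a hypothetical sketch that builds the filter. The resulting dictionary could be passed to `find()` or used in a `$match` stage.

```python
from datetime import date, datetime, time, timedelta


def date_range_filter(start: date, end: date) -> dict:
    """Match documents whose transaction_date falls on any day in [start, end]."""
    start_dt = datetime.combine(start, time.min)
    end_dt = datetime.combine(end + timedelta(days=1), time.min)  # exclusive upper bound
    return {"transaction_date": {"$gte": start_dt, "$lt": end_dt}}


print(date_range_filter(date(2025, 5, 1), date(2025, 5, 1)))
```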

In [None]:
from pyspark.sql.functions import to_date
df = df.withColumn("transaction_date", to_date("timestamp"))
df.show(2)

+-------+-----------------+---------------+-----------+-------------+--------------------+--------------+--------------------+-----------------+-----------+--------------+----------+--------+--------------------+------------+--------------------+----------+----+-----------+-----+------------+---------+----+----------+----------------+
| amount|commission_amount|commission_type|customer_id|customer_type|         device_info|failure_reason|            location|merchant_category|merchant_id|payment_method|risk_level|  status|           timestamp|total_amount|      transaction_id|vat_amount|hour|day_of_week|month|day_of_month| day_part|year|      date|transaction_date|
+-------+-----------------+---------------+-----------+-------------+--------------------+--------------+--------------------+-----------------+-----------+--------------+----------+--------+--------------------+------------+--------------------+----------+----+-----------+-----+------------+---------+----+----------+-------

Finally, we insert the data into MongoDB, with `transaction_date` included in each document as the partition key.

In [None]:
from datetime import datetime

data = df.collect()  # brings every row to the driver; acceptable for this small sample
documents = []

for row in data:
    doc = row.asDict()
    # BSON has no date-only type, so convert datetime.date values to midnight datetimes
    for key in ("transaction_date", "date"):
        if doc.get(key):
            doc[key] = datetime.combine(doc[key], datetime.min.time())
    documents.append(doc)

transactions_collection.insert_many(documents)
print("Data inserted into MongoDB successfully!")

Data inserted into MongoDB successfully!


##### Verify the inserted data:

In [None]:
transactions = transactions_collection.find()
print("\nData from MongoDB:")
for transaction in transactions:
    print(transaction)


Data from MongoDB:
{'_id': ObjectId('6813abb7b8772b9b390ba8ae'), 'amount': 316092, 'commission_amount': 6321, 'commission_type': 'tiered', 'customer_id': 'cust_134', 'customer_type': 'CIP', 'device_info': ['3.1.0', 'iPhone 15', 'iOS'], 'failure_reason': None, 'location': [35.79119177606669, 51.36092132479518], 'merchant_category': 'retail', 'merchant_id': 'merch_42', 'payment_method': 'online', 'risk_level': 1, 'status': 'approved', 'timestamp': '2025-05-01T10:07:29.788116Z', 'total_amount': 350861, 'transaction_id': 'b5115c68-b0d1-46d0-9f0e-78a5167a3638', 'vat_amount': 28448, 'transaction_date': datetime.datetime(2025, 5, 1, 0, 0)}
{'_id': ObjectId('6813abb7b8772b9b390ba8af'), 'amount': 1076044, 'commission_amount': 21520, 'commission_type': 'progressive', 'customer_id': 'cust_927', 'customer_type': 'business', 'device_info': [None, None, None], 'failure_reason': None, 'location': [35.79185078301848, 51.3707594326629], 'merchant_category': 'transportation', 'merchant_id': 'merch_9', 

#### II. Implement data retention policy (e.g. keep the last 24 hours of detailed data).

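First, we take the current UTC time and compute the cutoff 24 hours earlier. Then we delete every document whose `transaction_date` is before the cutoff. Because `transaction_date` is stored as midnight of the transaction day, this policy works at day granularity. Once the cutoff passes midnight, the whole day's documents are removed, including some that are less than 24 hours old.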
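Building the retention filter in a small pure function lets the cutoff logic be tested without a database. In the hypothetical sketch below, the helper name is ours. It rejects naive datetimes so that "now" is always unambiguous UTC. As an alternative, MongoDB can enforce retention automatically with a TTL index (`expireAfterSeconds`) on a date field. The background deletion then runs periodically rather than instantly.

```python
from datetime import datetime, timedelta, timezone


def retention_filter(now: datetime, keep: timedelta = timedelta(hours=24)) -> dict:
    """Filter selecting documents older than the retention window (for delete_many)."""
    if now.tzinfo is None:
        raise ValueError("pass an aware datetime, e.g. datetime.now(timezone.utc)")
    cutoff = now - keep
    return {"transaction_date": {"$lt": cutoff}}


print(retention_filter(datetime(2025, 5, 2, 12, 0, tzinfo=timezone.utc)))
```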

In [None]:
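from datetime import datetime, timedelta, timezone

# Aware UTC time; datetime.utcnow() is deprecated since Python 3.12
current_time = datetime.now(timezone.utc)
cutoff_time = current_time - timedelta(hours=24)

# Delete every document whose transaction_date is older than the cutoff
delete_filter = {"transaction_date": {"$lt": cutoff_time}}
delete_result = transactions_collection.delete_many(delete_filter)
print(f"Deleted {delete_result.deleted_count} documents older than 24 hours.")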

Deleted 0 documents older than 24 hours.




#### III. Using MongoDB queries, create aggregated historical datasets for longer-term analysis. These datasets should summarize key insights, such as:

* Summarized transaction data: Group transactions by merchant, customer segment, or time period (e.g., daily, weekly, monthly).
* Commission reports: Aggregate total commissions earned per merchant category over time.

##### 1. Summarized transaction data:

We build two examples:

* Daily transaction totals per merchant
* Monthly transaction summary per customer type

###### Daily transaction totals per merchant

In [None]:
pipeline = [
    {
        "$group": {
            "_id": {
                "merchant_id": "$merchant_id",
                "date": { "$dateToString": { "format": "%Y-%m-%d", "date": "$transaction_date" } }
            },
            "total_transactions": { "$sum": 1 },
            "total_amount": { "$sum": "$amount" },
            "total_vat": { "$sum": "$vat_amount" }
        }
    },
    { "$sort": { "_id.date": 1 } }
]


View the result:

In [None]:
result = transactions_collection.aggregate(pipeline)
for doc in result:
    print(doc)

{'_id': {'merchant_id': 'merch_14', 'date': '2025-05-01'}, 'total_transactions': 147, 'total_amount': 140836227, 'total_vat': 12675188}
{'_id': {'merchant_id': 'merch_10', 'date': '2025-05-01'}, 'total_transactions': 142, 'total_amount': 156294701, 'total_vat': 14066454}
{'_id': {'merchant_id': 'merch_17', 'date': '2025-05-01'}, 'total_transactions': 89, 'total_amount': 98187889, 'total_vat': 8836869}
{'_id': {'merchant_id': 'merch_12', 'date': '2025-05-01'}, 'total_transactions': 120, 'total_amount': 143074841, 'total_vat': 12876675}
{'_id': {'merchant_id': 'merch_39', 'date': '2025-05-01'}, 'total_transactions': 128, 'total_amount': 139235371, 'total_vat': 12531119}
{'_id': {'merchant_id': 'merch_19', 'date': '2025-05-01'}, 'total_transactions': 114, 'total_amount': 109037112, 'total_vat': 9813286}
{'_id': {'merchant_id': 'merch_2', 'date': '2025-05-01'}, 'total_transactions': 122, 'total_amount': 117763899, 'total_vat': 10598699}
{'_id': {'merchant_id': 'merch_15', 'date': '2025-05-
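For readers less familiar with the aggregation framework, the `$group` stage of the daily pipeline is equivalent to bucketing documents by a compound key and accumulating totals per bucket. The plain-Python sketch below performs the same computation on a few made-up documents. Field names follow the collection, but the values and the function name are illustrative.

```python
from collections import defaultdict
from datetime import datetime


def daily_totals_per_merchant(docs):
    """Emulate the $group/$sort pipeline: key = (merchant_id, YYYY-MM-DD)."""
    groups = defaultdict(lambda: {"total_transactions": 0, "total_amount": 0, "total_vat": 0})
    for doc in docs:
        key = (doc["merchant_id"], doc["transaction_date"].strftime("%Y-%m-%d"))
        acc = groups[key]
        acc["total_transactions"] += 1          # {"$sum": 1}
        acc["total_amount"] += doc["amount"]    # {"$sum": "$amount"}
        acc["total_vat"] += doc["vat_amount"]   # {"$sum": "$vat_amount"}
    # {"$sort": {"_id.date": 1}}
    return sorted(groups.items(), key=lambda item: item[0][1])


docs = [
    {"merchant_id": "merch_1", "transaction_date": datetime(2025, 5, 1), "amount": 100, "vat_amount": 9},
    {"merchant_id": "merch_1", "transaction_date": datetime(2025, 5, 1), "amount": 50, "vat_amount": 4},
    {"merchant_id": "merch_2", "transaction_date": datetime(2025, 4, 30), "amount": 70, "vat_amount": 6},
]
for key, totals in daily_totals_per_merchant(docs):
    print(key, totals)
```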

###### Monthly transaction summary per customer type

In [None]:
pipeline = [
    {
        "$group": {
            "_id": {
                "customer_type": "$customer_type",
                "month": { "$dateToString": { "format": "%Y-%m", "date": "$transaction_date" } }
            },
            "transaction_count": { "$sum": 1 },
            "total_amount": { "$sum": "$amount" }
        }
    },
    { "$sort": { "_id.month": 1 } }
]


View the result:

In [None]:
result = transactions_collection.aggregate(pipeline)
for doc in result:
    print(doc)

{'_id': {'customer_type': 'CIP', 'month': '2025-05'}, 'transaction_count': 2027, 'total_amount': 2095177695}
{'_id': {'customer_type': 'business', 'month': '2025-05'}, 'transaction_count': 2037, 'total_amount': 2107765485}
{'_id': {'customer_type': 'individual', 'month': '2025-05'}, 'transaction_count': 1936, 'total_amount': 1960008237}
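The requirements also mention weekly summaries. A possible weekly variant of the same pipeline groups by ISO week using MongoDB's `$isoWeekYear` and `$isoWeek` date operators. This is a sketch that has not been run against the collection here, and `weekly_pipeline` is our name.

```python
# Weekly transaction summary per customer type (ISO weeks start on Monday)
weekly_pipeline = [
    {
        "$group": {
            "_id": {
                "customer_type": "$customer_type",
                "year": {"$isoWeekYear": "$transaction_date"},
                "week": {"$isoWeek": "$transaction_date"},
            },
            "transaction_count": {"$sum": 1},
            "total_amount": {"$sum": "$amount"},
        }
    },
    {"$sort": {"_id.year": 1, "_id.week": 1}},
]

# It would be run exactly like the monthly pipeline:
# for doc in transactions_collection.aggregate(weekly_pipeline):
#     print(doc)
```

Grouping by ISO week year as well as week number avoids merging the first days of January with week 52 or 53 of the previous year.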


##### 2. Commission reports:

###### Commission per merchant category (monthly)

In [None]:
pipeline = [
    {
        "$group": {
            "_id": {
                "merchant_category": "$merchant_category",
                "month": { "$dateToString": { "format": "%Y-%m", "date": "$transaction_date" } }
            },
            "total_commission": { "$sum": "$commission_amount" },
            "transactions": { "$sum": 1 }
        }
    },
    { "$sort": { "_id.month": 1 } }
]

View the result:

In [None]:
result = transactions_collection.aggregate(pipeline)
for doc in result:
    print(doc)

{'_id': {'merchant_category': 'retail', 'month': '2025-05'}, 'total_commission': 22540735, 'transactions': 1173}
{'_id': {'merchant_category': 'government', 'month': '2025-05'}, 'total_commission': 24179978, 'transactions': 1152}
{'_id': {'merchant_category': 'entertainment', 'month': '2025-05'}, 'total_commission': 26132371, 'transactions': 1214}
{'_id': {'merchant_category': 'food_service', 'month': '2025-05'}, 'total_commission': 24606920, 'transactions': 1214}
{'_id': {'merchant_category': 'transportation', 'month': '2025-05'}, 'total_commission': 25796043, 'transactions': 1247}


In [None]:
client.close()
print("MongoDB client connection closed.")
spark.stop()

MongoDB client connection closed.


## Real-Time Processing Layer

### Spark Streaming Application

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, MapType
import os

os.environ["HADOOP_HOME"] = "C:/hadoop-3.3.6"
os.environ["hadoop.home.dir"] = "C:/hadoop-3.3.6"


import shutil

path = 'C:/Users/Mehrad/spark_checkpoints/realtime_insights'

if os.path.exists(path):
    shutil.rmtree(path)


spark = SparkSession.builder \
    .appName("RealTimeProcessing") \
    .config("checkpointLocation", "C:/Users/Mehrad/spark_checkpoints") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5") \
    .config("spark.hadoop.io.native.lib.available", "false") \
    .getOrCreate()

kafka_bootstrap_servers = "localhost:9092"
topic = "darooghe.transactions"

raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .load()

transactions_df = raw_df.selectExpr("CAST(value AS STRING)")

from pyspark.sql.types import *

transaction_schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("customer_id", StringType()),
    StructField("merchant_id", StringType()),
    StructField("merchant_category", StringType()),
    StructField("payment_method", StringType()),
    StructField("amount", DoubleType()),
    StructField("location", StructType([
        StructField("lat", DoubleType()),
        StructField("lng", DoubleType())
    ])),
    StructField("device_info", StructType([
        StructField("os", StringType()),
        StructField("app_version", StringType()),
        StructField("device_model", StringType())
    ])),
    StructField("status", StringType()),
    StructField("commission_type", StringType()),
    StructField("commission_amount", DoubleType()),
    StructField("vat_amount", DoubleType()),
    StructField("total_amount", DoubleType()),
    StructField("customer_type", StringType()),
    StructField("risk_level", IntegerType()),
    StructField("failure_reason", StringType())
])

from pyspark.sql.functions import from_json

parsed_df = transactions_df.select(
    from_json(col("value"), transaction_schema).alias("data")
).select("data.*")

from pyspark.sql.functions import window, col, avg, count

windowed_df = parsed_df \
    .withWatermark("timestamp", "2 minutes") \
    .groupBy(
        window(col("timestamp"), "1 minute", "20 seconds"),
        col("merchant_category")
    ).agg(
        count("*").alias("transaction_count"),
        avg("amount").alias("average_transaction_amount")
    )

from pyspark.sql.functions import to_json, struct

output_df = windowed_df.select(
    to_json(struct(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("merchant_category"),
        col("transaction_count"),
        col("average_transaction_amount")
    )).alias("value")
)

query = output_df.writeStream \
    .format("kafka") \
    .outputMode("update") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", "darooghe.realtime_insights") \
    .option("checkpointLocation", "C:/Users/Mehrad/spark_checkpoints/realtime_insights") \
    .start()


query.awaitTermination(100)

spark.stop()
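This streaming job counts transactions per merchant category in 1-minute windows that slide every 20 seconds, so each event is counted in three overlapping windows. The 2-minute watermark tells Spark how long to wait for late events before a window's state can be dropped. The plain-Python sketch below reproduces the window assignment, assuming integer epoch seconds and Spark's default alignment of windows to the Unix epoch. The function name is ours.

```python
def sliding_windows(event_ts: int, size: int = 60, slide: int = 20) -> list:
    """Return the [start, end) windows (epoch seconds) that contain event_ts."""
    # Latest window start at or before the event, aligned to multiples of `slide`
    start = event_ts - (event_ts % slide)
    windows = []
    while start > event_ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


print(sliding_windows(45))  # [(0, 60), (20, 80), (40, 100)]
```

An event at exactly second 60 is not part of `[0, 60)`, because window ends are exclusive.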



### Fraud Detection System

#### I. Implement three fraud detection rules:

##### A. Velocity check: More than 5 transactions from the same customer in 2 minutes

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, MapType
import os

os.environ["HADOOP_HOME"] = "C:/hadoop-3.3.6"
os.environ["hadoop.home.dir"] = "C:/hadoop-3.3.6"

import shutil

path = 'C:/Users/Mehrad/spark_checkpoints/velocity_check'

if os.path.exists(path):
    shutil.rmtree(path)



spark = SparkSession.builder \
    .appName("RealTimeProcessing") \
    .config("checkpointLocation", "C:/Users/Mehrad/spark_checkpoints") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5") \
    .config("spark.hadoop.io.native.lib.available", "false") \
    .getOrCreate()

kafka_bootstrap_servers = "localhost:9092"
topic = "darooghe.transactions"

raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .load()

transactions_df = raw_df.selectExpr("CAST(value AS STRING)")

from pyspark.sql.types import *

transaction_schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("customer_id", StringType()),
    StructField("merchant_id", StringType()),
    StructField("merchant_category", StringType()),
    StructField("payment_method", StringType()),
    StructField("amount", DoubleType()),
    StructField("location", StructType([
        StructField("lat", DoubleType()),
        StructField("lng", DoubleType())
    ])),
    StructField("device_info", StructType([
        StructField("os", StringType()),
        StructField("app_version", StringType()),
        StructField("device_model", StringType())
    ])),
    StructField("status", StringType()),
    StructField("commission_type", StringType()),
    StructField("commission_amount", DoubleType()),
    StructField("vat_amount", DoubleType()),
    StructField("total_amount", DoubleType()),
    StructField("customer_type", StringType()),
    StructField("risk_level", IntegerType()),
    StructField("failure_reason", StringType())
])

from pyspark.sql.functions import from_json

parsed_df = transactions_df.select(
    from_json(col("value"), transaction_schema).alias("data")
).select("data.*")

from pyspark.sql.functions import window, col, avg, count, to_json, struct


velocity_check_df = parsed_df \
    .withWatermark("timestamp", "3 minutes") \
    .groupBy(
        window(col("timestamp"), "2 minutes", "20 seconds"),
        col("customer_id")
    ) \
    .agg(count("*").alias("tx_count")) \
    .filter(col("tx_count") > 5)

velocity_alerts_df = velocity_check_df.select(
    to_json(struct(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("customer_id"),
        col("tx_count"),
    )).alias("value")
)

velocity_query = velocity_alerts_df.writeStream \
    .format("kafka") \
    .outputMode("update") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", "darooghe.fraud_alerts_velocity") \
    .option("checkpointLocation", "C:/Users/Mehrad/spark_checkpoints/velocity_check") \
    .start()


velocity_query.awaitTermination(60)


spark.stop()
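Sliding windows only approximate the rule "more than 5 transactions within 2 minutes". With a 20-second slide, a burst that straddles window boundaries can be missed. For example, six transactions at seconds 15, 37, 59, 81, 103 and 125 span 110 seconds, yet no 20-second-aligned 2-minute window holds more than five of them. An exact per-customer check can be done with a sliding deque. The sketch below is plain Python rather than Spark code, for instance for offline validation of the streaming alerts, and the helper name is ours.

```python
from collections import deque


def velocity_alert(timestamps, max_count: int = 5, span_seconds: int = 120) -> bool:
    """True if more than max_count events fall within span_seconds of each other.

    timestamps: event times in seconds for ONE customer, in any order.
    """
    window = deque()
    for ts in sorted(timestamps):
        window.append(ts)
        # Drop events that are span_seconds or more older than the newest one
        while ts - window[0] >= span_seconds:
            window.popleft()
        if len(window) > max_count:
            return True
    return False


burst = [15, 37, 59, 81, 103, 125]  # 6 events spanning 110 s
print(velocity_alert(burst))
```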



##### B. Geographical impossibility: Transactions from locations >50 km apart within 5 minutes.
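The rule relies on the haversine great-circle distance, d = 2R·atan2(√h, √(1 − h)), with h = sin²(Δφ/2) + cos φ₁ · cos φ₂ · sin²(Δλ/2) and R ≈ 6371 km. The plain-Python version below is a sketch for checking the Spark expression; the function name is ours.

```python
from math import atan2, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6371.0  # mean Earth radius, same value as R in the Spark job


def haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
    """Great-circle distance in km between two (lat, lng) points given in degrees."""
    dlat = radians(lat2 - lat1)
    dlng = radians(lng2 - lng1)
    h = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlng / 2) ** 2
    return 2 * EARTH_RADIUS_KM * atan2(sqrt(h), sqrt(1 - h))


# Two points one degree of latitude apart are about 111.2 km apart
print(round(haversine_km(35.0, 51.0, 36.0, 51.0), 1))
```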

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr, to_json, struct, unix_timestamp, radians, sin, cos, atan2, sqrt
from pyspark.sql.types import *
import os
import shutil

os.environ["HADOOP_HOME"] = "C:/hadoop-3.3.6"
os.environ["hadoop.home.dir"] = "C:/hadoop-3.3.6"

path = 'C:/Users/Mehrad/spark_checkpoints/geo_check'
if os.path.exists(path):
    shutil.rmtree(path)

spark = SparkSession.builder \
    .appName("RealTimeProcessing") \
    .config("checkpointLocation", "C:/Users/Mehrad/spark_checkpoints") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5") \
    .config("spark.hadoop.io.native.lib.available", "false") \
    .getOrCreate()

kafka_bootstrap_servers = "localhost:9092"
topic = "darooghe.transactions"

raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .load()

transactions_df = raw_df.selectExpr("CAST(value AS STRING)")

transaction_schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("customer_id", StringType()),
    StructField("merchant_id", StringType()),
    StructField("merchant_category", StringType()),
    StructField("payment_method", StringType()),
    StructField("amount", DoubleType()),
    StructField("location", StructType([
        StructField("lat", DoubleType()),
        StructField("lng", DoubleType())
    ])),
    StructField("device_info", StructType([
        StructField("os", StringType()),
        StructField("app_version", StringType()),
        StructField("device_model", StringType())
    ])),
    StructField("status", StringType()),
    StructField("commission_type", StringType()),
    StructField("commission_amount", DoubleType()),
    StructField("vat_amount", DoubleType()),
    StructField("total_amount", DoubleType()),
    StructField("customer_type", StringType()),
    StructField("risk_level", IntegerType()),
    StructField("failure_reason", StringType())
])

parsed_df = transactions_df.select(
    from_json(col("value"), transaction_schema).alias("data")
).select("data.*")

left = parsed_df.withWatermark("timestamp", "6 minutes").alias("a")
right = parsed_df.withWatermark("timestamp", "6 minutes").alias("b")

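# Pair each transaction with later transactions of the same customer that occur
# at most 5 minutes afterwards. Expressing the bound as an interval on the
# event-time columns lets Spark use the watermarks to evict old join state.
# A bound buried inside unix_timestamp() arithmetic may not be recognized as one.
geo_joined = left.join(
    right,
    on=expr("""
        a.customer_id = b.customer_id AND
        b.timestamp > a.timestamp AND
        b.timestamp <= a.timestamp + INTERVAL 5 MINUTES
    """)
)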

R = 6371.0  # mean Earth radius in km

# Haversine formula: hav = sin(dlat/2)^2 + cos(lat_a) * cos(lat_b) * sin(dlng/2)^2.
# The square applies to the sine, i.e. sin(x) ** 2, not sin(x ** 2).
dlat = radians(col("b.location.lat") - col("a.location.lat"))
dlng = radians(col("b.location.lng") - col("a.location.lng"))
hav = (
    sin(dlat / 2) ** 2 +
    cos(radians(col("a.location.lat"))) * cos(radians(col("b.location.lat"))) * sin(dlng / 2) ** 2
)

geo_checked = geo_joined.withColumn(
    "distance_km",
    R * 2 * atan2(sqrt(hav), sqrt(1 - hav))
).filter(col("distance_km") > 50)

geo_alerts_df = geo_checked.select(
    to_json(struct(
        col("a.customer_id").alias("customer_id"),
        col("a.timestamp").alias("timestamp_a"),
        col("b.timestamp").alias("timestamp_b"),
        col("distance_km")
    )).alias("value")
)

geo_query = geo_alerts_df.writeStream \
    .format("kafka") \
    .outputMode("append") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", "darooghe.fraud_alerts_geo") \
    .option("checkpointLocation", "C:/Users/Mehrad/spark_checkpoints/geo_check") \
    .start()

geo_query.awaitTermination(120)

spark.stop()
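`distance_km` is meant to hold the haversine great-circle distance. The plain-Python sketch below applies the same rule to an in-memory list. It pairs each transaction with later transactions from the same customer that occur at most 5 minutes afterwards, and it flags any pair more than 50 km apart. The dictionary field names and the sample coordinates (roughly Tehran and Isfahan) are assumptions made for illustration.

```python
import math
from datetime import datetime, timedelta

EARTH_RADIUS_KM = 6371.0
MAX_GAP = timedelta(minutes=5)   # second transaction at most 5 minutes after the first
MAX_DISTANCE_KM = 50.0           # flag pairs more than 50 km apart


def haversine_km(lat1, lng1, lat2, lng2):
    """Great-circle distance in km between two (lat, lng) points given in degrees."""
    dlat = math.radians(lat2 - lat1)
    dlng = math.radians(lng2 - lng1)
    hav = (math.sin(dlat / 2) ** 2
           + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlng / 2) ** 2)
    return EARTH_RADIUS_KM * 2 * math.atan2(math.sqrt(hav), math.sqrt(1 - hav))


def geo_alerts(transactions):
    """transactions: dicts with customer_id, timestamp, lat, lng; returns flagged pairs."""
    ordered = sorted(transactions, key=lambda t: (t["customer_id"], t["timestamp"]))
    alerts = []
    for i, a in enumerate(ordered):
        for b in ordered[i + 1:]:
            # Sorted order: once the customer changes or the gap exceeds 5 minutes, no later b can match
            if b["customer_id"] != a["customer_id"] or b["timestamp"] - a["timestamp"] > MAX_GAP:
                break
            if b["timestamp"] > a["timestamp"]:
                distance = haversine_km(a["lat"], a["lng"], b["lat"], b["lng"])
                if distance > MAX_DISTANCE_KM:
                    alerts.append((a["customer_id"], a["timestamp"], b["timestamp"], distance))
    return alerts


# Usage: hypothetical transactions near Tehran and Isfahan
t0 = datetime(2025, 1, 1, 12, 0)
txs = [
    {"customer_id": "c1", "timestamp": t0, "lat": 35.6892, "lng": 51.3890},
    {"customer_id": "c1", "timestamp": t0 + timedelta(minutes=3), "lat": 32.6546, "lng": 51.6680},
    {"customer_id": "c2", "timestamp": t0, "lat": 35.6892, "lng": 51.3890},
    {"customer_id": "c2", "timestamp": t0 + timedelta(minutes=10), "lat": 32.6546, "lng": 51.6680},
]
alerts = geo_alerts(txs)
```

The input is sorted by customer and time, so the inner loop can stop as soon as the gap exceeds 5 minutes. That 5-minute bound is also what keeps the streaming self-join's state limited.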


##### C. Amount anomaly: Transaction amount >1000% of the customer's average (the average is computed from pre-existing data).

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, to_json, struct, avg
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
import os
import shutil

os.environ["HADOOP_HOME"] = "C:/hadoop-3.3.6"
os.environ["hadoop.home.dir"] = "C:/hadoop-3.3.6"

path = 'C:/Users/Mehrad/spark_checkpoints/amount_check'
if os.path.exists(path):
    shutil.rmtree(path)

spark = SparkSession.builder \
    .appName("RealTimeProcessing") \
    .config("spark.sql.streaming.checkpointLocation", "C:/Users/Mehrad/spark_checkpoints") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5") \
    .config("spark.hadoop.io.native.lib.available", "false") \
    .getOrCreate()

kafka_bootstrap_servers = "localhost:9092"
topic = "darooghe.transactions"

raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .load()

transactions_df = raw_df.selectExpr("CAST(value AS STRING)")
transaction_schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("customer_id", StringType()),
    StructField("merchant_id", StringType()),
    StructField("merchant_category", StringType()),
    StructField("payment_method", StringType()),
    StructField("amount", DoubleType()),
    StructField("location", StructType([
        StructField("lat", DoubleType()),
        StructField("lng", DoubleType())
    ])),
    StructField("device_info", StructType([
        StructField("os", StringType()),
        StructField("app_version", StringType()),
        StructField("device_model", StringType())
    ])),
    StructField("status", StringType()),
    StructField("commission_type", StringType()),
    StructField("commission_amount", DoubleType()),
    StructField("vat_amount", DoubleType()),
    StructField("total_amount", DoubleType()),
    StructField("customer_type", StringType()),
    StructField("risk_level", IntegerType()),
    StructField("failure_reason", StringType())
])

parsed_df = transactions_df.select(
    from_json(col("value"), transaction_schema).alias("data")
).select("data.*")

historical_df = spark.read.json("transactions_data.json")
customer_avg_df = historical_df.groupBy("customer_id").agg(avg("amount").alias("avg_amount"))

amount_joined_df = parsed_df.join(customer_avg_df, on="customer_id", how="inner")
amount_anomaly_df = amount_joined_df.filter(col("amount") > 10 * col("avg_amount"))

amount_alerts_df = amount_anomaly_df.select(
    to_json(struct(
        col("transaction_id"),
        col("timestamp"),
        col("customer_id"),
        col("amount"),
        col("avg_amount")
    )).alias("value")
)


amount_query = amount_alerts_df.writeStream \
    .format("kafka") \
    .outputMode("append") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", "darooghe.fraud_alerts_amount") \
    .option("checkpointLocation", "C:/Users/Mehrad/spark_checkpoints/amount_check") \
    .start()

amount_query.awaitTermination(60)

spark.stop()
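An amount above 1000% of the average is an amount greater than 10 times the customer's historical mean, which is what `10 * col("avg_amount")` encodes. The plain-Python sketch below follows the cell's logic. Averages are computed from historical records, and customers with no history are skipped, which matches `how="inner"`. The function names and sample values are hypothetical.

```python
from collections import defaultdict

ANOMALY_FACTOR = 10  # "more than 1000% of the average" means amount > 10 * average


def customer_averages(historical):
    """historical: iterable of (customer_id, amount); returns {customer_id: mean amount}."""
    totals, counts = defaultdict(float), defaultdict(int)
    for customer_id, amount in historical:
        totals[customer_id] += amount
        counts[customer_id] += 1
    return {c: totals[c] / counts[c] for c in totals}


def amount_anomalies(stream, averages):
    """stream: iterable of (transaction_id, customer_id, amount).

    Customers without history are skipped, like the inner join in the Spark cell.
    """
    flagged = []
    for transaction_id, customer_id, amount in stream:
        avg_amount = averages.get(customer_id)
        if avg_amount is not None and amount > ANOMALY_FACTOR * avg_amount:
            flagged.append((transaction_id, customer_id, amount, avg_amount))
    return flagged


# Usage with hypothetical amounts
averages = customer_averages([("c1", 100_000.0), ("c1", 300_000.0), ("c2", 50_000.0)])
flagged = amount_anomalies(
    [("t1", "c1", 2_500_000.0), ("t2", "c1", 2_000_000.0), ("t3", "c3", 9_000_000.0)],
    averages,
)
```

The comparison is strict, so an amount of exactly 10 times the average (`t2` above) is not flagged.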


#### II. Write detected fraud events to the darooghe.fraud_alerts topic

In [None]:
import os
import shutil

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, expr, to_json, struct, radians, sin, cos, atan2, sqrt,
    window, avg, count,
)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType, TimestampType,
)

os.environ["HADOOP_HOME"] = "C:/hadoop-3.3.6"
os.environ["hadoop.home.dir"] = "C:/hadoop-3.3.6"

path = 'C:/Users/Mehrad/spark_checkpoints/geo_check'
if os.path.exists(path):
    shutil.rmtree(path)

path = 'C:/Users/Mehrad/spark_checkpoints/amount_check'
if os.path.exists(path):
    shutil.rmtree(path)


path = 'C:/Users/Mehrad/spark_checkpoints/velocity_check'

if os.path.exists(path):
    shutil.rmtree(path)

spark = SparkSession.builder \
    .appName("RealTimeProcessing") \
    .config("spark.sql.streaming.checkpointLocation", "C:/Users/Mehrad/spark_checkpoints") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5") \
    .config("spark.hadoop.io.native.lib.available", "false") \
    .getOrCreate()

kafka_bootstrap_servers = "localhost:9092"
topic = "darooghe.transactions"

raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .load()

transactions_df = raw_df.selectExpr("CAST(value AS STRING)")

transaction_schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("customer_id", StringType()),
    StructField("merchant_id", StringType()),
    StructField("merchant_category", StringType()),
    StructField("payment_method", StringType()),
    StructField("amount", DoubleType()),
    StructField("location", StructType([
        StructField("lat", DoubleType()),
        StructField("lng", DoubleType())
    ])),
    StructField("device_info", StructType([
        StructField("os", StringType()),
        StructField("app_version", StringType()),
        StructField("device_model", StringType())
    ])),
    StructField("status", StringType()),
    StructField("commission_type", StringType()),
    StructField("commission_amount", DoubleType()),
    StructField("vat_amount", DoubleType()),
    StructField("total_amount", DoubleType()),
    StructField("customer_type", StringType()),
    StructField("risk_level", IntegerType()),
    StructField("failure_reason", StringType())
])

parsed_df = transactions_df.select(
    from_json(col("value"), transaction_schema).alias("data")
).select("data.*")

left = parsed_df.withWatermark("timestamp", "6 minutes").alias("a")
right = parsed_df.withWatermark("timestamp", "6 minutes").alias("b")

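# Pair each transaction with later transactions of the same customer that occur
# at most 5 minutes afterwards. Expressing the bound as an interval on the
# event-time columns lets Spark use the watermarks to evict old join state.
geo_joined = left.join(
    right,
    on=expr("""
        a.customer_id = b.customer_id AND
        b.timestamp > a.timestamp AND
        b.timestamp <= a.timestamp + INTERVAL 5 MINUTES
    """)
)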

R = 6371.0  # mean Earth radius in km

# Haversine formula: hav = sin(dlat/2)^2 + cos(lat_a) * cos(lat_b) * sin(dlng/2)^2.
# The square applies to the sine, i.e. sin(x) ** 2, not sin(x ** 2).
dlat = radians(col("b.location.lat") - col("a.location.lat"))
dlng = radians(col("b.location.lng") - col("a.location.lng"))
hav = (
    sin(dlat / 2) ** 2 +
    cos(radians(col("a.location.lat"))) * cos(radians(col("b.location.lat"))) * sin(dlng / 2) ** 2
)

geo_checked = geo_joined.withColumn(
    "distance_km",
    R * 2 * atan2(sqrt(hav), sqrt(1 - hav))
).filter(col("distance_km") > 50)

geo_alerts_df = geo_checked.select(
    to_json(struct(
        col("a.customer_id").alias("customer_id"),
        col("a.timestamp").alias("timestamp_a"),
        col("b.timestamp").alias("timestamp_b"),
        col("distance_km")
    )).alias("value")
)

geo_query = geo_alerts_df.writeStream \
    .format("kafka") \
    .outputMode("append") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", "darooghe.fraud_alerts") \
    .option("checkpointLocation", "C:/Users/Mehrad/spark_checkpoints/geo_check") \
    .start()


velocity_check_df = parsed_df \
    .withWatermark("timestamp", "3 minutes") \
    .groupBy(
        window(col("timestamp"), "2 minutes", "20 seconds"),
        col("customer_id")
    ) \
    .agg(count("*").alias("tx_count")) \
    .filter(col("tx_count") > 5)

velocity_alerts_df = velocity_check_df.select(
    to_json(struct(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("customer_id"),
        col("tx_count"),
    )).alias("value")
)

velocity_query = velocity_alerts_df.writeStream \
    .format("kafka") \
    .outputMode("update") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", "darooghe.fraud_alerts") \
    .option("checkpointLocation", "C:/Users/Mehrad/spark_checkpoints/velocity_check") \
    .start()



historical_df = spark.read.json("transactions_data.json")
customer_avg_df = historical_df.groupBy("customer_id").agg(avg("amount").alias("avg_amount"))

amount_joined_df = parsed_df.join(customer_avg_df, on="customer_id", how="inner")
amount_anomaly_df = amount_joined_df.filter(col("amount") > 10 * col("avg_amount"))

amount_alerts_df = amount_anomaly_df.select(
    to_json(struct(
        col("transaction_id"),
        col("timestamp"),
        col("customer_id"),
        col("amount"),
        col("avg_amount")
    )).alias("value")
)


amount_query = amount_alerts_df.writeStream \
    .format("kafka") \
    .outputMode("append") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", "darooghe.fraud_alerts") \
    .option("checkpointLocation", "C:/Users/Mehrad/spark_checkpoints/amount_check") \
    .start()

amount_query.awaitTermination(120)

velocity_query.awaitTermination(120)
geo_query.awaitTermination(120)

spark.stop()
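All three queries in this cell publish to the same `darooghe.fraud_alerts` topic, but each one writes a different set of JSON fields. A downstream consumer therefore has to work out which rule fired from the fields alone. The minimal sketch below assumes the field sets stay exactly as written above: `distance_km` appears only in geo alerts, `tx_count` only in velocity alerts, and `avg_amount` only in amount alerts. The function names are hypothetical.

```python
import json


def alert_type(payload):
    """Infer which detector produced an alert from the fields it carries.

    Assumes the field sets written by the three queries above stay unchanged.
    """
    if "distance_km" in payload:
        return "geo"
    if "tx_count" in payload:
        return "velocity"
    if "avg_amount" in payload:
        return "amount"
    return "unknown"


def classify_message(raw_bytes):
    """Decode a Kafka message value (UTF-8 JSON, as written by to_json) and classify it."""
    return alert_type(json.loads(raw_bytes.decode("utf-8")))
```

A sturdier option is to add an explicit field to each `struct`, for example `lit("geo").alias("alert_type")` using `pyspark.sql.functions.lit`. Consumers would then no longer depend on field names.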


### Real-Time Commission Analytics

#### I. Calculate real-time commission metrics and write them to one or more Kafka topics:

##### A. Total commission by type per minute.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, MapType
import os
from pyspark.sql.functions import col, sum as _sum, avg, count, round, to_json, struct

os.environ["HADOOP_HOME"] = "C:/hadoop-3.3.6"
os.environ["hadoop.home.dir"] = "C:/hadoop-3.3.6"


import shutil

path = 'C:/Users/Mehrad/spark_checkpoints/commission_type'

if os.path.exists(path):
    shutil.rmtree(path)


spark = SparkSession.builder \
    .appName("RealTimeProcessing") \
    .config("spark.sql.streaming.checkpointLocation", "C:/Users/Mehrad/spark_checkpoints") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5") \
    .config("spark.hadoop.io.native.lib.available", "false") \
    .getOrCreate()

kafka_bootstrap_servers = "localhost:9092"
topic = "darooghe.transactions"

raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .load()

transactions_df = raw_df.selectExpr("CAST(value AS STRING)")

from pyspark.sql.types import *

transaction_schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("customer_id", StringType()),
    StructField("merchant_id", StringType()),
    StructField("merchant_category", StringType()),
    StructField("payment_method", StringType()),
    StructField("amount", DoubleType()),
    StructField("location", StructType([
        StructField("lat", DoubleType()),
        StructField("lng", DoubleType())
    ])),
    StructField("device_info", StructType([
        StructField("os", StringType()),
        StructField("app_version", StringType()),
        StructField("device_model", StringType())
    ])),
    StructField("status", StringType()),
    StructField("commission_type", StringType()),
    StructField("commission_amount", DoubleType()),
    StructField("vat_amount", DoubleType()),
    StructField("total_amount", DoubleType()),
    StructField("customer_type", StringType()),
    StructField("risk_level", IntegerType()),
    StructField("failure_reason", StringType())
])

from pyspark.sql.functions import from_json

parsed_df = transactions_df.select(
    from_json(col("value"), transaction_schema).alias("data")
).select("data.*")

from pyspark.sql.functions import window, col, avg, count

commission_by_type_df = parsed_df \
    .withWatermark("timestamp", "2 minutes") \
    .groupBy(
        window(col("timestamp"), "1 minute"),
        col("commission_type")
    ) \
    .agg(
        _sum("commission_amount").alias("total_commission")
    )

commission_by_type_output = commission_by_type_df.select(
    to_json(struct(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("commission_type"),
        col("total_commission")
    )).alias("value")
)

commission_by_type_query = commission_by_type_output.writeStream \
    .format("kafka") \
    .outputMode("update") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", "darooghe.commission_by_type") \
    .option("checkpointLocation", "C:/Users/Mehrad/spark_checkpoints/commission_type") \
    .start()

commission_by_type_query.awaitTermination(120)

spark.stop()
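A one-minute tumbling window places each transaction in exactly one window: the minute that contains its timestamp. The plain-Python sketch below shows what `commission_by_type_df` computes, leaving out watermarks and late data. Flooring to the minute matches Spark's epoch-aligned windows when timestamps are in UTC or in a time zone whose offset is a whole number of minutes. The commission type labels `"flat"` and `"percentage"` are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def commission_by_type(transactions):
    """transactions: iterable of (timestamp, commission_type, commission_amount).

    Returns {(window_start, commission_type): total_commission} for 1-minute tumbling windows.
    """
    totals = defaultdict(float)
    for ts, commission_type, commission_amount in transactions:
        window_start = ts.replace(second=0, microsecond=0)  # the minute containing ts
        totals[(window_start, commission_type)] += commission_amount
    return dict(totals)


# Usage with hypothetical commission types and amounts
t = datetime(2025, 1, 1, 12, 0, 10)
totals = commission_by_type([
    (t, "flat", 1000.0),
    (t + timedelta(seconds=30), "flat", 500.0),
    (t + timedelta(seconds=45), "percentage", 200.0),
    (t + timedelta(seconds=60), "flat", 300.0),   # 12:01:10 falls in the next window
])
```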



##### B. Commission ratio (commission/transaction amount) by merchant category.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, to_json, struct
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, MapType
import os

os.environ["HADOOP_HOME"] = "C:/hadoop-3.3.6"
os.environ["hadoop.home.dir"] = "C:/hadoop-3.3.6"


import shutil

path = 'C:/Users/Mehrad/spark_checkpoints/commission_ratio'

if os.path.exists(path):
    shutil.rmtree(path)


spark = SparkSession.builder \
    .appName("RealTimeProcessing") \
    .config("spark.sql.streaming.checkpointLocation", "C:/Users/Mehrad/spark_checkpoints") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5") \
    .config("spark.hadoop.io.native.lib.available", "false") \
    .getOrCreate()

kafka_bootstrap_servers = "localhost:9092"
topic = "darooghe.transactions"

raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .load()

transactions_df = raw_df.selectExpr("CAST(value AS STRING)")

from pyspark.sql.types import *

transaction_schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("customer_id", StringType()),
    StructField("merchant_id", StringType()),
    StructField("merchant_category", StringType()),
    StructField("payment_method", StringType()),
    StructField("amount", DoubleType()),
    StructField("location", StructType([
        StructField("lat", DoubleType()),
        StructField("lng", DoubleType())
    ])),
    StructField("device_info", StructType([
        StructField("os", StringType()),
        StructField("app_version", StringType()),
        StructField("device_model", StringType())
    ])),
    StructField("status", StringType()),
    StructField("commission_type", StringType()),
    StructField("commission_amount", DoubleType()),
    StructField("vat_amount", DoubleType()),
    StructField("total_amount", DoubleType()),
    StructField("customer_type", StringType()),
    StructField("risk_level", IntegerType()),
    StructField("failure_reason", StringType())
])

from pyspark.sql.functions import from_json

parsed_df = transactions_df.select(
    from_json(col("value"), transaction_schema).alias("data")
).select("data.*")

from pyspark.sql.functions import window, col, avg, count

commission_ratio_df = parsed_df \
    .filter(col("amount") > 0) \
    .withWatermark("timestamp", "2 minutes") \
    .groupBy(
        window(col("timestamp"), "1 minute"),
        col("merchant_category")
    ) \
    .agg(
        avg(col("commission_amount") / col("amount")).alias("commission_ratio")
    )

commission_ratio_output = commission_ratio_df.select(
    to_json(struct(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("merchant_category"),
        col("commission_ratio")
    )).alias("value")
)

commission_ratio_query = commission_ratio_output.writeStream \
    .format("kafka") \
    .outputMode("update") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", "darooghe.commission_ratio") \
    .option("checkpointLocation", "C:/Users/Mehrad/spark_checkpoints/commission_ratio") \
    .start()

commission_ratio_query.awaitTermination(120)

spark.stop()
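`avg(col("commission_amount") / col("amount"))` averages per-transaction ratios, so every transaction carries equal weight. If the intended metric is instead commission as a share of total volume, the ratio of sums is the alternative; in Spark that would be `_sum("commission_amount") / _sum("amount")`. The two diverge when transaction sizes vary. The plain-Python comparison below uses hypothetical numbers.

```python
def mean_of_ratios(rows):
    """Average of per-transaction commission/amount, as avg(commission_amount / amount) computes."""
    ratios = [commission / amount for amount, commission in rows if amount > 0]
    return sum(ratios) / len(ratios)


def ratio_of_sums(rows):
    """Total commission over total amount; larger transactions carry more weight."""
    valid = [(amount, commission) for amount, commission in rows if amount > 0]
    return sum(c for _, c in valid) / sum(a for a, _ in valid)


# Hypothetical (amount, commission) pairs for one merchant category:
# a small sale at 2% commission and a large sale at 1%
rows = [(100.0, 2.0), (10_000.0, 100.0)]
per_transaction = mean_of_ratios(rows)   # weights both sales equally
volume_weighted = ratio_of_sums(rows)    # dominated by the large sale
```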



##### C. Highest commission-generating merchants in 5-minute windows.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, sum as _sum, to_json, struct
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
import os
import shutil

os.environ["HADOOP_HOME"] = "C:/hadoop-3.3.6"
os.environ["hadoop.home.dir"] = "C:/hadoop-3.3.6"


checkpoint_path = "C:/Users/Mehrad/spark_checkpoints/top_merchants"
if os.path.exists(checkpoint_path):
    shutil.rmtree(checkpoint_path)


spark = SparkSession.builder \
    .appName("RealTimeProcessing") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5") \
    .config("spark.hadoop.io.native.lib.available", "false") \
    .getOrCreate()

kafka_bootstrap_servers = "localhost:9092"
topic = "darooghe.transactions"

raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .load()

transactions_df = raw_df.selectExpr("CAST(value AS STRING)")

transaction_schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("customer_id", StringType()),
    StructField("merchant_id", StringType()),
    StructField("merchant_category", StringType()),
    StructField("payment_method", StringType()),
    StructField("amount", DoubleType()),
    StructField("location", StructType([
        StructField("lat", DoubleType()),
        StructField("lng", DoubleType())
    ])),
    StructField("device_info", StructType([
        StructField("os", StringType()),
        StructField("app_version", StringType()),
        StructField("device_model", StringType())
    ])),
    StructField("status", StringType()),
    StructField("commission_type", StringType()),
    StructField("commission_amount", DoubleType()),
    StructField("vat_amount", DoubleType()),
    StructField("total_amount", DoubleType()),
    StructField("customer_type", StringType()),
    StructField("risk_level", IntegerType()),
    StructField("failure_reason", StringType())
])

parsed_df = transactions_df.select(
    from_json(col("value"), transaction_schema).alias("data")
).select("data.*")

merchant_commission_df = parsed_df \
    .withWatermark("timestamp", "6 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("merchant_id")
    ).agg(
        _sum("commission_amount").alias("total_commission")
    )

def process_top_merchants(batch_df, batch_id):
    if batch_df.isEmpty():
        return

    window_spec = Window.partitionBy("window").orderBy(col("total_commission").desc())

    ranked_df = batch_df.withColumn("rank", row_number().over(window_spec)) \
                        .filter(col("rank") <= 5)

    output_df = ranked_df.select(to_json(struct(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("merchant_id"),
        col("total_commission"),
        col("rank")
    )).alias("value"))

    output_df.write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("topic", "darooghe.top_merchants") \
        .save()

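# foreachBatch ranks only the rows it receives. In "update" mode a micro-batch holds
# only the (window, merchant) totals that changed in that trigger, so a window's top 5
# could be chosen from a subset of its merchants. "complete" mode passes all totals on
# every trigger. The cost is that every window's state is kept and each window's
# ranking is re-sent on each trigger.
top_merchants_query = merchant_commission_df.writeStream \
    .foreachBatch(process_top_merchants) \
    .outputMode("complete") \
    .option("checkpointLocation", checkpoint_path) \
    .start()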

top_merchants_query.awaitTermination(120)
spark.stop()
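`process_top_merchants` ranks merchants inside each 5-minute window with `row_number()` over a window spec partitioned by `window` and ordered by `total_commission` in descending order. The plain-Python sketch below does the same on a list. As with the Spark version, the result is only correct if the input contains every merchant of a window. The window labels and merchant IDs are hypothetical.

```python
from collections import defaultdict

TOP_N = 5


def top_merchants(rows, n=TOP_N):
    """rows: iterable of (window_start, merchant_id, total_commission).

    Returns {window_start: [(rank, merchant_id, total_commission), ...]} holding the n
    highest totals per window, like row_number() over Window.partitionBy("window")
    ordered by total_commission descending. Ties keep input order here, whereas
    Spark's row_number gives no ordering guarantee for ties.
    """
    by_window = defaultdict(list)
    for window_start, merchant_id, total in rows:
        by_window[window_start].append((merchant_id, total))
    ranked = {}
    for window_start, merchants in by_window.items():
        best = sorted(merchants, key=lambda m: m[1], reverse=True)[:n]
        ranked[window_start] = [(rank, m, t) for rank, (m, t) in enumerate(best, start=1)]
    return ranked


# Usage: seven hypothetical merchants in one window, one in another
rows = [("12:00", f"m{i}", float(i * 100)) for i in range(7)] + [("12:05", "m9", 50.0)]
ranked = top_merchants(rows)
```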


##### D. All three metrics in a single Spark session

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, window, avg, round, row_number, sum as _sum, to_json, struct,
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql.window import Window
import os
import shutil

os.environ["HADOOP_HOME"] = "C:/hadoop-3.3.6"
os.environ["hadoop.home.dir"] = "C:/hadoop-3.3.6"

checkpoint_base = "C:/Users/Mehrad/spark_checkpoints"
for cp in ["by_type", "ratio_by_category", "top_merchants"]:
    path = f"{checkpoint_base}/{cp}"
    if os.path.exists(path):
        shutil.rmtree(path)

spark = SparkSession.builder \
    .appName("RealTimeProcessing") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5") \
    .config("spark.hadoop.io.native.lib.available", "false") \
    .getOrCreate()

kafka_bootstrap_servers = "localhost:9092"
source_topic = "darooghe.transactions"

raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", source_topic) \
    .option("startingOffsets", "latest") \
    .load()

transactions_df = raw_df.selectExpr("CAST(value AS STRING)")

transaction_schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("customer_id", StringType()),
    StructField("merchant_id", StringType()),
    StructField("merchant_category", StringType()),
    StructField("payment_method", StringType()),
    StructField("amount", DoubleType()),
    StructField("location", StructType([
        StructField("lat", DoubleType()),
        StructField("lng", DoubleType())
    ])),
    StructField("device_info", StructType([
        StructField("os", StringType()),
        StructField("app_version", StringType()),
        StructField("device_model", StringType())
    ])),
    StructField("status", StringType()),
    StructField("commission_type", StringType()),
    StructField("commission_amount", DoubleType()),
    StructField("vat_amount", DoubleType()),
    StructField("total_amount", DoubleType()),
    StructField("customer_type", StringType()),
    StructField("risk_level", IntegerType()),
    StructField("failure_reason", StringType())
])

parsed_df = transactions_df.select(
    from_json(col("value"), transaction_schema).alias("data")
).select("data.*")

parsed_df = parsed_df.withWatermark("timestamp", "6 minutes")

# Per-minute windows, as required by metric A (total commission by type per minute)
commission_by_type = parsed_df.groupBy(
    window(col("timestamp"), "1 minute"),
    col("commission_type")
).agg(
    _sum("commission_amount").alias("total_commission")
)

commission_by_type_output = commission_by_type.select(
    to_json(struct(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("commission_type"),
        col("total_commission")
    )).alias("value")
)

commission_by_type_query = commission_by_type_output.writeStream \
    .format("kafka") \
    .outputMode("update") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", "darooghe.commission_by_type2") \
    .option("checkpointLocation", f"{checkpoint_base}/by_type") \
    .start()

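# Skip non-positive amounts so the ratio is never divided by zero (same filter as metric B)
commission_ratio_df = parsed_df \
    .filter(col("amount") > 0) \
    .withColumn("commission_ratio", col("commission_amount") / col("amount")) \
    .groupBy("merchant_category") \
    .agg(avg("commission_ratio").alias("avg_commission_ratio"))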

commission_ratio_output = commission_ratio_df.select(
    to_json(struct(
        col("merchant_category"),
        round(col("avg_commission_ratio"), 4).alias("avg_commission_ratio")
    )).alias("value")
)

commission_ratio_query = commission_ratio_output.writeStream \
    .format("kafka") \
    .outputMode("complete") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", "darooghe.commission_ratio_by_category2") \
    .option("checkpointLocation", f"{checkpoint_base}/ratio_by_category") \
    .start()

merchant_commission_df = parsed_df.groupBy(
    window(col("timestamp"), "5 minutes"),
    col("merchant_id")
).agg(
    _sum("commission_amount").alias("total_commission")
)

def process_top_merchants(batch_df, batch_id):
    if batch_df.isEmpty():
        return

    window_spec = Window.partitionBy("window").orderBy(col("total_commission").desc())

    ranked_df = batch_df.withColumn("rank", row_number().over(window_spec)) \
                        .filter(col("rank") <= 5)

    output_df = ranked_df.select(to_json(struct(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("merchant_id"),
        col("total_commission"),
        col("rank")
    )).alias("value"))

    output_df.write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("topic", "darooghe.top_merchants2") \
        .save()

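# "complete" mode so each micro-batch holds every merchant's total for a window;
# in "update" mode the ranking would see only the totals that changed in that trigger.
top_merchants_query = merchant_commission_df.writeStream \
    .foreachBatch(process_top_merchants) \
    .outputMode("complete") \
    .option("checkpointLocation", f"{checkpoint_base}/top_merchants") \
    .start()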


commission_by_type_query.awaitTermination(300)
commission_ratio_query.awaitTermination(300)
top_merchants_query.awaitTermination(300)

spark.stop()
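The watermarks in this notebook determine when Spark may discard window state. According to the Structured Streaming documentation, the watermark is the maximum event time seen so far minus the configured delay. In update or append mode, a window's state can be dropped once the watermark moves past the end of that window. In complete mode, which `commission_ratio_query` uses, all aggregation state is kept regardless of the watermark. The plain-Python sketch below is simplified and uses this cell's 6-minute delay and 5-minute windows. It ignores the fact that Spark only advances the watermark between micro-batches, and the function names are hypothetical.

```python
from datetime import datetime, timedelta


def current_watermark(max_event_time, delay):
    """Event-time watermark: latest event time seen so far minus the allowed delay."""
    return max_event_time - delay


def window_state_droppable(window_end, watermark):
    """In update/append mode, a window's state can be dropped once the watermark reaches its end."""
    return watermark >= window_end


# Usage with this cell's settings: 6-minute watermark delay, 5-minute windows
delay = timedelta(minutes=6)
watermark = current_watermark(datetime(2025, 1, 1, 12, 13), delay)   # latest event at 12:13
```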


## Visualization

### I. Transaction Volume: Display time-series charts of real-time and historical transaction volumes to help identify trends and seasonal peaks.

In [None]:
try:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import from_json, col, window, to_json, struct
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, MapType
    import os

    os.environ["HADOOP_HOME"] = "C:/hadoop-3.3.6"
    os.environ["hadoop.home.dir"] = "C:/hadoop-3.3.6"


    import shutil

    path = 'C:/Users/Mehrad/spark_checkpoints/volume'

    if os.path.exists(path):
        shutil.rmtree(path)


    spark = SparkSession.builder \
        .appName("RealTimeProcessing") \
        .config("spark.sql.streaming.checkpointLocation", "C:/Users/Mehrad/spark_checkpoints") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5") \
        .config("spark.hadoop.io.native.lib.available", "false") \
        .getOrCreate()

    kafka_bootstrap_servers = "localhost:9092"
    topic = "darooghe.transactions"

    raw_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", topic) \
        .option("startingOffsets", "latest") \
        .load()

    transactions_df = raw_df.selectExpr("CAST(value AS STRING)")

    from pyspark.sql.types import *

    transaction_schema = StructType([
        StructField("transaction_id", StringType()),
        StructField("timestamp", TimestampType()),
        StructField("customer_id", StringType()),
        StructField("merchant_id", StringType()),
        StructField("merchant_category", StringType()),
        StructField("payment_method", StringType()),
        StructField("amount", DoubleType()),
        StructField("location", StructType([
            StructField("lat", DoubleType()),
            StructField("lng", DoubleType())
        ])),
        StructField("device_info", StructType([
            StructField("os", StringType()),
            StructField("app_version", StringType()),
            StructField("device_model", StringType())
        ])),
        StructField("status", StringType()),
        StructField("commission_type", StringType()),
        StructField("commission_amount", DoubleType()),
        StructField("vat_amount", DoubleType()),
        StructField("total_amount", DoubleType()),
        StructField("customer_type", StringType()),
        StructField("risk_level", IntegerType()),
        StructField("failure_reason", StringType())
    ])

    from pyspark.sql.functions import from_json

    parsed_df = transactions_df.select(
        from_json(col("value"), transaction_schema).alias("data")
    ).select("data.*")

    from pyspark.sql.functions import window, col, avg, count

    transaction_volume_df = parsed_df \
        .withWatermark("timestamp", "2 minutes") \
        .groupBy(window(col("timestamp"), "1 minute")) \
        .agg(count("*").alias("transaction_count"))


    transaction_volume_output = transaction_volume_df.select(
        to_json(struct(
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            col("transaction_count")
        )).alias("value")
    )

    transaction_volume_query = transaction_volume_output.writeStream \
        .format("kafka") \
        .outputMode("update") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("topic", "darooghe.transaction_volume") \
        .option("checkpointLocation", path) \
        .start()


    transaction_volume_query.awaitTermination(2000)

    spark.stop()

except KeyboardInterrupt:
    spark.stop()

In [None]:
import json
import pandas as pd
from kafka import KafkaConsumer
from dateutil import parser
import plotly.express as px


consumer = KafkaConsumer(
    'darooghe.transaction_volume',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='visualization-group-new',
    consumer_timeout_ms=30000,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

data = []
max_messages = 50
print("Collecting transaction volume data...")

for i, msg in enumerate(consumer):
    # Stop after max_messages messages; checked first so failed parses still count.
    if i >= max_messages:
        break

    value = msg.value
    print(f"[{i}] Raw Message:", value)

    try:
        window_start = parser.isoparse(value["window_start"])
        window_end = parser.isoparse(value["window_end"])
        transaction_count = int(value["transaction_count"])

        data.append({
            "window_start": window_start,
            "window_end": window_end,
            "transaction_count": transaction_count
        })
    except Exception as e:
        print(f"Error parsing message #{i}: {e}")

consumer.close()

if not data:
    print("No data received from Kafka topic.")
else:
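    df = pd.DataFrame(data)
    # Update mode re-emits a window every time its count changes, so keep the
    # largest (i.e. most recent) count per window before plotting.
    df = (df.groupby(["window_start", "window_end"], as_index=False)["transaction_count"]
            .max()
            .sort_values(by="window_start"))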

    fig = px.line(
        df,
        x="window_start",
        y="transaction_count",
        title="Transaction Volume Over Time (1-Minute Windows)",
        labels={"window_start": "Time", "transaction_count": "Transaction Count"},
        markers=True
    )

    fig.update_layout(
        xaxis_title='Time',
        yaxis_title='Number of Transactions',
        template='plotly_dark'
    )
    fig.show()



### II. Merchant Analysis: Show the top 5 merchants based on the number of transactions.

In [None]:
try:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import from_json, col, count, to_json, struct
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
    import os
    import shutil

    os.environ["HADOOP_HOME"] = "C:/hadoop-3.3.6"
    os.environ["hadoop.home.dir"] = "C:/hadoop-3.3.6"

    checkpoint_path = 'C:/Users/Mehrad/spark_checkpoints/merchant'

    if os.path.exists(checkpoint_path):
        shutil.rmtree(checkpoint_path)

    spark = SparkSession.builder \
        .appName("TopMerchants") \
        .config("spark.sql.streaming.checkpointLocation", checkpoint_path) \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5") \
        .getOrCreate()

    kafka_bootstrap_servers = "localhost:9092"
    topic = "darooghe.transactions"

    raw_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", topic) \
        .option("startingOffsets", "latest") \
        .load()

    transactions_df = raw_df.selectExpr("CAST(value AS STRING)")

    transaction_schema = StructType([
        StructField("transaction_id", StringType()),
        StructField("timestamp", TimestampType()),
        StructField("customer_id", StringType()),
        StructField("merchant_id", StringType()),
        StructField("merchant_category", StringType()),
        StructField("payment_method", StringType()),
        StructField("amount", DoubleType()),
        StructField("location", StructType([
            StructField("lat", DoubleType()),
            StructField("lng", DoubleType())
        ])),
        StructField("device_info", StructType([
            StructField("os", StringType()),
            StructField("app_version", StringType()),
            StructField("device_model", StringType())
        ])),
        StructField("status", StringType()),
        StructField("commission_type", StringType()),
        StructField("commission_amount", DoubleType()),
        StructField("vat_amount", DoubleType()),
        StructField("total_amount", DoubleType()),
        StructField("customer_type", StringType()),
        StructField("risk_level", IntegerType()),
        StructField("failure_reason", StringType())
    ])

    parsed_df = transactions_df.select(
        from_json(col("value"), transaction_schema).alias("data")
    ).select("data.*")

    merchant_counts = parsed_df \
        .groupBy("merchant_id") \
        .agg(count("*").alias("transaction_count"))

    top_merchants_df = merchant_counts \
        .orderBy(col("transaction_count").desc()) \
        .limit(5)

    merchant_output = top_merchants_df.select(
        to_json(struct(
            col("merchant_id"),
            col("transaction_count")
        )).alias("value")
    )

    merchant_query = merchant_output.writeStream \
        .format("kafka") \
        .outputMode("complete") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("topic", "darooghe.top_merchants") \
        .option("checkpointLocation", checkpoint_path) \
        .start()

    merchant_query.awaitTermination(1000)
    spark.stop()
except KeyboardInterrupt:
    spark.stop()
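In complete mode, Spark re-emits the whole sorted, limited result table on every trigger, so `darooghe.top_merchants` accumulates one top-5 snapshot per trigger, each holding cumulative counts. The plain-Python sketch below mimics that behaviour with hypothetical merchant IDs and micro-batches. It shows that summing all emitted rows over-counts, while the most recent snapshot already holds the correct totals.

```python
import heapq
from collections import Counter


def top_k(counts, k=5):
    """Top-k (merchant_id, count) pairs, like orderBy(col(...).desc()).limit(k)."""
    return heapq.nlargest(k, counts.items(), key=lambda item: item[1])


# Hypothetical micro-batches of merchant_id values read from darooghe.transactions.
batches = [
    ["m1", "m2", "m1"],
    ["m3", "m1", "m2", "m2"],
]

running = Counter()  # aggregation state; without a watermark it is never evicted
snapshots = []
for batch in batches:
    running.update(batch)
    # Complete mode: every trigger re-emits the whole sorted, limited result table.
    snapshots.append(top_k(running))

# Correct reading: the most recent snapshot holds the cumulative counts.
latest = dict(snapshots[-1])

# Incorrect reading: summing every emitted row counts earlier triggers again.
summed = Counter()
for snapshot in snapshots:
    summed.update(dict(snapshot))

print(latest)
print(summed)
```

Merchant `m1` has 3 transactions in total, but summing both snapshots reports 5.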


In [None]:
import json
import pandas as pd
from kafka import KafkaConsumer
import plotly.express as px

consumer = KafkaConsumer(
    'darooghe.top_merchants',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='visualization-group-merchant',
    consumer_timeout_ms=15000,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

data = []
print("Collecting top merchants data...")

for i, msg in enumerate(consumer):
    value = msg.value
    print(f"[{i}] Raw Message:", value)

    try:
        merchant_id = value["merchant_id"]
        transaction_count = int(value["transaction_count"])

        data.append({
            "merchant_id": merchant_id,
            "transaction_count": transaction_count
        })
    except Exception as e:
        print(f"Error parsing message #{i}: {e}")
        continue

consumer.close()

if not data:
    print("No data received from Kafka topic.")
else:
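    df = pd.DataFrame(data)
    # In complete mode every trigger re-sends the full top-5 table with cumulative
    # counts, so summing messages would double-count; keep each merchant's latest (max) count.
    df = df.groupby("merchant_id", as_index=False)["transaction_count"].max()

    df = df.sort_values(by="transaction_count", ascending=False).head(5)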

    fig = px.bar(
        df,
        x="merchant_id",
        y="transaction_count",
        title="Top 5 Merchants by Transaction Count",
        labels={"merchant_id": "Merchant ID", "transaction_count": "Transaction Count"}
    )

    fig.update_layout(
        xaxis_title='Merchant ID',
        yaxis_title='Number of Transactions',
        template='plotly_dark'
    )

    fig.show()

### III. User Activity: Display metrics such as the number of transactions per user, frequency of activity, and growth trends, offering insights into user engagement.

In [None]:
try:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import from_json, col, count, to_json, struct, window, min, max, approx_count_distinct
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
    import os
    import shutil

    os.environ["HADOOP_HOME"] = "C:/hadoop-3.3.6"
    os.environ["hadoop.home.dir"] = "C:/hadoop-3.3.6"

    checkpoint_path = 'C:/Users/Mehrad/spark_checkpoints/user_activity'

    if os.path.exists(checkpoint_path):
        shutil.rmtree(checkpoint_path)

    spark = SparkSession.builder \
        .appName("UserActivity") \
        .config("spark.sql.streaming.checkpointLocation", checkpoint_path) \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5") \
        .getOrCreate()

    kafka_bootstrap_servers = "localhost:9092"
    topic = "darooghe.transactions"

    raw_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", topic) \
        .option("startingOffsets", "latest") \
        .load()

    transactions_df = raw_df.selectExpr("CAST(value AS STRING)")

    transaction_schema = StructType([
        StructField("transaction_id", StringType()),
        StructField("timestamp", TimestampType()),
        StructField("customer_id", StringType()),
        StructField("merchant_id", StringType()),
        StructField("merchant_category", StringType()),
        StructField("payment_method", StringType()),
        StructField("amount", DoubleType()),
        StructField("location", StructType([
            StructField("lat", DoubleType()),
            StructField("lng", DoubleType())
        ])),
        StructField("device_info", StructType([
            StructField("os", StringType()),
            StructField("app_version", StringType()),
            StructField("device_model", StringType())
        ])),
        StructField("status", StringType()),
        StructField("commission_type", StringType()),
        StructField("commission_amount", DoubleType()),
        StructField("vat_amount", DoubleType()),
        StructField("total_amount", DoubleType()),
        StructField("customer_type", StringType()),
        StructField("risk_level", IntegerType()),
        StructField("failure_reason", StringType())
    ])

    parsed_df = transactions_df.select(
        from_json(col("value"), transaction_schema).alias("data")
    ).select("data.*")

    user_activity_df = parsed_df \
        .withWatermark("timestamp", "2 minutes") \
        .groupBy(
            window(col("timestamp"), "5 minutes", "1 minute"),
            col("customer_id")
        ).agg(
            count("*").alias("transaction_count"),
            min("timestamp").alias("first_transaction_time"),
            max("timestamp").alias("last_transaction_time")
        )

    user_activity_output = user_activity_df.select(
        to_json(struct(
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            col("customer_id"),
            col("transaction_count"),
            col("first_transaction_time"),
            col("last_transaction_time")
        )).alias("value")
    )

    user_activity_query = user_activity_output.writeStream \
        .format("kafka") \
        .outputMode("update") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("topic", "darooghe.user_activity") \
        .option("checkpointLocation", checkpoint_path) \
        .start()

    user_activity_query.awaitTermination(1000)
    spark.stop()
except KeyboardInterrupt:
    spark.stop()
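`window(col("timestamp"), "5 minutes", "1 minute")` creates overlapping windows: a new 5-minute window starts every minute, so each transaction lands in five windows. The plain-Python sketch below lists the windows that a single hypothetical timestamp belongs to, assuming Spark's default epoch-aligned windows with no `startTime` offset. One consequence: summing `transaction_count` across these windows counts each transaction five times, which is acceptable for ranking users but not for reporting absolute totals.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def sliding_windows(ts, length=timedelta(minutes=5), slide=timedelta(minutes=1)):
    """Return every [start, end) sliding window that contains ts, oldest first.

    Assumes windows start every `slide`, aligned to the Unix epoch.
    """
    start = ts - (ts - EPOCH) % slide  # latest window start at or before ts
    windows = []
    while start + length > ts:         # window [start, start + length) still covers ts
        windows.append((start, start + length))
        start -= slide
    return windows[::-1]


txn_time = datetime(2024, 1, 1, 0, 7, 30)  # hypothetical transaction timestamp
for start, end in sliding_windows(txn_time):
    print(start.time(), "->", end.time())
```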

In [None]:
import json
import pandas as pd
from kafka import KafkaConsumer
import plotly.express as px

consumer = KafkaConsumer(
    'darooghe.user_activity',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='visualization-group-user-activity',
    consumer_timeout_ms=15000,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

data = []
print("Collecting user activity data...")

max_messages = 10000

for i, msg in enumerate(consumer):
    # Stop after max_messages messages; checked first so failed parses still count.
    if i >= max_messages:
        break

    value = msg.value
    print(f"[{i}] Raw Message:", value)

    try:
        window_start = value["window_start"]
        window_end = value["window_end"]
        customer_id = value["customer_id"]
        transaction_count = int(value["transaction_count"])
        first_transaction_time = value["first_transaction_time"]
        last_transaction_time = value["last_transaction_time"]

        data.append({
            "window_start": window_start,
            "window_end": window_end,
            "customer_id": customer_id,
            "transaction_count": transaction_count,
            "first_transaction_time": first_transaction_time,
            "last_transaction_time": last_transaction_time
        })
    except Exception as e:
        print(f"Error parsing message #{i}: {e}")

consumer.close()

if not data:
    print("No data received from Kafka topic.")
else:
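    df = pd.DataFrame(data)
    df["window_start"] = pd.to_datetime(df["window_start"])
    df["window_end"] = pd.to_datetime(df["window_end"])

    # Update mode re-emits a (window, customer) row whenever its count changes,
    # so keep the largest (latest) count per pair before ranking and plotting.
    df = df.groupby(["window_start", "window_end", "customer_id"], as_index=False)["transaction_count"].max()

    top_5_users = df.groupby('customer_id')['transaction_count'].sum().sort_values(ascending=False).head(5).index
    df_top_5 = df[df['customer_id'].isin(top_5_users)].sort_values(by="window_start")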

    fig = px.line(
        df_top_5,
        x="window_start",
        y="transaction_count",
        color="customer_id",
        title="Top 5 User Activity: Transaction Count Over Time",
        labels={"window_start": "Time", "transaction_count": "Transaction Count"},
        markers=True
    )

    fig.update_layout(
        xaxis_title='Time',
        yaxis_title='Number of Transactions',
        template='plotly_dark'
    )

    fig.show()
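The heading also asks for activity frequency and growth trends, which the plot above does not compute directly. Both can be derived from fields already published to `darooghe.user_activity`: the average gap between a user's transactions in a window is `(last_transaction_time - first_transaction_time) / (transaction_count - 1)`, and growth can be measured as the percent change in total transactions between consecutive windows. The helper names and values below are hypothetical. Because the 5-minute windows slide by 1 minute, consecutive totals overlap, so the growth curve is smoothed accordingly.

```python
from datetime import datetime


def mean_gap_seconds(first_time, last_time, transaction_count):
    """Average seconds between a user's transactions in one window (None if < 2)."""
    if transaction_count < 2:
        return None
    return (last_time - first_time).total_seconds() / (transaction_count - 1)


def growth_rates(totals):
    """Percent change between consecutive window totals (None after a zero total)."""
    rates = []
    for previous, current in zip(totals, totals[1:]):
        rates.append(None if previous == 0 else 100.0 * (current - previous) / previous)
    return rates


# Hypothetical values for one user and for three consecutive windows.
gap = mean_gap_seconds(datetime(2024, 1, 1, 0, 0, 0), datetime(2024, 1, 1, 0, 1, 0), 4)
rates = growth_rates([10, 15, 12])
print(gap, rates)
```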


## Bonus

### Temporal Analysis

#### I. Identify transactions occurring outside the merchant's local business hours.

In [None]:
import os
import shutil
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, hour, when, to_json, struct
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

MERCHANT_HOURS = {
    'food_service': (8, 22),
    'retail': (9, 21),
    'entertainment': (10, 23),
    'transport': (0, 23)
}

os.environ["HADOOP_HOME"] = "C:/hadoop-3.3.6"
os.environ["hadoop.home.dir"] = "C:/hadoop-3.3.6"

checkpoint_path = 'C:/Users/Mehrad/spark_checkpoints/out_of_time'
if os.path.exists(checkpoint_path):
    shutil.rmtree(checkpoint_path)

spark = SparkSession.builder \
    .appName("RealTimeFraudDetection") \
    .config("spark.sql.streaming.checkpointLocation", checkpoint_path) \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5") \
    .getOrCreate()

kafka_bootstrap_servers = "localhost:9092"
topic = "darooghe.transactions"

transaction_schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("merchant_category", StringType()),
    StructField("merchant_id", StringType()),
    StructField("amount", DoubleType())
])

raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .load()

transactions_df = raw_df.selectExpr("CAST(value AS STRING)")

parsed_df = transactions_df.select(
    from_json(col("value"), transaction_schema).alias("data")
).select("data.*")

df = parsed_df.withColumn("hour", hour(col("timestamp")))

condition = None
for category, (start_hr, end_hr) in MERCHANT_HOURS.items():
    c = (col("merchant_category") == category) & (~col("hour").between(start_hr, end_hr))
    condition = c if condition is None else condition | c

df_flagged = df.withColumn(
    "outside_business_hours", when(condition, True).otherwise(False)
).filter(col("outside_business_hours"))

fraudulent_output = df_flagged.select(
    to_json(struct(
        col("transaction_id"),
        col("timestamp"),
        col("merchant_id"),
        col("merchant_category"),
        col("amount"),
        col("hour"),
        col("outside_business_hours")
    )).alias("value")
)

query = fraudulent_output.writeStream \
    .format("kafka") \
    .outputMode("append") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", "darooghe.out_of_time") \
    .option("checkpointLocation", checkpoint_path) \
    .start()

query.awaitTermination(100)

spark.stop()
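The Spark filter above is hard to check against a live stream, so the sketch below restates the same rule in plain Python (the function name is hypothetical). It makes two details explicit: `between` is inclusive on both ends, so `'retail': (9, 21)` treats any time from 21:00 to 21:59 as open, and a category missing from `MERCHANT_HOURS` is never flagged. Note also that Spark's `hour()` reads the timestamp in the session time zone (`spark.sql.session.timeZone`), so the result reflects the merchants' local hours only if that setting matches their time zone.

```python
# Same table as in the Spark cell above.
MERCHANT_HOURS = {
    'food_service': (8, 22),
    'retail': (9, 21),
    'entertainment': (10, 23),
    'transport': (0, 23)
}


def is_outside_business_hours(category, hour):
    """Plain-Python mirror of the Spark flag for one transaction."""
    if category not in MERCHANT_HOURS:
        return False  # unknown categories match no clause, so Spark never flags them
    start_hr, end_hr = MERCHANT_HOURS[category]
    return not (start_hr <= hour <= end_hr)  # between() is inclusive on both ends


for category, hour in [("retail", 8), ("retail", 21), ("food_service", 23), ("transport", 3)]:
    print(category, hour, is_outside_business_hours(category, hour))
```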



#### II. Identify during which part of the day each merchant category is most active.

In [None]:
import os
import shutil
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, hour, count, to_json, struct, window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

os.environ["HADOOP_HOME"] = "C:/hadoop-3.3.6"
os.environ["hadoop.home.dir"] = "C:/hadoop-3.3.6"

checkpoint_path = "C:/Users/Mehrad/spark_checkpoints/merchant_hour_activity"
if os.path.exists(checkpoint_path):
    shutil.rmtree(checkpoint_path)

spark = SparkSession.builder \
    .appName("DailyMerchantHourActivity") \
    .config("spark.sql.streaming.checkpointLocation", checkpoint_path) \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5") \
    .getOrCreate()

kafka_bootstrap_servers = "localhost:9092"
topic = "darooghe.transactions"

schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("merchant_category", StringType()),
    StructField("merchant_id", StringType()),
    StructField("amount", DoubleType())
])

raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .load()

transactions_df = raw_df.selectExpr("CAST(value AS STRING)")

parsed_df = transactions_df.select(
    from_json(col("value"), schema).alias("data")
).select("data.*")

df = parsed_df.withColumn("hour", hour(col("timestamp")))

activity_df = df.withWatermark("timestamp", "2 hours") \
    .groupBy(
        window(col("timestamp"), "1 day"),
        col("merchant_category"),
        col("hour")
    ).agg(
        count("transaction_id").alias("transaction_count")
    )

output_df = activity_df.select(
    to_json(struct(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("merchant_category"),
        col("hour"),
        col("transaction_count")
    )).alias("value")
)

query = output_df.writeStream \
    .format("kafka") \
    .outputMode("update") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", "darooghe.merchant_hour_activity") \
    .option("checkpointLocation", checkpoint_path) \
    .start()

query.awaitTermination(100)

spark.stop()
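The query above only publishes hourly counts; deciding which part of the day is busiest is left to the consumer. The sketch below shows one way to do that over messages read from `darooghe.merchant_hour_activity`, assuming hypothetical day-part boundaries (night 0 to 5, morning 6 to 11, afternoon 12 to 17, evening 18 to 23). Because the query runs in update mode, the same (window, category, hour) row can arrive several times with a growing count, so the sketch keeps the largest count per key before summing.

```python
from collections import defaultdict

# Hypothetical day-part boundaries: (name, first hour, last hour), inclusive.
DAY_PARTS = [("night", 0, 5), ("morning", 6, 11), ("afternoon", 12, 17), ("evening", 18, 23)]


def day_part(hour):
    for name, first, last in DAY_PARTS:
        if first <= hour <= last:
            return name
    raise ValueError(f"hour out of range: {hour}")


def busiest_day_part(records):
    """Map each merchant_category to the day part with the most transactions."""
    # Update mode may re-emit a row with a larger count; keep the largest per key.
    latest = {}
    for r in records:
        key = (r["window_start"], r["merchant_category"], r["hour"])
        if key not in latest or r["transaction_count"] > latest[key]:
            latest[key] = r["transaction_count"]

    totals = defaultdict(int)
    for (_, category, hour), n in latest.items():
        totals[(category, day_part(hour))] += n

    best = {}
    for (category, part), n in totals.items():
        if category not in best or n > best[category][1]:
            best[category] = (part, n)
    return {category: part for category, (part, _) in best.items()}


# Hypothetical messages; the first retail row is re-emitted with a larger count.
records = [
    {"window_start": "2024-01-01T00:00:00", "merchant_category": "retail", "hour": 10, "transaction_count": 3},
    {"window_start": "2024-01-01T00:00:00", "merchant_category": "retail", "hour": 10, "transaction_count": 4},
    {"window_start": "2024-01-01T00:00:00", "merchant_category": "retail", "hour": 19, "transaction_count": 6},
    {"window_start": "2024-01-01T00:00:00", "merchant_category": "food_service", "hour": 13, "transaction_count": 5},
    {"window_start": "2024-01-01T00:00:00", "merchant_category": "food_service", "hour": 20, "transaction_count": 2},
]
print(busiest_day_part(records))
```

Summing the raw messages instead would give retail a morning total of 7 and wrongly pick the morning over the evening (6).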


#### III. Identify merchants whose business faces sudden transaction spikes.