## Introduction

In this project, we design and implement a real-time payment data pipeline for Darooghe, a payment service provider that processes transactions through an online gateway, POS devices, mobile applications, and NFC (contactless) payments. The primary objectives are to ingest transaction events at scale, validate and process them in real time, detect and flag fraudulent activity, compute commission metrics, and store both raw and aggregated data for historical analysis and visualization. The architecture is designed for high-throughput, low-latency handling of payment events, enabling Darooghe to monitor transaction flows and business insights continuously.

### Apache Kafka
Apache Kafka is an open-source distributed event streaming platform, used here as the ingestion layer. It provides a durable, fault-tolerant backbone for capturing high volumes of transaction events from a synthetic transaction generator. Transactions are published to the `darooghe.transactions` topic, from which downstream systems consume them. Kafka’s partitioning model lets the pipeline scale out consumers and absorb spikes in transaction rates with minimal added latency.

<div style="text-align: center;">
    <img src="additional_files\image1.png" alt="Apache Kafka" style="width: 50%;">
</div>

### Apache Spark
Apache Spark is a unified analytics engine for large-scale data processing. In this project, we use Spark Streaming for real-time (micro-batch) processing of incoming Kafka events and PySpark batch jobs for analytics on stored data. The streaming job reads from Kafka, performs windowed aggregations, applies fraud-detection rules, and calculates commission metrics on the fly. The batch jobs analyze historical transaction data to produce reports on commission efficiency, temporal patterns, and customer segmentation.


<div style="text-align: center;">
    <img src="additional_files\Picture5-2.png" alt="Apache Spark" style="width: 50%;">
</div>

### MongoDB
MongoDB is a NoSQL, document-oriented database chosen for its flexible schema and horizontal scalability. We use it to store validated transactions and aggregated insights. Data is partitioned (e.g., by date or merchant) to speed up queries and simplify retention policies. Aggregated collections (daily, weekly, and monthly summaries) support historical analysis and dashboard visualizations without re-processing the raw event stream.

## Import Required Libraries

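```python
import json
import logging
# Import only the datetime class (not the module) so that `datetime` is not
# shadowed; the original mix of `import datetime` and `from datetime import
# datetime` broke every `datetime.datetime...` call in later cells.
from datetime import datetime, timedelta, timezone

from confluent_kafka import Consumer, Producer
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as _sum, avg, count, col, udf, stddev, hour, date_format, when, to_date
from pyspark.sql.types import DoubleType
```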

## Environment Setup

To initialize the local Kafka-based ingestion pipeline and monitoring UI, follow these steps:

1. **Start ZooKeeper**  

   Navigate to your Kafka installation directory and run:  

   ```bash
   bin/zookeeper-server-start.sh config/zookeeper.properties
   ```

2. **Start Kafka Broker**  

   In a new terminal (still inside the Kafka directory), launch the broker:  
   
   ```bash
   bin/kafka-server-start.sh config/server.properties
   ```

3. **Launch Kafdrop Monitoring UI**  

   Open a separate terminal, navigate to your Kafdrop installation folder, and execute:  
   
   ```bash
   java -jar kafdrop-4.1.0.jar --kafka.brokerConnect=localhost:9092
   ```

4. **Access the UI**  

   Open your browser and go to  

   ```
   http://localhost:9000
   ```  
   Here you can explore all Kafka topics, view partitions, offsets, and inspect individual messages.

5. **Run the Transaction Generator**  

   Finally, execute your Python producer script to start generating synthetic transactions:

   ```bash
   python darooghe_pulse.py
   ```  
   As messages are produced to the `darooghe.transactions` topic, you’ll see them appear in real time in the Kafdrop UI, complete with headers, payloads, and partition metadata.

   <div style="text-align: center;">
      <img src="additional_files\Screenshot from 2025-04-26 14-11-41.png" alt="Kafdrop UI screenshot" style="width: 70%;">
   </div>

## Data Ingestion Layer

### Kafka Consumer Implementation

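```python
from datetime import datetime, timezone


def validate_transaction(event):
    """Return a list of error codes (empty if the event is valid)."""
    errors = []

    # ERR_AMOUNT: total must equal amount + VAT + commission
    try:
        total_expected = event['amount'] + event['vat_amount'] + event['commission_amount']
        if event['total_amount'] != total_expected:
            errors.append('ERR_AMOUNT')
    except (KeyError, TypeError):
        errors.append('ERR_AMOUNT')

    # ERR_TIME: timestamp must not be in the future or more than 24 hours old.
    # Both sides are naive UTC datetimes ("Z" is stripped from the event).
    try:
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        ts = datetime.fromisoformat(event['timestamp'].replace("Z", ""))
        seconds_per_day = 86400
        if ts > now or (now - ts).total_seconds() > seconds_per_day:
            errors.append('ERR_TIME')
    except (KeyError, TypeError, ValueError, AttributeError):
        errors.append('ERR_TIME')

    # ERR_DEVICE: mobile payments must come from an Android or iOS device
    try:
        if event['payment_method'] == 'mobile':
            device_os = (event.get('device_info') or {}).get('os')
            if device_os not in ('Android', 'iOS'):
                errors.append('ERR_DEVICE')
    except (KeyError, AttributeError):
        errors.append('ERR_DEVICE')

    return errors
```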

```python
logging.basicConfig(level=logging.INFO)

consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'darooghe-consumer',
    'auto.offset.reset': 'earliest',
}
consumer = Consumer(consumer_conf)
consumer.subscribe(['darooghe.transactions'])

producer = Producer({'bootstrap.servers': 'localhost:9092'})

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            logging.error(f"Consumer error: {msg.error()}")
            continue

        try:
            event = json.loads(msg.value().decode('utf-8'))
            # raw_event = json.loads(msg.value().decode('utf-8'))
            # event = parse_transaction(raw_event)
            transaction_id = event.get('transaction_id', 'unknown')

            errors = validate_transaction(event)

            if errors:
                error_msg = {
                    'transaction_id': transaction_id,
                    'errors': errors,
                    'original_data': event
                }
                producer.produce(
                    'darooghe.error_logs',
                    key=transaction_id,
                    value=json.dumps(error_msg, default=str)
                )
                logging.warning(f"Invalid transaction: {transaction_id}, errors: {errors}")
            else:
                logging.info(f"Valid transaction: {transaction_id}")

        except Exception as e:
            logging.error(f"Parse error: {e}")

        producer.poll(0)

except KeyboardInterrupt:
    logging.info("Shutting down...")

finally:
    consumer.close()
    producer.flush()
```

```python
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'darooghe-consumer',
    'auto.offset.reset': 'earliest',
}
consumer = Consumer(consumer_conf)
consumer.subscribe(['darooghe.transactions'])

try:
    msg = consumer.poll(timeout=10.0)
    if msg is None:
        print("No message received.")
    elif msg.error():
        print(f"Error: {msg.error()}")
    else:
        event = json.loads(msg.value().decode('utf-8'))
        # raw_event = json.loads(msg.value().decode('utf-8'))
        # event = parse_transaction(raw_event)
        print("Received message:")
        print(json.dumps(event, indent=2))

        errors = validate_transaction(event)
        if errors:
            print("Validation Errors:", errors)
        else:
            print("Message is VALID!")

finally:
    consumer.close()
```


```text
Received message:
{
  "transaction_id": "73173b37-edad-4e17-9fe1-48305b1a2410",
  "timestamp": "2025-04-20T15:43:30.033099Z",
  "customer_id": "cust_222",
  "merchant_id": "merch_28",
  "merchant_category": "entertainment",
  "payment_method": "online",
  "amount": 1459060,
  "location": {
    "lat": 35.73375549866577,
    "lng": 51.30714798144749
  },
  "device_info": {
    "os": "iOS",
    "app_version": "3.1.0",
    "device_model": "iPhone 15"
  },
  "status": "approved",
  "commission_type": "flat",
  "commission_amount": 29181,
  "vat_amount": 131315,
  "total_amount": 1619556,
  "customer_type": "business",
  "risk_level": 1,
  "failure_reason": null
}
Validation Errors: ['ERR_TIME']
```
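The `ERR_TIME` rule accepts an event only if its timestamp is neither in the future nor more than 24 hours older than the moment of validation. Whether a stored sample passes therefore depends on when the check runs. The minimal sketch below applies the same rule with the reference time passed in explicitly, which makes it easy to test. The helper name `check_time_window` is hypothetical, and timestamps are treated as naive UTC, as in `validate_transaction`.

```python
from datetime import datetime, timedelta


def check_time_window(timestamp: str, now: datetime,
                      max_age: timedelta = timedelta(days=1)) -> bool:
    """Return True if `timestamp` (ISO 8601, 'Z' = UTC) lies within
    [now - max_age, now]. Both values are handled as naive UTC datetimes."""
    ts = datetime.fromisoformat(timestamp.replace("Z", ""))
    return now - max_age <= ts <= now


# The sample event above, checked at two hypothetical reference times
sample_ts = "2025-04-20T15:43:30.033099Z"
print(check_time_window(sample_ts, now=datetime(2025, 4, 20, 20, 0)))  # True
print(check_time_window(sample_ts, now=datetime(2025, 4, 26, 14, 0)))  # False
```

An event replayed from the topic more than 24 hours after it was produced will always be flagged with `ERR_TIME`, even if all of its other fields are valid.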


### Schema Management

The `parse_transaction` helper takes a raw event dict (`raw_event`) decoded from a Kafka message and does the following:

- **Extracts** each expected field (IDs, categories, status, etc.).

- **Parses** the ISO-formatted timestamp (stripping the trailing “Z” and re-serializing to ISO).

- **Casts** numeric values (amounts, VAT, risk level) to int and coordinates to float.

- **Preserves** optional fields (device_info, failure_reason) with sensible defaults.

- **Returns** a normalized dict with all fields in the right Python types.

- **Handles** missing or malformed data by catching KeyError, ValueError, or TypeError, logging a schema error, and returning None so invalid events can be skipped or re-published to an error topic.

```python
import logging
from datetime import datetime


def parse_transaction(raw_event):
    """Normalize a raw Kafka event into typed fields; return None on schema errors."""
    try:
        event = {}
        event['transaction_id'] = str(raw_event['transaction_id'])
        # Strip the trailing "Z" (UTC) and re-serialize as ISO 8601
        event['timestamp'] = datetime.fromisoformat(
            raw_event['timestamp'].replace('Z', '')
        ).isoformat()
        event['customer_id'] = str(raw_event['customer_id'])
        event['merchant_id'] = str(raw_event['merchant_id'])
        event['merchant_category'] = str(raw_event['merchant_category'])
        event['payment_method'] = str(raw_event['payment_method'])
        event['amount'] = int(raw_event['amount'])
        event['location'] = {
            'lat': float(raw_event['location']['lat']),
            'lng': float(raw_event['location']['lng']),
        }
        event['device_info'] = raw_event.get('device_info') or {}
        event['status'] = str(raw_event['status'])
        event['commission_type'] = str(raw_event['commission_type'])
        event['commission_amount'] = int(raw_event['commission_amount'])
        event['vat_amount'] = int(raw_event['vat_amount'])
        event['total_amount'] = int(raw_event['total_amount'])
        event['customer_type'] = str(raw_event['customer_type'])
        event['risk_level'] = int(raw_event['risk_level'])
        event['failure_reason'] = raw_event.get('failure_reason')
        return event
    except (KeyError, ValueError, TypeError, AttributeError) as e:
        logging.error(f"Schema error: {e}")
        return None
```
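The commented-out lines in the consumer loop suggest one way to combine `parse_transaction` with validation: treat a schema failure as just another error code. The sketch below is self-contained. The function `route_event` and the `ERR_SCHEMA` code are hypothetical. The stand-in parser and validator check only a couple of fields, so the routing logic can be run on its own.

```python
import json
from typing import Callable, Optional

ERROR_TOPIC = "darooghe.error_logs"


def route_event(raw_bytes: bytes,
                parse: Callable[[dict], Optional[dict]],
                validate: Callable[[dict], list]) -> tuple:
    """Return (topic, payload) for a raw Kafka message value.

    Valid events get topic None (nothing to publish). Invalid ones are wrapped
    in the same error envelope the consumer loop sends to darooghe.error_logs.
    """
    raw = json.loads(raw_bytes.decode("utf-8"))
    event = parse(raw)
    # Hypothetical ERR_SCHEMA code for events the parser rejects
    errors = ["ERR_SCHEMA"] if event is None else validate(event)
    if not errors:
        return None, event
    payload = {
        "transaction_id": raw.get("transaction_id", "unknown"),
        "errors": errors,
        "original_data": raw,
    }
    return ERROR_TOPIC, json.dumps(payload, default=str)


# Minimal stand-ins for parse_transaction / validate_transaction
def toy_parse(raw):
    try:
        return {"transaction_id": str(raw["transaction_id"]), "amount": int(raw["amount"])}
    except (KeyError, ValueError, TypeError):
        return None


def toy_validate(event):
    return [] if event["amount"] > 0 else ["ERR_AMOUNT"]


print(route_event(b'{"transaction_id": "t1", "amount": 100}', toy_parse, toy_validate))
print(route_event(b'{"transaction_id": "t2"}', toy_parse, toy_validate)[0])
```

In the real loop, a non-`None` topic would be passed to `producer.produce` together with the transaction ID as the key.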


## Batch Processing Layer

### Collect data from Kafka once and save it to a local JSON Lines file

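```python
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'batch-job-consumer',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['darooghe.transactions'])

target = 10000         # number of messages to collect
max_empty_polls = 30   # assumption: stop early after ~30 s with no new messages
written, empty_polls = 0, 0

# One JSON document per line (JSON Lines), the default input format of spark.read.json.
# Count written messages rather than polls, since an empty poll returns None.
with open('transactions_data.json', 'w') as f:
    while written < target and empty_polls < max_empty_polls:
        msg = consumer.poll(1.0)
        if msg is None:
            empty_polls += 1
            continue
        if msg.error():
            continue
        empty_polls = 0
        f.write(msg.value().decode('utf-8') + '\n')
        written += 1

consumer.close()
print(f"Wrote {written} messages")
```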

### Use PySpark to analyze that saved file

The metrics computed for each `merchant_category` are defined by the following formulas, where $N$ is the number of transactions in that category:

- **Total Commission** per category:  
  $$ \text{total\_commission} = \sum_{i=1}^{N} \text{commission\_amount}_i $$

- **Average Commission** per transaction:  
  $$ \text{avg\_commission} = \frac{1}{N} \sum_{i=1}^{N} \text{commission\_amount}_i $$

- **Transaction Count**:  
  $$ \text{transaction\_count} = N $$

- **Average Commission Ratio**:  
  $$ \text{avg\_commission\_ratio} = \frac{1}{N} \sum_{i=1}^{N} \frac{\text{commission\_amount}_i}{\text{total\_amount}_i} $$
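As defined, `avg_commission_ratio` is the mean of per-transaction ratios, not total commission divided by total amount, and the two can differ. The plain-Python sketch below computes the four metrics the same way `avg(col("commission_amount") / col("total_amount"))` does in Spark. The helper name and the two sample rows are made up for illustration.

```python
from collections import defaultdict


def commission_metrics(transactions):
    """Per-category metrics matching the formulas above.

    avg_commission_ratio is the mean of per-transaction ratios
    (commission_amount / total_amount), not sum(commission) / sum(total).
    """
    groups = defaultdict(list)
    for t in transactions:
        groups[t["merchant_category"]].append(t)

    metrics = {}
    for category, rows in groups.items():
        n = len(rows)
        commissions = [r["commission_amount"] for r in rows]
        ratios = [r["commission_amount"] / r["total_amount"] for r in rows]
        metrics[category] = {
            "total_commission": sum(commissions),
            "avg_commission": sum(commissions) / n,
            "transaction_count": n,
            "avg_commission_ratio": sum(ratios) / n,
        }
    return metrics


# Two made-up retail transactions (illustrative values only)
sample = [
    {"merchant_category": "retail", "commission_amount": 20, "total_amount": 1000},
    {"merchant_category": "retail", "commission_amount": 60, "total_amount": 2000},
]
print(commission_metrics(sample)["retail"])
```

Here the mean of ratios is (0.02 + 0.03) / 2 = 0.025. The ratio of sums would instead be 80 / 3000 ≈ 0.0267, because it weights larger transactions more heavily.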


```python
spark = SparkSession.builder \
    .appName("CommissionAnalysisBatchJob") \
    .getOrCreate()

transactions_df = spark.read.json("transactions_data.json")

aggregated_metrics = transactions_df.groupBy("merchant_category").agg(
    _sum("commission_amount").alias("total_commission"),
    avg("commission_amount").alias("avg_commission"),
    count("*").alias("transaction_count"),
    avg(col("commission_amount") / col("total_amount")).alias("avg_commission_ratio")
)

aggregated_metrics.show(truncate=False)

spark.stop()
```


```text
+-----------------+----------------+------------------+-----------------+--------------------+
|merchant_category|total_commission|avg_commission    |transaction_count|avg_commission_ratio|
+-----------------+----------------+------------------+-----------------+--------------------+
|retail           |41004263        |20563.82296890672 |1994             |0.018017193139572094|
|entertainment    |41893100        |20515.71988246817 |2042             |0.018017257715458642|
|food_service     |41074509        |20881.804270462635|1967             |0.018017220475492927|
|government       |41840033        |20784.914555389965|2013             |0.018017223352158204|
|transportation   |41127958        |20729.81754032258 |1984             |0.018017249509518204|
+-----------------+----------------+------------------+-----------------+--------------------+
```



1. **Load historical data**  
   We read all past transactions from the JSON file into a Spark DataFrame.

2. **Define commission models**  

   - **Flat**: a constant 2% rate.  

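   - **Progressive**: 1.5% of the full amount for transactions up to 1 000 000, and 3% of the full amount for larger ones. The rate switches for the whole amount, not only for the portion above the threshold.

   - **Tiered**: a base 2%, plus an extra 0.5% of the full amount when the transaction exceeds 1 500 000 (2.5% in total).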

3. **UDF registration**  

   Wrapping each Python function in `udf(..., DoubleType())` lets Spark apply it to every row across the cluster.

4. **Simulate commissions**  

   We add three new columns (`flat_commission`, `progressive_commission`, `tiered_commission`) to the DataFrame. Each one holds that model’s commission for the transaction’s `amount`.

5. **Aggregate results**  

   Grouping by merchant_category, we sum:  

   - The **actual** commissions already collected (commission_amount).  

   - The **simulated** totals under each of the three models.  

6. **Compare**  

   The resulting table shows, for each merchant category, how much commission Darooghe actually earned versus what each model would have yielded. This lets you compare the models by revenue when choosing the **optimal commission structure**.
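As implemented, the progressive and tiered functions apply their rate to the whole amount once a threshold is crossed. This creates a jump at the boundary: a transaction of 1 000 000 pays 15 000 under the progressive model, while 1 000 001 pays about 30 000. A bracket-style schedule instead charges the higher rate only on the portion above the threshold. If that is the intended behavior, a sketch might look like this. The `*_marginal` functions are hypothetical alternatives, not the models behind the simulated totals in this section.

```python
def progressive_whole(amount):
    """The notebook's progressive model: one rate for the entire amount."""
    return amount * 0.015 if amount <= 1_000_000 else amount * 0.03


def progressive_marginal(amount, threshold=1_000_000):
    """Hypothetical bracket variant: 1.5% up to the threshold, 3% only on the excess."""
    base = min(amount, threshold) * 0.015
    excess = max(amount - threshold, 0) * 0.03
    return base + excess


def tiered_whole(amount):
    """The notebook's tiered model: 2%, plus 0.5% of the entire amount above 1.5M."""
    return amount * 0.02 + (amount * 0.005 if amount > 1_500_000 else 0)


def tiered_marginal(amount, threshold=1_500_000):
    """Hypothetical variant: 2% of everything, plus 0.5% only on the excess."""
    return amount * 0.02 + max(amount - threshold, 0) * 0.005


for amt in (1_000_000, 1_000_001, 2_000_000):
    print(amt,
          round(progressive_whole(amt)), round(progressive_marginal(amt)),
          round(tiered_whole(amt)), round(tiered_marginal(amt)))
```

The marginal versions are continuous at the thresholds. A merchant therefore never pays more in commission just because a transaction crossed a boundary by one unit. Either variant can be dropped into the existing `udf(...)` calls unchanged.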

```python
spark = SparkSession.builder.appName("CommissionModelSimulation").getOrCreate()

df = spark.read.json("transactions_data.json")


def flat_commission(amount):
    return amount * 0.02

def progressive_commission(amount):
    return amount * 0.015 if amount <= 1_000_000 else amount * 0.03

def tiered_commission(amount):
    base = amount * 0.02
    extra = amount * 0.005 if amount > 1_500_000 else 0
    return base + extra

# Register the functions as UDFs (user-defined functions)
flat_udf = udf(flat_commission, DoubleType())
progressive_udf = udf(progressive_commission, DoubleType())
tiered_udf = udf(tiered_commission, DoubleType())

# Apply each model
simulated = df.withColumn("flat_commission", flat_udf(col("amount"))) \
              .withColumn("progressive_commission", progressive_udf(col("amount"))) \
              .withColumn("tiered_commission", tiered_udf(col("amount")))

# Aggregate per merchant_category
results = simulated.groupBy("merchant_category").agg(
    _sum("commission_amount").alias("actual_total_commission"),
    _sum("flat_commission").alias("sim_flat_total"),
    _sum("progressive_commission").alias("sim_progressive_total"),
    _sum("tiered_commission").alias("sim_tiered_total")
)

results.show(truncate=False)

spark.stop()
```

                                                                                

```text
+-----------------+-----------------------+--------------------+---------------------+--------------------+
|merchant_category|actual_total_commission|sim_flat_total      |sim_progressive_total|sim_tiered_total    |
+-----------------+-----------------------+--------------------+---------------------+--------------------+
|retail           |41004263               |4.1005229539999984E7|5.405845412999993E7  |4.568187095999997E7 |
|entertainment    |41893100               |4.189408534000007E7 |5.526702296999998E7  |4.637942272000006E7 |
|food_service     |41074509               |4.107548939999999E7 |5.4409329929999955E7 |4.5779860334999956E7|
|government       |41840033               |4.1841009099999994E7|5.5230637050000004E7 |4.6578242445E7      |
|transportation   |41127958               |4.112893235999997E7 |5.416426744499999E7  |4.549861609499994E7 |
+-----------------+-----------------------+--------------------+---------------------+--------------------+
```



### Transaction Pattern Analysis

Analyze the relationship between `payment_method` and `amount` to see whether the methods differ in transaction volume and size. Grouping on `payment_method`, we compute:

- **num_transactions**: how often each method is used  

- **total_amount**: overall spend per method  

- **average_amount**: typical transaction size  

- **stddev_amount**: variability in spend

This lets us see if, for example, mobile payments tend to be smaller but more frequent, or POS transactions are higher-value but less common.
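In Spark, `stddev` is an alias for `stddev_samp`, the sample standard deviation (divisor N - 1). The stdlib sketch below produces the same per-method summary and can be used to sanity-check the Spark numbers on a handful of rows. It uses `statistics.stdev`, which also divides by N - 1. The helper name and the three rows are illustrative.

```python
import statistics
from collections import defaultdict


def payment_method_stats(transactions):
    """Count, sum, mean and sample standard deviation of `amount` per payment_method."""
    amounts = defaultdict(list)
    for t in transactions:
        amounts[t["payment_method"]].append(t["amount"])

    stats = {}
    for method, values in amounts.items():
        stats[method] = {
            "num_transactions": len(values),
            "total_amount": sum(values),
            "average_amount": statistics.mean(values),
            # Sample stddev (N - 1), like Spark's stddev; undefined for one row, so None
            "stddev_amount": statistics.stdev(values) if len(values) > 1 else None,
        }
    return stats


rows = [
    {"payment_method": "pos", "amount": 100},
    {"payment_method": "pos", "amount": 300},
    {"payment_method": "nfc", "amount": 250},
]
print(payment_method_stats(rows))
```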

```python
spark = SparkSession.builder.appName("TransactionPatternAnalysis").getOrCreate()

df = spark.read.json("transactions_data.json")

payment_stats = df.groupBy("payment_method").agg(
    count("*").alias("num_transactions"),
    _sum("amount").alias("total_amount"),
    avg("amount").alias("average_amount"),
    stddev("amount").alias("stddev_amount")
)

payment_stats.show(truncate=False)
spark.stop()
```

```text
+--------------+----------------+------------+------------------+-----------------+
|payment_method|num_transactions|total_amount|average_amount    |stddev_amount    |
+--------------+----------------+------------+------------------+-----------------+
|online        |2498            |2515074463  |1006835.2534027222|570523.1693378846|
|pos           |2490            |2629182500  |1055896.5863453816|563076.6858889284|
|mobile        |2555            |2655173846  |1039206.9847358122|561859.2174693045|
|nfc           |2457            |2547806478  |1036958.2735042735|569215.1451068346|
+--------------+----------------+------------+------------------+-----------------+
```



Extract the busiest time by:

1. **Hour of day** 

   - Extract hour from each timestamp, group by hour, and count transactions (num_transactions).

   - This shows you which hours (e.g., 14:00–15:00) see the most activity.

2. **Day of week**  

   - Extract day_of_week (e.g., Monday, Tuesday) from each timestamp, group by it, and count transactions.  

   - This reveals which weekdays or weekends are the busiest.

Together, these metrics pinpoint daily and weekly peak transaction periods for capacity planning and targeted promotions.
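The generator's timestamps end in `Z` (UTC). When Spark's `hour()` and `date_format()` derive hours and weekdays from them, the result is expressed in the session time zone (`spark.sql.session.timeZone`, which defaults to the JVM's local zone). The same data can therefore yield different peak hours on different machines. The stdlib sketch below shows the effect and picks the busiest hour. It assumes Tehran local time is a fixed UTC+03:30 offset with no daylight saving time, which holds for the April 2025 data. `busiest_hour` is a hypothetical helper.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

# Assumption: fixed UTC+03:30 offset for Tehran (no DST in April 2025)
TEHRAN = timezone(timedelta(hours=3, minutes=30))


def busiest_hour(timestamps, tz=timezone.utc):
    """Return (hour, count) for the most frequent hour of day in time zone `tz`."""
    hours = Counter(
        datetime.fromisoformat(ts.replace("Z", "+00:00")).astimezone(tz).hour
        for ts in timestamps
    )
    return hours.most_common(1)[0]


events = ["2025-04-20T15:43:30Z", "2025-04-20T15:50:00Z", "2025-04-20T16:10:00Z"]
print(busiest_hour(events))             # (15, 2)
print(busiest_hour(events, tz=TEHRAN))  # (19, 3)
```

To make the Spark results reproducible, you can pin the session zone before computing hours, for example with `spark.conf.set("spark.sql.session.timeZone", "UTC")`.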

```python
spark = SparkSession.builder.appName("PeakTransactionTimes").getOrCreate()

df = spark.read.json("transactions_data.json")

df = df.withColumn("hour", hour("timestamp")) \
       .withColumn("day_of_week", date_format("timestamp", "EEEE"))


hourly_stats = df.groupBy("hour").agg(
    count("*").alias("num_transactions")
).orderBy("hour")

daily_stats = df.groupBy("day_of_week").agg(
    count("*").alias("num_transactions")
)

print("Transactions per hour of day:")
hourly_stats.show(24)

print("Transactions per day of week:")
daily_stats.show()

spark.stop()
```

```text
Transactions per hour of day:
+----+----------------+
|hour|num_transactions|
+----+----------------+
|   0|             380|
|   1|             400|
|   2|             469|
|   3|             463|
|   4|             447|
|   5|             401|
|   6|             408|
|   7|             429|
|   8|             387|
|   9|             452|
|  10|             413|
|  11|             385|
|  12|             423|
|  13|             419|
|  14|             401|
|  15|             433|
|  16|             418|
|  17|             394|
|  18|             399|
|  19|             411|
|  20|             385|
|  21|             439|
|  22|             410|
|  23|             434|
+----+----------------+

Transactions per day of week:
+-----------+----------------+
|day_of_week|num_transactions|
+-----------+----------------+
|  Wednesday|            1425|
|    Tuesday|            1452|
|     Friday|            1414|
|   Thursday|            1438|
|   Saturday|            1419|
|     Monday|      
```

Group customers by their transaction behavior, then:

1. **Compute per-customer stats**  

   - **num_transactions**: how many times they paid  

   - **total_spent**: sum of all their transaction amounts  

   - **avg_spent**: average transaction size  

2. **Assign segments** (rules are checked in this order, and the first match wins)  

   - **power_user** if they made more than 19 transactions  

   - **big_spender** if they spent over 22 000 000 IRR in total  

   - **regular** otherwise  

3. **Review top customers**  

   - Order by `total_spent` descending and show the top 10 to see our highest-value customers.  

This segmentation lets us tailor marketing or loyalty programs to frequent users (“power users”) and high-value customers (“big spenders”), while treating the rest as our baseline “regular” group.

```python
spark = SparkSession.builder.appName("CustomerSegmentation").getOrCreate()

df = spark.read.json("transactions_data.json")

customer_stats = df.groupBy("customer_id").agg(
    count("*").alias("num_transactions"),
    _sum("amount").alias("total_spent"),
    avg("amount").alias("avg_spent")
)

segmented = customer_stats.withColumn(
    "segment",
    when(col("num_transactions") > 19, "power_user")
    .when(col("total_spent") > 22_000_000, "big_spender")
    .otherwise("regular")
)

segmented.orderBy("total_spent", ascending=False).show(10, truncate=False)

spark.stop()
```

```text
+-----------+----------------+-----------+------------------+-----------+
|customer_id|num_transactions|total_spent|avg_spent         |segment    |
+-----------+----------------+-----------+------------------+-----------+
|cust_416   |21              |25672339   |1222492.3333333333|power_user |
|cust_748   |19              |22247687   |1170930.894736842 |big_spender|
|cust_140   |16              |21869193   |1366824.5625      |regular    |
|cust_566   |16              |21178241   |1323640.0625      |regular    |
|cust_951   |17              |21060735   |1238866.7647058824|regular    |
|cust_531   |16              |20774359   |1298397.4375      |regular    |
|cust_918   |17              |20569158   |1209950.4705882352|regular    |
|cust_709   |18              |20010541   |1111696.7222222222|regular    |
|cust_899   |17              |19680791   |1157693.5882352942|regular    |
|cust_600   |19              |19632924   |1033311.7894736842|regular    |
+-----------+----------------+-----------+------------------+-----------+
```
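Because `when(...).when(...)` returns the first matching branch, a customer who meets both thresholds, such as `cust_416` (21 transactions, 25 672 339 spent), is labeled `power_user` only. The plain-Python sketch below mirrors that precedence and reproduces three rows of the table above. The parameter names `min_power_txns` and `min_big_spend` are illustrative.

```python
def assign_segment(num_transactions, total_spent,
                   min_power_txns=19, min_big_spend=22_000_000):
    """Mirror the Spark `when` chain: rules are checked in order, first match wins."""
    if num_transactions > min_power_txns:
        return "power_user"
    if total_spent > min_big_spend:
        return "big_spender"
    return "regular"


# Rows taken from the output table above
for cust, n, spent in [("cust_416", 21, 25_672_339),
                       ("cust_748", 19, 22_247_687),
                       ("cust_600", 19, 19_632_924)]:
    print(cust, assign_segment(n, spent))
```

Note that `cust_748` and `cust_600` both have exactly 19 transactions. Since the rule uses a strict `> 19`, neither counts as a power user, and only spend separates them.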

By grouping on **merchant_category**, this snippet lets you compare:

- **num_transactions**: How many transactions each category handles  

- **avg_amount**: The typical transaction size per category  

- **total_amount**: The overall spend flowing through each category  

- **avg_risk_level**: The average fraud-risk score assigned to transactions in each category  

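Ordering by `num_transactions` shows which merchant categories drive the most volume. The other metrics reveal differences in transaction size and risk profile. For example, they can expose a category that is high-volume but low-value, or one with fewer but larger and riskier transactions. In this synthetic sample the five categories are nearly identical. Each has about 2 000 transactions, an average amount between roughly 1.026 and 1.044 million, and an average risk level between 2.04 and 2.08.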

```python
spark = SparkSession.builder.appName("MerchantCategoryComparison").getOrCreate()
df = spark.read.json("transactions_data.json")

category_stats = df.groupBy("merchant_category").agg(
    count("*").alias("num_transactions"),
    avg("amount").alias("avg_amount"),
    _sum("amount").alias("total_amount"),
    avg("risk_level").alias("avg_risk_level")
)

category_stats.orderBy("num_transactions", ascending=False).show(truncate=False)

spark.stop()
```

```text
+-----------------+----------------+------------------+------------+------------------+
|merchant_category|num_transactions|avg_amount        |total_amount|avg_risk_level    |
+-----------------+----------------+------------------+------------+------------------+
|entertainment    |2042            |1025810.1209598433|2094704267  |2.0514201762977473|
|government       |2013            |1039269.9726775957|2092050455  |2.0665673124689516|
|retail           |1994            |1028215.3846539619|2050261477  |2.083751253761284 |
|transportation   |1984            |1036515.4324596775|2056446618  |2.0403225806451615|
|food_service     |1967            |1044115.1347229283|2053774470  |2.072191154041688 |
+-----------------+----------------+------------------+------------+------------------+
```



Bucket each transaction into a part of the day based on its hour (morning 05–11, afternoon 12–17, evening 18–23, night 00–04), then compute:

- **num_transactions**: how many transactions occur in each bucket  

- **avg_amount**: the average transaction size in each bucket  

This reveals which parts of the day (say, “morning” versus “evening”) drive the most activity and where transaction values tend to be higher.

```python
spark = SparkSession.builder \
    .appName("PeakTransactionTimes") \
    .getOrCreate()

df = spark.read.json("transactions_data.json")

df = df.withColumn("hour", hour("timestamp")) \
       .withColumn("day_part",
           when(col("hour").between(5, 11), "morning")
          .when(col("hour").between(12, 17), "afternoon")
          .when(col("hour").between(18, 23), "evening")
          .otherwise("night")
       )

day_part_stats = df.groupBy("day_part").agg(
    count("*").alias("num_transactions"),
    avg("amount").alias("avg_amount")
)

day_part_stats.orderBy("day_part").show()

spark.stop()
```


```text
+---------+----------------+------------------+
| day_part|num_transactions|        avg_amount|
+---------+----------------+------------------+
|afternoon|            2488|1026659.3954983923|
|  evening|            2478|1037492.3644067796|
|  morning|            2875|1038253.6135652174|
|    night|            2159|1036138.7183881426|
+---------+----------------+------------------+
```
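The four buckets are not the same length. Morning spans 7 hours (05–11), afternoon and evening 6 hours each, and night only 5 (00–04). Raw counts therefore favor longer buckets, and dividing by bucket length gives a fairer comparison. A quick check using the numbers above, with `per_hour_rates` as an illustrative helper:

```python
# Bucket boundaries from the `when` chain and counts from the output table above
day_parts = {
    # name: (first_hour, last_hour, num_transactions)
    "morning": (5, 11, 2875),
    "afternoon": (12, 17, 2488),
    "evening": (18, 23, 2478),
    "night": (0, 4, 2159),
}


def per_hour_rates(parts):
    """Transactions per hour for each bucket (bucket hours are inclusive)."""
    return {name: count / (last - first + 1)
            for name, (first, last, count) in parts.items()}


for name, rate in per_hour_rates(day_parts).items():
    print(f"{name:<10} {rate:.1f} transactions/hour")
```

On this data the rates are close: about 410.7 (morning), 414.7 (afternoon), 413.0 (evening), and 431.8 (night) transactions per hour. Night is slightly ahead, so the apparent morning peak in the raw counts is mostly a bucket-length effect.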



By extracting the **date** from each transaction’s timestamp and then grouping by that date, this code computes:

- **total_amount**: total spend per day  

- **num_transactions**: number of transactions per day 

- **avg_amount**: average transaction size per day  

Ordering by date shows us the daily time series, so we can easily see whether overall spending (or transaction volume/size) is trending up or down over time.

```python
spark = SparkSession.builder \
    .appName("DailySpendingTrend") \
    .getOrCreate()

df = spark.read.json("transactions_data.json")

df = df.withColumn("date", to_date("timestamp"))

daily_trend = df.groupBy("date").agg(
    _sum("amount").alias("total_amount"),
    count("*").alias("num_transactions"),
    avg("amount").alias("avg_amount")
)

daily_trend.orderBy("date").show(truncate=False)

spark.stop()
```


```text
+----------+------------+----------------+------------------+
|date      |total_amount|num_transactions|avg_amount        |
+----------+------------+----------------+------------------+
|2025-04-19|735824056   |728             |1010747.3296703297|
|2025-04-20|1457558096  |1418            |1027897.1057827927|
|2025-04-21|1496528030  |1445            |1035659.5363321799|
|2025-04-22|1525335221  |1449            |1052681.3119392684|
|2025-04-23|1510470003  |1435            |1052592.336585366 |
|2025-04-24|1508634479  |1444            |1044760.7195290859|
|2025-04-25|1433101752  |1410            |1016384.2212765957|
|2025-04-26|679785650   |671             |1013093.3681073026|
+----------+------------+----------------+------------------+
```
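The first and last dates (2025-04-19 and 2025-04-26) have roughly half the volume of the days in between. The most likely reason is that the collected window begins and ends partway through those days. Comparing them with full days would exaggerate any trend, so a simple guard is to flag days whose count falls well below the median. The 0.75 ratio and the helper name are arbitrary choices for illustration.

```python
from statistics import median

# (date, num_transactions) from the daily trend table above
daily_counts = [
    ("2025-04-19", 728), ("2025-04-20", 1418), ("2025-04-21", 1445),
    ("2025-04-22", 1449), ("2025-04-23", 1435), ("2025-04-24", 1444),
    ("2025-04-25", 1410), ("2025-04-26", 671),
]


def flag_partial_days(counts, ratio=0.75):
    """Return dates whose volume is below `ratio` x the median daily volume."""
    typical = median(n for _, n in counts)
    return [day for day, n in counts if n < ratio * typical]


print(flag_partial_days(daily_counts))  # ['2025-04-19', '2025-04-26']
```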



### Data Storage Implementation

Load all transactions into MongoDB, while:

- **Connecting** to a local MongoDB server and selecting the `darooghe_db` database and `transactions` collection.  

- **Reading** each event from the `transactions_data.json` file line-by-line.  

- **Parsing** the `timestamp` field to a Python `datetime` and **adding** a new `date` field (only the day part) to each event.

- **Inserting** the modified event into MongoDB.

The new `date` field acts as a logical **partition key**, so per-day queries (for example, fetching only today's or yesterday's transactions) can filter on it directly. MongoDB does not partition a collection by this field on its own. An index on it (for example, `collection.create_index("date")`) is what makes such queries fast on large datasets.


```python
# Start the MongoDB server first, e.g. in a terminal: mongod --dbpath ~/mongodb-data

client = MongoClient("mongodb://localhost:27017/")
db = client["darooghe_db"]
collection = db["transactions"]

with open("transactions_data.json") as f:
    for line in f:
        event = json.loads(line)

        timestamp = datetime.fromisoformat(event["timestamp"].replace("Z", ""))
        event["timestamp"] = timestamp
        event["date"] = timestamp.date().isoformat()  # for partitioning

        collection.insert_one(event)

print("Data inserted into MongoDB")
```


```text
Data inserted into MongoDB
```
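Calling `insert_one` for each of the 10 000 lines costs one round trip per document. A common alternative is to parse the file lazily and send documents in batches with pymongo's `insert_many`. The sketch below keeps the parsing and batching in plain Python so it can be tested without a server. The helper names `iter_documents` and `batched` are illustrative assumptions.

```python
import json
from datetime import datetime
from itertools import islice


def iter_documents(lines):
    """Parse JSON Lines into MongoDB-ready dicts with a `date` partition field."""
    for line in lines:
        if not line.strip():
            continue  # skip blank lines
        event = json.loads(line)
        ts = datetime.fromisoformat(event["timestamp"].replace("Z", ""))
        event["timestamp"] = ts                # naive UTC, stored as a BSON date
        event["date"] = ts.date().isoformat()  # e.g. "2025-04-20"
        yield event


def batched(iterable, size):
    """Yield lists of up to `size` items."""
    it = iter(iterable)
    while batch := list(islice(it, size)):
        yield batch
```

To wire this to the collection from the cell above, open the file and run `for batch in batched(iter_documents(f), 1000): collection.insert_many(batch)`. The batch size of 1 000 is an assumption to tune.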


Connect to MongoDB and **enforce a data retention policy** by:

- Defining a **cutoff_time** (current time minus 24 hours).

- Deleting all transactions where the `timestamp` is **older than 24 hours**.

- Printing how many outdated documents were removed.

This keeps the database clean and ensures only recent, detailed transaction data is stored.

```python
client = MongoClient("mongodb://localhost:27017/")
db = client["darooghe_db"]
collection = db["transactions"]

# Define retention period: 24 hours (1 day).
# Timestamps were stored as naive UTC datetimes, so compare against naive UTC.
cutoff_time = datetime.utcnow() - timedelta(days=1)

# Delete all with a timestamp older than the cutoff
delete_result = collection.delete_many({"timestamp": {"$lt": cutoff_time}})
print(f"Deleted {delete_result.deleted_count} documents older than {cutoff_time.isoformat()}")
```

```text
Deleted 18633 documents older than 2025-04-25T12:08:21.058660
```
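`datetime.utcnow()` is deprecated as of Python 3.12. The sketch below builds the same filter without it and takes the reference time as a parameter so it can be tested. `retention_filter` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone


def retention_filter(now=None, max_age=timedelta(hours=24)):
    """Build the delete_many filter for documents older than `max_age`.

    Timestamps were stored as naive UTC datetimes, so the cutoff is naive UTC too.
    """
    if now is None:
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    return {"timestamp": {"$lt": now - max_age}}


print(retention_filter(datetime(2025, 4, 26, 12, 8)))
```

Instead of running the purge by hand, MongoDB can also expire documents itself through a TTL index, for example `collection.create_index("timestamp", expireAfterSeconds=86400)`. A background task then removes each document once its `timestamp` is more than 24 hours old. That removal runs periodically, not at the exact expiry moment.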


Connect to MongoDB and **create an aggregated daily commission dataset** by:

- Grouping transactions by `merchant_id` and `date`.

- Calculating total commission, number of transactions, and total amount per merchant per day.

- Saving the result into a new collection called `merchant_daily_commissions`.

This aggregation summarizes detailed data and makes long-term trend analysis much faster.

---

#### To check the results:

1. First, open your MongoDB shell by running:

    ```bash
    mongosh mongodb://localhost:27017
    ```

2. Then switch to your database:

    ```mongodb
    use darooghe_db
    ```

3. Finally, view a few documents from the new collection:

    ```mongodb
    db.merchant_daily_commissions.find().limit(5).pretty()
    ```

This will show you a clean JSON view of the first 5 aggregated records!

```python
client = MongoClient("mongodb://localhost:27017/")
db = client["darooghe_db"]

# Aggregation pipeline to compute daily summary per merchant
pipeline = [
    {
        "$group": {
            "_id": {
                "merchant_id": "$merchant_id",
                "date": "$date"
            },
            "total_commission": {"$sum": "$commission_amount"},
            "transaction_count": {"$sum": 1},
            "total_amount": {"$sum": "$amount"}
        }
    },
    {
        "$project": {
            "_id": 0,
            "merchant_id": "$_id.merchant_id",
            "date": "$_id.date",
            "total_commission": 1,
            "transaction_count": 1,
            "total_amount": 1
        }
    },
    {
        "$out": "merchant_daily_commissions"
    }
]


db.transactions.aggregate(pipeline)
print("Aggregated commission data saved to 'merchant_daily_commissions' collection.")
```



```text
Aggregated commission data saved to 'merchant_daily_commissions' collection.
```
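Two details of this workflow interact. The retention step deletes raw documents older than 24 hours, and `$out` replaces the whole `merchant_daily_commissions` collection every time the pipeline runs. Run in that order, the summary only ever covers the last day. One option is to aggregate each day while its raw data is still complete, before purging, and to write with `$merge` instead of `$out`, so existing summary rows are updated and older ones are kept. Below is a sketch of that pipeline as plain Python data. It assumes MongoDB 4.2 or later, and the optional `dates` filter and the function name are illustrative. `$merge` on fields other than `_id` requires a unique index on those fields in the target collection.

```python
def daily_commission_pipeline(dates=None, target="merchant_daily_commissions"):
    """Same $group/$project as above, but upsert results with $merge instead of $out."""
    stages = []
    if dates is not None:
        # Restrict to days whose raw data is still complete in `transactions`
        stages.append({"$match": {"date": {"$in": list(dates)}}})
    stages += [
        {"$group": {
            "_id": {"merchant_id": "$merchant_id", "date": "$date"},
            "total_commission": {"$sum": "$commission_amount"},
            "transaction_count": {"$sum": 1},
            "total_amount": {"$sum": "$amount"},
        }},
        {"$project": {
            "_id": 0,  # let $merge match on merchant_id + date, not on _id
            "merchant_id": "$_id.merchant_id",
            "date": "$_id.date",
            "total_commission": 1,
            "transaction_count": 1,
            "total_amount": 1,
        }},
        {"$merge": {
            "into": target,
            "on": ["merchant_id", "date"],
            "whenMatched": "replace",
            "whenNotMatched": "insert",
        }},
    ]
    return stages


pipeline = daily_commission_pipeline(dates=["2025-04-25"])
print([list(stage)[0] for stage in pipeline])  # ['$match', '$group', '$project', '$merge']
```

Before the first run, create the index that the `on` clause needs, for example `db.merchant_daily_commissions.create_index([("merchant_id", 1), ("date", 1)], unique=True)`. Then call `db.transactions.aggregate(pipeline)` as before.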
