<a href="https://colab.research.google.com/github/adhithyyaa/23BCS050_DATA_PROCESSING_CHALLENGE/blob/main/23BCS050_In_Memory_Data_Processing_Challenge.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

import random
import time

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum

# ---------------------- Step 1: Initialize Spark ----------------------
# Arrow-based columnar transfer makes spark.createDataFrame(pandas_df) in
# Step 3 much faster than the default row-by-row conversion path. If pyarrow
# is unavailable, Spark falls back to the non-Arrow path (the fallback setting
# is enabled by default), so leaving this on is safe.
spark = (
    SparkSession.builder
    .appName("InMemoryDataProcessing")
    .master("local[*]")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
print("✅ Spark session initialized successfully.\n")

# ---------------------- Step 2: Generate Synthetic Dataset ----------------------
N = 1_000_000  # 1 million records
SEED = 42  # fixed seed so every run (and every benchmark) sees the same data

# Vectorised generation: one NumPy call per column instead of N Python-level
# random.* calls per column, which turns a multi-second loop into milliseconds.
_rng = np.random.default_rng(SEED)
data = pd.DataFrame({
    # Generator.integers excludes `high`, so +1 keeps the inclusive bounds
    # of the original random.randint(1, 100000) / randint(18, 70).
    "customer_id": _rng.integers(1, 100_000 + 1, size=N),
    "transaction_amount": np.round(_rng.uniform(5, 500, size=N), 2),
    "category": _rng.choice(["A", "B", "C", "D"], size=N),
    "age": _rng.integers(18, 70 + 1, size=N),
})

print(f"✅ Synthetic dataset created with {N:,} records.\n")

# ---------------------- Step 3: Convert to Spark DataFrame ----------------------
df = spark.createDataFrame(data)
print("✅ Spark DataFrame successfully created.\n")


def _category_summary(frame):
    """Return a lazy per-category aggregate of ``transaction_amount``.

    Produces columns ``category``, ``avg_amount`` and ``total_amount``.
    Both timed runs below use this same plan, so the only thing that differs
    between them is whether ``frame`` has been cached.

    Note: ``sum`` here is ``pyspark.sql.functions.sum`` (imported at the top
    of the notebook), which shadows Python's builtin ``sum``.
    """
    return frame.groupBy("category").agg(
        avg("transaction_amount").alias("avg_amount"),
        sum("transaction_amount").alias("total_amount"),
    )


# ---------------------- Step 4: Query Without In-Memory Cache ----------------------
# Warm-up: the first action in a fresh session also pays one-time costs
# (JVM warm-up, job setup). Run the query once, untimed, so the measurement
# below reflects recomputation from source rather than session startup.
# Nothing has been cache()d yet, so the timed run still reads uncached data.
_category_summary(df).collect()

print("🚀 Running aggregation WITHOUT caching...")
start = time.perf_counter()  # monotonic clock intended for measuring intervals

result_no_cache = _category_summary(df)
result_no_cache.show(5)

end = time.perf_counter()
print(f"⏱ Execution time (without cache): {end - start:.2f} seconds\n")

# ---------------------- Step 5: Enable In-Memory Caching ----------------------
df.cache()
df.count()  # cache() is lazy; this full-scan action materializes the cache
print("✅ DataFrame cached in memory.\n")

# ---------------------- Step 6: Query With In-Memory Cache ----------------------
print("⚡ Running aggregation WITH caching...")
start = time.perf_counter()

result_cached = _category_summary(df)
result_cached.show(5)

end = time.perf_counter()
print(f"⏱ Execution time (with cache): {end - start:.2f} seconds\n")

# ---------------------- Step 7: Real-Time Analysis Example ----------------------
print("📊 Running real-time query: Top 5 customers by total transaction amount...")
start = time.perf_counter()  # monotonic clock intended for measuring intervals

# Total spend per customer, ranked descending. customer_id is a secondary sort
# key so customers with equal totals always appear in the same order.
# (`sum` is pyspark.sql.functions.sum, not Python's builtin.)
top_customers = (
    df.groupBy("customer_id")
      .agg(sum("transaction_amount").alias("total_amount"))
      .orderBy(col("total_amount").desc(), col("customer_id").asc())
)
top_customers.show(5)

end = time.perf_counter()
print(f"⏱ Execution time (top customers query): {end - start:.2f} seconds\n")

# ---------------------- Step 8: Stop Spark Session ----------------------
# After this call, `df` and the result DataFrames above can no longer be used.
spark.stop()
print("🛑 Spark session stopped successfully.")


✅ Spark session initialized successfully.

✅ Synthetic dataset created with 1,000,000 records.

✅ Spark DataFrame successfully created.

🚀 Running aggregation WITHOUT caching...
+--------+------------------+-------------------+
|category|        avg_amount|       total_amount|
+--------+------------------+-------------------+
|       B|251.98508213158505|6.307942560999969E7|
|       D|252.39534501112723|6.306753923000043E7|
|       C|252.95622741606613|6.313003272000021E7|
|       A| 253.2420412828456|6.336748978000004E7|
+--------+------------------+-------------------+

⏱ Execution time (without cache): 11.40 seconds

✅ DataFrame cached in memory.

⚡ Running aggregation WITH caching...
+--------+------------------+-------------------+
|category|        avg_amount|       total_amount|
+--------+------------------+-------------------+
|       B|251.98508213158505|6.307942560999969E7|
|       D|252.39534501112723|6.306753923000043E7|
|       C|252.95622741606613|6.313003272000021E7|
|  