<a href="https://colab.research.google.com/github/Danushika06/Dpt/blob/main/23BIT013_In_Memory_Data_Processing_Challenge.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import random
import time
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark import StorageLevel

# ---------------------- Step 1: Initialize Spark ----------------------
# Obtain a session (getOrCreate reuses one if it already exists) that runs in
# local mode with the "local[*]" master, then lower Spark's log level to WARN
# so the benchmark output below is not buried in log lines.
spark = SparkSession.builder.master("local[*]").appName(
    "InMemoryDataProcessing-Refactor"
).getOrCreate()
spark.sparkContext.setLogLevel("WARN")
print("✅ Spark session created")

# ---------------------- Step 2: Generate large dataset ----------------------
# One seeded generator feeds every column, so each run produces the same rows.
# The columns are filled one after another in the order written here; keep that
# order, since all four share `rng` and reordering would change every value.
rng = random.Random(42)

N = 1_000_000  # number of synthetic transactions to generate
data = pd.DataFrame({
    # Customer ids drawn from 1..100000 inclusive.
    "customer_id": [rng.randint(1, 100000) for _ in range(N)],
    # Amounts between 5 and 500, rounded to two decimals.
    "transaction_amount": [round(rng.uniform(5, 500), 2) for _ in range(N)],
    "category": [rng.choice(["A", "B", "C", "D"]) for _ in range(N)],
    "age": [rng.randint(18, 70) for _ in range(N)],
})
# Report the row count from N itself so the message cannot drift from the data
# when N is edited (for N = 1_000_000 this prints "1,000,000" as before).
print(f"✅ Synthetic dataset created with {N:,} rows")

# ---------------------- Step 3: Create Spark DataFrame ----------------------
# Hand the pandas frame to Spark. No schema argument is passed, so the column
# types are left for createDataFrame to work out from `data`.
df = spark.createDataFrame(data)
print("✅ Spark DataFrame created")

# ---------------------- Step 4: Analytical query without caching ----------------------
# Baseline measurement: per-category average and total of transaction_amount,
# run before `df` has been persisted. show() is the action that executes the
# query, so it is kept inside the timed span.
start = time.perf_counter()
amount = F.col("transaction_amount")
result1 = df.groupBy("category").agg(
    F.avg(amount).alias("avg_amount"),
    F.sum(amount).alias("total_amount"),
)
result1.show(5)
end = time.perf_counter()
print(f"⏱ Execution time without cache: {end - start:.2f} seconds")

# ---------------------- Step 5: Cache DataFrame in memory ----------------------
# MEMORY_ONLY keeps cached partitions in memory and never writes them to disk.
# Note this is not the level df.cache() uses: cache() persists with Spark's
# default memory-and-disk level. The two only behave alike while every
# partition fits in memory.
df.persist(StorageLevel.MEMORY_ONLY)
# persist() only marks the DataFrame; run an action over all rows so the cache
# is actually populated before the timed query in the next step.
df.count()
print("✅ DataFrame cached in memory")

# ---------------------- Step 6: Analytical query with caching ----------------------
# Repeat the per-category average/total aggregation, this time after `df` has
# been persisted, and time it the same way (show() inside the timed span) so
# the two measurements are comparable.
start = time.perf_counter()
amount = F.col("transaction_amount")
result2 = df.groupBy("category").agg(
    F.avg(amount).alias("avg_amount"),
    F.sum(amount).alias("total_amount"),
)
result2.show(5)
end = time.perf_counter()
print(f"⏱ Execution time with cache: {end - start:.2f} seconds")

# ---------------------- Step 7: Optional - Additional real-time analysis ----------------------
# Example: the five customers with the largest summed transaction amount.
# The frame is sorted by total in descending order and show(5) prints the first five rows.
start = time.perf_counter()
spend_per_customer = df.groupBy("customer_id").agg(
    F.sum("transaction_amount").alias("total_amount")
)
top_customers = spend_per_customer.orderBy(F.desc("total_amount"))
top_customers.show(5)
end = time.perf_counter()
print(f"⏱ Execution time for top customers query: {end - start:.2f} seconds")

# ---------------------- Step 8: Stop Spark ----------------------
# Shut the session down now that all queries have run; `spark` and `df`
# must not be used after this point.
spark.stop()
print("✅ Spark session stopped")


✅ Spark session created
✅ Synthetic dataset created with 1,000,000 rows
✅ Spark DataFrame created
+--------+------------------+-------------------+
|category|        avg_amount|       total_amount|
+--------+------------------+-------------------+
|       B|252.66972962580155|6.308000867999986E7|
|       D|252.09951488039036|6.294697996999955E7|
|       C|252.23079039756232|6.293662681999975E7|
|       A|252.85554263642595|6.350087669999883E7|
+--------+------------------+-------------------+

⏱ Execution time without cache: 13.11 seconds
✅ DataFrame cached in memory
+--------+------------------+-------------------+
|category|        avg_amount|       total_amount|
+--------+------------------+-------------------+
|       B|252.66972962580155|6.308000867999986E7|
|       D|252.09951488039036|6.294697996999955E7|
|       C|252.23079039756232|6.293662681999975E7|
|       A|252.85554263642595|6.350087669999883E7|
+--------+------------------+-------------------+

⏱ Execution time with cac