In [None]:
# === Experiment: Directory partitioning for GroupBy(User_ID) -> avg(Heart_Rate) ===
# Produces: /content/Output/exp_directory_user_avg_results.csv

from pyspark.sql import SparkSession, functions as F
from pyspark.storagelevel import StorageLevel
import os, time, pandas as pd

# ---------- Config ----------
# Input CSV, partitioned-dataset root, and the CSV that receives the timing summary.
SOURCE_CSV = "/content/cleaned_personal_health_data.csv"
DIR_BASE = "/content/Output/output_directory_partitioned_userid"  # shared location
OUTPUT_CSV = "/content/Output/exp_directory_user_avg_results.csv"

# Benchmark parameters.
RUNS = 5              # number of timed runs
FIXED_PARTITIONS = 4  # for aggregation

# Optional cap on the number of distinct users, taken from the environment.
# Values <= 0 (the default) leave the dataset unfiltered.
LIMIT_USERS = int(os.getenv("LIMIT_USERS", "0"))

# ---------- Spark ----------
# Create (or reuse) the session. Shuffle partitions are pinned to FIXED_PARTITIONS
# and speculative execution is switched off.
spark = (
    SparkSession.builder
    .appName("Exp-Directory-UserAvg")
    .config("spark.speculation", "false")
    .config("spark.sql.shuffle.partitions", str(FIXED_PARTITIONS))
    .getOrCreate()
)
# Keep Spark's own logging down to warnings and above.
spark.sparkContext.setLogLevel("WARN")

# ---------- Step 1: Load base CSV ----------
# Read the source CSV (header row, inferred column types) and keep only the two
# columns the aggregation needs.
df = spark.read.csv(SOURCE_CSV, header=True, inferSchema=True)
df = df.select("User_ID", "Heart_Rate")

# Optional subset
if LIMIT_USERS > 0:
    # distinct().limit(n) alone gives no guarantee about WHICH users are kept, and
    # df is evaluated lazily: the count() below and any later action on df each
    # recompute the subset. Ordering the distinct IDs first makes the selection
    # deterministic, so the reported row count matches the data used afterwards.
    # Nulls sort last so a null User_ID (which can never match in the inner join)
    # does not take up one of the LIMIT_USERS slots.
    subset = (
        df.select("User_ID")
        .distinct()
        .orderBy(F.col("User_ID").asc_nulls_last())
        .limit(LIMIT_USERS)
    )
    df = df.join(subset, on="User_ID", how="inner")
    print(f"[INFO] LIMIT_USERS={LIMIT_USERS} -> rows after filter: {df.count()}")

# ---------- Step 2: Write directory-partitioned dataset ----------
# Write df as Parquet under DIR_BASE, partitioned on disk by User_ID.
# "overwrite" replaces whatever dataset already exists at that location.
df.write.parquet(DIR_BASE, mode="overwrite", partitionBy="User_ID")
print(f"[OK] Wrote partitioned dataset to {DIR_BASE}")

# ---------- Step 3: Read back ----------
# The partition directories are read through a glob; basePath points Spark at the
# dataset root so User_ID is still recovered from the directory names.
reader = spark.read.option("basePath", DIR_BASE)
dfp = reader.parquet(f"{DIR_BASE}/*")
dfp = dfp.select("User_ID", "Heart_Rate").repartition(FIXED_PARTITIONS, "User_ID")
# persist() is lazy: nothing is stored until the first action runs.
dfp = dfp.persist(StorageLevel.MEMORY_AND_DISK)

# Warm-up
# Runs the same aggregation as the timed loop once, untimed; as the first action
# on dfp it is also what materializes the persisted data.
warmup = dfp.groupBy("User_ID").agg(F.avg("Heart_Rate").alias("Avg_Heart_Rate"))
_ = warmup.count()

# ---------- Step 4: Timed runs ----------
# Each run times building the per-user average plus a count() action that forces
# Spark to execute it. Wall-clock seconds are rounded to microseconds.
times = []
for r in range(1, RUNS + 1):
    started = time.perf_counter()
    per_user_avg = dfp.groupBy("User_ID").agg(
        F.avg("Heart_Rate").alias("Avg_Heart_Rate")
    )
    _ = per_user_avg.count()
    elapsed = round(time.perf_counter() - started, 6)
    times += [elapsed]
    print(f"[directory] run {r}/{RUNS}: {elapsed:.6f}s")

# Mean of the (already rounded) per-run times.
avg_t = round(sum(times) / len(times), 6)

# ---------- Step 5: Save ----------
# Summarise the experiment as a single-row table and write it to OUTPUT_CSV.

# to_csv does not create missing parent directories, so make sure the target
# directory exists before writing. A bare filename has no directory part to create.
out_dir = os.path.dirname(OUTPUT_CSV)
if out_dir:
    os.makedirs(out_dir, exist_ok=True)

results = pd.DataFrame([{
    "Partitioning Type": "directory",
    "Partitions": FIXED_PARTITIONS,
    "Runs": RUNS,
    # Individual timings are packed into one cell, separated by ';'.
    "Per-Run Times (s)": ";".join(map(str, times)),
    "Avg Execution Time (s)": avg_t,
    "LimitUsers": LIMIT_USERS
}])
results.to_csv(OUTPUT_CSV, index=False)
print(f"\n[OK] Wrote {OUTPUT_CSV}")
# NOTE(review): display is not imported in this cell; presumably supplied by the
# notebook (IPython) environment — confirm before running as a plain script.
display(results)
