# Big Data Analysis at Scale — Dask / PySpark

> Internship Deliverable: Notebook with insights derived from big data processing.

**What this does**
- Detects whether **Dask** or **PySpark** is installed and starts a local cluster or session; the load, ETL, and analytics cells use the PySpark API.
- Reads a big dataset (Parquet or CSV, local or cloud), performs ETL + analytics at scale.
- Shows partitioning, lazy evaluation, caching, and write-out of results.
- Produces **insights** and persists outputs to Parquet/CSV.

**Pro tip**: If you're running locally with limited RAM, prefer Parquet + column pruning, and start small (sample) before scaling.

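Starting small usually means sampling. Spark's `DataFrame.sample(withReplacement=False, fraction=...)` is documented as not returning exactly `fraction` of the rows: the without-replacement case behaves like Bernoulli sampling, where each row is kept independently with probability `fraction`. A plain-Python sketch of that idea, assuming the rows fit in a list; `bernoulli_sample` is a hypothetical helper, not a Spark or Dask API:

```python
import random


def bernoulli_sample(rows, fraction, seed=42):
    """Keep each row independently with probability `fraction` (hypothetical helper)."""
    if not 0.0 <= fraction <= 1.0:
        raise ValueError("fraction must be in [0, 1]")
    rng = random.Random(seed)  # fixed seed -> the same sample on every run
    return [row for row in rows if rng.random() < fraction]


rows = list(range(100_000))
sample = bernoulli_sample(rows, 0.01)
print(len(sample))  # close to 1,000, but usually not exactly 1,000
```

The fixed seed plays the same role as `seed=42` in Spark: reruns give the same sample, which keeps small test runs comparable.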

## 0) Config — set your inputs/outputs
Update the paths below. For real big-data vibes, point to a directory of multiple Parquet/CSV files.

**Examples:**
- Local: `data/nyc_taxi_parquet/2019/*/*.parquet`
- Cloud (Spark): `s3a://ursa-labs-taxi-data/2019/01/*.parquet`
- Cloud (Dask): `s3://ursa-labs-taxi-data/2019/01/*.parquet`

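The two cloud examples differ only in URI scheme: Spark's Hadoop S3 connector (`hadoop-aws`) reads `s3a://` URIs, while Dask goes through fsspec/s3fs, which uses `s3://`. If you keep one path in the config, a small helper can adapt it per backend. A minimal sketch; `path_for_backend` is a hypothetical helper, not part of either library:

```python
from urllib.parse import urlparse


def path_for_backend(path: str, backend: str) -> str:
    """Rewrite an S3 URI to the scheme each backend expects (hypothetical helper).

    Local paths, globs, and Windows paths (scheme '' or a drive letter) pass through unchanged.
    """
    scheme = urlparse(path).scheme
    if scheme not in ("s3", "s3a"):
        return path
    rest = path.split("://", 1)[1]  # bucket/key part, glob characters kept as-is
    if backend == "spark":
        return "s3a://" + rest
    if backend == "dask":
        return "s3://" + rest
    raise ValueError(f"unknown backend: {backend!r}")


print(path_for_backend("s3://ursa-labs-taxi-data/2019/01/*.parquet", "spark"))
```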

In [None]:
import os
from pyspark.sql import SparkSession, functions as F

# paths (update if needed)
INPUT_PATH = r"C:\Users\basun\OneDrive\Desktop\Programming practice\bigdata_project\data\nyc_taxi\yellow_tripdata_2019-01.parquet"
OUTPUT_DIR = r"C:\Users\basun\OneDrive\Desktop\Programming practice\bigdata_project\outputs"
CURATED_PATH = os.path.join(OUTPUT_DIR, "curated_parquet")

os.makedirs(OUTPUT_DIR, exist_ok=True)
print("Inputs set ✅")

## 1) Environment detection + cluster spin-up
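We try Dask first (fast to boot locally), then fall back to PySpark if it is available. Only this spin-up step has both paths: the load, ETL, and analytics cells below use the PySpark API and the `spark` session started in the first cell of this section.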

In [None]:
spark = SparkSession.builder.appName("BigDataAnalysis").getOrCreate()
print("SparkSession ready ✨", spark.version)

In [None]:
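import importlib.util

# Backend detection: Dask first (needs dask + distributed), then PySpark
if importlib.util.find_spec("dask") and importlib.util.find_spec("distributed"):
    BACKEND = "dask"
elif importlib.util.find_spec("pyspark"):
    BACKEND = "spark"
else:
    raise ImportError("Neither Dask nor PySpark is installed; see the Appendix.")
print("Backend:", BACKEND)

if BACKEND == "dask":
    from dask.distributed import Client, LocalCluster
    cluster = LocalCluster()
    client = Client(cluster)
    display(client)  # rich cluster summary in Jupyter
    print(f"Dask cluster up with {len(client.nthreads())} workers ✨")
elif BACKEND == "spark":
    from pyspark.sql import SparkSession
    spark = (SparkSession.builder
             .appName("BigDataAnalysisDemo")
             # formats listed here stay on the V1 file-source path
             .config("spark.sql.sources.useV1SourceList", "csv,json")
             .getOrCreate())  # reuses the session from the previous cell if one exists
    print("SparkSession ready ✨", spark.version)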

## 2) Load — column pruning + schema hints
We prune columns to reduce IO and speed things up. Adjust `COLUMNS` to match your dataset.
Below assumes NYC taxi-like columns as a demo; map these to your schema if different.

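Column pruning pays off most with Parquet, which stores each column separately, so unread columns are skipped on disk. With a row format like CSV, every line still has to be read; pruning only shrinks what you keep in memory afterwards. A stdlib sketch of the CSV case, assuming a small in-memory file; `read_pruned_csv` is hypothetical and the rows are made up:

```python
import csv
import io


def read_pruned_csv(text, columns):
    """Project a CSV onto `columns` (hypothetical helper).

    Every line is still read and split; only the requested fields are kept.
    """
    reader = csv.DictReader(io.StringIO(text))
    missing = [c for c in columns if c not in (reader.fieldnames or [])]
    if missing:
        raise KeyError(f"columns not in file: {missing}")
    return [{c: row[c] for c in columns} for row in reader]


# toy CSV with more columns than the analysis needs
raw = (
    "VendorID,trip_distance,PULocationID,store_and_fwd_flag\n"
    "1,1.5,151,N\n"
    "2,0.8,239,N\n"
)
rows = read_pruned_csv(raw, ["trip_distance", "PULocationID"])
print(rows)
```

Values come back as strings, which is another reason typed Parquet is the better input format here.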
In [None]:
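# column pruning: read only the columns used downstream (NYC yellow-taxi schema)
COLUMNS = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_distance",
           "PULocationID", "fare_amount"]
df = spark.read.parquet(INPUT_PATH).select(*COLUMNS)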

# check schema + preview
df.printSchema()
df.show(5)
print("Raw row count:", df.count())


## 3) ETL — cleanups, time features, and filters
We’ll:
- Cast dtypes
- Compute trip duration
- Filter out obvious outliers
- Add hour/day/weekday features for time-series group-bys

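The same per-row logic, written for a single trip in plain Python, makes the filters and the day-of-week convention easy to check by hand. Note that Spark's `dayofweek` counts 1 = Sunday through 7 = Saturday, unlike Python's `isoweekday` (1 = Monday through 7 = Sunday). A sketch assuming timestamps arrive as `YYYY-MM-DD HH:MM:SS` strings; `etl_row` is a hypothetical helper:

```python
from datetime import datetime


def etl_row(pickup: str, dropoff: str, trip_distance: float):
    """Plain-Python mirror of the ETL cell for one trip (hypothetical helper).

    Returns None when the row would be dropped by the filters.
    """
    fmt = "%Y-%m-%d %H:%M:%S"
    pu = datetime.strptime(pickup, fmt)
    do = datetime.strptime(dropoff, fmt)
    duration_min = (do - pu).total_seconds() / 60.0
    # same bounds as the Spark filter
    if not (0 < trip_distance < 100 and 0 < duration_min < 500):
        return None
    return {
        "duration_min": duration_min,
        "hour": pu.hour,
        "day": pu.day,
        # convert isoweekday (Mon=1..Sun=7) to Spark's dayofweek (Sun=1..Sat=7)
        "dow": pu.isoweekday() % 7 + 1,
    }


row = etl_row("2019-01-01 00:46:40", "2019-01-01 00:53:20", 1.5)
print(row)  # 2019-01-01 was a Tuesday -> dow 3
```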
In [None]:
from pyspark.sql.functions import to_timestamp, col, unix_timestamp, hour, dayofmonth, dayofweek

df = (df
      .withColumn("tpep_pickup_datetime", to_timestamp(col("tpep_pickup_datetime")))
      .withColumn("tpep_dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime")))
     )

df = df.withColumn(
    "duration_min",
    (unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime"))) / 60.0
)

# filters (relaxed to avoid empty df issues)
df = df.filter(
    (col("trip_distance") > 0) &
    (col("trip_distance") < 100) &
    (col("duration_min") > 0) &
    (col("duration_min") < 500)
)

df = (df
      .withColumn("hour", hour(col("tpep_pickup_datetime")))
      .withColumn("day", dayofmonth(col("tpep_pickup_datetime")))
      .withColumn("dow", dayofweek(col("tpep_pickup_datetime")))  # 1 = Sunday ... 7 = Saturday
     )

df.show(5)
print("Filtered row count:", df.count())


## 4) Core analytics — insights
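We compute:
- Median (p50) and 95th percentile (p95) of trip distance and duration
- Hourly ride volume
- Top 10 pickup zones

Percentiles come from `approxQuantile` with a 1% relative error, which uses a compact summary instead of sorting the whole column; the volume and zone counts are ordinary group-by aggregations. Neither collects raw rows to the driver: only the small results are converted with `toPandas()`.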

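For intuition, here are exact single-machine versions of the three results. Spark documents the `approxQuantile` guarantee as returning a value `x` from the data with `floor((p - err) * N) <= rank(x) <= ceil((p + err) * N)`, so with `err = 0.01` the p50/p95 above should land within about 1% of rank of the exact nearest-rank answer below. A sketch with hypothetical helper names and toy inputs:

```python
import math
from collections import Counter


def nearest_rank_percentile(values, p):
    """Exact nearest-rank percentile (0 < p <= 1) of a non-empty sequence."""
    if not values:
        raise ValueError("empty input")
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered)))  # 1-based rank
    return ordered[rank - 1]


def hourly_and_top(hours, zones, n=10):
    """Group-by-count equivalents of the hourly and top-zone queries."""
    hourly = sorted(Counter(hours).items())  # [(hour, count), ...] ordered by hour
    top = Counter(zones).most_common(n)      # [(zone, count), ...] by count, descending
    return hourly, top


vals = list(range(1, 101))
print(nearest_rank_percentile(vals, 0.5), nearest_rank_percentile(vals, 0.95))
print(hourly_and_top([0, 1, 1, 23], [5, 5, 7]))
```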
In [None]:
import pandas as pd

# Percentiles (approximate, 1% relative error)
q_td = df.approxQuantile("trip_distance", [0.5, 0.95], 0.01)
q_du = df.approxQuantile("duration_min", [0.5, 0.95], 0.01)

hourly = df.groupBy("hour").count().orderBy("hour")
top_pu = df.groupBy("PULocationID").count().orderBy(F.desc("count")).limit(10)

q_pdf = pd.DataFrame([
    {"column":"trip_distance","p50":q_td[0],"p95":q_td[1]},
    {"column":"duration_min","p50":q_du[0],"p95":q_du[1]}
])
hourly_pdf = hourly.toPandas()
top_pu_pdf = top_pu.toPandas()

print("Percentiles:\n", q_pdf)
print("\nHourly volume:\n", hourly_pdf)
print("\nTop pickup zones:\n", top_pu_pdf)

# save
q_pdf.to_csv(os.path.join(OUTPUT_DIR, "percentiles.csv"), index=False)
hourly_pdf.to_csv(os.path.join(OUTPUT_DIR, "hourly_volume.csv"), index=False)
top_pu_pdf.to_csv(os.path.join(OUTPUT_DIR, "top_pickups.csv"), index=False)
print("✅ Saved CSVs to:", OUTPUT_DIR)


## 5) Scalability flex — partitions & caching
Show how runtime changes with the partition count and with caching (`persist()`). This is a **demonstration block**: tweak `N_PARTITIONS` for your machine or cluster. Since `persist()` is lazy, the first action pays to fill the cache and only later actions read from it.

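Why `avg` scales across partitions: each partition computes a small partial result `(sum, count)` on its own, and only those partials are combined, so the rows themselves never need to meet in one place. Spark's physical plan has the same shape (a partial aggregate per partition, then a final one), which you can inspect with `df2.agg(F.avg("fare_amount")).explain()`. A plain-Python sketch of the two-phase pattern, assuming simple round-robin assignment (roughly what `repartition(n)` without columns does); the helper names are hypothetical and the fares are toy values:

```python
import statistics


def repartition_round_robin(rows, n_partitions):
    """Deal rows into n lists in turn (hypothetical helper)."""
    partitions = [[] for _ in range(n_partitions)]
    for i, row in enumerate(rows):
        partitions[i % n_partitions].append(row)
    return partitions


def partitioned_mean(rows, n_partitions):
    """Mean via per-partition (sum, count) partials, then a final combine."""
    partitions = repartition_round_robin(rows, n_partitions)
    partials = [(sum(p), len(p)) for p in partitions]  # phase 1: independent per partition
    total = sum(s for s, _ in partials)                 # phase 2: combine small partials
    count = sum(c for _, c in partials)
    if count == 0:
        raise ValueError("mean of empty data")
    return total / count


fares = [10.0, 20.0, 30.5, 7.25, 12.0]
print(partitioned_mean(fares, 3), statistics.mean(fares))
```

The partition count changes how the work is split, not the answer, which is why timing differences in the cell above come from scheduling and I/O rather than from the math.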
In [None]:
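import time

N_PARTITIONS = 8  # tweak for your machine/cluster
df2 = df.repartition(N_PARTITIONS).persist()  # lazy: cached on the first action
for run in ("1st run, fills cache", "2nd run, cached"):
    t0 = time.time()
    _ = df2.agg(F.avg("fare_amount").alias("avg_fare")).collect()
    print(f"avg(fare_amount), {N_PARTITIONS} partitions, {run}: {time.time()-t0:.2f}s")
df2.unpersist()  # release executor memory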


## 6) Save curated dataset
Good practice: write a cleaned, analytics-ready table for downstream consumers (dashboards, ML).

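Spark writes `CURATED_PATH` as a directory, not a single file: one `part-*` file per output partition, plus a `_SUCCESS` marker from the default committer and, on a local filesystem, hidden `.crc` checksum files. If a downstream tool wants only the data files, a small filter helps. A stdlib sketch; `list_part_files` is a hypothetical helper and the demo file names only imitate Spark's usual pattern:

```python
import os
import tempfile


def list_part_files(output_dir, suffix=".parquet"):
    """Return data files Spark wrote into output_dir, skipping markers and checksums."""
    return sorted(
        name for name in os.listdir(output_dir)
        if name.startswith("part-") and name.endswith(suffix)
    )


# Usage on a throwaway folder that imitates a Spark output directory
demo_names = [
    "_SUCCESS", "._SUCCESS.crc",
    "part-00000-abc-c000.snappy.parquet", ".part-00000-abc-c000.snappy.parquet.crc",
    "part-00001-abc-c000.snappy.parquet", ".part-00001-abc-c000.snappy.parquet.crc",
]
with tempfile.TemporaryDirectory() as tmp:
    for name in demo_names:
        open(os.path.join(tmp, name), "w").close()
    parts = list_part_files(tmp)
print(parts)
```

After the write below, `list_part_files(CURATED_PATH)` should list the curated data files.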
In [None]:
df.write.mode("overwrite").parquet(CURATED_PATH)
print("✅ Wrote curated dataset to:", CURATED_PATH)


## 7) Insights (interpretation)
When you run the notebook on a real dataset, fill in your observations below. Example:
- **P50 trip distance**: X mi; **P95**: Y mi (NYC TLC records `trip_distance` in miles) → long-tail trips exist but are rare.
- **Peak demand hour**: HH:00 local → staffing & surge pricing opportunity.
- **Top pickup zones**: IDs [A, B, C] → consider geo-targeted supply.

This section is meant for you to contextualize numbers for business stakeholders.

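To keep the write-up tied to the numbers, the bullets can be drafted straight from the section 4 results. A sketch, assuming the metrics have been pulled out of `q_pdf`, `hourly_pdf`, and `top_pu_pdf` as plain Python values; `draft_insights` is hypothetical and the inputs below are toy values, not results from the dataset:

```python
def draft_insights(p50_dist, p95_dist, hourly_counts, top_zones):
    """Draft the insight bullets from computed metrics (hypothetical helper).

    hourly_counts: {hour: rides}; top_zones: [(zone_id, rides), ...] sorted descending.
    """
    peak_hour = max(hourly_counts, key=hourly_counts.get)
    zone_ids = ", ".join(str(z) for z, _ in top_zones[:3])
    return [
        f"- **P50 trip distance**: {p50_dist:.2f} mi; **P95**: {p95_dist:.2f} mi",
        f"- **Peak demand hour**: {peak_hour:02d}:00 local",
        f"- **Top pickup zones**: IDs [{zone_ids}]",
    ]


# toy inputs for illustration only
lines = draft_insights(1.2, 10.5, {8: 120, 18: 300, 23: 90}, [(101, 50), (102, 40), (103, 30)])
print("\n".join(lines))
```

With the real outputs, the inputs would be `q_pdf.loc[0, "p50"]`, `q_pdf.loc[0, "p95"]`, `dict(zip(hourly_pdf["hour"], hourly_pdf["count"]))`, and `list(zip(top_pu_pdf["PULocationID"], top_pu_pdf["count"]))`.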

---
### Appendix: Switching between Dask and PySpark
- Install Dask: `pip install "dask[complete]" distributed pyarrow fsspec s3fs` (quote the extras; shells like zsh otherwise treat the brackets as a glob)
- Install Spark: `pip install pyspark pyarrow`
- Spark + S3: add the `hadoop-aws` package (matching the Hadoop version bundled with your Spark) and AWS credentials, then read `s3a://` paths; or use local data.
- Parquet >>> CSV for performance; leverage partitioned folders.

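Before switching, it can help to check which of the packages above are importable in the current kernel. A stdlib sketch using `importlib.util.find_spec`, which locates a top-level module without importing it; the Dask-first preference mirrors section 1, and `installed` / `pick_backend` are hypothetical helpers:

```python
import importlib.util

PACKAGES = ("dask", "distributed", "pyspark", "pyarrow", "fsspec", "s3fs")


def installed(packages=PACKAGES):
    """Map each top-level package name to whether it can be found (not imported)."""
    return {name: importlib.util.find_spec(name) is not None for name in packages}


def pick_backend(status):
    """Dask first (needs dask + distributed), then PySpark; None if neither."""
    if status.get("dask") and status.get("distributed"):
        return "dask"
    if status.get("pyspark"):
        return "spark"
    return None


status = installed()
for name, ok in status.items():
    print(f"{name:12s} {'yes' if ok else 'no'}")
print("Backend:", pick_backend(status))
```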

In [None]:
print(OUTPUT_DIR)

In [None]:
print("Row count:", df.count())
df.show(5)



In [None]:
# -------- BIG DATA ANALYSIS (ONE-CELL, SPARK) --------
import os, glob, time, traceback
import pandas as pd
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import to_timestamp, col, unix_timestamp, hour, dayofmonth, dayofweek

# ---------- CONFIG ----------
INPUT_PATH = r"C:\Users\basun\OneDrive\Desktop\Programming practice\bigdata_project\data\nyc_taxi\yellow_tripdata_2019-01.parquet"
PREFERRED_OUTPUTS = [r"C:\bigdata_outputs", r"C:\temp\bigdata_outputs"]  # fallbacks if blocked
FAST_SAMPLE = False            # True = faster test run on ~1% of data
SAMPLE_FRACTION = 0.01

# ---------- PICK OUTPUT DIR THAT WORKS ----------
OUTPUT_DIR = None
err_msgs = []
for cand in PREFERRED_OUTPUTS:
    try:
        os.makedirs(cand, exist_ok=True)
        testfile = os.path.join(cand, "python_write_test.txt")
        with open(testfile, "w", encoding="utf-8") as f:
            f.write("ok")
        os.remove(testfile)
        OUTPUT_DIR = cand
        break
    except Exception as e:
        err_msgs.append(f"{cand} -> {e}")

if OUTPUT_DIR is None:
    print("❌ Could not create/write any output dir:", err_msgs)
    raise SystemExit(1)

CURATED_PATH = os.path.join(OUTPUT_DIR, "curated_parquet")
print("✅ Using OUTPUT_DIR:", OUTPUT_DIR)

# ---------- CHECK INPUT EXISTS ----------
def expand_paths(p):
    if any(ch in p for ch in ["*", "?", "["]):
        return glob.glob(p)
    return [p] if os.path.exists(p) else []

matches = expand_paths(INPUT_PATH)
print("Input path:", INPUT_PATH)
print("Matched files:", len(matches))
if len(matches) == 0:
    print("❌ No files found at INPUT_PATH. Listing parent dir for clues:")
    parent = os.path.dirname(INPUT_PATH)
    try:
        print("Parent dir:", parent)
        print("Parent dir entries (first 20):", os.listdir(parent)[:20])
    except Exception as e:
        print("Could not list parent dir:", e)
    raise SystemExit(1)

# ---------- START SPARK ----------
spark = SparkSession.builder.appName("BigDataAnalysis-OneCell") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()
print("SparkSession ✅", spark.version)

# ---------- SPARK WRITE SMOKE TEST ----------
try:
    tdf = spark.createDataFrame([(1, 10.0), (2, 20.0)], ["PULocationID","fare_amount"])
    smoke_path = os.path.join(OUTPUT_DIR, "spark_write_smoke")
    tdf.write.mode("overwrite").parquet(smoke_path)
    print("✅ Spark write smoke OK ->", smoke_path)
except Exception as e:
    print("❌ Spark write smoke FAILED:", e)
    print("Traceback:\n", traceback.format_exc())
    raise

# ---------- LOAD DATA ----------
try:
    t0 = time.time()
    df = spark.read.option("mergeSchema", True).parquet(INPUT_PATH)
    print(f"Loaded parquet in {time.time()-t0:.2f}s")
except Exception as e:
    print("❌ Failed to read parquet:", e)
    print("Traceback:\n", traceback.format_exc())
    raise

# (Optional) fast sample to speed things up locally
if FAST_SAMPLE:
    df = df.sample(False, SAMPLE_FRACTION, seed=42)
    print(f"FAST_SAMPLE on -> fraction={SAMPLE_FRACTION}")

# quick peek without scanning the world
df.show(5, truncate=False)

# ---------- ETL ----------
df = (df
      .withColumn("tpep_pickup_datetime", to_timestamp(col("tpep_pickup_datetime")))
      .withColumn("tpep_dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime")))
     )

df = df.withColumn(
    "duration_min",
    (unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime"))) / 60.0
)

# filters (broad to avoid empty DF on some months)
df = df.filter(
    (col("trip_distance") > 0) &
    (col("trip_distance") < 100) &
    (col("duration_min") > 0) &
    (col("duration_min") < 500)
)

df = (df
      .withColumn("hour", hour(col("tpep_pickup_datetime")))
      .withColumn("day", dayofmonth(col("tpep_pickup_datetime")))
      .withColumn("dow", dayofweek(col("tpep_pickup_datetime")))
     )

# count can be slow; print a cheap sanity via limit first
mini = df.limit(5).toPandas()
print("Mini preview (post-ETL):")
print(mini)

# safer row count (will take some time if large)
rc = df.count()
print("Post-ETL row count:", rc)
if rc == 0:
    print("❌ Post-ETL DataFrame is empty. Relax filters or check timestamps in your file.")
    raise SystemExit(1)

# ---------- CORE ANALYTICS ----------
q_td = df.approxQuantile("trip_distance",[0.5,0.95],0.01)
q_du = df.approxQuantile("duration_min",[0.5,0.95],0.01)
hourly = df.groupBy("hour").count().orderBy("hour")
top_pu = df.groupBy("PULocationID").count().orderBy(F.desc("count")).limit(10)

q_pdf = pd.DataFrame([
    {"column":"trip_distance","p50":q_td[0],"p95":q_td[1]},
    {"column":"duration_min","p50":q_du[0],"p95":q_du[1]}
])
hourly_pdf = hourly.toPandas()
top_pu_pdf = top_pu.toPandas()

print("\nPercentiles:\n", q_pdf)
print("\nHourly volume (head):\n", hourly_pdf.head())
print("\nTop pickup zones:\n", top_pu_pdf)

# ---------- SAVE CSV OUTPUTS ----------
try:
    q_pdf.to_csv(os.path.join(OUTPUT_DIR, "percentiles.csv"), index=False)
    hourly_pdf.to_csv(os.path.join(OUTPUT_DIR, "hourly_volume.csv"), index=False)
    top_pu_pdf.to_csv(os.path.join(OUTPUT_DIR, "top_pickups.csv"), index=False)
    print("✅ Saved CSVs to:", OUTPUT_DIR)
except Exception as e:
    print("❌ Failed saving CSVs:", e)
    print("Traceback:\n", traceback.format_exc())
    raise

# ---------- SAVE CURATED PARQUET ----------
try:
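    # coalesce(1) writes a single part file (easy to spot in Explorer), but it funnels
    # the whole write through one task; drop it for genuinely large outputs
    df.coalesce(1).write.mode("overwrite").parquet(CURATED_PATH)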
    print("✅ Wrote curated dataset to:", CURATED_PATH)
except Exception as e:
    print("❌ Failed writing curated parquet:", e)
    print("Traceback:\n", traceback.format_exc())
    raise

# ---------- FINAL LISTING ----------
try:
    print("\nFiles in OUTPUT_DIR:")
    print(os.listdir(OUTPUT_DIR))
    if os.path.isdir(CURATED_PATH):
        print("Curated files (first 10):", os.listdir(CURATED_PATH)[:10])
except Exception as e:
    print("Could not list output dir:", e)

# Optional: pop open File Explorer (Windows only)
try:
    os.startfile(OUTPUT_DIR)
except Exception:
    pass
# -------- END --------



In [None]:
import os

# OUTPUT_DIR / CURATED_PATH come from the one-cell run above (it may have picked a fallback dir)
print("Main outputs:", os.listdir(OUTPUT_DIR))
print("Curated parquet folder:", os.listdir(CURATED_PATH))


In [None]:
import os
INPUT_PATH = r"C:\Users\basun\OneDrive\Desktop\Programming practice\bigdata_project\data\nyc_taxi\yellow_tripdata_2019-01.parquet"

print("Exists? ->", os.path.exists(INPUT_PATH))
print("Is dir? ->", os.path.isdir(INPUT_PATH))

parent = os.path.dirname(INPUT_PATH)
print("Parent folder listing (first 10):", os.listdir(parent)[:10])


In [None]:
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TestWrite").getOrCreate()
test = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
print("Test count:", test.count())

test_out = r"C:\bigdata_outputs\test_parquet"
test.write.mode("overwrite").parquet(test_out)
print("Wrote test parquet to:", test_out)
print("Contents of test_out:", os.listdir(test_out))
