<a href="https://colab.research.google.com/github/Arati930/codtech_tasks/blob/main/Task1_BigData_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:

# Install Java (needed for PySpark) + Python packages
!apt-get update -y
!apt-get install -y openjdk-11-jdk-headless -qq

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

!pip install -q pyspark==3.4.1 pyarrow dask[complete] fastparquet pandas matplotlib


Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Hit:2 https://cli.github.com/packages stable InRelease
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:11 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:12 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [2,065 kB]
Get:13 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,

In [2]:

%%writefile Task1_BigData_PySpark_and_Dask_Notebook.py
# ===============================
# TASK 1 - Big Data Analysis
# PySpark + Dask pipelines
# ===============================

import os
from pathlib import Path

# Create data directory
DATA_DIR = Path('data_big')
DATA_DIR.mkdir(exist_ok=True)

# -------- Dataset Acquisition --------
import urllib.request

sample_csv = DATA_DIR / '311_sample.csv'
if not sample_csv.exists():
    try:
        url = 'https://data.cityofnewyork.us/resource/erm2-nwe9.csv?$limit=50000'
        urllib.request.urlretrieve(url, sample_csv)
        print('Downloaded sample dataset:', sample_csv)
    except Exception as e:
        print("Download failed, fallback to synthetic data", e)
else:
    print("Sample already exists:", sample_csv)

# Synthetic dataset
def synthesize_dataset(csv_path, n_rows=100000):
    import random, csv, datetime
    categories = ['A','B','C','D']
    locations = ['North','South','East','West']
    if csv_path.exists():
        return
    with open(csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['event_id','ts','category','location','value1','value2'])
        start = datetime.datetime(2020,1,1)
        for i in range(n_rows):
            ts = start + datetime.timedelta(seconds=random.randint(0, 60*60*24*365))
            writer.writerow([i+1, ts.isoformat(), random.choice(categories),
                             random.choice(locations),
                             random.random()*1000, random.gauss(50,20)])

synthetic_csv = DATA_DIR / 'synthetic_events.csv'
synthesize_dataset(synthetic_csv, n_rows=50000)

print("Data ready at:", DATA_DIR)

# -------- PySpark Pipeline --------
try:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, to_timestamp, count, avg
    spark = SparkSession.builder.appName("BigDataTask1").getOrCreate()
    file_to_read = str(synthetic_csv if synthetic_csv.exists() else sample_csv)

    df = spark.read.option("header","true").option("inferSchema","true").csv(file_to_read)
    df = df.withColumn("ts", to_timestamp(col("ts")))

    for c in ["value1","value2"]:
        if c in df.columns:
            df = df.withColumn(c, col(c).cast("double"))

    agg = df.groupBy("category","location").agg(
        count("*").alias("n_events"),
        avg("value1").alias("avg_value1"),
        avg("value2").alias("avg_value2")
    )
    agg.show()
    out_dir = DATA_DIR / "pyspark_output"
    agg.repartition(1).write.mode("overwrite").parquet(str(out_dir))
    spark.stop()
except Exception as e:
    print("PySpark section skipped:", e)

# -------- Dask Pipeline --------
try:
    import dask.dataframe as dd
    from dask.distributed import Client, LocalCluster
    cluster = LocalCluster(n_workers=2, threads_per_worker=1)
    client = Client(cluster)

    csv_path = str(synthetic_csv if synthetic_csv.exists() else sample_csv)
    ddf = dd.read_csv(csv_path, assume_missing=True)
    ddf["ts"] = dd.to_datetime(ddf["ts"], errors="coerce")
    for c in ["value1","value2"]:
        if c in ddf.columns:
            ddf[c] = dd.to_numeric(ddf[c], errors="coerce")

    agg_ddf = ddf.groupby(["category","location"]).agg({
        "event_id":"count",
        "value1":"mean",
        "value2":"mean"
    }).rename(columns={"event_id":"n_events","value1":"avg_value1","value2":"avg_value2"})

    print(agg_ddf.compute().head())
    out_parquet = DATA_DIR / "dask_output"
    agg_ddf.to_parquet(str(out_parquet), engine="fastparquet")
    client.close()
except Exception as e:
    print("Dask section skipped:", e)

# -------- Report --------
report_path = DATA_DIR / "summary_report.txt"
with open(report_path, "w") as f:
    f.write("Big Data Analysis - Task 1\n")
    f.write("Data stored in: " + str(DATA_DIR) + "\n")
    f.write("Outputs in pyspark_output/ and dask_output/\n")
print("Report written:", report_path)


Writing Task1_BigData_PySpark_and_Dask_Notebook.py


In [3]:

%run Task1_BigData_PySpark_and_Dask_Notebook.py


Downloaded sample dataset: data_big/311_sample.csv
Data ready at: data_big
+--------+--------+--------+------------------+------------------+
|category|location|n_events|        avg_value1|        avg_value2|
+--------+--------+--------+------------------+------------------+
|       B|   North|    3203|504.91970344847596| 49.94472190349554|
|       D|   North|    3072| 499.0922068270466|49.998765099796096|
|       D|    West|    3127|496.39151355872656| 49.75940751819461|
|       D|    East|    3162| 497.9248178962062|50.001359167035886|
|       B|   South|    3138|502.51777789874376| 50.21172531422503|
|       C|    West|    2954| 505.6136827503561| 50.71398613751749|
|       B|    East|    3113| 503.7441174029004| 50.40117699733774|
|       C|    East|    3246|  507.290271622896|50.124099977081926|
|       B|    West|    3193|  498.666425248334|50.272355301101236|
|       A|   South|    3152|497.75147478181674|49.940089394260625|
|       A|    West|    3149|495.14304969039716| 49.568

INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:33551
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:37389'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:33557'
INFO:distributed.nanny:Worker process 2725 was killed by signal 2


Dask section skipped: 'NoneType' object has no attribute 'sc'
Report written: data_big/summary_report.txt


In [4]:

!ls -l data_big
!head -n 20 data_big/summary_report.txt


total 34532
-rw-r--r-- 1 root root 31816165 Oct  4 10:17 311_sample.csv
drwxr-xr-x 2 root root     4096 Oct  4 10:18 pyspark_output
-rw-r--r-- 1 root root       96 Oct  4 10:20 summary_report.txt
-rw-r--r-- 1 root root  3532012 Oct  4 10:17 synthetic_events.csv
Big Data Analysis - Task 1
Data stored in: data_big
Outputs in pyspark_output/ and dask_output/
