# Sofia Air Quality ETL Notebook

This notebook implements the end-to-end data pipeline for the Sofia air quality dataset (download from Kaggle, ETL to partitioned Parquet, upload to S3) and is designed to run in Google Colab.

Run the cells from top to bottom, skipping the optional Spark example (step 2) unless you are in a Kaggle kernel. The steps are:

1. Install runtime dependencies (Colab).
2. (Optional) Example Spark ETL script that demonstrates processing inside a Kaggle kernel, where the dataset is mounted under `/kaggle/input`.
3. Install additional packages used by the helper cells.
4. Upload `kaggle.json` and download the dataset into `data/raw/`.
5. Run the repository ETL (`processing/etl_pipeline.py`) which writes partitioned Parquet into `data/processed/sofia_air_quality_weather`.
6. Upload the processed output to S3.

Notes:
- In Colab you will be asked to upload `kaggle.json` (from Kaggle -> Account -> Create New API Token).
- If you prefer, you can run the same ETL locally with `python run_pipeline.py` or by calling `run_etl()` from `processing/etl_pipeline.py`.


In [None]:
### Example: Spark ETL (Kaggle/Colab)

The cell below demonstrates how you might run a PySpark-based ETL inside a notebook environment where the dataset is already available (for example, a Kaggle kernel, which mounts it under `/kaggle/input`). If you are running the repository ETL (recommended), skip this example and go straight to the later cell that calls `run_etl()` from `processing/etl_pipeline.py`.
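As a minimal, self-contained illustration of what the Spark job computes, here is a pure-Python sketch of the same join-then-average logic on a few made-up rows. It is not part of the pipeline: the helper name `daily_averages` and the sample values are hypothetical, it groups only by `location` and date (the Spark version also groups by `lat`/`lon`), and it follows the cell's assumption that `P1` is PM10 and `P2` is PM2.5.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean

# Made-up sample rows standing in for the sds011 (P1/P2) and bme280 CSV files.
pollution = [
    {"timestamp": "2017-07-01T00:00:07", "location": 3, "P1": 20.0, "P2": 10.0},
    {"timestamp": "2017-07-01T06:00:00", "location": 3, "P1": 99.0, "P2": 50.0},  # no weather match
    {"timestamp": "2017-07-01T12:00:00", "location": 3, "P1": 30.0, "P2": 14.0},
]
weather = [
    {"timestamp": "2017-07-01T00:00:07", "location": 3,
     "temperature": 18.0, "humidity": 60.0, "pressure": 95000.0},
    {"timestamp": "2017-07-01T12:00:00", "location": 3,
     "temperature": 26.0, "humidity": 40.0, "pressure": 94900.0},
]

WEATHER_FIELDS = ("temperature", "humidity", "pressure")


def daily_averages(pollution, weather):
    """Inner-join on (timestamp, location), then average each metric per (location, date)."""
    weather_by_key = {(w["timestamp"], w["location"]): w for w in weather}
    groups = defaultdict(list)
    for p in pollution:
        w = weather_by_key.get((p["timestamp"], p["location"]))
        if w is None:  # inner join: drop readings without an exact weather match
            continue
        day = datetime.fromisoformat(p["timestamp"]).date()
        row = {"PM10": p["P1"], "PM2_5": p["P2"]}
        row.update({field: w[field] for field in WEATHER_FIELDS})
        groups[(p["location"], day)].append(row)
    # One dict of avg_<metric> values per (location, day) group.
    return {
        key: {f"avg_{name}": mean(r[name] for r in rows) for name in rows[0]}
        for key, rows in groups.items()
    }


print(daily_averages(pollution, weather))
```

Note how the 06:00 reading (PM10 of 99.0) has no weather row with the same timestamp and therefore does not affect the daily average; the Spark inner join behaves the same way.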

In [None]:
!pip install pyspark plotly streamlit

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, avg, year, month
import os

# Start Spark session
spark = SparkSession.builder.appName("Sofia Air Quality + Weather ETL").getOrCreate()

# Base dataset path
base_path = "/kaggle/input/sofia-air-quality-dataset/"

# --- WEATHER FILES ---
weather_files = [
    os.path.join(base_path, f) for f in os.listdir(base_path)
    if "bme280sof" in f and os.path.getsize(os.path.join(base_path, f)) > 0
]

print(f"✅ Found {len(weather_files)} weather files with data")

df_weather = spark.read.csv(weather_files, header=True, inferSchema=True)
df_weather = df_weather.withColumn("date", to_date(col("timestamp")))
df_weather = df_weather.select(
    "timestamp", "location", "lat", "lon", "date", "temperature", "humidity", "pressure"
).dropna()

# --- POLLUTANT FILES ---
pollutant_files = [
    os.path.join(base_path, f) for f in os.listdir(base_path)
    if "sds011sof" in f and os.path.getsize(os.path.join(base_path, f)) > 0
]

print(f"✅ Found {len(pollutant_files)} pollutant files with data")

df_pollution = spark.read.csv(pollutant_files, header=True, inferSchema=True)
df_pollution = df_pollution.withColumn("date", to_date(col("timestamp")))
df_pollution = df_pollution.select(
    "timestamp", "location", "lat", "lon", "date",
    col("P1").alias("PM10"), col("P2").alias("PM2_5")
).dropna()

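# --- JOIN POLLUTION + WEATHER ---
# Inner join on exact keys: only rows where both sensors report the same
# timestamp at the same location and coordinates are kept.
df_joined = df_pollution.join(
    df_weather,
    ["timestamp", "location", "lat", "lon", "date"],
    "inner"
)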

# --- DAILY AVERAGES ---
daily_avg = df_joined.groupBy("location", "lat", "lon", "date").agg(
    avg("PM10").alias("avg_PM10"),
    avg("PM2_5").alias("avg_PM2_5"),
    avg("temperature").alias("avg_temperature"),
    avg("humidity").alias("avg_humidity"),
    avg("pressure").alias("avg_pressure")
).withColumn("year", year("date")).withColumn("month", month("date"))

# Write daily averages as Parquet, partitioned into year=/month= directories
output_path = "/kaggle/working/sofia_air_quality_weather.parquet_updated"
daily_avg.write.mode("overwrite").partitionBy("year", "month").parquet(output_path)

spark.stop()
print(f"✅ Processed data saved at: {output_path}")
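
In [None]:
The two file-discovery list comprehensions in the Spark cell above differ only in the filename marker. A small helper can remove the duplication; this is a sketch, and `find_nonempty_files` is a hypothetical name, not a function from the repository. It also skips subdirectories, which the original comprehensions do not check for.

```python
import os


def find_nonempty_files(base_path, marker):
    """Return sorted paths in base_path whose file name contains marker and whose size is > 0."""
    matches = []
    for name in os.listdir(base_path):
        path = os.path.join(base_path, name)
        # isfile() skips subdirectories; getsize() > 0 skips empty CSV files.
        if marker in name and os.path.isfile(path) and os.path.getsize(path) > 0:
            matches.append(path)
    return sorted(matches)


# Usage in the Spark cell (same base_path as above):
# weather_files = find_nonempty_files(base_path, "bme280sof")
# pollutant_files = find_nonempty_files(base_path, "sds011sof")
```

Sorting the result makes the file order deterministic, which helps when comparing runs.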


In [None]:
### Install dependencies (Colab)

This cell installs the Python packages required when running in Google Colab: PySpark (ETL), the Kaggle API client, boto3 and s3fs (S3 access), pyarrow (Parquet I/O), and Plotly and Streamlit (visualization).
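Once the install cell has run (for example, after a Colab runtime restart), a quick check like the following can confirm that every package is importable. It is a sketch: `missing_modules` is a hypothetical helper, and the list assumes each package's import name matches its pip name, which holds for these seven.

```python
import importlib.util

# Import names of the packages installed by the install cell.
REQUIRED_MODULES = ("pyspark", "plotly", "streamlit", "boto3", "kaggle", "pyarrow", "s3fs")


def missing_modules(names=REQUIRED_MODULES):
    """Return the module names that cannot be found, without executing the modules."""
    # find_spec() on a top-level name locates the module but does not import it,
    # so e.g. the kaggle package does not try to authenticate during this check.
    return [name for name in names if importlib.util.find_spec(name) is None]


print("Missing:", missing_modules() or "none")
```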

In [None]:
# Install runtime dependencies (Colab)
!pip install -q pyspark plotly streamlit boto3 kaggle pyarrow s3fs
print("✅ Dependencies installed")


In [None]:
# Upload kaggle.json and download the Kaggle dataset (Colab)
from google.colab import files
import os

print("Upload your kaggle.json (from Kaggle -> Account -> Create New API Token)")
uploaded = files.upload()
if 'kaggle.json' not in uploaded:
    raise RuntimeError('kaggle.json was not uploaded; upload it and rerun this cell')

with open('kaggle.json', 'wb') as f:
    f.write(uploaded['kaggle.json'])
os.chmod('kaggle.json', 0o600)  # the Kaggle client warns if the key is readable by other users
# Must be set before importing kaggle, which authenticates on import
os.environ['KAGGLE_CONFIG_DIR'] = os.getcwd()
print('Saved kaggle.json to current working directory')

from kaggle.api.kaggle_api_extended import KaggleApi
api = KaggleApi()
api.authenticate()

# Download and unzip the dataset into data/raw
os.makedirs('data/raw', exist_ok=True)
api.dataset_download_files('hmavrodiev/sofia-air-quality-dataset', path='data/raw', unzip=True)
print('✅ Dataset downloaded to data/raw')
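
In [None]:
A Kaggle API token file is a small JSON object with `username` and `key` fields. Before calling `api.authenticate()` in the cell above, you could validate the uploaded bytes so that a wrong file fails with a readable message. This is a sketch; `kaggle_credential_problems` is a hypothetical helper.

```python
import json


def kaggle_credential_problems(raw_bytes):
    """Return a list of problems with an uploaded kaggle.json (an empty list means it looks usable)."""
    try:
        data = json.loads(raw_bytes)
    except ValueError:  # JSONDecodeError and UnicodeDecodeError both subclass ValueError
        return ["not valid JSON"]
    if not isinstance(data, dict):
        return ["top-level value is not a JSON object"]
    return [f"missing '{field}'" for field in ("username", "key") if not data.get(field)]


# Usage, right after files.upload():
# problems = kaggle_credential_problems(uploaded['kaggle.json'])
# if problems:
#     raise RuntimeError(f"kaggle.json looks wrong: {problems}")
```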


In [None]:
# Run the ETL pipeline (uses processing/etl_pipeline.py)
# Assumes the repository root is the current working directory.
import os
import sys
sys.path.append('.')
from processing.etl_pipeline import run_etl

# Ensure the raw data exists
if not os.path.isdir('data/raw') or not os.listdir('data/raw'):
    raise RuntimeError('data/raw is missing or empty; run the previous cell to download the dataset')

run_etl(input_path='data/raw', output_path='data/processed/sofia_air_quality_weather')
print('✅ ETL completed and processed data saved to data/processed/sofia_air_quality_weather')
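
In [None]:
To sanity-check the ETL output before uploading it, you can count the Parquet files in each partition. This sketch assumes that `run_etl` partitions by year and month the way the Spark example's `partitionBy("year", "month")` does, which produces Hive-style directories such as `year=2017/month=7`; both helper names are hypothetical.

```python
import os
from collections import Counter


def parse_partition(rel_path):
    """Extract Hive-style key=value directory names from a relative path."""
    parts = rel_path.replace(os.sep, "/").split("/")
    return dict(part.split("=", 1) for part in parts if "=" in part)


def partition_file_counts(output_dir):
    """Count .parquet files per partition directory under output_dir."""
    counts = Counter()
    for root, _dirs, filenames in os.walk(output_dir):
        n_parquet = sum(1 for name in filenames if name.endswith(".parquet"))
        if n_parquet:
            rel_dir = os.path.relpath(root, output_dir).replace(os.sep, "/")
            counts[rel_dir] += n_parquet
    return dict(counts)


# Usage:
# for rel_dir, n in sorted(partition_file_counts('data/processed/sofia_air_quality_weather').items()):
#     print(parse_partition(rel_dir), n)
```

An empty result means no `.parquet` files were found, for example because the output path is wrong.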


In [None]:
# Upload processed data to S3
# boto3 needs AWS credentials, e.g. the AWS_ACCESS_KEY_ID and
# AWS_SECRET_ACCESS_KEY environment variables.
import os
import boto3

bucket_name = 'my-sofia-air-quality'  # CHANGE to your bucket
prefix = 'processed-data/'
local_dir = 'data/processed/sofia_air_quality_weather'

s3 = boto3.client('s3')
# Walk the processed output and upload every file, keeping its relative path.
# The loop variable is `filenames`, not `files`, so google.colab's `files` is not shadowed.
for root, _dirs, filenames in os.walk(local_dir):
    for fname in filenames:
        local_path = os.path.join(root, fname)
        rel_path = os.path.relpath(local_path, local_dir)
        s3_key = prefix + rel_path.replace(os.sep, '/')
        print(f'Uploading {local_path} -> s3://{bucket_name}/{s3_key}')
        s3.upload_file(local_path, bucket_name, s3_key)

print('✅ Upload completed')
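
In [None]:
If `run_etl` writes its output with Spark, the output directory typically also contains a `_SUCCESS` marker and, on a local filesystem, hidden `.crc` checksum files, which the upload loop above copies to S3 along with the data. One option is to filter them out and to build keys with `posixpath` so they always use `/`. This is a sketch; `s3_key_for` and `should_upload` are hypothetical helpers.

```python
import os
import posixpath


def s3_key_for(local_path, base_dir, prefix):
    """Map a file under base_dir to an S3 key under prefix, always using '/' separators."""
    rel_path = os.path.relpath(local_path, base_dir)
    return posixpath.join(prefix, *rel_path.split(os.sep))


def should_upload(filename):
    """Skip Hadoop/Spark bookkeeping files: _SUCCESS-style markers and .crc checksums."""
    return not (filename.startswith("_") or filename.endswith(".crc"))


# Usage inside the upload loop:
# if not should_upload(fname):
#     continue
# s3_key = s3_key_for(local_path, 'data/processed/sofia_air_quality_weather', prefix)
```

Skipping these files is a design choice: readers such as pyarrow and Spark ignore them, so leaving them out keeps the bucket limited to the actual data files.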
