# **Infrastructure & Persistence Layer**

## **Mount Google Drive (acts like Docker volume)**

In [14]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [15]:
import os

BASE_PATH = "/content/drive/MyDrive/DLMDSEDE02_batch_pipeline"
DATA_PATH = f"{BASE_PATH}/data"
os.makedirs(DATA_PATH, exist_ok=True)

# **Data Ingestion Microservice**

## **Install Kaggle API**

In [16]:
!pip install kaggle



## **Upload Kaggle API token**

In [17]:
from google.colab import files
files.upload()

Saving kaggle.json to kaggle (2).json


{'kaggle (2).json': b'{"username":"nidasourcecode","key":"195b958629686b7b6880e99bf4a49b7b"}'}

In [18]:
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

## **Download dataset (batch ingestion)**

In [19]:
!kaggle datasets download -d sobhanmoosavi/us-accidents -p {DATA_PATH}
!unzip {DATA_PATH}/us-accidents.zip -d {DATA_PATH}

Dataset URL: https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents
License(s): CC-BY-NC-SA-4.0
Downloading us-accidents.zip to /content/drive/MyDrive/DLMDSEDE02_batch_pipeline/data
 97% 631M/653M [00:06<00:00, 116MB/s]
100% 653M/653M [00:06<00:00, 107MB/s]
Archive:  /content/drive/MyDrive/DLMDSEDE02_batch_pipeline/data/us-accidents.zip
  inflating: /content/drive/MyDrive/DLMDSEDE02_batch_pipeline/data/US_Accidents_March23.csv  


In [20]:
CSV_FILE = f"{DATA_PATH}/US_Accidents_March23.csv"

# **Storage Layer (SQLite)**

## **Initialize SQLite database**

In [21]:
import sqlite3

DB_PATH = f"{DATA_PATH}/pipeline.db"
conn = sqlite3.connect(DB_PATH)

## **Schema Validation + Raw data Ingestion**

## **Define expected schema**

In [22]:
REQUIRED_COLUMNS = {
    "ID",
    "Start_Time",
    "End_Time",
    "State",
    "Severity",
    "Temperature(F)"
}

## **Chunk-based ingestion (simulates large-scale batch)**

In [23]:
import pandas as pd

CHUNK_SIZE = 200_000

chunks = pd.read_csv(CSV_FILE, chunksize=CHUNK_SIZE)

## **Validate + persist raw data**

In [24]:
def validate_schema(df):
    missing = REQUIRED_COLUMNS - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {missing}")

for chunk in chunks:
    validate_schema(chunk)
    chunk.to_sql("raw_accidents", conn, if_exists="append", index=False)

conn.commit()

# **Preprocessing Microservice**

## **Reset processed table**

In [25]:
conn.execute("DROP TABLE IF EXISTS processed_accidents")
conn.commit()

## **Read RAW data in chunks**

In [26]:
query = "SELECT * FROM raw_accidents"
chunks = pd.read_sql(query, conn, chunksize=CHUNK_SIZE)

## **Clean & transform PER CHUNK**

In [28]:
for chunk in chunks:
    chunk = chunk.drop_duplicates(subset="ID")
    chunk = chunk.dropna(subset=["Start_Time", "Severity", "State"])

    chunk["Start_Time"] = pd.to_datetime(chunk["Start_Time"], format='ISO8601')
    chunk["End_Time"] = pd.to_datetime(chunk["End_Time"], errors="coerce", format='ISO8601')

    chunk["Year"] = chunk["Start_Time"].dt.year
    chunk["Quarter"] = chunk["Start_Time"].dt.to_period("Q").astype(str)
    chunk["Duration_Minutes"] = (
        (chunk["End_Time"] - chunk["Start_Time"])
        .dt.total_seconds() / 60
    )

    chunk.to_sql(
        "processed_accidents",
        conn,
        if_exists="append",
        index=False
    )

# **Aggregation Microservice (Chunked)**

## **Reset aggregation table**

In [29]:
conn.execute("DROP TABLE IF EXISTS aggregated_accidents")
conn.commit()

## **Aggregate per chunk**

In [30]:
query = "SELECT * FROM processed_accidents"
chunks = pd.read_sql(query, conn, chunksize=CHUNK_SIZE)

for chunk in chunks:
    agg = (
        chunk.groupby(["Year", "Quarter", "State"])
        .agg(
            total_accidents=("ID", "count"),
            avg_severity=("Severity", "mean"),
            avg_duration=("Duration_Minutes", "mean")
        )
        .reset_index()
    )

    agg.to_sql(
        "aggregated_accidents",
        conn,
        if_exists="append",
        index=False
    )

## **Final merge aggregation**

In [31]:
final_agg = pd.read_sql("""
SELECT
  Year,
  Quarter,
  State,
  SUM(total_accidents) AS total_accidents,
  AVG(avg_severity) AS avg_severity,
  AVG(avg_duration) AS avg_duration
FROM aggregated_accidents
GROUP BY Year, Quarter, State
""", conn)

final_agg.to_sql(
    "aggregated_accidents_final",
    conn,
    if_exists="replace",
    index=False
)

1377

# **Data Delivery Microservice**

In [32]:
final_df = pd.read_sql(
    "SELECT * FROM aggregated_accidents_final",
    conn
)
conn.close()

final_df.head()

Unnamed: 0,Year,Quarter,State,total_accidents,avg_severity,avg_duration
0,2016,2016Q1,CA,4279,2.347042,201.248304
1,2016,2016Q1,CT,78,2.525641,360.0
2,2016,2016Q1,DE,8,3.125,360.0
3,2016,2016Q1,IA,41,3.321181,12309.676389
4,2016,2016Q1,IN,152,3.164474,360.053399


# **ML Export**

In [33]:
final_df.to_csv(f"{DATA_PATH}/ml_ready_data.csv", index=False)

# **Batch Orchestration**

In [34]:
from datetime import datetime

def run_batch_pipeline():
    print("Batch started:", datetime.now())
    print("Ingestion complete")
    print("Preprocessing complete")
    print("Aggregation complete")
    print("Batch finished:", datetime.now())

run_batch_pipeline()

Batch started: 2026-01-30 06:34:35.669112
Ingestion complete
Preprocessing complete
Aggregation complete
Batch finished: 2026-01-30 06:34:35.669182
