In [None]:
# Mount drive
# Colab-only: relies on the google.colab package and mounts Drive at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

# Set working directory
# Later cells open "nyccollision.csv" and "collision.parquet" by relative name,
# so they resolve against the folder selected here.
%cd /content/drive/MyDrive/Research/Datasets/

In [None]:
"""
Performance Comparison: Pandas vs Polars (Eager & Lazy)
========================================================
Using NYC Motor Vehicle Collisions Dataset

This script benchmarks various data operations across three approaches:
1. Pandas (eager loading)
2. Polars (eager loading)
3. Polars (lazy loading)

Dataset: NYC Motor Vehicle Collisions
Download from: https://data.cityofnewyork.us/Public-Safety/Motor-Vehicle-Collisions-Crashes/h9gi-nx95
Save as: 'nyccollision.csv' (the name CSV_FILE points to)
"""

import time
import numpy as np
import pandas as pd
import polars as pl
from functools import wraps

# =============================================================================
# CONFIGURATION
# =============================================================================
# Relative path of the collisions CSV; it is resolved against the current
# working directory.
CSV_FILE = "nyccollision.csv"

# Column names as per the dataset
# Maps a short alias to the exact CSV header text of each column.
# NOTE(review): the cells visible in this notebook spell the header strings out
# directly and never look anything up in COLS — confirm whether it is still needed.
COLS = {
    "date": "CRASH DATE",
    "time": "CRASH TIME",
    "borough": "BOROUGH",
    "zip": "ZIP CODE",
    "lat": "LATITUDE",
    "lon": "LONGITUDE",
    "location": "LOCATION",
    "on_street": "ON STREET NAME",
    "cross_street": "CROSS STREET NAME",
    "off_street": "OFF STREET NAME",
    "persons_injured": "NUMBER OF PERSONS INJURED",
    "persons_killed": "NUMBER OF PERSONS KILLED",
    "pedestrians_injured": "NUMBER OF PEDESTRIANS INJURED",
    "pedestrians_killed": "NUMBER OF PEDESTRIANS KILLED",
    "cyclists_injured": "NUMBER OF CYCLIST INJURED",
    "cyclists_killed": "NUMBER OF CYCLIST KILLED",
    "motorists_injured": "NUMBER OF MOTORIST INJURED",
    "motorists_killed": "NUMBER OF MOTORIST KILLED",
    "factor_1": "CONTRIBUTING FACTOR VEHICLE 1",
    "factor_2": "CONTRIBUTING FACTOR VEHICLE 2",
    "factor_3": "CONTRIBUTING FACTOR VEHICLE 3",
    "factor_4": "CONTRIBUTING FACTOR VEHICLE 4",
    "factor_5": "CONTRIBUTING FACTOR VEHICLE 5",
    "collision_id": "COLLISION_ID",
    "vehicle_1": "VEHICLE TYPE CODE 1",
    "vehicle_2": "VEHICLE TYPE CODE 2",
    "vehicle_3": "VEHICLE TYPE CODE 3",
    "vehicle_4": "VEHICLE TYPE CODE 4",
    "vehicle_5": "VEHICLE TYPE CODE 5",
}

def timer(func):
    """Wrap ``func`` so that every call returns ``(result, elapsed_seconds)``.

    The elapsed time is the difference between two ``time.perf_counter``
    readings taken immediately before and after the wrapped call.
    """
    @wraps(func)
    def timed(*args, **kwargs):
        t0 = time.perf_counter()
        outcome = func(*args, **kwargs)
        elapsed = time.perf_counter() - t0
        return outcome, elapsed
    return timed

def print_results(operation, times):
    """Print a small timing table for one benchmarked operation.

    ``times`` is indexed with the keys 'pandas', 'polars_eager' and
    'polars_lazy' (seconds). Each speedup is the pandas time divided by the
    corresponding Polars time; a Polars time that is not positive is reported
    as infinity instead of dividing.
    """
    rule = '=' * 70

    def speedup(seconds):
        # Guarded division: zero or negative timings yield inf.
        return times['pandas'] / seconds if seconds > 0 else float('inf')

    print(f"\n{rule}")
    print(f"Operation: {operation}")
    print(rule)
    print(f"  Pandas (Eager):       {times['pandas']:>10.4f} seconds")
    print(f"  Polars (Eager):       {times['polars_eager']:>10.4f} seconds")
    print(f"  Polars (Lazy):        {times['polars_lazy']:>10.4f} seconds")
    print("  ---")
    eager_x = speedup(times['polars_eager'])
    lazy_x = speedup(times['polars_lazy'])
    print(f"  Polars Eager Speedup: {eager_x:>10.2f}x faster")
    print(f"  Polars Lazy Speedup:  {lazy_x:>10.2f}x faster")

print("="*70)
print("NYC MOTOR VEHICLE COLLISIONS - PANDAS vs POLARS BENCHMARK")
print("="*70)

# =============================================================================
# 1. BENCHMARK: READING CSV
# =============================================================================
print("\n[1/12] Benchmarking: Reading CSV File...")

@timer
def read_pandas():
    """Read the whole CSV eagerly with pandas."""
    return pd.read_csv(CSV_FILE)

@timer
def read_polars_eager():
    """Read the whole CSV eagerly with Polars."""
    return pl.read_csv(CSV_FILE)

@timer
def read_polars_lazy():
    """Scan the CSV lazily with Polars and materialise it with collect()."""
    return pl.scan_csv(CSV_FILE).collect()

# Keep the frames returned by the timed reads: they are exactly what the
# "load into memory" step needs, so the CSV does not have to be parsed a
# second time by pandas and by Polars further down.
df_pandas, t_pandas = read_pandas()
df_polars, t_polars_eager = read_polars_eager()
_, t_polars_lazy = read_polars_lazy()

print_results("Reading CSV File", {
    "pandas": t_pandas,
    "polars_eager": t_polars_eager,
    "polars_lazy": t_polars_lazy
})

# Load data into memory for subsequent benchmarks
# (df_pandas / df_polars were already produced by the timed reads above;
# only the lazy handle still has to be created, which reads nothing yet.)
print("\nLoading data into memory for benchmarks...")
lf_polars = pl.scan_csv(CSV_FILE)

print(f"Dataset loaded: {len(df_pandas):,} rows × {len(df_pandas.columns)} columns")



**Pipeline Operations**
  1.  Read CSV file
  2.  Filter invalid/null records
  3.  Parse dates & extract year/month/weekday/hour
  4.  Fill null values in numeric columns
  5.  Calculate: total_injured, total_killed, severity_score
  6.  Categorize: severity_category, victim_type, time_of_day
  7.  String operations: uppercase, regex matching
  8.  Complex 5-level groupby with 12 aggregations
  9.  Post-aggregation calculations (rates, percentages)
  10. Multi-column sort & top 100 selection

In [None]:
# =============================================================================
# PANDAS (EAGER LOADING)
# =============================================================================
# Same banner layout as the two Polars cells so the three outputs line up.
print("\n" + "=" * 80)
print("PANDAS (EAGER LOADING)")
print("=" * 80)

pandas_start = time.perf_counter()

# Step 1: Read CSV
step1_start = time.perf_counter()
df_pd = pd.read_csv(CSV_FILE)
step1_time = time.perf_counter() - step1_start
print(f"  Step 1  - Read CSV:                    {step1_time:.4f}s  |  Rows: {len(df_pd):,}")

# Step 2: Clean data - Remove rows with missing critical fields
step2_start = time.perf_counter()
# .copy() detaches the filtered frame so the column assignments below do not
# write into a view of the unfiltered data.
df_pd = df_pd[
    (df_pd["BOROUGH"].notna()) &
    (df_pd["CONTRIBUTING FACTOR VEHICLE 1"].notna()) &
    (df_pd["CONTRIBUTING FACTOR VEHICLE 1"] != "Unspecified") &
    (df_pd["CRASH DATE"].notna())
].copy()
step2_time = time.perf_counter() - step2_start
print(f"  Step 2  - Clean data (filter nulls):   {step2_time:.4f}s  |  Rows: {len(df_pd):,}")

# Step 3: Parse date and extract components
step3_start = time.perf_counter()
# An explicit format removes format inference from the timing and makes this
# parse equivalent to the "%m/%d/%Y" parse used by the Polars cells.
df_pd["CRASH DATE"] = pd.to_datetime(df_pd["CRASH DATE"], format="%m/%d/%Y")
df_pd["crash_year"] = df_pd["CRASH DATE"].dt.year
df_pd["crash_month"] = df_pd["CRASH DATE"].dt.month
df_pd["crash_day_of_week"] = df_pd["CRASH DATE"].dt.dayofweek
# errors="coerce" turns unparseable times into missing values; those rows fail
# every hour comparison in step 6 and therefore end up in the "Night" default.
df_pd["crash_hour"] = pd.to_datetime(df_pd["CRASH TIME"], format="%H:%M", errors="coerce").dt.hour
step3_time = time.perf_counter() - step3_start
print(f"  Step 3  - Parse dates & extract:       {step3_time:.4f}s")

# Step 4: Fill null numeric values
step4_start = time.perf_counter()
injury_cols = [
    "NUMBER OF PERSONS INJURED", "NUMBER OF PERSONS KILLED",
    "NUMBER OF PEDESTRIANS INJURED", "NUMBER OF PEDESTRIANS KILLED",
    "NUMBER OF CYCLIST INJURED", "NUMBER OF CYCLIST KILLED",
    "NUMBER OF MOTORIST INJURED", "NUMBER OF MOTORIST KILLED"
]
df_pd[injury_cols] = df_pd[injury_cols].fillna(0)
step4_time = time.perf_counter() - step4_start
print(f"  Step 4  - Fill null values:            {step4_time:.4f}s")

# Step 5: Create calculated columns
step5_start = time.perf_counter()
# Totals are built from the pedestrian + cyclist + motorist columns, not from
# the "NUMBER OF PERSONS ..." columns.
df_pd["total_injured"] = (
    df_pd["NUMBER OF PEDESTRIANS INJURED"] +
    df_pd["NUMBER OF CYCLIST INJURED"] +
    df_pd["NUMBER OF MOTORIST INJURED"]
)
df_pd["total_killed"] = (
    df_pd["NUMBER OF PEDESTRIANS KILLED"] +
    df_pd["NUMBER OF CYCLIST KILLED"] +
    df_pd["NUMBER OF MOTORIST KILLED"]
)
df_pd["total_casualties"] = df_pd["total_injured"] + df_pd["total_killed"]
# Weighted score: 100 per fatality, 10 per injury.
df_pd["severity_score"] = df_pd["total_killed"] * 100 + df_pd["total_injured"] * 10
df_pd["has_injuries"] = df_pd["total_injured"] > 0
df_pd["has_fatalities"] = df_pd["total_killed"] > 0
step5_time = time.perf_counter() - step5_start
print(f"  Step 5  - Create calculated columns:   {step5_time:.4f}s")

# Step 6: Apply conditional categorization
step6_start = time.perf_counter()
# np.select takes the first matching condition, so the order below is the
# priority order (a fatal crash is "Fatal" regardless of injury count).
conditions_severity = [
    df_pd["total_killed"] > 0,
    df_pd["total_injured"] >= 5,
    df_pd["total_injured"] >= 2,
    df_pd["total_injured"] >= 1,
]
choices_severity = ["Fatal", "Critical", "Serious", "Minor"]
df_pd["severity_category"] = np.select(conditions_severity, choices_severity, default="No Injury")

conditions_victim = [
    (df_pd["NUMBER OF PEDESTRIANS INJURED"] + df_pd["NUMBER OF PEDESTRIANS KILLED"]) > 0,
    (df_pd["NUMBER OF CYCLIST INJURED"] + df_pd["NUMBER OF CYCLIST KILLED"]) > 0,
]
choices_victim = ["Pedestrian", "Cyclist"]
df_pd["primary_victim_type"] = np.select(conditions_victim, choices_victim, default="Motorist/Other")

# Hours 6-11 Morning, 12-17 Afternoon, 18-21 Evening, everything else Night.
conditions_time = [
    (df_pd["crash_hour"] >= 6) & (df_pd["crash_hour"] < 12),
    (df_pd["crash_hour"] >= 12) & (df_pd["crash_hour"] < 18),
    (df_pd["crash_hour"] >= 18) & (df_pd["crash_hour"] < 22),
]
choices_time = ["Morning", "Afternoon", "Evening"]
df_pd["time_of_day"] = np.select(conditions_time, choices_time, default="Night")
step6_time = time.perf_counter() - step6_start
print(f"  Step 6  - Conditional categorization:  {step6_time:.4f}s")

# Step 7: String operations on contributing factors
step7_start = time.perf_counter()
df_pd["factor_category"] = df_pd["CONTRIBUTING FACTOR VEHICLE 1"].str.upper()
df_pd["is_driver_error"] = df_pd["CONTRIBUTING FACTOR VEHICLE 1"].str.contains(
    "Driver|Distraction|Fatigue|Alcohol|Drugs", case=False, regex=True, na=False
)
df_pd["is_vehicle_issue"] = df_pd["CONTRIBUTING FACTOR VEHICLE 1"].str.contains(
    "Brake|Steering|Tire|Light|Vehicle", case=False, regex=True, na=False
)
step7_time = time.perf_counter() - step7_start
print(f"  Step 7  - String operations:           {step7_time:.4f}s")

# Step 8: Complex multi-level grouping with aggregations
step8_start = time.perf_counter()
result_pd = df_pd.groupby(
    ["BOROUGH", "crash_year", "severity_category", "primary_victim_type", "time_of_day"]
).agg(
    total_crashes=("COLLISION_ID", "count"),
    total_injured=("total_injured", "sum"),
    total_killed=("total_killed", "sum"),
    total_casualties=("total_casualties", "sum"),
    avg_severity_score=("severity_score", "mean"),
    max_severity_score=("severity_score", "max"),
    pedestrians_injured=("NUMBER OF PEDESTRIANS INJURED", "sum"),
    cyclists_injured=("NUMBER OF CYCLIST INJURED", "sum"),
    motorists_injured=("NUMBER OF MOTORIST INJURED", "sum"),
    driver_error_count=("is_driver_error", "sum"),
    vehicle_issue_count=("is_vehicle_issue", "sum"),
    unique_factors=("CONTRIBUTING FACTOR VEHICLE 1", "nunique"),
).reset_index()
step8_time = time.perf_counter() - step8_start
print(f"  Step 8  - Complex grouping & agg:      {step8_time:.4f}s  |  Groups: {len(result_pd):,}")

# Step 9: Post-aggregation calculations
step9_start = time.perf_counter()
result_pd["casualty_rate"] = result_pd["total_casualties"] / result_pd["total_crashes"]
result_pd["fatality_rate"] = result_pd["total_killed"] / result_pd["total_crashes"]
result_pd["driver_error_pct"] = result_pd["driver_error_count"] / result_pd["total_crashes"] * 100
step9_time = time.perf_counter() - step9_start
print(f"  Step 9  - Post-aggregation calcs:      {step9_time:.4f}s")

# Step 10: Sort by severity and get top results
step10_start = time.perf_counter()
result_pd = result_pd.sort_values(
    ["total_casualties", "total_crashes", "avg_severity_score"],
    ascending=[False, False, False]
)
top_100_pd = result_pd.head(100)
step10_time = time.perf_counter() - step10_start
print(f"  Step 10 - Sort & get top 100:          {step10_time:.4f}s")

pandas_total = time.perf_counter() - pandas_start
print(f"\n  >>> PANDAS TOTAL TIME: {pandas_total:.4f} seconds <<<")




In [None]:
# =============================================================================
# POLARS (EAGER LOADING)
# =============================================================================
print("\n" + "=" * 80)
print("POLARS (EAGER LOADING)")
print("=" * 80)

polars_eager_start = time.perf_counter()

# Step 1: Read CSV
step1_start = time.perf_counter()
df_pl = pl.read_csv(CSV_FILE)
step1_time = time.perf_counter() - step1_start
print(f"  Step 1  - Read CSV:                    {step1_time:.4f}s  |  Rows: {len(df_pl):,}")

# Step 2: Clean data - Remove rows with missing critical fields
step2_start = time.perf_counter()
df_pl = df_pl.filter(
    (pl.col("BOROUGH").is_not_null()) &
    (pl.col("CONTRIBUTING FACTOR VEHICLE 1").is_not_null()) &
    (pl.col("CONTRIBUTING FACTOR VEHICLE 1") != "Unspecified") &
    (pl.col("CRASH DATE").is_not_null())
)
step2_time = time.perf_counter() - step2_start
print(f"  Step 2  - Clean data (filter nulls):   {step2_time:.4f}s  |  Rows: {len(df_pl):,}")

# Step 3: Parse date and extract components
step3_start = time.perf_counter()
df_pl = df_pl.with_columns([
    pl.col("CRASH DATE").str.to_datetime("%m/%d/%Y").alias("CRASH DATE"),
])
df_pl = df_pl.with_columns([
    pl.col("CRASH DATE").dt.year().alias("crash_year"),
    pl.col("CRASH DATE").dt.month().alias("crash_month"),
    # dt.weekday() is ISO-numbered (Monday=1 .. Sunday=7); subtract 1 so the
    # column matches the Monday=0 numbering of pandas' dt.dayofweek.
    (pl.col("CRASH DATE").dt.weekday() - 1).alias("crash_day_of_week"),
    # Take the one or two digits in front of the ":" instead of a fixed
    # two-character slice: for a time written without zero padding ("2:39")
    # the slice is "2:", which casts to null and pushes every single-digit
    # hour into "Night". Values that do not match stay null (strict=False).
    pl.col("CRASH TIME").str.extract(r"^(\d{1,2}):", 1).cast(pl.Int32, strict=False).alias("crash_hour"),
])
step3_time = time.perf_counter() - step3_start
print(f"  Step 3  - Parse dates & extract:       {step3_time:.4f}s")

# Step 4: Fill null numeric values
step4_start = time.perf_counter()
injury_cols = [
    "NUMBER OF PERSONS INJURED", "NUMBER OF PERSONS KILLED",
    "NUMBER OF PEDESTRIANS INJURED", "NUMBER OF PEDESTRIANS KILLED",
    "NUMBER OF CYCLIST INJURED", "NUMBER OF CYCLIST KILLED",
    "NUMBER OF MOTORIST INJURED", "NUMBER OF MOTORIST KILLED"
]
df_pl = df_pl.with_columns([pl.col(c).fill_null(0) for c in injury_cols])
step4_time = time.perf_counter() - step4_start
print(f"  Step 4  - Fill null values:            {step4_time:.4f}s")

# Step 5: Create calculated columns
step5_start = time.perf_counter()
# Two with_columns calls: the second one reads the totals created by the first.
df_pl = df_pl.with_columns([
    (pl.col("NUMBER OF PEDESTRIANS INJURED") +
     pl.col("NUMBER OF CYCLIST INJURED") +
     pl.col("NUMBER OF MOTORIST INJURED")).alias("total_injured"),
    (pl.col("NUMBER OF PEDESTRIANS KILLED") +
     pl.col("NUMBER OF CYCLIST KILLED") +
     pl.col("NUMBER OF MOTORIST KILLED")).alias("total_killed"),
])
df_pl = df_pl.with_columns([
    (pl.col("total_injured") + pl.col("total_killed")).alias("total_casualties"),
    # Weighted score: 100 per fatality, 10 per injury.
    (pl.col("total_killed") * 100 + pl.col("total_injured") * 10).alias("severity_score"),
    (pl.col("total_injured") > 0).alias("has_injuries"),
    (pl.col("total_killed") > 0).alias("has_fatalities"),
])
step5_time = time.perf_counter() - step5_start
print(f"  Step 5  - Create calculated columns:   {step5_time:.4f}s")

# Step 6: Apply conditional categorization
step6_start = time.perf_counter()
# Each when/then chain takes the first matching branch, so branch order is
# the priority order.
df_pl = df_pl.with_columns([
    pl.when(pl.col("total_killed") > 0).then(pl.lit("Fatal"))
      .when(pl.col("total_injured") >= 5).then(pl.lit("Critical"))
      .when(pl.col("total_injured") >= 2).then(pl.lit("Serious"))
      .when(pl.col("total_injured") >= 1).then(pl.lit("Minor"))
      .otherwise(pl.lit("No Injury"))
      .alias("severity_category"),
    pl.when((pl.col("NUMBER OF PEDESTRIANS INJURED") + pl.col("NUMBER OF PEDESTRIANS KILLED")) > 0)
      .then(pl.lit("Pedestrian"))
      .when((pl.col("NUMBER OF CYCLIST INJURED") + pl.col("NUMBER OF CYCLIST KILLED")) > 0)
      .then(pl.lit("Cyclist"))
      .otherwise(pl.lit("Motorist/Other"))
      .alias("primary_victim_type"),
    # Hours 6-11 Morning, 12-17 Afternoon, 18-21 Evening, everything else
    # (including a null hour) Night.
    pl.when((pl.col("crash_hour") >= 6) & (pl.col("crash_hour") < 12)).then(pl.lit("Morning"))
      .when((pl.col("crash_hour") >= 12) & (pl.col("crash_hour") < 18)).then(pl.lit("Afternoon"))
      .when((pl.col("crash_hour") >= 18) & (pl.col("crash_hour") < 22)).then(pl.lit("Evening"))
      .otherwise(pl.lit("Night"))
      .alias("time_of_day"),
])
step6_time = time.perf_counter() - step6_start
print(f"  Step 6  - Conditional categorization:  {step6_time:.4f}s")

# Step 7: String operations on contributing factors
step7_start = time.perf_counter()
# "(?i)" makes the regex case-insensitive, mirroring case=False in the pandas cell.
df_pl = df_pl.with_columns([
    pl.col("CONTRIBUTING FACTOR VEHICLE 1").str.to_uppercase().alias("factor_category"),
    pl.col("CONTRIBUTING FACTOR VEHICLE 1")
      .str.contains("(?i)Driver|Distraction|Fatigue|Alcohol|Drugs")
      .fill_null(False).alias("is_driver_error"),
    pl.col("CONTRIBUTING FACTOR VEHICLE 1")
      .str.contains("(?i)Brake|Steering|Tire|Light|Vehicle")
      .fill_null(False).alias("is_vehicle_issue"),
])
step7_time = time.perf_counter() - step7_start
print(f"  Step 7  - String operations:           {step7_time:.4f}s")

# Step 8: Complex multi-level grouping with aggregations
step8_start = time.perf_counter()
result_pl = df_pl.group_by(
    ["BOROUGH", "crash_year", "severity_category", "primary_victim_type", "time_of_day"]
).agg([
    pl.col("COLLISION_ID").count().alias("total_crashes"),
    pl.col("total_injured").sum().alias("total_injured"),
    pl.col("total_killed").sum().alias("total_killed"),
    pl.col("total_casualties").sum().alias("total_casualties"),
    pl.col("severity_score").mean().alias("avg_severity_score"),
    pl.col("severity_score").max().alias("max_severity_score"),
    pl.col("NUMBER OF PEDESTRIANS INJURED").sum().alias("pedestrians_injured"),
    pl.col("NUMBER OF CYCLIST INJURED").sum().alias("cyclists_injured"),
    pl.col("NUMBER OF MOTORIST INJURED").sum().alias("motorists_injured"),
    pl.col("is_driver_error").sum().alias("driver_error_count"),
    pl.col("is_vehicle_issue").sum().alias("vehicle_issue_count"),
    pl.col("CONTRIBUTING FACTOR VEHICLE 1").n_unique().alias("unique_factors"),
])
step8_time = time.perf_counter() - step8_start
print(f"  Step 8  - Complex grouping & agg:      {step8_time:.4f}s  |  Groups: {len(result_pl):,}")

# Step 9: Post-aggregation calculations
step9_start = time.perf_counter()
result_pl = result_pl.with_columns([
    (pl.col("total_casualties") / pl.col("total_crashes")).alias("casualty_rate"),
    (pl.col("total_killed") / pl.col("total_crashes")).alias("fatality_rate"),
    (pl.col("driver_error_count") / pl.col("total_crashes") * 100).alias("driver_error_pct"),
])
step9_time = time.perf_counter() - step9_start
print(f"  Step 9  - Post-aggregation calcs:      {step9_time:.4f}s")

# Step 10: Sort by severity and get top results
step10_start = time.perf_counter()
result_pl = result_pl.sort(
    ["total_casualties", "total_crashes", "avg_severity_score"],
    descending=[True, True, True]
)
top_100_pl = result_pl.head(100)
step10_time = time.perf_counter() - step10_start
print(f"  Step 10 - Sort & get top 100:          {step10_time:.4f}s")

polars_eager_total = time.perf_counter() - polars_eager_start
print(f"\n  >>> POLARS EAGER TOTAL TIME: {polars_eager_total:.4f} seconds <<<")




In [None]:
# =============================================================================
# POLARS (LAZY LOADING)
# =============================================================================
print("\n" + "=" * 80)
print("POLARS (LAZY LOADING)")
print("=" * 80)

polars_lazy_start = time.perf_counter()

# All steps combined in a single lazy query
print("  Building query plan...")

query_start = time.perf_counter()

# Build the complete lazy query
# Nothing is read or computed here; the work happens in collect() below.
lazy_result = (
    # Step 1: Scan CSV (lazy)
    pl.scan_csv(CSV_FILE)

    # Step 2: Clean data
    .filter(
        (pl.col("BOROUGH").is_not_null()) &
        (pl.col("CONTRIBUTING FACTOR VEHICLE 1").is_not_null()) &
        (pl.col("CONTRIBUTING FACTOR VEHICLE 1") != "Unspecified") &
        (pl.col("CRASH DATE").is_not_null())
    )

    # Step 3: Parse date and extract components
    .with_columns([
        pl.col("CRASH DATE").str.to_datetime("%m/%d/%Y").alias("CRASH DATE"),
    ])
    .with_columns([
        pl.col("CRASH DATE").dt.year().alias("crash_year"),
        pl.col("CRASH DATE").dt.month().alias("crash_month"),
        # dt.weekday() is ISO-numbered (Monday=1 .. Sunday=7); subtract 1 so
        # the column matches the Monday=0 numbering of pandas' dt.dayofweek.
        (pl.col("CRASH DATE").dt.weekday() - 1).alias("crash_day_of_week"),
        # Take the one or two digits in front of the ":" instead of a fixed
        # two-character slice: for a time written without zero padding
        # ("2:39") the slice is "2:", which casts to null and pushes every
        # single-digit hour into "Night". Non-matching values stay null.
        pl.col("CRASH TIME").str.extract(r"^(\d{1,2}):", 1).cast(pl.Int32, strict=False).alias("crash_hour"),
    ])

    # Step 4: Fill null numeric values
    .with_columns([
        pl.col("NUMBER OF PERSONS INJURED").fill_null(0),
        pl.col("NUMBER OF PERSONS KILLED").fill_null(0),
        pl.col("NUMBER OF PEDESTRIANS INJURED").fill_null(0),
        pl.col("NUMBER OF PEDESTRIANS KILLED").fill_null(0),
        pl.col("NUMBER OF CYCLIST INJURED").fill_null(0),
        pl.col("NUMBER OF CYCLIST KILLED").fill_null(0),
        pl.col("NUMBER OF MOTORIST INJURED").fill_null(0),
        pl.col("NUMBER OF MOTORIST KILLED").fill_null(0),
    ])

    # Step 5: Create calculated columns
    # (two with_columns calls: the second reads the totals made by the first)
    .with_columns([
        (pl.col("NUMBER OF PEDESTRIANS INJURED") +
         pl.col("NUMBER OF CYCLIST INJURED") +
         pl.col("NUMBER OF MOTORIST INJURED")).alias("total_injured"),
        (pl.col("NUMBER OF PEDESTRIANS KILLED") +
         pl.col("NUMBER OF CYCLIST KILLED") +
         pl.col("NUMBER OF MOTORIST KILLED")).alias("total_killed"),
    ])
    .with_columns([
        (pl.col("total_injured") + pl.col("total_killed")).alias("total_casualties"),
        (pl.col("total_killed") * 100 + pl.col("total_injured") * 10).alias("severity_score"),
        (pl.col("total_injured") > 0).alias("has_injuries"),
        (pl.col("total_killed") > 0).alias("has_fatalities"),
    ])

    # Step 6: Apply conditional categorization
    # (first matching branch wins, so branch order is the priority order)
    .with_columns([
        pl.when(pl.col("total_killed") > 0).then(pl.lit("Fatal"))
          .when(pl.col("total_injured") >= 5).then(pl.lit("Critical"))
          .when(pl.col("total_injured") >= 2).then(pl.lit("Serious"))
          .when(pl.col("total_injured") >= 1).then(pl.lit("Minor"))
          .otherwise(pl.lit("No Injury"))
          .alias("severity_category"),
        pl.when((pl.col("NUMBER OF PEDESTRIANS INJURED") + pl.col("NUMBER OF PEDESTRIANS KILLED")) > 0)
          .then(pl.lit("Pedestrian"))
          .when((pl.col("NUMBER OF CYCLIST INJURED") + pl.col("NUMBER OF CYCLIST KILLED")) > 0)
          .then(pl.lit("Cyclist"))
          .otherwise(pl.lit("Motorist/Other"))
          .alias("primary_victim_type"),
        pl.when((pl.col("crash_hour") >= 6) & (pl.col("crash_hour") < 12)).then(pl.lit("Morning"))
          .when((pl.col("crash_hour") >= 12) & (pl.col("crash_hour") < 18)).then(pl.lit("Afternoon"))
          .when((pl.col("crash_hour") >= 18) & (pl.col("crash_hour") < 22)).then(pl.lit("Evening"))
          .otherwise(pl.lit("Night"))
          .alias("time_of_day"),
    ])

    # Step 7: String operations
    # ("(?i)" makes the regex case-insensitive, mirroring case=False in pandas)
    .with_columns([
        pl.col("CONTRIBUTING FACTOR VEHICLE 1").str.to_uppercase().alias("factor_category"),
        pl.col("CONTRIBUTING FACTOR VEHICLE 1")
          .str.contains("(?i)Driver|Distraction|Fatigue|Alcohol|Drugs")
          .fill_null(False).alias("is_driver_error"),
        pl.col("CONTRIBUTING FACTOR VEHICLE 1")
          .str.contains("(?i)Brake|Steering|Tire|Light|Vehicle")
          .fill_null(False).alias("is_vehicle_issue"),
    ])

    # Step 8: Complex grouping
    .group_by(["BOROUGH", "crash_year", "severity_category", "primary_victim_type", "time_of_day"])
    .agg([
        pl.col("COLLISION_ID").count().alias("total_crashes"),
        pl.col("total_injured").sum().alias("total_injured"),
        pl.col("total_killed").sum().alias("total_killed"),
        pl.col("total_casualties").sum().alias("total_casualties"),
        pl.col("severity_score").mean().alias("avg_severity_score"),
        pl.col("severity_score").max().alias("max_severity_score"),
        pl.col("NUMBER OF PEDESTRIANS INJURED").sum().alias("pedestrians_injured"),
        pl.col("NUMBER OF CYCLIST INJURED").sum().alias("cyclists_injured"),
        pl.col("NUMBER OF MOTORIST INJURED").sum().alias("motorists_injured"),
        pl.col("is_driver_error").sum().alias("driver_error_count"),
        pl.col("is_vehicle_issue").sum().alias("vehicle_issue_count"),
        pl.col("CONTRIBUTING FACTOR VEHICLE 1").n_unique().alias("unique_factors"),
    ])

    # Step 9: Post-aggregation calculations
    .with_columns([
        (pl.col("total_casualties") / pl.col("total_crashes")).alias("casualty_rate"),
        (pl.col("total_killed") / pl.col("total_crashes")).alias("fatality_rate"),
        (pl.col("driver_error_count") / pl.col("total_crashes") * 100).alias("driver_error_pct"),
    ])

    # Step 10: Sort and limit
    .sort(["total_casualties", "total_crashes", "avg_severity_score"], descending=[True, True, True])
    .head(100)
)

query_build_time = time.perf_counter() - query_start
print(f"  Query plan built in:                   {query_build_time:.4f}s")

# Execute the query
print("  Executing optimized query...")
execute_start = time.perf_counter()
top_100_lazy = lazy_result.collect()
execute_time = time.perf_counter() - execute_start
print(f"  Query execution:                       {execute_time:.4f}s  |  Results: {len(top_100_lazy):,}")

polars_lazy_total = time.perf_counter() - polars_lazy_start
print(f"\n  >>> POLARS LAZY TOTAL TIME: {polars_lazy_total:.4f} seconds <<<")




In [None]:
# =============================================================================
# FINAL COMPARISON SUMMARY
# =============================================================================
# Relies on pandas_total, polars_eager_total and polars_lazy_total plus the
# three top_100_* frames, i.e. the three pipeline cells must have run first.
print("\n" + "=" * 80)
print("PERFORMANCE COMPARISON SUMMARY")
print("=" * 80)

# Each speedup is the ratio of two total wall-clock times measured above.
# The box is a literal layout: its alignment depends on the exact spacing.
print(f"""
┌─────────────────────────────────────────────────────────────────────────────┐
│                         EXECUTION TIME COMPARISON                           │
├─────────────────────────────────────────────────────────────────────────────┤
│  Pandas (Eager):        {pandas_total:>10.4f} seconds                               │
│  Polars (Eager):        {polars_eager_total:>10.4f} seconds                               │
│  Polars (Lazy):         {polars_lazy_total:>10.4f} seconds                               │
├─────────────────────────────────────────────────────────────────────────────┤
│                            SPEEDUP FACTORS                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│  Polars Eager vs Pandas:  {pandas_total/polars_eager_total:>6.2f}x faster                               │
│  Polars Lazy vs Pandas:   {pandas_total/polars_lazy_total:>6.2f}x faster                               │
│  Polars Lazy vs Eager:    {polars_eager_total/polars_lazy_total:>6.2f}x faster                               │
└─────────────────────────────────────────────────────────────────────────────┘

Pipeline Operations Performed:
  1.  Read CSV file
  2.  Filter invalid/null records
  3.  Parse dates & extract year/month/weekday/hour
  4.  Fill null values in numeric columns
  5.  Calculate: total_injured, total_killed, severity_score
  6.  Categorize: severity_category, victim_type, time_of_day
  7.  String operations: uppercase, regex matching
  8.  Complex 5-level groupby with 12 aggregations
  9.  Post-aggregation calculations (rates, percentages)
  10. Multi-column sort & top 100 selection

Key Insights:
  • Polars Eager benefits from Rust's performance + parallelization
  • Polars Lazy additionally optimizes the query plan:
    - Predicate pushdown (filters applied early)
    - Projection pushdown (only needed columns read)
    - Operation reordering for efficiency
    - Reduced memory allocations
""")

# Verify results match
# NOTE(review): only the shapes are printed here; the cell values of the three
# results are never compared, so equal shapes do not prove equal contents.
print("Verifying results consistency...")
print(f"  Pandas result shape:       {top_100_pd.shape}")
print(f"  Polars Eager result shape: {top_100_pl.shape}")
print(f"  Polars Lazy result shape:  {top_100_lazy.shape}")

# Dask for parallel computing

In [None]:
import dask.dataframe as dd

# Lazily define a dask DataFrame over the CSV with blocksize=100e6 and an
# explicit dtype for each column listed below.
df = dd.read_csv(
    'nyccollision.csv',
    blocksize=100e6,
    dtype={
        'ZIP CODE': 'object',
        'NUMBER OF PERSONS INJURED': 'float64',
        'NUMBER OF PERSONS KILLED': 'float64',
        'CONTRIBUTING FACTOR VEHICLE 3': 'object',
        'CONTRIBUTING FACTOR VEHICLE 4': 'object',
        'CONTRIBUTING FACTOR VEHICLE 5': 'object',
        'VEHICLE TYPE CODE 3': 'object',
        'VEHICLE TYPE CODE 4': 'object',
        'VEHICLE TYPE CODE 5': 'object',
    },
)

In [None]:
# Render the task graph of the dask DataFrame defined in the previous cell.
# NOTE(review): rendering presumably needs graphviz available — confirm in this environment.
df.visualize()

In [None]:
# Compute only partition 0 of the dask DataFrame and show it as the cell output.
df.get_partition(0).compute()

In [None]:
import dask.dataframe as dd

# Same CSV and dtype overrides as the earlier dask cell, but with
# blocksize=10e6 instead of 100e6.
df = dd.read_csv(
    'nyccollision.csv',
    blocksize=10e6,
    dtype={
        'ZIP CODE': 'object',
        'NUMBER OF PERSONS INJURED': 'float64',
        'NUMBER OF PERSONS KILLED': 'float64',
        'CONTRIBUTING FACTOR VEHICLE 3': 'object',
        'CONTRIBUTING FACTOR VEHICLE 4': 'object',
        'CONTRIBUTING FACTOR VEHICLE 5': 'object',
        'VEHICLE TYPE CODE 3': 'object',
        'VEHICLE TYPE CODE 4': 'object',
        'VEHICLE TYPE CODE 5': 'object',
    },
)

# Sum of persons killed per borough; compute() triggers the actual work and
# the result is shown as the cell output.
(
    df.groupby('BOROUGH')['NUMBER OF PERSONS KILLED']
    .sum()
    .compute()
)

# Space Optimization

In [None]:
# Write Parquet file format using polars
# The output name is relative, like the CSV path and the Parquet reads in the
# following cells, so the file lands in the working directory those cells read
# from instead of a hard-coded absolute Drive path.
df_polars = pl.read_csv(CSV_FILE)
df_polars.write_parquet("collision.parquet")

In [None]:
# Read Parquet file format using polars
# Relative path: expects collision.parquet in the current working directory.
# Rebinds df_polars to the Parquet-loaded frame and shows it as the cell output.
df_polars = pl.read_parquet("collision.parquet")
df_polars

In [None]:
# Read Parquet file format using pandas
# Relative path: expects collision.parquet in the current working directory.
dfpandas = pd.read_parquet("collision.parquet")
# Display the frame that was just loaded from Parquet (previously the cell
# showed `df_pandas`, the CSV-loaded frame from an earlier cell).
dfpandas