# Analytics Pipeline: Data Engineering & Feature Extraction

This notebook is the data-processing stage of the Cyclistic Behavioral Mirroring Model. It batch-loads the raw trip CSV files, cleans them, and engineers features such as the binary `is_commute` flag that downstream behavioral modeling relies on.

## 1. Setup & Configuration

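Define the raw and processed data directories (relative to the notebook's working directory) and `MIN_TRIPS_FOR_STABILITY`, the minimum number of trips a station needs before its statistics are treated as stable in later station analysis. The pipeline in this notebook does not use the threshold itself.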

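```python
import pandas as pd
import numpy as np
from pathlib import Path

RAW_DIR = Path("../data/raw")
PROCESSED_DIR = Path("../data/processed")

# Minimum trips per station for station-level statistics to count as stable (used downstream)
MIN_TRIPS_FOR_STABILITY = 50
```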
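Because nothing in this notebook reads `MIN_TRIPS_FOR_STABILITY`, the sketch below shows one plausible way a later station-analysis step might apply it. It assumes the threshold means "a station needs at least this many departures"; the helper `filter_stable_stations` and the toy data are hypothetical, not part of the original pipeline.

```python
import pandas as pd

MIN_TRIPS_FOR_STABILITY = 50  # same value as the configuration cell


def filter_stable_stations(trips: pd.DataFrame,
                           min_trips: int = MIN_TRIPS_FOR_STABILITY) -> pd.DataFrame:
    """Hypothetical helper: keep start stations with at least `min_trips` departures."""
    # Count departures per start station
    counts = (
        trips.groupby("start_station_name")
        .size()
        .rename("trip_count")
        .reset_index()
    )
    # Keep only stations whose sample size meets the threshold
    return counts[counts["trip_count"] >= min_trips].reset_index(drop=True)


# Toy data: station "A" has 60 departures, station "B" only 10
toy = pd.DataFrame({"start_station_name": ["A"] * 60 + ["B"] * 10})
stable = filter_stable_stations(toy)
print(stable)
```

Counting from the trip table rather than from `dim_stations.csv` is a deliberate choice here, since the station dimension written by the pipeline stores only mean coordinates, not trip counts.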

## 2. Pipeline Orchestration

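`run_analytics_pipeline()` executes the following steps:
1. **Ingestion**: Loads every CSV file in `RAW_DIR`, reading only the columns needed downstream (header names are matched case-insensitively), and concatenates them into one DataFrame.
2. **Cleaning**: Lower-cases column names, parses `started_at` and `ended_at` as datetimes (unparseable values become `NaT`), drops trips with no start time or no start station, and keeps only rides longer than 1 minute and shorter than 24 hours (1,440 minutes). Rides with a missing end time are dropped by the same duration filter.
3. **Feature Engineering**: Derives `hour`, `is_weekday`, and the `is_commute` flag, which is true for weekday trips that start between 07:00 and 10:59 or between 16:00 and 19:59 and last 30 minutes or less.
4. **Export**: Writes a cleaned fact table (`fact_trips.csv`) and a station dimension table (`dim_stations.csv`, one row per start station with its mean coordinates) to `PROCESSED_DIR` for subsequent analysis steps. Of the engineered features, only `is_commute` is written to the fact table.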
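To make the cleaning and feature-engineering rules listed above concrete, the sketch below applies the same duration filter and commute rule to a five-row in-memory frame instead of the raw CSVs. The function name `clean_and_flag` and the sample trips are hypothetical; the thresholds are copied from the pipeline code.

```python
import pandas as pd

# Rush-hour start hours used by the pipeline (07:00-10:59 and 16:00-19:59)
RUSH_HOURS = [7, 8, 9, 10, 16, 17, 18, 19]


def clean_and_flag(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper mirroring the pipeline's cleaning and is_commute rules."""
    df = df.copy()
    df["started_at"] = pd.to_datetime(df["started_at"], errors="coerce")
    df["ended_at"] = pd.to_datetime(df["ended_at"], errors="coerce")
    df = df.dropna(subset=["started_at", "start_station_name"])
    df["ride_length"] = (df["ended_at"] - df["started_at"]).dt.total_seconds() / 60
    # Keep rides strictly between 1 minute and 24 hours
    df = df[(df["ride_length"] > 1) & (df["ride_length"] < 1440)].copy()
    df["hour"] = df["started_at"].dt.hour
    df["is_weekday"] = df["started_at"].dt.dayofweek < 5  # Monday=0 ... Friday=4
    df["is_commute"] = (
        df["hour"].isin(RUSH_HOURS) & df["is_weekday"] & (df["ride_length"] <= 30)
    )
    return df


# 2024-03-04 is a Monday, 2024-03-09 is a Saturday
trips = pd.DataFrame({
    "ride_id": ["r1", "r2", "r3", "r4", "r5"],
    "start_station_name": ["A", "A", "B", None, "C"],
    "started_at": ["2024-03-04 08:15:00", "2024-03-04 08:15:00", "2024-03-09 08:15:00",
                   "2024-03-04 08:15:00", "2024-03-04 13:00:00"],
    "ended_at": ["2024-03-04 08:35:00", "2024-03-04 08:15:30", "2024-03-09 08:35:00",
                 "2024-03-04 08:35:00", "2024-03-04 13:20:00"],
})
out = clean_and_flag(trips)
print(out[["ride_id", "ride_length", "is_commute"]])
```

Here `r2` (30 seconds) fails the duration filter and `r4` has no start station, so only `r1`, `r3`, and `r5` survive. Of those, only `r1` is a commute: `r3` starts in a rush hour but on a Saturday, and `r5` is a weekday ride outside the rush-hour windows.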

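```python
def run_analytics_pipeline():
    """Load raw trip CSVs, clean them, engineer features, and write fact/dim tables."""
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

    # Sorted so files are concatenated in a deterministic order
    all_files = sorted(RAW_DIR.glob("*.csv"))
    if not all_files:
        raise FileNotFoundError(f"No CSVs found in {RAW_DIR}.")

    # --- Ingestion: read only the needed columns (headers matched case-insensitively) ---
    keep_cols = [
        'ride_id', 'started_at', 'ended_at', 'member_casual',
        'start_station_name', 'start_lat', 'start_lng', 'end_lat', 'end_lng'
    ]

    print(f"Processing {len(all_files)} files...")
    df = pd.concat(
        (pd.read_csv(f, usecols=lambda x: x.lower() in keep_cols) for f in all_files),
        ignore_index=True,
    )
    df.columns = df.columns.str.lower()

    # --- Cleaning: unparseable timestamps become NaT; drop rows without a start time or start station ---
    df['started_at'] = pd.to_datetime(df['started_at'], errors='coerce')
    df['ended_at'] = pd.to_datetime(df['ended_at'], errors='coerce')
    df = df.dropna(subset=['started_at', 'start_station_name'])

    # Ride length in minutes; keep rides strictly between 1 minute and 24 hours.
    # A missing end time gives NaN, which fails both comparisons, so those rows are dropped too.
    df['ride_length'] = (df['ended_at'] - df['started_at']).dt.total_seconds() / 60
    df = df[(df['ride_length'] > 1) & (df['ride_length'] < 1440)].copy()

    # --- Feature engineering ---
    df['hour'] = df['started_at'].dt.hour
    df['is_weekday'] = df['started_at'].dt.dayofweek < 5  # Monday=0 ... Friday=4
    # Rush-hour start windows: 07:00-10:59 and 16:00-19:59
    is_rush = df['hour'].isin([7, 8, 9, 10, 16, 17, 18, 19])
    # A commute is a weekday rush-hour trip of 30 minutes or less
    df['is_commute'] = is_rush & df['is_weekday'] & (df['ride_length'] <= 30)

    # --- Export: station dimension (one row per start station, mean coordinates) ---
    station_dim = df.groupby('start_station_name').agg({
        'start_lat': 'mean',
        'start_lng': 'mean'
    }).reset_index()
    station_dim.to_csv(PROCESSED_DIR / "dim_stations.csv", index=False)

    # --- Export: trip-level fact table ---
    fact_cols = [
        'ride_id', 'start_station_name', 'started_at', 'member_casual',
        'is_commute', 'ride_length', 'start_lat', 'start_lng', 'end_lat', 'end_lng'
    ]
    df[fact_cols].to_csv(PROCESSED_DIR / "fact_trips.csv", index=False)

    print("✅ SUCCESS: Pipeline re-run complete. Fact table now contains coordinates.")
```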
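The station dimension collapses every trip from the same start station into a single row of mean coordinates. The toy example below runs the same `groupby(...).agg(...)` call on three hand-made trips, assuming (for illustration only) that one station is reported at slightly different coordinates across trips.

```python
import pandas as pd

# Toy trips: station "A" appears with two slightly different coordinate pairs
trips = pd.DataFrame({
    "start_station_name": ["A", "A", "B"],
    "start_lat": [41.880, 41.882, 41.900],
    "start_lng": [-87.630, -87.632, -87.650],
})

# Same aggregation as the pipeline: one row per station with mean coordinates
station_dim = trips.groupby("start_station_name").agg({
    "start_lat": "mean",
    "start_lng": "mean",
}).reset_index()
print(station_dim)
```

Station "A" ends up at roughly (41.881, -87.631), the midpoint of its two reported positions, while "B" keeps its single coordinate pair.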

## 3. Execution

Execute the end-to-end data pipeline.

```python
if __name__ == "__main__":
    run_analytics_pipeline()
```

```text
Processing 13 files...
✅ SUCCESS: Pipeline re-run complete. Fact table now contains coordinates.
```
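The success message only confirms that the files were written. A post-run check can confirm that the exported fact table still satisfies the rules the pipeline enforces. The function below is a hypothetical sketch, not part of the original notebook; it returns a list of violated rules (empty when the table is clean) and is demonstrated on a two-row frame.

```python
import pandas as pd


def validate_fact_table(fact: pd.DataFrame) -> list[str]:
    """Hypothetical post-run check: return the pipeline rules the table violates."""
    problems = []
    if fact["ride_id"].duplicated().any():
        problems.append("duplicate ride_id values")
    if fact["start_station_name"].isna().any():
        problems.append("null start_station_name")
    # The cleaning step keeps rides strictly between 1 and 1,440 minutes
    if not fact["ride_length"].between(1, 1440, inclusive="neither").all():
        problems.append("ride_length outside (1, 1440) minutes")
    # By construction, commute trips last 30 minutes or less
    if (fact.loc[fact["is_commute"], "ride_length"] > 30).any():
        problems.append("commute trip longer than 30 minutes")
    return problems


# On real output, something like:
# fact = pd.read_csv(PROCESSED_DIR / "fact_trips.csv")
fact = pd.DataFrame({
    "ride_id": ["r1", "r2"],
    "start_station_name": ["A", "B"],
    "ride_length": [12.0, 45.0],
    "is_commute": [True, False],
})
print(validate_fact_table(fact))  # prints []
```

Returning a list instead of raising on the first failure reports every broken rule in one pass, which is convenient when re-running the pipeline on new monthly files.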
