In [1]:
# Data manipulation
import pandas as pd   # Import pandas library, mainly used for handling tabular data (DataFrames, Series, etc.)
import numpy as np    # Import NumPy library, provides powerful numerical operations and array handling

# File and paths
from pathlib import Path    # Import Path from pathlib to work with filesystem paths in an object-oriented way
import shutil               # Import shutil to perform high-level file operations like copy, move, delete
import urllib.request       # Import urllib.request to handle opening URLs and downloading data from the web

# Visualization
import matplotlib.pyplot as plt  # Import matplotlib's pyplot module, commonly used for creating plots and visualizations



In [2]:
# Project data layout: raw downloads, a staging area, and the cleaned warehouse.
# (`Path` is already imported in the first cell, so it is not imported again here.)
for folder in ("data/raw", "data/staging", "data/warehouse"):
    # parents=True builds any missing parent folder;
    # exist_ok=True turns a re-run of this cell into a no-op instead of an error.
    Path(folder).mkdir(parents=True, exist_ok=True)

# Display the absolute working directory so the relative paths above are unambiguous.
Path.cwd()


WindowsPath('C:/Users/mamat/Documents/nyc')

In [None]:
# Source: the January 2024 Yellow Taxi file in the current user's Downloads folder.
# The location is built from Path.home() instead of a hardcoded per-user absolute
# path, so the notebook runs unchanged under any account.
src = Path.home() / "Downloads" / "yellow_tripdata_2024-01.parquet"

# Destination: the project's raw-data folder (relative to the working directory).
dst = Path("data/raw/yellow_tripdata_2024-01.parquet")

# Copy only when the download is present and the project copy is missing, so a
# re-run never overwrites an existing raw file.
# copy2() copies the file together with its metadata (timestamps, permissions).
if src.exists() and not dst.exists():
    shutil.copy2(src, dst)

# Report whether the raw file is in place at the destination, and where.
print("Raw file at:", dst.exists(), dst)


In [None]:
# Location of the raw January file inside the project.
raw_path = "data/raw/yellow_tripdata_2024-01.parquet"

# Load the whole Parquet file into memory as a DataFrame.
df = pd.read_parquet(raw_path)

# Report how many raw trips were loaded.
row_count = len(df)
print("rows:", row_count)

# Preview the first five rows.
df.head(5)


In [None]:

def clean_add_features(df: pd.DataFrame) -> pd.DataFrame:
    """Clean raw yellow-taxi trips and add derived trip features.

    Steps, in order:
      1. Parse pickup/dropoff timestamps (unparseable values become NaT).
      2. Drop rows missing any required field.
      3. Add ``duration_min``, ``pickup_hour``, ``pickup_date``, ``year``, ``month``.
      4. Keep trips lasting 1-180 minutes, covering 0-100 distance units, with
         non-negative ``fare_amount`` and ``total_amount``.
      5. Add ``fare_per_mile`` (NaN when distance is 0) and keep rows where it is
         NaN or within [0.5, 25].
      6. Drop duplicates on pickup/dropoff time, pickup/dropoff location and total.

    Parameters
    ----------
    df : pd.DataFrame
        Raw trips. Must contain ``tpep_pickup_datetime``, ``tpep_dropoff_datetime``,
        ``PULocationID``, ``DOLocationID``, ``trip_distance``, ``fare_amount`` and
        ``total_amount``. The input frame is not modified.

    Returns
    -------
    pd.DataFrame
        A new, filtered frame with the derived columns appended.

    Side effects
    ------------
    Prints how many duplicate rows were removed.
    """
    pickup_col = "tpep_pickup_datetime"
    dropoff_col = "tpep_dropoff_datetime"
    required = [pickup_col, dropoff_col, "PULocationID", "DOLocationID", "fare_amount", "total_amount"]
    dedup_cols = [pickup_col, dropoff_col, "PULocationID", "DOLocationID", "total_amount"]

    out = df.copy()  # never touch the caller's frame

    # --- timestamps to datetime ---
    for col in (pickup_col, dropoff_col):
        out[col] = pd.to_datetime(out[col], errors="coerce")

    # --- required columns present ---
    # Incomplete rows are dropped BEFORE the calendar fields are derived: taking
    # .dt.year/.dt.month/.dt.hour from a column that still holds NaT yields float
    # columns, and year/month are later used as partition keys.
    # .copy() makes the filtered frame independent, so the column assignments below
    # are not chained assignments on a slice.
    out = out.dropna(subset=required).copy()

    # --- derived fields ---
    pickup = out[pickup_col]
    out["duration_min"] = (out[dropoff_col] - pickup).dt.total_seconds() / 60.0
    out["pickup_hour"] = pickup.dt.hour
    out["pickup_date"] = pickup.dt.date
    out["year"] = pickup.dt.year
    out["month"] = pickup.dt.month

    # A missing distance is treated as 0 (and therefore skips the fare-per-mile check).
    out["trip_distance"] = out["trip_distance"].fillna(0)

    # --- logical & outlier filters (all bounds inclusive) ---
    keep = (
        out["duration_min"].between(1, 180)
        & out["trip_distance"].between(0, 100)
        & (out["fare_amount"] >= 0)
        & (out["total_amount"] >= 0)
    )
    out = out[keep].copy()

    # --- fare per mile guardrails (ignored when distance == 0) ---
    # Zero distances become NaN first, so the division never divides by zero.
    out["fare_per_mile"] = out["fare_amount"] / out["trip_distance"].replace({0: np.nan})
    fpm = out["fare_per_mile"]
    out = out[fpm.isna() | fpm.between(0.5, 25)]

    # --- de-duplicate ---
    before = len(out)
    out = out.drop_duplicates(subset=dedup_cols, keep="first")
    print(f"Deduplicated: removed {before - len(out)} rows")

    return out

# Run the cleaning / feature pipeline on the raw January trips.
cleaned = df.pipe(clean_add_features)

# Show raw vs. cleaned row counts together with a two-row preview of the result.
(len(df), len(cleaned), cleaned.head(2))


In [None]:
# Root folder of the cleaned, partitioned warehouse dataset.
out_dir = "data/warehouse/yellow_clean"

# Write the cleaned January trips as a year=/month= partitioned Parquet dataset.
# - engine="pyarrow": backend used for the partitioned write
# - index=False: do not store the DataFrame index as a column
# - partition_cols=["year", "month"]: one sub-folder per year/month
# - basename_template: a fixed file name for this source month. By default every
#   partitioned write generates new, uniquely named files, so re-running this cell
#   would append a second copy of the same trips. With a fixed name, a re-run
#   overwrites this cell's earlier output instead ("{i}" is the per-partition
#   file counter required by pyarrow).
# NOTE: files written by earlier runs under auto-generated names are not removed
# by this cell; clear the folder once if it already contains such files.
cleaned.to_parquet(
    out_dir,
    engine="pyarrow",
    index=False,
    partition_cols=["year", "month"],
    basename_template="yellow_tripdata_2024-01-{i}.parquet",
)

# Display the output location.
out_dir


In [None]:
# Root folder of the partitioned warehouse dataset written earlier.
clean_dir = "data/warehouse/yellow_clean"

# Load every partition under the folder into one DataFrame, using the same
# pyarrow engine that wrote the dataset.
df_clean = pd.read_parquet(clean_dir, engine="pyarrow")

# Row count plus a two-row preview of what was loaded.
(len(df_clean), df_clean.head(2))


In [None]:
# Number of trips per pickup hour of day.
trips_by_hour = (
    df_clean
    # Derive the pickup hour from the timestamp of the frame being chained.
    .assign(hour=lambda trips: trips["tpep_pickup_datetime"].dt.hour)
    # One group per hour; as_index=False keeps "hour" as a regular column.
    .groupby("hour", as_index=False)
    # Row count per group, delivered in a column named "size".
    .size()
    # Give the count column a descriptive name.
    .rename(columns={"size": "trips"})
    # Ascending hour order.
    .sort_values("hour")
)

# Display the hourly trip counts.
trips_by_hour


In [None]:
# Revenue summary per pickup year-month.
monthly_rev = (
    df_clean
    # "ym" is the pickup month as text: datetime -> monthly period -> string
    # such as "2024-01".
    .assign(ym=lambda trips: trips["tpep_pickup_datetime"].dt.to_period("M").astype(str))
    # One group per year-month; as_index=False keeps "ym" as a regular column.
    .groupby("ym", as_index=False)
    # Per-month metrics, all computed from total_amount:
    #   total_revenue = sum, avg_total = mean, trips = number of rows.
    .agg(
        total_revenue=("total_amount", "sum"),
        avg_total=("total_amount", "mean"),
        trips=("total_amount", "size"),
    )
    # Chronological order (the "YYYY-MM" text sorts the same way as the dates).
    .sort_values("ym")
)

# Display the monthly summary.
monthly_rev


In [None]:
# February 2024 Yellow Taxi file in the current user's Downloads folder.
# Built from Path.home() instead of a hardcoded per-user absolute path, so the
# notebook runs unchanged under any account. (`Path` is imported in the first cell.)
raw_feb = Path.home() / "Downloads" / "yellow_tripdata_2024-02.parquet"


In [None]:
# Destination path inside the project's data/raw folder.
dst = Path("data/raw/yellow_tripdata_2024-02.parquet")

# Copy only when the download is present and the project copy is missing, so a
# re-run never overwrites an existing raw file.
# copy2() copies the file together with its metadata (timestamps, etc.).
# (`shutil` is imported in the first cell; no import is needed inside the branch.)
if raw_feb.exists() and not dst.exists():
    shutil.copy2(raw_feb, dst)

# Report the outcome, matching the January copy cell, so a missing source file is
# visible here rather than surfacing later as a read error.
print("Raw file at:", dst.exists(), dst)


In [None]:
# Load the February parquet file into a pandas DataFrame.
# The path is spelled out here instead of reusing `dst`: that name is assigned for
# both the January and the February copy, so depending on which cell ran last it
# could point at the January file and silently load the wrong month.
raw_feb_path = Path("data/raw/yellow_tripdata_2024-02.parquet")
df_feb = pd.read_parquet(raw_feb_path)

# Print how many rows (trips) are in the February raw dataset.
print("Rows in Feb raw file:", len(df_feb))

# Preview the first five rows to inspect the columns and data.
df_feb.head()


In [None]:
def clean_add_features(df: pd.DataFrame) -> pd.DataFrame:
    """Clean raw yellow-taxi trips and add derived trip features.

    NOTE(review): a function with this same name is already defined in an earlier
    cell of this notebook; this definition replaces it once the cell runs. Consider
    keeping a single definition.

    Parameters
    ----------
    df : pd.DataFrame
        Raw trips. Must contain ``tpep_pickup_datetime``, ``tpep_dropoff_datetime``,
        ``PULocationID``, ``DOLocationID``, ``trip_distance``, ``fare_amount`` and
        ``total_amount``. The input frame is not modified.

    Returns
    -------
    pd.DataFrame
        A new, filtered frame with ``duration_min``, ``pickup_hour``,
        ``pickup_date``, ``year``, ``month`` and ``fare_per_mile`` appended.

    Side effects
    ------------
    Prints how many duplicate rows were removed.
    """
    pickup_col = "tpep_pickup_datetime"
    dropoff_col = "tpep_dropoff_datetime"

    # Work on a copy so the DataFrame passed in is left untouched.
    out = df.copy()

    # -------------------------------------------------------------------
    # 1. Convert timestamp columns to datetime
    # -------------------------------------------------------------------
    # errors="coerce" turns unparseable values into NaT (Not-a-Time).
    for col in (pickup_col, dropoff_col):
        out[col] = pd.to_datetime(out[col], errors="coerce")

    # -------------------------------------------------------------------
    # 2. Drop rows that are missing required fields
    # -------------------------------------------------------------------
    # Done BEFORE deriving calendar fields: taking .dt.year/.dt.month/.dt.hour from
    # a column that still holds NaT yields float columns, and year/month are later
    # used as partition keys.
    required = [
        pickup_col,
        dropoff_col,
        "PULocationID",    # pickup location
        "DOLocationID",    # dropoff location
        "fare_amount",     # fare paid
        "total_amount",    # total charged
    ]
    # .copy() makes the filtered frame independent, so the column assignments
    # below are not chained assignments on a slice.
    out = out.dropna(subset=required).copy()

    # -------------------------------------------------------------------
    # 3. Add derived features from the pickup/dropoff times
    # -------------------------------------------------------------------
    pickup = out[pickup_col]
    # Trip duration in minutes (dropoff minus pickup, seconds / 60).
    out["duration_min"] = (out[dropoff_col] - pickup).dt.total_seconds() / 60.0
    out["pickup_hour"] = pickup.dt.hour    # hour of day, 0-23
    out["pickup_date"] = pickup.dt.date    # calendar date without time
    out["year"] = pickup.dt.year
    out["month"] = pickup.dt.month         # 1-12

    # A missing distance is treated as 0 (and therefore skips the fare-per-mile check).
    out["trip_distance"] = out["trip_distance"].fillna(0)

    # -------------------------------------------------------------------
    # 4. Apply logical filters and remove outliers (all bounds inclusive)
    # -------------------------------------------------------------------
    keep = (
        out["duration_min"].between(1, 180)        # 1 minute to 3 hours
        & out["trip_distance"].between(0, 100)     # 0 to 100 distance units
        & (out["fare_amount"] >= 0)
        & (out["total_amount"] >= 0)
    )
    out = out[keep].copy()

    # -------------------------------------------------------------------
    # 5. Sanity check: fare per mile
    # -------------------------------------------------------------------
    # Zero distances become NaN first, so the division never divides by zero and
    # those trips get a NaN fare_per_mile.
    out["fare_per_mile"] = out["fare_amount"] / out["trip_distance"].replace({0: np.nan})
    fpm = out["fare_per_mile"]
    # Keep NaN (zero-distance trips) or values within [0.5, 25].
    out = out[fpm.isna() | fpm.between(0.5, 25)]

    # -------------------------------------------------------------------
    # 6. Remove duplicate trips
    # -------------------------------------------------------------------
    # Rows sharing pickup time, dropoff time, both locations and total amount are
    # treated as the same trip; the first occurrence is kept.
    dedup_cols = [
        pickup_col,
        dropoff_col,
        "PULocationID",
        "DOLocationID",
        "total_amount",
    ]
    before = len(out)
    out = out.drop_duplicates(subset=dedup_cols, keep="first")
    removed = before - len(out)
    print(f"Deduplicated: removed {removed} rows")

    # -------------------------------------------------------------------
    # 7. Return the cleaned DataFrame
    # -------------------------------------------------------------------
    return out


In [None]:
# 1. Apply the cleaning + feature engineering function to February data
cleaned_feb = clean_add_features(df_feb)

# 2. Define the warehouse output directory
out_dir = "data/warehouse/yellow_clean"

# 3. Save the cleaned February dataset as partitioned parquet files
#    - engine="pyarrow": backend used for the partitioned write
#    - index=False: do not write the pandas index column
#    - partition_cols=["year", "month"]: one sub-folder per year/month
#    - basename_template: a fixed file name for this source month. By default every
#      partitioned write generates new, uniquely named files, so each run of this
#      cell would append another copy of the same trips. With a fixed name, a
#      re-run overwrites this cell's earlier output instead ("{i}" is the
#      per-partition file counter required by pyarrow).
#    NOTE: files written by earlier runs under auto-generated names are not
#    removed by this cell.
cleaned_feb.to_parquet(
    out_dir,
    engine="pyarrow",
    index=False,
    partition_cols=["year", "month"],
    basename_template="yellow_tripdata_2024-02-{i}.parquet",
)

# 4. Print a confirmation of where the data was written
print("Cleaned Feb data saved under:", out_dir)


In [None]:
# NOTE(review): this cell repeats the preceding cell's clean-and-write step for
# February. Consider deleting one of the two cells.

# Clean the raw February dataset with the cleaning + feature engineering function.
cleaned_feb = clean_add_features(df_feb)

# Warehouse directory where the partitioned parquet files are stored.
out_dir = "data/warehouse/yellow_clean"

# Save the cleaned February data into the warehouse.
# - engine="pyarrow": backend used for the partitioned write
# - index=False: do not include the DataFrame index in the parquet file
# - partition_cols=["year", "month"]: one sub-folder per year/month
# - basename_template: a fixed file name for this source month. By default every
#   partitioned write generates new, uniquely named files, so each run would append
#   another copy of the same trips. With a fixed name, running this cell again
#   overwrites its own earlier output instead ("{i}" is the per-partition file
#   counter required by pyarrow).
# NOTE: files written by earlier runs under auto-generated names are not removed
# by this cell.
cleaned_feb.to_parquet(
    out_dir,
    engine="pyarrow",
    index=False,
    partition_cols=["year", "month"],
    basename_template="yellow_tripdata_2024-02-{i}.parquet",
)

# Print a confirmation of where the cleaned files ended up.
print("Cleaned Feb data saved under:", out_dir)
