In [1]:
import os
import pickle

import altair as alt
import dask.dataframe as dd
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Lasso
from sklearn.linear_model import Ridge
from sklearn.metrics import root_mean_squared_error, mean_squared_error


In [2]:
# ! pip install "altair[all]"
# ! pip install "vegafusion[embed]"

In [3]:
def load_and_preprocess_data(file_path):
    """
    Load and preprocess the taxi trip data from a Parquet file.

    Adds a 'duration' column (minutes between pickup and dropoff), keeps only
    trips lasting between 1 and 60 minutes (inclusive), and casts the
    pickup/dropoff location IDs to strings so they are treated as categories
    rather than numbers downstream.

    Args:
        file_path (str): Path to the Parquet file containing the trip data.
            Must contain 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
            'PULocationID' and 'DOLocationID' columns.

    Returns:
        pd.DataFrame: Filtered, independent copy of the data with calculated
                      trip durations and string-typed categorical features.
    """
    # Load data from Parquet file
    df = pd.read_parquet(file_path)

    # Trip duration in minutes (the .dt accessor requires both columns to be datetimes)
    df['duration'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.total_seconds() / 60

    # Keep trips of 1-60 minutes. .copy() detaches the result from the unfiltered
    # frame so the column assignment below neither raises SettingWithCopyWarning
    # nor risks writing into a temporary view.
    df = df[(df['duration'] >= 1) & (df['duration'] <= 60)].copy()

    # Location IDs are zone identifiers, not magnitudes: as strings they are
    # one-hot encoded by DictVectorizer instead of used as numeric values.
    categorical_columns = ['PULocationID', 'DOLocationID']
    df[categorical_columns] = df[categorical_columns].astype(str)

    return df

# Load the January 2024 yellow-taxi trips and preview the first five rows
file_path = 'data/yellow_tripdata_2024-01.parquet'
df = load_and_preprocess_data(file_path=file_path)
df.head(5)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,duration
0,2,2024-01-01 00:57:55,2024-01-01 01:17:43,1.0,1.72,1.0,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0,19.8
1,1,2024-01-01 00:03:00,2024-01-01 00:09:36,1.0,1.8,1.0,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0,6.6
2,1,2024-01-01 00:17:06,2024-01-01 00:35:01,1.0,4.7,1.0,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0,17.916667
3,1,2024-01-01 00:36:38,2024-01-01 00:44:56,1.0,1.4,1.0,N,79,211,1,10.0,3.5,0.5,2.0,0.0,1.0,17.0,2.5,0.0,8.3
4,1,2024-01-01 00:46:51,2024-01-01 00:52:57,1.0,0.8,1.0,N,211,148,1,7.9,3.5,0.5,3.2,0.0,1.0,16.1,2.5,0.0,6.1


In [4]:

def prepare_features(df, categorical, numerical, dv=None):
    """
    Build a sparse feature matrix from DataFrame columns with DictVectorizer.

    Args:
        df (pd.DataFrame): DataFrame containing the data.
        categorical (list): List of categorical column names (string values
            are one-hot encoded by DictVectorizer).
        numerical (list): List of numerical column names (passed through as-is).
        dv (DictVectorizer, optional): An already-fitted vectorizer. When
            given, it is only used to ``transform`` ``df`` (e.g. a validation
            set), so the columns line up with the matrix it was fitted on.
            When ``None`` (default), a new vectorizer is fitted on ``df``.

    Returns:
        X (scipy.sparse.csr_matrix): Feature matrix.
        dv (DictVectorizer): The fitted vectorizer (the same object that was
            passed in, if one was provided).
    """
    # One dict per row, keyed by column name, as DictVectorizer expects
    records = df[categorical + numerical].to_dict(orient='records')

    if dv is None:
        # Learn the feature vocabulary from this data
        dv = DictVectorizer()
        X = dv.fit_transform(records)
    else:
        # Reuse the existing vocabulary; features unseen during fit are ignored
        X = dv.transform(records)

    return X, dv

def train_model(X, y):
    """
    Fit an ordinary least-squares LinearRegression on the given data.

    Args:
        X (scipy.sparse.csr_matrix): Feature matrix.
        y (numpy.ndarray): Target vector.

    Returns:
        model (LinearRegression): The fitted regressor.
    """
    # LinearRegression.fit returns the estimator itself, so build-and-fit chains
    return LinearRegression().fit(X, y)

def evaluate_model(model, X, y):
    """
    Score a fitted regressor by root mean squared error on (X, y).

    Args:
        model (LinearRegression): Fitted model exposing ``predict``.
        X (scipy.sparse.csr_matrix): Feature matrix.
        y (numpy.ndarray): True target values.

    Returns:
        float: RMSE between ``y`` and ``model.predict(X)``.
    """
    predictions = model.predict(X)
    return root_mean_squared_error(y, predictions)

# Features: pickup/dropoff location IDs (categorical) plus trip distance (numerical)
categorical_columns = ['PULocationID', 'DOLocationID']
numerical_columns = ['trip_distance']

# Target: trip duration in minutes
y_train = df['duration'].values
X_train, dv = prepare_features(df, categorical_columns, numerical_columns)

# Fit on the January data; keep the in-sample predictions for the density plot below
lr_model = train_model(X_train, y_train)
y_pred = lr_model.predict(X_train)

# NOTE: this RMSE is measured on the same rows the model was trained on (in-sample)
rmse = evaluate_model(lr_model, X_train, y_train)
print(f"Root Mean Squared Error: {rmse}")


Root Mean Squared Error: 7.952029670782532


In [5]:
# Number of training targets, i.e. trips that survived the 1-60 minute duration filter
y_train.shape

(2898906,)

In [6]:
# Altair refuses to embed more than 5,000 rows by default; the sample below is
# larger than that, so lift the cap.
alt.data_transformers.disable_max_rows()

# Actual vs. predicted training durations, in their own frame
# (a new name, so `df` keeps holding the preprocessed trip data)
df_pred = pd.DataFrame({'actual': y_train, 'prediction': y_pred})

# Sample a subset for plotting; min() keeps this working on inputs smaller than the sample size
n_plot = min(10_000, len(df_pred))
df_sampled = df_pred.sample(n=n_plot, random_state=42)

# Long format: one row per (Type, Value) so densities can be grouped by Type
df_long = df_sampled.melt(var_name='Type', value_name='Value')

# Overlaid density estimates of actual vs. predicted trip durations
density_plot = alt.Chart(df_long).transform_density(
    'Value',
    as_=['Value', 'Density'],
    groupby=['Type']
).mark_area(opacity=0.3).encode(
    alt.X('Value:Q', title='duration (minutes)'),
    alt.Y('Density:Q', title='density'),
    alt.Color('Type:N', title='series')
).properties(
    width=600,
    height=400,
    title=alt.TitleParams(
        text='Distribution Plot (Sampled)',
        subtitle='distribution of actual and predicted values',
    )
)

# Display the plot
density_plot


In [7]:
# def read_dataframe(filename):
#     if filename.endswith('.csv'):
#         df = dd.read_csv(filename)

#         df['tpep_dropoff_datetime'] = dd.to_datetime(df['tpep_dropoff_datetime'])
#         df['tpep_pickup_datetime'] = dd.to_datetime(df['tpep_pickup_datetime'])
#     elif filename.endswith('.parquet'):
#         df = dd.read_parquet(filename)

#     df['duration'] = df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']
#     df['duration'] = df['duration'].dt.total_seconds() / 60

#     df = df[(df['duration'] >= 1) & (df['duration'] <= 60)]

#     categorical = ['PULocationID', 'DOLocationID']
#     df[categorical] = df[categorical].astype(str)
    
#     return df


In [8]:
def read_dataframe(filename):
    """
    Read a taxi-trip file and prepare it for duration modelling.

    Adds a 'duration' column (minutes between pickup and dropoff), keeps only
    trips lasting 1-60 minutes inclusive, and casts the pickup/dropoff
    location IDs to strings so they are treated as categories.

    Args:
        filename (str): Path to a ``.csv`` or ``.parquet`` file containing
            'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'PULocationID'
            and 'DOLocationID' columns.

    Returns:
        pd.DataFrame: Filtered, independent copy of the data.

    Raises:
        ValueError: If ``filename`` ends with neither ``.csv`` nor ``.parquet``.
    """
    if filename.endswith('.csv'):
        df = pd.read_csv(filename)
        # read_csv leaves the timestamps as text, so parse them explicitly
        df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
        df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    elif filename.endswith('.parquet'):
        df = pd.read_parquet(filename)
    else:
        # Without this branch `df` would be unbound below (UnboundLocalError)
        raise ValueError(f"Unsupported file type (expected .csv or .parquet): {filename}")

    # Vectorised timedelta -> minutes (replaces a per-row Python lambda)
    df['duration'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.total_seconds() / 60

    # .copy() detaches the filtered frame so column assignments here and in
    # callers don't raise SettingWithCopyWarning
    df = df[(df['duration'] >= 1) & (df['duration'] <= 60)].copy()

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)

    return df

In [9]:
# Maximum number of rows kept from each month; set to None to use every row.
threshold = 2_100_000

# .iloc makes the positional slice explicit (the filtered index has gaps), and
# .copy() lets later cells add columns without SettingWithCopyWarning.
df_train = read_dataframe('./data/yellow_tripdata_2024-01.parquet').iloc[:threshold].copy()
df_val = read_dataframe('./data/yellow_tripdata_2024-02.parquet').iloc[:threshold].copy()

In [10]:
# (rows, columns) of the training (January) and validation (February) frames
df_train.shape, df_val.shape

((2100000, 20), (2100000, 20))

In [11]:
# Combine categorical columns into a single column
df_train['PU_DO'] = df_train['PULocationID'].astype(str) + '_' + df_train['DOLocationID'].astype(str)
df_val['PU_DO'] = df_val['PULocationID'].astype(str) + '_' + df_val['DOLocationID'].astype(str)


In [12]:
# The combined pickup/dropoff pair replaces the separate 'PULocationID' / 'DOLocationID' features
categorical = ['PU_DO']
numerical = ['trip_distance']
feature_columns = categorical + numerical

# One dict per trip, the input format DictVectorizer expects
train_dicts = df_train[feature_columns].to_dict(orient='records')
val_dicts = df_val[feature_columns].to_dict(orient='records')

# Learn the feature vocabulary from training data only; validation is mapped onto it
dv = DictVectorizer()
X_train = dv.fit_transform(train_dicts)
X_val = dv.transform(val_dicts)

In [13]:
# Regression target: trip duration in minutes
target = 'duration'
y_train, y_val = df_train[target].values, df_val[target].values

In [14]:
# Baseline: ordinary least squares, scored on the February validation data
lr = LinearRegression().fit(X_train, y_train)
y_pred = lr.predict(X_val)

root_mean_squared_error(y_val, y_pred)


5.5742603185599915

In [15]:
# Save the vectorizer together with the model: scoring new trips needs both.
# Create the output directory first so a fresh checkout doesn't hit FileNotFoundError.
os.makedirs('models', exist_ok=True)
with open('models/lin_reg.bin', 'wb') as f_out:
    pickle.dump((dv, lr), f_out)


In [16]:
# Lasso (L1-regularised) variant. It gets its own name so `lr` still holds the
# LinearRegression baseline if the pickle cell above is re-run.
lasso = Lasso(alpha=0.01)
lasso.fit(X_train, y_train)

y_pred = lasso.predict(X_val)

# NOTE(review): the recorded RMSE (~18.2) is far worse than LinearRegression (~5.57);
# alpha=0.01 may be too strong for these sparse one-hot features, or the fit may not
# have converged. Tune alpha before relying on this model.
root_mean_squared_error(y_val, y_pred)

18.227024358149208

### TODO:

- rethink forecasting beyond 2M data points (training currently uses at most `threshold` rows per month)
- use altair for better charts 
- add docstrings to functions
