In [3]:
import os
import argparse
import pandas as pd
import mlflow
from mlflow.tracking.client import MlflowClient
from dotenv import load_dotenv
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

In [4]:
# Download Data - needs to be run in working-directory as src/train.py
def download_data(color, year, month):
    # Check if data is already downloaded
    if not os.path.exists(f"./data/{color}_tripdata_{year}-{month:02d}.parquet"):
        print(f"Downloading {color}_tripdata_{year}-{month:02d}.parquet")
        os.system(f"wget -P ./data https://d37ci6vzurychx.cloudfront.net/trip-data/{color}_tripdata_{year}-{month:02d}.parquet")
        print("Download finished")
    # Load the data
    print(f"Reading {color}_tripdata_{year}-{month:02d}.parquet")
    df = pd.read_parquet(f"./data/{color}_tripdata_{year}-{month:02d}.parquet")
    print("Import finished")
    return df

In [5]:
# Helper function to calculate trip duration
def calculate_trip_duration_in_minutes(df, features = features, target = target):
    df["duration"] = (df["lpep_dropoff_datetime"] - df["lpep_pickup_datetime"]).dt.total_seconds() / 60
    df = df[(df["duration"] >= 1) & (df["duration"] <= 60)]
    df = df[(df['passenger_count'] > 0) & (df['passenger_count'] < 8)]
    features.append(target)
    #df = df[features]
    print(features)

In [6]:
# Preprocessing steps including trip_duration
def preprocess(df, features = features, target = target):
    df = df.copy()
    df_processed = calculate_trip_duration_in_minutes(df)

    y = df_processed[target]
    X = df_processed.drop(columns=[target])

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, random_state=42, test_size=0.2
    )
    return X_train, X_test, y_train, y_test

In [7]:
# Model Training
def train_model(X_train, X_test, y_train, y_test, color, year, month, features = features, target = target):
    
    # load and set environment variables
    load_dotenv()

    MLFLOW_TRACKING_URI = os.getenv("MLFLOW_TRACKING_URI")
    SA_KEY= os.getenv("SA_KEY")
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = SA_KEY

    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
    client = MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)

    print("Starting ML-Flow-Run")
    with mlflow.start_run():
    
        tags = {
            "model": "linear regression",
            "developer": "Gunnar",
            "dataset": f"{color}-taxi",
            "year": year,
            "month": month,
            "features": features,
            "target": target
        }
        mlflow.set_tags(tags)
        
        lr = LinearRegression()
        lr.fit(X_train, y_train)
    
        y_pred = lr.predict(X_test)
        rmse = mean_squared_error(y_test, y_pred, squared=False)
        mlflow.log_metric("rmse", rmse)
    
        mlflow.sklearn.log_model(lr, "model")
        run_id = mlflow.active_run().info.run_id

        model_uri = f"runs:/{run_id}/model"
        model_name = f"{color}-taxi-ride-duration-3"
        mlflow.register_model(model_uri=model_uri, name=model_name)

        model_version = 1
        new_stage = "Production"
        client.transition_model_version_stage(
        name=model_name,
        version=model_version,
        stage=new_stage,
        archive_existing_versions=False
    )
    print("model-training finished")

In [8]:
def main_train_model(color, year, month):

    # Download data
    df_taxi = download_data(color, year, month)

    # preprocess data
    X_train, X_test, y_train, y_test = preprocess(df_taxi)
    train_model(X_train, X_test, y_train, y_test, color, year, month)

In [9]:
color = "green"
year = 2021
month = 1

In [10]:
# Download data
df_taxi = download_data(color, year, month)
df_taxi.columns

Reading green_tripdata_2021-01.parquet
Import finished


Index(['VendorID', 'lpep_pickup_datetime', 'lpep_dropoff_datetime',
       'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID',
       'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax',
       'tip_amount', 'tolls_amount', 'ehail_fee', 'improvement_surcharge',
       'total_amount', 'payment_type', 'trip_type', 'congestion_surcharge'],
      dtype='object')

In [11]:
features = ["PULocationID", "DOLocationID", "trip_distance", "passenger_count", 
                "fare_amount", "total_amount"]
target = "duration"
calculate_trip_duration_in_minutes(df_taxi)

['PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'fare_amount', 'total_amount', 'duration']
duration
None
