In [1]:
import os
import requests
import datetime
import pandas as pd
import psycopg

from joblib import load, dump
from tqdm import tqdm
from loguru import logger

from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error


In [2]:
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metrics import ( ColumnDriftMetric, DatasetDriftMetric, 
    DatasetMissingValuesMetric, ColumnQuantileMetric )


In [3]:
# Monthly green-taxi trip files to fetch, as (file name, local directory) pairs.
# Append 4 to the month tuple to also pull April.
files = [
    (f"green_tripdata_2024-{month:02d}.parquet", './data')
    for month in (1, 2, 3)
]


In [4]:
print(" [i] Download datasets..")
for file, path in files:
    save_path = f"{path}/{file}"
    if os.path.exists(save_path):
        continue  # already downloaded by a previous run

    os.makedirs(path, exist_ok=True)
    url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{file}"
    tmp_path = f"{save_path}.part"

    with requests.get(url, stream=True, timeout=60) as resp:
        # Fail loudly on HTTP errors instead of saving an error page as .parquet
        # (which the existence check above would then skip forever).
        resp.raise_for_status()
        # Content-Length may be absent; tqdm accepts total=None.
        total = int(resp.headers.get("Content-Length", 0)) or None
        with open(tmp_path, "wb") as handle, tqdm(
            desc=f"{file}",
            postfix=f"save to {save_path}",
            total=total,
            unit="B",
            unit_scale=True,
        ) as pbar:
            # iter_content() defaults to 1-byte chunks; stream in 1 MiB pieces.
            for chunk in resp.iter_content(chunk_size=1024 * 1024):
                handle.write(chunk)
                pbar.update(len(chunk))

    # Publish the file only once it is complete, so an interrupted run does not
    # leave a truncated file that looks already downloaded.
    os.replace(tmp_path, save_path)
            

 [i] Download datasets..


In [5]:
# Inputs for this monitoring run: the March 2024 trips to score and the
# reference frame the Evidently report compares against.
raw_dataset, ref_dataset = (
    pd.read_parquet(parquet_path)
    for parquet_path in ('data/green_tripdata_2024-03.parquet', 'data/reference.parquet')
)

# Trained model. joblib files are pickles: only load files from a trusted source.
with open('models/lin_reg.bin', 'rb') as f_input:
    model = load(f_input)


In [6]:
# Model input columns, plus the name of the target column created by prep_dataset.
categorical_features = ["PULocationID", "DOLocationID"]
numeric_features = [
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "total_amount",
]
predict = "duration"  # target column name


In [7]:
# Tell Evidently which columns are model inputs and which one holds the model
# output; no target column is mapped (target=None).
column_mapping = ColumnMapping(
    prediction='prediction',
    numerical_features=numeric_features,
    categorical_features=categorical_features,
    target=None,
)


In [8]:
# Two metrics per run: drift of the 'prediction' column against the reference
# data, and the 0.5 quantile (median) of fare_amount. calculate_metric returns
# the value of the last metric in this list.
report = Report(metrics=[
    ColumnDriftMetric(column_name='prediction'),
    ColumnQuantileMetric(quantile=0.5, column_name='fare_amount'),
])


In [9]:
# Keep only trips whose lpep_pickup_datetime falls in March 2024 (the month the
# loaded file is named for); rows stamped with any other year or month are dropped.
raw_dataset = raw_dataset[
    (raw_dataset['lpep_pickup_datetime'].dt.year == 2024)
    & (raw_dataset['lpep_pickup_datetime'].dt.month == 3)
]


In [10]:
# Distinct pickup dates in the filtered data, in chronological order; the
# monitoring loop computes and stores one metric per date.
# Series.unique() returns values in order of appearance, so sort explicitly.
# The column is already datetime (the filter above uses .dt), so no conversion.
dates = sorted(raw_dataset['lpep_pickup_datetime'].dt.date.unique())


In [11]:
logger.info(tuple(raw_dataset.shape))  # (rows, columns) left after the month filter


[32m2024-06-21 12:41:30.704[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m1[0m - [1m(57447, 20)[0m


In [12]:

def prep_dataset(dff: pd.DataFrame) -> pd.DataFrame:
    """
    Add the target column "duration" and drop outlier rows.

    "duration" is lpep_dropoff_datetime - lpep_pickup_datetime, in minutes.
    A row is kept only when 0 <= duration <= 60 and 0 < passenger_count <= 8.
    Rows with a missing duration or passenger_count fail these comparisons
    and are dropped too.

    The input frame is left untouched; a new, filtered frame is returned.
    """
    # Work on a copy: callers pass a filtered slice, and adding a column to it
    # would mutate their frame (and raise SettingWithCopyWarning).
    trips = dff.copy()

    trip_time = trips["lpep_dropoff_datetime"] - trips["lpep_pickup_datetime"]
    trips["duration"] = trip_time.dt.total_seconds() / 60

    keep = (
        trips["duration"].between(0, 60)  # inclusive on both ends
        & (trips["passenger_count"] > 0)
        & (trips["passenger_count"] <= 8)
    )
    return trips[keep]


In [13]:

def calculate_metric(
    df: pd.DataFrame,
    *,
    estimator=None,
    evidently_report=None,
    reference=None,
    mapping=None,
    feature_columns=None,
):
    """
    Score one chunk of trips and return the current value of the report's last metric.

    Steps:
    1. Build a NaN-free copy of ``df`` (fillna(0)).
    2. Add a 'prediction' column from the estimator.
    3. Run the Evidently report against the reference data.
    4. Return ``as_dict()['metrics'][-1]['result']['current']['value']``.

    The caller's ``df`` is not modified.

    Keyword overrides (all optional) fall back to notebook globals:
      estimator        -> ``model``
      evidently_report -> ``report``
      reference        -> ``ref_dataset``
      mapping          -> ``column_mapping``
      feature_columns  -> ``numeric_features + categorical_features``
    """
    # Conditional expressions only evaluate the global when no override is given.
    estimator = model if estimator is None else estimator
    evidently_report = report if evidently_report is None else evidently_report
    reference = ref_dataset if reference is None else reference
    mapping = column_mapping if mapping is None else mapping
    if feature_columns is None:
        feature_columns = numeric_features + categorical_features
    feature_columns = list(feature_columns)

    # Non-inplace fillna returns a new frame, so the caller's (possibly sliced)
    # frame is neither mutated nor hit by SettingWithCopyWarning.
    current = df.fillna(0)
    current['prediction'] = estimator.predict(current[feature_columns])

    logger.debug(" [i] Running evidently report(s)..")
    evidently_report.run(
        reference_data=reference,
        current_data=current,
        column_mapping=mapping,
    )

    result = evidently_report.as_dict()
    # Only the last metric's value is returned; earlier metrics are computed but unused.
    return result['metrics'][-1]['result']['current']['value']


In [14]:

# DDL for the monitoring table: one row per monitored date with the value
# returned by calculate_metric. The table is dropped first, so every call
# starts from an empty table.
create_table = """
    drop table if exists metrics;
    create table metrics(
        timestamp date,
        fare_amount float
        )
    """

def prep_database():
    """(Re)create the ``metrics`` table, discarding any rows it already holds.

    Connection settings are read from the POSTGRES_DB / POSTGRES_HOST /
    POSTGRES_PORT / POSTGRES_USER / POSTGRES_PASSWORD environment variables.
    When a variable is unset, the notebook's original local defaults are used.
    """
    with psycopg.connect(
        dbname=os.environ.get("POSTGRES_DB", "postgres"),
        host=os.environ.get("POSTGRES_HOST", "localhost"),
        port=int(os.environ.get("POSTGRES_PORT", "5432")),
        user=os.environ.get("POSTGRES_USER", "postgres"),
        # TODO: drop this hard-coded fallback once POSTGRES_PASSWORD is set everywhere
        password=os.environ.get("POSTGRES_PASSWORD", "s3cureP@55w0rd"),
        autocommit=True,
    ) as conn:
        # No bind parameters, so both DDL statements can go in one execute().
        conn.execute(create_table)
    

In [15]:
# (Re)create the metrics table. Its DDL drops any existing table first, so
# previously stored metrics are discarded.
prep_database()


In [16]:

# Daily monitoring: compute the metric for each pickup date and store one row
# per date. psycopg commits when the `with` block exits without an error.
df = prep_dataset(raw_dataset)

with psycopg.connect(
    dbname=os.environ.get("POSTGRES_DB", "postgres"),
    host=os.environ.get("POSTGRES_HOST", "localhost"),
    port=int(os.environ.get("POSTGRES_PORT", "5432")),
    user=os.environ.get("POSTGRES_USER", "postgres"),
    # TODO: drop this hard-coded fallback once POSTGRES_PASSWORD is set everywhere
    password=os.environ.get("POSTGRES_PASSWORD", "s3cureP@55w0rd"),
) as conn:
    for this_date in dates:
        logger.debug(f" [i] Daily monitoring for date: {str(this_date)}")
        # Explicit copy, so downstream column assignment never writes into a slice of df.
        df_chunk = df[df['lpep_pickup_datetime'].dt.date == this_date].copy()
        if df_chunk.empty:
            # `dates` comes from the unfiltered data, so prep_dataset may have
            # removed every trip of this date; nothing to score or report on.
            logger.warning(f" [!] No trips left for date: {this_date}, skipping")
            continue

        quant = calculate_metric(df_chunk)
        # Parameterized insert: psycopg adapts the date and the float (NaN
        # included) instead of splicing them into the SQL text.
        conn.execute(
            "INSERT INTO metrics(timestamp, fare_amount) VALUES (%s, %s)",
            (this_date, float(quant)),
        )
        

[32m2024-06-21 12:41:30.948[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36m<module>[0m:[36m11[0m - [34m[1m [i] Daily monitoring for date: 2024-03-01[0m
[32m2024-06-21 12:41:30.961[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mcalculate_metric[0m:[36m10[0m - [34m[1m [i] Running evidently report(s)..[0m
[32m2024-06-21 12:41:30.992[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36m<module>[0m:[36m11[0m - [34m[1m [i] Daily monitoring for date: 2024-03-02[0m
[32m2024-06-21 12:41:31.003[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mcalculate_metric[0m:[36m10[0m - [34m[1m [i] Running evidently report(s)..[0m
[32m2024-06-21 12:41:31.029[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36m<module>[0m:[36m11[0m - [34m[1m [i] Daily monitoring for date: 2024-03-03[0m
[32m2024-06-21 12:41:31.041[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mcalculate_metric[0m:[36m10[0m - [34m[1m [i] Running evidently report(s)..[0m
[32m2024-

[32m2024-06-21 12:41:32.044[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36m<module>[0m:[36m11[0m - [34m[1m [i] Daily monitoring for date: 2024-03-26[0m
[32m2024-06-21 12:41:32.055[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mcalculate_metric[0m:[36m10[0m - [34m[1m [i] Running evidently report(s)..[0m
[32m2024-06-21 12:41:32.089[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36m<module>[0m:[36m11[0m - [34m[1m [i] Daily monitoring for date: 2024-03-27[0m
[32m2024-06-21 12:41:32.101[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mcalculate_metric[0m:[36m10[0m - [34m[1m [i] Running evidently report(s)..[0m
[32m2024-06-21 12:41:32.135[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36m<module>[0m:[36m11[0m - [34m[1m [i] Daily monitoring for date: 2024-03-28[0m
[32m2024-06-21 12:41:32.147[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mcalculate_metric[0m:[36m10[0m - [34m[1m [i] Running evidently report(s)..[0m
[32m2024-

In [17]:
# Read back the maximum of the stored daily fare_amount metric values.
with psycopg.connect(
    dbname=os.environ.get("POSTGRES_DB", "postgres"),
    host=os.environ.get("POSTGRES_HOST", "localhost"),
    port=int(os.environ.get("POSTGRES_PORT", "5432")),
    user=os.environ.get("POSTGRES_USER", "postgres"),
    # TODO: drop this hard-coded fallback once POSTGRES_PASSWORD is set everywhere
    password=os.environ.get("POSTGRES_PASSWORD", "s3cureP@55w0rd"),
) as conn:
    cursor = conn.execute("SELECT MAX(fare_amount) FROM metrics;")
    fare_amounts = cursor.fetchall()


In [18]:
# Display the query result: a list holding a single row with MAX(fare_amount).
fare_amounts


[(14.2,)]