In [1]:
import datetime
import time
import random
import logging 
import pandas as pd
import psycopg2 as psycopg
from joblib import load, dump

from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error

In [2]:
from evidently.report import Report
from evidently import ColumnMapping
from evidently.metrics import ColumnDriftMetric, DatasetDriftMetric, DatasetMissingValuesMetric

In [3]:
import requests
from tqdm import tqdm

files = [('green_tripdata_2022-02.parquet', './data'), ('green_tripdata_2022-01.parquet', './data')]

print("Download files:")
for file, path in files:
    url=f"https://d37ci6vzurychx.cloudfront.net/trip-data/{file}"
    resp=requests.get(url, stream=True)
    save_path=f"{path}/{file}"
    with open(save_path, "wb") as handle:
        for data in tqdm(resp.iter_content(),
                        desc=f"{file}",
                        postfix=f"save to {save_path}",
                        total=int(resp.headers["Content-Length"])):
            handle.write(data)

Download files:


green_tripdata_2022-02.parquet: 100%|█████████████████████████████████████████████████████████████████████████| 1428262/1428262 [00:08<00:00, 163745.57it/s, save to ./data/green_tripdata_2022-02.parquet]
green_tripdata_2022-01.parquet: 100%|█████████████████████████████████████████████████████████████████████████| 1254291/1254291 [00:08<00:00, 155816.20it/s, save to ./data/green_tripdata_2022-01.parquet]


In [4]:
jan_data = pd.read_parquet('data/green_tripdata_2022-01.parquet')

In [5]:
# create target
jan_data["duration_min"] = jan_data.lpep_dropoff_datetime - jan_data.lpep_pickup_datetime
jan_data.duration_min = jan_data.duration_min.apply(lambda td : float(td.total_seconds())/60)

In [6]:
# filter out outliers
jan_data = jan_data[(jan_data.duration_min >= 0) & (jan_data.duration_min <= 60)]
jan_data = jan_data[(jan_data.passenger_count > 0) & (jan_data.passenger_count <= 8)]

In [7]:
# data labeling
target = "duration_min"
num_features = ["passenger_count", "trip_distance", "fare_amount", "total_amount"]
cat_features = ["PULocationID", "DOLocationID"]

jan_data.shape

(55211, 21)

In [8]:
train_data = jan_data[:30000]
val_data = jan_data[30000:]

In [9]:
model = LinearRegression()
model.fit(train_data[num_features + cat_features], train_data[target])

In [10]:
train_preds = model.predict(train_data[num_features + cat_features])
train_data['prediction'] = train_preds

val_preds = model.predict(val_data[num_features + cat_features])
val_data['prediction'] = val_preds



In [11]:
print(mean_absolute_error(train_data.duration_min, train_data.prediction))
print(mean_absolute_error(val_data.duration_min, val_data.prediction))

3.804665373785062
4.142064073688447


In [None]:
with open('models/lin_reg.bin', 'wb') as f_out:
    dump(model, f_out)

val_data.to_parquet('data/reference.parquet')

In [13]:
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s]: %(message)s")

In [14]:
SEND_TIMEOUT = 10
rand = random.Random()

In [55]:
create_table_statement = """
drop table if exists grafana.evidently_metrics;
create table grafana.evidently_metrics(
	timestamp timestamp,
	prediction_drift float,
	num_drifted_columns integer,
	share_missing_values float
)
"""

In [17]:
reference_data = pd.read_parquet('data/reference.parquet') # validation data
with open('models/lin_reg.bin', 'rb') as f_in: # load production_model
	model = load(f_in)

In [18]:
raw_data = pd.read_parquet('data/green_tripdata_2022-02.parquet') # test data

In [20]:
begin = datetime.datetime(2022, 2, 1, 0, 0)
begin

datetime.datetime(2022, 2, 1, 0, 0)

In [21]:
num_features = ['passenger_count', 'trip_distance', 'fare_amount', 'total_amount']
cat_features = ['PULocationID', 'DOLocationID']
column_mapping = ColumnMapping(
    prediction='prediction',
    numerical_features=num_features,
    categorical_features=cat_features,
    target=None
)

In [22]:
report = Report(metrics = [
    ColumnDriftMetric(column_name='prediction'),
    DatasetDriftMetric(),
    DatasetMissingValuesMetric()
])


In [58]:
conn = psycopg.connect(
    dbname="app_db",
    user="root",
    password="root",
    host="localhost"
)

# Enable autocommit
conn.autocommit = True

# Create a cursor
cur = conn.cursor()

In [74]:
def prep_db():
    conn = psycopg.connect(
        user="root",
        password="root",
        host="localhost"
    )
    
    # Enable autocommit
    conn.autocommit = True
    
    # Create a cursor
    cur = conn.cursor()
    
    # Execute a query
    cur.execute("SELECT 1 FROM pg_database WHERE datname='app_db'")
    
    res = cur.fetchall()

    if len(res.fetchall()) == 0:
        curr.execute("create database app_db;")

    cur.execute(create_table_statement)

In [59]:
cur.execute(create_table_statement)

```python
def calculate_metrics_postgresql(curr, i):
	current_data = raw_data[(raw_data.lpep_pickup_datetime >= (begin + datetime.timedelta(i))) &
		(raw_data.lpep_pickup_datetime < (begin + datetime.timedelta(i + 1)))]

	#current_data.fillna(0, inplace=True)
	current_data['prediction'] = model.predict(current_data[num_features + cat_features].fillna(0))

	report.run(reference_data = reference_data, current_data = current_data,
		column_mapping=column_mapping)

	result = report.as_dict()

	prediction_drift = result['metrics'][0]['result']['drift_score']
	num_drifted_columns = result['metrics'][1]['result']['number_of_drifted_columns']
	share_missing_values = result['metrics'][2]['result']['current']['share_of_missing_values']

	curr.execute(
		"insert into grafana.evidently_metrics(timestamp, prediction_drift, num_drifted_columns, share_missing_values) values (%s, %s, %s, %s)",
		(begin + datetime.timedelta(i), prediction_drift, num_drifted_columns, share_missing_values)
	)
```

In [62]:
i = 0
begin + datetime.timedelta(i)

datetime.datetime(2022, 2, 1, 0, 0)

In [63]:
begin + datetime.timedelta(i + 1)

datetime.datetime(2022, 2, 2, 0, 0)

In [64]:
raw_data[(raw_data.lpep_pickup_datetime >= (begin + datetime.timedelta(i))) &
		(raw_data.lpep_pickup_datetime < (begin + datetime.timedelta(i + 1)))]

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2022-02-01 00:20:21,2022-02-01 00:24:30,N,1.0,43,238,1.0,1.16,5.50,0.5,0.5,1.02,0.00,,0.3,7.82,1.0,1.0,0.00
1,2,2022-02-01 00:32:26,2022-02-01 00:35:31,N,1.0,166,24,1.0,0.57,4.50,0.5,0.5,0.00,0.00,,0.3,5.80,2.0,1.0,0.00
2,1,2022-02-01 00:17:27,2022-02-01 00:44:44,N,1.0,226,219,1.0,0.00,42.20,0.0,0.5,0.00,0.00,,0.3,43.00,1.0,1.0,0.00
3,2,2022-02-01 00:45:37,2022-02-01 01:27:16,N,1.0,89,83,1.0,16.62,49.00,0.5,0.5,0.00,0.00,,0.3,50.30,2.0,1.0,0.00
4,2,2022-02-01 00:06:46,2022-02-01 00:30:06,N,1.0,7,238,1.0,5.97,21.00,0.5,0.5,4.50,0.00,,0.3,29.55,1.0,1.0,2.75
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
62247,2,2022-02-01 22:21:00,2022-02-01 22:48:00,,,41,65,,13.28,37.37,0.0,0.0,8.93,6.55,,0.3,53.15,,,
62248,2,2022-02-01 23:34:00,2022-02-02 00:04:00,,,37,11,,13.60,43.65,0.0,0.0,0.00,0.00,,0.3,43.95,,,
62249,2,2022-02-01 23:31:00,2022-02-01 23:56:00,,,74,49,,11.96,38.32,0.0,0.0,9.91,6.55,,0.3,55.08,,,
62250,2,2022-02-01 23:09:00,2022-02-01 23:34:00,,,41,169,,5.03,19.05,0.0,0.0,4.30,0.00,,0.3,23.65,,,


In [65]:
current_data = raw_data[(raw_data.lpep_pickup_datetime >= (begin + datetime.timedelta(i))) &
		(raw_data.lpep_pickup_datetime < (begin + datetime.timedelta(i + 1)))]

In [66]:
current_data['prediction'] = model.predict(current_data[num_features + cat_features].fillna(0))

In [67]:
report.run(reference_data = reference_data, current_data = current_data,
		column_mapping=column_mapping)

In [69]:
result = report.as_dict()

In [72]:
conn = psycopg.connect(
    dbname="app_db",
    user="root",
    password="root",
    host="localhost"
)

# Create a cursor
cur = conn.cursor()

# Execute a query
cur.execute("SELECT 1 FROM pg_database WHERE datname='app_db'")

res = cur.fetchall()


In [73]:
res

[(1,)]

In [49]:
if len(res) == 0:
    # conn.execute("create database test;")
    print("create database test;")

In [53]:
conn = psycopg.connect(
    dbname="app_db",
    user="root",
    password="root",
    host="localhost"
)

# Create a cursor
cur = conn.cursor()

In [54]:
cur.execute(create_table_statement)

In [38]:
# Fetch results
len(res.fetchall())

AttributeError: 'NoneType' object has no attribute 'fetchall'

In [36]:
# Print results
for row in rows:
    print(row)

(1,)


In [31]:
cur = conn.cursor()

res = cur.execute()

In [33]:
res

In [26]:
with psycopg.connect("host=localhost port=5432 user=root password=root") as conn:
    res = conn.execute("SELECT 1 from pg_database where datname = 'app_db'")
    res.fetchall()

OperationalError: connection to server at "localhost" (127.0.0.1), port 5432 failed: FATAL:  database "root" does not exist


In [None]:
def prep_db():
	with psycopg.connect("host=localhost port=5432 user=root password=root", autocommit=True) as conn:
		res = conn.execute("SELECT 1 FROM pg_database WHERE datname='app_db'")
		if len(res.fetchall()) == 0:
			conn.execute("create database test;")
		with psycopg.connect("host=localhost port=5432 dbname=test user=root password=root") as conn:
			conn.execute(create_table_statement)

In [16]:
with open('models/lin_reg.bin', 'wb') as f_out:
    dump(model, f_out)

In [17]:
val_data.to_parquet('data/reference.parquet')

In [18]:
with open('models/lin_reg.bin', 'rb') as f_in:
	model = joblib.load(f_in)


In [None]:
train_preds = model.predict(train_data[num_features + cat_features])
train_data['prediction'] = train_preds

In [3]:
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s]: %(message)s")


In [4]:
SEND_TIMEOUT = 10
rand = random.Random()


In [5]:
create_table_statement = """
drop table if exists grafana.evidently_metrics;
create table grafana.evidently_metrics(
	timestamp timestamp,
	prediction_drift float,
	num_drifted_columns integer,
	share_missing_values float
)
"""


In [7]:
reference_data = pd.read_parquet('data/reference.parquet') # trainig data


In [None]:
raw_data = pd.read_parquet('data/green_tripdata_2022-02.parquet') # test data
