# 1) Data Exploration

In [None]:
import urllib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import PIL
import os
from pathlib import Path

from sklearn.compose import ColumnTransformer, make_column_transformer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer, OneHotEncoder

from sklearn import set_config; set_config(display='diagram')

## 1.1) Load Data

**All data**

WagonCab's engineering team stores all it's cab course history since 2009 in a massive Big Query table `wagon-public-datasets.taxifare.raw_all`. 
- This table contains `1.1 Million` for this challenge exactly, from **2009 to jun 2015**. 
- *(Note from Le Wagon: In reality, there is 55M rows but we limited that for cost-control in the whole module)*

**Training phase data**

- The training phase takes place in `2015-01-01`. We can't access any data beyond that date to train our model
- The datascientist's notebook hereby trains a ML model on a **200k randomly sampled subset** (so that everything fit in RAM memory on the Data Scientist's laptop)
- WagonCab's data-engineering team has created a `materialized view` of this `raw` table called `raw_200k` and given read access to the DataScientist for its training.

In [None]:
LOCAL_DATA_PATH = Path('~').joinpath(".lewagon", "mlops", "data").expanduser()
GCP_PROJECT_WAGON = "wagon-public-datasets"
BQ_DATASET = "taxifare"
DATA_SIZE = "200k"  # raw_200k is a randomly sampled materialized view from "raw_all" data table
MIN_DATE = '2009-01-01'
MAX_DATE = '2015-01-01'
COLUMN_NAMES_RAW = ('fare_amount',	'pickup_datetime', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'passenger_count')

**Fill your `GCP_PROJECT` below 👇**, Then let's query the historical data (pre-2015), ordered by date (so as to train/test split chronologically more easily) !

In [None]:
GCP_PROJECT = "<your gcp project id>"

In [None]:
query = f"""
    SELECT {",".join(COLUMN_NAMES_RAW)}
    FROM {GCP_PROJECT_WAGON}.{BQ_DATASET}.raw_{DATA_SIZE} 
    WHERE pickup_datetime BETWEEN '{MIN_DATE}' AND '{MAX_DATE}'
    ORDER BY pickup_datetime
    """
print(query)

Make the BigQuery call only if file does not already exist locally.  
Else, store locally as CSV to avoid calling BigQuery at each notebook run.

In [None]:
data_query_cache_path = Path(LOCAL_DATA_PATH).joinpath("raw", f"query_{MIN_DATE}_{MAX_DATE}_{DATA_SIZE}.csv")

if data_query_cache_path.is_file():
    print("load local file...")
    df = pd.read_csv(data_query_cache_path, parse_dates=['pickup_datetime'])
    
else:
    print("Querying Big Query server...")
    from google.cloud import bigquery
    
    client = bigquery.Client(project=GCP_PROJECT)
    query_job = client.query(query)
    result = query_job.result() 
    df = result.to_dataframe()
    
    df.to_csv(data_query_cache_path, header=True, index=False)
    

In [None]:
df

In [None]:
df.info()

### 1.1.1) Compress Data

Let's compress our DataFrame by lowering its numeric `dtypes`
- from  `float64` to `float32`
- from `int64` to `int8`

To do so, we iterate on its columns, and for each one, reduce its `dtypes` as much as possible using [`pd.to_numeric`](https://pandas.pydata.org/docs/reference/api/pandas.to_numeric.html)

**💡 1) Read more about `dtype` compression in the ML Ops - Train at Scale lecture on Kitt, "Appendix A1: Memory Optimization"**

**💡 2) Then, understand and execute the following code**

In [None]:
def compress(df, **kwargs):
    """
    Reduces the size of the DataFrame by downcasting numerical columns
    """
    input_size = df.memory_usage(index=True).sum()/ 1024**2
    print("old dataframe size: ", round(input_size,2), 'MB')
    
    in_size = df.memory_usage(index=True).sum()

    for t in ["float", "integer"]:
        l_cols = list(df.select_dtypes(include=t))

        for col in l_cols:
            df[col] = pd.to_numeric(df[col], downcast=t)

    out_size = df.memory_usage(index=True).sum()
    ratio = (1 - round(out_size / in_size, 2)) * 100
    
    print("optimized size by {} %".format(round(ratio,2)))
    print("new DataFrame size: ", round(out_size / 1024**2,2), " MB")

    return df

In [None]:
df = compress(df, verbose=True)
df.head(1)

Let's check dtypes optimized

In [None]:
df.dtypes

💡 Int8 is a bit too small for some scientific libraries, let's use int16

In [None]:
df.passenger_count = df.passenger_count.astype("int16")

💡 Next time, we can force dtypes directly **at loading time** as follow:

```python
query_job = client.query(query)
result = query_job.result() 
df = result.to_dataframe(dtypes=DTYPES_RAW)
```

In [None]:
DTYPES_RAW = {
    "key": "datetime64[ns, UTC]",
    "fare_amount": "float32",
    "pickup_datetime": "datetime64[ns, UTC]",
    "pickup_longitude": "float32",
    "pickup_latitude": "float32",
    "dropoff_longitude": "float32",
    "dropoff_latitude": "float32",
    "passenger_count": "int16"
}

## 1.2) Clean Data

In [None]:
df.describe()

In [None]:
df.shape

remove redundant rows

In [None]:
df = df.drop_duplicates()
df.shape

Remove buggy transactions

In [None]:
df = df.dropna(how='any', axis=0)
print(df.shape)
df = df[(df.dropoff_latitude != 0) | (df.dropoff_longitude != 0) | (df.pickup_latitude != 0) | (df.pickup_longitude != 0)]
df = df[df.passenger_count > 0]
df = df[df.fare_amount > 0]
print(df.shape)

Remove geographically irrelevant transactions (rows)

In [None]:
# Let's check NYC bouding boxes
# Load image of NYC map
bounding_boxes = (-74.3, -73.7, 40.5, 40.9)

url = 'https://wagon-public-datasets.s3.amazonaws.com/data-science-images/07-ML-OPS/nyc_-74.3_-73.7_40.5_40.9.png'
nyc_map = np.array(PIL.Image.open(urllib.request.urlopen(url)))

plt.imshow(nyc_map);

In [None]:
df = df[df["pickup_latitude"].between(left=40.5, right=40.9)]
df = df[df["dropoff_latitude"].between(left=40.5, right=40.9)]
df = df[df["pickup_longitude"].between(left=-74.3, right=-73.7)]
df = df[df["dropoff_longitude"].between(left=-74.3, right=-73.7)]

In [None]:
df.describe()

In [None]:
# Let's cap training set to reasonable values 
df = df[df.fare_amount < 400]
df = df[df.passenger_count < 8]

In [None]:
df

## 1.3) Visualize Data

In [None]:
# Plot histogram of fare
df.fare_amount.hist(bins=100, figsize=(14,3))
plt.xlabel('fare $USD')
plt.title('Histogram')

In [None]:
# this function will be used more often to plot data on the NYC map
def plot_on_map(df, BB, nyc_map, s=10, alpha=0.2):
    fig, axs = plt.subplots(1, 2, figsize=(16,10))

    axs[0].scatter(df.pickup_longitude, df.pickup_latitude, zorder=1, alpha=alpha, c='red', s=s)
    axs[0].set_xlim((BB[0], BB[1]))
    axs[0].set_ylim((BB[2], BB[3]))
    axs[0].set_title('Pickup locations')
    axs[0].imshow(nyc_map, zorder=0, extent=BB)

    axs[1].scatter(df.dropoff_longitude, df.dropoff_latitude, zorder=1, alpha=alpha, c='blue', s=s)
    axs[1].set_xlim((BB[0], BB[1]))
    axs[1].set_ylim((BB[2], BB[3]))
    axs[1].set_title('Dropoff locations')
    axs[1].imshow(nyc_map, zorder=0, extent=BB)

In [None]:
# Plot training data on map
plot_on_map(df, bounding_boxes, nyc_map, s=1, alpha=0.3)

In [None]:
plot_on_map(df, bounding_boxes, nyc_map, s=20, alpha=1.0)

In [None]:
def plot_hires(df, BB, figsize=(12, 12), ax=None, c=('r', 'b')):
    if ax == None:
        fig, ax = plt.subplots(1, 1, figsize=figsize)
    
    def select_within_boundingbox(df, BB):
        return (df.pickup_longitude >= BB[0]) & (df.pickup_longitude <= BB[1]) & \
            (df.pickup_latitude >= BB[2]) & (df.pickup_latitude <= BB[3]) & \
            (df.dropoff_longitude >= BB[0]) & (df.dropoff_longitude <= BB[1]) & \
            (df.dropoff_latitude >= BB[2]) & (df.dropoff_latitude <= BB[3])
            
    idx = select_within_boundingbox(df, BB)
    ax.scatter(df[idx].pickup_longitude, df[idx].pickup_latitude, c="red", s=0.01, alpha=0.5)
    ax.scatter(df[idx].dropoff_longitude, df[idx].dropoff_latitude, c="blue", s=0.01, alpha=0.5)

In [None]:
plot_hires(df, (-74.1, -73.7, 40.6, 40.9))

In [None]:
plot_hires(df, (-74, -73.95, 40.7, 40.8))

### 1.4) Baseline Score  - Preliminary Intuitions

A baseline model should at least take into account the most obvious feature: the distance between `pickup` and `dropoff`.

The correct distance metric is appropriately named "Manhattan distance" (L1 distance), which computes the sum of horizontal and vertical distances between two points, instead of the diagonal (Euclidean, L2) distance.

In [None]:
import math

def manhattan_distance(start_lat: float, start_lon: float, end_lat: float, end_lon: float) -> float:
    """
    Calculate the Manhattan distance between in km two points on the earth (specified in decimal degrees).
    """
    earth_radius = 6371
    
    lat_1_rad, lon_1_rad = math.radians(start_lat), math.radians(start_lon)
    lat_2_rad, lon_2_rad = math.radians(end_lat), math.radians(end_lon)
    
    dlon_rad = lon_2_rad - lon_1_rad
    dlat_rad = lat_2_rad - lat_1_rad
    
    manhattan_rad = abs(dlon_rad) + abs(dlat_rad)
    manhattan_km = manhattan_rad * earth_radius
    
    return manhattan_km

In [None]:
%%time
df.apply(lambda row: manhattan_distance(row["pickup_latitude"], row["pickup_longitude"], row["dropoff_latitude"], row["dropoff_longitude"]), axis=1)

☝️ The following code takes a while and is not optimized.

Applying a mapping function row-by-row on a DataFrame is a very bad engineering practice, as DataFrames are stored in memory "column-by-column". We talk about "column-major" data storage formats. 

`df.apply(..., axis=1)` is equivalent to a python `for` loop, and does not harness NumPy's vectorized operations.

👇 Let's vectorize our code instead! Notice the improvement by a factor of several hundred 💪

In [None]:
def manhattan_distance_vectorized(df: pd.DataFrame, start_lat: str, start_lon: str, end_lat: str, end_lon: str) -> dict:
    """
    Calculate the Manhattan distance in km between two points on the earth (specified in decimal degrees).
    Vectorized version for pandas df
    """
    earth_radius = 6371
    
    lat_1_rad, lon_1_rad = np.radians(df[start_lat]), np.radians(df[start_lon])
    lat_2_rad, lon_2_rad = np.radians(df[end_lat]), np.radians(df[end_lon])
    
    dlon_rad = lon_2_rad - lon_1_rad
    dlat_rad = lat_2_rad - lat_1_rad
    
    manhattan_rad = np.abs(dlon_rad) + np.abs(dlat_rad)
    manhattan_km = manhattan_rad * earth_radius
    
    return manhattan_km

In [None]:
%%time
manhattan_distance_vectorized(df, "pickup_latitude", "pickup_longitude","dropoff_latitude", "dropoff_longitude")

In [None]:
df['distance'] = manhattan_distance_vectorized(df, "pickup_latitude", "pickup_longitude","dropoff_latitude", "dropoff_longitude")
df['distance'].hist(bins=50)
plt.title("distance (km)")

Let's drop the `distance` column we added manually, and create a true preprocessing pipeline now

In [None]:
df = df.drop(columns=['distance'])
df.shape

# 2) Train/Val/Test Split

🚨 We're dealing with timestamped data:
- It's important to split train/val/test in a **chronological** manner.
- We don't want to hold too long val or tests sets: macro-economic conditions are changing fast in real life!
- The smaller the `split_ratio`, the more often we'll have to re-train our model
- With a dataset of 6 years, it's perfectly fine to keep 1 month ahead of val, 1 month ahead for test
- In production, this means we shouldn't trust our model performance to extend beyond 1 month in the future

In [None]:
split_ratio = 0.02 # ~1 month for val, ~1 month for test

test_length = int(len(df) * split_ratio)
val_length = int((len(df)-test_length) * split_ratio)
train_length = len(df) - val_length - test_length

df_train = df.iloc[:train_length, :].sample(frac=1) # Shuffle datasets to improve training
df_val = df.iloc[train_length: train_length + val_length, :].sample(frac=1)
df_test = df.iloc[train_length+val_length:, :].sample(frac=1)

print(df_train.shape)
print(df_val.shape)
print(df_test.shape)

assert len(df_train) + len(df_val) + len(df_test) == len(df)

In [None]:
print(df_train.pickup_datetime.min())
print(df_train.pickup_datetime.max())
print('---')
print(df_val.pickup_datetime.min())
print(df_val.pickup_datetime.max())
print('---')
print(df_test.pickup_datetime.min())
print(df_test.pickup_datetime.max())

In [None]:

X = df.drop("fare_amount", axis=1)
y = df[["fare_amount"]]

X_train = df_train.drop("fare_amount", axis=1)
y_train = df_train[["fare_amount"]]

X_val = df_val.drop("fare_amount", axis=1)
y_val = df_val[["fare_amount"]]

X_test = df_test.drop("fare_amount", axis=1)
y_test = df_test[["fare_amount"]]

### Simple Baseline: $price = a * distance + b $

In [None]:
distances_train = np.array(manhattan_distance_vectorized(X_train, "pickup_latitude", "pickup_longitude","dropoff_latitude", "dropoff_longitude"))
distances_val = np.array(manhattan_distance_vectorized(X_val, "pickup_latitude", "pickup_longitude","dropoff_latitude", "dropoff_longitude"))
distances_test = np.array(manhattan_distance_vectorized(X_test, "pickup_latitude", "pickup_longitude","dropoff_latitude", "dropoff_longitude"))

In [None]:
sns.regplot(x=distances_train, y=y_train);

In [None]:
from scipy.stats import pearsonr

pearson, p_value = pearsonr(distances_train, y_train)
print(f'{pearson=}')
print(f'{p_value=}')

In [None]:
from sklearn.linear_model import LinearRegression

baseline_model = LinearRegression()
baseline_model.fit(distances_train[:, None], y_train)

In [None]:
baseline_pred_val = baseline_model.predict(distances_val[:, None])
baseline_pred_test = baseline_model.predict(distances_test[:, None])
baseline_mae_val = np.mean(np.abs(baseline_pred_val - y_val), axis=0)
baseline_mae_test = np.mean(np.abs(baseline_pred_test - y_test), axis=0)

print(f'mean taxifare prices on train set = {round(float(np.mean(y_train, axis=0)),2)} $')
print(f'🎯 baseline MAE on val set = {round(float(baseline_mae_val),2)} $')
print(f'🎯 baseline MAE on test set = {round(float(baseline_mae_test),2)} $')

# 3) Preprocessing Pipeline

We are given a dataset with only 5 features (passengers + lon/lat), and potentially dozens of millions of rows.

👉 It makes perfect sense to create a lot of "engineered" features such as "hour of the day"  
- Hundreds of them would cause no problem because the huge number of rows will allow our model to learn all weights associated with these multiple features
- A dense, Deep Learning network will be well-suited for such a case

❗️ The proposed preprocessor below outputs a **fixed number of features** (65) that is **independent of the training set**. You will see that it will come in handy when scaling it up to hundreds of millions of rows

## 3.1) Passenger Preprocessors

Let's analyze passenger numbers

In [None]:
df.passenger_count

In [None]:
sns.histplot(data=df, x="passenger_count");

In [None]:
# PASSENGER PIPE
p_min = 1.
p_max = 8.
passenger_pipe = FunctionTransformer(lambda p: (p-p_min)/(p_max-p_min))

In [None]:
preprocessor = ColumnTransformer(
    [
        ("passenger_scaler", passenger_pipe, ["passenger_count"]),
    ],
)
preprocessor

In [None]:
pd.DataFrame(preprocessor.fit_transform(X_train))

## 3.2) Time Preprocessor

let's extract interesting attributes from the `pickup_datetime`
- hour of the day
- day of the week
- month of the year
- number of days since 2009 (it may encode inflation parameters)

In [None]:
import math

def transform_time_features(X: pd.DataFrame) -> np.ndarray:
    assert isinstance(X, pd.DataFrame)
    
    timedelta = (X["pickup_datetime"] - pd.Timestamp('2009-01-01T00:00:00', tz='UTC'))/pd.Timedelta(1,'D')
    
    pickup_dt = X["pickup_datetime"].dt.tz_convert("America/New_York").dt
    dow = pickup_dt.weekday
    hour = pickup_dt.hour
    month = pickup_dt.month

    hour_sin = np.sin(2 * math.pi / 24 * hour)
    hour_cos = np.cos(2*math.pi / 24 * hour)
    
    return np.stack([hour_sin, hour_cos, dow, month, timedelta], axis=1)

X_time_processed = transform_time_features(X[["pickup_datetime"]])

pd.DataFrame(X_time_processed, columns=["hour_sin", "hour_cos", "dow", "month", "timedelta"]).head()

Then, we one-hot-encode `["day of week", "month"]` by forcing all 24*7 combinations of categories to be always present in `X_processed` 

(remember we want a fixed size for `X_processed` at the end)

In [None]:
time_categories = [
        np.arange(0, 7, 1),  # days of the week from 0 to 6
        np.arange(1, 13, 1)  # months of the year from 1 to 12
    ]

OneHotEncoder(categories=time_categories, sparse_output=False)\
    .fit_transform(X_time_processed[:,[2,3]]) # column index [2,3] for ['dow', 'month'] !

And combine this with a sort of "Min-Max" re-scaling of the `timedelta` column

In [None]:
print(X_time_processed[:,-1].min())
print(X_time_processed[:,-1].max())

In [None]:
timedelta_min = 0
timedelta_max = 2190 # Our model may extend in the future. No big deal if the scaled data extend slightly beyond 1.0

In [None]:
time_pipe = make_pipeline(
    FunctionTransformer(transform_time_features),
    make_column_transformer(
        (OneHotEncoder(
            categories=time_categories,
            sparse_output=False,
            handle_unknown="ignore"
        ), [2, 3]), # corresponds to columns ["day of week", "month"], not the other columns

        (FunctionTransformer(lambda year: (year - timedelta_min) / (timedelta_max - timedelta_min)), [4]), # min-max scale the columns 4 ["year"]
        remainder="passthrough" # keep hour_sin and hour_cos
    )
)

preprocessor = ColumnTransformer(
    [
        ("passenger_scaler", passenger_pipe, ["passenger_count"]),
        ("time_preproc", time_pipe, ["pickup_datetime"]),
    ],
)

preprocessor

In [None]:
pd.DataFrame(preprocessor.fit_transform(X_train)).plot(kind='box');

☝️ 23 features approximately centered and scaled

## 3.3) Distance Pipeline

Let's add both the haversine and Manhattan distances as features

In [None]:
lonlat_features = ["pickup_latitude", "pickup_longitude", "dropoff_latitude", "dropoff_longitude"]

In [None]:
def distances_vectorized(df: pd.DataFrame, start_lat: str, start_lon: str, end_lat: str, end_lon: str) -> dict:
    """
    Calculate the haversine and Manhattan distances between two points (specified in decimal degrees).
    Vectorized version for pandas df
    Computes distance in Km
    """
    earth_radius = 6371

    lat_1_rad, lon_1_rad = np.radians(df[start_lat]), np.radians(df[start_lon])
    lat_2_rad, lon_2_rad = np.radians(df[end_lat]), np.radians(df[end_lon])

    dlon_rad = lon_2_rad - lon_1_rad
    dlat_rad = lat_2_rad - lat_1_rad

    manhattan_rad = np.abs(dlon_rad) + np.abs(dlat_rad)
    manhattan_km = manhattan_rad * earth_radius

    a = (np.sin(dlat_rad / 2.0)**2 + np.cos(lat_1_rad) * np.cos(lat_2_rad) * np.sin(dlon_rad / 2.0)**2)
    haversine_rad = 2 * np.arcsin(np.sqrt(a))
    haversine_km = haversine_rad * earth_radius

    return dict(
        haversine = haversine_km,
        manhattan = manhattan_km
    )

In [None]:
def transform_lonlat_features(X:pd.DataFrame)-> pd.DataFrame:
    assert isinstance(X, pd.DataFrame)
    res = distances_vectorized(X, *lonlat_features)

    return pd.DataFrame(res)

distances = transform_lonlat_features(X[lonlat_features])
distances

In [None]:
dist_min = 0
dist_max = 100

In [None]:
distance_pipe = make_pipeline(
    FunctionTransformer(transform_lonlat_features),
    FunctionTransformer(lambda dist: (dist - dist_min) / (dist_max - dist_min))
    )
distance_pipe

In [None]:
preprocessor = ColumnTransformer(
    [
        ("passenger_scaler", passenger_pipe, ["passenger_count"]),
        ("time_preproc", time_pipe, ["pickup_datetime"]),
        ("dist_preproc", distance_pipe, lonlat_features),
    ],
)
preprocessor

In [None]:
X_processed = pd.DataFrame(preprocessor.fit_transform(X_train))
X_processed.plot(kind='box');

☝️ 25 features, approximately scaled

## 3.4) GeoHasher

Finally, let's add information about **districts**! 

Some might be more expensive than others to go to/depart from (e.g. JFK airport!)

To _bucketize_ geospatial information, we'll use `pygeohash`

In [None]:
import pygeohash as gh

💡 pygeohash converts (lat,lon) into geospacial "squared buckets" of chosen precisions. The more precision you ask, the more "buckets" possibility there is!

<img src="https://wagon-public-datasets.s3.amazonaws.com/data-science-images/07-ML-OPS/geohashes.png">

In [None]:
x0 = X_train.iloc[0,:]
(x0.pickup_latitude, x0.pickup_longitude)

In [None]:
print(gh.encode(x0.pickup_latitude, x0.pickup_longitude, precision=3))
print(gh.encode(x0.pickup_latitude, x0.pickup_longitude, precision=4))
print(gh.encode(x0.pickup_latitude, x0.pickup_longitude, precision=5))

👇 Let's apply it to ALL of our data set (note that this preprocessing may take a very long time!)

In [None]:
def compute_geohash(X:pd.DataFrame, precision:int = 5) -> np.ndarray:
    """
    Add a geohash (ex: "dr5rx") of len "precision" = 5 by default
    corresponding to each (lon, lat) tuple, for pick-up and drop-off
    """
    assert isinstance(X, pd.DataFrame)

    X["geohash_pickup"] = X.apply(
        lambda x: gh.encode(x.pickup_latitude, x.pickup_longitude, precision=precision),
        axis=1
    )
    X["geohash_dropoff"] = X.apply(
        lambda x: gh.encode(x.dropoff_latitude, x.dropoff_longitude, precision=precision),
        axis=1
    )

    return X[["geohash_pickup", "geohash_dropoff"]]

In [None]:
compute_geohash(X_train)

☝️ Notice that this time, we have no choice but to apply `pygeohash` row by row with `df.apply(axis=1)`, and it takes a while to compute.

This is the danger of relying on an external Python library that is not always vectorized. 

👇 What are the most common districts?

In [None]:
all_geohashes = pd.concat([
    X_train.apply(lambda x: gh.encode(x.pickup_latitude, x.pickup_longitude, precision=4), axis=1),
    X_train.apply(lambda x: gh.encode(x.dropoff_latitude, x.dropoff_longitude, precision=4), axis=1),
])

In [None]:
print(len(all_geohashes.value_counts()))
plt.figure(figsize=(15,5))
plt.plot(np.cumsum(all_geohashes.value_counts()[:20])/(2*len(X_train))*100)
plt.title("percentage of taxi rides from/to these districts");

☝️ Only the first 20 districts matter. We can one-hot encode them.

In [None]:
most_important_geohash_districts = np.array(all_geohashes.value_counts()[:20].index)
most_important_geohash_districts

In [None]:
# Let's hard-code below the 20 most frequent district GeoHashes of precision 5,
# covering about 99% of all dropoff/pickup locations.
most_important_geohash_districts = [
    "dr5ru", "dr5rs", "dr5rv", "dr72h", "dr72j", "dr5re", "dr5rk",
    "dr5rz", "dr5ry", "dr5rt", "dr5rg", "dr5x1", "dr5x0", "dr72m",
    "dr5rm", "dr5rx", "dr5x2", "dr5rw", "dr5rh", "dr5x8"
]

Let's one-hot encode each GeoHash in one of the top-20 different buckets listed above


In [None]:
geohash_categories = [
    most_important_geohash_districts,  # pickup district list
    most_important_geohash_districts  # dropoff district list
]

geohash_pipe = make_pipeline(
    FunctionTransformer(compute_geohash),
    OneHotEncoder(
        categories=geohash_categories,
        handle_unknown="ignore",
        sparse_output=False
    )
)
geohash_pipe

## 3.5) Full Preprocessing Pipeline

Let's recap our final preprocessor

### a) Encoders

In [None]:
import math
import numpy as np
import pandas as pd
import pygeohash as gh

In [None]:
def transform_time_features(X: pd.DataFrame) -> np.ndarray:
    assert isinstance(X, pd.DataFrame)
    
    timedelta = (X["pickup_datetime"] - pd.Timestamp('2009-01-01T00:00:00', tz='UTC'))/pd.Timedelta(1,'D')
    
    pickup_dt = X["pickup_datetime"].dt.tz_convert("America/New_York").dt
    dow = pickup_dt.weekday
    hour = pickup_dt.hour
    month = pickup_dt.month

    hour_sin = np.sin(2 * math.pi / 24 * hour)
    hour_cos = np.cos(2*math.pi / 24 * hour)
    
    return np.stack([hour_sin, hour_cos, dow, month, timedelta], axis=1)

In [None]:
def transform_lonlat_features(X: pd.DataFrame) -> pd.DataFrame:

    assert isinstance(X, pd.DataFrame)
    lonlat_features = ["pickup_latitude", "pickup_longitude", "dropoff_latitude", "dropoff_longitude"]

    def distances_vectorized(df: pd.DataFrame, start_lat: str, start_lon: str, end_lat: str, end_lon: str) -> dict:
        """
        Calculate the haversine and Manhattan distances between two points on the earth (specified in decimal degrees).
        Vectorized version for pandas df
        Computes distance in Km
        """
        earth_radius = 6371

        lat_1_rad, lon_1_rad = np.radians(df[start_lat]), np.radians(df[start_lon])
        lat_2_rad, lon_2_rad = np.radians(df[end_lat]), np.radians(df[end_lon])

        dlon_rad = lon_2_rad - lon_1_rad
        dlat_rad = lat_2_rad - lat_1_rad

        manhattan_rad = np.abs(dlon_rad) + np.abs(dlat_rad)
        manhattan_km = manhattan_rad * earth_radius

        a = (np.sin(dlat_rad / 2.0)**2 + np.cos(lat_1_rad) * np.cos(lat_2_rad) * np.sin(dlon_rad / 2.0)**2)
        haversine_rad = 2 * np.arcsin(np.sqrt(a))
        haversine_km = haversine_rad * earth_radius

        return dict(
            haversine=haversine_km,
            manhattan=manhattan_km)

    result = pd.DataFrame(distances_vectorized(X, *lonlat_features))

    return result

In [None]:
def compute_geohash(X: pd.DataFrame, precision: int = 5) -> np.ndarray:
    """
    Add a GeoHash (ex: "dr5rx") of len "precision" = 5 by default
    corresponding to each (lon, lat) tuple, for pick-up, and drop-off
    """
    assert isinstance(X, pd.DataFrame)

    X["geohash_pickup"] = X.apply(
        lambda x: gh.encode(x.pickup_latitude, x.pickup_longitude, precision=precision),
        axis=1
    )
    X["geohash_dropoff"] = X.apply(
        lambda x: gh.encode(x.dropoff_latitude, x.dropoff_longitude, precision=precision),
        axis=1
    )

    return X[["geohash_pickup", "geohash_dropoff"]]

### b) Pipeline

In [None]:
from sklearn.pipeline import make_pipeline
from sklearn.compose import ColumnTransformer, make_column_transformer
from sklearn.preprocessing import OneHotEncoder, FunctionTransformer

In [None]:
# PASSENGER PIPE
p_min = 1
p_max = 8
passenger_pipe = FunctionTransformer(lambda p: (p - p_min) / (p_max - p_min))

# DISTANCE PIPE
dist_min = 0
dist_max = 100

distance_pipe = make_pipeline(
    FunctionTransformer(transform_lonlat_features),
    FunctionTransformer(lambda dist: (dist - dist_min) / (dist_max - dist_min))
)

# TIME PIPE
timedelta_min = 0
timedelta_max = 2090

time_categories = [
    np.arange(0, 7, 1),  # days of the week
    np.arange(1, 13, 1)  # months of the year
]

time_pipe = make_pipeline(
    FunctionTransformer(transform_time_features),
    make_column_transformer(
        (OneHotEncoder(
            categories=time_categories,
            sparse_output=False,
            handle_unknown="ignore"
        ), [2,3]), # corresponds to columns ["day of week", "month"], not the other columns

        (FunctionTransformer(lambda year: (year - timedelta_min) / (timedelta_max - timedelta_min)), [4]), # min-max scale the columns 4 ["timedelta"]
        remainder="passthrough" # keep hour_sin and hour_cos
    )
)

# GEOHASH PIPE
lonlat_features = [
    "pickup_latitude", "pickup_longitude", "dropoff_latitude",
    "dropoff_longitude"
]

# Below are the 20 most frequent district geohashes of precision 5,
# covering about 99% of all dropoff/pickup locations,
# according to prior analysis in a separate notebook
most_important_geohash_districts = [
    "dr5ru", "dr5rs", "dr5rv", "dr72h", "dr72j", "dr5re", "dr5rk",
    "dr5rz", "dr5ry", "dr5rt", "dr5rg", "dr5x1", "dr5x0", "dr72m",
    "dr5rm", "dr5rx", "dr5x2", "dr5rw", "dr5rh", "dr5x8"
]

geohash_categories = [
    most_important_geohash_districts,  # pickup district list
    most_important_geohash_districts  # dropoff district list
]

geohash_pipe = make_pipeline(
    FunctionTransformer(compute_geohash),
    OneHotEncoder(
        categories=geohash_categories,
        handle_unknown="ignore",
        sparse_output=False
    )
)

# COMBINED PREPROCESSOR
final_preprocessor = ColumnTransformer(
    [
        ("passenger_scaler", passenger_pipe, ["passenger_count"]),
        ("time_preproc", time_pipe, ["pickup_datetime"]),
        ("dist_preproc", distance_pipe, lonlat_features),
        ("geohash", geohash_pipe, lonlat_features),
    ],
    n_jobs=-1,
)

In [None]:
final_preprocessor

In [None]:
final_preprocessor.fit(X_train)

X_train_processed = final_preprocessor.transform(X_train)
X_val_processed = final_preprocessor.transform(X_val)
X_test_processed = final_preprocessor.transform(X_test)

In [None]:
fig, ax = plt.subplots(figsize=(10, 5))
pd.DataFrame(X_train_processed).plot(kind='box', ax=ax);

In [None]:
fig, ax = plt.subplots(figsize=(10,6))
sns.heatmap(pd.DataFrame(X_train_processed).corr(), vmin=-1, cmap='RdBu');

To conclude, we can can probably also compress our processed data to float32 without loosing too much perf

In [None]:
X_train_processed.dtype

In [None]:
print(X_train_processed.nbytes / 1024**2, "MB")

In [None]:
# Compress the data a bit
X_train_processed = X_train_processed.astype(np.float32)
X_val_processed = X_val_processed.astype(np.float32)
X_test_processed = X_test_processed.astype(np.float32)

print(X_train_processed.nbytes / 1024**2, "MB")

In [None]:
pd.DataFrame(X_train_processed)

In [None]:
pd.DataFrame(X_train_processed).describe().mean(axis=1)

☝️ The preprocessor outputs a **fixed** number of features (65) that is independent of the training set. 

☝️ The preprocessor is also  **state-less** (i.e it has no `.fit()` method, only a `.transform()`). It can be seen as a *pure function* $f:X \rightarrow X_{processed}$ without an internal state, as opposed to standard scaling for instance, which has to store "X_train standard deviations" as internal states.

These two features will make work much easier for the ML Engineering team to scale preprocessing to hundreds of GBs. 

# 4) Model

## 4.1) Architecture

In [None]:
from tensorflow import keras
from keras import Model, Sequential, layers, regularizers
from keras.callbacks import EarlyStopping

In [None]:
def initialize_model(input_shape:tuple) -> Model:
    """
    Initialize the Neural Network with random weights
    """

    reg = regularizers.l1_l2(l1=0.005)

    model = Sequential()
    model.add(layers.Input(shape=input_shape))
    model.add(layers.Dense(100, activation="relu", kernel_regularizer=reg))
    model.add(layers.BatchNormalization(momentum=0.9))
    model.add(layers.Dropout(rate=0.1))
    model.add(layers.Dense(50, activation="relu"))
    model.add(layers.BatchNormalization(momentum=0.9))  # use momentum=0 to only use statistic of the last seen minibatch in inference mode ("short memory"). Use 1 to average statistics of all seen batch during training histories.
    model.add(layers.Dropout(rate=0.1))
    model.add(layers.Dense(1, activation="linear"))

    print("✅ model initialized")

    return model

In [None]:
model = initialize_model(input_shape=X_train_processed.shape[1:])
model.summary()

In [None]:
learning_rate = 0.0005
batch_size = 256

optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
model.compile(loss="mean_squared_error", optimizer=optimizer, metrics=["mae"])

In [None]:
es = EarlyStopping(
    monitor="val_loss",
    patience=2,
    restore_best_weights=True,
    verbose=0
)

history = model.fit(
    X_train_processed,
    y_train,
    validation_data=(X_val_processed, y_val),
    epochs=100,
    batch_size=batch_size,
    callbacks=[es],
    verbose=1
)

## 4.2) Performance evaluation

In [None]:
print("MAE val", round(model.evaluate(X_val_processed, y_val)[1],2), ' $')
print("MAE test", round(model.evaluate(X_test_processed, y_test)[1],2), ' $')
print("MAE test baseline", round(float(baseline_mae_test),2), ' $')

In [None]:
y_pred = model.predict(X_test_processed)

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

plt.figure(figsize=(15,8))

plt.hist(y_pred, label='pred', color='r', bins=200, alpha=0.3)
plt.hist(y_test, label='truth', color='b', bins=200, alpha=0.3)

plt.legend()
plt.xlim((0,60))

In [None]:
residuals = y_pred - y_test

In [None]:
residuals.describe()

In [None]:
residuals = y_pred - y_test
sns.histplot(residuals)
plt.xlim(xmin=-20, xmax=20)

In [None]:
residuals.sort_values(by='fare_amount')

In [None]:
# Residual vs. actual scatter plot
plt.figure(figsize=(15,5))
plt.scatter(x=y_test,y=residuals, alpha=0.1)
plt.xlabel('actual')
plt.ylabel('residuals')

In [None]:
# Residual vs. predicted scatter plot
plt.figure(figsize=(15,5))
plt.scatter(x=y_pred,y=residuals, alpha=0.1)
plt.xlabel('predicted')
plt.ylabel('residuals')

☝️ Our model trained on a limited 60k dataset has an MAE of about 2.5$ per course, compared with a mean course price of 11$.  

A simple linear regression would give us about 3$ of MAE, but the devil lies in the details!

In particular, we're not that good at predicting very long/expensive courses.

# 🧪 Test Your Understanding

❓ Try to answer these questions with your buddy
1. Are you satisfied with the model's performance? 
2. Any ideas to improve it?
3. What is a stateless pipeline (as opposed to a stateful one)?
4. How does an OHEncoder work with fixed column categories?
5. How is the data normalization done in the Neural Net?

<details>
  <summary markdown='span'>💡 Answers</summary>

1. We have a 25% improvement over the linear regression baseline (but we should cross-validate that to be sure). Besides, a 2$ forecast error on taxi courses whose prices also depend on traffic seems close to the irreducible error rate.


2. We could improve model by adding more train data (will test that later). Another promising idea could be to *learn* the embedding of our categorical features (instead of one-hot-encoding them).


3. A stateless pipeline has no real `.fit()` method, only a `.transform()`. 


4. To become stateless, we've hard-coded the `categories` to one-hot encode `OneHotEncoder(categories=categories,...)` and hard-coded the statistical features of each column in our scalers:  `FunctionTransformer(lambda dist: (dist - dist_min)/(dist_max - dist_min))`


5. In the TensorFlow model, notice the `layers.BatchNormalization()` we've added between each dense layer, which normalizes data batch-per-batch! It's a cool feature to help fix vanish gradients during the back-propagation!


</details>

❓ Predict the price for this new course `X_new` below and store the result `y_new` as a `np.ndarray`

In [None]:
X_train.iloc[0:2,:]['pickup_datetime']

In [None]:
X_new = pd.DataFrame(dict(
    pickup_datetime=[pd.Timestamp("2013-07-06 17:18:00", tz='UTC')],
    pickup_longitude=[-73.950655],
    pickup_latitude=[40.783282],
    dropoff_longitude=[-73.984365],
    dropoff_latitude=[40.769802],
    passenger_count=[1],
))

X_new

In [None]:
# Compute y_new
pass  # YOUR CODE HERE

In [None]:
from nbresult import ChallengeResult

result = ChallengeResult(
    'notebook',
    subdir='train_at_scale',
    y_new=y_new
)

result.write()
print(result.check())

Finally, let's run `make test_kitt` so as to allow Kitt to track your progress

In [None]:
! cd .. && make test_kitt 