# GVB predictions - 1-week ahead - Station level

## To-do:

* The env file needs to be loaded. Note that "from sqlalchemy import create_engine" and "import env" need to be uncommented in the imports cell, together with the engine definition in the Settings section.

* Predictions need to be stored in the database.

* Decide where to calculate crowd levels (in this notebook or with a trigger in the database). Base them on the static CSV from GVB.

* Events are specified manually and are currently read from a static file. As a temporary solution, we can store events in the database. Ideally, events should come from an API.

* Historical weather data and the weather forecast seem to use different units for some columns. This is currently handled by multiplying everything by 10, which should be correct but still needs to be verified.


## Preparations

In [None]:
%%capture
# Install the extra packages this notebook needs: psycopg2-binary (PostgreSQL driver
# for the commented-out SQLAlchemy engine) and workalendar (Dutch holidays, imported
# below). %%capture hides the pip output.
get_ipython().run_cell_magic('bash', '', 'pip install psycopg2-binary\npip install workalendar')

In [None]:
import pandas as pd
import geopandas as gpd
import numpy as np
import os

#from sqlalchemy import create_engine
#import env

from pyspark.sql import SparkSession
from pyspark.sql.functions import substring, length, col, expr
from pyspark.sql.types import *

import requests

from datetime import datetime, timedelta, date
import time
import pytz
from workalendar.europe import Netherlands

from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_absolute_error

import helpers_gvb as h

import importlib   # to reload helpers without restarting kernel: importlib.reload(h)

import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline

import warnings

## Settings

In [None]:
# create engine for SQL queries
#engine = create_engine("postgresql://{}:{}@{}:{}/{}".format(env.DATABASE_USERNAME_AZ, 
#                                                            env.DATABASE_PASSWORD_AZ, 
#                                                            "igordb.postgres.database.azure.com", 
#                                                            5432, 
#                                                            "igor"),
#                       connect_args={'sslmode':'require'})

In [None]:
# Stations to create predictions for. Order matters: every per-station list built
# below (frames, data splits, hyperparameters, models, predictions) follows it.
stations = [
    'Centraal Station',
    'Station Zuid',
    'Station Bijlmer ArenA',
]

In [None]:
# COVID stringency is requested from 2020-1-1 up to today. Dates in the URL are
# written without zero padding (e.g. 2020-1-1).
today = pd.to_datetime("today")
today_str = f"{today.year}-{today.month}-{today.day}"
covid_url = f"https://covidtrackerapi.bsg.ox.ac.uk/api/v2/stringency/date-range/2020-1-1/{today_str}"

## Main

### 1. Get data

In [None]:
# Progress message
print("Start loading raw data")

In [None]:
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

In [None]:
# Load 2019 GVB data (TEMPORARILY): semicolon-separated CSV files with a header row,
# converted to pandas. The 2019 files are stored under the 2020/04/20 folder.
gvb_2019_bestemming_raw = (
    spark.read.format("csv")
    .option("header", "true")
    .load("s3a://gvb-gvb/topics/gvb/2020/04/20/Datalab_Reis_Bestemming_Uur_2019.csv", sep=";")
    .toPandas()
)
gvb_2019_herkomst_raw = (
    spark.read.format("csv")
    .option("header", "true")
    .load("s3a://gvb-gvb/topics/gvb/2020/04/20/Datalab_Reis_Herkomst_Uur_2019.csv", sep=";")
    .toPandas()
)

In [None]:
# Load 2020 GVB data (TEMPORARILY): every matching 2020 CSV in any month/day folder,
# semicolon-separated with a header row, converted to pandas.
gvb_2020_bestemming_raw = (
    spark.read.format("csv")
    .option("header", "true")
    .load("s3a://gvb-gvb/topics/gvb/2020/*/*/Datalab_Reis_Bestemming_Uur_2020*.csv", sep=";")
    .toPandas()
)
gvb_2020_herkomst_raw = (
    spark.read.format("csv")
    .option("header", "true")
    .load("s3a://gvb-gvb/topics/gvb/2020/*/*/Datalab_Reis_Herkomst_Uur_2020*.csv", sep=";")
    .toPandas()
)

In [None]:
# Load 2021 GVB data (csv format only) (TEMPORARILY): every matching 2021 CSV in any
# month/day folder, semicolon-separated with a header row, converted to pandas.
gvb_2021_bestemming_raw = (
    spark.read.format("csv")
    .option("header", "true")
    .load("s3a://gvb-gvb/topics/gvb/2021/*/*/Datalab_Reis_Bestemming_Uur_2021*.csv", sep=";")
    .toPandas()
)
gvb_2021_herkomst_raw = (
    spark.read.format("csv")
    .option("header", "true")
    .load("s3a://gvb-gvb/topics/gvb/2021/*/*/Datalab_Reis_Herkomst_Uur_2021*.csv", sep=";")
    .toPandas()
)

In [None]:
# Stack the yearly CSV data from the three cells above (TEMPORARILY).
# No ignore_index: the original row indexes are kept, so index labels may repeat.
gvb_bestemming_raw_csv = pd.concat(
    [
        gvb_2019_bestemming_raw,
        gvb_2020_bestemming_raw,
        gvb_2021_bestemming_raw,
    ]
)
gvb_herkomst_raw_csv = pd.concat(
    [
        gvb_2019_herkomst_raw,
        gvb_2020_herkomst_raw,
        gvb_2021_herkomst_raw,
    ]
)

In [None]:
# Load GVB data in JSON format (gzipped JSON files). Destination (bestemming) and
# origin (herkomst) trips are stored under separate topics.
# Only CSV uses the `header` and `sep` reader options, so they are not passed to
# the JSON reader (consistent with the KNMI observations load).
gvb_bestemming_raw_json = (
    spark.read.format("json")
    .load("s3a://gvb-gvb/topics/gvb/*/*/*/*.json.gz")
    .toPandas()
)
gvb_herkomst_raw_json = (
    spark.read.format("json")
    .load("s3a://gvb-gvb/topics/gvb-herkomst/*/*/*/*.json.gz")
    .toPandas()
)

In [None]:
# Load weather data: historical KNMI observations and KNMI forecasts, both as JSON.
# Only CSV uses the `header` and `sep` reader options, so they are not passed here.
# NOTE(review): the forecast path is hard-coded to the 2021 folders — confirm this
# still picks up current forecasts.
knmi_obs = (
    spark.read.format("json")
    .load("s3a://knmi-knmi/topics/knmi-observations/*/*/*/*")
    .toPandas()
)
knmi_pred = (
    spark.read.format("json")
    .load("s3a://knmi-knmi/topics/knmi/2021/*/*/*.json.gz")
    .toPandas()
)

In [None]:
# Fetch COVID stringency data from the covidtrackerapi (bsg.ox.ac.uk) endpoint.
# The timeout keeps a stalled request from blocking the run indefinitely, and
# raise_for_status() reports HTTP errors right away instead of failing later
# while reading the response body.
covid_response = requests.get(url=covid_url, timeout=60)
covid_response.raise_for_status()
covid_df_raw = pd.DataFrame(covid_response.json()['data'])

In [None]:
# Dutch public holidays from 2019 (the first year of GVB data loaded above) up to and
# including next year. The range follows the current date, so the one-week-ahead
# forecast still has holiday features after 2021 and around New Year.
holidays_data_raw = [
    holiday
    for year in range(2019, date.today().year + 2)
    for holiday in Netherlands().holidays(year)
]

In [None]:
# Vacation periods, loaded by helpers_gvb (source and format are defined there).
# They are merged into the GVB data and used for next week's feature rows.
vacations_df = h.get_vacations()

In [None]:
# Planned events. For now these are specified manually in a static file read by
# helpers_gvb (see the To-do at the top); ideally they would come from an API.
events = h.get_events()

### 2. Prepare data

#### Pre-process data sources

In [None]:
# Progress message
print("Start pre-processing data")

In [None]:
# Combine the CSV and JSON GVB exports into one destination (bestemming) table and
# one origin (herkomst) table.
bestemming, herkomst = h.merge_csv_json(
    gvb_bestemming_raw_csv,
    gvb_herkomst_raw_csv,
    gvb_bestemming_raw_json,
    gvb_herkomst_raw_json,
)

In [None]:
# Make the trip count ('AantalReizen') numeric so it can be summed.
bestemming['AantalReizen'] = bestemming['AantalReizen'].astype(int)
herkomst['AantalReizen'] = herkomst['AantalReizen'].astype(int)

# Drop fully identical rows. This runs after the cast, so the comparison uses the
# integer counts.
bestemming.drop_duplicates(inplace=True)
herkomst.drop_duplicates(inplace=True)

# Total trips per date, hour group and stop: the analysis is per station.
bestemming_grouped = (
    bestemming
    .groupby(['Datum', 'UurgroepOmschrijving (van aankomst)', 'AankomstHalteNaam'], as_index=False)
    ['AantalReizen']
    .sum()
)
herkomst_grouped = (
    herkomst
    .groupby(['Datum', 'UurgroepOmschrijving (van vertrek)', 'VertrekHalteNaam'], as_index=False)
    ['AantalReizen']
    .sum()
)

In [None]:
# Merge the grouped destination and origin counts into a single table
# (join logic lives in helpers_gvb).
bestemming_herkomst = h.merge_bestemming_herkomst(
    bestemming_grouped,
    herkomst_grouped,
)

In [None]:
# One modelling frame per station, in the same order as `stations`.
gvb_dfs = [
    h.preprocess_gvb_data_for_modelling(bestemming_herkomst, station)
    for station in stations
]

In [None]:
# Pre-process the historical KNMI weather observations. The result is merged into
# the historical GVB data as model features.
knmi_historical = h.preprocess_knmi_data_hour(knmi_obs)

In [None]:
# Pre-process the KNMI weather forecast. It is used to build the feature rows for
# next week's predictions.
# NOTE(review): per the To-do at the top, forecast and historical weather may use
# different units for some columns — confirm the conversion in helpers_gvb.
knmi_forecast = h.preprocess_metpre_data(knmi_pred)

In [None]:
# Pre-process the raw COVID stringency data fetched from the API. The result is
# merged into the GVB data and supplies the latest stringency for next week.
covid_df = h.preprocess_covid_data(covid_df_raw)

In [None]:
# Pre-process the Dutch holidays from workalendar. The result is merged into the GVB
# data and used for next week's feature rows.
holiday_df = h.preprocess_holiday_data(holidays_data_raw)

#### Merge datasources

In [None]:
# Enrich each station's frame with historical weather, COVID stringency, holidays,
# vacations and events.
# The merge helper is called through `h` like every other helper here: the bare name
# is neither defined nor imported in this notebook and would raise NameError.
# NOTE(review): assumes merge_gvb_with_datasources lives in helpers_gvb alongside
# the other preprocessing helpers — confirm.
gvb_dfs_merged = [
    h.merge_gvb_with_datasources(df, knmi_historical, covid_df, holiday_df, vacations_df, events)
    for df in gvb_dfs
]

### 3. Clean data

In [None]:
# Progress message
print("Start cleaning data")

#### Interpolate missing data

In [None]:
# Fill gaps in each station's merged data (strategy defined in helpers_gvb).
gvb_dfs_interpolated = [h.interpolate_missing_values(df) for df in gvb_dfs_merged]

#### Create features

In [None]:
gvb_dfs_final = []

for station_df in gvb_dfs_interpolated:
    # Targets as integers (modifies the interpolated frame in place).
    # NOTE(review): astype(int) truncates fractional values rather than rounding —
    # confirm that is intended if interpolation can produce fractions.
    for target_col in ('check-ins', 'check-outs'):
        station_df[target_col] = station_df[target_col].astype(int)

    # Lag features: last week's check-ins/check-outs for every row.
    station_df[['check-ins_week_ago', 'check-outs_week_ago']] = station_df.apply(
        lambda row: h.get_crowd_last_week(station_df, row),
        axis=1,
        result_type="expand",
    )
    gvb_dfs_final.append(station_df)

### 4. Create model dataframes

In [None]:
# Per station: drop rows with any missing value, then split into
# [train, validation, test] (split logic in helpers_gvb).
data_splits = []

for station_df in gvb_dfs_final:
    train_df, validation_df, test_df = h.get_train_val_test_split(station_df.dropna())
    data_splits.append([train_df, validation_df, test_df])

In [None]:
# Model inputs and outputs; identical for all stations at the moment.
features = [
    # calendar
    'year', 'month', 'weekday', 'hour',
    # holidays, vacations and events
    'holiday', 'vacation', 'planned_event',
    # same hour one week earlier
    'check-ins_week_ago', 'check-outs_week_ago',
    # COVID measures
    'stringency',
    # weather
    'temperature', 'wind_speed', 'precipitation_h',
]
targets = ['check-ins', 'check-outs']

In [None]:
# Split every station's [train, validation, test] frames into feature matrices (X)
# and target frames (y), keeping the station order of data_splits.
X_train_splits = [split[0][features] for split in data_splits]
y_train_splits = [split[0][targets] for split in data_splits]

X_validation_splits = [split[1][features] for split in data_splits]
y_validation_splits = [split[1][targets] for split in data_splits]

X_test_splits = [split[2][features] for split in data_splits]
y_test_splits = [split[2][targets] for split in data_splits]

In [None]:
# Dataframes to predict check-ins and check-outs of next week, one per station.
# The most recent stringency value is used as a constant for the whole week.
# `.iloc[-1]` selects the last row by position. A label lookup such as
# `tail(1)['stringency'][0]` raises KeyError on a default RangeIndex (the last row is
# not labelled 0) and relies on deprecated positional fallback for other indexes.
# NOTE(review): assumes covid_df is sorted by date ascending — confirm.
latest_stringency = covid_df['stringency'].iloc[-1]

X_predict_dfs = [
    h.get_future_df(features, df, latest_stringency, holiday_df, vacations_df, knmi_forecast, events)
    for df in gvb_dfs_final
]

### 5. Create model

In [None]:
# Progress message
print("Start modelling")

In [None]:
# Baseline random forest per station: trained on the training split with the
# validation split passed as evaluation data and hyperparameters=None. Each entry
# is [model, r_squared, mae, rmse] as returned by the helper.
basic_models = []

for X_tr, y_tr, X_val, y_val in zip(X_train_splits, y_train_splits,
                                    X_validation_splits, y_validation_splits):
    model_basic, r_squared_basic, mae_basic, rmse_basic = h.train_random_forest_regressor(
        X_tr, y_tr, X_val, y_val, None
    )
    basic_models.append([model_basic, r_squared_basic, mae_basic, rmse_basic])

#### Tune (hyper-)parameters (skipped: hyperparameter tuning currently does not improve the models)

In [None]:
# Per-station hyperparameters. They could be station-specific; for now every station
# uses the default settings (None). The list order must match `stations`.
centraal_station_hyperparameters = station_zuid_hyperparameters = station_bijlmer_arena_hyperparameters = None

hyperparameters = [
    centraal_station_hyperparameters,
    station_zuid_hyperparameters,
    station_bijlmer_arena_hyperparameters,
]

In [None]:
#tuned_models = []

#for x in range(0, len(data_splits)):
#    model_tuned, r_squared_tuned, mae_tuned, rmse_tuned = h.train_random_forest_regressor(X_train_splits[x], y_train_splits[x], 
#                                                                                          X_validation_splits[x], y_validation_splits[x], 
#                                                                                          hyperparameters[x])
#    tuned_models.append([model_tuned, r_squared_tuned, mae_tuned, rmse_tuned])

##### Improvements compared to basic model (for R-squared, a negative difference means worse performance; for MAE and RMSE, a negative difference means better performance)

In [None]:
#for x in range(0, len(basic_models)):
#    print("R-squared difference", tuned_models[x][1]-basic_models[x][1])
#    print("MAE difference", tuned_models[x][2]-basic_models[x][2])
#    print("RMSE difference", tuned_models[x][3]-basic_models[x][3])

#### Train test models (on training plus validation data, evaluated on the test data)

In [None]:
# Per station: train on the training and validation splits combined, evaluate on the
# held-out test split. Each entry is [model, r_squared, mae, rmse].
test_models = []

for idx, (X_tr, y_tr, X_val, y_val, X_te, y_te) in enumerate(zip(
        X_train_splits, y_train_splits,
        X_validation_splits, y_validation_splits,
        X_test_splits, y_test_splits)):
    X_train_with_val = pd.concat([X_tr, X_val])
    y_train_with_val = pd.concat([y_tr, y_val])

    model_test, r_squared_test, mae_test, rmse_test = h.train_random_forest_regressor(
        X_train_with_val, y_train_with_val, X_te, y_te, hyperparameters[idx]
    )
    test_models.append([model_test, r_squared_test, mae_test, rmse_test])

#### Check models on R-squared score

In [None]:
# Warn about every station whose test-set R-squared falls below 0.7.
for station_name, test_result in zip(stations, test_models):
    r_squared = test_result[1]
    if r_squared < 0.7:
        warnings.warn(f"Model for {station_name} shows unexpected performance!")

#### Train final models on all data (train, validation and test) to make predictions

In [None]:
# Final models: retrain on every split (train + validation + test) so predictions use
# all available data. The test split is also passed as evaluation data; only the
# fitted model (element 0 of the helper's result) is kept.
final_models = []

for idx, (X_tr, X_val, X_te, y_tr, y_val, y_te) in enumerate(zip(
        X_train_splits, X_validation_splits, X_test_splits,
        y_train_splits, y_validation_splits, y_test_splits)):
    X_all = pd.concat([X_tr, X_val, X_te])
    y_all = pd.concat([y_tr, y_val, y_te])

    training_result = h.train_random_forest_regressor(X_all, y_all, X_te, y_te, hyperparameters[idx])
    final_models.append(training_result[0])

### 6. Prepare output

In [None]:
# Progress message
print("Start preparing data")

In [None]:
# Predict next week's check-ins and check-outs per station.
# Each prediction frame is paired with the model trained for the same station.
# Both lists were built by iterating over gvb_dfs_final, i.e. in `stations` order,
# so position i belongs to stations[i]. A nested loop would predict every frame
# with every model (len(stations)**2 results) and break that positional pairing,
# which the storage step relies on.
predictions = [
    h.predict(model, predict_df)
    for predict_df, model in zip(X_predict_dfs, final_models)
]

In [None]:
# Calculate crowd levels here? Or use trigger in database?

### 7. Store data

In [None]:
# Progress message
print("Start storing data")

In [None]:
# Model features that are not part of the stored output. 'hour' is kept here because
# it is folded into the datetime column below. Deriving the list from `features`
# keeps it in sync when the feature set changes.
feature_columns_to_drop = [feature for feature in features if feature != 'hour']

for station_name, predictions_for_station in zip(stations, predictions):
    final_prediction_dataframe = predictions_for_station.drop(columns=feature_columns_to_drop)

    # Add time to datetime column so that it is easier to store in database.
    # int() guards against the hour arriving as a float, which replace() rejects.
    final_prediction_dataframe['datetime'] = final_prediction_dataframe.apply(
        lambda row: row['datetime'].replace(hour=int(row['hour'])), axis=1
    )
    final_prediction_dataframe.drop(columns=['hour'], inplace=True)
    final_prediction_dataframe['Station'] = station_name

    # TODO: write final_prediction_dataframe (for station_name) to the database

In [None]:
# Progress message
print("Finished storing data")