In [16]:
# Imports
import pandas as pd
import numpy as np
import google.cloud.bigquery as bq
import os
import pickle
import tensorflow as tf
from tensorflow import keras
import requests
import math

import pandas as pd

import statsmodels.api as sm
import statsmodels.tsa.api as smt

from statsmodels.tsa.stattools import adfuller

from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import seaborn as sns
from pylab import rcParams
import matplotlib.pyplot as plt
from matplotlib import rc
from pandas.plotting import register_matplotlib_converters

%matplotlib inline
%config InlineBackend.figure_format='retina'

# Register pandas' datetime converters with matplotlib for date-axis plotting.
register_matplotlib_converters()
# Notebook-wide seaborn theme: white grid, muted palette, fonts scaled by 1.5.
sns.set(style='whitegrid', palette='muted', font_scale=1.5)

# Default figure size as (width, height). This is set after sns.set();
# NOTE(review): sns.set() may itself modify rcParams, so keep this ordering.
rcParams['figure.figsize'] = 22, 10

In [4]:
# Single seed value shared by the random number generators seeded below.
RANDOM_SEED = 42

# Seed both NumPy's and TensorFlow's global generators with the same value.
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

In [6]:
def create_dataset(X, y, time_steps=1):
    """
    Build sliding-window samples for sequence models.

    For every start position ``i`` in ``range(len(X) - time_steps)`` the
    window ``X.iloc[i:i + time_steps]`` becomes one input sample and
    ``y.iloc[i + time_steps]`` (the element right after the window) becomes
    its target.

    Parameters
    ----------
    X : pandas object supporting ``len`` and ``.iloc`` slicing
        Source of the input windows.
    y : pandas object supporting ``.iloc`` positional access
        Source of the targets.
    time_steps : int, default 1
        Number of consecutive rows in each window.

    Returns
    -------
    tuple of numpy.ndarray
        ``(windows, targets)``; both are empty when ``len(X) <= time_steps``.
    """
    n_windows = len(X) - time_steps
    windows = [
        X.iloc[start:start + time_steps].values
        for start in range(n_windows)
    ]
    targets = [y.iloc[start + time_steps] for start in range(n_windows)]
    return np.array(windows), np.array(targets)

In [7]:
def add_datetime_columns(_df):
    """
    Return a copy of ``_df`` with ``hour`` and ``date`` columns added.

    Both columns are derived from the datetime-typed ``timestamp`` column:
    ``hour`` is the hour of day and ``date`` is the calendar date converted
    back to a datetime column. The input frame is not modified.
    """
    stamps = _df.timestamp
    hour_of_day = stamps.dt.hour
    calendar_date = pd.to_datetime(stamps.dt.date)
    return _df.assign(hour=hour_of_day, date=calendar_date)

In [8]:
def agg_value_by_hour(_df):
    """
    Sum ``value`` within each (``date``, ``hour``) group.

    Returns a new frame with columns ``date``, ``hour``, ``value`` (the
    per-group sum) and ``day_of_week`` (``date.dt.dayofweek`` of each row).
    """
    hourly_totals = (
        _df
        .groupby(['date', 'hour'])
        .agg({'value': 'sum'})
        .reset_index()
    )
    # reset_index() produced a fresh frame, so adding a column here does not
    # touch the caller's data.
    hourly_totals['day_of_week'] = hourly_totals.date.dt.dayofweek
    return hourly_totals

In [9]:
def concat_day_hour(x):
    """
    Build a ``"<day_of_week>_<hour>"`` string key for every row of ``x``.

    ``x`` must expose ``day_of_week`` and ``hour`` columns; both are cast to
    strings and joined with an underscore.
    """
    day_labels = x.day_of_week.astype('str')
    hour_labels = x.hour.astype('str')
    return day_labels.str.cat(hour_labels, sep='_')

def get_hourly_means_map(_df):
    """
    Map each day-of-week/hour slot to the mean ``value`` observed in it.

    The mean is taken per (``day_of_week``, ``hour``) group to capture
    hourly/daily trends and rounded to 3 decimals. Keys are the
    ``"<day_of_week>_<hour>"`` strings produced by ``concat_day_hour``.

    Returns
    -------
    dict
        ``{key: rounded mean value}``.
    """
    slot_means = (
        _df
        .groupby(['day_of_week', 'hour'])
        .agg({'value': 'mean'})
        .reset_index()
    )
    # One key per day-of-week/hour combination, paired with its rounded mean.
    slot_keys = concat_day_hour(slot_means)
    rounded_means = np.round(slot_means.value, 3)
    return dict(zip(slot_keys, rounded_means))

In [10]:
def center_data(_df, mean_map):
    """
    Subtract the slot mean from every ``value`` in ``_df``.

    Each row's ``"<day_of_week>_<hour>"`` key (see ``concat_day_hour``) is
    looked up in ``mean_map`` and the result is subtracted from ``value``.

    Returns
    -------
    pandas.DataFrame
        Only the ``date``, ``hour`` and ``centered_value`` columns.
    """
    slot_keys = concat_day_hour(_df)
    slot_means = slot_keys.map(mean_map)
    centered = _df.assign(centered_value=_df.value - slot_means)
    return centered.filter(['date', 'hour', 'centered_value'])

In [11]:
def data_pipeline(_df, is_train, _mean_map=None):
    """
    Turn raw timestamped readings into log-scaled, mean-centered hourly values.

    Steps, in order: add ``date``/``hour`` columns, sum ``value`` per
    (date, hour), apply ``log10`` to the sums, then subtract the mean of the
    matching day-of-week/hour slot.

    Parameters
    ----------
    _df : pandas.DataFrame
        Frame with a datetime ``timestamp`` column and a ``value`` column.
    is_train : bool
        When truthy, the mean map is computed from ``_df`` itself and any
        ``_mean_map`` argument is ignored. When falsy, ``_mean_map`` is
        required.
    _mean_map : dict, optional
        Mapping of ``"<day_of_week>_<hour>"`` to mean value, as returned by a
        previous training call.

    Returns
    -------
    tuple or pandas.DataFrame
        ``(centered_frame, mean_map)`` when training, otherwise just the
        centered frame.

    Raises
    ------
    ValueError
        If ``is_train`` is falsy and no ``_mean_map`` was supplied.
    """
    # Validate before doing any work. The previous version tested an undefined
    # name (`mean_map`) here, so every non-training call failed with NameError,
    # and the intended fallback only printed and implicitly returned None.
    if not is_train and _mean_map is None:
        raise ValueError('Must pass in a mean mapper with test set')

    _df = add_datetime_columns(_df)
    _df = agg_value_by_hour(_df)
    # Log transform to stabilize variance.
    # NOTE(review): an hourly sum of 0 would give -inf here; confirm inputs are positive.
    _df = _df.assign(value=lambda x: np.log10(x['value']))

    if is_train:
        # Build the map of means by hour and day of week from the training data.
        _mean_map = get_hourly_means_map(_df)
        return center_data(_df, _mean_map), _mean_map

    return center_data(_df, _mean_map)

In [20]:
def load_taxi_data(filepath='nyc_taxi.csv', url='https://raw.githubusercontent.com/numenta/NAB/master/data/realKnownCause/nyc_taxi.csv'):
    """
    Load the taxi CSV into a DataFrame, downloading it first if it is missing.

    Parameters
    ----------
    filepath : str
        Local path of the CSV. If no file exists there, the content of
        ``url`` is downloaded and saved to this path.
    url : str
        Location to download the CSV from when ``filepath`` does not exist.

    Returns
    -------
    pandas.DataFrame
        The CSV contents with the ``timestamp`` column parsed to datetimes.

    Raises
    ------
    requests.HTTPError
        If the download responds with an error status.
    """
    # Use logical `not`, not bitwise `~`: ~True == -2 and ~False == -1 are both
    # truthy, which forced a download on every call even when the file existed.
    if not os.path.exists(filepath):
        r = requests.get(url, allow_redirects=True, timeout=30)
        # Fail loudly rather than caching an HTTP error page as the CSV.
        r.raise_for_status()
        # Write to the requested path (previously hard-coded to 'nyc_taxi.csv',
        # so a custom `filepath` was never created and read_csv then failed).
        with open(filepath, 'wb') as f:
            f.write(r.content)
    _df = (
        pd.read_csv(filepath)
        .assign(timestamp=lambda x: pd.to_datetime(x.timestamp))
    )

    return _df

In [21]:
# Read the taxi CSV using the default path and URL; `timestamp` is parsed to datetime.
df = load_taxi_data()

In [24]:
# Summary statistics of the raw `value` column; note how small the minimum is
# relative to the quartiles.
df.describe()

Unnamed: 0,value
count,10320.0
mean,15137.56938
std,6939.495808
min,8.0
25%,10262.0
50%,16778.0
75%,19838.75
max,39197.0
