In [1]:
# imports
import platform
import logging
import sys
import os
from os import path

import numpy as np
import pandas as pd

import statsmodels.api as sm
from statsmodels.tsa.arima_model import ARIMA
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller

import matplotlib.pyplot as plt
import seaborn as sns

import warnings
import datetime as dt

# Multiple dataframes in single cell
from IPython.display import display
%matplotlib inline

from matplotlib.pylab import rcParams
rcParams['figure.figsize'] = 15, 6

  from pandas.core import datetools


In [2]:
# Working Dir / Load Data
from sys import platform
# if platform.system() == 'Windows':
if platform == 'win32':
    directory = 'D:\\project\\data\\kg_jpn_rest\\'
    exportDirectory = directory + 'export\\'

# Mac
#elif platform.system() == 'Darwin':
elif platform == 'darwin':
    directory = '//Project/data/kg_corpgroc/'
    exportDirectory = directory + 'export/'

# AWS
elif platform == 'linux':
    directory = '//data/'
    exportDirectory = directory + 'export/'

## Load Data

In [3]:
# Code to load notebooks borrowed from online
# http://jupyter-notebook.readthedocs.io/en/stable/examples/Notebook/Importing%20Notebooks.html
import io, os, sys, types
from IPython import get_ipython
from nbformat import read
from IPython.core.interactiveshell import InteractiveShell


def find_notebook(fullname, path=None):
    name = fullname.rsplit('.', 1)[-1]
    if not path:
        path = ['']
    for d in path:
        nb_path = os.path.join(d, name + ".ipynb")
        if os.path.isfile(nb_path):
            return nb_path
        # let import Notebook_Name find "Notebook Name.ipynb"
        nb_path = nb_path.replace("_", " ")
        if os.path.isfile(nb_path):
            return nb_path
        
class NotebookLoader(object):
    """Module Loader for Jupyter Notebooks"""
    def __init__(self, path=None):
        self.shell = InteractiveShell.instance()
        self.path = path
    
    def load_module(self, fullname):
        """import a notebook as a module"""
        path = find_notebook(fullname, self.path)
        
        print ("importing Jupyter notebook from %s" % path)
                                       
        # load the notebook object
        with io.open(path, 'r', encoding='utf-8') as f:
            nb = read(f, 4)
        
        # create the module and add it to sys.modules
        # if name in sys.modules:
        #    return sys.modules[name]
        mod = types.ModuleType(fullname)
        mod.__file__ = path
        mod.__loader__ = self
        mod.__dict__['get_ipython'] = get_ipython
        sys.modules[fullname] = mod
        
        # extra work to ensure that magics that would affect the user_ns
        # actually affect the notebook module's ns
        save_user_ns = self.shell.user_ns
        self.shell.user_ns = mod.__dict__
        
        try:
          for cell in nb.cells:
            if cell.cell_type == 'code':
                # transform the input to executable Python
                code = self.shell.input_transformer_manager.transform_cell(cell.source)
                # run the code in themodule
                exec(code, mod.__dict__)
        finally:
            self.shell.user_ns = save_user_ns
        return mod
    
class NotebookFinder(object):
    """Module finder that locates Jupyter Notebooks"""
    def __init__(self):
        self.loaders = {}

    def find_module(self, fullname, path=None):
        nb_path = find_notebook(fullname, path)
        if not nb_path:
            return

        key = path
        if path:
            # lists aren't hashable
            key = os.path.sep.join(path)

        if key not in self.loaders:
            self.loaders[key] = NotebookLoader(path)
        return self.loaders[key]   
    
sys.meta_path.append(NotebookFinder())

In [4]:
import helper_notebook as hlp

importing Jupyter notebook from helper_notebook.ipynb


In [6]:
#hlp.fn_finalize_submission_file()

In [7]:
# Load the data set / cleaned, joined, and formatted
dfSuper = hlp.fn_load_all_data(1)
dfSuper, newColList = hlp.fn_ts_add_cycletrend_analysis(dfSuper, 'visitors')

Data Load - Loading data on: Windows Directory: D:\project\data\kg_jpn_rest\
Data Load - Merging Data
Data Load - Adding Features
Data Load - Finished


In [8]:
dfSuper.head()

Unnamed: 0_level_0,air_store_id,visitors,calendar_date,day_of_week,holiday_flg,genre_name,area_name,latitude,longitude,reserve_visitors,...,forecast,cycle,trend,3-day-SMA,7-day-SMA,14-day-SMA,31-day-SMA,EWMA_7_days,EWMA_14_days,EWMA_31_days
visit_date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2016-01-13,air_ba937bf13d40fb24,25.0,2016-01-13,Wednesday,0,Dining bar,Tōkyō-to Minato-ku Shibakōen,35.658068,139.751599,0.0,...,0.0,1.713874,23.286126,,,,,25.0,25.0,25.0
2016-01-14,air_ba937bf13d40fb24,32.0,2016-01-14,Thursday,0,Dining bar,Tōkyō-to Minato-ku Shibakōen,35.658068,139.751599,0.0,...,0.0,9.107589,22.892411,,,,,29.0,28.75,28.612903
2016-01-15,air_ba937bf13d40fb24,29.0,2016-01-15,Friday,0,Dining bar,Tōkyō-to Minato-ku Shibakōen,35.658068,139.751599,0.0,...,0.0,6.500232,22.499768,28.666667,,,,29.0,28.845501,28.750347
2016-01-16,air_ba937bf13d40fb24,22.0,2016-01-16,Saturday,0,Dining bar,Tōkyō-to Minato-ku Shibakōen,35.658068,139.751599,0.0,...,0.0,-0.11496,22.11496,27.666667,,,,26.44,26.751269,26.89605
2016-01-18,air_ba937bf13d40fb24,6.0,2016-01-18,Monday,0,Dining bar,Tōkyō-to Minato-ku Shibakōen,35.658068,139.751599,0.0,...,0.0,-15.748812,21.748812,19.0,,,,19.740077,21.337295,22.160784


## Main Code Block Start

In [9]:
# Variable to specify if we include a header one time in the files
includeHeaderRunOnce = True
# Determine if we are resuming a previous file
resumeRunningPreviousFile = False


print(exportDirectory)
    
#exportParamOptionsFileName = exportDirectory + 'export_param_' + str(file_args_store_nbr) + '.csv'
exportResultsSubmissionFileName = exportDirectory + 'export_results.csv'

exportLogName = exportDirectory + 'export_log.log'


# ===========================
# SETUP LOGGING
# ===========================
# Wipe any existing log file - change to keep this script
#if path.isfile(exportLogName):
#    os.remove(exportLogName)

# We may set another parameter to pass in to wipe the existing param options and results submissions

# SEt logging information
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# create a file handler
handler = logging.FileHandler(exportLogName)
handler.setLevel(logging.INFO)

# create a logging format
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)

# add the handlers to the logger
logger.addHandler(handler)
logger.info('Start Logging')


# Find out if we have an existing file and work with it from there to update the mergedDataFrame
includeSubmissionHeaderRunOnce = True

D:\project\data\kg_jpn_rest\export\


## Check if we are resuming existing File

In [10]:
# FUNCTIONS
# Check if the file exists
def fn_determine_file_exists(fileName):
    fileExists = False

    if path.isfile(fileName):
        fileExists = True

    return  fileExists

# Will determine where the file / process left off to pick back up
def fn_determine_file_last_run(fileName, colNames):

    # Read In
    df_leftOff = pd.read_csv(fileName)
    
    # Grab the unique store number 
    df_leftOff = pd.DataFrame(df_leftOff['air_store_id'].unique(), columns=(colNames))
    
    
    # if I re-use this later, need to handle for multiple column names (if applicable)
    df_leftOff.sort_values(['air_store_id'], ascending=[True], inplace=True)

    # Set a Processed Flag for all the entries
    df_leftOff['processed'] = 1

    # Return a data frame to join later
    return df_leftOff

In [11]:
# !!!! THIS IS STILL BROKEN - NEED TO ADJUST / FIX - Merging too many records
# Determine if we are resuming a file
# this could stand to be cleaned up.  

# Set a default running dataframe - to be used to grab unique values
rundf = dfSuper.copy()

# Call the function to determine where we left off
if fn_determine_file_exists(exportResultsSubmissionFileName) == True:

    # Grab the completed results
    #completed_df = fn_determine_file_last_run(exportResultsSubmissionFileName, ['air_store_id'])
    existingDF = fn_determine_file_last_run(exportResultsSubmissionFileName, ['air_store_id'])
    
    # Should join back to test.  If there is no records, then we will scoop in the end. (meaning skip over).
    # feel this is a bit over-complicated, but late nights
    
    # Make a copy of the visit merged training
    copyTrainDF = visitMergeDF.copy(deep=True)
    # reset index to keep the visit date
    copyTrainDF.reset_index(inplace=True)
    # merge the train to the existing df to get the processed
    df_resume = pd.merge(copyTrainDF, existingDF, how='left', on='air_store_id', copy=True)
    # Reset the index
    df_resume.set_index('visit_date', inplace=True)
    # set all blank unprocssed to 0 to mean process
    df_resume['processed'].replace({np.nan: 0}, inplace=True)

    # grab only the unprocessed
    df_resume = df_resume[df_resume['processed']==0]
    
    # Resort anyway
    df_resume.sort_values(['air_store_id'], ascending=[True], inplace=True)

    # Then filter down the data-set to only un-filtered
    #df_test_iteration = df_resume[df_resume['processed']==0].copy()

    # set to True for below
    resumeRunningPreviousFile = True
    print('Existing File Detected')
    
    # Set the runnign dataframe to the resumed one
    rundf = df_resume

# Determine if we include header (multiple booleans as I had split multiple output files earlier)
if resumeRunningPreviousFile == True:
    includeSubmissionHeaderRunOnce = False
    
# Re-Order - in case we are not resuming
rundf.sort_values(['air_store_id'], ascending=[True], inplace=True)

## Loop through all records

In [12]:
# WHERE THE FUNCTION WOULD BE
def fn_ts_forecast_ARIMA_exog(df, storeId, exogParam, orderParam, visitCorrThreshold, shift, plot):
    warnings.filterwarnings("ignore") 

    errorOccured = False
    
    # Set the storeId
    ts = df[df['air_store_id']==storeId].copy()
    ts.asfreq('D')  # not needed
    ts['forecast'] = 0 # initialize

    visitorCorr = ts['corr_vis_resv'].head(1)
    
    # If correlation is above a threshold, then add the exog parameter of reservations for visitors
    if float(visitorCorr) >= float(visitCorrThreshold):
        #print('Visitor Corr: ' + str(visitorCorr))
        exogParam = exogParam + ['reserve_visitors']
    
    minDate = ts.index.min()
    maxDate = ts.index.max() 
    maxDate = maxDate - pd.DateOffset(30) # offset 30 days to see how the model will perform on train/test data
    #print('max date: ' + str(maxDate))
    
    forecast_start_range = len(ts) - 30  # so let's play with a month out to start the prediction
    # second try would be 15
    forecast_out_range = len(ts) + 39  # this is the number of days between 2017-04-22 and 2017-05-31
    # It needs + 39 to properly forecast

    # Add forecast range
    idx = pd.DataFrame(pd.date_range('2017-04-23','2017-05-31'), columns={'dateRange'})
    idx.set_index('dateRange',inplace=True)
    ts = pd.concat([ts,idx], axis=1)

    # Impute
    ts['air_store_id'].replace({np.nan: storeId}, inplace=True)
    ts['visitors'].replace({np.nan: 0}, inplace=True)
    ts.replace({np.nan: 0}, inplace=True)
    ts['forecast'] = 0.0

    # Try Catch Here
    try:
    
        #minDate = '2017-01-01'
        model = sm.tsa.ARIMA(endog=ts['visitors'][minDate:maxDate], exog=ts[exogParam][minDate:maxDate], order=orderParam)
        modelFit = model.fit()
        modelPred = modelFit.predict(start=forecast_start_range, end=forecast_out_range, exog=ts[exogParam])  # , dynamic=True
        #print('length of model: ' + str(len(modelPred)))  #70

        lengthModelPred = len(modelPred)

        #ts['forecast']["2017-03-18":"2017-05-31"] = modelPred[:]
        ts['forecast'][len(ts)-lengthModelPred : len(ts)] = modelPred[:]
        if shift == 1:
            ts['forecast'] = (ts['forecast'].shift(+1))

        # Grab AIC and MSE
        y = ts['visitors']["2017-03-23":"2017-04-22"]
        y_frcast = ts['forecast']["2017-03-23":"2017-04-22"]
        mse_frcast = ((y_frcast - y) ** 2).mean()
        #print('AIC: ' + str(modelFit.aic))
        #print('MSE: ' + str(mse_frcast))
        #print('\n')
    

    except:
        #(RuntimeError, TypeError, NameError, ValueError): 
        #errorOccured = True
        #(RuntimeError, TypeError, NameError):
        #print('Error')
        #print(RuntimeError())
        #print(TypeError)
        #print(NameError)
        #print(ValueError)
        #print('\n')
        logger.info('Error processing store: ' + storeId)
        errorOccured = True        
    pass


    
    #newColList = ['cycle','trend','3-day-SMA',,'31-day-SMA',,'EWMA_31_days']
    plotColList = ['visitors','forecast','reserve_visitors','holiday_flg','weekend']
    plotColList = plotColList + ['7-day-SMA','EWMA_7_days']
    
    if plot==1:
        ts[plotColList]["2016-01-01":"2017-05-31"].plot(grid=True)
    
    return ts[:]["2017-04-23":], errorOccured

In [15]:
# WHERE THE FUNCTION WOULD BE
def fn_ts_forecast_ARIMA_exog2(df, storeId, colName, exogParam, orderParam,
                               visitCorrThreshold, shift, plot):
    warnings.filterwarnings("ignore") 

    errorOccured = False
    
    # Set the storeId
    ts = df[df['air_store_id']==storeId].copy()
    ts.asfreq('D')  # not needed
    ts['forecast'] = 0 # initialize

    visitorCorr = ts['corr_vis_resv'].head(1)
    
    # If correlation is above a threshold, then add the exog parameter of reservations for visitors
    if float(visitorCorr) >= float(visitCorrThreshold):
        #print('Visitor Corr: ' + str(visitorCorr))
        exogParam = exogParam + ['reserve_visitors']
    
    minDate = ts.index.min()
    maxDate = ts.index.max() 
    maxDate = maxDate - pd.DateOffset(30) # offset 30 days to see how the model will perform on train/test data
    #print('max date: ' + str(maxDate))
    
    forecast_start_range = len(ts) - 60  # so let's play with a month out to start the prediction
    # second try would be 15
    forecast_out_range = len(ts) + 39  # this is the number of days between 2017-04-22 and 2017-05-31
    # It needs + 39 to properly forecast

    # Add forecast range
    idx = pd.DataFrame(pd.date_range('2017-04-23','2017-05-31'), columns={'dateRange'})
    idx.set_index('dateRange',inplace=True)
    ts = pd.concat([ts,idx], axis=1)

    # Impute
    ts['air_store_id'].replace({np.nan: storeId}, inplace=True)
    ts[colName].replace({np.nan: 0}, inplace=True)
    ts.replace({np.nan: 0}, inplace=True)
    ts['forecast'] = 0.0

    # Try Catch Here
    try:
    
        #minDate = '2017-01-01'
        model = sm.tsa.ARIMA(endog=ts[colName][minDate:maxDate], exog=ts[exogParam][minDate:maxDate], order=orderParam)
        modelFit = model.fit()
        modelPred = modelFit.predict(start=forecast_start_range, end=forecast_out_range, exog=ts[exogParam])  # , dynamic=True
        #print('length of model: ' + str(len(modelPred)))  #70

        lengthModelPred = len(modelPred)

        #ts['forecast']["2017-03-18":"2017-05-31"] = modelPred[:]
        ts['forecast'][len(ts)-lengthModelPred : len(ts)] = modelPred[:]
        if shift == 1:
            ts['forecast'] = (ts['forecast'].shift(+1))

        # Grab AIC and MSE
        y = ts['visitors']["2017-03-23":"2017-04-22"]
        y_frcast = ts['forecast']["2017-03-23":"2017-04-22"]
        mse_frcast = ((y_frcast - y) ** 2).mean()
        #print('AIC: ' + str(modelFit.aic))
        #print('MSE: ' + str(mse_frcast))
        #print('\n')
    

    except:
        #(RuntimeError, TypeError, NameError, ValueError): 
        #errorOccured = True
        #(RuntimeError, TypeError, NameError):
        #print('Error')
        #print(RuntimeError())
        #print(TypeError)
        #print(NameError)
        #print(ValueError)
        #print('\n')
        logger.info('Error processing store: ' + storeId)
        errorOccured = True        
    pass
    
    #newColList = ['cycle','trend','3-day-SMA',,'31-day-SMA',,'EWMA_31_days']
    plotColList = ['visitors','forecast','reserve_visitors','holiday_flg','weekend']
    plotColList = plotColList + ['7-day-SMA','EWMA_7_days']
    
    if plot==1:
        ts[plotColList]["2016-01-01":"2017-05-31"].plot(grid=True)
    
    return ts[:]["2017-04-23":], errorOccured

In [16]:
# Get all Unique Visits
visitStoreArr = rundf['air_store_id'].unique()

orderParam = (7,0,0)
visitCorrThreshold = 0.49

wkdayList = ['Friday', 'Monday', 'Saturday', 'Sunday', 'Thursday', 'Tuesday', 'Wednesday']
#exogParam = ['holiday_flg','dayofweek_num']
#exogParam = ['holiday_flg'] + wkdayList
#exogParam = ['holiday_flg','weekend'] + wkdayList  # Weekend doesn't seem to make a difference when combined.
exogParam = ['holiday_flg','weekend'] # combined with holiday, this over-all does better

shift=0
plot=0

# Default to the max of the array
maxLoopRun = len(visitStoreArr)

# if we are testing
testRun = 0
if testRun > 0:
    maxLoopRun = testRun

print('Total Records to Process: ' + str(maxLoopRun))
    
i=0
while i < maxLoopRun: 
    
    #if i%300==0:
    if i%300==0:
        print('Processing Index: ' + str(i) + ' - Store ID: ' + visitStoreArr[i])
    
    # Log Start
    logger.info('Start Index: ' + str(i) + ' - Restaraunt: ' + str(visitStoreArr[i]))
    
    # Run TS
    #dfTsRun, bError = fn_run_arima_timeseries(visitStoreArr[i], orderList)
    #dfTsRun, bError = fn_ts_forecast_ARIMA_exog(dfSuper, visitStoreArr[i], exogParam,orderParam, visitCorrThreshold, shift, plot)
    dfTsRun, bError = fn_ts_forecast_ARIMA_exog2(dfSuper, 
                                                visitStoreArr[i], '7-day-SMA',
                                                exogParam,orderParam, visitCorrThreshold, shift, plot)
    
    '''df, 
    storeId, colName, 
    exogParam, orderParam,
    visitCorrThreshold, shift, plot'''
    
    if bError == True:
        placeHolder=1
        
    # TODO:Reformat (possibly done), any numbers
    dfTsRun.reset_index(inplace=True)
    dfTsRun = dfTsRun[['air_store_id','index','visitors','forecast']]
    # colNames = ('visit_date','air_store_id','visitors','visitors_log','forecast','forecast_log','forecast_logExp')
    dfTsRun = dfTsRun.rename(columns={'index': 'visit_date'})
    
    with open(exportResultsSubmissionFileName, 'a') as f:
        dfTsRun.to_csv(f, header=includeSubmissionHeaderRunOnce, index=False, quotechar='"')
        f.close()
        includeSubmissionHeaderRunOnce = False
    
    # Log Start
    logger.info('Finish Index: ' + str(i) + ' - Restaraunt: ' + str(visitStoreArr[i]))
    
    # increment
    i=i+1
    
print('\n')
print('Finished All Records')

Total Records to Process: 829
Processing Index: 0 - Store ID: air_00a91d42b08b08d9
Processing Index: 300 - Store ID: air_629edf21ea38ac2d
Processing Index: 600 - Store ID: air_b8d9e1624baaadc2


Finished All Records


In [17]:
# Close out
logger.info('Stop Logging')
handlers = logger.handlers
for handler in handlers:
    handler.close()

In [19]:
hlp.fn_finalize_submission_file('','')

Correct Submission Length
forecast
Wrote file: D:\project\data\kg_jpn_rest\export\20180102_subm_frcst_flt.csv
