In [1]:
import sys
import os

def configure_spark(spark_home=None, pyspark_python=None):
    spark_home = spark_home or "/path/to/default/spark/home"
    os.environ['SPARK_HOME'] = spark_home

    # Add the PySpark directories to the Python path:
    sys.path.insert(1, os.path.join(spark_home, 'python'))
    sys.path.insert(1, os.path.join(spark_home, 'python', 'pyspark'))
    sys.path.insert(1, os.path.join(spark_home, 'python', 'build'))

    # If PySpark isn't specified, use currently running Python binary:
    pyspark_python = pyspark_python or sys.executable
    os.environ['PYSPARK_PYTHON'] = pyspark_python
    
configure_spark('/usr/local/spark', '/home/ubuntu/anaconda3/envs/dat500/bin/python')

In [2]:
import argparse
import findspark
findspark.init()
import pyspark
from pyspark import SQLContext
sc = pyspark.SparkContext(master='spark://192.168.11.239:7077', appName='hourly_forecaster')
sqlContext = SQLContext(sc)

In [3]:
from datetime import datetime
import pyspark.sql.functions as F #avoid conflicts with regular python functions
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
from pyspark.sql.types import *
import pandas as pd
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "0"
from fbprophet import Prophet
import matplotlib.pyplot as plt
from fbprophet.plot import plot_plotly
import plotly.offline as py
import holidays
import numpy as np

In [4]:
df_train = sqlContext.read.csv("/datasets/crimes.csv", header='true')\
.filter( (F.col("Year") < 2020) & (F.col("Year") > 2017)  ).withColumn('Day', F.to_date("Date", 'MM/dd/yyyy hh:mm:ss a'))\
.groupBy("Day","District").count()

df_test = sqlContext.read.csv("/datasets/crimes.csv", header='true')\
.filter(F.col("Year") == 2020).withColumn('Day', F.to_date("Date", 'MM/dd/yyyy hh:mm:ss a'))\
.groupBy("Day","District").count()

print("Length of train: ", df_train.count())
print("Length of test: ", df_test.count())

Length of train:  16077
Length of test:  661


In [5]:
df_test.show()

+----------+--------+-----+
|       Day|District|count|
+----------+--------+-----+
|2020-01-30|     009|   22|
|2020-01-12|     001|   28|
|2020-01-15|     006|   40|
|2020-01-01|     004|   34|
|2020-01-16|     025|   44|
|2020-01-02|     003|   43|
|2020-01-20|     010|   40|
|2020-01-03|     019|   27|
|2020-01-11|     019|   26|
|2020-01-09|     025|   27|
|2020-01-03|     001|   30|
|2020-01-01|     014|   25|
|2020-01-14|     024|   28|
|2020-01-28|     010|   30|
|2020-01-12|     007|   26|
|2020-01-24|     002|   22|
|2020-01-04|     011|   44|
|2020-01-19|     020|    5|
|2020-01-21|     022|   17|
|2020-01-25|     010|   35|
+----------+--------+-----+
only showing top 20 rows



In [6]:
#Last day to train on
df_train.select(F.max(F.col('Day'))).collect()[0][0]

datetime.date(2019, 12, 31)

In [7]:
def generate_US_holidays(start,end,state="IL"):
    hdays = holidays.UnitedStates(years=list(np.arange(start,end+1)),state=state).items()
    hdays = dict(list(hdays)) #Convert to dict since it is immutable
    return {str(k):v for k,v in hdays.items()} #Want Y-m-d format

us_hdays = generate_US_holidays(2001,2020)

In [24]:
@udf(StringType())
def holiday_desc(_date):
    if str(_date) in us_hdays.keys():
        return us_hdays[str(_date)]
    else:
        return "None"

Add holiday description as a column on the dataframe

In [25]:
df = (df\
    .withColumn('holiday', holiday_desc(F.col('Day'))).select("holiday","Day").show())

+--------------+----------+
|       holiday|       Day|
+--------------+----------+
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
|New Year's Day|2018-01-01|
+--------------+----------+
only showing top 20 rows



In [26]:
schema = StructType([
        StructField("District", StringType(), True),
        StructField("count", StringType(), True),
        StructField("ds", DateType(), True),
        StructField("yhat", DoubleType(), True)
    ])

## Forecast data in parallel using Pandas UDFs

This strategy is heavily inspired by:
https://github.com/AlexWarembourg/Medium/blob/master/Pyspark_Pandas_UDF.ipynb

In [28]:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def prophet_udf(df):
    
    def train_predict_prophet(df, cutoff):
        ts_train = (df
                    .query('Day <= @cutoff_point')
                    .rename(columns={'Day': 'ds', 'count': 'y'})
                    .sort_values('ds')
                    )
        ts_test = (df
                   .query('Day > @cutoff_point')
                   .rename(columns={'Day': 'ds', 'count': 'y'})
                   .sort_values('ds')
                   .assign(ds=lambda x: pd.to_datetime(x["ds"]))
                   .drop('y', axis=1)
                   )

        m = Prophet(yearly_seasonality=True,
                    weekly_seasonality=True,
                    daily_seasonality=True)
        m.fit(ts_train)

        # to date
        df["Day"] = pd.to_datetime(df["Day"])

        
        ts_hat = (m.predict(ts_test)[["ds", "yhat"]]
                  .assign(ds=lambda x: pd.to_datetime(x["ds"]))
                  ).merge(ts_test, on=["ds"], how="left")  # merge to retrieve item and store index
        return pd.DataFrame(ts_hat, columns=schema.fieldNames())

    return train_predict_prophet(df, cutoff_point)

In [27]:
df_train = sqlContext.read.csv("/datasets/crimes.csv", header='true')\
.filter( (F.col("Year") < 2020) & (F.col("Year") > 2017)  )\
.withColumn('Day', F.to_date("Date", 'MM/dd/yyyy hh:mm:ss a'))\
.withColumn('holiday', holiday_desc(F.col('Day')))\
.select("Day","District","holiday")\
.groupBy("Day","District","holiday")\
.count()


df_test = sqlContext.read.csv("/datasets/crimes.csv", header='true')\
.filter(F.col("Year") == 2020)\
.withColumn('Day', F.to_date("Date", 'MM/dd/yyyy hh:mm:ss a'))\
.withColumn('holiday', holiday_desc(F.col('Day')))\
.select("Day","District","holiday")\
.groupBy("Day","District","holiday")\
.count()

cutoff_point = df_train.select(F.max(F.col('Day'))).collect()[0][0]
#df_test = df_test.withColumn('count', F.lit(None))

df = (df_train.union(df_test)).sort(F.col('Day'))


predictions = (df
              .groupBy("District")
              .apply(prophet_udf)
              )

Okay, lets see an example from district 8

In [29]:
predictions.filter(F.col("District")=="008").show(5)

+--------+-----+----------+------------------+
|District|count|        ds|              yhat|
+--------+-----+----------+------------------+
|     008| null|2020-01-01|47.985235827081695|
|     008| null|2020-01-02|37.820163262207245|
|     008| null|2020-01-03|39.696138621564536|
|     008| null|2020-01-04|  35.7491284104951|
|     008| null|2020-01-05| 35.82730837248379|
+--------+-----+----------+------------------+
only showing top 5 rows



In [17]:
avocado_season = pd.DataFrame({
  'holiday': 'avocado season',
  'ds': pd.to_datetime(['2014-07-31', '2014-09-16', 
                        '2015-07-31', '2015-09-16',
                        '2016-07-31', '2016-09-16',
                        '2017-07-31', '2017-09-16',
                       '2018-07-31', '2018-09-16',
                        '2019-07-31', '2019-09-16']),
  'lower_window': -1,
  'upper_window': 0,
})
avocado_season

Unnamed: 0,holiday,ds,lower_window,upper_window
0,avocado season,2014-07-31,-1,0
1,avocado season,2014-09-16,-1,0
2,avocado season,2015-07-31,-1,0
3,avocado season,2015-09-16,-1,0
4,avocado season,2016-07-31,-1,0
5,avocado season,2016-09-16,-1,0
6,avocado season,2017-07-31,-1,0
7,avocado season,2017-09-16,-1,0
8,avocado season,2018-07-31,-1,0
9,avocado season,2018-09-16,-1,0
