<a href="https://colab.research.google.com/github/hamzafarooq/Time-Series/blob/master/PySpark_xgboost.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# **Running Pyspark in Colab**

To run spark in Colab, we need to first install all the dependencies in Colab environment i.e. Apache Spark 2.3.2 with hadoop 2.7, Java 8 and Findspark to locate the spark in the system. The tools installation can be carried out inside the Jupyter Notebook of the Colab. One important note is that if you are new in Spark, it is better to avoid Spark 2.4.0 version since some people have already complained about its compatibility issue with python. 
Follow the steps to install the dependencies:

In [1]:
# Run below commands
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

In [37]:
!pip uninstall pyarrow -y
!pip install pyarrow==0.15.1

Uninstalling pyarrow-0.14.1:
  Successfully uninstalled pyarrow-0.14.1
Collecting pyarrow==0.15.1
[?25l  Downloading https://files.pythonhosted.org/packages/6c/32/ce1926f05679ea5448fd3b98fbd9419d8c7a65f87d1a12ee5fb9577e3a8e/pyarrow-0.15.1-cp36-cp36m-manylinux2010_x86_64.whl (59.2MB)
[K     |████████████████████████████████| 59.2MB 82kB/s 
Installing collected packages: pyarrow
Successfully installed pyarrow-0.15.1


Now that you installed Spark and Java in Colab, it is time to set the environment path which enables you to run Pyspark in your Colab environment. Set the location of Java and Spark by running the following code:

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

Run a local spark session to test your installation:

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Congrats! Your Colab is ready to run Pyspark. Let's build a simple Linear Regression model.

# Linear Regression Model


Linear Regression model is one the oldest and widely used machine learning approach which assumes a relationship between dependent and independent variables. For example, a modeler might want to predict the forecast of the rain based on the humidity ratio. Linear Regression consists of the best fitting line through the scattered points on the graph and the best fitting line is known as the regression line.

The goal of this exercise to predict the housing prices by the given features. Let's predict the prices of the Boston Housing dataset by considering MEDV as the output variable and all the other variables as input.

Download the dataset from [here](https://github.com/asifahmed90/pyspark-ML-in-Colab/blob/master/BostonHousing.csv) and keep it somewhere on your computer. Load the dataset into your Colab directory from your local system:

In [17]:
from google.colab import auth
auth.authenticate_user()
print('Authenticated')

Authenticated


In [18]:
%%bigquery --project bold-sorter-281506 df2
SELECT *
FROM `bigquery-public-data.iowa_liquor_sales.sales`
where store_number  = '2633'
and date > '2018-01-01'

In [20]:
df2.to_csv('/content/drive/My Drive/data/store_2633.csv',index=False)

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Check the dataset is uploaded correctly in the system by the following command

In [44]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

dataset = spark.read.csv('/content/drive/My Drive/data/store_2633.csv',inferSchema=True, header =True)

Now that we have uploaded the dataset, we can start analyzing. 
For our linear regression model we need to import two modules from Pyspark i.e. Vector Assembler and Linear Regression. Vector Assembler is a transformer that assembles all the features into one vector from multiple columns that contain type double. We could have used StringIndexer if any of our columns contains string values to convert it into numeric values. Luckily, the BostonHousing dataset only contains double values, so we don't need to worry about StringIndexer for now.

Notice that we used InferSchema inside read.csv mofule. InferSchema enables us to infer automatically different data types for each column.

Let us print look into the dataset to see the data types of each column:

In [45]:
dataset.printSchema()

root
 |-- invoice_and_item_number: string (nullable = true)
 |-- date: string (nullable = true)
 |-- store_number: integer (nullable = true)
 |-- store_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- store_location: string (nullable = true)
 |-- county_number: integer (nullable = true)
 |-- county: string (nullable = true)
 |-- category: integer (nullable = true)
 |-- category_name: string (nullable = true)
 |-- vendor_number: integer (nullable = true)
 |-- vendor_name: string (nullable = true)
 |-- item_number: integer (nullable = true)
 |-- item_description: string (nullable = true)
 |-- pack: integer (nullable = true)
 |-- bottle_volume_ml: integer (nullable = true)
 |-- state_bottle_cost: double (nullable = true)
 |-- state_bottle_retail: double (nullable = true)
 |-- bottles_sold: integer (nullable = true)
 |-- sale_dollars: double (nullable = true)
 |-- volume_sold_liters: doub

In [46]:
dataset.show(5)

+-----------------------+----------+------------+--------------------+---------------+----------+--------+--------------------+-------------+------+--------+-------------------+-------------+--------------------+-----------+--------------------+----+----------------+-----------------+-------------------+------------+------------+------------------+-------------------+
|invoice_and_item_number|      date|store_number|          store_name|        address|      city|zip_code|      store_location|county_number|county|category|      category_name|vendor_number|         vendor_name|item_number|    item_description|pack|bottle_volume_ml|state_bottle_cost|state_bottle_retail|bottles_sold|sale_dollars|volume_sold_liters|volume_sold_gallons|
+-----------------------+----------+------------+--------------------+---------------+----------+--------+--------------------+-------------+------+--------+-------------------+-------------+--------------------+-----------+--------------------+----+--------

In [47]:
df=dataset.select("*").toPandas()

In [25]:
df2_ds = df[['date','sale_dollars','item_description']]
#df2_ds=df2_ds.sort_index(axis=0)
aggregated=df2_ds.groupby(['date','item_description'],as_index=False).sum()
dfs=aggregated 
dfs=dfs.sort_values('date')
dfs['ds']=dfs['date']
dfs['y']=dfs['sale_dollars']
dfs=dfs.drop(['date','sale_dollars'],axis=1)
dfs_list = dfs[dfs['item_description'].isin(['Black Velvet', 'Crown Royal', 'Black Velvet Toasted Caramel',\
                                      'Canadian Ltd Whisky','Crown Royal Regal Apple'])]

In [26]:
dfs.tail(5)

Unnamed: 0,item_description,ds,y
48544,Dekuyper Buttershots,2020-07-30,141.72
48543,Dekuyper Blue Curacao,2020-07-30,283.68
48542,Deep Eddy Ruby Red Grapefruit,2020-07-30,153.0
48539,Crown Royal Black,2020-07-30,315.0
48638,Windsor Canadian,2020-07-30,113.28


In [None]:
listofitems=dfs['item_description'].head(5).values

In [None]:
listofitems

array(['1800 Coconut', '44 North Huckleberry',
       'Absolut Citron (lemon Vodka)', 'Absolut Mandrin',
       'Absolut Peppar'], dtype=object)

In [27]:
dfs_list = dfs[dfs['item_description'].isin(['Black Velvet', 'Crown Royal', 'Black Velvet Toasted Caramel',\
                                      'Canadian Ltd Whisky','Crown Royal Regal Apple'])]

In [28]:
dfs_list.tail(5)

Unnamed: 0,item_description,ds,y
48420,Crown Royal Regal Apple,2020-07-27,47.99
48410,Canadian Ltd Whisky,2020-07-27,99.0
48540,Crown Royal Regal Apple,2020-07-30,575.88
48520,Black Velvet,2020-07-30,477.6
48538,Crown Royal,2020-07-30,10202.4


In [29]:
dfs_list.to_csv("/content/drive/My Drive/data/dfs.csv",index=False)

In [30]:
len(dfs_list)

782

In [5]:
dataset = spark.read.csv('/content/drive/My Drive/data/dfs.csv',inferSchema=True, header =True)

In [6]:
dataset.printSchema()

root
 |-- item_description: string (nullable = true)
 |-- ds: string (nullable = true)
 |-- y: double (nullable = true)



In [7]:
dataset.show(3,False)

+-----------------------+----------+------+
|item_description       |ds        |y     |
+-----------------------+----------+------+
|Crown Royal Regal Apple|2018-01-03|287.94|
|Black Velvet           |2018-01-04|1194.0|
|Crown Royal Regal Apple|2018-01-04|287.94|
+-----------------------+----------+------+
only showing top 3 rows



In [8]:
from pyspark.sql.types import StructType,StructField,StringType,TimestampType,ArrayType,DoubleType
from pyspark.sql.functions import current_date
from pyspark.sql.functions import pandas_udf, PandasUDFType
from fbprophet import Prophet
from datetime import datetime
import pandas as pd

In [9]:
result_schema = StructType([

    StructField('item_description', StringType(), True),
    StructField('ds', TimestampType(), True),
    StructField('trend', DoubleType(), True),
    StructField('trend_upper', DoubleType(), True),
    StructField('trend_lower', DoubleType(), True),
    StructField('yearly', DoubleType(), True),
    StructField('yearly_upper', DoubleType(), True),
    StructField('yearly_lower', DoubleType(), True),
    StructField('yhat', DoubleType(), True),
    StructField('yhat_upper', DoubleType(), True),
    StructField('yhat_lower', DoubleType(), True),
    StructField('multiplicative_terms', DoubleType(), True),
    StructField('multiplicative_terms_upper', DoubleType(), True),
    StructField('multiplicative_terms_lower', DoubleType(), True),
    StructField('additive_terms', DoubleType(), True),
    StructField('additive_terms_upper', DoubleType(), True),
    StructField('additive_terms_lower', DoubleType(), True),

    ])

In [30]:
result_schema = StructType([

    StructField('item_description', StringType(), True),
    StructField('ds', TimestampType(), True),
   
    StructField('yhat', DoubleType(), True),

    ])

In [35]:
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def forecast_loans(df):

    def prophet_model(df,test_start_date):
        df['ds'] = pd.to_datetime(df['ds'])

       

        # train
        ts_train = (df
                    .query('ds < @test_start_date')
                    .sort_values('ds')
                    )
        # test
        ts_test = (df
                   .query('ds >= @test_start_date')
                   .sort_values('ds')
                   .drop('y', axis=1)
                   )

        print(ts_test.columns)

        # instantiate the model, configure the parameters
        model = Prophet(
            interval_width=0.95,
            growth='linear',
            daily_seasonality=False,
            weekly_seasonality=False,
            yearly_seasonality=True,
            seasonality_mode='multiplicative'
        )

        # fit the model

        model.fit(ts_train.loc[:,['ds','y']])

        # configure predictions
        future_pd = model.make_future_dataframe(
            periods=len(ts_test))

        # make predictions
        results_pd = model.predict(future_pd)
        results_pd = pd.concat([results_pd,df['item_description']],axis = 1)
        print(results_pd)
        result_pd=results_pd[['item_description','ds','yhat']]

        return pd.DataFrame(result_pd, columns = result_schema.fieldNames())

    # return predictions
    return prophet_model(df, test_start_date= '2020-07-15')



In [36]:
results =(dataset.groupBy('item_description').apply(forecast_loans))


It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.



In [37]:
results.show(3,False)

+----------------------------+-------------------+------------------+
|item_description            |ds                 |yhat              |
+----------------------------+-------------------+------------------+
|Black Velvet Toasted Caramel|2018-01-15 00:00:00|121.2770169662127 |
|Black Velvet Toasted Caramel|2018-01-29 00:00:00|120.29642121112805|
|Black Velvet Toasted Caramel|2018-02-01 00:00:00|119.70436362635611|
+----------------------------+-------------------+------------------+
only showing top 3 rows



In [38]:
df_result=results.select("*").toPandas()

In [39]:
df_result_2 = df_result[df_result['ds']>='2020-07-15']

In [40]:
df_result_2

Unnamed: 0,item_description,ds,yhat
348,Black Velvet,2020-07-15,377.881879
349,Black Velvet,2020-07-16,378.777069
350,Black Velvet,2020-07-17,381.034956
351,Black Velvet,2020-07-18,384.556453
603,Crown Royal,2020-07-15,3862.675839
604,Crown Royal,2020-07-16,3780.655368
605,Crown Royal,2020-07-17,3708.039231
779,Crown Royal Regal Apple,2020-07-15,4053.198556
780,Crown Royal Regal Apple,2020-07-16,4060.142121
781,Crown Royal Regal Apple,2020-07-17,4066.115431


In [21]:
df_result.to_csv('df_results.csv',index=False)

Next step is to convert all the features from different columns into a single column and let's call this new vector column as 'Attributes' in the outputCol.

In [48]:
df2_ds = df[['date','sale_dollars','item_description']]
#df2_ds=df2_ds.sort_index(axis=0)
aggregated=df2_ds.groupby(['date','item_description'],as_index=False).sum()
dfs=aggregated 
dfs=dfs.sort_values('date')
dfs_list = dfs[dfs['item_description'].isin(['Black Velvet', 'Crown Royal', 'Black Velvet Toasted Caramel',\
                                      'Canadian Ltd Whisky','Crown Royal Regal Apple'])]

In [49]:
dfs_list.head(5)

Unnamed: 0,date,item_description,sale_dollars
23,2018-01-03,Crown Royal Regal Apple,287.94
103,2018-01-04,Black Velvet,1194.0
128,2018-01-04,Crown Royal Regal Apple,287.94
272,2018-01-08,Black Velvet,119.4
418,2018-01-11,Canadian Ltd Whisky,99.0


In [50]:
testspark = spark.createDataFrame(dfs_list)

In [51]:
testspark.show(5,False)

+----------+-----------------------+------------+
|date      |item_description       |sale_dollars|
+----------+-----------------------+------------+
|2018-01-03|Crown Royal Regal Apple|287.94      |
|2018-01-04|Black Velvet           |1194.0      |
|2018-01-04|Crown Royal Regal Apple|287.94      |
|2018-01-08|Black Velvet           |119.4       |
|2018-01-11|Canadian Ltd Whisky    |99.0        |
+----------+-----------------------+------------+
only showing top 5 rows



# xgboost Model

In [90]:

import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import statsmodels.api as sm
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
import xgboost as xgb
from sklearn.metrics import mean_squared_error, mean_absolute_error
import imageio
import os
from statsmodels.graphics.tsaplots import plot_acf


pandas.util.testing is deprecated. Use the functions in the public API at pandas.testing instead.



In [190]:
def create_features(df):
    """
    Creates time series features from datetime index
    """
    #df['date'] = df.index
    df['dayofweek'] = df['date'].dt.dayofweek
    df['quarter'] = df['date'].dt.quarter
    df['month'] = df['date'].dt.month
    df['year'] = df['date'].dt.year
    df['dayofyear'] = df['date'].dt.dayofyear
    df['dayofmonth'] = df['date'].dt.day
    df['weekofyear'] = df['date'].dt.weekofyear
    df['flag'] = pd.Series(np.where(df['date'] >= np.datetime64('2020-03-03'), 1, 0), index=df.index)
    
    X = df[['dayofweek','quarter','month','year',
           'dayofyear','dayofmonth','weekofyear','flag']]
    return X


In [191]:
def split_data(data, split_date):
    return data[data['date'] <= split_date].copy(), \
           data[data['date'] >  split_date].copy()

In [207]:
result_schema = StructType([
    StructField('date',TimestampType(), True),
    StructField('item_description', StringType(), True),
    StructField('prediction', DoubleType(), True),
    StructField('actual', DoubleType(), True),

    ])

In [256]:
import time
import numpy as np
from xgboost import XGBRegressor
from lightgbm import LGBMRegressor
from sklearn.model_selection import KFold
#from sklearn.metrics import accuracy
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import RandomizedSearchCV

In [212]:
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def forecast_items(df):

    def xgboost_model(df):
        df['date'] = pd.to_datetime(df['date'])
        train, test = split_data(df, '2020-06-01')
        X_train, y_train = create_features(train), train['sale_dollars']
        X_test, y_test   = create_features(test), test['sale_dollars']

        para5= {'n_estimator':1000,'subsample': 0.33, 'min_child_weight': 0.4021, 'max_depth': 3,
                'learning_rate': 0.0455, 
        'gamma': 4.43, 'colsample_bytree': 0.9499999999999995, 'colsample_bylevel': 1.0}


        para5['eval_metric'] = "mae"
        num_boost_round = 999   
        
        
        # instantiate the model, configure the parameters
        reg=xgb.XGBRegressor(**para5)



        # fit the model

        reg.fit(X_train, y_train,
        eval_set=[(X_train, y_train), (X_test, y_test)],
        early_stopping_rounds=500, #stop if 50 consequent rounds without decrease of error
        verbose=False
        )
        

        # make predictions
        results_pd = reg.predict(X_test)
        three_list = [results_pd,test['date'].values,y_test]
        results_df =pd.DataFrame(three_list).transpose()
        results_df.columns = ['prediction','date','actual']
        results_df['item_description']=df['item_description']
        #results_df['date']=str(test['date'])
        #results_df = pd.concat([results_df,test['date']],axis = 1)
        #print(results_pd)
        

        return pd.DataFrame(results_df, columns = result_schema.fieldNames())

    # return predictions
    return xgboost_model(df)



In [265]:
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def forecast_items(df):

    def xgboost_model(df):
        df['date'] = pd.to_datetime(df['date'])
        train, test = split_data(df, '2020-06-01')
        X_train, y_train = create_features(train), train['sale_dollars']
        X_test, y_test   = create_features(test), test['sale_dollars']

        model = XGBRegressor()

        param_grid = {
                'max_depth': [3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
                'min_child_weight': np.arange(0.0001, 0.5, 0.001),
                'gamma': np.arange(0.0,40.0,0.005),
                'learning_rate': np.arange(0.0005,0.3,0.0005),
                'subsample': np.arange(0.01,1.0,0.01),
                'colsample_bylevel': np.round(np.arange(0.1,1.0,0.01)),
                'colsample_bytree' : np.arange(0.1,1.0,0.01)
        }

        kfold = KFold(n_splits=10, shuffle=True, random_state=10)
        grid_search = RandomizedSearchCV(model, param_grid, scoring="r2", n_iter = 1000, cv=kfold)
        grid_result = grid_search.fit(X_train,y_train)

       
        

        # make predictions
        results_pd = grid_search.predict(X_test)
        three_list = [results_pd,test['date'].values,y_test]
        results_df =pd.DataFrame(three_list).transpose()
        results_df.columns = ['prediction','date','actual']
        results_df['item_description']=df['item_description']
        #results_df['date']=str(test['date'])
        #results_df = pd.concat([results_df,test['date']],axis = 1)
        #print(results_pd)
        

        return pd.DataFrame(results_df, columns = result_schema.fieldNames())

    # return predictions
    return xgboost_model(df)



In [266]:
from time import process_time

t = process_time()
results =(testspark.groupBy('item_description').apply(forecast_items))
df_result=results.select("*").toPandas()
elapsed_time = process_time() - t



It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.



In [None]:
elapsed_time

In [267]:
display(results)

DataFrame[date: timestamp, item_description: string, prediction: double, actual: double]

In [262]:
#results.show(5)

KeyboardInterrupt: ignored

In [268]:
df_result=results.select("*").toPandas()

KeyboardInterrupt: ignored

In [269]:
df_result.head(20)

Unnamed: 0,date,item_description,prediction,actual
0,2020-06-08,Black Velvet Toasted Caramel,121.38298,121.56
1,2020-06-18,Black Velvet Toasted Caramel,121.38298,121.56
2,2020-06-25,Black Velvet Toasted Caramel,121.38298,121.56
3,2020-07-02,Black Velvet Toasted Caramel,121.38298,121.56
4,2020-07-13,Black Velvet Toasted Caramel,121.38298,121.56
5,2020-07-20,Black Velvet Toasted Caramel,121.38298,121.56
6,2020-06-02,Black Velvet,503.795746,119.4
7,2020-06-04,Black Velvet,587.075623,358.2
8,2020-06-08,Black Velvet,527.569519,358.2
9,2020-06-15,Black Velvet,502.571777,716.4


In [225]:
dfs_list.head(5)

Unnamed: 0,date,item_description,sale_dollars
23,2018-01-03,Crown Royal Regal Apple,287.94
103,2018-01-04,Black Velvet,1194.0
128,2018-01-04,Crown Royal Regal Apple,287.94
272,2018-01-08,Black Velvet,119.4
418,2018-01-11,Canadian Ltd Whisky,99.0


In [270]:
def xgboost_model_seq(df,item):
    df =df[df['item_description']==item]
    df=df.drop('item_description',axis=1)
    df['date'] = pd.to_datetime(df['date'])
    train, test = split_data(df, '2020-06-01')
    X_train, y_train = create_features(train), train['sale_dollars']
    X_test, y_test   = create_features(test), test['sale_dollars']

    model = XGBRegressor()

    param_grid = {
            'max_depth': [3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
            'min_child_weight': np.arange(0.0001, 0.5, 0.001),
            'gamma': np.arange(0.0,40.0,0.005),
            'learning_rate': np.arange(0.0005,0.3,0.0005),
            'subsample': np.arange(0.01,1.0,0.01),
            'colsample_bylevel': np.round(np.arange(0.1,1.0,0.01)),
            'colsample_bytree' : np.arange(0.1,1.0,0.01)
    }

    kfold = KFold(n_splits=10, shuffle=True, random_state=10)
    grid_search = RandomizedSearchCV(model, param_grid, scoring="r2", n_iter = 1000, cv=kfold)
    grid_result = grid_search.fit(X_train,y_train)

  
    

    # make predictions
    results_pd = grid_search.predict(X_test)
    three_list = [results_pd,test['date'].values,y_test]
    results_df =pd.DataFrame(three_list).transpose()
    results_df.columns = ['prediction','date','actual']
    results_df['item_description']=item
    #results_df['date']=str(test['date'])
    #results_df = pd.concat([results_df,test['date']],axis = 1)
    #print(results_pd)
    

    return results_df


In [271]:
items=dfs_list['item_description'].unique()

In [272]:
appended_data = pd.DataFrame()

In [273]:
from time import process_time

t = process_time()
for item in items:
  data=xgboost_model_seq(dfs_list,item)
  appended_data=appended_data.append(data,ignore_index=True)
elapsed_time = process_time() - t

#appended_data = pd.concat(appended_data,ignore_index=True)


[1;30;43mStreaming output truncated to the last 5000 lines.[0m


In [274]:
elapsed_time

1076.058318979

In [275]:
appended_data

Unnamed: 0,prediction,date,actual,item_description
0,2869.08,2020-06-04,287.94,Crown Royal Regal Apple
1,2228.08,2020-06-08,340.08,Crown Royal Regal Apple
2,2479.3,2020-06-15,628.02,Crown Royal Regal Apple
3,2329.58,2020-06-18,287.94,Crown Royal Regal Apple
4,2731.22,2020-06-25,340.08,Crown Royal Regal Apple
5,2387.33,2020-06-29,628.02,Crown Royal Regal Apple
6,2144.43,2020-07-06,628.02,Crown Royal Regal Apple
7,2389.45,2020-07-13,287.94,Crown Royal Regal Apple
8,2214.7,2020-07-16,915.96,Crown Royal Regal Apple
9,2030.69,2020-07-20,31776.2,Crown Royal Regal Apple
