# Tutorial on PySpark Transformations and MLIB
## Learn how to transform  Data and apply Regression techniques using PySpark
-  Rossmann Sales Dataset is used in this tutorial and it can be found at https://www.kaggle.com/c/rossmann-store-sales/data .
- For installing spark 2.2.1 and  to enable it on your Jupyter Notebook on your local PC (Windows) https://medium.com/@GalarnykMichael/install-spark-on-windows-pyspark-4498a5d8d66c

### Why PySpark ?
Apache Spark is one the most widely used frameworks when it comes to handling and working with Big Data and Python is one of the most widely used programming languages for Data Analysis, Machine Learning, and much more. So, why not use them together? This is where Spark with Python also known as PySpark comes into the picture.

PySpark MLlib is a machine-learning library. It is a wrapper over PySpark Core to do data analysis using machine-learning algorithms. It works on distributed systems and is scalable. We can find implementations of classification, clustering, linear regression, and other machine-learning algorithms in PySpark MLlib.

### Initializing a Spark Session and importing necessary libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import Window
spark=SparkSession.builder.appName('Sample').getOrCreate()

Default no of partitions in spark is 200, it can be changed based on your requirement. We can even repartition the data based on the  columns.
- example: dataframe1=dataframe.repartition(x)
   - x: can be no of partitions or even the column name  on which you want to partition the data

In [None]:
sqlContext.sql("set spark.sql.shuffle.partitions=10") 

### Dataframes in Spark
In Spark, a DataFrame is a distributed collection of rows under named columns. 
In simple terms, it can be referred as a table in relational database or an Excel sheet with Column headers. 
It has the following caharacteristics:
- Immutable in nature : We can create DataFrame  once but can’t change it. And we can transform a DataFrame after applying transformations.
- Lazy Evaluations: Which means that a task is not executed until an action is performed.
   - Action commands in spark : count(),collect(), aggregate(),reduce() etc
- Distributed: DataFrame are distributed in nature.

### Loading CSV data into Spark Dataframes

In [3]:
# reading the csv files into spark datframes
train=spark.read.csv('train_v2.csv',header='true', inferSchema='true')
stores=spark.read.csv('store.csv',header='true', inferSchema='true')

### Schema Of the Data retreived

In [4]:
# Lets have a  look at the schema of the data to get a better unbderstanding of what the data is 
train.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Sales: integer (nullable = true)
 |-- Customers: integer (nullable = true)
 |-- Open: integer (nullable = true)
 |-- Promo: integer (nullable = true)
 |-- StateHoliday: string (nullable = true)
 |-- SchoolHoliday: integer (nullable = true)



In [5]:
# To view the top 5 records of the dataframe
train.show(5)

+-----+---------+-------------------+-----+---------+----+-----+------------+-------------+
|Store|DayOfWeek|               Date|Sales|Customers|Open|Promo|StateHoliday|SchoolHoliday|
+-----+---------+-------------------+-----+---------+----+-----+------------+-------------+
|    1|        5|2015-01-30 00:00:00| 5577|      616|   1|    1|           0|            0|
|    2|        5|2015-01-30 00:00:00| 5919|      624|   1|    1|           0|            0|
|    3|        5|2015-01-30 00:00:00| 6911|      678|   1|    1|           0|            0|
|    4|        5|2015-01-30 00:00:00|13307|     1632|   1|    1|           0|            0|
|    5|        5|2015-01-30 00:00:00| 5640|      617|   1|    1|           0|            0|
+-----+---------+-------------------+-----+---------+----+-----+------------+-------------+
only showing top 5 rows



In [6]:
# now let's have a look at the stores data
stores.show(5)

+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+---------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|  PromoInterval|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+---------------+
|    1|        c|         a|               1270|                        9|                    2008|     0|           null|           null|           null|
|    2|        a|         a|                570|                       11|                    2007|     1|             13|           2010|Jan,Apr,Jul,Oct|
|    3|        a|         a|              14130|                       12|                    2006|     1|             14|           2011|Jan,Apr,Jul,Oct|
|    4|        c|         c|                620|                      

### Data Preparation

In [7]:
#Extracting the start_date and End_date from the train dataframe
print("End_Date:",train.select('Date').rdd.max()[0])
print("Start_Date:",train.select('Date').rdd.min()[0])

End_Date: 2015-01-30 00:00:00
Start_Date: 2014-11-26 00:00:00


In [8]:
# lets compute the average sales per customer per  store 
# 1.create a new column with avg sales of a customer  for each store and each date
train =train.withColumn('SalesPerCustomer',train['Sales']/train['Customers'])
# 2.now compute the average customers, sales and sales per customer at doing store level aggregation
avg_store=train.groupby('Store').mean('customers','sales','salespercustomer')
# 3. join  the results with stores dataframe
store=stores.join(avg_store,'Store',how='inner')
#store.show(5)

In [9]:
#join stores info to the sales(train) data
join=train.join(store,'Store',how='inner')
join.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Sales: integer (nullable = true)
 |-- Customers: integer (nullable = true)
 |-- Open: integer (nullable = true)
 |-- Promo: integer (nullable = true)
 |-- StateHoliday: string (nullable = true)
 |-- SchoolHoliday: integer (nullable = true)
 |-- SalesPerCustomer: double (nullable = true)
 |-- StoreType: string (nullable = true)
 |-- Assortment: string (nullable = true)
 |-- CompetitionDistance: integer (nullable = true)
 |-- CompetitionOpenSinceMonth: integer (nullable = true)
 |-- CompetitionOpenSinceYear: integer (nullable = true)
 |-- Promo2: integer (nullable = true)
 |-- Promo2SinceWeek: integer (nullable = true)
 |-- Promo2SinceYear: integer (nullable = true)
 |-- PromoInterval: string (nullable = true)
 |-- avg(customers): double (nullable = true)
 |-- avg(sales): double (nullable = true)
 |-- avg(salespercustomer): double (nullable = true)



#### Data Cleaning

In [10]:
# Now lets find out if there any nulls in the data
from pyspark.sql.functions import *
join.select([count(when(col(c).isNull(), c)).alias(c) for c in join.columns]).show()

+-----+---------+----+-----+---------+----+-----+------------+-------------+----------------+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+-------------+--------------+----------+---------------------+
|Store|DayOfWeek|Date|Sales|Customers|Open|Promo|StateHoliday|SchoolHoliday|SalesPerCustomer|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|PromoInterval|avg(customers)|avg(sales)|avg(salespercustomer)|
+-----+---------+----+-----+---------+----+-----+------------+-------------+----------------+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+-------------+--------------+----------+---------------------+
|    0|        0|   0|    0|        0|   0|    0|           0|            0|           12212|        0|         0|                162|     

We can see that they are missing values in  few of the columns.
Missing Values is one of the most common problem faced during Data Cleaning/ exploratory data analysis.
- There are several techniques to handle missing data.
  - We can drop the variables or records in  the data if the no of records are less and their impact on the data is less.
  - We can also impute them  with mean, median or zero based on the data patterns 
  - Later we can also use PCA(Principal Component Analysis) or interpolation techniques.

In [11]:
# For now let fill the columns with null values using '0'
join=join.fillna({'PromoInterval':0, 'Promo2SinceYear':0, 'Promo2SinceWeek':0,'CompetitionOpenSinceYear':0,'SalesPerCustomer':0,'CompetitionOpenSinceMonth':0,'StoreType':0,'CompetitionDistance':0})

### Feature Engineering
Feature engineering is the process of using domain knowledge of the data to create features that make machine learning algorithms work. It is fundamental to the application of machine learning and helps in increasing the accuracy of the model. It is really essential in creating the right features.

I will be using Random Forest Regressor model for this data.
So, I split the date into discrete components so the decision trees were able to make better guesses. 
I also tried using months since the competition has started and how long the store was in promotion since.

Additionally, I also built three  other features - Average sales, customers and sales per customers. 

In [12]:
# Creating new columns by Extracting year, week, month, day from the date column
from pyspark.sql.functions import *
join=join.withColumn('Day',dayofmonth('date')).withColumn('Month',month('date'))\
     .withColumn('Year',year('date')).withColumn('Week',weekofyear('date'))


In [13]:
# Number of months that competition has existed for  and  Number of weeks that promotion has existed for   
join=join.withColumn('MonthsCompetitionOpen',12*(join['Year']-join['CompetitionOpenSinceYear'])+(join['Month']-join['CompetitionOpenSinceMonth']))\
     .withColumn('WeeksPromoOpen',12*(join['Year']-join['Promo2SinceYear'])+(join['Week']-join['Promo2SinceWeek']))
join=join.withColumn('WeeksPromoOpen',when((col('Promo2SinceYear')==0),0).otherwise(join['WeeksPromoOpen']))\
      .withColumn('MonthsCompetitionOpen',when((col('CompetitionOpenSinceYear')==0),0).otherwise(join['MonthsCompetitionOpen']))

### Let split the data into train and test set


In [14]:
join_train = join.filter((join.Date<'2015-01-30'))
join_valid = join.filter((join.Date>='2015-01-30'))

### Dealing with Categorical and Continous Features

In [15]:
# Defing Categorical and Contionous Variables in the dataset
cat_vars = ['Store', 'Week', 'Year', 'Month', 'Day', 'DayOfWeek','StateHoliday', 'CompetitionOpenSinceMonth',
    'Promo2SinceWeek', 'StoreType', 'Assortment', 'PromoInterval', 'CompetitionOpenSinceYear', 'Promo2SinceYear',
    'MonthsCompetitionOpen','WeeksPromoOpen','Promo','SchoolHoliday']

contin_vars = ['SalesPerCustomer','Customers' ]

#### Pipeline
A Spark Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. These stages are run in order, and the input DataFrame is transformed as it passes through each stage.
The Pipeline API, introduced in Spark 1.2, is a high-level API for MLlib. Inspired by the popular implementation in scikit-learn, the concept of Pipelines is to facilitate the creation, tuning, and inspection of practical ML workflows.
#### For categorical data lets apply String Indexer 
String Indexer- Used to convert string columns into numeric.It encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0. String Indexer Functioning is some what similar to Label Encoder from Scikit-Learn.


- In the below code, Indexers is pipeline with a series of string Indexers applied on columns that are defined as the categorical variables

In [16]:
#Import the libraries for string indexing and applying it to categorical values
from pyspark.ml import Pipeline
from pyspark.ml.feature import  StringIndexer, VectorAssembler, StandardScaler
indexers = [StringIndexer(inputCol=cat_var,outputCol=cat_var+ "Index4",handleInvalid="skip") for cat_var in cat_vars]
#defining the model features into a variable
features_set=[cat_var+ "Index4" for cat_var in cat_vars] 



#### Standard Scaler on Continous Values
Centering and Scaling happen independently on each feature by computing the relevant statistics on the samples in the training set. Mean and standard deviation are then stored to be used on later data using the transform method.
Standardization of a dataset is a common requirement for many machine learning estimators: they might behave badly if the individual feature does not more or less look like standard normally distributed data.

- For using the standard scaler in the spark the input data must be in the form of vectors. lets apply vector assembler before passing it into the model.
#### VectorAssembler
VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees.

In [17]:
cont_assembler = VectorAssembler( inputCols = contin_vars, outputCol = "contin_test_vars")
contin_scaler = StandardScaler(  inputCol="contin_test_vars", outputCol="scaledFeatures",  withStd=True, withMean=True)
features_set.append('scaledFeatures')


#### Importing VectorAssembler and creating our Features
We must transform our data using the VectorAssembler function to a single column where each row of the DataFrame contains a feature vector. In order to predict, we need to select columns based on which we will then create our features column. 

In [18]:
assembler=VectorAssembler( inputCols = features_set, outputCol = "features")


In [19]:
# Making sure that there are no nulls in the training data
from pyspark.sql.functions import *
join_train.select([count(when(col(c).isNull(), c)).alias(c) for c in join.columns]).show()

+-----+---------+----+-----+---------+----+-----+------------+-------------+----------------+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+-------------+--------------+----------+---------------------+---+-----+----+----+---------------------+--------------+
|Store|DayOfWeek|Date|Sales|Customers|Open|Promo|StateHoliday|SchoolHoliday|SalesPerCustomer|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|PromoInterval|avg(customers)|avg(sales)|avg(salespercustomer)|Day|Month|Year|Week|MonthsCompetitionOpen|WeeksPromoOpen|
+-----+---------+----+-----+---------+----+-----+------------+-------------+----------------+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+-------------+--------------+----------+---------------------+---+-----+----+----+------

In [20]:
# Adding the above stages to pipeline
indexers.append(cont_assembler)
indexers.append(contin_scaler)
indexers.append(assembler)


#### Random Forest 
Random Forest is an ensembling technique used for classification and regression.  It operates by building a large number of decision trees at training and outputting the predicted value or class of the indiviadual trees in order to reduce the risk of overfitting.




In [21]:
# Import the libraries for random forest
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(labelCol="Sales", featuresCol="features", maxBins=1500)
# append the model stage in to the indexer
indexers.append(rf)
#define the indexers as the pipeline
pipeline = Pipeline(stages=indexers)
# print the stages defined in the pipeline
print(indexers)
# fit the stages in a pipeline
model=pipeline.fit(join_train)

[StringIndexer_48fe8aeef031f2e0f5e4, StringIndexer_4bfe97418705f84e70f7, StringIndexer_49b6804941edff26ab15, StringIndexer_4f178fb54e16cb99c2b7, StringIndexer_4a2c8ef1b40d90a2ae5a, StringIndexer_4808b93801b11f2fe43f, StringIndexer_4310b1219eb8d07ac995, StringIndexer_4ff0931279fb92e72db9, StringIndexer_4408bfbe956ba33edbc0, StringIndexer_4fc599c1e67378d2244c, StringIndexer_47dda37493c7859637c2, StringIndexer_451bbca50870a3605169, StringIndexer_44c4b226ef28af6b02f3, StringIndexer_4be5ae39397444d05a0b, StringIndexer_42aebec7298b58c9b9bc, StringIndexer_441aa31c286dc6a381d2, StringIndexer_4ec9b9771c1bcf4be0db, StringIndexer_42be9815af3150847198, VectorAssembler_4ef095bfd882c5c1e40e, StandardScaler_4185bdd313b07c3814db, VectorAssembler_4b80b463e3b5089a2ba2, RandomForestRegressor_4392803d0bf90dd9096d]


In [22]:
# Predict the values on the test set
predictions =model.transform(join_valid)

In [23]:
#select the columns to display
cols=['Store','Date', 'Sales','prediction']
predictions.select(cols).show()

+-----+-------------------+-----+------------------+
|Store|               Date|Sales|        prediction|
+-----+-------------------+-----+------------------+
|    1|2015-01-30 00:00:00| 5577|5845.1198334143455|
|    2|2015-01-30 00:00:00| 5919| 6098.789365477029|
|    3|2015-01-30 00:00:00| 6911|6764.8572665876945|
|    4|2015-01-30 00:00:00|13307|11466.169721704722|
|    5|2015-01-30 00:00:00| 5640| 5923.680647830589|
|    6|2015-01-30 00:00:00| 6555| 6288.475040140846|
|    7|2015-01-30 00:00:00|11430|10436.872705071195|
|    8|2015-01-30 00:00:00| 6401| 6666.074368751535|
|    9|2015-01-30 00:00:00| 8072| 6944.267589050096|
|   10|2015-01-30 00:00:00| 6350|6341.4473516140115|
|   11|2015-01-30 00:00:00|10031| 9542.229182267922|
|   12|2015-01-30 00:00:00| 9156| 9085.144163716632|
|   13|2015-01-30 00:00:00| 7004| 5939.529759894327|
|   14|2015-01-30 00:00:00| 6491| 6453.847984811469|
|   15|2015-01-30 00:00:00| 8898| 8345.240343645364|
|   16|2015-01-30 00:00:00| 9546|  8972.018988

#### Regression Evaluator
Available metrics
- Mean Squared Error
- Root Mean Squared Error
- Mean Absolute Error
- Co-efficient of determination(R2)
- Explained Variance

we will be using root mean sqaure error for our evaluation.

RMSE= sqrt(sum(outputvalue-predictedvalue)^2/no of samples)


In [24]:
# compute the test error
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import mean, min, max,stddev,ceil
evaluator = RegressionEvaluator(    labelCol="Sales", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(" Root Mean Square Error  on test data = %g" % rmse)

 Root Mean Square Error  on test data = 1393.67


This article is to give a basic overview on PySpark and How it can be used for machine learning models.
There is still scope for decreasing the error by tuning parameters. Fortunately, Spark’s MLlib contains a CrossValidator tool that makes tuning hyperparameters a little less painful. The CrossValidator can be used with any algorithm supported by MLlib.