In [30]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
#from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import Row
from pyspark.sql.functions import col, round
import pandas as pd
import functools 
from pyspark.ml.feature import OneHotEncoder
from pyspark.sql.types import DoubleType
from pyspark.sql import SparkSession

In [31]:
## Attaching online sales csv file to Sparck Context
online_sales = sc.textFile("online_shoppers_intention.csv")

In [32]:
## Data exploration
online_sales.first()
for i in online_sales.take(10): print(i)

Administrative,Administrative_Duration,Informational,Informational_Duration,ProductRelated,ProductRelated_Duration,BounceRates,ExitRates,PageValues,SpecialDay,Month,OperatingSystems,Browser,Region,TrafficType,VisitorType,Weekend,Revenue
0,0,0,0,1,0,0.2,0.2,0,0,Feb,1,1,1,1,Returning_Visitor,FALSE,FALSE
0,0,0,0,2,64,0,0.1,0,0,Feb,2,2,1,2,Returning_Visitor,FALSE,FALSE
0,-1,0,-1,1,-1,0.2,0.2,0,0,Feb,4,1,9,3,Returning_Visitor,FALSE,FALSE
0,0,0,0,2,2.666666667,0.05,0.14,0,0,Feb,3,2,2,4,Returning_Visitor,FALSE,FALSE
0,0,0,0,10,627.5,0.02,0.05,0,0,Feb,3,3,1,4,Returning_Visitor,TRUE,FALSE
0,0,0,0,19,154.2166667,0.015789474,0.024561404,0,0,Feb,2,2,1,3,Returning_Visitor,FALSE,FALSE
0,-1,0,-1,1,-1,0.2,0.2,0,0.4,Feb,2,4,3,3,Returning_Visitor,FALSE,FALSE
1,-1,0,-1,1,-1,0.2,0.2,0,0,Feb,1,2,1,5,Returning_Visitor,TRUE,FALSE
0,0,0,0,2,37,0,0.1,0,0.8,Feb,2,2,2,3,Returning_Visitor,FALSE,FALSE


In [None]:
online_sales.count()

In [None]:
## Split data columns for pre analysis

In [33]:
x = "Administrative,Administrative_Duration,Informational,Informational_Duration,ProductRelated,ProductRelated_Duration,BounceRates,ExitRates,PageValues,SpecialDay,Month,OperatingSystems,Browser,Region,TrafficType,VisitorType,Weekend,Revenue"

In [34]:
x.split(",")

['Administrative',
 'Administrative_Duration',
 'Informational',
 'Informational_Duration',
 'ProductRelated',
 'ProductRelated_Duration',
 'BounceRates',
 'ExitRates',
 'PageValues',
 'SpecialDay',
 'Month',
 'OperatingSystems',
 'Browser',
 'Region',
 'TrafficType',
 'VisitorType',
 'Weekend',
 'Revenue']

In [38]:
(x.split(",")[10], (x.split(",")[4]))

('Month', 'ProductRelated')

In [None]:
## Defining sales RDD for related online sales

In [37]:
related_saleRDD = online_sales.map(lambda x: (x.split(",")[10], x.split(",")[4]))
for i in related_saleRDD.take(10): print(i)

('Month', 'ProductRelated')
('Feb', '1')
('Feb', '2')
('Feb', '1')
('Feb', '2')
('Feb', '10')
('Feb', '19')
('Feb', '1')
('Feb', '1')
('Feb', '2')


In [39]:
salesRDD = online_sales.filter(lambda x: "Sale" not in x).map(lambda x: x.split(","))

In [40]:
## Checking Month and Related Product sales
top_related_saleRDD = salesRDD.map(lambda x: (x[10],x[4])).groupByKey().mapValues(max)
top_related_saleRDD.collect()

[('Month', 'ProductRelated'),
 ('May', '99'),
 ('Feb', '90'),
 ('Mar', '98'),
 ('Oct', '97'),
 ('June', '99'),
 ('Jul', '98'),
 ('Aug', '98'),
 ('Nov', '99'),
 ('Sep', '98'),
 ('Dec', '98')]

In [41]:
## Checking VisitorType, Month and Related Product
grouped_salesRDD = salesRDD.map(lambda x: ((x[15],x[10]),x[4]))
grouped_salesRDD.take(10)

[(('VisitorType', 'Month'), 'ProductRelated'),
 (('Returning_Visitor', 'Feb'), '1'),
 (('Returning_Visitor', 'Feb'), '2'),
 (('Returning_Visitor', 'Feb'), '1'),
 (('Returning_Visitor', 'Feb'), '2'),
 (('Returning_Visitor', 'Feb'), '10'),
 (('Returning_Visitor', 'Feb'), '19'),
 (('Returning_Visitor', 'Feb'), '1'),
 (('Returning_Visitor', 'Feb'), '1'),
 (('Returning_Visitor', 'Feb'), '2')]

In [42]:
## Assesing least VisitorType, Month and Related Product
least_grouped_salesRDD = salesRDD.map(lambda x: ((x[15],x[10]),x[4])).groupByKey().mapValues(min)
least_grouped_salesRDD.take(10)

[(('Returning_Visitor', 'Feb'), '1'),
 (('Returning_Visitor', 'Mar'), ''),
 (('New_Visitor', 'May'), '0'),
 (('Returning_Visitor', 'Oct'), '0'),
 (('Returning_Visitor', 'June'), '0'),
 (('Returning_Visitor', 'Jul'), '1'),
 (('Returning_Visitor', 'Aug'), '0'),
 (('Returning_Visitor', 'Nov'), '0'),
 (('Returning_Visitor', 'Sep'), '1'),
 (('Other', 'June'), '222')]

In [43]:
## Assesing top VisitorType, Month and Related Product
top_grouped_salesRDD = salesRDD.map(lambda x: ((x[15],x[10]),x[4])).groupByKey().mapValues(max)
top_grouped_salesRDD.take(10)

[(('Returning_Visitor', 'Feb'), '90'),
 (('Returning_Visitor', 'Mar'), '98'),
 (('New_Visitor', 'May'), '9'),
 (('Returning_Visitor', 'Oct'), '97'),
 (('Returning_Visitor', 'June'), '99'),
 (('Returning_Visitor', 'Jul'), '98'),
 (('Returning_Visitor', 'Aug'), '98'),
 (('Returning_Visitor', 'Nov'), '99'),
 (('Returning_Visitor', 'Sep'), '98'),
 (('Other', 'June'), '222')]

In [44]:
## Top monthly bounce rate values
bouned_RDD = salesRDD.map(lambda x: (x[10],x[6])).groupByKey().mapValues(max)
bouned_RDD.take(10)

[('Month', 'BounceRates'),
 ('May', '7.09E-05'),
 ('Feb', '0.2'),
 ('Mar', '9.83E-05'),
 ('Oct', '3.83E-05'),
 ('June', '0.2'),
 ('Jul', '3.94E-05'),
 ('Aug', '2.73E-05'),
 ('Nov', '8.14E-05'),
 ('Sep', '0.2')]

In [45]:
## Top bounce rate by visitor type
top_visit_bounced_RDD = salesRDD.map(lambda x: (x[6],x[15])).groupByKey().mapValues(max)
top_visit_bounced_RDD.take(10)

[('BounceRates', 'VisitorType'),
 ('0.2', 'Returning_Visitor'),
 ('0', 'Returning_Visitor'),
 ('0.05', 'Returning_Visitor'),
 ('0.015789474', 'Returning_Visitor'),
 ('0.04', 'Returning_Visitor'),
 ('0.014285714', 'Returning_Visitor'),
 ('0.028571429', 'Returning_Visitor'),
 ('0.011111111', 'Returning_Visitor'),
 ('0.003703704', 'Returning_Visitor')]

In [46]:
## Grouping data by 'Month' sum aggregation
df.groupBy("Month").sum().show()

+-----+----------+-------------------+------------------+-------------------+
|Month|sum(label)|sum(Administrative)|sum(Informational)|sum(ProductRelated)|
+-----+----------+-------------------+------------------+-------------------+
|  Oct|     107.0|               1897|               268|              17275|
|  Sep|      83.0|               1427|               254|              14119|
|  Dec|     191.0|               3451|               862|              38947|
|  Aug|      75.0|               1311|               232|              15557|
|  May|     296.0|               5417|              1346|              60631|
| June|      29.0|                635|               162|               9384|
|  Feb|       3.0|                 98|                16|               1366|
|  Nov|     646.0|               7214|              1904|             117159|
|  Mar|     179.0|               3003|               781|              26627|
|  Jul|      64.0|               1010|               223|       

In [47]:
## Dataframe online_shoppers sales in pandas
pd.DataFrame(df.take(5), columns=df.columns)

Unnamed: 0,label,features,Administrative,Informational,ProductRelated,Month,VisitorType,Revenue
0,0.0,"(2.0, 0.0, 6.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ...",2,0,6,Mar,New_Visitor,False
1,0.0,"(4.0, 0.0, 12.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0,...",4,0,12,Mar,Returning_Visitor,False
2,1.0,"(2.0, 2.0, 37.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0,...",2,2,37,Mar,Returning_Visitor,True
3,0.0,"(2.0, 0.0, 49.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0,...",2,0,49,Mar,Returning_Visitor,False
4,1.0,"(5.0, 0.0, 18.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0,...",5,0,18,Mar,Returning_Visitor,True


In [48]:
## Summary statistics for all variables
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
label,6715,0.24914370811615785,0.4325494002587845,0.0,1.0
Administrative,6715,3.7919583023082652,3.7460408202894766,0,27
Informational,6715,0.9006701414743112,1.6046897953639372,0,24
ProductRelated,6715,46.98093819806404,54.37872022217047,0,705
Month,6715,,,Aug,Sep
VisitorType,6715,,,New_Visitor,Returning_Visitor
Revenue,6715,,,false,true


In [49]:
## Summary statistics for numeric variables
numeric_features = [t[0] for t in df.dtypes if t[1] == "int"]
df.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Administrative,6715,3.7919583023082652,3.7460408202894766,0,27
Informational,6715,0.9006701414743112,1.6046897953639372,0,24
ProductRelated,6715,46.98093819806404,54.37872022217047,0,705


In [None]:
## Online_sales in DataFrames

In [3]:
spark = SparkSession.builder.appName("groupbyagg").getOrCreate()
df = spark.read.csv('online_shoppers_intention.csv', header = True, inferSchema = True)
spark

In [4]:
df.describe()
df.printSchema()

root
 |-- Administrative: integer (nullable = true)
 |-- Administrative_Duration: double (nullable = true)
 |-- Informational: integer (nullable = true)
 |-- Informational_Duration: double (nullable = true)
 |-- ProductRelated: integer (nullable = true)
 |-- ProductRelated_Duration: double (nullable = true)
 |-- BounceRates: double (nullable = true)
 |-- ExitRates: double (nullable = true)
 |-- PageValues: double (nullable = true)
 |-- SpecialDay: double (nullable = true)
 |-- Month: string (nullable = true)
 |-- OperatingSystems: integer (nullable = true)
 |-- Browser: integer (nullable = true)
 |-- Region: integer (nullable = true)
 |-- TrafficType: integer (nullable = true)
 |-- VisitorType: string (nullable = true)
 |-- Weekend: boolean (nullable = true)
 |-- Revenue: boolean (nullable = true)



In [5]:
## Filling missing values
df = df.na.fill(0)

In [6]:
df = df.withColumn('Revenue', df['Revenue'].cast("string"))

In [7]:
## Selecting a subset of the Dataframe for interest columns
cols_select = ['Administrative','Informational','ProductRelated','Month','VisitorType','Revenue']
df = df.select(cols_select).dropDuplicates()

In [8]:
## One Hot Encoding
column_vec_in = ['Month','VisitorType']
column_vec_out = ['Month_vec','VisitorType_vec']

In [9]:
indexers = [StringIndexer(inputCol=x, outputCol=x+'_tmp') for x in column_vec_in]
encoders = [OneHotEncoder(dropLast=False, inputCol=x+'_tmp', outputCol=y)
                         for x,y in zip(column_vec_in,column_vec_out)]
tmp = [[i,j] for i,j in zip(indexers,encoders)]
tmp = [i for sublist in tmp for i in sublist]

In [10]:
## Prepare labeled sets
cols_now = ['Administrative','Informational','ProductRelated','Month_vec','VisitorType_vec']
assembler_features = VectorAssembler(inputCols=cols_now, outputCol='features')
labelIndexer = StringIndexer(inputCol='Revenue', outputCol='label')
tmp += [assembler_features, labelIndexer]
#pipeline = Pipeline(stages=tmp)

In [11]:
pipeline = Pipeline(stages = tmp)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['label', 'features'] + cols_select
df = df.select(selectedCols)
df.printSchema()


root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- Administrative: integer (nullable = true)
 |-- Informational: integer (nullable = true)
 |-- ProductRelated: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- VisitorType: string (nullable = true)
 |-- Revenue: string (nullable = true)



In [12]:
## To view the new columns 'features' and 'label'
pd.DataFrame(df.take(5), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
label,0,0,1,0,1
features,"(2.0, 0.0, 6.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ...","(4.0, 0.0, 12.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0,...","(2.0, 2.0, 37.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0,...","(2.0, 0.0, 49.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0,...","(5.0, 0.0, 18.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0,..."
Administrative,2,4,2,2,5
Informational,0,0,2,0,0
ProductRelated,6,12,37,49,18
Month,Mar,Mar,Mar,Mar,Mar
VisitorType,New_Visitor,Returning_Visitor,Returning_Visitor,Returning_Visitor,Returning_Visitor
Revenue,false,false,true,false,true


In [None]:
## Splitting Dataset for training and testing

In [13]:
train, test = df.randomSplit([0.7, 0.3], seed = 0)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 4651
Test Dataset Count: 2064


In [None]:
## Running the RANDOM FOREST CLASSIFICATION MODEL

In [14]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label', numTrees = 200)
rfModel = rf.fit(train)
predictions = rfModel.transform(test)


In [15]:
predictions.select('Administrative', 'ProductRelated', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

+--------------+--------------+-----+--------------------+----------+--------------------+
|Administrative|ProductRelated|label|       rawPrediction|prediction|         probability|
+--------------+--------------+-----+--------------------+----------+--------------------+
|             4|            14|  0.0|[155.874360704383...|       0.0|[0.77937180352191...|
|             2|            20|  0.0|[155.185559155465...|       0.0|[0.77592779577732...|
|             1|            16|  0.0|[154.195095267891...|       0.0|[0.77097547633945...|
|             5|             5|  0.0|[143.429584876497...|       0.0|[0.71714792438248...|
|             2|            21|  0.0|[154.222486798409...|       0.0|[0.77111243399204...|
|             0|            35|  0.0|[154.576862414839...|       0.0|[0.77288431207419...|
|             4|            23|  1.0|[156.058879482101...|       0.0|[0.78029439741050...|
|             2|            37|  1.0|[155.707159882340...|       0.0|[0.77853579941170...|

In [None]:
## Evaluating our RFModel for Accuracy

In [17]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.614812661277757


In [None]:
## Running the GRADIENT BOOSTED TREE CLASSIFICATION MODEL
## May take quite some time in running

In [58]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)
predictions.select('Administrative', 'ProductRelated', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

+--------------+--------------+-----+--------------------+----------+--------------------+
|Administrative|ProductRelated|label|       rawPrediction|prediction|         probability|
+--------------+--------------+-----+--------------------+----------+--------------------+
|             4|            14|  0.0|[0.50260806970813...|       0.0|[0.73208289701911...|
|             2|            20|  0.0|[0.58113045154732...|       0.0|[0.76174328995702...|
|             1|            16|  0.0|[0.59326393194864...|       0.0|[0.76611950040422...|
|             5|             5|  0.0|[0.66309947980641...|       0.0|[0.79021120306205...|
|             2|            21|  0.0|[0.53495182711649...|       0.0|[0.74457859309489...|
|             0|            35|  0.0|[0.65425015902294...|       0.0|[0.78726209661600...|
|             4|            23|  1.0|[0.58113045154732...|       0.0|[0.76174328995702...|
|             2|            37|  1.0|[0.68847500430430...|       0.0|[0.79850071154366...|

In [None]:
## Evaluating our RFModel for Accuracy

In [59]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.6358780540639143


In [None]:
## Gradient Boosted Tree Achieved Best Results ##