# Project : ClickStream Data Analysis Using YooChoose Competition Data
## Data Scientist : Kemal Emre Çolak
### Problem Derivation (Sub-Problem): This problem was posed by the YooChoose competition community. Unfortunately, no recent submission file is available against which I can check my solution.

#### Problem: Predict whether a user will buy in a given session, using the ClickStream data.

Note that this is the analysis phase of the full project. The project itself is implemented in Scala with Frameless (Spark), using a range of patterns and use cases. I chose Frameless for the type safety it adds to DataFrames and Datasets.

In [1]:
// Defining imports
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator,MulticlassClassificationEvaluator}
import org.apache.spark.ml.feature.{StringIndexer,StandardScaler,MinMaxScaler,MaxAbsScaler}
import org.apache.spark.ml.classification.{LogisticRegression,BinaryLogisticRegressionSummary,RandomForestClassifier,DecisionTreeClassifier,GBTClassifier}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.rand
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.expressions.Window
import org.apache.spark.ml.feature.{VectorAssembler,ChiSqSelector}


Intitializing Scala interpreter ...

Spark Web UI available at http://DESKTOP-OF67SJU:4040
SparkContext available as 'sc' (version = 2.4.5, master = local[*], app id = local-1602135701279)
SparkSession available as 'spark'



In [2]:
// Load the click-stream data as (sessID, date, itemID, type); every column is read as a string.
// The YooChoose .dat files have no header row, so header is set to "false"; with "TRUE" the first record would be consumed as column names.
val cStreamDF = spark.read.format("csv")
    .option("header","false")
    .option("delimiter",",")
    .load("yoochoose-clicks.dat")
    .toDF("sessID","date","itemID","type")

cStreamDF: org.apache.spark.sql.DataFrame = [sessID: string, date: string ... 2 more fields]


In [3]:
// Load the buy data as (sessID, date, itemID, price, qty); every column is read as a string.
// As with the click file, header is "false" so that the first record is kept as data.
val cStreamBasketDF = spark.read.format("csv")
    .option("header","false")
    .option("delimiter",",")
    .load("yoochoose-buys.dat")
    .toDF("sessID","date","itemID","price","qty")

cStreamBasketDF: org.apache.spark.sql.DataFrame = [sessID: string, date: string ... 3 more fields]


In [4]:
// Extract date, hour and day information once, using Spark's regex and date functions.
def dateInfExtractor(df: DataFrame): DataFrame = df
    // Keep the timestamp and replace the letter separator ('T') with a space; the dot before the milliseconds is escaped.
    .withColumn("date", regexp_replace(regexp_extract(col("date"),
        "[0-9]{4}-[0-9]{2}-[0-9]{2}[A-Za-z][0-9]{2}:[0-9]{2}:[0-9]{2}\\.[0-9]{3}", 0), "[A-Za-z]", " "))
    .withColumn("sDate", regexp_extract(col("date"), "[0-9]{4}-[0-9]{2}-[0-9]{2}", 0))
    .withColumn("day", date_format(col("date"), "E"))
    .withColumn("hour", hour(col("date")))
    // Part of day: latenight 2-5, morning 6-11, afternoon 12-17, evening 18-22, midnight 23-1.
    .withColumn("hStatus",
        when(hour(col("date")) >= 2 && hour(col("date")) < 6, "latenight")
        .when(hour(col("date")) >= 6 && hour(col("date")) < 12, "morning")
        .when(hour(col("date")) >= 12 && hour(col("date")) < 18, "afternoon")
        .when(hour(col("date")) >= 18 && hour(col("date")) <= 22, "evening")
        .otherwise("midnight"))

dateInfExtractor: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
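
The part-of-day thresholds are easy to get wrong at the boundaries, so a minimal plain-Scala sketch of the same rules may help when checking them. `HourBuckets` and `hoursByLabel` are hypothetical names used only for illustration, and the sketch does not touch Spark.

```scala
// Plain-Scala mirror of the hStatus rules; names here are illustrative only.
object HourBuckets {
  // Maps an hour of day (0-23) to the label the when/otherwise chain produces.
  def hStatus(hour: Int): String = hour match {
    case h if h >= 2 && h < 6    => "latenight"
    case h if h >= 6 && h < 12   => "morning"
    case h if h >= 12 && h < 18  => "afternoon"
    case h if h >= 18 && h <= 22 => "evening"
    case _                       => "midnight" // hours 23, 0 and 1
  }

  // All 24 hours grouped by label, to inspect which hours land in each bucket.
  val hoursByLabel: Map[String, Seq[Int]] = (0 to 23).groupBy(hStatus)
}
```

Looking up `HourBuckets.hoursByLabel("midnight")` shows that the fallback branch covers hours 0, 1 and 23, which is why "midnight" is the smallest bucket.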


In [5]:
// Buying behaviour may differ by day, so flag weekend sessions as a feature for the baseline model.
def weekTypeExtractor(df : DataFrame): DataFrame = df.withColumn("weekend",
                                                                when(col("day")==="Sun" || col("day")==="Sat",1)
                                                               .otherwise(0))

weekTypeExtractor: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [6]:
// Derive a coarser category from the type column: "S" is a special offer, 0 is missing,
// values of 8 or more characters are brands, and everything else is a normal category.
def specOfferExtractor(df:DataFrame) : DataFrame  = df.withColumn("specOffer"
                                                                ,when(col("type")==="S", "special")
                                                                .when(col("type")===0, "missing")
                                                                .when(length(col("type"))>=8, "brand")
                                                                .otherwise("normal")
                                                               )            

specOfferExtractor: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [7]:
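// Join each click with an item price taken from the buy data, then attach per-session
// price aggregates (sum, click count, mean, standard deviation).
def sessAverageItemPriceViewed(dfStream: DataFrame): DataFrame = {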
    val cStreamBasketDFBeta = cStreamBasketDF
    .select("itemID","price")
    .dropDuplicates("itemID")
    .withColumnRenamed("itemID","itemID1")
    
    val updatedDFStream = dfStream.join(cStreamBasketDFBeta,dfStream("itemID")===cStreamBasketDFBeta("itemID1"),"left").drop("itemID1")
    .withColumn("price",col("price").cast(DoubleType)).groupBy("sessID").agg(sum("price"),count("sessID"),avg("price"),stddev("price"))
    .withColumnRenamed("sessID","sessID1").withColumnRenamed("count(sessID)","nOfPVis")
    
    dfStream.join(updatedDFStream,dfStream("sessID")===updatedDFStream("sessID1"),"inner").drop("sessID1")
}

sessAverageItemPriceViewed: (dfStream: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [8]:
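// Average time in seconds between consecutive clicks of a session.
def sessAverageTimeBSessions(df: DataFrame): DataFrame = {
    val bySession = Window.partitionBy(col("sessID")).orderBy(col("date").asc)

    val finalPrDF = df
      .withColumn("nexttime", lead(col("date"), 1).over(bySession))
      // datediff returns whole days (almost always 0 within a session), so subtract epoch seconds instead.
      .withColumn("timeBSess",
        to_timestamp(col("nexttime")).cast(LongType) - to_timestamp(col("date")).cast(LongType))
      .groupBy("sessID").agg(avg("timeBSess"))
      .withColumnRenamed("sessID","sessID1")

    df.join(finalPrDF,df("sessID")===finalPrDF("sessID1"),"inner").drop("sessID1")
}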

sessAverageTimeBSessions: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [9]:
// How much time (in minutes) did the user spend in each session?
def timeSpentEachSession(df: DataFrame) = df.join(df.groupBy("sessID").agg(min("date"),max("date"))
                                .withColumn("passedInSess(minute)", (to_timestamp(col("max(date)")).cast(LongType)-
                                                                           to_timestamp(col("min(date)")).cast(LongType))/60)
                                .drop("min(date)","max(date)"),Seq("sessID"))

timeSpentEachSession: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
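
To make the duration arithmetic concrete, here is a minimal plain-Scala sketch of the same calculation for a single session: the difference between the latest and earliest click in whole epoch seconds, divided by 60. `SessionDuration` is a hypothetical helper and the timestamps in the usage note are made up; the sketch assumes the cleaned `yyyy-MM-dd HH:mm:ss.SSS` format and, like the cast to `LongType`, discards milliseconds.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Illustrative helper, not part of the notebook's pipeline.
object SessionDuration {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

  // Whole epoch seconds; the fractional part is dropped.
  private def epochSeconds(s: String): Long =
    LocalDateTime.parse(s, fmt).toEpochSecond(ZoneOffset.UTC)

  // Minutes between the earliest and latest click of one session.
  def minutes(clicks: Seq[String]): Double = {
    val secs = clicks.map(epochSeconds)
    (secs.max - secs.min) / 60.0
  }
}
```

For example, clicks at `10:51:09.277`, `10:54:09.868` and `10:57:00.306` on the same day span 351 whole seconds, so `minutes` returns 5.85. A session with a single click yields 0.0, which matches the zero durations visible in the data.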


In [10]:
// How many clicks did the item get on this day?
def itemDayPopularityExtractor(df:DataFrame) = {
    val df1 = df.groupBy("sDate","itemID").count()
            .withColumnRenamed("sDate","sDate1")
            .withColumnRenamed("itemID","itemID1")
    df.join(df1,df("sDate")===df1("sDate1") && df("itemID")===df1("itemID1")).drop("sDate1","itemID1")
    .withColumnRenamed("count","clicksInDay")
}

itemDayPopularityExtractor: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [11]:
// How many clicks did the item get over the whole dataset?
def itemGeneralPopularityExtractor(df:DataFrame) = {
    val df1 = df.groupBy("itemID").count()
            .withColumnRenamed("itemID","itemID1")
    df.join(df1,df("itemID")===df1("itemID1")).drop("itemID1").withColumnRenamed("count","clicksInAll")
}

itemGeneralPopularityExtractor: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [12]:
// How popular is the day (total clicks on that date)?
def dayPopularityExtractor(df:DataFrame) = {
    val df1 = df
    .groupBy("sDate")
    .count()
    .withColumnRenamed("sDate","sDate1")
    .withColumnRenamed("count","dayPopularity")
    df.join(df1,df("sDate")===df1("sDate1")).drop("sDate1")
}

dayPopularityExtractor: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [13]:
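// Collect the distinct category types seen in each session.
// collect_set is an aggregate, so it is applied over a per-session window; the timestamp column is "date".
def userPathExtractor(df:DataFrame) = df.select("sessID","itemID","date","type")
    .withColumn("userPath",collect_set("type").over(Window.partitionBy("sessID")))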

userPathExtractor: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [14]:
// Chain the extractors into one pipeline with function composition (andThen).
val dateExtractCombinerCStreamBase = dateInfExtractor _ andThen 
                          weekTypeExtractor _ andThen 
                          specOfferExtractor _ andThen 
                          timeSpentEachSession _ andThen
                          dayPopularityExtractor _ andThen
                          itemDayPopularityExtractor _ andThen
                          itemGeneralPopularityExtractor _ 
// The buy data is only used for labelling in this problem, so it needs no separate extractor.

dateExtractCombinerCStreamBase: org.apache.spark.sql.DataFrame => org.apache.spark.sql.DataFrame = <function1>
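
The composition pattern can be seen in isolation with toy functions. The sketch below uses `Int => Int` stages as stand-ins for the `DataFrame => DataFrame` extractors; `Compose` and its members are hypothetical names. It also shows `Function.chain` from the Scala standard library, which builds the same pipeline from a list of stages.

```scala
// Toy stages standing in for the DataFrame => DataFrame extractors.
object Compose {
  val addOne: Int => Int = _ + 1
  val twice: Int => Int  = _ * 2
  val square: Int => Int = x => x * x

  // Explicit chaining: stages run left to right.
  val chained: Int => Int = addOne andThen twice andThen square

  // The same pipeline built from a list of stages.
  val fromList: Int => Int = Function.chain(Seq(addOne, twice, square))
}
```

Both `Compose.chained(3)` and `Compose.fromList(3)` evaluate to 64, since ((3 + 1) * 2) squared is 64. Keeping the stages in a list can be convenient when extractors are switched on and off between experiments.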


In [15]:
val dateInformationsExtractedDF = dateExtractCombinerCStreamBase(cStreamDF)

dateInformationsExtractedDF: org.apache.spark.sql.DataFrame = [sessID: string, date: string ... 12 more fields]


In [16]:
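// Label each session: purchased = 1 if the session appears in the buy data, otherwise 0.
// dropDuplicates keeps one arbitrary click row per session, so row-level values can differ between runs.
val messyPreDF = dateInformationsExtractedDF
    .join(cStreamBasketDF.groupBy("sessID").count(),Seq("sessID"),"left")
    .withColumnRenamed("count","purchased")
    .withColumn("purchased"
    ,when(col("purchased")>0,1)
    .otherwise(0))
    .dropDuplicates("sessID")
    .drop("itemID")
    .drop("date")
    .drop("sDate")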

messyPreDF: org.apache.spark.sql.DataFrame = [sessID: string, type: string ... 10 more fields]


In [14]:
println("When there is not a purchase, count of category types : ")
messyPreDF.filter(col("purchased")===0).groupBy("specOffer").count().show()
println("When there is a purchase, count of category types : ")
messyPreDF.filter(col("purchased")===1).groupBy("specOffer").count().show()

When there is not a purchase, count of category types : 
+---------+-------+
|specOffer|  count|
+---------+-------+
|  special|3054666|
|   normal|1548626|
|    brand|  18074|
|  missing|4118667|
+---------+-------+

When there is a purchase, count of category types : 
+---------+------+
|specOffer| count|
+---------+------+
|  special|168369|
|   normal| 89143|
|    brand|   730|
|  missing|251454|
+---------+------+



In [15]:
println("When there is not a purchase, session counts by weekend flag : ")
messyPreDF.filter(col("purchased")===0).groupBy("weekend").count().show()
println("When there is a purchase, session counts by weekend flag : ")
messyPreDF.filter(col("purchased")===1).groupBy("weekend").count().show()

When there is not a purchase, session counts by weekend flag : 
+-------+-------+
|weekend|  count|
+-------+-------+
|      1|2815085|
|      0|5924948|
+-------+-------+

When there is a purchase, session counts by weekend flag : 
+-------+------+
|weekend| count|
+-------+------+
|      1|194669|
|      0|315027|
+-------+------+



In [16]:
println("When there is not a purchase, session counts by day : ")
messyPreDF.filter(col("purchased")===0).groupBy("day").count().show()
println("When there is a purchase, session counts by day : ")
messyPreDF.filter(col("purchased")===1).groupBy("day").count().show()

When there is not a purchase, session counts by day : 
+---+-------+
|day|  count|
+---+-------+
|Sun|1838984|
|Mon|1782983|
|Thu|1246785|
|Sat| 976101|
|Wed|1300885|
|Tue| 623987|
|Fri| 970308|
+---+-------+

When there is a purchase, session counts by day : 
+---+------+
|day| count|
+---+------+
|Sun|122462|
|Mon| 99046|
|Thu| 68629|
|Sat| 72207|
|Wed| 68336|
|Fri| 58132|
|Tue| 20884|
+---+------+



In [17]:
println("When there is not a purchase, session counts by part of day : ")
messyPreDF.filter(col("purchased")===0).groupBy("hStatus").count().show()
println("When there is a purchase, session counts by part of day : ")
messyPreDF.filter(col("purchased")===1).groupBy("hStatus").count().show()

When there is not a purchase, session counts by part of day : 
+---------+-------+
|  hStatus|  count|
+---------+-------+
|afternoon|2953906|
|  morning|2964755|
|latenight| 439608|
|  evening|2248916|
| midnight| 132848|
+---------+-------+

When there is a purchase, session counts by part of day : 
+---------+------+
|  hStatus| count|
+---------+------+
|afternoon|190848|
|  morning|178765|
|latenight| 16403|
|  evening|120226|
| midnight|  3454|
+---------+------+



In [17]:
// One-hot encoding is done here with a small UDF, which is not the idiomatic Spark way; I generally prefer built-ins to UDFs.
// Exact equality is used so that one category name can never match as a substring of another.
val cateListContains  =
      udf((cateList: String, item: String) => if (cateList == item) 1 else 0)

cateListContains: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,Some(List(StringType, StringType)))


In [18]:
// Collect the distinct values of the target column, then fold left over them: each value becomes a new 0/1 column
// that indicates whether the row holds that value. Finally, the original column is dropped.
def oneHotEncoderDF(dfParam: DataFrame,dfCol:String): DataFrame ={
    val targetCols = dfParam.select(dfCol).distinct.collect.map(x=>x(0).toString)
    targetCols.foldLeft(dfParam) {
      case (df, item) =>
        df.withColumn(item, cateListContains(col(dfCol), lit(item)))
}.drop(dfCol)}

oneHotEncoderDF: (dfParam: org.apache.spark.sql.DataFrame, dfCol: String)org.apache.spark.sql.DataFrame
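
The fold-based encoding can be illustrated without Spark. The following is a minimal sketch over in-memory rows, assuming each row is a `Map` from column name to value; `OneHot`, `Row` and `encode` are hypothetical names. It compares values by exact equality rather than substring containment.

```scala
// In-memory illustration of the foldLeft one-hot pattern; not the notebook's implementation.
object OneHot {
  type Row = Map[String, Any]

  // Adds one 0/1 column per distinct value of colName, then drops the source column.
  def encode(rows: Seq[Row], colName: String): Seq[Row] = {
    val values = rows.map(_(colName).toString).distinct
    values.foldLeft(rows) { (acc, v) =>
      acc.map(r => r + (v -> (if (r(colName).toString == v) 1 else 0)))
    }.map(_ - colName)
  }
}
```

Encoding two rows whose `hStatus` values are `"evening"` and `"morning"` yields rows with `evening` and `morning` columns set to 1/0 and 0/1 respectively, and no `hStatus` column. As in the DataFrame version, the number of new columns equals the number of distinct values, so the approach suits low-cardinality columns.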


In [19]:
// Rather than an imperative loop, recurse over the list of categorical columns: one-hot encode the head
// (which also drops the original column), then continue with the tail.
def colIterator(df: DataFrame,targetCols: List[String]) : DataFrame=
targetCols match{
    case Nil => df
    case head :: tail => colIterator(oneHotEncoderDF(df,head),tail)
}

colIterator: (df: org.apache.spark.sql.DataFrame, targetCols: List[String])org.apache.spark.sql.DataFrame


In [52]:
// Given a list of column names, the function one-hot encodes each of them.
colIterator(messyPreDF,List("hStatus","specOffer")).drop("type").show()

+--------+---+----+-------+--------------------+-------------+-----------+-----------+---------+---------+-------+---------+-------+--------+-------+------+-----+-------+
|  sessID|day|hour|weekend|passedInSess(minute)|dayPopularity|clicksInDay|clicksInAll|purchased|afternoon|morning|latenight|evening|midnight|special|normal|brand|missing|
+--------+---+----+-------+--------------------+-------------+-----------+-----------+---------+---------+-------+---------+-------+--------+-------+------+-----+-------+
|10000108|Sun|  18|      1|  10.433333333333334|       393589|       1990|       8073|        0|        0|      0|        0|      1|       0|      1|     0|    0|      0|
|10000172|Mon|   5|      0|   3.466666666666667|       338077|        533|       2642|        0|        0|      0|        1|      0|       0|      1|     0|    0|      0|
|10000304|Sat|  11|      1|                0.15|        97536|         35|        881|        0|        0|      1|        0|      0|       0|    

In [55]:
// Now the schema looks trainable: every feature column is numeric.
colIterator(messyPreDF,List("day","hStatus","specOffer")).drop("type","hour").na.fill(0).printSchema()

root
 |-- sessID: string (nullable = true)
 |-- weekend: integer (nullable = false)
 |-- passedInSess(minute): double (nullable = false)
 |-- dayPopularity: long (nullable = false)
 |-- clicksInDay: long (nullable = false)
 |-- clicksInAll: long (nullable = false)
 |-- purchased: integer (nullable = false)
 |-- Sun: integer (nullable = false)
 |-- Mon: integer (nullable = false)
 |-- Thu: integer (nullable = false)
 |-- Sat: integer (nullable = false)
 |-- Wed: integer (nullable = false)
 |-- Tue: integer (nullable = false)
 |-- Fri: integer (nullable = false)
 |-- afternoon: integer (nullable = false)
 |-- morning: integer (nullable = false)
 |-- latenight: integer (nullable = false)
 |-- evening: integer (nullable = false)
 |-- midnight: integer (nullable = false)
 |-- special: integer (nullable = false)
 |-- normal: integer (nullable = false)
 |-- brand: integer (nullable = false)
 |-- missing: integer (nullable = false)



In [26]:
// After preprocessing, the data for the baseline model is ready for training.
val finalDF = colIterator(messyPreDF,List("hStatus","specOffer")).drop("type","hour","day").na.fill(0)

<console>: 47: error: not found: value messyPreDF

In [32]:
// Drop the index (sessID) and label (purchased) columns; the remaining columns are the features.
val featuresCol  = finalDF.columns.filter(!Array("sessID","purchased").contains(_))

featuresCol: Array[String] = Array(weekend, passedInSess(minute), dayPopularity, clicksInDay, clicksInAll, afternoon, morning, latenight, evening, midnight, special, normal, brand, missing)


## Modelling Phase
### This phase is iterative: work through a checklist and experiment. We have no prior knowledge of which techniques will improve accuracy, AUC, recall, precision, F1, etc., so we start with a baseline model that is as simple as possible.

In [33]:
// Assemble the feature columns into a single feature vector.
val assembler = new VectorAssembler()
  .setInputCols(featuresCol)
  .setOutputCol("features")

assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_798704f44f3d


In [34]:
// Apply the assembler, rename purchased to label, and cast the label to double.
val assembledDF = assembler.transform(finalDF).withColumnRenamed("purchased","label").withColumn("label",col("label").cast(DoubleType))
assembledDF.cache()

assembledDF: org.apache.spark.sql.DataFrame = [sessID: string, weekend: int ... 15 more fields]
res6: assembledDF.type = [sessID: string, weekend: int ... 15 more fields]


In [61]:
val Array(training, test) = assembledDF.randomSplit(Array(0.8,0.2))

training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [sessID: string, weekend: int ... 15 more fields]
test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [sessID: string, weekend: int ... 15 more fields]


In [62]:
val lr = new LogisticRegression()
    .setLabelCol("label")
    .setFeaturesCol("features")

lr: org.apache.spark.ml.classification.LogisticRegression = logreg_6e716be86d71


In [63]:
val lrmodel = lr.fit(training)

lrmodel: org.apache.spark.ml.classification.LogisticRegressionModel = LogisticRegressionModel: uid = logreg_6e716be86d71, numClasses = 2, numFeatures = 14


In [25]:
val predictionslr = lrmodel.transform(test)

predictionslr: org.apache.spark.sql.DataFrame = [sessID: string, weekend: int ... 18 more fields]


In [40]:
val evaluatorlr = new BinaryClassificationEvaluator()
                .setRawPredictionCol("rawPrediction")
                .setLabelCol("label")

evaluatorlr: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_bc0ac8e1c46e


In [41]:
evaluatorlr.evaluate(predictionslr)

res7: Double = 0.5


In [42]:
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")

rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_ca62c939d298


In [43]:
val rfmodel = rf.fit(training)

rfmodel: org.apache.spark.ml.classification.RandomForestClassificationModel = RandomForestClassificationModel (uid=rfc_ca62c939d298) with 20 trees


In [44]:
val predictionsrf = rfmodel.transform(test)

predictionsrf: org.apache.spark.sql.DataFrame = [sessID: string, weekend: int ... 18 more fields]


In [45]:
val evaluatorrf = new BinaryClassificationEvaluator()
                .setRawPredictionCol("rawPrediction")
                .setLabelCol("label")

evaluatorrf: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_289cf702ebf9


In [65]:
evaluatorrf.getMetricName

res18: String = areaUnderROC


In [46]:
evaluatorrf.evaluate(predictionsrf)

res8: Double = 0.6885591158104136


#### Conclusion: For this baseline, I tried one linear and one non-linear model to see which performs better. On areaUnderROC, Random Forest did clearly better (0.689 versus 0.5 for Logistic Regression). If later feature sets do not beat this RF model, these baseline features and this model will be used.
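
To help interpret the areaUnderROC values, here is a minimal plain-Scala sketch of the metric using its pairwise definition: the probability that a randomly chosen positive is scored above a randomly chosen negative, with ties counted as one half. `Auc` is a hypothetical helper; Spark's evaluator derives the value from a ROC curve instead, so this illustrates the meaning of the number rather than reproducing Spark's computation.

```scala
// Pairwise (Mann-Whitney style) AUC; quadratic in the number of rows, for illustration only.
object Auc {
  // Each element is (score, label) with label 1 for a purchase and 0 otherwise.
  def areaUnderROC(scored: Seq[(Double, Int)]): Double = {
    val pos = scored.collect { case (s, 1) => s }
    val neg = scored.collect { case (s, 0) => s }
    require(pos.nonEmpty && neg.nonEmpty, "need at least one example of each class")
    val wins = for (p <- pos; n <- neg) yield
      if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    wins.sum / (pos.size.toLong * neg.size)
  }
}
```

A model that gives every row the same score gets exactly 0.5 under this definition, a perfect ranking gets 1.0, and `Seq((0.9, 1), (0.6, 1), (0.7, 0), (0.2, 0))` gets 0.75. An AUC of exactly 0.5 therefore suggests that the scores carry no ranking information, for example because they are constant.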

## Statistical Analysis for Beliefs

In [57]:
messyPreDF.filter(col("purchased")===0.0).groupBy("hour").count().orderBy(desc("count")).show()

+----+------+
|hour| count|
+----+------+
|  19|650709|
|  18|622187|
|   9|551643|
|  17|542025|
|  10|533636|
|  20|526799|
|   8|519291|
|  11|516324|
|  16|491848|
|  12|487862|
|  15|487667|
|  14|480026|
|   7|468462|
|  13|464469|
|   6|375326|
|  21|301338|
|   5|249731|
|  22|147858|
|   4|118520|
|  23| 68230|
+----+------+
only showing top 20 rows



In [56]:
messyPreDF.filter(col("purchased")===1.0).groupBy("hour").count().orderBy(desc("count")).show()

+----+-----+
|hour|count|
+----+-----+
|  18|41203|
|  19|37135|
|  17|36936|
|   9|33439|
|   8|32689|
|  11|32635|
|  16|32537|
|  10|32522|
|  15|32228|
|  14|30279|
|  12|30149|
|  13|28707|
|   7|27830|
|  20|24851|
|   6|19650|
|  21|12197|
|   5|10259|
|  22| 4867|
|   4| 4182|
|  23| 1839|
+----+-----+
only showing top 20 rows



In [59]:
messyPreDF.filter(col("purchased")===0.0).groupBy("day").count().orderBy(desc("count")).show()

+---+-------+
|day|  count|
+---+-------+
|Sun|1839005|
|Mon|1782959|
|Wed|1300892|
|Thu|1246775|
|Sat| 976109|
|Fri| 970288|
|Tue| 624005|
+---+-------+



In [58]:
messyPreDF.filter(col("purchased")===1.0).groupBy("day").count().orderBy(desc("count")).show()

+---+------+
|day| count|
+---+------+
|Sun|122463|
|Mon| 99045|
|Sat| 72205|
|Thu| 68631|
|Wed| 68332|
|Fri| 58134|
|Tue| 20886|
+---+------+



In [64]:
messyPreDF.filter(col("purchased")===0.0).groupBy("hStatus").count().orderBy(desc("count")).show()

+---------+-------+
|  hStatus|  count|
+---------+-------+
|  morning|2964682|
|afternoon|2953897|
|  evening|2248891|
|latenight| 439659|
| midnight| 132904|
+---------+-------+



In [63]:
messyPreDF.filter(col("purchased")===1.0).groupBy("hStatus").count().orderBy(desc("count")).show()

+---------+------+
|  hStatus| count|
+---------+------+
|afternoon|190836|
|  morning|178765|
|  evening|120253|
|latenight| 16376|
| midnight|  3466|
+---------+------+



In [65]:
messyPreDF.show(10)

+--------+----+---+----+---------+-------+---------+--------------------+-------------+-----------+-----------+---------+
|  sessID|type|day|hour|  hStatus|weekend|specOffer|passedInSess(minute)|dayPopularity|clicksInDay|clicksInAll|purchased|
+--------+----+---+----+---------+-------+---------+--------------------+-------------+-----------+-----------+---------+
|10000108|   S|Sun|  18|  evening|      1|  special|  10.433333333333334|       393589|       1990|       8073|        0|
|10000172|   S|Mon|   5|latenight|      0|  special|   3.466666666666667|       338077|       1297|       7274|        0|
|10000304|   5|Sat|  11|  morning|      1|   normal|                0.15|        97536|         35|        881|        0|
|10000454|   S|Mon|  12|afternoon|      0|  special|                 0.0|       338077|       7287|      37077|        0|
|10000472|   0|Tue|   7|  morning|      0|  missing|                3.15|        55470|        110|       6953|        0|
|10000528|   S|Sun|  16|

## Based on the statistical analysis, we now experiment with the variables and their values, for example deriving a night variable from two hour ranges (midnight, latenight).

In [165]:
// night: 1 for midnight or latenight; weekendlk: 1 for Sat, Sun or Mon (the three days with the most purchases);
// workOut: 1 only for hour 18, since the condition is hour > 17 && hour < 19.
val alphaOneDF = messyPreDF
    .withColumn("night", when(col("hStatus")==="midnight" || col("hStatus")==="latenight",1).otherwise(0))
    .withColumn("weekendlk", when(col("day").isin("Sat","Sun","Mon"),1).otherwise(0))
    .withColumnRenamed("passedInSess(minute)","pInSess")
    .withColumn("workOut", when(col("hour")>17 && col("hour")<19,1).otherwise(0))
    .drop("type","day","hour","weekend")

alphaOneDF: org.apache.spark.sql.DataFrame = [sessID: string, hStatus: string ... 9 more fields]


In [166]:
val alphaTwoDF = colIterator(alphaOneDF,List("specOffer","hStatus"))

alphaTwoDF: org.apache.spark.sql.DataFrame = [sessID: string, pInSess: double ... 16 more fields]


In [167]:
val updatedFeatureColOne = alphaTwoDF.columns.filter(!Array("sessID","purchased").contains(_))

updatedFeatureColOne: Array[String] = Array(pInSess, dayPopularity, clicksInDay, clicksInAll, night, weekendlk, workOut, special, normal, brand, missing, afternoon, morning, latenight, evening, midnight)


In [170]:
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

scaler: org.apache.spark.ml.feature.StandardScaler = stdScal_dddf8cb9b832
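
The effect of `setWithStd(true)` with `setWithMean(false)` on a single feature can be sketched in plain Scala: each value is divided by the feature's sample standard deviation and the mean is not subtracted, so zeros stay zero and sparse vectors stay sparse. `UnitStd` is a hypothetical helper; the n - 1 denominator and the zero output for constant features are assumptions chosen to follow Spark's documented behaviour.

```scala
// Unit-variance scaling without centering, for one feature column.
object UnitStd {
  // Sample standard deviation (n - 1 in the denominator).
  def sampleStd(xs: Seq[Double]): Double = {
    val mean = xs.sum / xs.size
    math.sqrt(xs.map(x => (x - mean) * (x - mean)).sum / (xs.size - 1))
  }

  // Divide by the standard deviation; a constant feature maps to 0.0.
  def scale(xs: Seq[Double]): Seq[Double] = {
    val sd = sampleStd(xs)
    if (sd == 0.0) xs.map(_ => 0.0) else xs.map(_ / sd)
  }
}
```

For example, `UnitStd.scale(Seq(2.0, 4.0, 6.0))` returns `Seq(1.0, 2.0, 3.0)`: the standard deviation is 2, and the scaled values have unit standard deviation but a non-zero mean.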


## Besides feature transformations, scaling can help a lot, so I'll try scaling the baselines one by one.

In [171]:
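// assembledDFOne is not defined in the cells shown; it is assumed to be alphaTwoDF passed through a
// VectorAssembler over updatedFeatureColOne.
val scalerModelOne = scaler.fit(assembledDFOne)

// Scale each feature to unit standard deviation.
val scaledDataOne = scalerModelOne.transform(assembledDFOne).withColumnRenamed("purchased","label")
scaledDataOne.show()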

+--------+------------------+-------------+-----------+-----------+-----+-----+---------+-------+-------+------+-----+-------+---------+-------+---------+-------+--------+--------------------+--------------------+
|  sessID|           pInSess|dayPopularity|clicksInDay|clicksInAll|label|night|weekendlk|workOut|special|normal|brand|missing|afternoon|morning|latenight|evening|midnight|            features|      scaledFeatures|
+--------+------------------+-------------+-----------+-----------+-----+-----+---------+-------+-------+------+-----+-------+---------+-------+---------+-------+--------+--------------------+--------------------+
|10000108|10.433333333333334|       393589|       1990|       8073|    0|    0|        1|      1|      1|     0|    0|      0|        0|      0|        0|      1|       0|(16,[0,1,2,3,5,6,...|(16,[0,1,2,3,5,6,...|
|10000172| 3.466666666666667|       338077|       1297|       7274|    0|    1|        1|      0|      1|     0|    0|      0|        0|      0|

scalerModelOne: org.apache.spark.ml.feature.StandardScalerModel = stdScal_dddf8cb9b832
scaledDataOne: org.apache.spark.sql.DataFrame = [sessID: string, pInSess: double ... 18 more fields]


In [102]:
val Array(training,test) = scaledDataOne.randomSplit(Array(0.8,0.2))

training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [sessID: string, hour: int ... 23 more fields]
test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [sessID: string, hour: int ... 23 more fields]


In [109]:
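// Note: the forest reads the unscaled "features" column; point setFeaturesCol at "scaledFeatures" to train on the scaled vectors.
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")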

rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_5c380becfa90


In [110]:
val rfModel = rf.fit(training)

rfModel: org.apache.spark.ml.classification.RandomForestClassificationModel = RandomForestClassificationModel (uid=rfc_5c380becfa90) with 20 trees


In [111]:
val rfPreds = rfModel.transform(test)

rfPreds: org.apache.spark.sql.DataFrame = [sessID: string, hour: int ... 26 more fields]


In [112]:
val evaluatorRf = new BinaryClassificationEvaluator()
                .setRawPredictionCol("rawPrediction")
                .setLabelCol("label")

evaluatorRf: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_edd1bf2d6f31


In [113]:
evaluatorRf.evaluate(rfPreds)

res43: Double = 0.5


In [158]:
val lrModel = lr.fit(training)

lrModel: org.apache.spark.ml.classification.LogisticRegressionModel = LogisticRegressionModel: uid = logreg_1f7d3bca7c3c, numClasses = 2, numFeatures = 17


In [159]:
val lrPreds = lrModel.transform(test)

lrPreds: org.apache.spark.sql.DataFrame = [sessID: string, weekend: int ... 22 more fields]


In [160]:
evaluatorlr.evaluate(lrPreds)

res73: Double = 0.5


## With an areaUnderROC of 0.5 for both models, this combination performs worse than the previous baseline (0.689), so we will not use it any further.

In [305]:
alphaTwoDF.columns

res188: Array[String] = Array(sessID, pInSess, dayPopularity, clicksInDay, clicksInAll, purchased, night, weekendlk, workOut, special, normal, brand, missing, afternoon, morning, latenight, evening, midnight)


## Looking at correlations between features can help us.

In [310]:
alphaTwoDF.stat.corr("clicksInDay","clicksInAll")

res193: Double = 0.6145526630737359


In [317]:
alphaTwoDF.stat.corr("night","latenight")

res200: Double = 0.8705543522756433
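
`stat.corr` computes the Pearson correlation coefficient. A minimal plain-Scala sketch of the same formula is given below to show why a derived flag such as `night` correlates strongly with one of its source columns; `Pearson` is a hypothetical helper and the six-row sample in the usage note is made up.

```scala
// Pearson correlation of two equally sized samples.
object Pearson {
  def corr(xs: Seq[Double], ys: Seq[Double]): Double = {
    require(xs.size == ys.size && xs.size > 1, "need two equally sized samples")
    val mx = xs.sum / xs.size
    val my = ys.sum / ys.size
    val cov = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
    val vx = xs.map(x => (x - mx) * (x - mx)).sum
    val vy = ys.map(y => (y - my) * (y - my)).sum
    cov / math.sqrt(vx * vy)
  }
}
```

With `latenight = Seq(1, 1, 0, 0, 0, 0)` and `night = Seq(1, 1, 1, 0, 0, 0)` (as doubles), where `night` is 1 whenever `latenight` is 1 plus one extra row, the coefficient is about 0.707. Highly correlated pairs like this carry overlapping information, which is the motivation for dropping one column of each pair.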


In [320]:
val alphaThreeDF = alphaTwoDF.drop("midnight","latenight","clicksInDay")

alphaThreeDF: org.apache.spark.sql.DataFrame = [sessID: string, pInSess: double ... 13 more fields]


In [19]:
messyPreDF.groupBy("day","hour").count().orderBy(desc("count")).show()

+---+----+------+
|day|hour| count|
+---+----+------+
|Sun|  18|154941|
|Sun|  19|150458|
|Sun|   9|145022|
|Sun|  17|139771|
|Sun|  10|132639|
|Wed|  19|131506|
|Mon|  18|128533|
|Sun|  16|125256|
|Mon|  19|125097|
|Sun|  11|122582|
|Sun|  15|122028|
|Sun|   8|120213|
|Sun|  20|119525|
|Mon|  17|119466|
|Mon|   9|118981|
|Mon|   8|118174|
|Mon|  10|116924|
|Mon|   7|114440|
|Wed|  18|114040|
|Sun|  14|113439|
+---+----+------+
only showing top 20 rows



In [67]:
// Takes a start and end hour, flags sessions with startPt < hour <= endPt, and counts the sessions
// whose flag agrees with the purchased label (both 1 or both 0).
def hourRangeABTester(startPt: Int, endPt:Int) = {
    messyPreDF.withColumn("importantHalf", when(col("hour")>startPt && col("hour")<=endPt,1).otherwise(0))
        .filter(col("purchased")===col("importantHalf")).count()
}

hourRangeABTester: (startPt: Int, endPt: Int)Long


In [68]:
// Compare candidate hour ranges by how often the range flag agrees with the purchase label in this dataset.
Array((10,20),(12,20),(13,20),(14,20),(12,17),(11,18)).map(x=>hourRangeABTester(x._1,x._2))

res19: Array[Long] = Array(3796731, 4738258, 5173985, 5623761, 6434927, 5396014)


## The range 12 < hour <= 17 gives the highest agreement count (6434927) among the tested ranges, so it looks like a good candidate predictor.
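
When reading agreement counts it helps to keep the class balance in mind. The sketch below uses only the session counts printed in the weekend tables earlier in this notebook and compares the best range's agreement with a rule that never predicts a purchase. `MajorityBaseline` is a hypothetical name, and the comparison assumes the session totals were the same in the run that produced the range counts.

```scala
// Class balance and a trivial reference point, computed from counts shown earlier.
object MajorityBaseline {
  // Sessions without and with a purchase (weekend = 1 plus weekend = 0).
  val noPurchase: Long = 2815085L + 5924948L
  val purchase: Long   = 194669L + 315027L
  val total: Long      = noPurchase + purchase

  // Agreement count of a flag that is always 0.
  val alwaysZeroAgreement: Long = noPurchase

  // Best agreement count among the tested hour ranges.
  val bestRangeAgreement: Long = 6434927L

  val purchaseRate: Double = purchase.toDouble / total
}
```

About 5.5% of the 9249729 sessions contain a purchase, so an always-zero flag agrees with the label on 8740033 sessions, more than any tested range. Since most sessions have no purchase, narrower ranges tend to score higher on this count simply because they flag fewer sessions; a rate-based comparison, such as the purchase rate inside versus outside the range, may be a more informative check.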

In [21]:
// Add the importantHalf flag (12 < hour <= 17), one-hot encode hStatus and specOffer, and rename columns for modelling.
val alphaFourDF= colIterator(messyPreDF.withColumn("importantHalf",
                                  when(col("hour")>12 && col("hour")<=17,1).otherwise(0))
            ,List("hStatus","specOffer"))
            .withColumnRenamed("passedInSess(minute)","pInSess")
            .withColumnRenamed("purchased","label")
            .drop("clicksInDay","type","day")

alphaFourDF: org.apache.spark.sql.DataFrame = [sessID: string, hour: int ... 15 more fields]


In [22]:
val updatedFeatureColFourAlpha = alphaFourDF.columns.filter(!Array("sessID","label").contains(_))

updatedFeatureColFourAlpha: Array[String] = Array(hour, weekend, pInSess, dayPopularity, clicksInAll, importantHalf, afternoon, morning, latenight, evening, midnight, special, normal, brand, missing)


In [23]:
alphaFourDF.printSchema()

root
 |-- sessID: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekend: integer (nullable = false)
 |-- pInSess: double (nullable = true)
 |-- dayPopularity: long (nullable = false)
 |-- clicksInAll: long (nullable = false)
 |-- label: integer (nullable = false)
 |-- importantHalf: integer (nullable = false)
 |-- afternoon: integer (nullable = false)
 |-- morning: integer (nullable = false)
 |-- latenight: integer (nullable = false)
 |-- evening: integer (nullable = false)
 |-- midnight: integer (nullable = false)
 |-- special: integer (nullable = false)
 |-- normal: integer (nullable = false)
 |-- brand: integer (nullable = false)
 |-- missing: integer (nullable = false)



In [24]:
val assembler = new VectorAssembler()
  .setInputCols(updatedFeatureColFourAlpha)
  .setOutputCol("features")
val assembledDFFour = assembler.transform(alphaFourDF)

assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_d01a57a96dbc
assembledDFFour: org.apache.spark.sql.DataFrame = [sessID: string, hour: int ... 16 more fields]


In [25]:
val Array(training,test) = assembledDFFour.na.fill(0.0).randomSplit(Array(0.8,0.2))

training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [sessID: string, hour: int ... 16 more fields]
test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [sessID: string, hour: int ... 16 more fields]


In [26]:
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")

rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_67562365115c


In [71]:
val rfModel = rf.fit(training)
val rfPreds = rfModel.transform(test)

rfModel: org.apache.spark.ml.classification.RandomForestClassificationModel = RandomForestClassificationModel (uid=rfc_57fe9d1bfe21) with 20 trees
rfPreds: org.apache.spark.sql.DataFrame = [sessID: string, hour: int ... 19 more fields]


In [74]:
val evaluatorrf = new BinaryClassificationEvaluator()
                .setRawPredictionCol("rawPrediction")
                .setLabelCol("label")

evaluatorrf: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_c84c98a138f2


In [75]:
evaluatorrf.evaluate(rfPreds)

res21: Double = 0.5


## Three models compared. Accuracy is around 0.95, but the AUC is terrible (0.5 here, no better than random ranking). AUC is the score we care about, so we will continue with the first model and tune it.
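
To see why high accuracy and an AUC of 0.5 can appear together, here is a minimal sketch on made-up, imbalanced toy labels. It assumes a degenerate scorer that gives every row the same score, which is one possible reading of an AUC of exactly 0.5, not a diagnosis of the model above. The `auc` helper is a hypothetical pairwise implementation written for this example, not Spark's evaluator.

```scala
// Toy data: 95 negatives and 5 positives, standing in for a heavily imbalanced label.
val labels: Vector[Int] = Vector.fill(95)(0) ++ Vector.fill(5)(1)

// A degenerate scorer that gives every row the same purchase score.
val scores: Vector[Double] = Vector.fill(labels.size)(0.1)

// Accuracy at a 0.5 threshold: every row is predicted as 0.
val predictions = scores.map(s => if (s >= 0.5) 1 else 0)
val accuracy = predictions.zip(labels).count { case (p, l) => p == l }.toDouble / labels.size

// AUC as the probability that a random positive outranks a random negative (ties count 0.5).
def auc(scores: Seq[Double], labels: Seq[Int]): Double = {
  val pos = scores.zip(labels).collect { case (s, 1) => s }
  val neg = scores.zip(labels).collect { case (s, 0) => s }
  val wins = for (p <- pos; n <- neg) yield {
    if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  }
  wins.sum / (pos.size.toLong * neg.size)
}

println(f"accuracy = $accuracy%.2f, AUC = ${auc(scores, labels)}%.2f")
```

Accuracy rewards predicting the majority class, while AUC only looks at how well purchases are ranked above non-purchases, which is why it is the more informative score here.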

In [22]:
val Array(trainingFinal, testFinal) = assembledDF.randomSplit(Array(0.8,0.2))

<console>: 44: error: not found: value assembledDF

In [126]:
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")

rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_4bb95aac4948


In [79]:
val rfModelFinal = rf.fit(trainingFinal)
val rfPredFinal = rfModelFinal.transform(testFinal)

rfModelFinal: org.apache.spark.ml.classification.RandomForestClassificationModel = RandomForestClassificationModel (uid=rfc_69a96f7ab686) with 20 trees
rfPredFinal: org.apache.spark.sql.DataFrame = [sessID: string, weekend: int ... 18 more fields]


In [50]:
val evaluatorrf = new BinaryClassificationEvaluator()
                .setRawPredictionCol("rawPrediction")
                .setLabelCol("label")

evaluatorrf: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_dec0f8244e1a


In [81]:
evaluatorrf.evaluate(rfPredFinal)

res22: Double = 0.6809865725642923


In [40]:
val stdtScalerFinal = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(true)

stdtScalerFinal: org.apache.spark.ml.feature.StandardScaler = stdScal_42f5494f4046


In [44]:
val rfFinalScaled = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("scaledFeatures")

rfFinalScaled: org.apache.spark.ml.classification.RandomForestClassifier = rfc_dc38a9aaf699


In [127]:
val scaledFinalDF = stdtScalerFinal.fit(assembledDF).transform(assembledDF)

scaledFinalDF: org.apache.spark.sql.DataFrame = [sessID: string, weekend: int ... 16 more fields]


In [128]:
scaledFinalDF.columns

res21: Array[String] = Array(sessID, weekend, passedInSess(minute), dayPopularity, clicksInDay, clicksInAll, label, afternoon, morning, latenight, evening, midnight, special, normal, brand, missing, features, scaledFeatures)


In [129]:
val Array(traningFinalScaled, testFinalScaled) = scaledFinalDF.randomSplit(Array(0.8,0.2))

traningFinalScaled: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [sessID: string, weekend: int ... 16 more fields]
testFinalScaled: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [sessID: string, weekend: int ... 16 more fields]


In [130]:
val rfModelScaledFinal = rfFinalScaled.fit(traningFinalScaled)

rfModelScaledFinal: org.apache.spark.ml.classification.RandomForestClassificationModel = RandomForestClassificationModel (uid=rfc_dc38a9aaf699) with 20 trees


In [131]:
val rfPredScaledFinal = rfModelScaledFinal.transform(testFinalScaled)

rfPredScaledFinal: org.apache.spark.sql.DataFrame = [sessID: string, weekend: int ... 19 more fields]


In [132]:
evaluatorrf.evaluate(rfPredScaledFinal)

res22: Double = 0.6823373771250568


## We cannot yet be sure whether scaling helps: the AUC only moved from about 0.681 to 0.682, and a different part of the data may respond differently to scaling. We will use 5-fold cross-validation to look into this more closely. For now, the scaled run scored slightly higher.
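
One reason to be cautious about the small gain: tree models split on thresholds, and standardizing a feature keeps the order of its values. The sketch below uses made-up toy values and a hypothetical exact split search (`gini` and `bestLeftSide` are illustration-only helpers) to show that the best single split sends the same rows left before and after scaling. Spark's Random Forest searches splits over binned values, so treat this as an intuition rather than a statement about Spark's internals.

```scala
// Toy feature values and labels, made up for illustration.
val feature: Vector[Double] = Vector(1.0, 3.0, 4.0, 8.0, 9.0, 12.0)
val label: Vector[Int] = Vector(0, 0, 0, 1, 0, 1)

// Standardize the feature (zero mean, unit sample standard deviation).
val mean = feature.sum / feature.size
val std = math.sqrt(feature.map(x => math.pow(x - mean, 2)).sum / (feature.size - 1))
val scaled = feature.map(x => (x - mean) / std)

// Gini impurity of a set of binary labels.
def gini(ys: Seq[Int]): Double =
  if (ys.isEmpty) 0.0 else { val p = ys.sum.toDouble / ys.size; 2 * p * (1 - p) }

// Row indices sent to the left child by the best single-threshold split.
def bestLeftSide(xs: Vector[Double], ys: Vector[Int]): Set[Int] = {
  val sorted = xs.distinct.sorted
  // Candidate thresholds are the midpoints between consecutive distinct values.
  val thresholds = sorted.zip(sorted.tail).map { case (a, b) => (a + b) / 2 }
  val best = thresholds.minBy { t =>
    val (left, right) = xs.indices.partition(i => xs(i) <= t)
    (left.size * gini(left.map(i => ys(i))) + right.size * gini(right.map(i => ys(i)))) / xs.size
  }
  xs.indices.filter(i => xs(i) <= best).toSet
}

println(s"raw:    ${bestLeftSide(feature, label)}")
println(s"scaled: ${bestLeftSide(scaled, label)}")
```

Since the two runs above also used separate unseeded random splits, part of the difference between them may simply come from the split itself.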

In [20]:
val updatedCombiner = dateInfExtractor _ andThen 
                          weekTypeExtractor _ andThen 
                          specOfferExtractor _ andThen 
                          sessAverageItemPriceViewed _ andThen
                          timeSpentEachSession _ andThen
                          dayPopularityExtractor _ andThen
                          itemDayPopularityExtractor _ andThen
                          itemGeneralPopularityExtractor _ 

updatedCombiner: org.apache.spark.sql.DataFrame => org.apache.spark.sql.DataFrame = <function1>


In [21]:
val updatedCombinerDF = updatedCombiner(cStreamDF)

updatedCombinerDF: org.apache.spark.sql.DataFrame = [sessID: string, date: string ... 16 more fields]


In [22]:
updatedCombinerDF.cache()

res0: updatedCombinerDF.type = [sessID: string, date: string ... 16 more fields]


In [23]:
val messyBetaDF = updatedCombinerDF
    .join(cStreamBasketDF.groupBy("sessID").count(),Seq("sessID"),"left")
    .withColumnRenamed("count","purchased")
    .withColumn("purchased"
    ,when(col("purchased")>0,1)
    .otherwise(0))
    .dropDuplicates("sessID")
    .drop("itemID")
    .drop("date")
    .drop("sDate")

messyBetaDF: org.apache.spark.sql.DataFrame = [sessID: string, type: string ... 14 more fields]


In [24]:
val finalUpdatedAlphaDF = messyBetaDF
    .withColumn("clicksCombination",col("clicksInDay")/col("clicksInAll"))
    .withColumn("importantHalf",
               when(col("hour")>12&& col("hour")<=17,1).otherwise(0))


finalUpdatedAlphaDF: org.apache.spark.sql.DataFrame = [sessID: string, type: string ... 16 more fields]


In [25]:
finalUpdatedAlphaDF.cache()

res1: finalUpdatedAlphaDF.type = [sessID: string, type: string ... 16 more fields]


In [26]:
finalUpdatedAlphaDF.printSchema()

root
 |-- sessID: string (nullable = true)
 |-- type: string (nullable = true)
 |-- day: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- hStatus: string (nullable = false)
 |-- weekend: integer (nullable = false)
 |-- specOffer: string (nullable = false)
 |-- sum(price): double (nullable = true)
 |-- nOfPVis: long (nullable = false)
 |-- avg(price): double (nullable = true)
 |-- stddev_samp(price): double (nullable = true)
 |-- passedInSess(minute): double (nullable = true)
 |-- dayPopularity: long (nullable = false)
 |-- clicksInDay: long (nullable = false)
 |-- clicksInAll: long (nullable = false)
 |-- purchased: integer (nullable = false)
 |-- clicksCombination: double (nullable = true)
 |-- importantHalf: integer (nullable = false)



In [None]:
// The first three page types may carry predictive power, but they may also just be main pages that everyone sees.
finalUpdatedAlphaDF.filter(col("purchased")===1)
    .groupBy("type")
    .count()
    .orderBy(desc("count")).show()

In [27]:
val finalFeatAddedDF = finalUpdatedAlphaDF.withColumn("mostlyVisitedPage"
                               ,when(col("type")==="1"||col("type")==="2"|| col("type")==="3",1)
                                .otherwise(0))
                        .withColumnRenamed("purchased","label")
                        .withColumn("label",col("label").cast(DoubleType))

finalFeatAddedDF: org.apache.spark.sql.DataFrame = [sessID: string, type: string ... 17 more fields]


In [28]:
val finalAltDF = messyBetaDF
.drop("hour","type","day")
.withColumnRenamed("purchased","label")
.withColumn("label",col("label").cast(DoubleType))

finalAltDF: org.apache.spark.sql.DataFrame = [sessID: string, hStatus: string ... 11 more fields]


In [29]:
val finalAlphaDF = colIterator(finalAltDF,List("specOffer","hStatus")).na.fill(0)

finalAlphaDF: org.apache.spark.sql.DataFrame = [sessID: string, weekend: int ... 18 more fields]


In [30]:
finalAlphaDF.cache()

res3: finalAlphaDF.type = [sessID: string, weekend: int ... 18 more fields]


In [31]:
finalAlphaDF.printSchema()

root
 |-- sessID: string (nullable = true)
 |-- weekend: integer (nullable = false)
 |-- sum(price): double (nullable = false)
 |-- nOfPVis: long (nullable = false)
 |-- avg(price): double (nullable = false)
 |-- stddev_samp(price): double (nullable = false)
 |-- passedInSess(minute): double (nullable = false)
 |-- dayPopularity: long (nullable = false)
 |-- clicksInDay: long (nullable = false)
 |-- clicksInAll: long (nullable = false)
 |-- label: double (nullable = false)
 |-- special: integer (nullable = false)
 |-- normal: integer (nullable = false)
 |-- brand: integer (nullable = false)
 |-- missing: integer (nullable = false)
 |-- afternoon: integer (nullable = false)
 |-- morning: integer (nullable = false)
 |-- latenight: integer (nullable = false)
 |-- evening: integer (nullable = false)
 |-- midnight: integer (nullable = false)



In [32]:
// Drop the ID and label columns; the remaining columns are the features.
val featuresColAlpha  = finalAlphaDF.columns.filter(!Array("sessID","label").contains(_))

featuresColAlpha: Array[String] = Array(weekend, sum(price), nOfPVis, avg(price), stddev_samp(price), passedInSess(minute), dayPopularity, clicksInDay, clicksInAll, special, normal, brand, missing, afternoon, morning, latenight, evening, midnight)


In [33]:
featuresColAlpha

res5: Array[String] = Array(weekend, sum(price), nOfPVis, avg(price), stddev_samp(price), passedInSess(minute), dayPopularity, clicksInDay, clicksInAll, special, normal, brand, missing, afternoon, morning, latenight, evening, midnight)


In [34]:
// Assemble the feature columns into a single feature vector.
val assemblerAlpha = new VectorAssembler()
  .setInputCols(featuresColAlpha)
  .setOutputCol("features")

assemblerAlpha: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_959b1e9884eb


In [35]:
val assembledAlphaDF = assemblerAlpha.transform(finalAlphaDF)

assembledAlphaDF: org.apache.spark.sql.DataFrame = [sessID: string, weekend: int ... 19 more fields]


In [36]:
val scalerFinalMaxAbsAlpha = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("maxAbsScaledFeatures")

scalerFinalMaxAbsAlpha: org.apache.spark.ml.feature.MaxAbsScaler = maxAbsScal_7e10cdae4725


In [37]:
val scalerFinalMinAlpha = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("minMaxScaledFeatures")

scalerFinalMinAlpha: org.apache.spark.ml.feature.MinMaxScaler = minMaxScal_a3ed58b42f22


In [38]:
val scalerFinalStandartAlpha = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("standardScaledFeatures")

scalerFinalStandartAlpha: org.apache.spark.ml.feature.StandardScaler = stdScal_34a2fd920ae4


In [39]:
val scaledFinalAlphaDF = scalerFinalMaxAbsAlpha.fit(assembledAlphaDF)
                    .transform(assembledAlphaDF)

scaledFinalAlphaDF: org.apache.spark.sql.DataFrame = [sessID: string, weekend: int ... 20 more fields]


In [40]:
val Array(trainingFinalAlpha,testFinalAlpha) = scaledFinalAlphaDF.randomSplit(Array(0.8,0.2))
trainingFinalAlpha.cache()
testFinalAlpha.cache()

trainingFinalAlpha: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [sessID: string, weekend: int ... 20 more fields]
testFinalAlpha: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [sessID: string, weekend: int ... 20 more fields]
res6: testFinalAlpha.type = [sessID: string, weekend: int ... 20 more fields]


In [41]:
val rfFinalAlphaScaled = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("maxAbsScaledFeatures")

rfFinalAlphaScaled: org.apache.spark.ml.classification.RandomForestClassifier = rfc_d1a7b1a84798


In [42]:
val rfAlphaFinalModel = rfFinalAlphaScaled.fit(trainingFinalAlpha)

rfAlphaFinalModel: org.apache.spark.ml.classification.RandomForestClassificationModel = RandomForestClassificationModel (uid=rfc_d1a7b1a84798) with 20 trees


In [43]:
val rfPredAlphaFinal = rfAlphaFinalModel.transform(testFinalAlpha)

rfPredAlphaFinal: org.apache.spark.sql.DataFrame = [sessID: string, weekend: int ... 23 more fields]


In [44]:
val evaluatorAlphaFinal = new BinaryClassificationEvaluator()
                .setRawPredictionCol("rawPrediction")
                .setLabelCol("label")

evaluatorAlphaFinal: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_b0e27bb71aa7


In [41]:
// AUC SCORE
evaluatorAlphaFinal.evaluate(rfPredAlphaFinal)

res5: Double = 0.7133986723907214


In [67]:
val truePreds = rfPredAlphaFinal
    .withColumn("acc", when(col("prediction") - col("label") === 0, 1).otherwise(0))
    .agg(sum("acc"))
    .first()
    .getLong(0)

truePreds: Long = 1746874


In [69]:
val totalCount = rfPredAlphaFinal.count()

totalCount: Long = 1848905


In [74]:
// Long division truncates, so the score is reported as a whole-number percentage.
val accScore = truePreds*100/totalCount
println(s"Labels are %$accScore correctly predicted!")

Labels are %94 correctly predicted!


accScore: Long = 94
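
Because both counts are `Long`, the division above drops the fractional part. A minimal sketch of the difference, reusing the two counts reported above (the names `correct`, `total`, `accTruncated` and `accExact` are hypothetical, chosen so they do not clash with the notebook's values):

```scala
// Counts reported by the cells above.
val correct: Long = 1746874L
val total: Long = 1848905L

// Long arithmetic truncates: this reproduces the whole-number score printed above.
val accTruncated: Long = correct * 100 / total

// Converting to Double first keeps the fractional part.
val accExact: Double = correct.toDouble * 100 / total

println(f"Labels are $accExact%.2f%% correctly predicted (truncated: $accTruncated%d%%)")
```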


## Accuracy = 94%, AUC = 0.7134
### The data is large and the best scores came from Random Forest. Cross-validation could not be used because it fails with a Java heap error in both the IDE and the notebook.

## I hope it helps!

In [75]:
// Adding the average time between sessions does not help our model in this case; it only increases non-linearity.
val updatedCombinerWindowed = 
                          dateInfExtractor _ andThen 
                          weekTypeExtractor _ andThen 
                          specOfferExtractor _ andThen 
                          sessAverageItemPriceViewed _ andThen
                          sessAverageTimeBSessions _ andThen
                          timeSpentEachSession _ andThen
                          dayPopularityExtractor _ andThen
                          itemDayPopularityExtractor _ andThen
                          itemGeneralPopularityExtractor _ 

updatedCombinerWindowed: org.apache.spark.sql.DataFrame => org.apache.spark.sql.DataFrame = <function1>
