In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as f

In [2]:
spark = SparkSession.builder.master("local[2]").getOrCreate()
# spark.stop()

# Load data 

In [3]:
data_path = "Example_data_for_Apple_BigData_Tasks.csv"
df = spark.read.option("header", "true").csv(data_path)

# Preprocessing 

In [4]:
# Convert date columns
df = df.withColumn("eventTime" ,f.to_timestamp("eventTime", "MM/dd/yy HH:mm"))
df.createOrReplaceTempView("visits")
df.show()

+-------------+--------------------+--------+-------------------+-----------------+
|     category|             product|  userId|          eventTime|        eventType|
+-------------+--------------------+--------+-------------------+-----------------+
|        books|   Scala for Dummies|user 100|2018-03-01 12:00:00| view description|
|        books|   Scala for Dummies|user 100|2018-03-01 12:01:00|             like|
|        books|   Scala for Dummies|user 100|2018-03-01 12:01:00|     check status|
|        books|    Java for Dummies|user 100|2018-03-01 12:02:00| view description|
|        books|    Java for Dummies|user 100|2018-03-01 12:02:00|          dislike|
|        books|    Java for Dummies|user 100|2018-03-01 12:02:00|close description|
|        books|   Scala for Dummies|user 100|2018-03-01 12:04:00| view description|
|        books|   Scala for Dummies|user 100|2018-03-01 12:06:00|    add to bucket|
|        books|Sherlock Holmes, ...|user 200|2018-03-01 12:11:00| view descr

# Task 1: Enrich with session 


Task #1: 
Enrich incoming data with user sessions. Definition of a session: for each user, it contains consecutive events that belong to a single category  and are not more than 5 minutes away from each other. Output should look like this (session columns are in bold):
eventTime, eventType, category, userId, …, sessionId, sessionStartTime, sessionEndTime  
Implement it using 1) sql window functions and 2) Spark aggregator.

In [5]:
#SPARK SQL QUERY
# Take LAG() to compare previous eventTime with current . If Diff > 5 new session kicked off
# Set -1 to events having no previous event --> Again means begening of new session 

spark.sql(
"""SELECT *,
           MIN(eventTime) OVER (PARTITION BY sessId) as sessStartDate,
           MAX(eventTime) OVER (PARTITION BY sessId) as sessEndDate
   FROM (
       SELECT userId,category, eventType ,eventTime, 
          IF(
              IFNULL(
                      (
                          CAST(eventTime AS LONG) - 
                          CAST(LAG(eventTime) OVER (PARTITION BY userId, category ORDER BY eventTime) AS LONG)
                      )/60,
                      -1
                  ) BETWEEN 0 AND 5 ,
              FIRST(HASH(CONCAT(userId,category,eventType,eventTime))) OVER (PARTITION BY userId, category ORDER BY eventTime),
              HASH(CONCAT(userId,category,eventType,eventTime))

          ) as sessId
       FROM visits)
""").createOrReplaceTempView("visits_with_sess")

spark.sql("SELECT * FROM visits_with_sess").show(50)

+--------+-------------+-----------------+-------------------+-----------+-------------------+-------------------+
|  userId|     category|        eventType|          eventTime|     sessId|      sessStartDate|        sessEndDate|
+--------+-------------+-----------------+-------------------+-----------+-------------------+-------------------+
|user 100|        books| view description|2018-03-01 12:00:00| -575256446|2018-03-01 12:00:00|2018-03-01 12:06:00|
|user 100|        books|             like|2018-03-01 12:01:00| -575256446|2018-03-01 12:00:00|2018-03-01 12:06:00|
|user 100|        books|     check status|2018-03-01 12:01:00| -575256446|2018-03-01 12:00:00|2018-03-01 12:06:00|
|user 100|        books| view description|2018-03-01 12:02:00| -575256446|2018-03-01 12:00:00|2018-03-01 12:06:00|
|user 100|        books|          dislike|2018-03-01 12:02:00| -575256446|2018-03-01 12:00:00|2018-03-01 12:06:00|
|user 100|        books|close description|2018-03-01 12:02:00| -575256446|2018-0

In [6]:
#Spark aggregation
evnt_lag_mins = 5
sel_cols = ['userId','category', 'eventType',  'eventTime','sessId','sessStartDate','sessEndDate']
w = Window.partitionBy(f.col('userId'),f.col('category')).orderBy(f.col("eventTime"))
w_sess = Window.partitionBy(f.col('sessId'))

df_w = df.withColumn('prevEvtTS', f.lag('eventTime').over(w))\
         .withColumn("lagPrevEvtMin",(f.col('eventTime').cast("long")-f.col('prevEvtTS').cast("long"))/60).na.fill(-1)\
         .withColumn('hash',f.hash(f.concat(f.col('userId'),f.col('category'),f.col('eventType'),f.col('eventTime'))))\
         .withColumn('sessId',f.when(
                                    f.col('lagPrevEvtMin').between(0,evnt_lag_mins),
                                    f.first('hash').over(w))
                                .otherwise(f.col('hash')))\
         .withColumn("sessStartDate",f.min('eventTime').over(w_sess))\
         .withColumn("sessEndDate",f.max('eventTime').over(w_sess))

df_w.select(sel_cols).show(50)

+--------+-------------+-----------------+-------------------+-----------+-------------------+-------------------+
|  userId|     category|        eventType|          eventTime|     sessId|      sessStartDate|        sessEndDate|
+--------+-------------+-----------------+-------------------+-----------+-------------------+-------------------+
|user 100|        books| view description|2018-03-01 12:00:00| -575256446|2018-03-01 12:00:00|2018-03-01 12:06:00|
|user 100|        books|             like|2018-03-01 12:01:00| -575256446|2018-03-01 12:00:00|2018-03-01 12:06:00|
|user 100|        books|     check status|2018-03-01 12:01:00| -575256446|2018-03-01 12:00:00|2018-03-01 12:06:00|
|user 100|        books| view description|2018-03-01 12:02:00| -575256446|2018-03-01 12:00:00|2018-03-01 12:06:00|
|user 100|        books|          dislike|2018-03-01 12:02:00| -575256446|2018-03-01 12:00:00|2018-03-01 12:06:00|
|user 100|        books|close description|2018-03-01 12:02:00| -575256446|2018-0

# Task 2: Compute statistics

Task #2:
Compute the following statistics:
-	For each category find median session duration
-	For each category find # of unique users spending less than 1 min, 1 to 5 mins and more than 5 mins
-	For each category find top 10 products ranked by time spent by users on product pages - this may require different type of sessions. For this particular task, session lasts until the user is looking at particular product. When particular user switches to another product the new session starts.

In [7]:
# 2.1 For each category find median session duration

# due to requirment session lasts per category hovever in majority of cases real world sessions last for
# entire user web page visit not category page only.So sessionId grouping by category as well just to 
# avoid problems with real sessions

print("#################### Median session duration for each category ####################")
spark.sql(
    """ SELECT  category,
                PERCENTILE(sessDurat,0.5) as sessDurMedianMins,
                PERCENTILE_APPROX(sessDurat,0.5) as sessDurAproxMedianMins,
                MEAN(sessDurat) as sessDurMeanMins
        FROM (
             SELECT sessId, category,
                    FIRST(sessStartDate) as sessStartDate,
                    FIRST(sessEndDate) as sessEndDate,
                    (CAST(FIRST(sessEndDate) AS LONG)-CAST(FIRST(sessStartDate) AS LONG))/60 as sessDurat
             FROM visits_with_sess
             GROUP BY sessId,category
        )
        GROUP BY category
    """).show(50)

#################### Median session duration for each category ####################
+-------------+-----------------+----------------------+---------------+
|     category|sessDurMedianMins|sessDurAproxMedianMins|sessDurMeanMins|
+-------------+-----------------+----------------------+---------------+
|        books|              5.0|                   4.0|            5.0|
|    notebooks|              0.5|                   0.0|           0.75|
|mobile phones|              3.5|                   0.0|            3.5|
+-------------+-----------------+----------------------+---------------+



In [8]:

# 2.2 For each category find # of unique users spending less than 1 min, 1 to 5 mins and more than 5 mins
print("######################################################################################")
print("# of unique users spending less than 1 min, 1 to 5 mins and more than 5 mins For each category")
print("######################################################################################")

spark.sql("""
            SELECT  sessId, category, userId,
                    FIRST(sessStartDate) as sessStartDate,
                    FIRST(sessEndDate) as sessEndDate,
                    (CAST(FIRST(sessEndDate) AS LONG)-CAST(FIRST(sessStartDate) AS LONG))/60 as sessDurat
            FROM visits_with_sess
            GROUP BY sessId ,category ,userId
""").createOrReplaceTempView("category_per_session")

print("# of unique users spending less than 1 min")
spark.sql("""SELECT  category,
                     COUNT(DISTINCT userId) as NofUsrLess1M
            FROM category_per_session
            WHERE sessDurat<1
            GROUP BY category
""").show()

print("# of unique users spending more than 5 mins")
spark.sql("""SELECT  category,
                     COUNT(DISTINCT userId) as NofUsrMore5M
            FROM category_per_session
            WHERE sessDurat > 5
            GROUP BY category
""").show()

print("# of unique users spending 1 to 5 mins")
spark.sql("""SELECT  category,
                    COUNT(DISTINCT userId) as NofUsr1to5M
             FROM category_per_session
             WHERE sessDurat BETWEEN 1 AND 5
             GROUP BY category
""").show()

######################################################################################
# of unique users spending less than 1 min, 1 to 5 mins and more than 5 mins For each category
######################################################################################
# of unique users spending less than 1 min
+-------------+------------+
|     category|NofUsrLess1M|
+-------------+------------+
|    notebooks|           1|
|mobile phones|           1|
+-------------+------------+

# of unique users spending more than 5 mins
+-------------+------------+
|     category|NofUsrMore5M|
+-------------+------------+
|        books|           1|
|mobile phones|           1|
+-------------+------------+

# of unique users spending 1 to 5 mins
+---------+-----------+
| category|NofUsr1to5M|
+---------+-----------+
|    books|          1|
|notebooks|          2|
+---------+-----------+



In [9]:
# 2.3 For each category find top 10 products ranked by time spent by users on product pages - this may 
#     require different type of sessions. For this particular task, session lasts until the user is looking
#     at particular product. When particular user switches to another product the new session starts.

spark.sql(
"""SELECT *,
           MIN(eventTime) OVER (PARTITION BY prodSessId) as prodSessStartDate,
           MAX(eventTime) OVER (PARTITION BY prodSessId) as prodSessEndDate
   FROM (
       SELECT userId, category ,product ,eventTime, 
              IF(
                  IFNULL(LAG(product) OVER (PARTITION BY userId, product ORDER BY eventTime),product) = product ,
                  FIRST(HASH(CONCAT(userId,category,eventType,eventTime))) OVER (PARTITION BY userId, product ORDER BY eventTime),
                  HASH(CONCAT(userId,category,eventType,eventTime))
                ) as prodSessId
       FROM visits
    )
""").createOrReplaceTempView("visits_with_sess_per_prod")

# row_N = 1 for removing duplicates . Used instead of group by and manualy selecting all FIRST values 
spark.sql("""SELECT userId ,category ,product ,prodSessId ,prodSessStartDate ,prodSessEndDate , prodSessDurat
             FROM (
                 SELECT *,
                         ROW_NUMBER() OVER (PARTITION BY prodSessId ORDER BY product) as row_N  ,
                         (CAST(prodSessEndDate AS LONG)-CAST(prodSessStartDate AS LONG))/60 as prodSessDurat
                 FROM visits_with_sess_per_prod
             )
             where row_N = 1
""").createOrReplaceTempView("visits_with_sess_per_prod")
# spark.sql("SELECT * FROM visits_with_sess_per_prod").show(30)

spark.sql(""" SELECT *
            FROM(
                SELECT *,
                        ROW_NUMBER() OVER (PARTITION BY category, product ORDER BY prodSessDurat DESC) as topN
                 FROM visits_with_sess_per_prod
            )
            WHERE topN < 10
""").show()


+--------+-------------+--------------------+-----------+-------------------+-------------------+-------------+----+
|  userId|     category|             product| prodSessId|  prodSessStartDate|    prodSessEndDate|prodSessDurat|topN|
+--------+-------------+--------------------+-----------+-------------------+-------------------+-------------+----+
|user 100|        books|    Java for Dummies|-1158420428|2018-03-01 12:02:00|2018-03-01 12:02:00|          0.0|   1|
|user 200|        books|    Romeo and Juliet| 1279730207|2018-03-01 12:15:00|2018-03-01 12:15:00|          0.0|   1|
|user 100|    notebooks|      MacBook Pro 15|  653119526|2018-03-01 12:15:00|2018-03-01 12:16:00|          1.0|   1|
|user 100|        books|   Scala for Dummies| -575256446|2018-03-01 12:00:00|2018-03-01 12:06:00|          6.0|   1|
|user 200|        books|Sherlock Holmes, ...| 1026034203|2018-03-01 12:11:00|2018-03-01 12:13:00|          2.0|   1|
|user 100|mobile phones|       iPhone 8 Plus|  124119555|2018-03

In [10]:
#Spark Aggregations
w_sess = Window.partitionBy(f.col('sessId')).orderBy(f.col("sessId"))
         
df_w = df_w.withColumn("sessDurat",(f.col("sessEndDate").cast("long") - f.col("sessStartDate").cast("long"))/60)

# df_w.select(['userId','category',  'eventTime','sessId','sessStartDate','sessEndDate','sessDurat']).show(10)
df_unic = df_w.select(['userId','category','sessId','sessStartDate','sessEndDate','sessDurat']).dropDuplicates()
df_unic.show()


+--------+-------------+-----------+-------------------+-------------------+---------+
|  userId|     category|     sessId|      sessStartDate|        sessEndDate|sessDurat|
+--------+-------------+-----------+-------------------+-------------------+---------+
|user 100|        books| -575256446|2018-03-01 12:00:00|2018-03-01 12:06:00|      6.0|
|user 200|    notebooks| -215346863|2018-03-01 12:15:00|2018-03-01 12:17:00|      2.0|
|user 200|        books| 1026034203|2018-03-01 12:11:00|2018-03-01 12:15:00|      4.0|
|user 300|    notebooks|-2038065633|2018-03-01 12:14:00|2018-03-01 12:14:00|      0.0|
|user 100|mobile phones|  124119555|2018-03-01 12:06:00|2018-03-01 12:13:00|      7.0|
|user 100|    notebooks|  653119526|2018-03-01 12:15:00|2018-03-01 12:16:00|      1.0|
|user 300|mobile phones|  197472950|2018-03-01 12:05:00|2018-03-01 12:05:00|      0.0|
|user 300|    notebooks| 1939065348|2018-03-01 12:20:00|2018-03-01 12:20:00|      0.0|
+--------+-------------+-----------+-------

In [11]:
# 2.1 For each category find median session duration
df_unic.groupBy('category').agg(f.expr('percentile_approx(sessDurat, 0.5)').alias('sessDurMedian')).show()

+-------------+-------------+
|     category|sessDurMedian|
+-------------+-------------+
|        books|          4.0|
|    notebooks|          0.0|
|mobile phones|          0.0|
+-------------+-------------+



In [12]:
# 2.2 For each category find # of unique users spending less than 1 min, 1 to 5 mins and more than 5 mins

df_unic.where(f.col('sessDurat') < 1)\
        .groupBy('category')\
        .agg(f.countDistinct('userId').alias('NofUniqUserLess1M'))\
        .show()

df_unic.where(f.col('sessDurat') > 5)\
        .groupBy('category')\
        .agg(f.countDistinct('userId').alias('NofUniqUserMore5M'))\
        .show()

df_unic.where(f.col('sessDurat').between(1,5))\
        .groupBy('category')\
        .agg(f.countDistinct('userId').alias('NofUniqUser1to5M'))\
        .show()

+-------------+-----------------+
|     category|NofUniqUserLess1M|
+-------------+-----------------+
|    notebooks|                1|
|mobile phones|                1|
+-------------+-----------------+

+-------------+-----------------+
|     category|NofUniqUserMore5M|
+-------------+-----------------+
|        books|                1|
|mobile phones|                1|
+-------------+-----------------+

+---------+----------------+
| category|NofUniqUser1to5M|
+---------+----------------+
|    books|               1|
|notebooks|               2|
+---------+----------------+



In [13]:
# 2.3 For each category find top 10 products ranked by time spent by users on product pages -
#     this may require different type of sessions. For this particular task, session lasts until 
#     the user is looking at particular product. When particular user switches to another product the new session starts.

sel_cols = ['userId', 'category', 'product','sessId','sessProdStartDate','sessProdEndDate','sessProdDurat']
w_prod = Window.partitionBy(f.col('userId'),f.col('product')).orderBy(f.col("eventTime"))
w_prod_sess = Window.partitionBy(f.col('sessId'))

df_prod = df.withColumn('prevProd', f.lag('product').over(w_prod))\
         .withColumn('hash',f.hash(f.concat(f.col('userId'),f.col('category'),f.col('eventType'),f.col('eventTime'))))\
         .withColumn('sessId',f.when(
                                    f.col('prevProd')==f.col('product'),
                                    f.first('hash').over(w_prod))
                                .otherwise(f.col('hash')))\
         .withColumn("sessProdStartDate",f.min('eventTime').over(w_prod_sess))\
         .withColumn("sessProdEndDate",f.max('eventTime').over(w_prod_sess))\
         .withColumn("sessProdDurat",\
                         (f.col('sessProdEndDate').cast('long')-f.col('sessProdStartDate').cast('long'))/60)\

#leave only uniq sessions not each event 
df_prod_uniq = df_prod.select(sel_cols).dropDuplicates()

#select top_N products in category by session time
top_N = 10
w_prod_top = Window.partitionBy('category','product').orderBy(f.desc('sessProdDurat'))
df_prod_uniq.withColumn('rank',f.row_number().over(w_prod_top)).where(f.col('rank')<top_N).show()


+--------+-------------+--------------------+-----------+-------------------+-------------------+-------------+----+
|  userId|     category|             product|     sessId|  sessProdStartDate|    sessProdEndDate|sessProdDurat|rank|
+--------+-------------+--------------------+-----------+-------------------+-------------------+-------------+----+
|user 100|        books|    Java for Dummies|-1158420428|2018-03-01 12:02:00|2018-03-01 12:02:00|          0.0|   1|
|user 200|        books|    Romeo and Juliet| 1279730207|2018-03-01 12:15:00|2018-03-01 12:15:00|          0.0|   1|
|user 100|    notebooks|      MacBook Pro 15|  653119526|2018-03-01 12:15:00|2018-03-01 12:16:00|          1.0|   1|
|user 100|        books|   Scala for Dummies| -575256446|2018-03-01 12:00:00|2018-03-01 12:06:00|          6.0|   1|
|user 200|        books|Sherlock Holmes, ...| 1026034203|2018-03-01 12:11:00|2018-03-01 12:13:00|          2.0|   1|
|user 100|mobile phones|       iPhone 8 Plus|  124119555|2018-03