In [0]:
## Marketplace Feature Table -- Summary
# Tools Used - Pyspark

#step-1) Load the user data and convert signup Date into Date format "yyyy-mm-dd"
#step-2) Load the visitor log data and preprocess to convert the dat into date format and convert all strings to lower case.
# step-3) Dropping columns
#  a) dropped webclient ID, city and country as they add no useful information.
#  b) Dropped all rows where there is no UserID. Since this information is may be of just visitors and we dont need this data.
# step-4) Imputing null values of visitor log data.
#  a) First fill the visit date column with most visited date for each user. The logic behind filling the null values of this column is that         the null values may belong to date that the user was most active on.
#   b) Fill the product ID column based on UserID and visit date. We find the most viewed product for each visit date and for each user and we       fill the null values with that product. The logic is for that visit date the user might be viewing the same product which is viewed the        most on that day. If there are still null values left then we fill the null valueswith top viewed product for each user.
#   c) Fill the activity column based on userID and product ID. We find the top most activity for each product id for each user. So we fill         the null values with that activity if there is product ID already present for that row for each user. If there are still null values           where there is no product ID present for that row that we fill those cells with top most activity for that user.
#step-5) There will be still some null values left because there may be Users who have no Visit date , product ID and activity present in the     cells. So these users might have not visited the site or there is no sufficient information for these users to fill the null values.
#   a) We fill the null values of the visit date column with top visited date for the entire table.
#   b) we fill the null values of productID column with top viewed product for the entire table.
#   c) we fill the null values of activity column with top most activity for entire table.

#Step-6) save the table for querying.

# step-7) Create input feature table .


In [0]:
from pyspark.sql.types import *

from pyspark.sql import functions as func
import re





df_user_data=spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferschema", "true").option("delimiter",",").load('/FileStore/tables/userTable.csv')



In [0]:
df_user_data.show(truncate = False)

In [0]:
df_user_data.printSchema()

In [0]:
# convert Date to Date type. 

In [0]:
df_user_data = df_user_data.withColumn("Signup_Date",df_user_data['Signup Date'].cast(DateType()))

In [0]:
df_user_data=df_user_data.drop("Signup Date")

In [0]:
df_user_data.show(2)

In [0]:
df_user_data.printSchema()

In [0]:
# Load the visitor log data

In [0]:
df_visitor_log=spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferschema", "true").option("delimiter",",").load('/FileStore/tables/VisitorLogsData.csv')

In [0]:
df_visitor_log.show(15,truncate = False)

In [0]:
df_visitor_log.printSchema()

In [0]:
# we drop all rows where there is no user ID in visitor log data

In [0]:
df_visitor_log=df_visitor_log.where(df_visitor_log["UserID"].isNotNull())

In [0]:
df_visitor_log.show(5)

In [0]:
#checking null values in visitor log table

In [0]:
df_visitor_log.select([func.count(func.when(func.col(c).isNull(), c)).alias(c) for c in df_visitor_log.columns]).show()

In [0]:
# drop webclientID, city and country as I feel there is not much information that is useful from these columns

In [0]:
df_visitor_log=df_visitor_log.drop("webClientID","City","Country")

In [0]:
df_visitor_log.show(5)

In [0]:
# convert VisitDate to date type in visitor log

In [0]:
df_visitor_log= df_visitor_log.withColumn("VisitDate",func.when(df_visitor_log.VisitDateTime.rlike("^[0-9]*$"),func.to_timestamp(func.col("VisitDateTime")/1e9).cast(DateType())).otherwise(df_visitor_log['VisitDateTime'].cast(DateType())))

In [0]:
df_visitor_log.show()

In [0]:
# Drop visitDateTime in visitor_log

In [0]:
#df_visitor_log=df_visitor_log.drop("VisitDateTime")

In [0]:
df_visitor_log.show(2)

In [0]:
# Recheck the null values count of visitdate

In [0]:
df_visitor_log.where(df_visitor_log["VisitDate"].isNull()).count()

In [0]:
# check if the unique UserID in user_data table and visitor_log table are same

In [0]:
df_user_data.select(func.countDistinct("UserID")).show()

In [0]:
df_visitor_log.select(func.countDistinct("UserID")).show()

In [0]:
# convert the text in all columns to lower case

In [0]:
df_visitor_log=df_visitor_log.select(func.lower(df_visitor_log["ProductID"]).alias("ProductID"),df_visitor_log["UserID"],func.lower(df_visitor_log["Activity"]).alias("Activity"),func.lower(df_visitor_log["Browser"]).alias("Browser"),func.lower(df_visitor_log["OS"]).alias("OS"),df_visitor_log["VisitDate"],df_visitor_log["VisitDateTime"])

In [0]:
df_visitor_log.show(10)

In [0]:
# Imputing the null values.
# 1) First we fill the null values of visit date column. Group by user ID and fill the top most visited day of each user for filling the null values of visit date for that user.

In [0]:
# convert df_visitor_log into table for sql queries

In [0]:
df_visitor_log.registerTempTable("df_visitor_log")

In [0]:
#Using Sql query find the top visited date for each user

In [0]:
df_visitdate_null=spark.sql("select UserID,VisitDate, count(VisitDate) as visit_count from df_visitor_log group by UserID,VisitDate order by visit_count desc")

In [0]:
df_visitdate_null.cache()

In [0]:
df_visitdate_null.show(5)

In [0]:
# Remove all rows that contain null values as they should not be counted as top visited date

In [0]:
df_visitdate_null=df_visitdate_null.where(df_visitdate_null["VisitDate"].isNotNull())

In [0]:
df_visitdate_null.select(func.countDistinct("UserID")).show()

In [0]:
#Extract only top visited date for each userID

In [0]:
df_visitdate_null_1=df_visitdate_null.groupBy("UserID").agg(func.first("VisitDate"), func.max("visit_count"))

In [0]:
df_visitdate_null_1.cache()

In [0]:
df_visitdate_null_1.show(5)

In [0]:
df_visitdate_null_1.count()

In [0]:
# fill the null values of visitdatetime in original table df_visitor_log with top visted dates for each user id

In [0]:
df_visitor_log=df_visitor_log.join(df_visitdate_null_1, on =["UserID"], how='left_outer')

In [0]:
df_visitor_log.show()

In [0]:
# lets check how many users has no visit date at all.

In [0]:
df_visitor_log.where(df_visitor_log["first(VisitDate)"].isNull()).select(func.countDistinct("UserID")).show()

In [0]:
df_visitor_log.where(df_visitor_log["first(VisitDate)"].isNull()).count()

In [0]:
# using coalesce fill the null values.

In [0]:
df_visitor_log=df_visitor_log.withColumn('visit_date', func.coalesce('VisitDate', 'first(VisitDate)'))

In [0]:
df_visitor_log=df_visitor_log.drop("VisitDate","first(VisitDate)","max(visit_count)")

In [0]:
df_visitor_log.cache()

In [0]:
df_visitor_log.show(5)

In [0]:
df_visitor_log.where(df_visitor_log["visit_date"].isNull()).count()

In [0]:
df_visitor_log.show(5)

In [0]:
# There are still 805 rows of data in visitor log with null values in visit_date which we will address later.
#2) we will fill null values for productID column. we group by userID and visit_date to find the top viewed product for each user and for each visit_date. Suppose if the visit_date for any of the user has no productID viewed , then we take the top most product ID viewed by the user to fill null values.

In [0]:
df_visitor_log.registerTempTable("df_visitor_log")

In [0]:
df_product_null=spark.sql("select UserID,visit_date,productID, count(productID) as product_count from df_visitor_log group by UserID,visit_date,productID order by product_count desc")

In [0]:
df_product_null.cache()

In [0]:
df_product_null.show()

In [0]:
# remove the rows with null values

In [0]:
df_product_null=df_product_null.where(df_product_null["productID"].isNotNull())

In [0]:
df_product_null.show()

In [0]:
# we extract the top viewed prodcut for each visit date for each user

In [0]:
df_product_null_1=df_product_null.groupBy("UserID","visit_date").agg(func.first("productID"), func.max("product_count"))

In [0]:
df_product_null_1.show()

In [0]:
df_product_null_1.select(func.countDistinct("UserID")).show()

In [0]:
# fill the null values of productID in original table df_visitor_log with top productID for each user id and for each visit date

In [0]:
df_visitor_log=df_visitor_log.join(df_product_null_1, on =["UserID","visit_date"], how='left_outer')

In [0]:
df_visitor_log.show()

In [0]:
# using colaese to fill the null values

In [0]:
df_visitor_log=df_visitor_log.withColumn('product_id', func.coalesce('ProductID', 'first(productID)'))

In [0]:
df_visitor_log=df_visitor_log.drop("ProductID","first(productID)","max(product_count)")

In [0]:
df_visitor_log.show()

In [0]:
# For all the users that doesnot have prodcut ID for any of the visit date, fill the null values of remaining cells of productID column by top most product viewed by each user

In [0]:
df_visitor_log.registerTempTable("df_visitor_log")

In [0]:
# extract top viewed product for each user

In [0]:
df_product_null_2=spark.sql("select UserID,product_id, count(product_id) as max_count from df_visitor_log group by UserID, product_id order by max_count desc")

In [0]:
df_product_null_2.cache()

In [0]:
df_product_null_2.show()

In [0]:
df_product_null_2=df_product_null_2.where(df_product_null_2["product_id"].isNotNull())

In [0]:
df_product_null_2=df_product_null_2.groupBy("UserID").agg(func.first("product_id"), func.max("max_count"))

In [0]:
df_product_null_2.show()

In [0]:
# joining the dataframe creted with User ID and their top visited product with original table to fill the null values for each user

In [0]:
df_visitor_log=df_visitor_log.join(df_product_null_2, on =["UserID"], how='left_outer')

In [0]:
df_visitor_log.show()

In [0]:
# using coalaesce to fill the null values

In [0]:
df_visitor_log=df_visitor_log.withColumn('prod_ID', func.coalesce('product_id', 'first(product_id)'))

In [0]:
df_visitor_log=df_visitor_log.drop("product_id","first(product_id)","max(max_count)")

In [0]:
df_visitor_log.show()

In [0]:
df_visitor_log.where(df_visitor_log["prod_id"].isNull()).count()

In [0]:
# There are still 582 unique users with no product viewed which we will address later.
#2) we will fill null values for Activity column. we group by userID and product_id to find the top viewed activity for each user and for each product_id. Suppose if the product_id for any of the user has no Activity , then we take the top most activity for each user to fill null values.

In [0]:
df_visitor_log.registerTempTable("df_visitor_log")

In [0]:
df_activity_null=spark.sql("select UserID,prod_id,Activity, count(Activity) as activity_count from df_visitor_log group by UserID,prod_id,Activity order by activity_count desc")

In [0]:
df_activity_null.cache()

In [0]:
df_activity_null.show()

In [0]:
#Extract only rows where there are no null values for Activity

In [0]:
df_activity_null=df_activity_null.where(df_activity_null["Activity"].isNotNull())

In [0]:
df_activity_null.select(func.countDistinct('UserID')).show()

In [0]:
#find the top activity for each user and for each product

In [0]:
df_activity_null_1=df_activity_null.groupBy("UserID","prod_id").agg(func.first("Activity"), func.max("activity_count"))

In [0]:
df_activity_null_1.show()

In [0]:
df_activity_null_1.select(func.countDistinct("UserID")).show()

In [0]:
# fill the null values of Activity in original table df_visitor_log with top activity for each user id and for each product

In [0]:
df_visitor_log=df_visitor_log.join(df_activity_null_1, on =["UserID","prod_id"], how='left_outer')

In [0]:
df_visitor_log.show()

In [0]:
# use coalesce to fill the null values

In [0]:
df_visitor_log=df_visitor_log.withColumn('Act', func.coalesce('Activity', 'first(activity)'))

In [0]:
df_visitor_log=df_visitor_log.drop("Activity","first(Activity)","max(activity_count)")

In [0]:
df_visitor_log.show(5)

In [0]:
df_visitor_log.where(df_visitor_log["Act"].isNull()).count()

In [0]:
# fill the remaining null values with top most activity for each user

In [0]:
df_visitor_log.registerTempTable("df_visitor_log")

In [0]:
df_activity_null_2=spark.sql("select UserID,Act, count(Act) as max_count from df_visitor_log group by UserID, Act order by max_count desc")

In [0]:
df_activity_null_2.cache()

In [0]:
df_activity_null_2.show()

In [0]:
df_activity_null_2=df_activity_null_2.where(df_activity_null_2["Act"].isNotNull())

In [0]:
df_activity_null_2.select(func.countDistinct("UserID")).show()

In [0]:
df_activity_null_2=df_activity_null_2.groupBy("UserID").agg(func.first("Act"), func.max("max_count"))

In [0]:
df_activity_null_2.count()

In [0]:
df_visitor_log=df_visitor_log.join(df_activity_null_2,on =["UserID"], how='left_outer')

In [0]:
df_visitor_log.show()

In [0]:
#use coleasce to fill null values

In [0]:
df_visitor_log=df_visitor_log.withColumn('Activity', func.coalesce('Act', 'first(Act)'))

In [0]:
df_visitor_log=df_visitor_log.drop("Act","first(Act)","max(max_count)")

In [0]:
df_visitor_log.cache()

In [0]:
df_visitor_log.show()

In [0]:
# Lets check how many null values left in the dataframe.

In [0]:
df_visitor_log.select([func.count(func.when(func.col(c).isNull(), c)).alias(c) for c in df_visitor_log.columns]).show()

In [0]:
#df_visitor_log.write.csv('/FileStore/tables/visitor_log_intermediate.csv',header = 'true')

In [0]:
# there are still few null values in prod_id, visit_date and Activity column. we fill the null values with the corresponding top most appeared value for that column or simply mode of that column.

In [0]:
#df_visitor_log=spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferschema", "true").option("delimiter",",").load('/FileStore/tables/visitor_log_intermediate.csv')

In [0]:
df_visitor_log.registerTempTable("df_visitor_log")

In [0]:
spark.sql("select prod_id,count(prod_id) as count from df_visitor_log group by prod_id order by count desc limit 1").show()

In [0]:
spark.sql("select Activity,count(Activity) as count from df_visitor_log group by Activity order by count desc limit 1").show()

In [0]:
spark.sql("select visit_date,count(visit_date) as count from df_visitor_log group by visit_date order by count desc limit 1").show()

In [0]:
df_visitor_log=df_visitor_log.fillna( { 'prod_id':"pr101042", 'Activity':"click"} )

In [0]:
df_visitor_log=df_visitor_log.withColumn('visit_date_time',func.when(func.col('visit_date').isNull(),func.to_date(func.lit('07.05.2018'),'dd.MM.yyyy')).otherwise(func.col('visit_date')))

In [0]:
df_visitor_log.drop('visit_date')

In [0]:
df_visitor_log.select([func.count(func.when(func.col(c).isNull(), c)).alias(c) for c in df_visitor_log.columns]).show()

In [0]:
df_visitor_log=df_visitor_log.drop("visit_date")

In [0]:
# Convert the VisitDateTime column as timestamp type. This column is required for querying since we need to hours and minutes information to order by date.

In [0]:
df_visitor_log= df_visitor_log.withColumn("date",func.when(df_visitor_log.VisitDateTime.rlike("^[0-9]*$"),func.to_timestamp(func.col("VisitDateTime")/1e9)).otherwise(df_visitor_log['VisitDateTime']))

In [0]:
df_visitor_log.printSchema()

In [0]:
df_visitor_log.registerTempTable("df_visitor_log")

In [0]:
df_visitor_log=spark.sql('select UserID,prod_ID,Activity,Browser,OS,visit_date_time,CAST(date AS TIMESTAMP) from df_visitor_log')

In [0]:
df_visitor_log= df_visitor_log.withColumn("temp",func.to_timestamp(func.col("visit_date_time")))

In [0]:
df_visitor_log.show()

In [0]:
df_visitor_log=df_visitor_log.withColumn('date_timestamp', func.coalesce('date', 'temp'))

In [0]:
df_visitor_log=df_visitor_log.drop("date","temp")

In [0]:
df_visitor_log.select([func.count(func.when(func.col(c).isNull(), c)).alias(c) for c in df_visitor_log.columns]).show()

In [0]:
df_visitor_log.write.csv('/FileStore/tables/visitor_log_final1.csv',header = 'true')

In [0]:
# Save as text file to store the preprocessed data andfor further querying the data

In [0]:
df_visitor_data=spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferschema", "true").option("delimiter",",").load('/FileStore/tables/visitor_log_final1.csv')

In [0]:
#df_visitor_data=df_visitor_log

In [0]:
display(df_visitor_data)

UserID,prod_ID,Activity,Browser,OS,visit_date_time,date_timestamp
U106593,pr100017,click,chrome mobile,android,2018-05-15,2018-05-15T06:48:15.248+0000
U108297,pr101008,pageload,chrome mobile,android,2018-05-23,2018-05-23T07:02:01.790+0000
U132443,pr100241,click,firefox,windows,2018-05-10,2018-05-10T06:28:53.391+0000
U134616,pr100495,click,chrome,windows,2018-05-08,2018-05-08T12:40:02.153+0000
U130784,pr100363,click,chrome,chrome os,2018-05-11,2018-05-11T15:35:43.689+0000
U120983,pr100340,click,chrome,windows,2018-05-19,2018-05-19T00:02:31.347+0000
U120287,pr100166,click,chrome,windows,2018-05-19,2018-05-19T04:51:45.337+0000
U124307,pr101042,click,chrome,mac os x,2018-05-07,2018-05-07T05:54:39.408+0000
U113937,pr101042,click,safari,mac os x,2018-05-23,2018-05-23T09:44:44.023+0000
U115735,pr101042,click,chrome,windows,2018-05-13,2018-05-13T13:17:03.751+0000


In [0]:
# Extract the user vintage for each user

In [0]:
df_input_feature=df_user_data.withColumn("User_Vintage", 
              func.datediff(func.to_date(func.lit("2018-05-28")),
                       df_user_data["Signup_Date"]))

In [0]:
df_input_feature.show()

In [0]:
df_input_feature=df_input_feature.drop("User Segment","Signup_Date")

In [0]:
df_input_feature.show()

In [0]:
## No of days user visited in last 7 days

In [0]:
df_visitor_data.registerTempTable("df_visitor_data")

In [0]:
# filter data where date is greater than 20-05-2018

In [0]:
df_no_of_days=df_visitor_data.filter(df_visitor_data["visit_date_time"]>func.to_date(func.lit("2018-05-20")))

In [0]:
df_no_of_days.cache()

In [0]:
df_no_of_days.select(func.countDistinct("UserID")).show()

In [0]:
# only 16172 users visited in last 7 days

In [0]:
df_no_of_days.registerTempTable("df_no_of_days")

In [0]:
df_no_of_days_visited=spark.sql("select UserID, count(Distinct(visit_date_time)) as no_of_days_visited_7_days from df_no_of_days group by UserID")

In [0]:
df_input_feature=df_input_feature.join(df_no_of_days_visited, on=["UserID"], how='left_outer')

In [0]:
# fill the cells of users who not visited in last 7 days as 0

In [0]:
df_input_feature=df_input_feature.fillna({'no_of_days_visited_7_days':0})

In [0]:
df_input_feature=df_input_feature.drop("User Segment","Signup_Date")

In [0]:
df_input_feature.show()

In [0]:
# Number of products viewed in the last 15 days for each user

In [0]:
df_no_of_products=df_visitor_data.filter(df_visitor_data["visit_date_time"]>func.to_date(func.lit("2018-05-12")))

In [0]:
df_no_of_products.registerTempTable("df_no_of_products")

In [0]:
df_no_of_prodcuts_viewed=spark.sql("select UserID, count(Distinct(prod_id)) as no_of_products_viewed_15_days from df_no_of_products group by UserID")

In [0]:
df_no_of_prodcuts_viewed.show()

In [0]:
df_input_feature=df_input_feature.join(df_no_of_prodcuts_viewed, on=["UserID"], how='left_outer')

In [0]:
df_input_feature.show()

In [0]:
# As we can see some users have not viewed any products in the last 15 days and for those fill with 0

In [0]:
df_input_feature=df_input_feature.fillna({'no_of_products_viewed_15_days':0})

In [0]:
df_input_feature.cache()

In [0]:
df_input_feature.show()

In [0]:
# Find the top viewed product in the last 15 days for each user

In [0]:
# first arrange the count of products viewed in the last 15 days for each user in desc order

In [0]:
df_most_viewed_product=spark.sql("select UserID,prod_id as Most_Viewed_product_15_Days,Activity, count(Activity) as activity_count from df_no_of_products  where Activity == 'pageload' group by UserID,Most_Viewed_product_15_Days,Activity order by activity_count desc")

In [0]:
df_most_viewed_product.cache()

In [0]:
df_most_viewed_product.select(func.countDistinct("UserID")).show()

In [0]:
df_most_viewed_product.show(20)

In [0]:
# second extract the visit date for each user and for each product

In [0]:
df_most_viewed_product2=spark.sql("select UserID,prod_ID as Most_Viewed_product_15_Days,date_timestamp from df_no_of_products where Activity=='pageload' group by UserID,Most_Viewed_product_15_Days,date_timestamp order by UserID")

In [0]:
df_most_viewed_product2.cache()

In [0]:
df_most_viewed_product2.show(50)

In [0]:
#Extract the most recent date for each product and for each user

In [0]:
df_most_viewed_product2=df_most_viewed_product2.groupBy("UserID","Most_Viewed_product_15_Days").agg(func.max("date_timestamp").alias("visit_time_date"))

In [0]:
df_most_viewed_product2.show(50)

In [0]:
# Join the most recent date with most viewed product table

In [0]:
df_most_viewed_product3=df_most_viewed_product.join(df_most_viewed_product2, on=["UserID","Most_Viewed_product_15_Days"], how="left_outer")

In [0]:
df_most_viewed_product3.sort("UserID").show(50)

In [0]:
df_most_viewed_product3.registerTempTable("df_most_viewed_product3")

In [0]:
# Extracting the top viewed product for each user ID

In [0]:
df_most_viewed_product4=spark.sql('select UserID,max(activity_count) as max_count from df_most_viewed_product3 group by UserID')

In [0]:
df_most_viewed_product4.cache()

In [0]:
df_most_viewed_product4.sort("UserID").show(50)

In [0]:
# we will join this to previous table to filter the most viewed products for each user and we may see User has multiple products which have siminumber of page loads.And for these users we filter based on most recent Date.

In [0]:
df_most_viewed_product5=df_most_viewed_product3.join(df_most_viewed_product4, on =["UserID"], how='left_outer')

In [0]:
df_most_viewed_product5.sort('UserID').show(50)

In [0]:
df_most_viewed_product6=df_most_viewed_product5.filter(df_most_viewed_product5["activity_count"]==df_most_viewed_product5["max_count"])

In [0]:
df_most_viewed_product6.sort("UserID").show(20)

In [0]:
df_most_viewed_product6.registerTempTable('df_most_viewed_product6')

In [0]:
# Arrange the table in desc order based on visit date so that we filter the first row for each user

In [0]:
#df_most_viewed_product6=spark.sql("select UserID,Most_Viewed_product_15_Days,activity_count,visit_time_date from df_most_viewed_product6 group by UserID,Most_Viewed_product_15_Days,activity_count,visit_time_date order by visit_time_date desc")

In [0]:
df_most_viewed_product6=df_most_viewed_product6.sort(func.desc("visit_time_date"))

In [0]:
df_most_viewed_product6.show(20)

In [0]:
df_most_viewed_product6.filter(df_most_viewed_product6["UserID"]=="U100009").show()

In [0]:
df_most_viewed_product7=df_most_viewed_product6.groupBy("UserID").agg(func.first("Most_Viewed_product_15_Days"),func.max("visit_time_date"))

In [0]:
df_most_viewed_product7.sort("UserID").show(20)

In [0]:
# Now we extracted the most viewed product for each user in the last 15 dfays

In [0]:
df_input_feature=df_input_feature.join(df_most_viewed_product7, on=["UserID"], how="left_outer")

In [0]:
df_input_feature.show()

In [0]:
# For the users with no product viewed fill with "Product101"

In [0]:
df_input_feature=df_input_feature.fillna({'first(Most_Viewed_product_15_Days)':"Product101"})

In [0]:
df_input_feature=df_input_feature.drop("max(visit_time_date)")

In [0]:
df_input_feature.cache()

In [0]:
# Find the most frequent used OS by the user

In [0]:
 df_active_os=spark.sql("select UserID, OS as Most_Active_OS, count(OS) as OS_count from df_visitor_data group by UserID,Most_Active_OS order by OS_count desc")

In [0]:
df_active_os.show()

In [0]:
df_active_os.select(func.countDistinct("UserID")).show()

In [0]:
df_active_os=df_active_os.groupBy("UserID").agg(func.first("Most_Active_OS"), func.max("OS_count"))

In [0]:
df_active_os.show()

In [0]:
df_input_feature=df_input_feature.join(df_active_os, on=["UserID"], how="left_outer")

In [0]:
df_input_feature.cache()

In [0]:
# Find the most recent viewed product by the user

In [0]:
df_recently_viewed=spark.sql("select UserID,date_timestamp,prod_id as Recently_Viewed_Product from df_visitor_data where Activity=='pageload' group by UserID,date_timestamp,Recently_Viewed_Product")

In [0]:
df_recently_viewed=df_recently_viewed.sort(func.desc("date_timestamp"))

In [0]:
df_recently_viewed=df_recently_viewed.groupBy("UserID").agg(func.first("Recently_viewed_product"), func.max("date_timestamp"))

In [0]:
df_recently_viewed.cache()

In [0]:
df_recently_viewed.show()

In [0]:
df_recently_viewed.where(df_recently_viewed["first(Recently_viewed_product)"].isNull()).count()

In [0]:
df_recently_viewed.select(func.countDistinct("UserID")).show()

In [0]:
df_input_feature=df_input_feature.join(df_recently_viewed, on=["UserID"], how="left_outer")

In [0]:
df_input_feature.show()

In [0]:
# For the users with no product viewed fill with "Product101"

In [0]:
df_input_feature=df_input_feature.fillna({"first(Recently_viewed_product)":"Product101"})

In [0]:
df_input_feature.cache()

In [0]:
# To find the no. of pageloads and no. of clicks in the last 7 days

In [0]:
df_activity_recent=df_visitor_data.filter(df_visitor_data["visit_date_time"]>func.to_date(func.lit("2018-05-20")))

In [0]:
df_activity_recent.registerTempTable("df_activity_recent")

In [0]:
#Extract the number of pageloads in the last 7 days for each user

In [0]:
df_pageload_recent=spark.sql("select UserID,Activity,count(Activity) as Pageloads_last_7_days from df_activity_recent where Activity=='pageload' group by UserID,Activity")

In [0]:
df_pageload_recent.show()

In [0]:
df_pageload_recent.count()

In [0]:
df_pageload_recent=df_pageload_recent.drop("Activity")

In [0]:
# Extract the number of clicks in the last 7 days for each user

In [0]:
df_click_recent=spark.sql("select UserID,Activity,count(Activity) as Clicks_last_7_days from df_activity_recent where Activity=='click' group by UserID,Activity")

In [0]:
df_click_recent=df_click_recent.drop("Activity")

In [0]:
df_click_recent.count()

In [0]:
df_input_feature=df_input_feature.join(df_pageload_recent, on=["UserID"], how="left_outer")

In [0]:
df_input_feature=df_input_feature.join(df_click_recent, on=["UserID"], how='left_outer')

In [0]:
df_input_feature.show()

In [0]:
df_input_feature=df_input_feature.drop("max(OS_count)","max(visit_date_time)")

In [0]:
df_input_feature.show()

In [0]:
# chack if there are any null values in the table

In [0]:
df_input_feature.select([func.count(func.when(func.col(c).isNull(), c)).alias(c) for c in df_input_feature.columns]).show()

In [0]:
# Fill the null values

In [0]:
df_input_feature=df_input_feature.drop("max(date_timestamp)")

In [0]:
df_input_feature=df_input_feature.fillna({'Pageloads_last_7_days':0,'Clicks_last_7_days':0})

In [0]:
df_input_feature.select([func.count(func.when(func.col(c).isNull(), c)).alias(c) for c in df_input_feature.columns]).show()

In [0]:
df_input_feature=df_input_feature.select(df_input_feature["UserID"],df_input_feature["no_of_days_Visited_7_Days"].alias("No_of_days_Visited_7_Days"),df_input_feature["no_of_products_viewed_15_days"].alias("No_Of_Products_Viewed_15_Days"),df_input_feature["User_Vintage"].alias("User_Vintage"),df_input_feature["first(Most_Viewed_product_15_Days)"].alias("Most_Viewed_product_15_Days"),df_input_feature["first(Most_Active_OS)"].alias("Most_Active_OS"),df_input_feature["first(Recently_viewed_product)"].alias("Recently_Viewed_Product"),df_input_feature["Pageloads_last_7_days"],df_input_feature["Clicks_last_7_days"])

In [0]:
df_input_feature=df_input_feature.sort("UserID")

In [0]:
df_input_feature.show()

In [0]:
df_final=df_input_feature

In [0]:
display(df_final)

UserID,No_of_days_Visited_7_Days,No_Of_Products_Viewed_15_Days,User_Vintage,Most_Viewed_product_15_Days,Most_Active_OS,Recently_Viewed_Product,Pageloads_last_7_days,Clicks_last_7_days
U100002,0,2,53,pr100258,android,pr100258,0,0
U100003,1,2,1021,pr100079,windows,pr100079,1,2
U100004,1,15,341,pr100753,windows,pr100753,1,0
U100005,1,3,681,pr100234,android,pr100234,2,0
U100006,1,1,55,pr101111,android,pr101111,1,0
U100007,0,0,460,Product101,windows,pr100265,0,0
U100008,6,19,395,pr100855,android,pr100962,26,34
U100009,4,13,78,pr102091,android,pr100640,5,4
U100012,2,6,124,pr100055,mac os x,pr100055,8,21
U100013,3,3,1687,pr100177,mac os x,pr100134,7,3
