## Load Libraries

In [47]:
import os
import pandas as pd
from datetime import datetime, timedelta

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

## Initiate Spark Session and Set Spark Parameters

In [48]:
spark = SparkSession.builder.appName("June 2021").getOrCreate()

# Render DataFrames as tables when they are the last expression of a cell.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
# Use a region-based zone ID: three-letter IDs such as "IST" are ambiguous (Indian /
# Irish / Israel Standard Time) and discouraged by Spark. Spark maps "IST" to
# Asia/Kolkata, so this is the same zone, spelled unambiguously.
spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")
# Use the pre-Spark-3.0 (SimpleDateFormat-based) datetime parser.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

key,value
spark.sql.legacy....,LEGACY


## Define Required Directories

In [49]:
# Root of the project; set JOBATHON_PROJECT_DIR to run on another machine instead of
# editing this absolute, machine-specific path.
PROJECT_DIR = os.environ.get(
    "JOBATHON_PROJECT_DIR",
    "/media/bharathkancharla/techspace/Data Science World/Challenges/Analytical Vidhya/JOB-A-THON - June 2021",
)
DATA_DIR = os.path.join(PROJECT_DIR, "data")

In [50]:
# Reference date in the dd-MMM-yyyy form parsed by the Spark code below; features are
# computed over the 21 days before it (ref_date - 21 through ref_date - 1).
ref_date = "28-May-2018"

## Support Functions

### Pandas date-shift helper

In [51]:
def date_shift(ref, noofdayshift, as_string=False):
    """Return ``ref`` moved by ``noofdayshift`` days.

    Args:
        ref: Any value ``pd.to_datetime`` accepts, e.g. "28-May-2018".
        noofdayshift (int): Number of days to add; negative values move backwards.
        as_string (bool): When true, return ``str()`` of the shifted Timestamp.

    Returns:
        pd.Timestamp, or its string form when ``as_string`` is true.
    """
    shifted = pd.to_datetime(ref) + timedelta(days=noofdayshift)
    return str(shifted) if as_string else shifted

### ETL Pipeline Function

In [52]:
def _top_value_per_user(df, value_col, alias):
    """Return (UserID, <alias>) holding each user's most frequent non-null ``value_col``.

    Ties on the count are broken by the smallest ``value_col`` so that re-running the
    notebook gives the same answer (row_number over an ordering with ties is arbitrary).
    """
    ranking = Window.partitionBy("UserID").orderBy(col("count").desc(), col(value_col))
    return (
        df.filter(col(value_col).isNotNull())
        .groupBy("UserID", value_col)
        .count()
        .withColumn("row_num", row_number().over(ranking))
        .filter(col("row_num") == 1)
        .select("UserID", col(value_col).alias(alias))
    )


def feature_generation(usersData, visitorLogs, reference_date=None):
    """
    Build the per-user feature table from registered-user data and visitor logs.

    Only log rows of users present in ``usersData`` and dated within the 21 days before
    the reference date (reference - 21 .. reference - 1, inclusive) are used.

    Args:
        usersData (DataFrame): Registered user information; needs ``UserID`` and
            ``Signup Date``.
        visitorLogs (DataFrame): Browsing logs of visitors and users; needs ``UserID``,
            ``VisitDateTime`` (expected to already be a date), ``webClientID``,
            ``ProductID``, ``Activity`` (expected lower-case ``click``/``pageload``)
            and ``OS``.
        reference_date (str, optional): Reference date in ``dd-MMM-yyyy`` format.
            Defaults to the notebook-level ``ref_date``.

    Returns:
        DataFrame: one row per distinct ``UserID`` of ``usersData``, ordered by
        ``UserID``, with columns No_of_days_Visited_7_Days,
        No_Of_Products_Viewed_15_Days, User_Vintage, Most_Viewed_product_15_Days,
        Most_Active_OS, Recently_Viewed_Product, Clicks_last_7_days and
        Pageloads_last_7_days. Missing product features are filled with
        "Product101" and missing numeric features with 0; Most_Active_OS may be null.
    """
    if reference_date is None:
        reference_date = ref_date

    ref_day = to_date(lit(reference_date), format="dd-MMM-yyyy")
    window_start = date_add(ref_day, -21)
    window_end = date_add(ref_day, -1)

    user_ids = usersData.select("UserID").distinct()

    # Log rows of registered users inside the 21-day window. The semi-join keeps each
    # log row at most once and never multiplies rows, whatever usersData contains.
    logs = visitorLogs.join(user_ids, on="UserID", how="left_semi").filter(
        col("VisitDateTime").between(window_start, window_end)
    )

    # Actual visit records in the last 7 days (reference - 7 .. reference - 1).
    visits_7_days = logs.filter(
        (col("VisitDateTime") >= date_add(ref_day, -7)) & col("webClientID").isNotNull()
    )
    # Product interactions in the last 15 days (reference - 15 .. reference - 1).
    products_15_days = logs.filter(
        (col("VisitDateTime") >= date_add(ref_day, -15)) & col("ProductID").isNotNull()
    )

    # ----------------------------------- Features -----------------------------------
    user_vintage = usersData.select(
        "UserID",
        datediff(ref_day, to_date("Signup Date")).alias("User_Vintage"),
    )

    most_active_os = _top_value_per_user(logs, "OS", "Most_Active_OS")

    # Latest pageloaded product per user, picked with a window instead of
    # orderBy + dropDuplicates (dropDuplicates does not honour a preceding sort).
    # NOTE(review): VisitDateTime is a date here, so several pageloads on the latest day
    # tie; ProductID only makes the pick deterministic, not "most recent".
    latest_first = Window.partitionBy("UserID").orderBy(
        col("VisitDateTime").desc(), col("ProductID")
    )
    recently_viewed = (
        logs.filter((col("Activity") == "pageload") & col("ProductID").isNotNull())
        .withColumn("row_num", row_number().over(latest_first))
        .filter(col("row_num") == 1)
        .select("UserID", col("ProductID").alias("Recently_Viewed_Product"))
    )

    no_of_days_visited = visits_7_days.groupBy("UserID").agg(
        countDistinct("VisitDateTime").alias("No_of_days_Visited_7_Days")
    )

    # Explicit pivot values: the output always has both count columns (the fill below
    # would fail on a missing one), null/unknown activities are ignored, and Spark
    # skips the extra job that discovers the distinct values.
    activity_counts = (
        visits_7_days.groupBy("UserID")
        .pivot("Activity", ["click", "pageload"])
        .count()
        .withColumnRenamed("click", "Clicks_last_7_days")
        .withColumnRenamed("pageload", "Pageloads_last_7_days")
    )

    most_viewed_product = _top_value_per_user(
        products_15_days, "ProductID", "Most_Viewed_product_15_Days"
    )

    no_products_viewed = products_15_days.groupBy("UserID").agg(
        countDistinct("ProductID").alias("No_Of_Products_Viewed_15_Days")
    )

    feature_table = (
        user_ids.join(no_of_days_visited, on="UserID", how="left")
        .join(no_products_viewed, on="UserID", how="left")
        .join(user_vintage, on="UserID", how="left")
        .join(most_viewed_product, on="UserID", how="left")
        .join(most_active_os, on="UserID", how="left")
        .join(recently_viewed, on="UserID", how="left")
        .join(activity_counts, on="UserID", how="left")
    )

    # A user who has not viewed any product gets the placeholder "Product101".
    feature_table = feature_table.na.fill(
        value="Product101",
        subset=["Most_Viewed_product_15_Days", "Recently_Viewed_Product"],
    )
    feature_table = feature_table.na.fill(
        value=0,
        subset=[
            "No_of_days_Visited_7_Days",
            "No_Of_Products_Viewed_15_Days",
            "User_Vintage",
            "Clicks_last_7_days",
            "Pageloads_last_7_days",
        ],
    )

    return feature_table.orderBy("UserID")

## Load Data

In [53]:
# Both CSVs share the same reader settings: first row is a header, column types inferred.
csv_options = {"header": True, "inferSchema": True}

visitor_logs = spark.read.csv(os.path.join(DATA_DIR, "VisitorLogsData.csv"), **csv_options)
users = spark.read.csv(os.path.join(DATA_DIR, "userTable.csv"), **csv_options)

visitor_logs

webClientID,VisitDateTime,ProductID,UserID,Activity,Browser,OS,City,Country
WI10000050298,2018-05-07 04:28:...,pr100631,,,Chrome Mobile,Android,Chennai,India
WI10000025922,2018-05-13 07:26:...,pr100707,,,Chrome,Windows,,Taiwan
WI100000204522,2018-05-11 11:43:...,pr100030,,click,Chrome,windows,Gurgaon,India
WI10000011974,2018-05-13 15:20:...,Pr100192,,CLICK,Chrome,Windows,,
WI100000441953,2018-05-08 20:44:...,Pr100762,,click,Chrome,mac os x,Iselin,United States
WI10000042224,2018-05-24 13:10:...,pr100109,,click,Chrome,Mac OS X,,Taiwan
WI100000532584,2018-05-23 14:52:...,pr100020,,click,Opera,Windows,Baku,Azerbaijan
WI100000377250,2018-05-13 03:13:...,Pr101495,,CLICK,Chrome,Windows,Bayan Lepas,Malaysia
WI10000031378,2018-05-10 02:04:...,Pr100013,,click,Chrome,Windows,Chakwal,Pakistan
WI100000278874,2018-05-25 13:40:...,Pr101701,,PAGELOAD,Chrome,Windows,,


In [54]:
# Preview the users table (rendered as a table because eager evaluation is enabled).
users

UserID,Signup Date,User Segment
U133159,2018-04-14 07:01:...,C
U129368,2017-12-02 09:38:...,B
U109654,2013-03-19 11:38:...,B
U108998,2018-01-18 08:29:...,C
U131393,2018-03-27 08:05:...,B
U101889,2018-01-24 12:50:...,B
U113233,2017-03-18 09:27:...,B
U115728,2017-08-18 11:40:...,B
U132899,2018-03-14 08:32:...,B
U107492,2018-04-10 20:42:...,B


## EDA

### Drop visitor-log rows that are missing `UserID` or `VisitDateTime`

In [55]:
# Drop log rows without a usable UserID or VisitDateTime. Whitespace-only values count
# as missing too: blank-to-null normalisation only happens in the next cell, so an
# isNotNull() check alone would let values like " " through.
visitor_logs = visitor_logs.filter(
    col("UserID").isNotNull()
    & (trim(col("UserID")) != "")
    & col("VisitDateTime").isNotNull()
    & (trim(col("VisitDateTime")) != "")
)

### Replace empty (or whitespace-only) strings with null for consistency across the dataset

In [56]:
# Give "missing" a single representation: any value that is empty after trimming
# whitespace becomes null, in every column of both tables.
def blank_to_null(df):
    """Return ``df`` with empty/whitespace-only values replaced by null in every column."""
    return df.select(
        [when(trim(col(name)) == "", None).otherwise(col(name)).alias(name) for name in df.columns]
    )


visitor_logs = blank_to_null(visitor_logs)
users = blank_to_null(users)

### Convert `VisitDateTime` to a date (handles both timestamp strings and numeric epoch values)

In [57]:
# VisitDateTime arrives in two shapes: values containing "-" are timestamp strings;
# anything else is treated as an epoch count divided by 1e9 (presumably nanoseconds)
# and converted to a date in the session time zone.
visit_time = col("VisitDateTime")
as_date_from_string = to_date(visit_time)
as_date_from_epoch = to_timestamp(visit_time / 1000000000).cast("date")

visitor_logs = visitor_logs.withColumn(
    "VisitDateTime",
    when(visit_time.contains("-"), as_date_from_string).otherwise(as_date_from_epoch),
)

### Keep only the 21 days before the reference date (`ref_date - 21` to `ref_date - 1`)

# Observation window: ref_date - 21 through ref_date - 1, both ends inclusive.
window_ref_day = to_date(lit(ref_date), format="dd-MMM-yyyy")
window_start = date_add(window_ref_day, -21)
window_end = date_add(window_ref_day, -1)

visitor_logs = visitor_logs.filter(col("VisitDateTime").between(window_start, window_end))

### Normalize categorical columns by converting them to lowercase

In [58]:
# Normalise case so mixed spellings in the raw data (e.g. "CLICK" vs "click",
# "Pr100192" vs "pr100631") collapse to one representation.
cols_to_lower = ["ProductID", "Activity", "Browser", "OS", "City", "Country"]

for column_name in cols_to_lower:
    lowered = lower(col(column_name))
    visitor_logs = visitor_logs.withColumn(column_name, lowered)

In [59]:
# Inspect the distinct values of each categorical column after normalisation.
for column_name in ("Activity", "Browser", "OS", "City", "Country"):
    print(f"Unique of the column: {column_name}")
    visitor_logs.select(column_name).distinct().show()

Unique of the column: Activity
+--------+
|Activity|
+--------+
|    null|
|   click|
|pageload|
+--------+

Unique of the column: Browser
+--------------------+
|             Browser|
+--------------------+
|            chromium|
|       mobile safari|
|             firefox|
|      firefox mobile|
|       chrome mobile|
|          apple mail|
|              chrome|
|          qq browser|
|      yandex browser|
|                edge|
|          uc browser|
|             maxthon|
|        opera mobile|
|         amazon silk|
|mobile safari ui/...|
|             coc coc|
|    samsung internet|
|   chrome mobile ios|
|         firefox ios|
|chrome mobile web...|
+--------------------+
only showing top 20 rows

Unique of the column: OS
+---------+
|       OS|
+---------+
|  android|
|   ubuntu|
|   fedora|
|    linux|
|chrome os|
|      ios|
|    tizen|
| mac os x|
|  windows|
+---------+

Unique of the column: City
+-------------+
|         City|
+-------------+
| nieuw-vennep|
|        s

## Feature Generation

In [60]:
# One feature row per registered user, built from the cleaned visitor logs.
feature_table = feature_generation(usersData=users, visitorLogs=visitor_logs)

feature_table

UserID,No_of_days_Visited_7_Days,No_Of_Products_Viewed_15_Days,User_Vintage,Most_Viewed_product_15_Days,Most_Active_OS,Recently_Viewed_Product,Clicks_last_7_days,Pageloads_last_7_days
U100002,0,2,53,pr100258,android,pr100258,0,0
U100003,1,2,1021,pr100079,windows,pr100079,2,1
U100004,1,15,341,pr100355,windows,pr100753,0,1
U100005,1,3,681,pr100234,android,pr100234,0,1
U100006,1,1,55,pr101111,android,pr101111,0,1
U100007,0,0,460,Product101,windows,pr100113,0,0
U100008,6,17,395,pr100855,android,pr100962,23,21
U100009,4,10,78,pr101070,android,pr100640,4,4
U100012,2,5,124,pr100055,mac os x,pr100055,14,5
U100013,3,3,1687,pr100177,mac os x,pr100134,3,5


In [61]:
# Spark writes a *directory* of part files at this path, not a single CSV file.
# coalesce(1) merges partitions without a shuffle, so the UserID ordering from
# feature_generation is kept; repartition(1) shuffles and may not preserve it.
# mode("overwrite") lets the notebook be re-run without failing on an existing path.
(
    feature_table.coalesce(1)
    .write.mode("overwrite")
    .option("header", True)
    .csv(os.path.join(PROJECT_DIR, "Results", "final_submission_updated2.csv"))
)

In [None]:
# Collect the feature table to the driver and write it as one plain CSV file via pandas.
pandas_submission_path = os.path.join(PROJECT_DIR, "Results", "final_submission_pd_updated2.csv")
feature_table.toPandas().to_csv(pandas_submission_path, index=False)