# InfoCepts Data Engineering Hiring Hackathon

In [1]:
#this is to find where spark is installed on my system
import findspark
findspark.init()

In [2]:
#import pyspark
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import udf,col
import xml.etree.ElementTree as ET
import xmltodict
import csv

In [3]:
#start the spark builder
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

## Data preparation (converting xml to csv as xml file too large for pandas)

In [4]:
df_file_location = '/home/deen/Desktop/infoproject/info hackathon/InfoCept Participants Data/InfoCept Participants Data/infocept_xml_data_.xml'

In [5]:
# Reading xml file
with open(df_file_location, 'r') as file:
    filedata = file.read()    

In [6]:
# Converting xml to python dictionary (ordered dict)    
data_dict = xmltodict.parse(filedata)

In [7]:
#we iterate true to match the r
retail_data_list = [dict(x) for x in data_dict["data"]["row"]]

In [8]:
retail_data_list 

[{'index': '0',
  'user_session': '8b69208e-7c64-422d-b670-5b9d82d25606',
  'total_clicks': '7',
  'total_items': '2',
  'total_cats': '1',
  'max_dwell': '0.5115078598666931',
  'mean_dwell': '5.995877926032465',
  'total_duration': '166.71117484170466',
  'click_rate': '0.0725244742626448',
  'day_of_week': '1',
  'is_weekend': '0',
  'is_special_day': '0',
  'time_of_day': '2',
  'cat_most_viewed_n_times': '2.0',
  'cat_most_viewed': '2053013560982667520',
  'prod_most_viewed_n_times': '1.0',
  'prod_most_viewed': '1030297',
  'cat_views_freqs': '0.2678021521567366',
  'prod_views_freqs': '0.0009451491729694',
  'cat_buys_freqs': '0.2337402332570538',
  'prod_buys_freqs': '0.0006900279353354',
  'is_purchase': '0',
  'start_time_ts': 'October 07, 2019 Monday, 00:08:28',
  'end_time_ts': 'October 26, 2019 Saturday, 16:08:29'},
 {'index': '1',
  'user_session': '8068596c-a2b8-46ca-8791-e10d70520a1f',
  'total_clicks': '7',
  'total_items': '2',
  'total_cats': '1',
  'max_dwell': '7.6

In [14]:
#the headers
HEADERS = ['index','user_session','total_clicks','total_items','total_cats','max_dwell','mean_dwell',
           'total_duration','click_rate','day_of_week','is_weekend','is_special_day','time_of_day',
           'cat_most_viewed_n_times','cat_most_viewed','prod_most_viewed_n_times','prod_most_viewed',
           'cat_views_freqs','prod_views_freqs','cat_buys_freqs','prod_buys_freqs','is_purchase',
           'start_time_ts','end_time_ts']

In [17]:
#convert to csv 

rows = []

for retail in retail_data_list:
    index = retail['index'] 
    user_session = retail['user_session']  
    total_clicks  = retail['total_clicks']  
    total_items = retail['total_items']  
    total_cats  = retail['total_cats']  
    max_dwell = retail['max_dwell']  
    mean_dwell = retail['mean_dwell']  
    total_duration = retail['total_duration']  
    click_rate = retail['click_rate']  
    day_of_week = retail['day_of_week']  
    is_weekend = retail['is_weekend']  
    is_special_day = retail['is_special_day']  
    time_of_day = retail['time_of_day']  
    cat_most_viewed_n_times = retail['cat_most_viewed_n_times']  
    cat_most_viewed = retail['cat_most_viewed']  
    prod_most_viewed_n_times = retail['prod_most_viewed_n_times']  
    prod_most_viewed = retail['prod_most_viewed']  
    cat_views_freqs = retail['cat_views_freqs']  
    prod_views_freqs = retail['prod_views_freqs']  
    cat_buys_freqs = retail['cat_buys_freqs']  
    prod_buys_freqs = retail['prod_buys_freqs']  
    is_purchase = retail['is_purchase']  
    start_time_ts = retail['start_time_ts']  
    end_time_ts = retail['end_time_ts']
    rows.append([index,user_session,total_clicks,total_items,total_cats,max_dwell,mean_dwell,total_duration,click_rate,day_of_week,is_weekend,is_special_day,time_of_day,cat_most_viewed_n_times,cat_most_viewed,prod_most_viewed_n_times,prod_most_viewed,cat_views_freqs,prod_views_freqs,cat_buys_freqs,prod_buys_freqs,is_purchase,start_time_ts,end_time_ts])
    
with open('retail_data.csv', 'w',newline="") as f:
    write = csv.writer(f)
    write.writerow(HEADERS)
    write.writerows(rows)

In [4]:
#read csv file created 
df = pd.read_csv('./retail_data.csv')

In [26]:
#check for missing values 
pd.isnull(df).sum()

index                       0
user_session                0
total_clicks                0
total_items                 0
total_cats                  0
max_dwell                   0
mean_dwell                  0
total_duration              0
click_rate                  0
day_of_week                 0
is_weekend                  0
is_special_day              0
time_of_day                 0
cat_most_viewed_n_times     0
cat_most_viewed             0
prod_most_viewed_n_times    0
prod_most_viewed            0
cat_views_freqs             0
prod_views_freqs            0
cat_buys_freqs              0
prod_buys_freqs             0
is_purchase                 0
start_time_ts               0
end_time_ts                 0
dtype: int64

In [21]:
#convert the data to a spark dataframe
df_1 = spark.createDataFrame(df)

In [23]:
#casting each data into their Datatype and we write the output as df_1
df_1 \
    .withColumn("index",F.col("index").cast(IntegerType())) \
    .withColumn("user_session",F.col("user_session").cast(StringType())) \
    .withColumn("total_clicks",F.col("total_clicks").cast(IntegerType())) \
    .withColumn("total_items",F.col("total_items").cast(IntegerType())) \
    .withColumn("total_cats",F.col("total_cats").cast(IntegerType())) \
    .withColumn("max_dwell",F.col("max_dwell").cast(DoubleType())) \
    .withColumn("mean_dwell",F.col("mean_dwell").cast(DoubleType())) \
    .withColumn("total_duration",F.col("total_duration").cast(DoubleType())) \
    .withColumn("click_rate",F.col("click_rate").cast(DoubleType())) \
    .withColumn("day_of_week",F.col("day_of_week").cast(IntegerType())) \
    .withColumn("is_weekend",F.col("is_weekend").cast(IntegerType())) \
    .withColumn("is_special_day",F.col("is_special_day").cast(IntegerType())) \
    .withColumn("time_of_day",F.col("time_of_day").cast(IntegerType())) \
    .withColumn("cat_most_viewed_n_times",F.col("cat_most_viewed_n_times").cast(IntegerType())) \
    .withColumn("cat_most_viewed",F.col("cat_most_viewed").cast(IntegerType())) \
    .withColumn("prod_most_viewed_n_times",F.col("prod_most_viewed_n_times").cast(IntegerType())) \
    .withColumn("prod_most_viewed",F.col("prod_most_viewed").cast(IntegerType())) \
    .withColumn("cat_views_freqs",F.col("cat_views_freqs").cast(DoubleType())) \
    .withColumn("prod_views_freqs",F.col("prod_views_freqs").cast(DoubleType())) \
    .withColumn("cat_buys_freqs",F.col("cat_buys_freqs").cast(DoubleType())) \
    .withColumn("prod_buys_freqs",F.col("prod_buys_freqs").cast(DoubleType())) \
    .withColumn("is_purchase",F.col("is_purchase").cast(IntegerType())) \
    .withColumn("start_time_ts",F.col("start_time_ts").cast(StringType())) \
    .withColumn("end_time_ts",F.col("end_time_ts").cast(StringType())) \
    .write.parquet('df_1', mode='overwrite')


In [5]:
#we read the new dataframe using the spark library
df_1 = spark.read.parquet('df_1')

### #excercise 1 . .
Process the "start_time_ts" column to create 
"time_spec_points" column. Allot exact points equaling the time 
of the day the user is surfing, but allot extra 5 points if they
are surfing after 4 o clock in the afternoon. For example: If 
the user starts surfing at 9 o clock in the morning allot 9 points,
but user surfing after 4 o clock in the afternoon will receive 21 points.

In [101]:
#function to get the hours and compare 
def compare(codate):
    codate = datetime.strptime(codate, "%B %d, %Y %A, %H:%M:%S")
    cotime = codate.time()
    hrs = "16:00:00"
    hrs = datetime.strptime(hrs, '%H:%M:%S')
    hrstime = hrs.time()
    print(hrstime)
    points = cotime.hour
    
    if cotime >= hrstime:
        return points + 5
    else: 
        return points
    
#convert function to UDF
compareudf = udf(lambda x: compare(x))

In [102]:
df_start_time_ts = df_1 \
    .select("start_time_ts","index") \
    .withColumn("time_spec_points", compareudf("start_time_ts"))

In [103]:
df_start_time_ts.show(10)

+--------------------+------+----------------+
|       start_time_ts| index|time_spec_points|
+--------------------+------+----------------+
|October 09, 2019 ...|374784|              23|
|October 01, 2019 ...|374785|               7|
|October 02, 2019 ...|374786|              22|
|October 06, 2019 ...|374787|              15|
|October 13, 2019 ...|374788|              11|
|October 04, 2019 ...|374789|               4|
|October 05, 2019 ...|374790|               6|
|October 04, 2019 ...|374791|              14|
|October 04, 2019 ...|374792|              14|
|October 01, 2019 ...|374793|               5|
+--------------------+------+----------------+
only showing top 10 rows



### excercise 2
Reduce the "total_duration" round to 2 decimal places: example (166.711175 -> 166.71) (0.935320 -> 0.94)

In [104]:
#round to 2dp
def round_up(x): 
    total = F.round(x,2)
    return total

In [105]:
df_total_duration = df_1 \
    .select("total_duration","index") \
    .withColumn("total_duration", round_up(("total_duration")))

In [106]:
df_total_duration.show(5)

+--------------+------+
|total_duration| index|
+--------------+------+
|         10.45|374784|
|          0.02|374785|
|         31.17|374786|
|          1.48|374787|
|          0.02|374788|
+--------------+------+
only showing top 5 rows



### excercise 3 
Add the values of columns  "total_clicks", "total_items", "total_cats" to create a new column: "total_inventory". If the value "total_inventory" is greater than 10, then make sure the value is increased by 100%.

In [107]:
#using UDF and lamba function
add_total = udf(lambda x,y,z: ((x + y + z) * 2) if (x + y + z) > 10  else (x + y + z))

In [108]:
df_total_inventory = df_1 \
    .select("total_clicks","total_items","total_cats") \
    .withColumn("total_inventory", add_total(("total_clicks"),("total_items"),("total_cats")))

In [109]:
df_total_inventory.show(5)

+------------+-----------+----------+---------------+
|total_clicks|total_items|total_cats|total_inventory|
+------------+-----------+----------+---------------+
|          23|          2|         1|             52|
|           4|          2|         1|              7|
|           7|          2|         2|             22|
|           2|          3|         1|              6|
|          23|          2|         1|             52|
+------------+-----------+----------+---------------+
only showing top 5 rows



### excercise 4 
Create a new column "give_big_discount". The value of '"give_big_discount" should be 1 if day_of_week is 0 and is_special_day is 1, otherwise 0.

In [110]:
#using lambda function to check condition
day_week  = udf(lambda x,y: 1 if (x == 0 and y == 1)   else 0 )

In [111]:
df_give_big_discount = df_1 \
    .select("index", "day_of_week", "is_special_day") \
    .withColumn("give_big_discount", day_week(("day_of_week"),("is_special_day")))

In [112]:
df_give_big_discount.show(500)

+------+-----------+--------------+-----------------+
| index|day_of_week|is_special_day|give_big_discount|
+------+-----------+--------------+-----------------+
|374784|          5|             0|                0|
|374785|          5|             0|                0|
|374786|          5|             0|                0|
|374787|          5|             1|                0|
|374788|          5|             0|                0|
|374789|          4|             0|                0|
|374790|          3|             0|                0|
|374791|          0|             0|                0|
|374792|          1|             0|                0|
|374793|          5|             0|                0|
|374794|          5|             0|                0|
|374795|          5|             0|                0|
|374796|          5|             0|                0|
|374797|          4|             0|                0|
|374798|          5|             0|                0|
|374799|          5|        

### excercise 5
Create a new column prod_views_buys_ratio which should be a ratio of prod_views_freqs and prod_buys_freqs

In [113]:
df_prod_views_buys_ratio = df_1 \
    .select("index", "prod_views_freqs", "prod_buys_freqs") \
    .withColumn("prod_views_buys_ratio", (df_1["prod_views_freqs"] / df_1["prod_buys_freqs"]))

In [114]:
df_prod_views_buys_ratio.show(5)

+------+-----------------+-----------------+---------------------+
| index| prod_views_freqs|  prod_buys_freqs|prod_views_buys_ratio|
+------+-----------------+-----------------+---------------------+
|374784|9.451096233615E-4|6.903277846087E-4|   1.3690737131451525|
|374785|9.451070075614E-4|7.388047267779E-4|    1.279237900498055|
|374786|9.629738263031E-4|6.906869834825E-4|   1.3942261101370517|
|374787|9.454285975401E-4|6.898934959078E-4|    1.370397899310607|
|374788|9.457117763393E-4|  6.8940186458E-4|   1.3717859276685451|
+------+-----------------+-----------------+---------------------+
only showing top 5 rows



### excercise 6 
Create a new column create loyalty_points which should be calculated based on the following conditions: if a user is spending more than 3 seconds on a Sunday -- and loyalty points score will be 10 multiplied seconds over 3.

In [115]:
def datecheck(codate):
    codate = datetime.strptime(codate, "%B %d, %Y %A, %H:%M:%S")
    cotimes = datetime.strftime(codate, "%A")
    return cotimes

In [116]:
#day_conv = udf(lambda x: day_conv(x))
checks = udf(lambda x: datecheck(x))

In [117]:
#function to check the if is a sunday and its more than 3 seconds 
loyal = udf(lambda x,y: ((10 * (x - 3)/3) if (x > 3 and y == 0) else 0))
#loyal = udf(lambda x,y: loyal(x,y))

In [118]:
df_loyalty_points = df_1 \
    .select("index","day_of_week", "start_time_ts", "end_time_ts","total_duration") \
    .withColumn("start_time_check", checks("start_time_ts")) \
    .withColumn("end_time_ts", checks("end_time_ts")) \
    .withColumn("loyalty_points", loyal(("total_duration"),("day_of_week"))) 

In [119]:
df_loyalty_points.show(100)

+------+-----------+--------------------+-----------+------------------+----------------+------------------+
| index|day_of_week|       start_time_ts|end_time_ts|    total_duration|start_time_check|    loyalty_points|
+------+-----------+--------------------+-----------+------------------+----------------+------------------+
|374784|          5|October 09, 2019 ...|  Wednesday| 10.44792380395843|       Wednesday|                 0|
|374785|          5|October 01, 2019 ...|     Monday|0.0211062899590174|         Tuesday|                 0|
|374786|          5|October 02, 2019 ...|   Thursday| 31.16607832140132|       Wednesday|                 0|
|374787|          5|October 06, 2019 ...|   Thursday|  1.48410645166671|          Sunday|                 0|
|374788|          5|October 13, 2019 ...|     Monday|0.0188690047126103|          Sunday|                 0|
|374789|          4|October 04, 2019 ...|    Tuesday| 3.164912354358772|          Friday|                 0|
|374790|          3

In [120]:
df_submit = df_1 \
    .select("index","is_special_day","prod_views_freqs","prod_buys_freqs","day_of_week","total_duration","total_clicks","total_items","total_cats","start_time_ts") \
    .withColumn("total_duration", round_up(("total_duration"))) \
    .withColumn("give_big_discount", day_week(("day_of_week"),("is_special_day"))) \
    .withColumn("prod_views_buys_ratio", (df_1["prod_views_freqs"] / df_1["prod_buys_freqs"])) \
    .withColumn("start_time_check", checks("start_time_ts")) \
    .withColumn("loyalty_points", loyal(("total_duration"),("day_of_week"))) \
    .withColumn("total_inventory", add_total(("total_clicks"),("total_items"),("total_cats"))) \
    .withColumn("time_spec_points", compareudf("start_time_ts")) 


In [121]:
df_submit.show()

+------+--------------+-----------------+-----------------+-----------+--------------+------------+-----------+----------+--------------------+-----------------+---------------------+----------------+------------------+---------------+----------------+
| index|is_special_day| prod_views_freqs|  prod_buys_freqs|day_of_week|total_duration|total_clicks|total_items|total_cats|       start_time_ts|give_big_discount|prod_views_buys_ratio|start_time_check|    loyalty_points|total_inventory|time_spec_points|
+------+--------------+-----------------+-----------------+-----------+--------------+------------+-----------+----------+--------------------+-----------------+---------------------+----------------+------------------+---------------+----------------+
|374784|             0|9.451096233615E-4|6.903277846087E-4|          5|         10.45|          23|          2|         1|October 09, 2019 ...|                0|   1.3690737131451525|       Wednesday|                 0|             52|      

In [122]:
df_sub = df_submit.toPandas()

In [123]:
df_sub

Unnamed: 0,index,is_special_day,prod_views_freqs,prod_buys_freqs,day_of_week,total_duration,total_clicks,total_items,total_cats,start_time_ts,give_big_discount,prod_views_buys_ratio,start_time_check,loyalty_points,total_inventory,time_spec_points
0,374784,0,0.000945,0.000690,5,10.45,23,2,1,"October 09, 2019 Wednesday, 18:53:59",0,1.369074,Wednesday,0,52,23
1,374785,0,0.000945,0.000739,5,0.02,4,2,1,"October 01, 2019 Tuesday, 07:16:43",0,1.279238,Tuesday,0,7,7
2,374786,0,0.000963,0.000691,5,31.17,7,2,2,"October 02, 2019 Wednesday, 17:50:25",0,1.394226,Wednesday,0,22,22
3,374787,1,0.000945,0.000690,5,1.48,2,3,1,"October 06, 2019 Sunday, 15:19:44",0,1.370398,Sunday,0,6,15
4,374788,0,0.000946,0.000689,5,0.02,23,2,1,"October 13, 2019 Sunday, 11:38:09",0,1.371786,Sunday,0,52,11
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
499995,124923,0,0.000946,0.000691,3,3.20,9,3,1,"October 02, 2019 Wednesday, 11:03:16",0,1.369345,Wednesday,0,26,11
499996,124924,0,0.000945,0.000690,4,5.84,6,3,10,"October 12, 2019 Saturday, 14:46:48",0,1.368773,Saturday,0,38,14
499997,124925,0,0.000945,0.000690,0,5.07,10,3,1,"October 05, 2019 Saturday, 07:34:43",0,1.369833,Saturday,6.900000000000001,28,7
499998,124926,0,0.000962,0.000690,4,10.54,17,2,1,"October 11, 2019 Friday, 22:37:47",0,1.393645,Friday,0,40,27


In [124]:
df_sub_sort = df_sub.sort_values(by='index', ascending=True)

In [125]:
cols = ["total_duration","is_special_day","prod_views_freqs","prod_buys_freqs","give_big_discount","prod_views_buys_ratio","loyalty_points","total_inventory","time_spec_points"]


In [126]:
df_Final = df_sub_sort[cols] 

In [127]:
df_Final.head(100)

Unnamed: 0,total_duration,is_special_day,prod_views_freqs,prod_buys_freqs,give_big_discount,prod_views_buys_ratio,loyalty_points,total_inventory,time_spec_points
375072,166.71,0,0.000945,0.000690,0,1.369726,0,10,0
375073,0.43,0,0.000946,0.000690,0,1.371440,0,10,15
375074,0.94,0,0.000945,0.000690,0,1.369958,0,50,26
375075,0.42,0,0.000945,0.000739,0,1.279051,0,5,26
375076,2.49,0,0.000945,0.000690,0,1.370596,0,7,13
...,...,...,...,...,...,...,...,...,...
375167,0.72,0,0.000945,0.000696,0,1.356926,0,5,27
375168,1.21,0,0.000947,0.000689,0,1.373984,0,78,11
375169,0.40,0,0.000961,0.000720,0,1.334614,0,22,23
375170,29.18,0,0.000945,0.000690,0,1.369492,87.26666666666667,44,3


In [133]:
df_Final.to_csv("submitt-3.csv",index=False)


In [134]:
df = pd.read_csv("submitt-3.csv")

In [135]:
df.head(100)

Unnamed: 0,total_duration,is_special_day,prod_views_freqs,prod_buys_freqs,give_big_discount,prod_views_buys_ratio,loyalty_points,total_inventory,time_spec_points
0,166.71,0,0.000945,0.000690,0,1.369726,0.000000,10,0
1,0.43,0,0.000946,0.000690,0,1.371440,0.000000,10,15
2,0.94,0,0.000945,0.000690,0,1.369958,0.000000,50,26
3,0.42,0,0.000945,0.000739,0,1.279051,0.000000,5,26
4,2.49,0,0.000945,0.000690,0,1.370596,0.000000,7,13
...,...,...,...,...,...,...,...,...,...
95,0.72,0,0.000945,0.000696,0,1.356926,0.000000,5,27
96,1.21,0,0.000947,0.000689,0,1.373984,0.000000,78,11
97,0.40,0,0.000961,0.000720,0,1.334614,0.000000,22,23
98,29.18,0,0.000945,0.000690,0,1.369492,87.266667,44,3
