##Importing Libraries

In [0]:
import pandas as pd
import scipy.stats
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession, functions as F
import pyspark.sql.functions as f
import os 
import math
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql.types import *

## Data Exploration

In [0]:
item_features = spark.read.csv('/FileStore/tables/item_features.csv', header=True).withColumn(
    'feature_category_id',
    concat(
        lit('cat_'),
        col('feature_category_id')
    )
)

purchase =  spark.read.csv('/FileStore/tables/train_purchases.csv', header=True)
session =  spark.read.csv('/FileStore/tables/train_sessions.csv', header=True)

In [0]:
unique_features = item_features.groupBy('item_id').agg(
    count('item_id').alias('unique_items')
).count()

print(unique_features)

23691


##Unique Item Count

In [0]:
item_per_category = item_features.groupBy(
    'feature_category_id'
).agg(
    count('item_id').alias('item_per_category')
).orderBy(
    col('item_per_category').desc()
)
item_per_category.display()


feature_category_id,item_per_category
cat_47,23691
cat_56,23691
cat_50,23077
cat_68,23038
cat_61,22512
cat_72,20499
cat_69,20450
cat_7,20416
cat_55,17779
cat_30,15412


In [0]:
#to have item_id column later on
columns = ["feature_category_id"]
data = [["item_id"]]
additional_column = spark.createDataFrame(
    data=data
).toDF(
    *columns
)

#keeping most frequent items
freq_item = item_per_category.filter(
    col('item_per_category')>8000
).select(
    'feature_category_id'
    )

#heavily item per category list 
feature_indices = additional_column.unionByName(
    freq_item
).rdd.flatMap(
    lambda x: x
).collect()

freq_item.display()

feature_category_id
cat_47
cat_56
cat_50
cat_68
cat_61
cat_72
cat_69
cat_7
cat_55
cat_30


In [0]:
session = session.withColumn(
    'date_time',to_timestamp('date')
).withColumn(
    'date',to_date('date')
).dropDuplicates(
    subset = ['session_id' , 'item_id']
).withColumn(
    'dataset', lit('session')
)


session_timespent = session.groupBy(
    'session_id'
).agg(
    min('date_time').alias('min_time'),
    max('date_time').alias('max_time')
).withColumn(
    'DiffInSeconds',unix_timestamp("max_time")-unix_timestamp('min_time')
).withColumn(  
    'DiffInMinutes' , round(col('DiffInSeconds') / 60 , 2) 
)

In [0]:
session_timespent.display()

session_id,min_time,max_time,DiffInSeconds,DiffInMinutes
97872,2021-01-01T23:08:15.247+0000,2021-01-01T23:10:59.016+0000,164,2.73
123603,2021-04-13T20:46:35.506+0000,2021-04-13T20:56:28.975+0000,593,9.88
141386,2021-02-09T18:56:42.733+0000,2021-02-09T19:59:58.234+0000,3796,63.27
170392,2020-07-30T07:44:47.727+0000,2020-07-30T07:49:33.059+0000,286,4.77
183892,2021-01-11T23:09:50.300+0000,2021-01-11T23:10:39.973+0000,49,0.82
198542,2020-12-26T12:02:38.239+0000,2020-12-26T12:43:28.841+0000,2450,40.83
289064,2020-08-19T10:59:50.508+0000,2020-08-19T11:08:23.121+0000,513,8.55
292237,2020-10-30T21:25:49.896+0000,2020-10-30T21:26:01.065+0000,12,0.2
339370,2020-11-03T12:54:07.496+0000,2020-11-03T13:10:38.779+0000,991,16.52
358525,2021-04-01T19:34:49.066+0000,2021-04-01T19:44:29.016+0000,580,9.67


In [0]:
purchase = purchase.withColumn(
    'date_time',to_timestamp('date')
).withColumn(
    'date',to_date('date')
).dropDuplicates(
    subset = ['session_id' , 'item_id']
).withColumn(
    'dataset', lit('purchase')
)
purchase.display()

session_id,item_id,date,date_time,dataset
351,9683,2021-03-05,2021-03-05T19:45:49.050+0000,purchase
806,12516,2020-01-03,2020-01-03T01:32:39.331+0000,purchase
923,3630,2021-03-01,2021-03-01T22:59:24.967+0000,purchase
1076,13994,2020-11-30,2020-11-30T09:29:54.721+0000,purchase
1440,1812,2021-04-26,2021-04-26T17:28:44.642+0000,purchase
4252,25980,2020-07-16,2020-07-16T21:08:50.339+0000,purchase
7315,13381,2020-06-04,2020-06-04T17:21:19.121+0000,purchase
7903,13111,2020-04-12,2020-04-12T21:33:09.645+0000,purchase
8559,6511,2020-12-28,2020-12-28T15:50:31.342+0000,purchase
8938,9594,2021-05-30,2021-05-30T11:50:00.003+0000,purchase


In [0]:
#Create a dataframe with session and purchases for each item - to see which were in session and which was purchased 
all_session = session.unionByName(
    purchase
).orderBy(
    col('session_id').desc(),
    col('dataset').desc()
)

In [0]:
all_session.display()

session_id,item_id,date,date_time,dataset
999998,17058,2020-11-27,2020-11-27T14:54:34.758+0000,session
999998,12263,2020-11-27,2020-11-27T14:53:49.662+0000,session
999998,3122,2020-11-27,2020-11-27T14:54:09.786+0000,session
999998,4892,2020-11-27,2020-11-27T14:53:37.630+0000,session
999998,16962,2020-11-27,2020-11-27T14:51:03.401+0000,session
999998,1687,2020-11-27,2020-11-27T14:54:48.173+0000,session
999998,23611,2020-11-27,2020-11-27T14:53:15.818+0000,session
999998,17564,2020-11-27,2020-11-27T14:50:51.847+0000,session
999998,14190,2020-11-27,2020-11-27T14:52:37.901+0000,session
999998,291,2020-11-27,2020-11-27T14:51:39.026+0000,session


##Create a Test Dataset (Unseen Dataset)

In [0]:
# what is the latest date of the session
# last month will be validation data for out model -need to change it with last month 
val_data = all_session.filter(col('date') >= '2021-05-01')
val_data.count() 
#461813

Out[36]: 461813

##Create a Train Dataset

In [0]:
# taking of the data for training our model  - for computational reasons 
train_data = all_session.filter(
    (col('date') < '2021-05-01')
)
train_data.count() #4647037

Out[37]: 4647037

##Create  dataframe with items containing the most frequent  categories

In [0]:
#creating a pivot table that has the item id as rows  and the categories as columns  of the most frequent items and shows the category_value id
item_features_pivot = item_features.withColumn(
    'feature_value_id',col('feature_value_id').cast(IntegerType())
).groupBy(
    "item_id"
).pivot(
    "feature_category_id"
).sum(
    'feature_value_id'
)

items_dataset = item_features_pivot.select(
    feature_indices #most frequent categories
)

# this will show a df with the it_id | session_id | date| either session or purchase of the the items along with the categories(of the most frequent categories)

train_data_with_features = train_data.join(
    items_dataset,
    on='item_id',
    how='left'
).orderBy(
    col('session_id').desc(),
    col('dataset').desc()
)
train_data_with_features.display()

item_id,session_id,date,date_time,dataset,cat_47,cat_56,cat_50,cat_68,cat_61,cat_72,cat_69,cat_7,cat_55,cat_30,cat_4,cat_63,cat_32,cat_26,cat_73,cat_19,cat_17,cat_65,cat_46,cat_5,cat_3,cat_45,cat_59
16962,999998,2020-11-27,2020-11-27T14:51:03.401+0000,session,690,365,240.0,351.0,462.0,75.0,709.0,837.0,267.0,,618.0,861.0,902.0,77.0,544.0,765.0,378.0,,825.0,605.0,793.0,559.0,856.0
4892,999998,2020-11-27,2020-11-27T14:53:37.630+0000,session,603,365,240.0,351.0,462.0,75.0,885.0,837.0,267.0,,618.0,861.0,902.0,268.0,544.0,288.0,378.0,,825.0,605.0,793.0,559.0,856.0
23611,999998,2020-11-27,2020-11-27T14:53:15.818+0000,session,690,365,240.0,373.0,706.0,455.0,709.0,837.0,267.0,,618.0,861.0,902.0,268.0,544.0,765.0,378.0,,825.0,605.0,793.0,559.0,180.0
17564,999998,2020-11-27,2020-11-27T14:50:51.847+0000,session,690,365,240.0,596.0,462.0,75.0,709.0,837.0,267.0,,618.0,861.0,902.0,268.0,544.0,765.0,378.0,,825.0,605.0,793.0,559.0,856.0
291,999998,2020-11-27,2020-11-27T14:51:39.026+0000,session,690,365,240.0,373.0,706.0,371.0,709.0,837.0,129.0,669.0,618.0,861.0,902.0,268.0,544.0,765.0,378.0,,825.0,605.0,793.0,559.0,180.0
12263,999998,2020-11-27,2020-11-27T14:53:49.662+0000,session,603,365,240.0,373.0,462.0,75.0,885.0,837.0,267.0,,618.0,861.0,902.0,268.0,544.0,288.0,378.0,,825.0,605.0,793.0,559.0,856.0
14190,999998,2020-11-27,2020-11-27T14:52:37.901+0000,session,690,365,240.0,379.0,462.0,75.0,709.0,837.0,267.0,,618.0,861.0,902.0,268.0,544.0,292.0,378.0,,825.0,605.0,793.0,559.0,180.0
17058,999998,2020-11-27,2020-11-27T14:54:34.758+0000,session,549,365,240.0,351.0,706.0,75.0,709.0,837.0,267.0,,618.0,861.0,902.0,268.0,544.0,765.0,378.0,521.0,825.0,605.0,793.0,559.0,180.0
3122,999998,2020-11-27,2020-11-27T14:54:09.786+0000,session,690,365,240.0,875.0,706.0,455.0,709.0,837.0,267.0,,618.0,861.0,902.0,346.0,544.0,765.0,378.0,,825.0,605.0,793.0,559.0,180.0
1687,999998,2020-11-27,2020-11-27T14:54:48.173+0000,session,690,365,901.0,745.0,706.0,75.0,709.0,837.0,267.0,,618.0,861.0,902.0,268.0,544.0,112.0,378.0,,825.0,605.0,793.0,559.0,180.0


## Preprocessing of the Training Set

In [0]:
#computing the average of the feature value for each category for each session 
average_category_persession = train_data_with_features.groupBy(
    'session_id'
).agg(
    avg('cat_47').alias("avg_cat_47"),                       
    avg('cat_56').alias("avg_cat_56"),
    avg('cat_50').alias("avg_cat_50"),        
    avg('cat_68').alias("avg_cat_68"),
    avg('cat_61').alias("avg_cat_61"),
    avg('cat_72').alias("avg_cat_72"),
    avg('cat_69').alias("avg_cat_69"),
    avg('cat_7').alias("avg_cat_7"),
    avg('cat_55').alias("avg_cat_55"),
    avg('cat_30').alias("avg_cat_30"),
    avg('cat_4').alias("avg_cat_4"),
    avg('cat_63').alias("avg_cat_63"),
    avg('cat_32').alias("avg_cat_32"),
    avg('cat_26').alias("avg_cat_26"),
    avg('cat_73').alias("avg_cat_73"),
    avg('cat_19').alias("avg_cat_19"),
    avg('cat_17').alias("avg_cat_17"),
    avg('cat_65').alias("avg_cat_65"),
    avg('cat_46').alias("avg_cat_46"),
    avg('cat_5').alias("avg_cat_5"),
    avg('cat_3').alias("avg_cat_3"),
    avg('cat_45').alias("avg_cat_45"),
    avg('cat_59').alias("avg_cat_59")
  
)
#create the column, with avg Feautures
avg_item=freq_item.withColumn('feature_category_id',concat(lit('avg_'),'feature_category_id'))

#join pivot table with avg table
merged_df = train_data_with_features.join(
    average_category_persession,
    on='session_id',
    how='left'
)
#Replace null in pivot with avg value of the session of the feature
for i in range(freq_item.count()):
    merged_df= merged_df.withColumn(
    freq_item.collect()[i][0] , when(
        col(freq_item.collect()[i][0]).isNull() , col(avg_item.collect()[i][0])
    ).otherwise(
        col(freq_item.collect()[i][0])
    )
)
#keep initial pivot columns with updated values of null's    --each null will be replaced with the avg value of the feature of each session
sessions_features_means = merged_df.select(
    train_data_with_features.columns
)

#Normalize category feature values  by dividing the max value in each column (cat_*) --TAKES 20 MINUTES!
for i in range(freq_item.count()):
    
   sessions_features_means = sessions_features_means.withColumn(
   freq_item.collect()[i][0], (col(freq_item.collect()[i][0]) / sessions_features_means.select(max(freq_item.collect()[i][0])).collect()[0][0])   
)
# sessions_features_means.write.option(
#   "path", "/FileStore/tables/train_sessions_normalized_all/"
# ).saveAsTable(
#     "train_sessions_normalized_final"
#)
sessions_features_means.display()

item_id,session_id,date,date_time,dataset,cat_47,cat_56,cat_50,cat_68,cat_61,cat_72,cat_69,cat_7,cat_55,cat_30,cat_4,cat_63,cat_32,cat_26,cat_73,cat_19,cat_17,cat_65,cat_46,cat_5,cat_3,cat_45,cat_59
27202,1060904,2020-12-30,2020-12-30T11:02:10.838+0000,session,0.1085972850678733,0.4205069124423963,0.2663706992230855,0.044296788482835,0.7861915367483296,0.7417893544733862,0.6689265536723163,0.5393794749403341,0.7667804323094426,,0.2373271889400921,0.9706877113866968,1.0,0.3440308087291399,0.1672794117647058,0.1912144702842377,0.6156351791530945,0.6408364083640836,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,0.2102803738317757
8306,1060904,2020-12-30,2020-12-30T11:06:22.646+0000,purchase,0.6210407239819005,0.4205069124423963,0.2663706992230855,0.388704318936877,0.5144766146993318,0.0849377123442808,0.607909604519774,0.0023866348448687,0.3037542662116041,,0.2373271889400921,0.9706877113866968,1.0,0.3440308087291399,1.0,0.1912144702842377,0.6156351791530945,0.6408364083640836,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,0.2102803738317757
2188,1060999,2021-02-06,2021-02-06T09:10:09.702+0000,session,0.583710407239819,0.4205069124423963,0.2663706992230855,0.4928017718715393,0.5144766146993318,0.8471121177802945,0.8813559322033898,0.9522673031026252,0.3037542662116041,,0.2373271889400921,0.9706877113866968,,,1.0,,0.6156351791530945,0.7503075030750308,0.8656873032528857,,,,
9555,1060999,2021-02-06,2021-02-06T09:16:54.501+0000,purchase,0.1085972850678733,0.4205069124423963,0.3518312985571587,0.991140642303433,0.7861915367483296,0.7417893544733862,0.6689265536723163,0.5393794749403341,0.1467576791808873,,0.2373271889400921,0.9706877113866968,,,1.0,,0.6156351791530945,0.6408364083640836,0.8656873032528857,,,,
27225,1061068,2020-11-09,2020-11-09T13:20:04.495+0000,session,0.1391402714932126,0.4205069124423963,0.2352941176470588,0.388704318936877,0.5144766146993318,0.0849377123442808,1.0,0.4701670644391408,0.3037542662116041,,0.2373271889400921,0.9706877113866968,1.0,0.3440308087291399,1.0,0.9883720930232558,0.6156351791530945,0.6955719557195572,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,
26302,1061068,2020-11-09,2020-11-09T13:22:16.635+0000,session,0.0407239819004524,0.4205069124423963,0.2663706992230855,0.388704318936877,0.5144766146993318,0.0849377123442808,0.8813559322033898,0.0023866348448687,0.3037542662116041,,0.2373271889400921,0.9706877113866968,1.0,0.3440308087291399,1.0,0.9883720930232558,0.6156351791530945,0.6408364083640836,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,
5317,1061068,2020-11-09,2020-11-09T13:23:10.026+0000,purchase,0.0407239819004524,0.4205069124423963,0.2663706992230855,0.388704318936877,0.5144766146993318,0.0849377123442808,0.8813559322033898,0.0023866348448687,0.3037542662116041,,0.2373271889400921,0.9706877113866968,1.0,0.3440308087291399,1.0,0.9883720930232558,0.6156351791530945,0.7503075030750308,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,
20156,1061071,2020-09-15,2020-09-15T15:33:27.949+0000,session,0.5882352941176471,0.4205069124423963,1.0,0.8250276854928018,0.5144766146993318,,,,0.3037542662116041,,,,,,1.0,,,,,,,,
3393,1061071,2020-09-15,2020-09-15T15:33:32.967+0000,purchase,0.5882352941176471,0.4205069124423963,1.0,0.8250276854928018,0.5144766146993318,,,,0.3037542662116041,,,,,,1.0,,,,,,,,
8826,1061092,2020-09-03,2020-09-03T15:34:43.372+0000,session,0.9524886877828054,0.4205069124423963,0.2663706992230855,0.6600221483942414,0.7861915367483296,0.0849377123442808,0.7220338983050848,0.7386634844868735,0.3037542662116041,,0.1428571428571428,0.9470124013528748,1.0,0.3440308087291399,1.0,0.5665374677002584,0.6156351791530945,0.6773267732677326,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,0.3714953271028037


## Preprocessing of the Test set

In [0]:
# Join val_data and  the items dataset(the categories of the most frequent items) 
val_data_with_features = val_data.join(
    items_dataset,
    on='item_id',
    how='left'
).orderBy(
    col('session_id').desc(),
    col('dataset').desc()
)

import pyspark.sql.functions as f
#computing the average of the feature value for each category for each session 
val_average_category_persession = val_data_with_features.groupBy(
    'session_id'
).agg(
#     [avg(f.col(freq_item.collect()[i][0]))for i in range(freq_item.count())]
    avg('cat_47').alias("avg_cat_47"),                       
    avg('cat_56').alias("avg_cat_56"),
    avg('cat_50').alias("avg_cat_50"),        
    avg('cat_68').alias("avg_cat_68"),
    avg('cat_61').alias("avg_cat_61"),
    avg('cat_72').alias("avg_cat_72"),
    avg('cat_69').alias("avg_cat_69"),
    avg('cat_7').alias("avg_cat_7"),
    avg('cat_55').alias("avg_cat_55"),
    avg('cat_30').alias("avg_cat_30"),
    avg('cat_4').alias("avg_cat_4"),
    avg('cat_63').alias("avg_cat_63"),
    avg('cat_32').alias("avg_cat_32"),
    avg('cat_26').alias("avg_cat_26"),
    avg('cat_73').alias("avg_cat_73"),
    avg('cat_19').alias("avg_cat_19"),
    avg('cat_17').alias("avg_cat_17"),
    avg('cat_65').alias("avg_cat_65"),
    avg('cat_46').alias("avg_cat_46"),
    avg('cat_5').alias("avg_cat_5"),
    avg('cat_3').alias("avg_cat_3"),
    avg('cat_45').alias("avg_cat_45"),
    avg('cat_59').alias("avg_cat_59")
  
)
#create the column, with avg Feautures
#avg_item=freq_item.withColumn('feature_category_id',concat(lit('avg_'),'feature_category_id'))

#join pivot table with avg values  table
val_merged_df = val_data_with_features.join(
    val_average_category_persession, #df with the items with the most frequent categories 
    on='session_id',
    how='left'
)
#Replace null in pivot with avg value of the session of the feature
for i in range(freq_item.count()):
    val_merged_df= val_merged_df.withColumn(
    freq_item.collect()[i][0] , when(
        col(freq_item.collect()[i][0]).isNull() , col(avg_item.collect()[i][0])
    ).otherwise(
        col(freq_item.collect()[i][0])
    )
)    
#keep initial pivot columns with updated values of null's    --each null will be replaced with the avg value of the feature of each session
val_sessions_features_means =val_merged_df.select(
    val_data_with_features.columns
)
#Normalize category feature values  by dividing the max value in each column (cat_*) --TAKES 26 MINUTES!
for i in range(freq_item.count()):  
   val_sessions_features_means = val_sessions_features_means.withColumn(
   freq_item.collect()[i][0], (col(freq_item.collect()[i][0]) / val_sessions_features_means.select(max(freq_item.collect()[i][0])).collect()[0][0])   
)
val_sessions_features_means.write.option(
"path", "/FileStore/tables/validation_sessions_normalized_all"
).saveAsTable("validation_sessions_normalized_all")  

##Loading Tables and Feeding the ALS Model with the Train Table

In [0]:
train_table = spark.read.format('delta').load('/FileStore/tables/train_sessions_normalized_all')
val_table = spark.read.format('delta').load('/FileStore/tables/validation_sessions_normalized_all')
#has both sessions items and purchased items 

In [0]:
# join train_table with purchases  --enroll the purchased into the session dataset
sessions_features_with_purchased_items = train_table.join(
    purchase.drop(
        "date"
    ).withColumnRenamed("item_id","purchsed_item"),
    on="session_id",
    how='left'
)

cols_to_drop= ["date" ,"date_time", "purchsed_item", "dataset"]  # i have integrated the purchased into the session

train_data = sessions_features_with_purchased_items.drop(*cols_to_drop)

training_data = train_data.withColumnRenamed(
    'cat_47','x_cat_47'
).withColumnRenamed(
    'cat_56','x_cat_56'
).withColumnRenamed(
    'cat_50','x_cat_50'
).withColumnRenamed(
    'cat_68','x_cat_68'
).withColumnRenamed(
    'cat_61','x_cat_61'
).withColumnRenamed(
    'cat_72','x_cat_72'
).withColumnRenamed(
    'cat_69','x_cat_69'
).withColumnRenamed(
    'cat_7','x_cat_7'
).withColumnRenamed(
    'cat_55','x_cat_55'
).withColumnRenamed(
    'cat_30','x_cat_30'
).withColumnRenamed(
    'cat_4','x_cat_4'
).withColumnRenamed(
    'cat_63','x_cat_63'
).withColumnRenamed(
    'cat_32','x_cat_32'
).withColumnRenamed(
    'cat_26','x_cat_26'
).withColumnRenamed(
    'cat_73','x_cat_73'
).withColumnRenamed(
    'cat_19','x_cat_19'
).withColumnRenamed(
    'cat_17','x_cat_17'
).withColumnRenamed(
    'cat_65','x_cat_65'
).withColumnRenamed(
    'cat_46','x_cat_46'
).withColumnRenamed(
    'cat_5','x_cat_5'
).withColumnRenamed(
    'cat_3','x_cat_3'
).withColumnRenamed(
    'cat_45','x_cat_45'
).withColumnRenamed(
    'cat_59','x_cat_59'
).drop(*cols_to_drop)                                    

training_data.display()

session_id,item_id,x_cat_47,x_cat_56,x_cat_50,x_cat_68,x_cat_61,x_cat_72,x_cat_69,x_cat_7,x_cat_55,x_cat_30,x_cat_4,x_cat_63,x_cat_32,x_cat_26,x_cat_73,x_cat_19,x_cat_17,x_cat_65,x_cat_46,x_cat_5,x_cat_3,x_cat_45,x_cat_59
100010,23726,0.0407239819004524,0.4205069124423963,0.2663706992230855,0.388704318936877,0.7861915367483296,0.6308040770101925,0.6689265536723163,0.5393794749403341,0.800910125142207,,0.2853302611367127,0.9199549041713642,1.0,0.3440308087291399,1.0,0.9883720930232558,0.6156351791530945,0.6408364083640836,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.1968854282536151,
100010,24874,0.9524886877828054,0.4205069124423963,1.0,0.388704318936877,0.5144766146993318,0.0849377123442808,0.6689265536723163,0.4940334128878281,0.3037542662116041,,0.2373271889400921,0.9706877113866968,1.0,0.3440308087291399,1.0,0.9883720930232558,0.6156351791530945,0.6408364083640836,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,
1000795,23065,0.7805429864253394,0.4205069124423963,0.2663706992230855,0.7774086378737541,0.5144766146993318,0.8471121177802945,0.8011299435028248,0.9988066825775656,0.3037542662116041,,0.2373271889400921,0.9706877113866968,1.0,0.4441591784338896,1.0,0.9883720930232558,0.6156351791530945,,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,0.2102803738317757
1000795,23241,0.7805429864253394,0.4205069124423963,0.2663706992230855,0.4130675526024363,0.7861915367483296,0.0849377123442808,0.8011299435028248,0.9988066825775656,0.3037542662116041,,0.2373271889400921,0.9706877113866968,1.0,0.3440308087291399,1.0,0.9883720930232558,0.6156351791530945,,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,0.2102803738317757
1000795,10782,0.7805429864253394,0.4205069124423963,0.2663706992230855,0.2259136212624584,0.7861915367483296,0.8471121177802945,0.8011299435028248,0.9988066825775656,0.9362912400455062,,0.2373271889400921,0.9706877113866968,1.0,0.4441591784338896,1.0,0.9883720930232558,0.6156351791530945,,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,0.4521028037383177
1000795,25855,0.7805429864253394,0.2753456221198156,0.2663706992230855,0.424141749723145,0.7861915367483296,0.5152887882219706,0.8011299435028248,0.9988066825775656,0.3037542662116041,,0.2373271889400921,0.9706877113866968,1.0,0.3440308087291399,1.0,0.9883720930232558,0.6156351791530945,,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,0.4521028037383177
1000795,24625,0.7805429864253394,0.4205069124423963,0.2663706992230855,0.4130675526024363,0.5144766146993318,0.0849377123442808,0.8011299435028248,0.9988066825775656,0.9362912400455062,,0.2373271889400921,0.9706877113866968,1.0,0.3440308087291399,1.0,0.9883720930232558,0.6156351791530945,,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,0.4521028037383177
1000795,11098,0.7805429864253394,0.4205069124423963,0.2663706992230855,0.9689922480620154,0.7861915367483296,0.5152887882219706,0.8011299435028248,0.9988066825775656,0.3037542662116041,,0.2373271889400921,0.9706877113866968,1.0,0.3440308087291399,1.0,0.1447028423772609,0.6156351791530945,,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,0.2102803738317757
1000795,26412,0.7805429864253394,0.4205069124423963,0.2663706992230855,0.9689922480620154,0.7861915367483296,0.5152887882219706,0.8011299435028248,0.9988066825775656,0.3037542662116041,,0.2373271889400921,0.9706877113866968,1.0,0.3440308087291399,1.0,0.9883720930232558,0.6156351791530945,,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,0.2102803738317757
100227,17376,0.1391402714932126,0.4205069124423963,0.3640399556048834,0.8239202657807309,0.5144766146993318,0.0849377123442808,1.0,0.4701670644391408,0.3037542662116041,0.2490430622009569,0.2373271889400921,0.9706877113866968,,,1.0,,,,,,,,


In [0]:
training_sum_vecs = training_data.na.fill(0).withColumn(
    "sum",
    ( col('x_cat_47')+
      col('x_cat_56')+
      col('x_cat_50')+
      col('x_cat_68')+
      col('x_cat_61')+
      col('x_cat_72')+
      col('x_cat_69')+
      col('x_cat_7')+
      col('x_cat_55')+
      col('x_cat_30')+
      col('x_cat_4')+
      col('x_cat_63')+
      col('x_cat_32')+
      col('x_cat_26')+
      col('x_cat_73')+
      col('x_cat_19')+
      col('x_cat_17')+
      col('x_cat_65')+
      col('x_cat_46')+
      col('x_cat_5')+
      col('x_cat_3')+
      col('x_cat_45')+
      col('x_cat_59')
    )
)
training_sum_vecs.display()
# df1.show()


session_id,item_id,x_cat_47,x_cat_56,x_cat_50,x_cat_68,x_cat_61,x_cat_72,x_cat_69,x_cat_7,x_cat_55,x_cat_30,x_cat_4,x_cat_63,x_cat_32,x_cat_26,x_cat_73,x_cat_19,x_cat_17,x_cat_65,x_cat_46,x_cat_5,x_cat_3,x_cat_45,x_cat_59,sum
1000415,20536,0.0407239819004524,0.4205069124423963,0.3518312985571587,0.8250276854928018,0.5144766146993318,0.0849377123442808,0.9096045197740112,0.639618138424821,0.3037542662116041,0.4110047846889952,0.2373271889400921,0.9706877113866968,0.5787139689578714,0.0564826700898587,1.0,0.1447028423772609,0.6156351791530945,0.6408364083640836,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,1.0,13.056143918364334
1000415,22146,0.583710407239819,0.4205069124423963,0.3518312985571587,0.991140642303433,0.7861915367483296,0.0849377123442808,1.0,0.7386634844868735,0.3037542662116041,0.4110047846889952,0.2373271889400921,0.9706877113866968,0.7893569844789357,0.100770218228498,1.0,0.1447028423772609,0.6156351791530945,0.7503075030750308,0.8656873032528857,0.9307692307692308,0.8087739032620922,0.6218020022246941,1.0,14.507561112171404
1000415,17503,0.6210407239819005,0.4205069124423963,0.3518312985571587,0.4130675526024363,0.7861915367483296,0.0849377123442808,0.9096045197740112,0.639618138424821,0.3037542662116041,0.4110047846889952,0.2373271889400921,0.9706877113866968,1.0,0.1450577663671373,1.0,0.1447028423772609,0.6156351791530945,0.6408364083640836,0.8656873032528857,0.9307692307692308,0.7255343082114736,0.6218020022246941,1.0,13.839597386822586
1000615,18068,0.7805429864253394,0.4205069124423963,0.2663706992230855,0.9689922480620154,0.5144766146993318,0.0849377123442808,0.8011299435028248,0.9988066825775656,0.3037542662116041,0.0,0.2373271889400921,0.9706877113866968,1.0,0.3440308087291399,1.0,0.9883720930232558,0.6156351791530945,0.6721138639957829,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,1.0,15.277956945276028
1000615,24896,0.583710407239819,0.4205069124423963,0.2663706992230855,0.8250276854928018,0.5484409799554566,0.0849377123442808,0.607909604519774,0.9522673031026252,0.3037542662116041,0.0,0.2446236559139785,0.9706877113866968,0.9297856614929784,0.2894736842105263,1.0,0.5734280792420327,0.6156351791530945,0.7503075030750308,0.8656873032528857,0.8966666666666667,0.8920134983127109,0.6218020022246941,0.7367601246105918,13.979796640073731
1000615,25893,0.2466063348416289,0.4205069124423963,0.2663706992230855,0.530454042081949,0.5144766146993318,0.0849377123442808,0.5638418079096045,0.9522673031026252,0.3037542662116041,0.0,0.2373271889400921,0.9706877113866968,1.0,0.3440308087291399,1.0,0.9883720930232558,0.6156351791530945,0.6408364083640836,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,1.0,13.990377117012391
1000615,11937,0.6210407239819005,0.4205069124423963,0.2663706992230855,0.530454042081949,0.5484409799554566,0.0849377123442808,0.5638418079096045,0.639618138424821,0.3037542662116041,0.0,0.2373271889400921,0.9706877113866968,0.5787139689578714,0.0166880616174582,1.0,0.7041343669250646,0.6156351791530945,0.6408364083640836,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,1.0,13.05326020247898
1000615,15651,0.7805429864253394,0.4205069124423963,0.2663706992230855,0.4130675526024363,0.5144766146993318,0.0849377123442808,0.8011299435028248,0.9988066825775656,0.3037542662116041,0.0,0.2373271889400921,0.9706877113866968,1.0,0.3440308087291399,1.0,0.3772609819121447,0.6156351791530945,0.6721138639957829,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,0.2102803738317757,13.321201512537112
1000615,8807,0.583710407239819,0.4205069124423963,0.2663706992230855,0.6600221483942414,0.5144766146993318,0.0849377123442808,0.607909604519774,0.9522673031026252,0.3037542662116041,0.0,0.2446236559139785,0.9706877113866968,0.9297856614929784,0.2894736842105263,1.0,0.5734280792420327,0.6156351791530945,0.7503075030750308,0.8656873032528857,0.8966666666666667,0.8920134983127109,0.6218020022246941,0.7367601246105918,13.780826737719048
1000615,6823,0.7805429864253394,0.4205069124423963,0.3518312985571587,0.6600221483942414,0.5144766146993318,0.0849377123442808,0.8011299435028248,0.9988066825775656,0.3037542662116041,0.0,0.2373271889400921,0.9706877113866968,1.0,0.3440308087291399,1.0,0.1912144702842377,0.6156351791530945,0.6721138639957829,0.8656873032528857,0.9307692307692308,0.8920134983127109,0.6218020022246941,1.0,14.257289822203308


In [0]:
#Unseen Data Set 
val_sessions_features_with_purchased_items = val_table.join(
    purchase.drop(
        "date").withColumnRenamed("item_id","purchsed_item"),
    on="session_id",
    how='left'
)
# cols_to_drop= ["date" ,"date_time" , "dataset",'cat_47','cat_56','cat_50','cat_68','cat_61','cat_72','cat_69' ,'cat_7','cat_55','cat_30','cat_4','cat_63','cat_32',
#                'cat_26','cat_73','cat_19','cat_17','cat_65','cat_46','cat_5','cat_3','cat_45','cat_59']
cols_to_drop= ["date" ,"date_time" , "dataset"]

test_data = val_sessions_features_with_purchased_items.withColumn(
    "session_id", col("session_id").cast(DoubleType())
).withColumn(
    "item_id", col("item_id").cast(DoubleType())
).withColumn(
    "purchsed_item", col("purchsed_item").cast(DoubleType())
).drop(*cols_to_drop)

test_data.display()

In [0]:
#FineTune the Model Using ParamGridBuilder
from pyspark.ml.tuning import  TrainValidationSplit , ParamGridBuilder
from pyspark.ml.recommendation import ALS 
from pyspark.ml.evaluation import RegressionEvaluator


#  split the training data into train and val(test)
training ,test  =training_sum_vecs.withColumn(
    "session_id", col("session_id").cast(DoubleType())
).withColumn(
    "item_id", col("item_id").cast(DoubleType())
).randomSplit([0.7, 0.3]) # training the model

als = ALS(
    implicitPrefs=True, 
    userCol="session_id", 
    itemCol= "item_id",
    ratingCol = "sum" , 
    coldStartStrategy="drop",
    nonnegative=True
)

param_grid = ParamGridBuilder().addGrid(als.rank , [15,20]).addGrid(als.maxIter , [5, 10]).addGrid(als.regParam , [0.02, 0.10]).build() 
evaluator2 = RegressionEvaluator(metricName="rmse",labelCol="sum",predictionCol="prediction")

#Build cross validation using TrainValidationSplit 
tvs = TrainValidationSplit(estimator=als , estimatorParamMaps =  param_grid , evaluator = evaluator2)

#fit ALS  to the training data 
model2 =tvs.fit(training)

#extract best model from the tuning excerice using ParamGridBuilder 
best_model= model2.bestModel

#Generate Predictins and evaluate the RMSE 

predictions2 = best_model.transform(test)
rmse2 = evaluator2.evaluate(predictions2)

print("RMSE = " + str(rmse2))
print("*** BEST MODEL ***")
print(" Rank: ", best_model.rank)
print(" MaxIter: ", best_model._java_obj.parent().getMaxIter())
print(" RegParam: ", best_model._java_obj.parent().getRegParam())


# RMSE = 12.215137151013003
# *** BEST MODEL ***
#  Rank:  15
#  MaxIter:  11
#  RegParam:  0.15


# RMSE = 12.232727214428332
# *** BEST MODEL ***
#  Rank:  8
#  MaxIter:  10
#  RegParam:  0.1


RMSE = 12.21054187852252
*** BEST MODEL ***
 Rank:  20
 MaxIter:  10
 RegParam:  0.1


In [0]:
# Save & load the model
rfPath = "/FileStore/tables/ALSFinal"
best_model.save(rfPath)
samealsModel = ALS.load(rfPath)
# Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
dbutils.fs.rm("/FileStore/tables/train_sessions_normalized_fina;.csv")


[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-3694853921792726>[0m in [0;36m<module>[0;34m[0m
[1;32m      2[0m [0mrfPath[0m [0;34m=[0m [0;34m"/FileStore/tables/ALSFinal"[0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m [0mbest_model[0m[0;34m.[0m[0msave[0m[0;34m([0m[0mrfPath[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 4[0;31m [0msamealsModel[0m [0;34m=[0m [0mALS[0m[0;34m.[0m[0mload[0m[0;34m([0m[0mrfPath[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/ml/util.py[0m in [0;36mload[0;34m(cls, path)[0m
[1;32m    461[0m     [0;32mdef[0m [0mload[0m[0;34m([0m[0mcls[0m[0;34m,[0m [0mpath[0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m    462[0m         [0;34m"""Reads an ML instance from the input path, a shortcut of `read().load(path)`."""

In [0]:
results = best_model.recommendForAllUsers(5).withColumn(
    'next_item', explode('recommendations')
).withColumn(
    'item', col('next_item').item_id
).withColumn(
    'ranking', col('next_item').rating
).select('session_id','item','ranking')
# # get the name and the name in separate columns
# df=df.withColumn('name', F.col('exploded').getItem(0))
# df=df.withColumn('value', F.col('exploded').getItem(1))
results.display()

session_id,item,ranking
28,10390,0.44238698
28,18705,0.32970768
28,14550,0.3241707
28,412,0.32254535
28,25809,0.2926991
31,19882,0.29711872
31,2915,0.19427127
31,24874,0.19111024
31,18719,0.19042684
31,2188,0.1825643
