In [2]:
import pandas as pd
import numpy as np
import glob
import math
import csv
from os import listdir
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.mllib.recommendation import ALS

In [5]:
'''
Step 1 - Set up the Spark
'''
# Spark configuration for a single-machine run ('local[*]' runs with as many
# worker threads as logical cores).
# NOTE(review): in local mode the executors run inside the driver JVM, so
# 'spark.executor.memory' is likely ignored and 'spark.driver.memory' is the
# limit that matters — confirm against the Spark configuration docs.
conf = (
    pyspark.SparkConf()
    .setAppName('appName')
    .set('spark.executor.memory', '80G')
    .set('spark.driver.memory', '80G')
    .set('spark.driver.maxResultSize', '80G')
    .setMaster('local[*]')
)

# getOrCreate (instead of the SparkContext constructor) lets this cell be
# re-run without "Cannot run multiple SparkContexts at once". If a context
# already exists it is reused as-is, and `conf` is NOT re-applied to it.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

In [6]:
'''
Step 2 - Combine all users into one df & give each user a user_id
'''

# All per-user files in the data folder. Sorted so the file -> user_id
# assignment below is reproducible: glob returns files in an arbitrary,
# OS/filesystem-dependent order.
files = sorted(glob.glob("data/*.txt"))
if not files:
    raise FileNotFoundError("No .txt files found in data/")

# Read every file, tag its rows with a 1-based user_id, and concatenate once
# at the end. DataFrame.append was removed in pandas 2.0, and appending inside
# a loop copies the whole frame each time (quadratic).
users_dict = []   # rows of [file_name, user_id]; written to users_dict.csv
frames = []
for idx, file in enumerate(files):
    user_id = idx + 1
    # NOTE(review): with a "data/" prefix, file[5:] is the bare file name, so
    # this fixed slice skips its first character and keeps the next six.
    # Verify it extracts the intended user name for your file naming scheme.
    file_name = file[6:12]
    users_dict.append([file_name, user_id])
    df_file = pd.read_csv(file, sep='\t', header=None)
    df_file["user_id"] = user_id
    frames.append(df_file)
df = pd.concat(frames, ignore_index=True)
# Five names in total, so each file is expected to hold exactly four
# tab-separated columns (user_id is the fifth, added above).
df.columns = ['s_vertex_id', 'e_vertex_id', 'time', 'freq', 'user_id']

# Save the users_dict (file_name -> user_id mapping)
with open('users_dict.csv', 'w', newline='') as resultFile:
    wr = csv.writer(resultFile, dialect='excel')
    wr.writerows(users_dict)

df.head(5)

Unnamed: 0,s_vertex_id,e_vertex_id,time,freq,user_id
0,203492,217680,0,1,1
1,217680,217681,0,1,1
2,217681,217682,0,1,1
3,217682,196445,0,1,1
4,196445,196446,0,1,1


In [7]:
'''
Step 3 - Set a behavior_id for each ['s_vertex_id','e_vertex_id',time] pair
'''

# Distinct (s_vertex_id, e_vertex_id, time) triples, kept in first-seen order
# and renumbered from 0 so row position maps directly onto the new id.
behavior_dict = (
    df.copy()
    .drop(['freq', 'user_id'], axis=1)
    .drop_duplicates(subset=['s_vertex_id', 'e_vertex_id', 'time'], keep="first")
    .reset_index(drop=True)
)

# behavior_id is 1-based: row 0 -> 1, row 1 -> 2, ...
indices = [row + 1 for row in range(len(behavior_dict))]
behavior_dict['behavior_id'] = indices

# Persist the behavior dictionary as headerless CSV rows
# (first four columns of each row).
behavior_dict_w = [row[:4] for row in behavior_dict.values.tolist()]
with open('behavior_dict.csv', 'w', newline='') as resultFile:
    wr = csv.writer(resultFile, dialect='excel')
    wr.writerows(behavior_dict_w)

# Attach the behavior_id to every row of the combined per-user frame.
df = pd.merge(df, behavior_dict, on=['s_vertex_id', 'e_vertex_id', 'time'], how='inner')
df.head(5)

Unnamed: 0,s_vertex_id,e_vertex_id,time,freq,user_id,behavior_id
0,203492,217680,0,1,1,1
1,203492,217680,0,1,23,1
2,203492,217680,0,1,24,1
3,203492,217680,0,1,25,1
4,217680,217681,0,1,1,2


In [9]:
'''
Step 4 - Create the prediction RDD

'''

# Users and behaviors are both numbered from 1 (see Steps 2 and 3):
# one user per input file, one behavior per distinct behavior_dict row.
num_users = len(files)
num_behaviors = len(behavior_dict)

users_id = [u for u in range(1, num_users + 1)]
behaviors_RDD = sc.range(1, num_behaviors + 1)

# Full grid of (user_id, behavior_id) pairs to score: num_users * num_behaviors
# elements. For each behavior, users are emitted in ascending id order.
prediction_RDD = behaviors_RDD.flatMap(lambda b: [(u, b) for u in users_id])
prediction_RDD.take(5)

[(1, 1), (2, 1), (3, 1), (4, 1), (5, 1)]

In [10]:
'''
Step 5 - Create the training RDD
'''

# Observed (user_id, behavior_id, freq) triples, used as implicit-feedback
# ratings for ALS. Selecting the three needed columns directly means no
# column has to be dropped by name. It also leaves `df` itself untouched,
# so this cell can be re-run.
ratings = df[['user_id', 'behavior_id', 'freq']].dropna()

# .tolist() yields plain Python scalars, so each row becomes a plain tuple.
training_tuples = [tuple(row) for row in ratings.values.tolist()]
training_RDD = sc.parallelize(training_tuples)
training_RDD.take(5)

[(1, 1, 1), (6, 1, 1), (16, 1, 1), (18, 1, 1), (24, 1, 1)]

In [11]:
'''
Step 6 - Set the params for training
'''
seed = 5                         # RNG seed passed to ALS.trainImplicit
iterations = 10                  # ALS iterations per training run
regularization_parameter = 0.1   # passed to ALS as lambda_
ranks = [15, 20, 25, 30]         # candidate numbers of latent factors compared in Step 7
errors = []                      # RMSE per rank, filled in Step 7 (same order as ranks)
err = 0                          # NOTE(review): not used by any visible cell
tolerance = 0.02                 # NOTE(review): not used by any visible cell
nonnegative = True               # constrain ALS factors to be non-negative
# alpha – A constant used in computing confidence. (default: 0.01)
alpha = 0.01

In [12]:
'''
Step 7 - Using ALS to train a model
'''
if not ranks:
    raise ValueError("ranks must contain at least one rank")

# Reset here so re-running this cell does not append to stale results and
# misalign `errors` with `ranks` (which would pick the wrong best rank).
errors = []

# Observed ratings keyed by (user_id, behavior_id). This is identical for
# every rank, so it is built once outside the loop.
observed = training_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))

model = None
best_rank = None
best_error = None
for rank in ranks:
    candidate = ALS.trainImplicit(training_RDD, rank=rank, seed=seed, iterations=iterations,
                                  lambda_=regularization_parameter, nonnegative=nonnegative,
                                  alpha=alpha)
    candidate_preds = candidate.predictAll(prediction_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    # The join keeps only pairs present in the training data, so this is
    # training-set RMSE.
    # NOTE(review): trainImplicit models preference rather than raw freq, and
    # training error tends to favour larger ranks. A held-out split would
    # make this comparison more meaningful.
    rates_and_preds = observed.join(candidate_preds)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors.append(error)
    # Keep the first model with the lowest error (same tie-break as
    # errors.index(min(errors))), so the best rank is not retrained.
    if best_error is None or error < best_error:
        best_rank, best_error, model = rank, error, candidate

for rank, error in zip(ranks, errors):
    print('Rank %d | RMSE %f' % (rank, error))

# Scores of the best model over the full user x behavior grid,
# keyed by (user_id, behavior_id).
predictions = model.predictAll(prediction_RDD).map(lambda r: ((r[0], r[1]), r[2]))

Rank 15 | RMSE 0.845529
Rank 20 | RMSE 0.763278
Rank 25 | RMSE 0.678170
Rank 30 | RMSE 0.603680


In [13]:
'''
Step 8 - Save the prediction
'''
# Sort by the (user_id, behavior_id) key and format each prediction as a CSV
# line "user_id,behavior_id,score". saveAsTextFile writes str() of each
# element, so without this the file would contain Python tuple reprs.
# A new name is used so `predictions` keeps its ((user, behavior), score)
# shape and this cell can be re-run.
prediction_lines = (
    predictions.sortByKey()
    .map(lambda x: '%s,%s,%s' % (x[0][0], x[0][1], x[1]))
)
# NOTE: "predictions.csv" is created as a directory holding a single part
# file. saveAsTextFile fails if that directory already exists, so remove it
# before re-running.
prediction_lines.coalesce(1).saveAsTextFile("predictions.csv")