In [None]:
import os
import math
import datetime
import pyspark.sql.functions as sf
from pyspark.sql.functions import desc
from pyspark.sql.window import Window
from pyspark import SparkConf, SparkContext
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.sql.types import TimestampType

In [None]:
# Read the job-clicks CSV as text lines and turn it into an RDD of integer
# triples (later fed to ALS as the ratings input).
ratings_raw_data = sc.textFile("file:///home/cloudera/job_clicks.csv")
# The first line of the file is treated as the header row.
ratings_raw_data_header = ratings_raw_data.take(1)[0]


def _parse_click_line(line):
    """Split one comma-separated line into an (int, int, int) triple.

    The third field goes through float() first, so values written as "3.0"
    are accepted and truncated to an integer.
    """
    tokens = line.split(",")
    return (int(tokens[0]), int(tokens[1]), int(float(tokens[2])))


# Skip the header line, parse every remaining line, and keep the result cached
# because several later cells reuse it.
ratings_data = (
    ratings_raw_data
    .filter(lambda line: line != ratings_raw_data_header)
    .map(_parse_click_line)
    .cache()
)


In [None]:
# Read the jobs CSV as raw text lines and remember its first line (the header).
jobs_csv_uri = "file:///home/cloudera/jobs.csv"
jobs_raw_data = sc.textFile(jobs_csv_uri)
jobs_raw_data_header = jobs_raw_data.take(1)[0]

# Report how many click records were parsed from the job-clicks file.
ratings_count = ratings_data.count()
print("data size is ", ratings_count)

In [None]:
def _parse_job_line(line):
    """Split a jobs CSV line and keep (int(first field), second field)."""
    tokens = line.split(",")
    return (int(tokens[0]), tokens[1])


# Drop the header row, parse the remaining lines, and cache the result.
# NOTE(review): a plain split(",") assumes no field contains a comma -- confirm
# against the actual jobs.csv.
jobs_data = (
    jobs_raw_data
    .filter(lambda line: line != jobs_raw_data_header)
    .map(_parse_job_line)
    .cache()
)
print('Columns are:', jobs_raw_data_header)
# Last expression of the cell: preview the first three parsed rows.
jobs_data.take(3)

In [None]:
# Split data into train, validation and test datasets (weights 6:2:2, with a
# fixed seed passed to randomSplit).
rddTraining, rddValidating, rddTesting = ratings_data.randomSplit([6,2,2], seed=1001)

# Count each split exactly once: every count() launches a Spark job, so the
# stored values are reused in the summary line instead of recounting.
nbTraining   = rddTraining.count()
nbValidating = rddValidating.count()
nbTesting    = rddTesting.count()

print("Training: %d, validation: %d, test: %d" % (nbTraining, nbValidating, nbTesting))


In [None]:
#Model Training
# NOTE: RMSE is used here, but for implicit-feedback problems like this one a ranking metric such as **Mean Percentage Ranking (MPR)** is a better fit.

def howFarAreWe(model, against, sizeAgainst):
    """Return the RMSE between the model's predictions and the values in `against`.

    Parameters
    ----------
    model : object exposing ``predictAll(rdd_of_(user, product))``.
    against : RDD of (user, product, value) records to score against.
    sizeAgainst : number of records in `against`. Kept only for backward
        compatibility with existing callers; it is no longer used, because
        ``predictAll`` may return fewer rows than it was asked for (pairs whose
        user or product the model does not know are dropped). Dividing by the
        requested size instead of the matched size understated the error.

    Returns
    -------
    float
        Root-mean-square error over the (user, product) pairs that received a
        prediction, or ``float("inf")`` when no pair could be scored, so that
        such a model never wins a "lower is better" comparison.
    """
    againstNoRatings = against.map(lambda x: (int(x[0]), int(x[1])))
    againstWiRatings = against.map(lambda x: ((int(x[0]), int(x[1])), int(x[2])))
    predictions = model.predictAll(againstNoRatings).map(lambda p: ((p[0], p[1]), p[2]))
    squaredErrors = (
        predictions.join(againstWiRatings)
        .values()
        .map(lambda s: (s[0] - s[1]) ** 2)
    )
    # Single pass producing (sum of squared errors, number of matched pairs);
    # unlike reduce(), aggregate() is well defined on an empty RDD.
    total, matched = squaredErrors.aggregate(
        (0.0, 0),
        lambda acc, err: (acc[0] + err, acc[1] + 1),
        lambda a, b: (a[0] + b[0], a[1] + b[1]),
    )
    if matched == 0:
        return float("inf")
    return sqrt(total / float(matched))

# Hyper-parameter grid explored by the model-selection loop; every combination
# of these values is passed to ALS.trainImplicit.
ranks  = [5,10]     # rank values to try
reguls = [0.1, 1]   # regularisation values to try
iters  = [5]        # iteration counts to try
alpha = [10]        # alpha values to try

# Best-so-far bookkeeping, overwritten whenever a configuration scores lower.
finalModel = None
finalRank  = 0
finalRegul = float(0)
finalIter  = -1
# Start at +inf so the first evaluated model is always accepted. A finite
# sentinel (previously 300) silently rejects every model whose error is above
# it and leaves finalModel as None.
finalDist   = float("inf")
finalAlpha = float(0)

In [None]:
#[START train_model]
# Grid search: train one implicit-feedback ALS model per parameter combination
# and keep the one with the lowest validation error.
# A fixed seed is passed to ALS so the search is repeatable from run to run;
# the data split is already seeded, the model training was not.
alsSeed = 1001
for cRank, cRegul, cIter, cAlpha in itertools.product(ranks, reguls, iters, alpha):
    model = ALS.trainImplicit(rddTraining, rank=cRank, iterations=cIter,
                              lambda_=float(cRegul), alpha=float(cAlpha),
                              seed=alsSeed)
    dist = howFarAreWe(model, rddValidating, nbValidating)
    if dist < finalDist:
        # New best configuration: report it and remember model + parameters.
        print(cIter, cRank,cAlpha,cRegul)
        print("Best so far:%f" % dist)
        finalModel = model
        finalRank  = cRank
        finalRegul = cRegul
        finalIter  = cIter
        finalDist  = dist
        finalAlpha  = cAlpha

# Summary of the winning configuration.
print("Rank %i" % finalRank)
print("Regul %f" % finalRegul)
print("Iter %i" % finalIter)
print("Dist %f" % finalDist)
print("Alpha %f" % finalAlpha)

In [None]:
#Model building with best set of parameters and predicting on test set

# Reuse the model that achieved the best validation score. Retraining with the
# same parameters on the same data costs another full ALS run and, because the
# retrain is unseeded, produces a different model from the one that was
# validated. Training is kept only as a fallback when no model was stored.
if finalModel is not None:
    model = finalModel
else:
    model = ALS.trainImplicit(rddTraining, rank=finalRank, iterations=finalIter, lambda_= float(finalRegul),alpha=float(finalAlpha))

# Calculate all predictions, keyed by (user id, job id).
rddTesting_withoutclicks = rddTesting.map(lambda r: (r[0], r[1]))
predictions = model.predictAll(rddTesting_withoutclicks).map(lambda r: ((r[0], r[1]), r[2]))

# ((user id, job id), (actual value, predicted value)); pairs without a
# prediction are dropped by the join.
rates_and_preds = rddTesting.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)

# RMSE over the pairs that actually received a prediction.
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print ('For testing data the RMSE is %s' % (error))

In [None]:
# Preview three ((user, job), predicted value) records produced for the test set.
predictions.take(3)

In [None]:
# Preview three ((user, job), (actual, predicted)) records from the joined test data.
rates_and_preds.take(3)

In [None]:
from pyspark.sql import SparkSession
# Flatten ((user, job), (actual, predicted)) into one 4-field row per pair.
x = rates_and_preds.map(lambda row: (row[0][0], row[0][1], row[1][0], row[1][1]))
# Name the columns explicitly so the preview is readable instead of showing
# the default _1.._4 headers. The former bare `hasattr(x, "toDF")` expression
# was dropped: its result was discarded, so it had no effect.
x.toDF(["user_id", "job_id", "actual_clicks", "predicted_clicks"]).show(4)

In [None]:
#Get clicks (count and average) for each job by users
def get_counts_and_averages(ID_and_ratings_tuple):
    """Summarise the values grouped under one key.

    Parameters
    ----------
    ID_and_ratings_tuple : (key, iterable of values)
        Every value must be convertible with ``float()``. The iterable does
        not need to support ``len()``; it is materialised once.

    Returns
    -------
    (key, (count, average))
        ``count`` is the number of values and ``average`` their arithmetic
        mean as a float. An empty iterable yields ``(key, (0, 0.0))`` instead
        of raising ZeroDivisionError.
    """
    key = ID_and_ratings_tuple[0]
    ratings = [float(val) for val in ID_and_ratings_tuple[1]]
    nratings = len(ratings)
    if nratings == 0:
        return key, (0, 0.0)
    return key, (nratings, sum(ratings) / nratings)

In [None]:
# Group every click value under its job id: (job_id, iterable of values).
job_ID_with_ratings_RDD = ratings_data.map(lambda rec: (rec[1], rec[2])).groupByKey()

# Materialise each group's values as a plain list.
job_ID_with_ratings_RDD_updated = job_ID_with_ratings_RDD.map(
    lambda pair: (pair[0], list(pair[1]))
)

# (job_id, (number of click records, average click value))
job_ID_with_avg_ratings_RDD = job_ID_with_ratings_RDD_updated.map(get_counts_and_averages)

# (job_id, number of click records); cached and previewed below.
job_rating_counts_RDD = job_ID_with_avg_ratings_RDD.map(
    lambda pair: (int(pair[0]), pair[1][0])
)
job_rating_counts_RDD.cache()
job_rating_counts_RDD.take(3)

In [None]:
# get user-wise jobs clicked: (user_id, [job_id, ...])
all_users_ratings_RDD = ratings_data.map(lambda x: (x[0], x[1])).groupByKey()
all_users_ratings_RDD = all_users_ratings_RDD.map(lambda x : (x[0], list(x[1])))    # jobs clicked by each user

### finding unrated jobs by each user - this set is what the model scores for recommendations
job_ids = set(jobs_data.map(lambda x : x[0]).toLocalIterator()) # set of all job ids, built on the driver
# Share the id set through a broadcast variable rather than capturing the
# driver-side set in the lambda's closure: a captured object is serialised
# with the task closure, whereas a broadcast value is shipped once and reused.
job_ids_bc = sc.broadcast(job_ids)
unrated_jobs_RDD = all_users_ratings_RDD.map(lambda x: (x[0], list(job_ids_bc.value - set(x[1]))))

# create (user_id, unrated job id) pairs
unrated_userjobs_RDD = unrated_jobs_RDD.flatMap(lambda x : [(x[0],i) for i in x[1]])

# model predictions for each (user, not-yet-clicked job) pair, re-keyed by job id
recommendations_RDD = model.predictAll(unrated_userjobs_RDD)
recommended_jobs_rating_RDD = recommendations_RDD.map(lambda x: (x.product,(x.user, x.rating)))
recommended_jobs_rating_RDD.cache()
print (recommended_jobs_rating_RDD.take(10))

In [None]:
#Join job title and clicks received for further filtering and recommendations

def _key_recommendation_by_user(joined):
    """Reshape one doubly-joined record so that it is keyed by user.

    Input : (job_id, (((user, score), job_field), n_records))
    Output: (user, (job_id, job_field, score rounded to 2 decimals, n_records))

    `job_field` is the value column of jobs_data and `n_records` the value
    column of job_rating_counts_RDD.
    """
    job_id, (((user, score), job_field), n_records) = joined
    return (user, (job_id, job_field, round(score, 2), n_records))


# Attach the jobs_data value and the per-job record count to every predicted
# (job, (user, score)) pair, then re-key the result by user.
recommendations_rating_title_and_count_RDD = (
    recommended_jobs_rating_RDD
    .join(jobs_data)
    .join(job_rating_counts_RDD)
    .map(_key_recommendation_by_user)
)
recommendations_rating_title_and_count_RDD.take(3)

In [None]:
#Top recommendations
# Keep only jobs with at least MIN_JOB_CLICKS click records, then take each
# user's TOP_N_PER_USER jobs with the highest predicted score.
# NOTE(review): the filter used to be `> 20` although the stated intent is
# "at least 20"; it is now `>=` to match that intent -- confirm.
MIN_JOB_CLICKS = 20
TOP_N_PER_USER = 5
top_jobs = recommendations_rating_title_and_count_RDD.groupBy(lambda x : x[0])\
                               .map(lambda x : list(x[1]))\
                               .map(lambda r: [i for i in r if i[1][3] >= MIN_JOB_CLICKS])\
                               .map(lambda a: sorted(a, key=lambda x: -x[1][2])[:TOP_N_PER_USER])

#preparing dataframe to insert in Database
# Each record is (user, (job_id, job_category, score, n_click_records)).
# All five fields are emitted: the row previously carried only four, so the
# rename of "_5" to "total_clicks" was a silent no-op and the column was missing.
rec_jobs_df = top_jobs.flatMap(lambda recs: [(i[0], i[1][0], i[1][1], i[1][2], i[1][3]) for i in recs])\
                      .toDF(["user_id",
                             "job_recommendations",
                             "job_category",
                             "preference_confidence",
                             "total_clicks"])

#final recommendation engine dataframe to be saved in Database
# rec_date: driver-side timestamp taken when this cell runs.
final_df_rec_eng = rec_jobs_df.withColumn("rec_date", sf.lit(datetime.datetime.now()).cast(TimestampType()))
# rec_number: 1-based position of each row within its user, highest score first.
final_df_rec_eng = final_df_rec_eng.withColumn("rec_number", sf.row_number().over(Window.partitionBy("user_id").orderBy(desc("preference_confidence"))))
final_df_rec_eng.show(15)