In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [3]:
spark = SparkSession.builder.appName('movie-ALS-val').getOrCreate()

In [4]:
data = spark.sparkContext.textFile('file:///d:/sparkcourse/Recommendation-movies/ml-100k/u.data').map(lambda x : x.split()).map(lambda x: (int(x[0]),int(x[1]),int(x[2]),int(x[3])))

In [5]:
df = data.toDF(['user','movie','rating','timestamp'])

In [6]:
df.show()

+----+-----+------+---------+
|user|movie|rating|timestamp|
+----+-----+------+---------+
| 196|  242|     3|881250949|
| 186|  302|     3|891717742|
|  22|  377|     1|878887116|
| 244|   51|     2|880606923|
| 166|  346|     1|886397596|
| 298|  474|     4|884182806|
| 115|  265|     2|881171488|
| 253|  465|     5|891628467|
| 305|  451|     3|886324817|
|   6|   86|     3|883603013|
|  62|  257|     2|879372434|
| 286| 1014|     5|879781125|
| 200|  222|     5|876042340|
| 210|   40|     3|891035994|
| 224|   29|     3|888104457|
| 303|  785|     3|879485318|
| 122|  387|     5|879270459|
| 194|  274|     2|879539794|
| 291| 1042|     4|874834944|
| 234| 1184|     2|892079237|
+----+-----+------+---------+
only showing top 20 rows



In [7]:
def trainTestSplit(df,time):
    test = df.select('*').where(col('timestamp')>time)
    train = df.select('*').where(col('timestamp')<=time)
    return(train.drop(col('timestamp')).cache(),test.drop(col('timestamp')).cache())
    

In [8]:
trainDf,testDf = trainTestSplit(df,890000000) #Set above 15 Mar 1998 as test set and those before as train set
print('TrainLength '+str(trainDf.count())+' | TestLength '+str(testDf.count()))

TrainLength 82281 | TestLength 17719


In [49]:
from pyspark.ml.recommendation import ALS
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.ml.evaluation import RankingEvaluator

In [10]:
def combineRddsForRankingMeasure(predDf):
    pred = predDf.select('user','movie').orderBy(col('prediction').desc())
    act = predDf.select('user','movie').orderBy(col('rating').desc()).where(col('rating')>=3)#Act thres for good ratings
    predRdd = pred.rdd.map(tuple).groupByKey().mapValues(list)
    actRdd = act.rdd.map(tuple).groupByKey().mapValues(list)
    predAct = predRdd.leftOuterJoin(actRdd)
    return(predAct)

In [11]:
#def changeToDf(predActRdd):
#    user = predActRdd[0]
#    items = predActRdd[1]
#    df = (user,items[0],items[1]).toDF(['user','prediction','actual'])
#    return(df)

In [12]:
def dropKey(data):
    predAct = data[1]
    return(predAct)

In [13]:
def findBestFitRank(itera,rankArray,trainDf):
    highPrec = 0
    flagRank = 0
    for rank in rankArray:
        als = ALS(maxIter=itera,regParam=0.01,rank=rank,implicitPrefs=False,userCol="user",itemCol="movie",ratingCol="rating",coldStartStrategy="drop")
        trainSplit,valSplit = trainDf.randomSplit([0.9,0.1])
        model = als.fit(trainSplit)
        pred = model.transform(valSplit)
        predTrans = combineRddsForRankingMeasure(pred)
        predAndAct = predTrans.map(lambda x: x[1]).filter(lambda x: x[1] != None)
        metrics = RankingMetrics(predAndAct)
        precision = metrics.precisionAt(10)
        if(precision>highPrec):
            highPrec = precision
            flagRank = rank
    return(highPrec,flagRank)

In [14]:
def findMaxIter(rank,iterArray,trainDf):
    highPrec = 0
    flagIter = 0
    for itera in iterArray:
        als = ALS(maxIter=itera,regParam=0.01,rank=rank,implicitPrefs=False,userCol="user",itemCol="movie",ratingCol="rating",coldStartStrategy="drop")
        trainSplit,valSplit = trainDf.randomSplit([0.9,0.1])
        model = als.fit(trainSplit)
        pred = model.transform(valSplit)
        predTrans = combineRddsForRankingMeasure(pred)
        predAndAct = predTrans.map(lambda x: x[1]).filter(lambda x: x[1] != None)
        metrics = RankingMetrics(predAndAct)
        precision = metrics.precisionAt(10)
        if(precision>highPrec):
            highPrec = precision
            flagIter = itera
    return(highPrec,flagIter)

In [19]:
score,rank = findBestFitRank(11,[4,6,8],trainDf) #Tuning rank hyperparameter

In [20]:
print("Best_Score "+str(score)+" | Best_Rank "+str(rank))

Best_Score 0.595838926174497 | Best_Rank 8


In [21]:
score,itera = findMaxIter(8,[9,11,13],trainDf)

In [22]:
print("Best_Score "+str(score)+" | Best_Iter "+str(itera)) #Tuning iteration hyperparameter

Best_Score 0.6079019073569478 | Best_Iter 11


In [23]:
def training(df):
    als = ALS(maxIter = 11, rank =8, regParam = 0.01, implicitPrefs = False, userCol = "user", itemCol = "movie", ratingCol = "rating",coldStartStrategy="drop")
    model = als.fit(df)
    return(model)

In [24]:
model = training(trainDf)

In [25]:
def testError(df,alsModel):
    predDf = alsModel.transform(df)
    testRdd = combineRddsForRankingMeasure(predDf)
    predAndAct = testRdd.map(lambda x: x[1]).filter(lambda x: x[1] != None)
    metrics = RankingMetrics(predAndAct)
    precision = metrics.precisionAt(10)
    MAP = metrics.meanAveragePrecision
    ndcg = metrics.ndcgAt(10)
    print("Precision at 10 - "+str(precision))
    print("MAP - "+str(MAP))
    print("ndcg - "+str(ndcg))
    

In [26]:
rankingMeasure = testError(testDf,model) 

Precision at 10 - 0.5555555555555555
MAP - 0.906664292842798
ndcg - 0.9311795896162314


In [35]:
def nameDict():
    movieName = {}
    with open("d:/sparkcourse/Recommendation-movies/ml-100k/u.ITEM") as doc:
        for line in doc:
            fields = line.split('|')
            movieName[int(fields[0])] = fields[1]
    return(movieName)

In [60]:
bMovieName = spark.sparkContext.broadcast(nameDict())

In [61]:
#Recommendations
def userRecommendation(userID,model,count):
    userRecs = model.recommendForAllUsers(count)
    reco = userRecs.where(userRecs.user == userID).select("recommendations.movie").collect()
    for i in reco:
        for j in i[0]:
            print(bMovieName.value[j])

In [62]:
userID = 62
userRecommendation(userID,model,10)

Inspector General, The (1949)
Chungking Express (1994)
Faust (1994)
Little Princess, The (1939)
Meet John Doe (1941)
Jean de Florette (1986)
Aparajito (1956)
Old Man and the Sea, The (1958)
Nénette et Boni (1996)
Close Shave, A (1995)
