In [0]:
# Install a headless Java 8 JDK (JAVA_HOME is pointed at it further down);
# stdout is redirected to /dev/null to keep the cell output quiet.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [0]:
!wget -q http://apache.osuosl.org/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz

In [0]:
# Unpack the downloaded Spark archive into the current working directory.
!tar xf spark-2.2.1-bin-hadoop2.7.tgz

In [0]:
# findspark is imported and initialised in a later cell, before the pyspark imports.
!pip install -q findspark

In [0]:
import os
# Export the JDK and Spark install locations through the process environment.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.2.1-bin-hadoop2.7",
})

In [0]:
# Run findspark before any `pyspark` import; SPARK_HOME was exported in the
# previous cell (presumably this is what lets the pyspark imports below resolve).
import findspark
findspark.init()

In [0]:
from pyspark.mllib.recommendation import *
import random
from operator import *
from pyspark import SparkContext, SparkConf

# Local Spark context using every available core ("local[*]").
# getOrCreate() returns the already-running context when this cell is executed
# again, instead of failing because only one SparkContext may be active.
conf = SparkConf().setAppName("cust data").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)






In [8]:

# ## Loading data
# Interactive Colab file upload. `uploaded` is later indexed by file name and
# each entry's length is reported in bytes.
from google.colab import files
uploaded = files.upload()

Saving artist_alias_small.txt to artist_alias_small (2).txt
Saving artist_data_small.txt to artist_data_small (2).txt
Saving user_artist_data_small.txt to user_artist_data_small (2).txt


In [9]:
# Echo the name and size of every uploaded file.
for fn, contents in uploaded.items():
  size_in_bytes = len(contents)
  print('User uploaded file "{name}" with length {length} bytes'.format(name=fn, length=size_in_bytes))

User uploaded file "artist_alias_small.txt" with length 9683 bytes
User uploaded file "user_artist_data_small.txt" with length 947715 bytes
User uploaded file "artist_data_small.txt" with length 758729 bytes


In [0]:

# Raw text RDDs, one record per line.
artistData = sc.textFile("artist_data_small.txt")
artistAlias = sc.textFile("artist_alias_small.txt")
userArtistData = sc.textFile("user_artist_data_small.txt")

# NOTE: the original cell called userArtistData.count() here and threw the
# result away; that only cost a full pass over the file, so it was removed.

# Space-separated "user artist playcount" -> (int, int, int)
userArtistData1 = (userArtistData
                   .map(lambda line: line.split(" "))
                   .map(lambda r: (int(r[0]), int(r[1]), int(r[2]))))
# Tab-separated pair of artist ids -> (int, int)
artistAlias1 = (artistAlias
                .map(lambda line: line.split("\t"))
                .map(lambda r: (int(r[0]), int(r[1]))))
# Tab-separated "artist id, name" -> (int, name)
artistData1 = (artistData
               .map(lambda line: line.split("\t"))
               .map(lambda r: (int(r[0]), r[1])))




# ## Data Exploration


# Per-user record count (one record per user/artist row). Despite its name this
# is the divisor of the mean, not the mean itself. Counting with reduceByKey
# avoids materialising every user's full artist list just to take its length.
mean = userArtistData1.map(lambda r: (r[0], 1)).reduceByKey(lambda a, b: a + b)
# Per-user total play count.
aggr = userArtistData1.map(lambda r: (r[0], r[2])).reduceByKey(lambda a, b: a + b)

# The three users with the highest total play count.
topThree = aggr.takeOrdered(3, key=lambda x: -x[1])
topThree1 = sc.parallelize(topThree)
# (user, total, mean). `//` keeps the integer mean on both Python 2 and Python 3.
a = topThree1.join(mean).map(lambda x: (x[0], x[1][0], x[1][0] // x[1][1]))
# join() does not preserve the takeOrdered ranking, so restore it before printing.
my_list = sorted(a.collect(), key=lambda row: -row[1])
for list_elems in my_list:
    print("User "+str(list_elems[0])+" has a total play count of "+str(list_elems[1])+" and a mean play count of "+str(list_elems[2]))
  


# ## Splitting Data for Testing


# 40% / 40% / 20% random split with a fixed seed (13); every split is cached
# because each one is read several times during model evaluation.
trainData, validationData, testData = userArtistData1.randomSplit([0.4, 0.4, 0.2], 13)
for split in (trainData, validationData, testData):
    split.cache()


# ## The Recommender Model
 
# ### Model Evaluation




def modelEval(bestModel,Data1, trainData):
    """Score a recommender model against held-out (user, artist, count) records.

    For every user appearing in ``Data1`` the model ranks all artists the user
    does NOT have in ``trainData``; the top X are kept, where X is the number
    of records that user has in ``Data1``. The user's score is the fraction of
    those kept predictions that appear among the user's ``Data1`` artists, and
    the function returns the average score over all ``Data1`` users.

    Reads the module-level ``userArtistData1`` (universe of artists) and ``sc``.

    Args:
        bestModel: trained model exposing ``predictAll(rdd_of_(user, artist))``.
        Data1: RDD of (user, artist, playcount) records to evaluate against.
        trainData: RDD of (user, artist, playcount) records used for training.

    Returns:
        float: mean per-user score; 0.0 when ``Data1`` contains no users.
    """
    # user -> set of artists seen in training
    trainArtistsByUser = (trainData.map(lambda rec: (int(rec[0]), rec[1]))
                          .groupByKey().mapValues(set).collectAsMap())
    # user -> list of held-out artists (list length defines X for that user)
    trueArtistsByUser = (Data1.map(lambda rec: (int(rec[0]), rec[1]))
                         .groupByKey().mapValues(list).collectAsMap())
    if not trueArtistsByUser:
        return 0.0

    # Collected once: this is loop-invariant and was previously re-collected
    # from Spark for every single user.
    allArtists = set(userArtistData1.map(lambda rec: rec[1]).distinct().collect())

    total = 0.0
    for user in sorted(trueArtistsByUser):
        trueArtist = trueArtistsByUser[user]
        # A user missing from the training split simply has no artists to exclude.
        unseen = allArtists - trainArtistsByUser.get(user, set())
        candidates = [(user, art) for art in unseen]
        if not candidates:
            # Nothing to rank (predictAll cannot handle an empty RDD): score 0.
            continue
        predictions = bestModel.predictAll(sc.parallelize(candidates))
        top = (predictions.map(lambda p: (p[1], p[2]))
               .takeOrdered(len(trueArtist), key=lambda p: -p[1]))
        if not top:
            # The model produced no predictions for this user: score 0.
            continue
        predicted = set(artist for artist, _ in top)
        total += len(predicted & set(trueArtist)) / float(len(top))
    return total / float(len(trueArtistsByUser))


# ### Model Construction



# Train one implicit-feedback ALS model per candidate rank (fixed seed) and
# report its score on the validation split.
vals = [2, 10, 20]
for val in vals:
    expModel = ALS.trainImplicit(trainData, rank=val, seed=345)
    score = modelEval(expModel, validationData, trainData)
    # Single-argument print(...) emits the same text on Python 2 and Python 3.
    print("The model score for rank " + str(val) + " is " + '%.5f' % score)




# Retrain with rank 10 and evaluate on the held-out test split.
bestModel = ALS.trainImplicit(trainData, rank=10, seed=345)
# The score used to be computed and silently dropped (a bare expression in the
# middle of a cell is not displayed); keep it and print it.
testScore = modelEval(bestModel, testData, trainData)
print("The model score on the test data is " + '%.5f' % testScore)


# ## Trying Some Artist Recommendations
# Using the best model above, predict the top 5 artists for user `2023977` with the [recommendProducts](http://spark.apache.org/docs/1.5.2/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.MatrixFactorizationModel.recommendProducts) function.

# Top-5 recommendations for one user; element [1] of each result is the artist id.
x = bestModel.recommendProducts(2023977, 5)
recomendations = sc.parallelize(x)
recomendationArtists = recomendations.map(lambda r: r[1]).collect()

# alias artist id -> list of ids it maps to
y = artistAlias1.map(lambda pair: (int(pair[0]), pair[1])).groupByKey().mapValues(list).collect()
y_dict = dict(y)
# artist id -> list of names
realArt = artistData1.map(lambda pair: (int(pair[0]), pair[1])).groupByKey().mapValues(list).collect()
realArt_dict = dict(realArt)

i = 0
for art in recomendationArtists:
    i = i + 1
    artistId = art
    if artistId not in realArt_dict and artistId in y_dict:
        # y_dict values are lists. The original code used the whole list as a
        # dictionary key (TypeError: unhashable type); take the first id instead.
        artistId = y_dict[artistId][0]
    names = realArt_dict.get(artistId)
    if names:
        print("Artist %d :" % i + names[0])
    else:
        # Neither a known artist nor a resolvable alias: show the id instead of crashing.
        print("Artist %d :" % i + "unknown artist id " + str(artistId))

User 2064012 has a total play count of 548427 and a mean play count of 9455
User 1059637 has a total play count of 674412 and a mean play count of 1878
User 2069337 has a total play count of 393515 and a mean play count of 1519
