# 1. sc

In [219]:
sc

<pyspark.context.SparkContext at 0x7ff5b6e78850>

# 2. library & function

In [220]:
from pyspark.mllib.recommendation import *
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import pandas as pd

def eval(model, al, train, valid, rank):
    '''
    al: all
    trian: trianing
    valid: validation
    '''
    validMap = valid.map(lambda x:(x[0],x[1])).groupByKey().collectAsMap()
    trainMap = train.map(lambda x:(x[0],x[1])).groupByKey().collectAsMap() 
    alSongs = al.map(lambda x:(x[1])).collect()
    validUserScores = []
    for user in validMap.keys():
        userTrainSongs = trainMap.get(user) 
        userNotTrainSongs = list(set(alSongs).difference(set(userTrainSongs)))
        userNotTrainSongsRDD = sc.parallelize([(user,x) for x in userNotTrainSongs])
        prediction = model.predictAll(userNotTrainSongsRDD).map(lambda x: (x[2], x[1])).sortByKey(False).map(lambda x: x[1])
        userValidSongs = validMap.get(user)
        userValidSongsCounts = len(userValidSongs)
        validUserScores.append(1.0 * len(set(prediction.take(userValidSongsCounts)).intersection(set(userValidSongs))) / userValidSongsCounts)
    validUserMeanScore =  1.0 * sum(validUserScores) / len(validUserScores)
    print "Evaluation Score For Rank {} is {}".format(rank, validUserMeanScore)
    
    
def matchSong(targets, df):
    targets = map(lambda x: str(x[1]), targets)
    for r in df:
        if str(r[0]) in targets:
            print "Song: {}\nArtist: {}".format(r[2].encode('utf-8'),r[3].encode('utf-8'))

def matchArtist(targets, df):
    targets = map(lambda x: str(x[1]), targets)
    for r in df:
        if str(r[1]).encode('utf-8') in targets:
            print "Artist: {}".format(r[0].encode('utf-8'))

# 3. Music Recommendation

### 3.1 read data

In [221]:
musicData = sc.textFile('./id_train_triplets.csv', use_unicode=False).map(lambda x: x.split(',')).map(lambda x: (int(x[3]), int(x[4]), int(x[2])))

trainData, validationData, testData = musicData.randomSplit([0.4, 0.4, 0.2], 13)
trainData.cache()
validationData.cache()
testData.cache()

PythonRDD[21843] at RDD at PythonRDD.scala:48

### 3.2 evaluation / validation

In [223]:
ranks=[1, 3, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50]

print("ALS.trainImplicit")
for r in ranks:
    Model = ALS.trainImplicit(ratings=trainData, 
                              alpha=0.01, 
                              blocks=10, 
                              iterations=5, 
                              lambda_=0.01, 
                              nonnegative=False,
                              rank=r, 
                              seed=826) 
    eval(Model, musicData, trainData, validationData, r)
    
# print("ALS.train")
# for r in ranks:
#     Model = ALS.train(ratings=trainData,
#                       alpha=0.01,
#                       blocks=-1, 
#                       iterations=10, 
#                       lambda_=0.01, 
#                       nonnegative=False,
#                       rank=r,
#                       seed=826) 
#     eval(Model, musicData, trainData, validationData, r)

ALS.trainImplicit
Evaluation Score For Rank 1 is 0.0057980409104
Evaluation Score For Rank 3 is 0.0057980409104
Evaluation Score For Rank 5 is 0.0154134255258
Evaluation Score For Rank 10 is 0.00259291270527
Evaluation Score For Rank 15 is 0.00259291270527
Evaluation Score For Rank 20 is 0.00216076058773
Evaluation Score For Rank 25 is 0.00129645635264
Evaluation Score For Rank 30 is 0.00419547680784
Evaluation Score For Rank 35 is 0.00259291270527
Evaluation Score For Rank 40 is 0.00129645635264
Evaluation Score For Rank 45 is 0.00259291270527
Evaluation Score For Rank 50 is 0.00129645635264


### 3.3. model & predict

In [226]:
user = 8 #### Input
bestRank_1 = 5

print("ALS.trainImplicit")
bestModel_1 = ALS.trainImplicit(musicData, rank=bestRank_1, seed=826, blocks=10) 
#eval(bestModel_1, musicData, trainData, testData, rank=bestRank_1)
top3_1 = bestModel_1.recommendProducts(user,3)
top3_1
# bestRank_2 = 40
# print("ALS.train")
# bestModel_2 = ALS.train(trainData, rank=bestRank_2, seed=826) 
# eval(bestModel_2, musicData, trainData, testData, rank=bestRank_2)
# top3_2 = bestModel_2.recommendProducts(1,3)
# print top3_2

ALS.trainImplicit


[Rating(user=8, product=251, rating=0.0031352164611463504),
 Rating(user=8, product=176, rating=0.0026368706776661297),
 Rating(user=8, product=230, rating=0.002458122644772543)]

### 3.4 match

In [227]:
sqlContext = SQLContext(sc)

songID = sqlContext.read.format('com.databricks.spark.csv'). \
options(header='true'). \
load('./SongID_int.csv')
songID.createOrReplaceTempView("songID")
songID.printSchema()

songName = sqlContext.read.format('com.databricks.spark.csv'). \
options(header='true'). \
load('./t.csv')
songName.createOrReplaceTempView("songName")
songName.printSchema()

df = sqlContext.sql('select a.sid, a.song, b.artist, b.title from songID a left join songName b on a.song = b.songid').collect()
matchSong(top3_1, df)

root
 |-- sid: string (nullable = true)
 |-- song: string (nullable = true)

root
 |-- trackid: string (nullable = true)
 |-- songid: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist: string (nullable = true)

Song: All My Friends
Artist: LCD Soundsystem
Song: Harder Better Faster Stronger
Artist: Daft Punk
Song: Life In Technicolor ii
Artist: Coldplay


# 4. Artist Recommendation

### 4.1 group by 'artist' 
- 'uid - aid - counts' 

In [228]:
df2 = sqlContext.createDataFrame(df)
df2.createOrReplaceTempView("songMeta")
df2.printSchema()

schema = StructType([
    StructField("uid", IntegerType(), True),
    StructField("sid", StringType(), True),
    StructField("counts", IntegerType(), True)])
df3 = sqlContext.createDataFrame(musicData, schema)
df3.createOrReplaceTempView("musicData")
df3.printSchema()

df4 = sqlContext.sql("select a.uid, b.title from musicData a left join songMeta b on a.sid = b.sid")
df4.createOrReplaceTempView("UserArtist")

df5 = sqlContext.sql('select uid, title, count(*) from UserArtist group by uid, title').collect()
schema = StructType([
    StructField("uid",IntegerType(),True),
    StructField('artist',StringType(),True),
    StructField('counts',LongType(),True)])
df5 = sqlContext.createDataFrame(df5, schema)
df5.printSchema()
df5.createOrReplaceTempView("Artist")

df6 = sqlContext.sql('SELECT distinct a.artist, ROW_NUMBER() OVER (ORDER BY (SELECT 1)) AS aid FROM Artist AS a')
df6.createOrReplaceTempView("ArtistID")
df6.printSchema()

df7 = sqlContext.sql("select a.uid, b.aid, a.counts as counts from Artist a left join ArtistID b on a.artist = b.artist")
df7.printSchema()
df7_rdd = df7.rdd.map(tuple)

root
 |-- sid: string (nullable = true)
 |-- song: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- title: string (nullable = true)

root
 |-- uid: integer (nullable = true)
 |-- sid: string (nullable = true)
 |-- counts: integer (nullable = true)

root
 |-- uid: integer (nullable = true)
 |-- artist: string (nullable = true)
 |-- counts: long (nullable = true)

root
 |-- artist: string (nullable = true)
 |-- aid: integer (nullable = true)

root
 |-- uid: integer (nullable = true)
 |-- aid: integer (nullable = true)
 |-- counts: long (nullable = true)



### 4.2 read data

In [229]:
trainData, validationData, testData = df7_rdd.randomSplit([0.4, 0.4, 0.3], 826)
trainData.cache()
validationData.cache()
testData.cache()

PythonRDD[32921] at RDD at PythonRDD.scala:48

### 4.3 evaluation

In [230]:
ranks=[1, 3, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50]

print("ALS.trainImplicit")
for r in ranks:
    Model = ALS.trainImplicit(ratings=trainData, 
                              alpha=0.01, 
                              blocks=10, 
                              iterations=5, 
                              lambda_=0.01, 
                              nonnegative=False,
                              rank=r, 
                              seed=826) 
    eval(Model, df7_rdd, trainData, validationData, r)

ALS.trainImplicit
Evaluation Score For Rank 1 is 0.0369167869168
Evaluation Score For Rank 3 is 0.0459196887768
Evaluation Score For Rank 5 is 0.0276434412149
Evaluation Score For Rank 10 is 0.0471239606954
Evaluation Score For Rank 15 is 0.0292293685151
Evaluation Score For Rank 20 is 0.0497213632928
Evaluation Score For Rank 25 is 0.0457681736253
Evaluation Score For Rank 30 is 0.039704131847
Evaluation Score For Rank 35 is 0.0567989417989
Evaluation Score For Rank 40 is 0.0329399436542
Evaluation Score For Rank 45 is 0.023852470281
Evaluation Score For Rank 50 is 0.0385092420807


### 4.4 model

In [231]:
user = 8 #### Input
bestRank_artist = 35

print("ALS.trainImplicit")
bestModel_artist = ALS.trainImplicit(df7_rdd, rank=bestRank_artist, seed=826, blocks=10) 
eval(bestModel_artist, df7_rdd, trainData, testData, rank=bestRank_1)
top3_artist = bestModel_artist.recommendProducts(user,3)
top3_artist

ALS.trainImplicit
Evaluation Score For Rank 5 is 0.492973856209


[Rating(user=8, product=242, rating=0.9955798618297991),
 Rating(user=8, product=150, rating=0.9955798618297991),
 Rating(user=8, product=232, rating=0.9955798618297991)]

### 4.5 test

In [265]:
user = 11
top3_artist = bestModel_artist.recommendProducts(user,3)
top3_artist
df6_df = df6.collect()
matchArtist(top3_artist, df6_df)

Artist: Taylor Swift
Artist: B.o.B
Artist: Green Day
