In [None]:
## O seguite código só serve para importar o SPARK
## Se der para mudar o Kernal para o SPARK, pode ser retirado

import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
%config IPCompleter.greedy=True

conf = pyspark.SparkConf().setAppName('appName').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

## Imports

In [None]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
from pyspark.ml.recommendation import ALS
from Lib import random

## <a id='2'> Ler Datasets </a>

In [None]:
# Load the three raw text files through the SparkContext, one RDD per file.
rawUserArtistData, rawArtistData, rawArtistAlias = (
    sc.textFile(path)
    for path in (
        "../../profiledata_06-May-2005/user_artist_data.txt",
        "../../profiledata_06-May-2005/artist_data.txt",
        "../../profiledata_06-May-2005/artist_alias.txt",
    )
)

## <a id='3'> Helper Functions </a>

As seguintes funções são usadas depois pelas Main Functions, as principais responsáveis pelo funcionamento do programa.
Por isso, mudámos a ordem em que as funções são apresentadas, metendo as Helper antes das outras.

## buildArtistByID

In [None]:
# Parsing uses flatMap (as the book does): every input line yields zero or one record.

def parseArtistLine(line):
    """Parse one "id<TAB>name" line into a list holding zero or one (id, name) tuple.

    Only the first tab splits the line, so the name may itself contain tabs.

    Args:
        line: one raw text line.

    Returns:
        [(id, name)] with id as an int, or [] when the line has no tab, the id
        is not an integer, or the id does not fit in a signed 32-bit integer.
    """
    fields = line.split("\t", 1)
    if len(fields) < 2:
        return []
    rawID, name = fields
    try:
        artistID = int(rawID)
    except ValueError:
        return []
    # The column is cast to IntegerType below; keep only ids that fit in it.
    if not -2**31 <= artistID <= 2**31 - 1:
        return []
    return [(artistID, name)]


def buildArtistByID(rawArtistData):
    """Build the artist lookup DataFrame with columns "id" (integer) and "name".

    Lines that cannot be parsed by parseArtistLine are dropped instead of
    producing rows with a null id.

    Args:
        rawArtistData: RDD of text lines in "id<TAB>name" form.

    Returns:
        DataFrame with an IntegerType "id" column and a "name" column.
    """
    artistDF = rawArtistData.flatMap(parseArtistLine).toDF(["id", "name"])
    # Python ints are inferred as 64-bit; cast back to the 32-bit type used elsewhere.
    return artistDF.withColumn("id", artistDF["id"].cast(IntegerType()))

## buildArtistAlias

In [None]:
def parseAliasLine(line):
    """Parse one "artist_id<TAB>alias" line into a list holding zero or one int pair.

    Args:
        line: one raw text line.

    Returns:
        [(artist_id, alias)] with both values as ints, or [] when the line has
        no tab, the first field is empty, either field is not an integer, or
        either value does not fit in a signed 32-bit integer.
    """
    fields = line.split("\t", 1)
    if len(fields) < 2 or fields[0] == "":
        return []
    try:
        artistID, alias = int(fields[0]), int(fields[1])
    except ValueError:
        return []
    # Both columns are cast to IntegerType below; keep only values that fit in it.
    lowest, highest = -2**31, 2**31 - 1
    if not (lowest <= artistID <= highest and lowest <= alias <= highest):
        return []
    return [(artistID, alias)]


def buildArtistAlias(rawArtistAlias):
    """Build the alias DataFrame with integer columns "artist_id" and "alias".

    Lines that cannot be parsed by parseAliasLine are dropped, so a line
    without a tab can no longer produce a one-field row and no row carries a
    null "artist_id" or "alias".

    Args:
        rawArtistAlias: RDD of text lines in "artist_id<TAB>alias" form.

    Returns:
        DataFrame with IntegerType columns "artist_id" and "alias".
    """
    artistAliasDF = rawArtistAlias.flatMap(parseAliasLine).toDF(["artist_id", "alias"])

    # Python ints are inferred as 64-bit; cast back to the 32-bit type used elsewhere.
    artistAliasDF = artistAliasDF.withColumn("artist_id", artistAliasDF["artist_id"].cast(IntegerType()))
    artistAliasDF = artistAliasDF.withColumn("alias", artistAliasDF["alias"].cast(IntegerType()))

    return artistAliasDF

## buildCounts

In [None]:
def buildCounts(rawUserArtistData, bArtistAlias):
    """Build the (user, artist, count) DataFrame with artist aliases applied.

    Each input line is split on single spaces into user, artist and count, and
    all three columns are cast to integers. The result is left-joined with
    `bArtistAlias` on artist == artist_id; where the joined alias is not null
    it replaces the artist, otherwise the original artist value is kept.

    Args:
        rawUserArtistData: RDD of text lines with space-separated fields.
        bArtistAlias: DataFrame with "artist_id" and "alias" columns.

    Returns:
        DataFrame with columns "user", "artist" and "count".
    """
    columns = ["user", "artist", "count"]
    plays = rawUserArtistData.map(lambda line: line.split(" ")).toDF(columns)
    for name in columns:
        plays = plays.withColumn(name, plays[name].cast(IntegerType()))

    matched = plays.join(bArtistAlias, plays["artist"] == bArtistAlias["artist_id"], "left_outer")
    resolvedArtist = when(matched["alias"].isNotNull(), matched["alias"]).otherwise(matched["artist"])

    return matched.withColumn("artist", resolvedArtist).select("user", "artist", "count")

## makeRecommendations

In [None]:
def makeRecommendations(model, userID, howMany):
    """Score every item known to the model for one user and keep the best ones.

    Args:
        model: fitted model exposing `itemFactors` and `transform`.
        userID: value written into the "user" column of every candidate row.
        howMany: maximum number of rows to return.

    Returns:
        DataFrame with columns "artist" and "prediction", ordered by
        prediction in descending order and limited to `howMany` rows.
    """
    # Session-wide setting; it stays enabled after this call returns.
    spark.conf.set("spark.sql.crossJoin.enabled", "true")

    candidates = model.itemFactors.select(col("id").alias("artist")).withColumn("user", lit(userID))
    scored = model.transform(candidates).select("artist", "prediction")

    return scored.orderBy("prediction", ascending=False).limit(howMany)

## <a id='4'> Main Functions </a>

Principais funções

## Preparation

In [None]:
def preparation(rawUserArtistData, rawArtistData, rawArtistAlias):
    """Build the exploratory DataFrames and read the first alias pair.

    The `show()` diagnostics are left commented out, so the function produces
    no output and returns None.

    Args:
        rawUserArtistData: RDD of text lines with space-separated fields.
        rawArtistData: RDD of text lines passed to buildArtistByID.
        rawArtistAlias: RDD of text lines passed to buildArtistAlias.
    """
    userArtistDF = rawUserArtistData.map(lambda line: line.split(" ")).toDF(["user","artist"]).select("user", "artist")

    userArtistDF = userArtistDF.withColumn("user", userArtistDF["user"].cast(IntegerType()))
    userArtistDF = userArtistDF.withColumn("artist", userArtistDF["artist"].cast(IntegerType()))

    #userArtistDF.select(min("user"), max("user"), min("artist"), max("artist")).show()

    artistByID = buildArtistByID(rawArtistData)
    artistAlias = buildArtistAlias(rawArtistAlias)

    # Read the first alias row once from the local `artistAlias` DataFrame.
    firstAlias = artistAlias.head()
    if firstAlias is None:
        # head() returns None for an empty alias table; nothing to inspect.
        return
    badID, goodID = firstAlias[0], firstAlias[1]

    #artistByID.filter(artistByID["id"].isin(badID, goodID)).show()

In [None]:
# Build the alias DataFrame at notebook level so the next cell can inspect it.
artistAlias = buildArtistAlias(rawArtistAlias)

In [None]:
# Show the alias rows whose "alias" column equals 6969903.
artistAlias.filter(artistAlias["alias"].isin(6969903)).show()

## Model

In [None]:
def drawAlsSeed():
    """Return a random 19-digit seed that fits in a signed 64-bit integer."""
    # Local stdlib import keeps this helper independent of the notebook's import cell.
    import random
    return random.randint(10**18, 2**63 - 1)


def model(rawUserArtistData, rawArtistData, rawArtistAlias, seed=None):
    """Train an implicit-feedback ALS model on the alias-resolved play counts.

    Args:
        rawUserArtistData: RDD of text lines passed to buildCounts.
        rawArtistData: unused; kept so existing callers keep working.
        rawArtistAlias: RDD of text lines passed to buildArtistAlias.
        seed: optional integer seed for a reproducible fit. When None, a random
            seed is drawn with drawAlsSeed().

    Returns:
        The fitted ALS model.
    """
    bArtistAlias = broadcast(buildArtistAlias(rawArtistAlias))
    trainData = buildCounts(rawUserArtistData, bArtistAlias)

    if seed is None:
        # The seed is handed to the JVM as a long, so it must not exceed 2**63 - 1.
        seed = drawAlsSeed()

    alsModel = ALS()\
        .setSeed(seed)\
        .setImplicitPrefs(True)\
        .setRank(10)\
        .setRegParam(0.01)\
        .setAlpha(1.0)\
        .setMaxIter(5)\
        .setUserCol("user")\
        .setItemCol("artist")\
        .setRatingCol("count")\
        .setPredictionCol("prediction")\
        .fit(trainData)

    # trainData is not persisted by this function; the call is kept as a harmless safeguard.
    trainData.unpersist()

    #alsModel.userFactors.select("features").show(truncate=False)

    return alsModel

In [None]:
# Fit the model once; the trailing underscore avoids rebinding the `model` function name.
model_ = model(rawUserArtistData, rawArtistData, rawArtistAlias)

Test model:

In [None]:
# Collect the artists present in the training data for one example user.
userID = 2093760
artistByID = buildArtistByID(rawArtistData)
bArtistAlias = broadcast(buildArtistAlias(rawArtistAlias))
trainData = buildCounts(rawUserArtistData, bArtistAlias)

# collect() brings the matching rows to the driver as a list of Row objects.
existingArtistIDs = (
    trainData
    .filter(trainData["user"] == userID)
    .select("artist")
    .collect()
)

#artistByID.filter(artistByID["id"].isin(existingArtistIDs)).show()

In [None]:
#It works!!
# Show the names for a hard-coded list of artist ids.
# NOTE(review): presumably these ids were copied from existingArtistIDs — confirm.
artistByID.filter(artistByID["id"].isin(1180, 1255340, 378, 813, 942)).show()

Test Recommendations with model

In [None]:
# Ask the fitted model for the 5 highest-scoring artists for userID and display them.
topRecommendations = makeRecommendations(model_, userID, 5)
topRecommendations.show()

In [None]:
# Bring the recommended "artist" column to the driver; collect() returns a list of Row objects.
recommendArtistIDs = topRecommendations.select("artist").collect()

In [None]:
#recommendArtistIDs
# Show the names for a hard-coded list of artist ids.
# NOTE(review): presumably these ids were copied from recommendArtistIDs — confirm.
artistByID.filter(artistByID["id"].isin(2814, 1300642, 1001819, 1037970, 4605)).show()

In [None]:
# Unpersist the fitted model's user and item factor DataFrames.
model_.userFactors.unpersist()
model_.itemFactors.unpersist()

## Evaluate

## Recommend