### ** Atenção: **
Utilize Java JDK 1.8 ou 11 e Apache Spark 2.4.2

**Caso receba mensagem de erro "name 'sc' is not defined", interrompa o pyspark e apague o diretório metastore_db no mesmo diretório onde está este Jupyter notebook**

## <font color='blue'>Spark MLLib - Sistema de Recomendação</font>

<strong> Descrição </strong>
<ul style="list-style-type:square">
  <li>Também chamado de filtros colaborativos.</li>
  <li>Analisa dados passados para compreender comportamentos de pessoas/entidades.</li>
  <li>A recomendação é feita por similaridade de comportamento.</li>
  <li>Recomendação baseada em usuários ou items.</li>
  <li>Algoritmos de Recomendação esperam receber os dados em um formato específico: [user_ID, item_ID, score].</li>
  <li>Score, também chamado rating, indica a preferência de um usuário sobre um item. Podem ser valores booleanos, ratings ou mesmo volume de vendas.</li>
</ul>

In [10]:
# Imports
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS

In [11]:
# Spark Session - usada quando se trabalha com Dataframes no Spark
spSession = SparkSession.builder.master("local").appName("Kayo-SparkMLLib").getOrCreate()

22/08/18 00:33:52 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [12]:
sc = spSession.sparkContext

In [13]:
# Carrega os dados no formato ALS (user, item, rating)
ratingsRDD = sc.textFile("data/user-item.txt")
ratingsRDD.collect()

['1001,9001,10',
 '1001,9002,1',
 '1001,9003,9',
 '1002,9001,3',
 '1002,9002,5',
 '1002,9003,1',
 '1002,9004,10',
 '1003,9001,2',
 '1003,9002,6',
 '1003,9003,2',
 '1003,9004,9',
 '1003,9005,10',
 '1003,9006,8',
 '1003,9007,9',
 '1004,9001,9',
 '1004,9002,2',
 '1004,9003,8',
 '1004,9004,3',
 '1004,9010,10',
 '1004,9011,9',
 '1004,9012,8',
 '1005,9001,8',
 '1005,9002,3',
 '1005,9003,7',
 '1005,9004,1',
 '1005,9010,9',
 '1005,9011,10',
 '1005,9012,9',
 '1005,9013,8',
 '1005,9014,1',
 '1005,9015,1',
 '1006,9001,7',
 '1006,9002,4',
 '1006,9003,8',
 '1006,9004,1',
 '1006,9010,7',
 '1006,9011,6',
 '1006,9012,9']

In [14]:
# Convertendo as strings
ratingsRDD2 = ratingsRDD.map(lambda l: l.split(',')).map(lambda l:(int(l[0]), int(l[1]), float(l[2])))

In [15]:
# Criando um Dataframe
ratingsDF = spSession.createDataFrame(ratingsRDD2, ["user", "item", "rating"])

[Stage 2:>                                                          (0 + 1) / 1]                                                                                

In [16]:
ratingsDF.show()

+----+----+------+
|user|item|rating|
+----+----+------+
|1001|9001|  10.0|
|1001|9002|   1.0|
|1001|9003|   9.0|
|1002|9001|   3.0|
|1002|9002|   5.0|
|1002|9003|   1.0|
|1002|9004|  10.0|
|1003|9001|   2.0|
|1003|9002|   6.0|
|1003|9003|   2.0|
|1003|9004|   9.0|
|1003|9005|  10.0|
|1003|9006|   8.0|
|1003|9007|   9.0|
|1004|9001|   9.0|
|1004|9002|   2.0|
|1004|9003|   8.0|
|1004|9004|   3.0|
|1004|9010|  10.0|
|1004|9011|   9.0|
+----+----+------+
only showing top 20 rows



In [17]:
# Construindo o modelo
# ALS = Alternating Least Squares --> Algoritmo para sistema de recomendação, que otimiza a loss function 
# e funciona muito bem em ambientes paralelizados
# Não são todos os algoritmos de ML que apresentam bom desempenho em sistemas paralelos (ALS, Random Forest)
als = ALS(rank = 10, maxIter = 5)
modelo = als.fit(ratingsDF)

22/08/18 00:42:00 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/08/18 00:42:00 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/08/18 00:42:01 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [18]:
# Visualizando o Affinity Score
modelo.userFactors.orderBy("id").collect()

[Row(id=1001, features=[0.10471921414136887, -0.12986880540847778, -0.9920188784599304, -0.6670140624046326, 0.002714240225031972, -1.160642385482788, -0.0358116589486599, 0.6356903910636902, -0.0029970980249345303, -0.2793961465358734]),
 Row(id=1002, features=[0.519737184047699, 0.09015104919672012, 0.07795949280261993, 1.1141307353973389, 0.8528119921684265, 0.07211394608020782, 0.7324118614196777, 0.6625441908836365, -0.2170950025320053, -0.7342068552970886]),
 Row(id=1003, features=[0.2783157527446747, 0.14826028048992157, 0.37782737612724304, 0.6766780018806458, 1.1771079301834106, 0.16773365437984467, 0.26167353987693787, 0.5781129002571106, -0.5181673765182495, -0.2702103555202484]),
 Row(id=1004, features=[0.17526251077651978, -0.20905305445194244, -0.9899356961250305, -0.07294473052024841, 0.45437315106391907, -0.9831053614616394, 0.11612792313098907, 0.46997737884521484, -0.13534589111804962, -0.48477357625961304]),
 Row(id=1005, features=[0.15774418413639069, 0.208066433668

In [21]:
# Criando um dataset de teste com usuários e items para rating
testeDF = spSession.createDataFrame([(1001, 9003),(1001,9004),(1001,9005)], ["user", "item"])
testeDF.collect()

[Row(user=1001, item=9003),
 Row(user=1001, item=9004),
 Row(user=1001, item=9005)]

In [None]:
# Previsões  
# Quanto maior o Affinity Score, maior a probabilidade do usuário aceitar uma recomendação
previsoes = (modelo.transform(testeDF).collect())
previsoes