### Spark MLLib - Sistema de Recomendação

**Descrição**

    . Também chamado de filtros colaborativos
    
    . Analisa dados passados para compreender comportamentos de pessoas/entidades
    
    . A recomendação é feita por similaridade de comportamento
    
    . Recomendação é baseada em usuários ou itens
    
    . Algoritmos de Recomendação esperam receber os dados em um formato específico:[user_ID, item_ID, score].
    
    . Score, também chamado rating, indica a preferencia de um usuario sobre um item. Podem ser valores booleanos, ratings           ou mesmo volume de vendas.

In [1]:
# Imports
from pyspark.ml.recommendation import ALS

In [2]:
# Iniciando uma Spark Session
spSession = SparkSession.builder.master("local").appName("SparkMLLib-Recomendacao").getOrCreate()

In [3]:
# Carregando os dados
ratingRDD = sc.textFile("user-item.txt")

In [4]:
type(ratingRDD)

pyspark.rdd.RDD

In [5]:
# Numero de registros no conjunto de dados
ratingRDD.count()

38

In [6]:
# Verificando o cabeçalho - Nao ha cabeçalho
ratingRDD.first()

'1001,9001,10'

In [7]:
# Visualizando alguns registros
ratingRDD.take(10)

['1001,9001,10',
 '1001,9002,1',
 '1001,9003,9',
 '1002,9001,3',
 '1002,9002,5',
 '1002,9003,1',
 '1002,9004,10',
 '1003,9001,2',
 '1003,9002,6',
 '1003,9003,2']

#### Os dados estao como strings entao transformamos os dados para os tipos adequados

In [15]:
# Criando a função de transformação
def transformaDados(inputStr):
    
    attList = inputStr.split(",")
    
    col1 = int(attList[0])
    col2 = int(attList[1])
    col3 = float(attList[2])
    
    dados = [col1, col2, col3]
    
    return dados

In [16]:
# Transformando os dados
ratingRDD2 = ratingRDD.map(transformaDados)

In [17]:
# Visualizando o conjunto criado
ratingRDD2.take(5)

[[1001, 9001, 10.0],
 [1001, 9002, 1.0],
 [1001, 9003, 9.0],
 [1002, 9001, 3.0],
 [1002, 9002, 5.0]]

In [25]:
# Criando um dataframe - para a criaçao do algoritmo de recomendaçao e necessario ter as colunas como : [user, item, rating]
ratingDF = spSession.createDataFrame(ratingRDD2, ["user", "item", "rating"])

In [27]:
# Visualizando o dataFrame
ratingDF.select("user", "item", "rating").show(5)

+----+----+------+
|user|item|rating|
+----+----+------+
|1001|9001|  10.0|
|1001|9002|   1.0|
|1001|9003|   9.0|
|1002|9001|   3.0|
|1002|9002|   5.0|
+----+----+------+
only showing top 5 rows



In [28]:
# Criando o Modelo
# ALS -> Alternating Least Squares --> Algortimo para sistema de recomendação, que otimiza a loss function
# e funciona muito bem em ambientes paralelizados
als = ALS(rank = 10, maxIter = 5)
modelo = als.fit(ratingDF)

In [29]:
# Visualizando o Affinity Score
modelo.userFactors.orderBy("id").collect()

[Row(id=1001, features=[0.9681084156036377, -0.6955734491348267, 0.7195470929145813, -0.23173299431800842, 0.258536159992218, 0.21683301031589508, -0.3001883029937744, 0.9602264761924744, 0.4306694269180298, 0.176595076918602]),
 Row(id=1002, features=[0.2752208411693573, 0.45148375630378723, 0.007497432176023722, 0.029693691059947014, 0.7466303110122681, 1.5252691507339478, 0.1335616558790207, -0.6253135204315186, -0.16368703544139862, -0.18602345883846283]),
 Row(id=1003, features=[0.31628555059432983, 0.48526066541671753, -0.09361215680837631, 0.6461942791938782, 0.16269101202487946, 1.2503844499588013, 0.22005711495876312, -0.5321128368377686, -0.16695955395698547, 0.0727267861366272]),
 Row(id=1004, features=[0.9316565990447998, -0.7643890976905823, 0.4667227268218994, -0.060695432126522064, 0.544359028339386, 0.6515966057777405, -0.22367969155311584, 0.5234723091125488, 0.09014416486024857, 0.03905295953154564]),
 Row(id=1005, features=[0.7033907771110535, 0.16621631383895874, 0.

In [30]:
# Criando um dataset de teste com usuários e itens para rating
testeDF = spSession.createDataFrame([(1001, 9003), (1001, 9004), (1001, 9005)], ["user", "item"])

In [31]:
# Previsoes
# Quanto maior o Affinity Score, maior a probabilidade do usuário aceitar uma recomendação
previsoes = (modelo.transform(testeDF).collect())
previsoes

[Row(user=1001, item=9004, prediction=-0.6358490586280823),
 Row(user=1001, item=9005, prediction=-2.2901651859283447),
 Row(user=1001, item=9003, prediction=9.001792907714844)]

### FIM