<a href="https://colab.research.google.com/github/cwaltrick/data_science/blob/master/Modelo_ALS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# tornar o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

In [3]:
from __future__ import print_function

import sys
if sys.version >= '3':
    long = int
    
from pyspark.sql import SparkSession

from pyspark.ml.evaluation import RegressionEvaluator #evaluation é a biblioteca para verificação da qualidade do modelo
from pyspark.ml.recommendation import ALS # ALS é o modelo de recomendação que será utilizadp
from pyspark.sql import Row #row é o formato que o ALS trabalha, row conterá o id do usuario, id filme, nota e timestamp

In [4]:
spark = SparkSession.builder.master('local[*]').getOrCreate() #criar/iniciar a sessão spark

In [6]:
lines = spark.read.text("sample_movielens_ratings.txt").rdd #Carregar os dados. RDD é uma estrutura paralelizada do spark

In [7]:
parts = lines.map(lambda row: row.value.split("::")) #pega os itens de lines e aplica map para quebrar em partes
#fez expressão lambda, nomeou cada linha como row e quebra cada row a cada "::" retorna um array com 4 itens

In [8]:
#ratingsRDD: pega cada parte do item acima e converte para formato Row, instanciando nome e posição
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]), rating=float(p[2]), timestamp=long(p[3])))

In [9]:
ratings = spark.createDataFrame(ratingsRDD) #pega ratingsRDD e coloca em formato de tabela

In [10]:
ratings.show()

+-------+------+----------+------+
|movieId|rating| timestamp|userId|
+-------+------+----------+------+
|      2|   3.0|1424380312|     0|
|      3|   1.0|1424380312|     0|
|      5|   2.0|1424380312|     0|
|      9|   4.0|1424380312|     0|
|     11|   1.0|1424380312|     0|
|     12|   2.0|1424380312|     0|
|     15|   1.0|1424380312|     0|
|     17|   1.0|1424380312|     0|
|     19|   1.0|1424380312|     0|
|     21|   1.0|1424380312|     0|
|     23|   1.0|1424380312|     0|
|     26|   3.0|1424380312|     0|
|     27|   1.0|1424380312|     0|
|     28|   1.0|1424380312|     0|
|     29|   1.0|1424380312|     0|
|     30|   1.0|1424380312|     0|
|     31|   1.0|1424380312|     0|
|     34|   1.0|1424380312|     0|
|     37|   1.0|1424380312|     0|
|     41|   2.0|1424380312|     0|
+-------+------+----------+------+
only showing top 20 rows



In [11]:
(training, test) = ratings.randomSplit([0.8, 0.2]) #divide o df em porções para treinamento e teste

In [12]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop") 
#instancia o modelo ALS; maxIter é o máximo de iterações, regParam é coeficiente de aprendizado,
#coldstart é quando o usuário fez poucas iterações com o sistemas ou o sistema tem a matriz muito esparsa, drop: se algum usuário
#tiver problema de coldstart, não será considerado no sistema

In [14]:
model = als.fit(training) #treina o modelo com o dataset de treinamento

In [15]:
predictions = model.transform(test) #aplica o modelo no conjunto de teste para fazer predições
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                               predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Erro médio quadrático = " + str(rmse))

Erro médio quadrático = 1.82015383545805


In [16]:
userRec = model.recommendForAllUsers(10) #pegar todos os usuários e gerar 10 recomendações

In [17]:
userRec.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    28|[[91, 7.199192], ...|
|    26|[[30, 6.8022966],...|
|    27|[[18, 4.345591], ...|
|    12|[[35, 5.1877465],...|
|    22|[[4, 5.187028], [...|
|     1|[[17, 4.4631], [9...|
|    13|[[2, 3.0728006], ...|
|     6|[[25, 4.814437], ...|
|    16|[[76, 5.6596413],...|
|     3|[[32, 5.3414116],...|
|    20|[[46, 5.877379], ...|
|     5|[[18, 4.877655], ...|
|    19|[[51, 5.3770857],...|
|    15|[[46, 4.7499933],...|
|    17|[[90, 5.0351977],...|
|     9|[[51, 5.090912], ...|
|     4|[[92, 5.3576517],...|
|     8|[[25, 5.7912273],...|
|    23|[[46, 6.087576], ...|
|     7|[[25, 4.92321], [...|
+------+--------------------+
only showing top 20 rows



In [18]:
movieRecs = model.recommendForAllItems(10) #faz a transposta da matriz de ratings, a fim de recomendar usuários em potencial para itens específicos

In [19]:
movieRecs.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     31|[[28, 4.4335637],...|
|     85|[[16, 4.4600816],...|
|     65|[[23, 4.8359885],...|
|     53|[[21, 4.8665752],...|
|     78|[[5, 1.405179], [...|
|     34|[[23, 5.4059834],...|
|     81|[[12, 5.1232557],...|
|     28|[[18, 5.062015], ...|
|     76|[[16, 5.6596413],...|
|     26|[[22, 4.0410576],...|
|     27|[[2, 5.0788355], ...|
|     44|[[11, 3.253865], ...|
|     12|[[28, 4.8772264],...|
|     91|[[28, 7.199192], ...|
|     22|[[26, 5.148896], ...|
|     93|[[27, 1.0710598],...|
|     47|[[8, 4.567012], [...|
|      1|[[16, 4.053723], ...|
|     52|[[8, 4.92321], [2...|
|     13|[[23, 4.0366826],...|
+-------+--------------------+
only showing top 20 rows



In [20]:
users = ratings.select(als.getUserCol()).distinct() #selecina os usuários que existem nesse universo

In [21]:
users.show()

+------+
|userId|
+------+
|    26|
|    29|
|    19|
|     0|
|    22|
|     7|
|    25|
|     6|
|     9|
|    27|
|    17|
|    28|
|     5|
|     1|
|    10|
|     3|
|    12|
|     8|
|    11|
|     2|
+------+
only showing top 20 rows



In [22]:
UserRecsOnlyItemId = userRec.select(userRec['userId'], userRec['recommendations']['movieid'])

In [23]:
UserRecsOnlyItemId.show(10, False) #mostra somente as recomendações por usuário

+------+----------------------------------------+
|userId|recommendations.movieid                 |
+------+----------------------------------------+
|28    |[91, 92, 12, 81, 79, 31, 89, 49, 35, 82]|
|26    |[30, 32, 94, 17, 22, 88, 7, 98, 90, 24] |
|27    |[18, 2, 48, 19, 55, 66, 23, 44, 7, 33]  |
|12    |[35, 81, 17, 88, 79, 64, 69, 27, 31, 16]|
|22    |[4, 51, 75, 74, 52, 88, 30, 9, 85, 58]  |
|1     |[17, 90, 62, 51, 69, 85, 28, 22, 38, 76]|
|13    |[2, 52, 29, 18, 53, 9, 43, 92, 58, 83]  |
|6     |[25, 62, 51, 90, 76, 85, 58, 2, 95, 63] |
|16    |[76, 62, 90, 29, 51, 54, 85, 1, 53, 69] |
|3     |[32, 51, 30, 80, 7, 85, 76, 8, 29, 87]  |
+------+----------------------------------------+
only showing top 10 rows

