#1 Configurando bibliotecas e dependencias

In [1]:
!pip install pyspark
!pip install findspark

# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=a1dc649e865ba68e6fe44a10a42df039449b006da4cc475d8bdbdf421c0c790f
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator #evaluation é a biblioteca para verificação da qualidade do modelo
from pyspark.ml.recommendation import ALS # ALS é o modelo de recomendação que será utilizadp
from pyspark.sql import Row #row é o formato que o ALS trabalha, row conterá o id do usuario, id filme, nota e timestamp

In [3]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [4]:
lines = spark.read.text("sample_movielens_ratings.txt").rdd
#(.rdd) para criar como um rdd nativo, não inserindo ele cria um dataframe

In [5]:
parts = lines.map(lambda row: row.value.split("::"))

In [6]:
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), \
                                     movieId=int(p[1]), \
                                     rating=float(p[2]), \
                                     timestamp=int(p[3])))

In [7]:
ratings = spark.createDataFrame(ratingsRDD)

In [8]:
lines.collect()

[Row(value='0::2::3::1424380312'),
 Row(value='0::3::1::1424380312'),
 Row(value='0::5::2::1424380312'),
 Row(value='0::9::4::1424380312'),
 Row(value='0::11::1::1424380312'),
 Row(value='0::12::2::1424380312'),
 Row(value='0::15::1::1424380312'),
 Row(value='0::17::1::1424380312'),
 Row(value='0::19::1::1424380312'),
 Row(value='0::21::1::1424380312'),
 Row(value='0::23::1::1424380312'),
 Row(value='0::26::3::1424380312'),
 Row(value='0::27::1::1424380312'),
 Row(value='0::28::1::1424380312'),
 Row(value='0::29::1::1424380312'),
 Row(value='0::30::1::1424380312'),
 Row(value='0::31::1::1424380312'),
 Row(value='0::34::1::1424380312'),
 Row(value='0::37::1::1424380312'),
 Row(value='0::41::2::1424380312'),
 Row(value='0::44::1::1424380312'),
 Row(value='0::45::2::1424380312'),
 Row(value='0::46::1::1424380312'),
 Row(value='0::47::1::1424380312'),
 Row(value='0::48::1::1424380312'),
 Row(value='0::50::1::1424380312'),
 Row(value='0::51::1::1424380312'),
 Row(value='0::54::1::1424380312

In [9]:
ratings.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     0|      2|   3.0|1424380312|
|     0|      3|   1.0|1424380312|
|     0|      5|   2.0|1424380312|
|     0|      9|   4.0|1424380312|
|     0|     11|   1.0|1424380312|
|     0|     12|   2.0|1424380312|
|     0|     15|   1.0|1424380312|
|     0|     17|   1.0|1424380312|
|     0|     19|   1.0|1424380312|
|     0|     21|   1.0|1424380312|
|     0|     23|   1.0|1424380312|
|     0|     26|   3.0|1424380312|
|     0|     27|   1.0|1424380312|
|     0|     28|   1.0|1424380312|
|     0|     29|   1.0|1424380312|
|     0|     30|   1.0|1424380312|
|     0|     31|   1.0|1424380312|
|     0|     34|   1.0|1424380312|
|     0|     37|   1.0|1424380312|
|     0|     41|   2.0|1424380312|
+------+-------+------+----------+
only showing top 20 rows



In [10]:
(training, test) = ratings.randomSplit([0.8, 0.2]) #divide o df em porções para treinamento e teste

In [11]:
als = ALS(maxIter=5, \
          regParam=0.01, \
          userCol="userId", \
          itemCol="movieId", \
          ratingCol="rating", \
          coldStartStrategy="drop")

In [12]:
model = als.fit(training) #treina o modelo com o dataset de treinamento

In [13]:
predictions = model.transform(test) #aplica o modelo no conjunto de teste para fazer predições
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                               predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Erro médio quadrático = " + str(rmse))

Erro médio quadrático = 1.6671500501750995


In [14]:
userRec = model.recommendForAllUsers(10)

In [15]:
userRec.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    20|[{22, 4.9569845},...|
|    10|[{83, 4.5931425},...|
|     0|[{25, 4.6776648},...|
|     1|[{51, 3.8314211},...|
|    21|[{52, 6.278963}, ...|
|    11|[{32, 5.239352}, ...|
|    12|[{90, 7.219157}, ...|
|    22|[{51, 5.1317997},...|
|     2|[{93, 5.0680766},...|
|    13|[{52, 4.3222284},...|
|     3|[{18, 4.2041664},...|
|    23|[{48, 5.145026}, ...|
|     4|[{52, 3.968298}, ...|
|    24|[{93, 5.197677}, ...|
|    14|[{29, 5.635835}, ...|
|     5|[{32, 4.403255}, ...|
|    15|[{46, 5.1529207},...|
|    25|[{92, 5.362459}, ...|
|    26|[{51, 6.0281124},...|
|     6|[{41, 3.2415028},...|
+------+--------------------+
only showing top 20 rows



In [16]:
movieRecs = model.recommendForAllItems(10)
#faz a transposta da matriz de ratings, a fim de recomendar usuários em potencial para itens específicos

In [17]:
movieRecs.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     20|[{17, 4.7388024},...|
|     40|[{2, 4.1062417}, ...|
|     10|[{23, 4.026212}, ...|
|     50|[{23, 4.109942}, ...|
|     80|[{26, 3.2243974},...|
|     70|[{21, 3.7992275},...|
|     60|[{24, 3.100827}, ...|
|     90|[{12, 7.219157}, ...|
|     30|[{22, 4.5683265},...|
|      0|[{28, 2.8014977},...|
|     31|[{12, 3.8505828},...|
|     81|[{28, 4.870546}, ...|
|     91|[{23, 3.6312578},...|
|      1|[{15, 3.7164266},...|
|     41|[{14, 4.6066194},...|
|     61|[{25, 2.9992385},...|
|     51|[{26, 6.0281124},...|
|     21|[{26, 2.972913}, ...|
|     11|[{18, 3.9135637},...|
|     71|[{25, 3.6825786},...|
+-------+--------------------+
only showing top 20 rows



In [18]:
users = ratings.select(als.getUserCol()).distinct()
#selecina os usuários que existem nesse universo

In [19]:
users.show()

+------+
|userId|
+------+
|    26|
|    29|
|    19|
|     0|
|    22|
|     7|
|    25|
|     6|
|     9|
|    27|
|    17|
|    28|
|     5|
|     1|
|    10|
|     3|
|    12|
|     8|
|    11|
|     2|
+------+
only showing top 20 rows



In [20]:
UserRecsOnlyItemId = userRec.select(userRec['userId'], \
                                    userRec['recommendations']['movieid'])

In [21]:
UserRecsOnlyItemId.show(10, False) #mostra somente as recomendações por usuário

+------+----------------------------------------+
|userId|recommendations.movieid                 |
+------+----------------------------------------+
|20    |[22, 74, 75, 77, 98, 18, 51, 29, 53, 36]|
|10    |[83, 92, 2, 9, 20, 91, 89, 57, 4, 49]   |
|0     |[25, 49, 92, 9, 24, 2, 85, 32, 53, 91]  |
|1     |[51, 32, 62, 22, 68, 9, 24, 95, 85, 31] |
|21    |[52, 72, 29, 32, 74, 27, 70, 31, 62, 87]|
|11    |[32, 27, 23, 18, 79, 69, 48, 7, 8, 87]  |
|12    |[90, 17, 35, 32, 20, 68, 49, 48, 16, 94]|
|22    |[51, 88, 30, 22, 72, 69, 32, 77, 68, 23]|
|2     |[93, 8, 83, 37, 39, 89, 40, 92, 34, 19] |
|13    |[52, 93, 74, 96, 92, 53, 29, 72, 8, 88] |
+------+----------------------------------------+
only showing top 10 rows

