In [82]:
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, IntegerType
import pyspark.sql.functions as f
import pandas as pd
import sys
import os

In [2]:
pd.set_option('display.max_columns', 500)
pd.set_option('display.max_rows', 40)

In [3]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [4]:
warehouse_location = "hdfs://m1.local.br:9000/user/hive/warehouse"
hive_metastore_uri = "thrift://m1.local.br:9083"
defaultFS = "hdfs://m1.local.br:9000"

# 1 - Conectando ao Hive com PySpark

In [5]:
# .config('spark.ui.port', '4040') \
spark = SparkSession.builder \
    .appName("Machine Learning - MovieLens dataset") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("spark.hadoop.fs.defaultFS", defaultFS) \
    .config("hive.metastore.uris", hive_metastore_uri) \
    .enableHiveSupport() \
    .getOrCreate()

spark

In [6]:
spark.catalog.refreshByPath(warehouse_location)

In [7]:
print("URI do Metastore do Hive:", spark.conf.get("hive.metastore.uris"))
print("Endereço do Hadoop (HDFS):", spark.conf.get("spark.hadoop.fs.defaultFS"))

URI do Metastore do Hive: thrift://m1.local.br:9083
Endereço do Hadoop (HDFS): hdfs://m1.local.br:9000


In [9]:
# Visualizando o nome do usuário
tables = spark.sql("SELECT current_user()")
tables.show()

+--------------+
|current_user()|
+--------------+
| Vinicius Luiz|
+--------------+



In [10]:
# Se conectando ao banco default
spark.sql("USE default")

DataFrame[]

In [11]:
# Visualizando as tabelas no database default
tables = spark.sql("SHOW tables")
tables.show()

+---------+-----------------+-----------+
|namespace|        tableName|isTemporary|
+---------+-----------------+-----------+
|  default|    genome_scores|      false|
|  default|genome_scores_tmp|      false|
|  default|      genome_tags|      false|
|  default|  genome_tags_tmp|      false|
|  default|            links|      false|
|  default|        links_tmp|      false|
|  default|           movies|      false|
|  default|       movies_tmp|      false|
|  default|          ratings|      false|
|  default|      ratings_tmp|      false|
|  default|             tags|      false|
|  default|         tags_tmp|      false|
+---------+-----------------+-----------+



In [12]:
SQL_COUNT = '''
select count(1)       as qtd_linhas
    , 'genome_scores' as table_name
  from genome_scores
union all
select count(1)       as qtd_linhas
    , 'genome_tags'   as table_name
  from genome_tags
union all
select count(1)       as qtd_linhas
    , 'movies'        as table_name
  from movies
union all
select count(1)       as qtd_linhas
    , 'ratings'       as table_name
  from ratings
union all
select count(1)       as qtd_linhas
    , 'tags'          as table_name
  from tags
union all
select count(1)       as qtd_linhas
    , 'links'         as table_name
  from links
'''

In [13]:
df_movies = spark.sql(SQL_COUNT)
df_movies.show(10, truncate=False)

+----------+-------------+
|qtd_linhas|table_name   |
+----------+-------------+
|18472128  |genome_scores|
|1128      |genome_tags  |
|86537     |movies       |
|33832162  |ratings      |
|2328315   |tags         |
|86537     |links        |
+----------+-------------+



# 2 - Definindo amostragem
O modelo ALS será treinado apenas com filmes que têm mais de **100 avaliações**. Isso pode ser importante para garantir que os filmes usados no treinamento do modelo tenham recebido um número suficiente de avaliações para gerar recomendações mais robustas e significativas.

Os números fornecidos indicam a distribuição do número de avaliações para os filmes:

- O dataset contém **83.239** filmes.
- **17.916** filmes contêm apenas 1 avaliação.
- **10.161** filmes contêm apenas 2 avaliações.
- **55.162** filmes contêm 3 avaliações ou mais.
- **43.873** filmes contêm 5 avaliações ou mais.
- **32.021** filmes contêm 10 avaliações ou mais.
- **16.116** filmes contém 50 avaliações ou mais.
- **12.253** filmes contêm 100 avaliações ou mais.
- **6.929** filmes contêm mais avaliações do que a média (406).

# 3 - Realizando a Hiperparametrização do modelo ALS

In [14]:
qtd_min_ratings = 100

SQL_AMOSTRAGEM = '''
SELECT m.movieid
     , count(1)      as qtd_ratings
  FROM ratings r
  JOIN movies m
    ON r.movieid = m.movieid
 GROUP BY m.movieid
 HAVING count(1) >= {qtd_min_ratings}
'''

df_movies_sample = spark.sql(SQL_AMOSTRAGEM.format(qtd_min_ratings = qtd_min_ratings))
df_movies_sample.count()

12253

In [15]:
df_movies_sample.createOrReplaceTempView("movies_sample")

In [16]:
# Importando a biblioteca RegressionEvaluator do PySpark para avaliação de modelos de regressão
from pyspark.ml.evaluation import RegressionEvaluator

# Importando a biblioteca ALS do PySpark para filtragem colaborativa e recomendação
from pyspark.ml.recommendation import ALS

# Importando a biblioteca TrainValidationSplit do PySpark para realizar validação
# Importando também a classe ParamGridBuilder para construir um grid de parâmetros a serem testados durante a validação cruzada
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [17]:
SQL_RATINGS ='''
SELECT r.userid
     , r.movieid
     , r.rating
 FROM ratings r
 JOIN movies_sample ms
   ON r.movieid = ms.movieid
'''

ratings = spark.sql(SQL_RATINGS)
ratings = ratings.withColumn("userid", f.col("userid").cast("long"))

ratings.show(5, truncate=False)
ratings.printSchema()
ratings.count()

+------+-------+------+
|userid|movieid|rating|
+------+-------+------+
|9     |474    |4.0   |
|22    |106100 |5.0   |
|24    |474    |4.5   |
|24    |6721   |3.0   |
|30    |72011  |4.0   |
+------+-------+------+
only showing top 5 rows

root
 |-- userid: long (nullable = true)
 |-- movieid: long (nullable = true)
 |-- rating: decimal(10,1) (nullable = true)



33060369

In [18]:
# Separando os dados em 70% para o treinamento e 30% para a validação
(train, test) = ratings.randomSplit([0.7, 0.3])

In [19]:
# Criando um modelo ALS
# Parâmetros:
# - userCol: Nome da coluna que contém os IDs dos usuários
# - itemCol: Nome da coluna que contém os IDs dos itens (por exemplo, filmes)
# - ratingCol: Nome da coluna que contém as classificações atribuídas pelos usuários aos itens
# - coldStartStrategy: Estratégia para lidar com novos usuários ou itens durante a previsão ("drop" irá descartar)
# - nonnegative: Se True, restringe os fatores latentes a valores não negativos

als = ALS(userCol="userid", itemCol="movieid", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True)

In [20]:
# Criando um grid de parâmetros para hiperparametrização do modelo ALS usando ParamGridBuilder

# rank: número de fatores latentes
# maxIter: número máximo de iterações
# regParam: parâmetro de regularização
param_grid = ParamGridBuilder()\
            .addGrid(als.rank, [10, 15, 20])\
            .addGrid(als.maxIter, [5, 10, 15])\
            .addGrid(als.regParam, [0.01, .1, .5])\
            .build()

In [21]:
# Definindo um avaliador para a métrica RMSE (Root Mean Squared Error)

# Parâmetros:
# - metricName: Nome da métrica a ser avaliada, neste caso, "rmse" para o Root Mean Squared Error
# - labelCol: Nome da coluna que contém os rótulos reais (neste caso, as classificações atribuídas pelos usuários)
# - predictionCol: Nome da coluna que contém as previsões geradas pelo modelo

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [22]:
# Criando um objeto TrainValidationSplit no PySpark para realizar a divisão entre treino e validação durante a validação cruzada

# Parâmetros:
# - estimator: Estimador a ser validado, neste caso, o modelo ALS que foi criado anteriormente
# - estimatorParamMaps: Grid de parâmetros a serem testados durante a validação cruzada
# - evaluator: Avaliador a ser usado para avaliar o desempenho do modelo em diferentes configurações

train_validation = TrainValidationSplit(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
)

In [23]:
# Treinando o modelo
model = train_validation.fit(train)

In [24]:
# Obtendo o melhor modelo
best_model = model.bestModel
best_model

ALSModel: uid=ALS_b76d695af141, rank=20

In [25]:
# Gerando as previsões
predictions = best_model.transform(test)
predictions.show(5, truncate=False)

+------+-------+------+----------+
|userid|movieid|rating|prediction|
+------+-------+------+----------+
|28    |135    |1.0   |2.2751474 |
|28    |593    |4.0   |3.7553833 |
|28    |2184   |4.0   |3.2852948 |
|28    |2559   |3.0   |2.1906624 |
|31    |47     |3.0   |2.764629  |
+------+-------+------+----------+
only showing top 5 rows



In [26]:
# Avaliando o modelo
rmse = evaluator.evaluate(predictions)

Considerando que os ratings variam de 0 a 5, o Root Mean Square Error (RMSE) de **0.809221** indica que, em média, as previsões do modelo têm uma discrepância de aproximadamente 0.809221 unidades em relação aos valores reais em uma escala de 0 a 5. O termo "root" no RMSE implica que os erros foram primeiro elevados ao quadrado, acentuando a penalização dos erros maiores. A aplicação da raiz quadrada então retorna a métrica à mesma escala dos dados originais.

In [27]:
# Mostrando os resultados
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
print("Rank = %d" % best_model.rank)
print("MaxIter = %d" % best_model._java_obj.parent().getMaxIter())
print("RegParam = %g" % best_model._java_obj.parent().getRegParam())

Root Mean Squared Error (RMSE) on test data = 0.809221
Rank = 20
MaxIter = 15
RegParam = 0.1


In [51]:
# Mostrando as previsões
predictions.sort('userid', 'rating')
predictions.show(5, False)

+------+-------+------+----------+
|userid|movieid|rating|prediction|
+------+-------+------+----------+
|28    |135    |1.0   |2.2751474 |
|28    |593    |4.0   |3.7553833 |
|28    |2184   |4.0   |3.2852948 |
|28    |2559   |3.0   |2.1906624 |
|31    |47     |3.0   |2.764629  |
+------+-------+------+----------+
only showing top 5 rows



# 4 - Gerando recomendações para todos os usuários usando o emlhor modelo obtido após o treino

In [29]:
# Parâmetros:
# - best_model: O modelo que apresentou o melhor desempenho após a validação cruzada
# - recommendForAllUsers(10): Gerando recomendações para todos os usuários, onde 10 é o número de itens a serem recomendados para cada usuário
user_recs = best_model.recommendForAllUsers(10)



In [47]:
# Gerando recomendações para um usuário
def get_recommendations_for_user(recs):
    # Explodir a coluna de recomendações
    df_exploded = recs.select("userid", f.explode("recommendations").alias("rec"))

    # Selecionar as colunas desejadas
    df_transformed = df_exploded.select("rec.movieid", "rec.rating")

    # Renomear as colunas
    df_transformed = df_transformed.withColumnRenamed("rec.movieid", "movieid") \
                                   .withColumnRenamed("rec.rating", "rating")

    return df_transformed

In [48]:
# Gerando recomendações pro usuário 1

user_1 = user_recs.filter("userid = 1")

user_1.show(truncate = False)
user_1.printSchema()

+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userid|recommendations                                                                                                                                                                               |
+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1     |[{318, 4.202657}, {182723, 4.170433}, {356, 4.1683593}, {1035, 4.163223}, {115969, 4.160298}, {364, 4.1588306}, {88125, 4.15752}, {176887, 4.1508374}, {159817, 4.1487494}, {4896, 4.1294866}]|
+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


**Recomendação final de 10 filmes para o usuário 1**

In [50]:
user_1_transformed = get_recommendations_for_user(user_1)
user_1_transformed.show(50, truncate=False)

+-------+---------+
|movieid|rating   |
+-------+---------+
|318    |4.202657 |
|182723 |4.170433 |
|356    |4.1683593|
|1035   |4.163223 |
|115969 |4.160298 |
|364    |4.1588306|
|88125  |4.15752  |
|176887 |4.1508374|
|159817 |4.1487494|
|4896   |4.1294866|
+-------+---------+



# 4 - Definindo o modelo

In [113]:
class Recommender():
    '''
Root Mean Squared Error (RMSE) on test data = 0.809221
Rank = 20
MaxIter = 15
RegParam = 0.1
    '''
    def __init__(self, rank: int = 20, maxIter: int = 15, regParam: float = 0.1):
        self.model = None
        self.ratings = None

        self.rank = rank
        self.maxIter = maxIter
        self.regParam = regParam

        self.schema = StructType([
            StructField("userid", IntegerType()),
            StructField("recommendations", ArrayType(StructType([
                StructField("movieid", IntegerType()),
                StructField("rating", FloatType())
            ])))
        ])

    def prepare_data(self, sql: str):
        self.ratings = spark.sql(sql)
        self.ratings = ratings.withColumn("userid", f.col("userid").cast("long"))
    
    def fit(self):
        als = ALS(userCol="userid", itemCol="movieid", ratingCol="rating",
                  coldStartStrategy="drop", nonnegative=True,
                  rank=self.rank, maxIter=self.maxIter, regParam=self.regParam)

        self.model = als.fit(self.ratings)
    
    def mount_user_ratings(self, userid: int, user_ratings: list):
        user = [
            (userid, user_ratings)
        ]
        return spark.createDataFrame(user, schema=self.schema)
    
    def recommendForUserSubset(self, user_ratings, numItems: int):
        recs = self.model.recommendForUserSubset(user_ratings, numItems=numItems)

        df_exploded = recs.select("userid", f.explode("recommendations").alias("rec"))

        df_transformed = df_exploded.select("rec.movieid", "rec.rating")

        df_transformed = df_transformed.withColumnRenamed("rec.movieid", "movieid") \
                                        .withColumnRenamed("rec.rating", "rating")

        return df_transformed

## 4.1 - Exemplo de uso

In [114]:
# Mês e Ano máximo
# extract(year from r.rating_date) = 2023
# extract(month from r.rating_date) = 7

SQL_RATINGS = ''' 
WITH movies_sample AS (
    SELECT m.movieid
         , count(1)      as qtd_ratings
      FROM ratings r
      JOIN movies m
        ON r.movieid = m.movieid
    GROUP BY m.movieid
    HAVING count(1) >= 100
)
SELECT r.userid
     , r.movieid
     , r.rating
 FROM ratings r
 JOIN movies_sample ms
   ON r.movieid = ms.movieid
WHERE 1=1
'''

In [115]:
recommender = Recommender()

In [116]:
recommender.prepare_data(SQL_RATINGS)

In [117]:
recommender.fit()

| movieid | title | year | genres |
|---------|-------|------|--------|
| 4896 | Harry Potter and the Sorcerer's Stone (a.k.a. Harry Potter and the Philosopher's Stone) (2001) | 2001 | [Adventure, Children, Fantasy] |
| 5816 | Harry Potter and the Chamber of Secrets (2002) | 2002 | [Adventure, Fantasy] |
| 8368 | Harry Potter and the Prisoner of Azkaban (2004) | 2004 | [Adventure, Fantasy, IMAX] |
| 40815 | Harry Potter and the Goblet of Fire (2005) | 2005 | [Adventure, Fantasy, Thriller, IMAX] |
| 54001 | Harry Potter and the Order of the Phoenix (2007) | 2007 | [Adventure, Drama, Fantasy, IMAX] |
| 69844 | Harry Potter and the Half-Blood Prince (2009) | 2009 | [Adventure, Fantasy, Mystery, Romance, IMAX] |
| 81834 | Harry Potter and the Deathly Hallows: Part 1 (2010) | 2010 | [Action, Adventure, Fantasy, IMAX] |
| 88125 | Harry Potter and the Deathly Hallows: Part 2 (2011) | 2011 | [Action, Adventure, Drama, Fantasy, Mystery, IMAX] |
| 186777 | The Greater Good - Harry Potter Fan Film (2013) | 2013 | [Action, Adventure, Fantasy] |
| 247038 | Harry Potter: A History Of Magic (2017) | 2017 | [Documentary] |

In [118]:
userid = 1

user_ratings = [
        (4896, 4.5),
        (5816, 3.5),
        (8368, 5.0),
        (40815, 4.5),
        (54001, 3.5),
        (69844, 4.0),
        (81834, 4.5),
        (88125, 5.0),
        (186777, 2.0),
        (247038, 2.5),
    ]

In [119]:
user_ratings = recommender.mount_user_ratings(userid, user_ratings)

In [120]:
user_recomendations = recommender.recommendForUserSubset(user_ratings, numItems=10)
user_recomendations.createOrReplaceTempView("user_recomendations")



In [121]:
SQL_RECOMENDATIONS  = '''
SELECT r.movieid
     , r.rating
     , m.title
     , m.year
     , m.genres
  FROM user_recomendations r
  JOIN movies m
    ON r.movieid = m.movieid
'''

spark.sql(SQL_RECOMENDATIONS).show(truncate=False)

+-------+---------+------------------------------------------+----+-----------------------------+
|movieid|rating   |title                                     |year|genres                       |
+-------+---------+------------------------------------------+----+-----------------------------+
|318    |4.5356064|The Shawshank Redemption (1994)           |1994|[Crime, Drama]               |
|170705 |4.430924 |Band of Brothers (2001)                   |2001|[Action, Drama, War]         |
|356    |4.3958735|Forrest Gump (1994)                       |1994|[Comedy, Drama, Romance, War]|
|182723 |4.3930745|Cosmos: A Spacetime Odissey               |null|[(no genres listed)]         |
|159817 |4.3702717|Planet Earth (2006)                       |2006|[Documentary]                |
|171011 |4.339723 |Planet Earth II (2016)                    |2016|[Documentary]                |
|527    |4.329928 |Schindler's List (1993)                   |1993|[Drama, War]                 |
|2324   |4.3226223|L