# 1) Setup: diretórios, Drive, instalação, Spark

In [1]:
# ============================================================
# SECTION 0 – Environment setup, Google Drive and base directories
# ============================================================
import os
import json
from datetime import datetime

# Mount Google Drive when running inside Colab; outside Colab the import
# fails and the notebook runs against the local filesystem.
try:
    from google.colab import drive
    drive.mount('/content/drive')
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

# Base paths. Instead of editing this cell to switch locations, set the
# MOVIELENS_BASE_DIR environment variable, e.g. to the alternate location
# "/content/drive/MyDrive/data-science/Big data - aula/Trab Final BigData".
BASE_DIR = os.environ.get(
    "MOVIELENS_BASE_DIR",
    "/content/drive/MyDrive/Big Data/Trab Final BigData",
)
DATA_DIR = f"{BASE_DIR}/dados_brutos"
PROCESSED_DIR = f"{BASE_DIR}/dados_processados"
MODEL_DIR = f"{BASE_DIR}/modelos"
METRICS_DIR = f"{BASE_DIR}/metricas"

# Create the directory tree (directories that already exist are left as-is).
for path in [DATA_DIR, PROCESSED_DIR, MODEL_DIR, METRICS_DIR]:
    os.makedirs(path, exist_ok=True)

print("BASE_DIR:", BASE_DIR)
print("DATA_DIR:", DATA_DIR)
print("PROCESSED_DIR:", PROCESSED_DIR)
print("MODEL_DIR:", MODEL_DIR)
print("METRICS_DIR:", METRICS_DIR)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
BASE_DIR: /content/drive/MyDrive/Big Data/Trab Final BigData
DATA_DIR: /content/drive/MyDrive/Big Data/Trab Final BigData/dados_brutos
PROCESSED_DIR: /content/drive/MyDrive/Big Data/Trab Final BigData/dados_processados
MODEL_DIR: /content/drive/MyDrive/Big Data/Trab Final BigData/modelos
METRICS_DIR: /content/drive/MyDrive/Big Data/Trab Final BigData/metricas


In [2]:
# ============================================================
# SECTION 1 – Dependency installation (Colab only)
# ============================================================
# This cell is notebook-only: the `!` lines are IPython shell escapes,
# not plain Python. In a standalone train.py script, replace it with a
# requirements.txt or install the packages manually.

if IN_COLAB:
    # Java for Spark (OpenJDK 8, headless)
    !apt-get update > /dev/null
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null

    # NOTE(review): packages are unpinned, so a fresh run installs whatever
    # pyspark/seaborn is current; newer PySpark releases may require a newer
    # Java than OpenJDK 8 — consider pinning versions (and using `%pip`,
    # which targets the kernel's environment). Confirm before changing.
    !pip install pyspark matplotlib pandas seaborn --quiet


W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [3]:
# ============================================================
# SECTION 2 – Main SparkSession
# ============================================================
from pyspark.sql import SparkSession

# Single session shared by every later section. NOTE: spark.driver.memory
# presumably only takes effect if this builder is the one that launches the
# JVM (i.e. no session exists yet in this kernel).
spark = SparkSession.builder \
    .appName("MovieLens-ALS-EndToEnd") \
    .config(key="spark.driver.memory", value="8g") \
    .config(key="spark.sql.shuffle.partitions", value="200") \
    .getOrCreate()

# Checkpoint location, required by the ALS checkpointInterval used in training.
spark.sparkContext.setCheckpointDir(
    os.path.join(BASE_DIR, "checkpoint_spark")
)

print("SparkSession criada com sucesso!")


SparkSession criada com sucesso!


# 2) Ingestão inteligente com cache (CSV → Parquet)

In [4]:
# ============================================================
# SECTION 3 – MovieLens download and "smart" ingestion
# ============================================================
# Download and extraction are skipped when their outputs already exist,
# so re-running the notebook does not re-fetch the dataset. The `!` lines
# are IPython shell escapes; "$NAME" interpolates the Python variable NAME.
import os

# Paths to the zip archive and to the extracted folder.
# NOTE(review): assumes the archive's top-level folder is `ml-latest/`.
ML_ZIP_PATH = os.path.join(DATA_DIR, "ml-latest.zip")
ML_EXTRACT_DIR = os.path.join(DATA_DIR, "ml-latest")

# 3.1 – Download the zip only if it does not exist yet (wget -nc)
if IN_COLAB:
    # Called through the shell; the Python existence check mirrors wget's
    # -nc ("no clobber") semantics, and -nc is kept as a second safeguard.
    if not os.path.exists(ML_ZIP_PATH):
        !wget -nc https://files.grouplens.org/datasets/movielens/ml-latest.zip -P "$DATA_DIR"
    else:
        print("‚úÖ Arquivo ZIP j√° existe, n√£o ser√° baixado novamente.")
else:
    print("Rodando fora do Colab: certifique-se de que o arquivo ml-latest.zip j√° est√° em", ML_ZIP_PATH)

# 3.2 – Unzip only if the extracted folder does not exist yet.
# NOTE(review): an interrupted unzip leaves a partial folder that this check
# treats as complete; delete it manually to force re-extraction.
if IN_COLAB:
    if not os.path.exists(ML_EXTRACT_DIR):
        !unzip -o "$ML_ZIP_PATH" -d "$DATA_DIR"
    else:
        print("‚úÖ Pasta j√° descompactada, pulando unzip.")
else:
    print("Rodando localmente: certifique-se de que a pasta 'ml-latest' j√° foi descompactada dentro de", DATA_DIR)

print("Arquivos brutos esperados em:", ML_EXTRACT_DIR)


‚úÖ Arquivo ZIP j√° existe, n√£o ser√° baixado novamente.
‚úÖ Pasta j√° descompactada, pulando unzip.
Arquivos brutos esperados em: /content/drive/MyDrive/Big Data/Trab Final BigData/dados_brutos/ml-latest


In [5]:
# ============================================================
# SECTION 4 – ETL with cache logic (CSV -> Parquet)
#      - If the Parquet outputs exist in PROCESSED_DIR, skip the CSVs
#      - Otherwise read the CSVs, clean them and save them as Parquet
# ============================================================
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType

RATINGS_PARQUET_PATH = os.path.join(PROCESSED_DIR, "ratings_parquet")
MOVIES_PARQUET_PATH = os.path.join(PROCESSED_DIR, "movies_parquet")

# Explicit CSV schemas (DDL strings) instead of inferSchema=True, which costs
# an extra full pass over the ~33M-row ratings file. With header=True the
# header row is skipped and columns are taken in file order.
RATINGS_CSV_SCHEMA = "userId INT, movieId INT, rating FLOAT, timestamp LONG"
MOVIES_CSV_SCHEMA = "movieId INT, title STRING, genres STRING"


def _parquet_is_complete(path):
    """Return True if `path` holds a Parquet directory whose Spark write committed.

    Spark writes the `_SUCCESS` marker only when the write job commits, so a
    directory left behind by an interrupted write is not mistaken for a valid
    cache (it gets rebuilt instead).
    """
    return os.path.exists(os.path.join(path, "_SUCCESS"))


def load_or_create_processed_data():
    """Load ratings and movies from the Parquet cache, building the cache if needed.

    Cache hit: both Parquet directories in PROCESSED_DIR are read as-is.
    Cache miss: ratings.csv / movies.csv from ML_EXTRACT_DIR are read, typed and
    cleaned, then written to Parquet. Cleaning consists of:
      - rows with nulls in key columns dropped;
      - ratings outside [0.5, 5.0] dropped;
      - duplicate (userId, movieId) pairs collapsed to their mean rating;
      - duplicate movieIds dropped.

    Returns:
        (ratings, movies, etl_metrics). `etl_metrics` is a dict of row counts
        before/after each cleaning step when the CSVs were processed in this
        call, and None when the data came from the Parquet cache.
    """
    if _parquet_is_complete(RATINGS_PARQUET_PATH) and _parquet_is_complete(MOVIES_PARQUET_PATH):
        print("‚úÖ Encontrado Parquet processado. Carregando de PROCESSED_DIR...")
        ratings = spark.read.parquet(RATINGS_PARQUET_PATH)
        movies = spark.read.parquet(MOVIES_PARQUET_PATH)
        return ratings, movies, None

    print("‚öôÔ∏è  Parquet n√£o encontrado. Lendo CSV bruto e processando...")

    ratings_csv_path = os.path.join(ML_EXTRACT_DIR, "ratings.csv")
    movies_csv_path = os.path.join(ML_EXTRACT_DIR, "movies.csv")

    # Raw read. movies.csv escapes quotes inside quoted titles by doubling them
    # (""), while Spark's default CSV escape character is a backslash. Without
    # escape='"', such titles are split on their inner commas and fragments
    # leak into `genres`. One example is the bogus genre
    # ' We're Comin' To Get Ya!"" (2014)"' filtered out in the visualization
    # section.
    df_r_raw = spark.read.csv(ratings_csv_path, header=True, schema=RATINGS_CSV_SCHEMA)
    df_m_raw = spark.read.csv(
        movies_csv_path, header=True, schema=MOVIES_CSV_SCHEMA, quote='"', escape='"'
    )

    # Type casts and selection of the relevant columns
    ratings = df_r_raw.select(
        F.col("userId").cast(IntegerType()).alias("userId"),
        F.col("movieId").cast(IntegerType()).alias("movieId"),
        F.col("rating").cast(FloatType()).alias("rating"),
        F.col("timestamp").cast("long").alias("timestamp"),
    )

    movies = df_m_raw.select(
        F.col("movieId").cast(IntegerType()).alias("movieId"),
        F.col("title").cast("string").alias("title"),
        F.col("genres").cast("string").alias("genres"),
    )

    # Null handling on the key columns. Every count() is a full scan of the CSV,
    # so each intermediate count is taken once and reused as the "before" value
    # of the next cleaning step.
    ratings_before_nulls = ratings.count()
    ratings = ratings.dropna(subset=["userId", "movieId", "rating"])
    ratings_after_nulls = ratings.count()

    movies_before_nulls = movies.count()
    movies = movies.dropna(subset=["movieId", "title", "genres"])
    movies_after_nulls = movies.count()

    # Drop ratings outside the expected range [0.5, 5.0] (bounds inclusive).
    ratings = ratings.filter(F.col("rating").between(0.5, 5.0))
    ratings_after_range = ratings.count()

    # Duplicate (userId, movieId) pairs are collapsed into their mean rating.
    # NOTE: this aggregation does not carry the timestamp column forward.
    ratings = ratings.groupBy("userId", "movieId").agg(F.avg("rating").alias("rating"))
    ratings_after_dups = ratings.count()

    # Duplicate movieIds: keep one row per id (which one is not specified).
    movies = movies.dropDuplicates(["movieId"])
    movies_after_dups = movies.count()

    # Save Parquet
    ratings.write.mode("overwrite").parquet(RATINGS_PARQUET_PATH)
    movies.write.mode("overwrite").parquet(MOVIES_PARQUET_PATH)

    print("‚úÖ Dados processados e salvos em formato Parquet.")

    # ETL row counts, returned so they can be recorded with the quality metrics.
    etl_metrics = {
        "ratings": {
            "before_nulls": ratings_before_nulls,
            "after_nulls": ratings_after_nulls,
            "before_range_filter": ratings_after_nulls,
            "after_range_filter": ratings_after_range,
            "before_duplicates": ratings_after_range,
            "after_duplicates": ratings_after_dups,
            "duplicates_removed": ratings_after_range - ratings_after_dups,
        },
        "movies": {
            "before_nulls": movies_before_nulls,
            "after_nulls": movies_after_nulls,
            "before_duplicates": movies_after_nulls,
            "after_duplicates": movies_after_dups,
            "duplicates_removed": movies_after_nulls - movies_after_dups,
        },
    }

    # Return DataFrames backed by the Parquet just written, not by the CSV
    # lineage, so downstream actions don't re-parse and re-clean the CSVs.
    return (
        spark.read.parquet(RATINGS_PARQUET_PATH),
        spark.read.parquet(MOVIES_PARQUET_PATH),
        etl_metrics,
    )

# Main ETL call. The loader may hand back (ratings, movies) or
# (ratings, movies, etl_metrics); a missing third item means "no ETL metrics".
etl_result = load_or_create_processed_data()
ratings_df, movies_df, etl_metrics = (
    (*etl_result, None) if len(etl_result) == 2 else etl_result
)

# Both DataFrames are reused by every later section; the counts below also
# materialize the cache.
ratings_df = ratings_df.cache()
movies_df = movies_df.cache()

print(f"Ratings count: {ratings_df.count()}")
print(f"Movies count: {movies_df.count()}")


‚úÖ Encontrado Parquet processado. Carregando de PROCESSED_DIR...
Ratings count: 33832162
Movies count: 86537


# 3) Geração e salvamento das métricas (schema, nulos, duplicatas)

In [6]:
# ============================================================
# SECTION 5 – Computing and saving data-quality metrics
#      - Schemas
#      - Null counts
#      - Duplicate counts
# ============================================================
import pandas as pd
import os

NULL_COUNTS_PATH = os.path.join(METRICS_DIR, "null_counts.csv")
DUP_SUMMARY_PATH = os.path.join(METRICS_DIR, "duplicates_summary.json")

def save_quality_metrics(ratings, movies, extra_etl_metrics=None):
    """Compute (or reload) data-quality metrics and persist them to METRICS_DIR.

    Writes the following files:
      - ratings_schema.json and movies_schema.json;
      - a per-column null-count CSV (NULL_COUNTS_PATH);
      - a duplicates summary JSON (DUP_SUMMARY_PATH).

    Metrics already on disk are reused only when `extra_etl_metrics` is None.
    A non-None value means the ETL has just rebuilt the data, so the saved
    metrics describe the previous build and everything is recomputed.

    Args:
        ratings: Spark DataFrame with at least userId and movieId columns.
        movies: Spark DataFrame with at least a movieId column.
        extra_etl_metrics: optional dict of ETL row counts, stored under the
            "etl" key of the duplicates summary.

    Returns:
        (null_counts, duplicates_summary): a pandas DataFrame with columns
        column / null_count / dataset, and the duplicates summary dict.
    """
    metrics_on_disk = os.path.exists(NULL_COUNTS_PATH) and os.path.exists(DUP_SUMMARY_PATH)
    if metrics_on_disk and extra_etl_metrics is None:
        print("‚úÖ M√©tricas de qualidade j√° existem. Carregando do disco e pulando rec√°lculo...")
        null_counts_df = pd.read_csv(NULL_COUNTS_PATH)
        with open(DUP_SUMMARY_PATH, "r") as f:
            duplicates_summary = json.load(f)
        return null_counts_df, duplicates_summary

    # ----- Schemas -----
    ratings_schema_json = json.loads(ratings.schema.json())
    movies_schema_json = json.loads(movies.schema.json())

    with open(os.path.join(METRICS_DIR, "ratings_schema.json"), "w") as f:
        json.dump(ratings_schema_json, f, indent=2)

    with open(os.path.join(METRICS_DIR, "movies_schema.json"), "w") as f:
        json.dump(movies_schema_json, f, indent=2)

    # ----- Nulls per column -----
    def null_counts_df_func(df, dataset_name):
        """Return a pandas frame (column, null_count, dataset) for one Spark DataFrame."""
        # Single aggregation pass: count() ignores nulls, and when() yields a
        # non-null value only for rows where the column is null.
        null_counts = df.select([
            F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns
        ]).toPandas()
        # One wide row -> one row per column.
        null_counts = null_counts.T.reset_index()
        null_counts.columns = ["column", "null_count"]
        null_counts["dataset"] = dataset_name
        return null_counts

    ratings_nulls = null_counts_df_func(ratings, "ratings")
    movies_nulls = null_counts_df_func(movies, "movies")

    all_nulls = pd.concat([ratings_nulls, movies_nulls], ignore_index=True)
    all_nulls.to_csv(NULL_COUNTS_PATH, index=False)

    # ----- Duplicates (same keys as the ETL deduplication) -----
    duplicates_summary = {}

    dup_ratings = (
        ratings.groupBy("userId", "movieId")
        .agg(F.count("*").alias("qtd_votos"))
        .filter(F.col("qtd_votos") > 1)
    )
    duplicates_summary["ratings"] = {
        "pairs_with_more_than_one_rating": dup_ratings.count()
    }

    dup_movies = (
        movies.groupBy("movieId")
        .agg(F.count("*").alias("qtd_registros"))
        .filter(F.col("qtd_registros") > 1)
    )
    duplicates_summary["movies"] = {
        "duplicate_movieIds": dup_movies.count()
    }

    if extra_etl_metrics is not None:
        duplicates_summary["etl"] = extra_etl_metrics

    with open(DUP_SUMMARY_PATH, "w") as f:
        json.dump(duplicates_summary, f, indent=2)

    print("‚úÖ M√©tricas de qualidade salvas em METRICS_DIR.")
    return all_nulls, duplicates_summary

# Call
null_counts_df, duplicates_summary = save_quality_metrics(
    ratings_df, movies_df, extra_etl_metrics=etl_metrics
)



‚úÖ M√©tricas de qualidade j√° existem. Carregando do disco e pulando rec√°lculo...


# 4) Visualizações (3 gráficos salvos em PNG no METRICS_DIR)

In [7]:
# ============================================================
# SECTION 6 – Visualizations (seaborn version)
#      1) Movies per genre (bar, %)
#      2) Mean rating per genre
#      3) Rating distribution (donut chart)
# Each chart (PNG) and its backing table (CSV) go to METRICS_DIR and are
# regenerated only when either file is missing.
# ============================================================
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import functions as F

df_m = movies_df
df_r = ratings_df

print("\nüìä --- GERANDO GR√ÅFICOS (backend) ---")

# ---------------------------------------------------------
# 1) MOVIES PER GENRE
# ---------------------------------------------------------

filmes_por_genero_path = os.path.join(METRICS_DIR, "filmes_por_genero.png")
filmes_por_genero_csv = os.path.join(METRICS_DIR, "filmes_por_genero.csv")

# One row per (movie, genre): the pipe-separated `genres` field is exploded.
df_genres = df_m.withColumn("genre", F.explode(F.split(F.col("genres"), "\\|")))

# Drop a "genre" that is really a fragment of a mis-parsed movie title.
df_genres = df_genres.filter(F.col("genre") != ' We\'re Comin\' To Get Ya!"" (2014)"')

if os.path.exists(filmes_por_genero_path) and os.path.exists(filmes_por_genero_csv):
    print("‚úÖ Filmes por g√™nero j√° existem em disco. Pulando gera√ß√£o...")
else:
    genre_counts = (
        df_genres.groupBy("genre")
        .count()
        .orderBy(F.col("count").desc())
        .toPandas()
    )

    # A multi-genre movie is counted once per genre, so these percentages are
    # shares of all genre labels, not of distinct movies.
    total_genre_labels = genre_counts["count"].sum()
    genre_counts["percent"] = genre_counts["count"] / total_genre_labels * 100.0
    genre_counts.to_csv(filmes_por_genero_csv, index=False)

    fig, ax = plt.subplots(figsize=(12, 8))
    sns.barplot(x="count", y="genre", data=genre_counts, palette="viridis", ax=ax)
    ax.set_xlim(0, genre_counts["count"].max() * 1.15)
    ax.set_title("Filmes por G√™nero (%)")
    ax.set_xlabel("Quantidade")

    # Bar labels show only the percentage, not the absolute count.
    for container in ax.containers:
        ax.bar_label(
            container,
            fmt=lambda x: f"{x/total_genre_labels*100:.1f}%",
            padding=3,
        )

    fig.tight_layout()
    fig.savefig(filmes_por_genero_path, dpi=120, bbox_inches="tight")
    plt.close(fig)

print("‚úÖ Filmes por g√™nero salvo em:", filmes_por_genero_path)

# ---------------------------------------------------------
# 2) MEAN RATING PER GENRE
# ---------------------------------------------------------

nota_media_genero_path = os.path.join(METRICS_DIR, "nota_media_por_genero.png")
nota_media_genero_csv = os.path.join(METRICS_DIR, "nota_media_por_genero.csv")

if os.path.exists(nota_media_genero_path) and os.path.exists(nota_media_genero_csv):
    print("‚úÖ Nota m√©dia por g√™nero j√° existe em disco. Pulando gera√ß√£o...")
else:
    # Mean of per-movie means: every movie weighs the same, regardless of how
    # many ratings it received.
    avg_ratings_per_movie = df_r.groupBy("movieId").agg(
        F.avg("rating").alias("avg_rating")
    )

    genre_ratings = df_genres.join(avg_ratings_per_movie, "movieId")

    avg_genre = (
        genre_ratings.groupBy("genre")
        .agg(F.avg("avg_rating").alias("mean_rating"))
        .orderBy(F.col("mean_rating").desc())
        .toPandas()
    )

    avg_genre.to_csv(nota_media_genero_csv, index=False)

    fig, ax = plt.subplots(figsize=(12, 6))
    sns.barplot(x="mean_rating", y="genre", data=avg_genre, palette="magma", ax=ax)
    ax.set_title("Nota M√©dia por G√™nero")
    ax.set_xlabel("Nota M√©dia (0‚Äì5)")
    # NOTE: fixed x-range; a genre whose mean falls below 2 would be clipped.
    ax.set_xlim(2, 4.5)
    ax.set_ylabel("G√™nero")
    fig.tight_layout()
    fig.savefig(nota_media_genero_path, dpi=120, bbox_inches="tight")
    plt.close(fig)

print("‚úÖ Nota m√©dia por g√™nero salva em:", nota_media_genero_path)

# ---------------------------------------------------------
# 3) RATING DISTRIBUTION (Donut Chart)
# ---------------------------------------------------------

dist_notas_path = os.path.join(METRICS_DIR, "distribuicao_notas.png")
dist_notas_csv = os.path.join(METRICS_DIR, "distribuicao_notas.csv")

if os.path.exists(dist_notas_path) and os.path.exists(dist_notas_csv):
    print("‚úÖ Distribui√ß√£o das notas j√° existe em disco. Pulando gera√ß√£o...")
else:
    rating_counts = (
        df_r.groupBy("rating")
        .count()
        .orderBy("rating")
        .toPandas()
    )

    total_count = rating_counts["count"].sum()
    rating_counts["percent"] = rating_counts["count"] / total_count * 100.0
    rating_counts.to_csv(dist_notas_csv, index=False)

    fig, ax = plt.subplots(figsize=(9, 9))
    ax.pie(
        rating_counts["count"],
        labels=rating_counts["rating"],
        autopct="%1.1f%%",
        startangle=90,
        colors=sns.color_palette("pastel"),
        pctdistance=0.85,
    )

    # A white disc over the centre turns the pie into a donut.
    ax.add_artist(plt.Circle((0, 0), 0.70, fc="white"))

    ax.set_title("Distribui√ß√£o das Notas (Share %)")
    fig.tight_layout()
    fig.savefig(dist_notas_path, dpi=120, bbox_inches="tight")
    plt.close(fig)

print("‚úÖ Distribui√ß√£o das notas salva em:", dist_notas_path)


üìä --- GERANDO GR√ÅFICOS (backend) ---
‚úÖ Filmes por g√™nero j√° existem em disco. Pulando gera√ß√£o...
‚úÖ Filmes por g√™nero salvo em: /content/drive/MyDrive/Big Data/Trab Final BigData/metricas/filmes_por_genero.png
‚úÖ Nota m√©dia por g√™nero j√° existe em disco. Pulando gera√ß√£o...
‚úÖ Nota m√©dia por g√™nero salva em: /content/drive/MyDrive/Big Data/Trab Final BigData/metricas/nota_media_por_genero.png
‚úÖ Distribui√ß√£o das notas j√° existe em disco. Pulando gera√ß√£o...
‚úÖ Distribui√ß√£o das notas salva em: /content/drive/MyDrive/Big Data/Trab Final BigData/metricas/distribuicao_notas.png


# 5) Modelagem ALS com tuning + CrossValidation + persistência

In [8]:
# ============================================================
# SECTION 7 – ALS training with ParamGrid + CrossValidator
#      - Model cache: if a model already exists in MODEL_DIR, load it
#      - Otherwise: train, evaluate and save the best model
# ============================================================
import os
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

ALS_MODEL_PATH = os.path.join(MODEL_DIR, "modelo_als_completo")

def train_or_load_als_model(ratings):
    """Load the persisted ALS model, or tune, train, evaluate and save a new one.

    Training path:
      1. Keep only users and movies with at least 5 ratings each.
      2. Split 80/20 with seed 42.
      3. Grid-search rank / regParam / maxIter with a 3-fold CrossValidator
         on RMSE.
      4. Evaluate the best model on the held-out 20 %.
      5. Save the model to ALS_MODEL_PATH and its metrics to
         METRICS_DIR/als_metrics.json.

    Args:
        ratings: Spark DataFrame with userId, movieId and rating columns.

    Returns:
        (model, metrics): the ALSModel and the metrics dict written to disk,
        or (model, None) when the model was loaded from disk.
    """
    from datetime import timezone
    from pyspark.sql.functions import count

    # 7.1 – Model cache: a model already saved to disk is simply loaded.
    if os.path.exists(ALS_MODEL_PATH):
        print("‚úÖ Modelo ALS encontrado em disco. Carregando de:", ALS_MODEL_PATH)
        best_model = ALSModel.load(ALS_MODEL_PATH)
        return best_model, None

    print("‚öôÔ∏è  Modelo ALS n√£o encontrado. Iniciando pipeline de treino com CrossValidation...")

    # Density filter: keep only users and movies with at least 5 ratings each.
    user_counts = ratings.groupBy("userId").agg(count("*").alias("user_cnt"))
    movie_counts = ratings.groupBy("movieId").agg(count("*").alias("movie_cnt"))

    ratings_filtered = (
        ratings
        .join(user_counts, "userId")
        .join(movie_counts, "movieId")
        .filter("user_cnt >= 5 AND movie_cnt >= 5")
        .select("userId", "movieId", "rating")
    ).cache()

    train = test = None
    try:
        print("Registros ap√≥s filtro de densidade:", ratings_filtered.count())

        # 7.2 – Train/test split (test is used only for the final evaluation)
        train, test = ratings_filtered.randomSplit([0.8, 0.2], seed=42)
        train = train.cache()
        test = test.cache()
        print("Train count:", train.count(), "| Test count:", test.count())

        # 7.3 – Base ALS configuration. coldStartStrategy="drop" removes NaN
        # predictions for ids unseen during fitting, keeping RMSE defined.
        als = ALS(
            userCol="userId",
            itemCol="movieId",
            ratingCol="rating",
            implicitPrefs=False,
            coldStartStrategy="drop",
            nonnegative=True,
            checkpointInterval=10,
            maxIter=10,
        )

        # 7.4 – Hyper-parameter grid (3 x 3 x 2 = 18 combinations)
        param_grid = (
            ParamGridBuilder()
            .addGrid(als.rank, [10, 20, 40])
            .addGrid(als.regParam, [0.01, 0.05, 0.1])
            .addGrid(als.maxIter, [10, 15])
            .build()
        )

        # 7.5 – Evaluator (RMSE: lower is better)
        rmse_evaluator = RegressionEvaluator(
            metricName="rmse",
            labelCol="rating",
            predictionCol="prediction"
        )

        # 7.6 – CrossValidator (3-fold)
        cv = CrossValidator(
            estimator=als,
            estimatorParamMaps=param_grid,
            evaluator=rmse_evaluator,
            numFolds=3,
            parallelism=4
        )

        # 7.7 – Training
        cv_model = cv.fit(train)
        best_model = cv_model.bestModel

        # Recover the winning hyper-parameters through the public API instead
        # of the private `_java_obj`: avgMetrics[i] is the mean CV RMSE of
        # param_grid[i], and the best model is the first one with the lowest RMSE.
        avg_rmse = cv_model.avgMetrics
        best_idx = min(range(len(avg_rmse)), key=avg_rmse.__getitem__)
        best_param_map = param_grid[best_idx]
        best_params = {
            "rank": best_param_map[als.rank],
            "regParam": best_param_map[als.regParam],
            "maxIter": best_param_map[als.maxIter],
        }

        print("‚úÖ Treino conclu√≠do. Melhores hiperpar√¢metros:")
        print("  rank     =", best_params["rank"])
        print("  regParam =", best_params["regParam"])
        print("  maxIter  =", best_params["maxIter"])

        # 7.8 – Evaluation on the held-out test set
        predictions = best_model.transform(test)
        rmse_test = rmse_evaluator.evaluate(predictions)

        print(f"RMSE no conjunto de teste: {rmse_test:.4f}")

        # 7.9 – Save model and metrics
        best_model.save(ALS_MODEL_PATH)
        print("‚úÖ BestModel salvo em:", ALS_MODEL_PATH)

        als_metrics = {
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated).
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "best_params": best_params,
            "rmse_test": rmse_test,
        }

        als_metrics_path = os.path.join(METRICS_DIR, "als_metrics.json")
        with open(als_metrics_path, "w") as f:
            json.dump(als_metrics, f, indent=2)

        print("‚úÖ M√©tricas do modelo ALS salvas em:", als_metrics_path)
    finally:
        # Release the cached intermediates; the trained model does not need them.
        for cached in (ratings_filtered, train, test):
            if cached is not None:
                cached.unpersist()

    return best_model, als_metrics

als_model, als_metrics = train_or_load_als_model(ratings_df)


‚úÖ Modelo ALS encontrado em disco. Carregando de: /content/drive/MyDrive/Big Data/Trab Final BigData/modelos/modelo_als_completo


# 6) Agrupamentos e Parquets para gráficos (top filmes, gêneros, usuários)

In [9]:
# Derived aggregate tables (Parquet) consumed by the charts / front-end.
TOP_MOVIES_PARQUET = os.path.join(PROCESSED_DIR, "top_movies.parquet")
GENRE_STATS_PARQUET = os.path.join(PROCESSED_DIR, "genre_stats.parquet")
TOP10_MOST_WATCHED_PARQUET = os.path.join(PROCESSED_DIR, "top10_most_watched.parquet")
TOP10_BEST_RATED_PARQUET = os.path.join(PROCESSED_DIR, "top10_best_rated.parquet")

# --- Per-movie stats (rating count and mean rating) joined with metadata ---
if not os.path.exists(TOP_MOVIES_PARQUET):
    print("‚öôÔ∏è Gerando top_movies.parquet...")
    top_movies_spark = ratings_df.groupBy("movieId").agg(
        F.count("*").alias("num_ratings"),
        F.avg("rating").alias("avg_rating"),
    ).join(movies_df, on="movieId", how="left")
    top_movies_spark.write.mode("overwrite").parquet(TOP_MOVIES_PARQUET)
else:
    print("‚úÖ top_movies.parquet j√° existe. Carregando...")
    top_movies_spark = spark.read.parquet(TOP_MOVIES_PARQUET)

# Front-end tables (rewritten on every run).
# Minimum number of ratings for a movie to enter the "best rated" ranking.
min_ratings = 50

top10_most_watched = top_movies_spark.orderBy(
    F.col("num_ratings").desc(), F.col("avg_rating").desc()
).limit(10)
top10_most_watched.write.mode("overwrite").parquet(TOP10_MOST_WATCHED_PARQUET)

top10_best_rated = (
    top_movies_spark
    .where(F.col("num_ratings") >= min_ratings)
    .orderBy(F.col("avg_rating").desc(), F.col("num_ratings").desc())
    .limit(10)
)
top10_best_rated.write.mode("overwrite").parquet(TOP10_BEST_RATED_PARQUET)

print("‚úÖ Top 10 mais assistidos e Top 10 melhor nota salvos em dados_processados/")

# --- Per-genre stats (rating count and mean over all ratings of the genre) ---
if os.path.exists(GENRE_STATS_PARQUET):
    print("‚úÖ genre_stats.parquet j√° existe. Carregando...")
    genre_stats = spark.read.parquet(GENRE_STATS_PARQUET)
elif "genres" in movies_df.columns:
    movies_exploded = movies_df.withColumn(
        "genre", F.explode(F.split(F.col("genres"), "\\|"))
    ).where(F.col("genre") != "(no genres listed)")

    ratings_genres = ratings_df.join(movies_exploded, on="movieId", how="left")

    genre_stats = (
        ratings_genres
        .where(F.col("genre").isNotNull())
        .groupBy("genre")
        .agg(
            F.count("*").alias("num_ratings"),
            F.avg("rating").alias("avg_rating"),
        )
    )
    genre_stats.write.mode("overwrite").parquet(GENRE_STATS_PARQUET)
    print("‚úÖ genre_stats.parquet salvo.")


‚úÖ top_movies.parquet j√° existe. Carregando...
‚úÖ Top 10 mais assistidos e Top 10 melhor nota salvos em dados_processados/
‚úÖ genre_stats.parquet j√° existe. Carregando...


# 7) Split train/test para o ALS

In [10]:
# Block 7 – Train/test split used to evaluate the ALS model.
#
# The split must reproduce the hold-out set of train_or_load_als_model
# (Section 7). That means the same >=5-ratings density filter on users and
# movies, the same column selection and the same randomSplit([0.8, 0.2],
# seed=42). Splitting the raw ratings_df instead yields a test set in which
# most rows were part of the model's training data, inflating RMSE/MAPE.
# NOTE(review): randomSplit only reproduces the training split if the filtered
# DataFrame has the same content and partitioning as during training — keep
# this block in sync with Section 7.
_user_counts = ratings_df.groupBy("userId").agg(F.count("*").alias("user_cnt"))
_movie_counts = ratings_df.groupBy("movieId").agg(F.count("*").alias("movie_cnt"))

ratings_als = (
    ratings_df
    .join(_user_counts, "userId")
    .join(_movie_counts, "movieId")
    .filter("user_cnt >= 5 AND movie_cnt >= 5")
    .select("userId", "movieId", "rating")
)

train_ratings, test_ratings = ratings_als.randomSplit([0.8, 0.2], seed=42)
train_ratings = train_ratings.cache()
test_ratings = test_ratings.cache()

print("Train:", train_ratings.count(), "Test:", test_ratings.count())


# 8) Avaliação do modelo ALS pesado (RMSE + MAPE)

In [11]:
# Block 8 – Evaluation of the heavy ALS model (RMSE + MAPE)

from pyspark.ml.recommendation import ALSModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import abs as F_abs, col, avg as F_avg
import json
import os

heavy_model_path = f"{MODEL_DIR}/modelo_als_completo"
metrics_path = f"{METRICS_DIR}/als_metrics.json"
print("Caminho do modelo pesado:", heavy_model_path)

# Load the heavy model; if it does not exist, run the CrossValidation training.
if os.path.exists(heavy_model_path):
    print("üîµ Encontrado modelo pesado (offline). Carregando:", heavy_model_path)
    model = ALSModel.load(heavy_model_path)
else:
    print("üü† Nenhum modelo encontrado. Treinando modelo ALS (CrossValidation)...")
    model, _ = train_or_load_als_model(ratings_df)

# Evaluation on the test set built in Block 7 (test_ratings).
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

predictions = model.transform(test_ratings)

rmse = evaluator.evaluate(predictions)
print("RMSE no conjunto de teste:", rmse)

# MAPE = mean(|rating - prediction| / rating). The ETL keeps only ratings in
# [0.5, 5.0], so the denominator is never zero.
predictions_mape = predictions.withColumn(
    "ape",
    F_abs((col("rating") - col("prediction")) / col("rating"))
)
mape_test = predictions_mape.select(F_avg("ape").alias("mape")).first()["mape"]
print(f"MAPE no conjunto de teste: {mape_test:.4f}")

# Save metrics for the front-end. The file is updated rather than overwritten:
# train_or_load_als_model writes timestamp / best_params / rmse_test to the
# same path, and those keys must survive this evaluation.
saved_metrics = {}
if os.path.exists(metrics_path):
    try:
        with open(metrics_path, "r") as f:
            saved_metrics = json.load(f)
    except (OSError, json.JSONDecodeError):
        saved_metrics = {}
if not isinstance(saved_metrics, dict):
    saved_metrics = {}

saved_metrics["rmse"] = float(rmse)
saved_metrics["mape"] = float(mape_test)

with open(metrics_path, "w") as f:
    json.dump(saved_metrics, f, indent=2)

print("‚úÖ M√©tricas salvas em:", metrics_path)


# 9) Recomendações pré-calculadas (front-end)

In [12]:
# Block 9 – Pre-computed recommendations for ~600 users (front-end)
from pyspark.sql.functions import explode, col

USER_RECS_PARQUET_NESTED = os.path.join(PROCESSED_DIR, "user_recs_600.parquet")
USER_RECS_PARQUET_FLAT = os.path.join(PROCESSED_DIR, "user_recs_600_flat.parquet")

print("PROCESSED_DIR:", PROCESSED_DIR)

if os.path.exists(USER_RECS_PARQUET_FLAT):
    print("‚úÖ user_recs_600_flat.parquet j√° existe em:", USER_RECS_PARQUET_FLAT)
else:
    print("‚öôÔ∏è Gerando recomenda√ß√µes pr√©-calculadas para ~600 usu√°rios...")

    # Prefer the model loaded in Block 8 (`model`); fall back to the one
    # returned by Section 7 (`als_model`) when Block 8 has not been run.
    try:
        modelo_para_recs = model
    except NameError:
        modelo_para_recs = als_model

    # The 600 smallest userIds (a deterministic subset, not a random sample).
    users_subset = (
        ratings_df
        .select("userId")
        .distinct()
        .orderBy("userId")
        .limit(600)
    )

    # 1) Top-50 recommendations per user, nested as
    #    array<struct<movieId, rating>>, saved as-is.
    (
        modelo_para_recs
        .recommendForUserSubset(users_subset, 50)
        .write.mode("overwrite")
        .parquet(USER_RECS_PARQUET_NESTED)
    )

    # Read the saved nested table back. The flat version is derived from it
    # instead of recomputing the expensive recommendation job a second time,
    # which also guarantees both files hold the same recommendations.
    recs_600 = spark.read.parquet(USER_RECS_PARQUET_NESTED)

    # 2) Flatten to one row per (userId, movieId, predicted_rating).
    recs_flat = (
        recs_600
        .select("userId", explode("recommendations").alias("rec"))
        .select(
            col("userId"),
            col("rec.movieId").alias("movieId"),
            col("rec.rating").alias("predicted_rating")
        )
    )

    recs_flat.write.mode("overwrite").parquet(USER_RECS_PARQUET_FLAT)

    print("‚úÖ Vers√£o aninhada salva em:", USER_RECS_PARQUET_NESTED)
    print("‚úÖ Vers√£o flatten salva em:", USER_RECS_PARQUET_FLAT)


PROCESSED_DIR: /content/drive/MyDrive/Big Data/Trab Final BigData/dados_processados
‚úÖ user_recs_600_flat.parquet j√° existe em: /content/drive/MyDrive/Big Data/Trab Final BigData/dados_processados/user_recs_600_flat.parquet


In [13]:
# ============================================================
# SECTION 8 – Shutdown
# ============================================================
# Stops the SparkSession. Before re-running any earlier cell that uses
# `spark`, re-run Section 2 (its getOrCreate() builds a new session).
spark.stop()
print("SparkSession encerrada.")

SparkSession encerrada.
