In [None]:
import pandas as pd
from pyspark.sql import SparkSession
# NOTE: `round` here is pyspark's column function and shadows the built-in round.
from pyspark.sql.functions import avg, col, format_number, isnan, round, split, when, year

spark = SparkSession.builder.appName("worldwide_box_office").getOrCreate()

# Load the box-office CSV with pandas, then hand it to Spark.
movies_pd = pd.read_csv("worldwide_box_office_1977_2025.csv")
df = spark.createDataFrame(movies_pd)

# Remove duplicate rows of the same film. Deduplicating on the title alone would
# also collapse distinct films that share a title (remakes), so the release date
# is part of the key.
# NOTE: dropDuplicates keeps an arbitrary row among each group of duplicates.
df = df.dropDuplicates(["movie_title", "release_date"])

# Split the YEAR-MONTH-DAY release date into separate columns.
# The year is cast to int so the CPI join compares integers instead of relying on
# an implicit string-to-number cast. Month and day stay strings because they are
# later used as categories (StringIndexer).
date_parts = split(col("release_date"), "-")
df = (
    df.withColumn("release_year", date_parts.getItem(0).cast("int"))
    .withColumn("release_month", date_parts.getItem(1))
    .withColumn("release_day", date_parts.getItem(2))
)

# CPI observations: each row carries an observation_date and a CPIAUCNS value.
df_cpi = spark.read.csv("CPIAUCNS.csv", header=True, inferSchema=True)

# Average CPI per calendar year. The value is kept numeric: it is used as a
# divisor below. Formatting it as text (format_number) would discard precision
# and force an implicit string-to-double cast that yields null for
# thousands-separated values.
cpi_df = (
    df_cpi.withColumn("year_cpi", year("observation_date"))
    .groupBy("year_cpi")
    .agg(avg("CPIAUCNS").alias("CPI_value"))
)

# Attach the release-year CPI to each film as CPI_original. This is a left join:
# films whose year has no CPI row keep a null CPI_original. The join condition
# references the same (renamed) frame that is joined.
cpi_for_join = cpi_df.withColumnRenamed("CPI_value", "CPI_original")
df = df.join(cpi_for_join, df.release_year == cpi_for_join.year_cpi, "left")

# Reference CPI level that all money columns are re-expressed in.
# NOTE(review): presumably the 2024 annual-average CPIAUCNS; confirm against CPIAUCNS.csv.
CPI_2024 = 313.69

# For each money column: value * CPI_2024 / CPI_original, rounded and stored as long.
# Rows with a null CPI_original (no CPI match for the release year) become null.
_cpi_factor = CPI_2024 / col("CPI_original")
_columns_to_adjust = [
    ("domestic_gross", "domestic_gross_adjusted"),
    ("international_gross", "international_gross_adjusted"),
    ("total_gross", "total_gross_adjusted"),
    ("domestic_opening", "domestic_opening_adjusted"),
]
for _source_name, _adjusted_name in _columns_to_adjust:
    df = df.withColumn(
        _adjusted_name,
        round(col(_source_name) * _cpi_factor).cast("long"),
    )

# Audience bucket from the indicative (MPAA-style) rating. Any other value,
# including nulls and ratings not listed here, is labelled "Missing".
df = df.withColumn(
    "rating_category",
    when(col("indicative_rating").isin("G", "PG", "PG-13"), "Family")
    .when(col("indicative_rating").isin("R", "NC-17", "X"), "Adult")
    .otherwise("Missing"),
)

# Running-time bucket (minutes): < 90 -> "Short", 90..120 -> "Medium", > 120 -> "Long".
# NOTE(review): an earlier comment gave 150 as the Medium/Long cut-off, but the
# code has always used 120; confirm which one is intended.
# A missing running time becomes "Missing" (consistent with rating_category)
# rather than null, so the StringIndexer in the modelling cells never sees a null.
# NaN is checked too: the data arrives via pandas, where missing numbers may be
# NaN, and Spark orders NaN above every number, so NaN would otherwise be "Long".
runtime = col("running_time")
df = df.withColumn(
    "duration_category",
    when(runtime.isNull() | isnan(runtime), "Missing")
    .when(runtime < 90, "Short")
    .when(runtime <= 120, "Medium")
    .otherwise("Long"),
)

# Drop films without a domestic opening figure (the opening is a model feature).
df = df.na.drop(subset="domestic_opening")

# Distributor name -> group lookup lists. A studio that was renamed is listed
# under both names in the same group, e.g. "Twentieth Century Fox" /
# "20th Century Studios" and "Fox Searchlight Pictures" / "Searchlight Pictures".
# The latter pair was previously split across Major and Mid-size.
MAJOR_STUDIOS = [
    "Warner Bros.", "Universal Pictures", "Paramount Pictures",
    "Walt Disney Studios Motion Pictures", "Sony Pictures Releasing",
    "20th Century Studios", "Twentieth Century Fox",
    "Lionsgate", "Columbia Pictures", "Metro-Goldwyn-Mayer (MGM)",
    "DreamWorks Distribution", "DreamWorks", "New Line Cinema",
]
MID_SIZE_STUDIOS = [
    "A24", "Miramax", "Focus Features", "Summit Entertainment",
    "Searchlight Pictures", "Fox Searchlight Pictures", "STX Entertainment", "Neon",
    "Roadside Attractions", "IFC Films", "Open Road Films (II)",
    "Annapurna Pictures", "Bleecker Street Media", "Dimension Films",
    "Broad Green Pictures", "Magnolia Pictures", "Orion Pictures", "CMC Pictures",
]
STREAMING_PLATFORMS = ["Netflix", "Amazon Studios", "Amazon MGM Studios"]

# distributor_group derived from the full domestic distributor name.
# A null distributor is "Missing"; any unlisted name is "Small / Indie Studio".
distributor = col("domestic_distributor")
df = df.withColumn(
    "distributor_group",
    when(distributor.isin(MAJOR_STUDIOS), "Major Studio")
    .when(distributor.isin(MID_SIZE_STUDIOS), "Mid-size Studio")
    .when(distributor.isin(STREAMING_PLATFORMS), "Streaming Platform")
    .when(distributor.isNull(), "Missing")
    .otherwise("Small / Indie Studio"),
)

# Sanity check: number of films per distributor group.
df.groupBy("distributor_group").count().show(truncate=False)

# The Spark session is intentionally left running: the modelling cells below reuse it.

In [None]:
# ========================================
# 🚀 Random Forest Regressor pipeline
# Predicting total_gross_adjusted from:
# - release_month (one-hot encoded)
# - domestic_opening_adjusted
# - duration_category (indexed)
# - genres (one-hot encoded)
# - distributor_group (one-hot encoded)
# - rating_category (one-hot encoded)
# ========================================

# 🔧 Imports necessários
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col

# ========================================
# 1. Tratamento de NULL em domestic_opening_adjusted
# ========================================
# Substituindo valores nulos por 0 para não gerar erro no modelo
df = df.fillna({'domestic_opening_adjusted': 0})

# ========================================
# 2. Indexação e OneHot Encoding de release_month
# ========================================

# StringIndexer transforma release_month em índices numéricos
indexer_month = StringIndexer(inputCol="release_month", outputCol="month_index")

# OneHotEncoder transforma os índices em vetores binários
encoder_month = OneHotEncoder(inputCol="month_index", outputCol="month_encoded")


indexer_rating = StringIndexer(inputCol="rating_category", outputCol="rating_index")
encoder_rating = OneHotEncoder(inputCol="rating_index", outputCol="rating_encoded")

# ========================================
# 3. Indexação de duration_category (Short, Medium, Long)
# ========================================
indexer_duration = StringIndexer(inputCol="duration_category", outputCol="duration_index")

# Teste para ver se consigo colocar o gênero!
genre_indexer = StringIndexer(inputCol="genres", outputCol="genre_index")
genre_encoder = OneHotEncoder(inputCol="genre_index", outputCol="genre_encoded")

# Teste para ver se consigo incorporar distribuidor (tecnicamente é algo que pode ser importante)
distributor_indexer = StringIndexer(inputCol="distributor_group", outputCol="distributor_index")
distributor_encoder = OneHotEncoder(inputCol="distributor_index", outputCol="distributor_encoded")

# ========================================
# 4. Montagem do vetor de features
# ========================================
assembler = VectorAssembler(
    inputCols=["month_encoded", "domestic_opening_adjusted", "duration_index", "genre_encoded", "distributor_encoded", "rating_encoded"],
    outputCol="features"
)

# ========================================
# 5. Modelo Random Forest Regressor
# ========================================
rf = RandomForestRegressor(featuresCol="features", labelCol="total_gross_adjusted")

# ========================================
# 6. Pipeline completo
# ========================================
pipeline = Pipeline(stages=[
    indexer_month,
    encoder_month,
    indexer_duration,
    genre_indexer,
    genre_encoder,
    distributor_indexer,
    distributor_encoder,
    indexer_rating,
    encoder_rating,
    assembler,
    rf
])

# ========================================
# 7. Treinamento do modelo
# ========================================
model = pipeline.fit(df)

# ========================================
# 8. Predição e visualização dos resultados
# ========================================
predictions = model.transform(df)

# Exibindo os 10 primeiros filmes com:
# - Título
# - Mês de lançamento
# - Domestic Opening ajustado
# - Receita real ajustada
# - Receita prevista pelo modelo
predictions.select(
    "movie_title",
    "genres",
    "distributor_group",
    "release_month",
    "domestic_opening_adjusted",
    "total_gross_adjusted",
    "prediction"
).show(10)

# ========================================
# 🔍 FIM DO PIPELINE
# ========================================


In [None]:
# ========================================
# 🚀 Pipeline com GBTRegressor
# Prevendo total_gross_adjusted com:
# - OneHotEncoder em release_month
# - domestic_opening_adjusted
# - duration_category
# - genre_encoded
# - distributor_encoded
# ========================================

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

# ========================================
# 1. Model input
# ========================================
SEED = 42  # fixed seed: makes the split and the boosted trees reproducible

# Keep films with a positive adjusted opening and a non-null label.
# `> 0` is never true for a null opening, so no separate null fill is needed
# (a fillna after this filter would be a no-op). Null labels are removed because
# Spark ML regressors fail on them.
df_filtered = df.filter(
    (col("domestic_opening_adjusted") > 0)
    & col("total_gross_adjusted").isNotNull()
)

# Hold out 20% of the films. Evaluating on the rows the model was trained on
# would report an optimistic RMSE / MAE / R².
train_gbt, test_gbt = df_filtered.randomSplit([0.8, 0.2], seed=SEED)

# ========================================
# 2. Indexing and encoding
# ========================================
# handleInvalid="keep" sends nulls and categories unseen while fitting to an extra
# bucket instead of raising. Without it, a genre combination that appears only in
# the test split would abort the transform.
indexer_month = StringIndexer(inputCol="release_month", outputCol="month_index", handleInvalid="keep")
encoder_month = OneHotEncoder(inputCol="month_index", outputCol="month_encoded")

indexer_duration = StringIndexer(inputCol="duration_category", outputCol="duration_index", handleInvalid="keep")

genre_indexer = StringIndexer(inputCol="genres", outputCol="genre_index", handleInvalid="keep")
genre_encoder = OneHotEncoder(inputCol="genre_index", outputCol="genre_encoded")

distributor_indexer = StringIndexer(inputCol="distributor_group", outputCol="distributor_index", handleInvalid="keep")
distributor_encoder = OneHotEncoder(inputCol="distributor_index", outputCol="distributor_encoded")

# ========================================
# 3. Feature vector
# ========================================
assembler = VectorAssembler(
    inputCols=["month_encoded", "domestic_opening_adjusted", "duration_index", "genre_encoded", "distributor_encoded"],
    outputCol="features"
)

# ========================================
# 4. Model: GBT Regressor
# ========================================
gbt = GBTRegressor(
    featuresCol="features",
    labelCol="total_gross_adjusted",
    maxIter=100,     # number of boosting iterations (trees)
    maxDepth=5,      # maximum depth of each tree
    stepSize=0.1,    # learning rate
    seed=SEED,
)

# ========================================
# 5. Full pipeline
# ========================================
pipeline = Pipeline(stages=[
    indexer_month,
    encoder_month,
    indexer_duration,
    genre_indexer,
    genre_encoder,
    distributor_indexer,
    distributor_encoder,
    assembler,
    gbt
])

# ========================================
# 6. Training (training split only)
# ========================================
model = pipeline.fit(train_gbt)

# ========================================
# 7. Predictions on the held-out split
# ========================================
# Cached because the predictions are consumed by three evaluators and one show().
predictions = model.transform(test_gbt).cache()

# ========================================
# 8. Evaluation on the held-out split
# ========================================
evaluator_rmse = RegressionEvaluator(labelCol="total_gross_adjusted", predictionCol="prediction", metricName="rmse")
evaluator_mae = RegressionEvaluator(labelCol="total_gross_adjusted", predictionCol="prediction", metricName="mae")
evaluator_r2 = RegressionEvaluator(labelCol="total_gross_adjusted", predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

print(f"✅ RMSE: {rmse:,.2f}")
print(f"✅ MAE: {mae:,.2f}")
print(f"✅ R²: {r2:.4f}")

# ========================================
# 🔍 Sample of held-out predictions
# ========================================
predictions.select(
    "movie_title",
    "release_month",
    "domestic_opening_adjusted",
    "total_gross_adjusted",
    "prediction"
).show(10)
