# Join Optimization with PySpark
This notebook walks through every step of a join-optimization exercise using PySpark.

## Steps 1 to 4: Plain reads and a simple SQL join

In [1]:
!pip install pyspark



In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("OtimizacaoJoin") \
    .getOrCreate()

# 1 - Read the videos Parquet file
df_video = spark.read.parquet("videos-preparados.snappy.parquet")

# 2 - Read the comments Parquet file
df_comments = spark.read.parquet("/content/videos-comments-tratados.snappy.parquet")

# 3 - Register temporary views
df_video.createOrReplaceTempView("videos")
df_comments.createOrReplaceTempView("comentarios")

# 4 - Join using SQL
join_video_comments = spark.sql("""
    SELECT *
    FROM videos v
    JOIN comentarios c
    ON v.`Video ID` = c.`Video ID`
""")


join_video_comments.show(5)

+--------------------+-----------+------------+-------+-----+--------+------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+-----------+--------------------+------------+-------+-----+--------+------+-----------+----+--------------------+---------+-------------+
|               Title|   Video ID|Published At|Keyword|Likes|Comments| Views|Interaction|Year|Month|Keyword Index|        Features PCA|     Features Normal|            Features|   Video ID|               Title|Published At|Keyword|Likes|Comments| Views|Interaction|Year|             Comment|Sentiment|Likes Comment|
+--------------------+-----------+------------+-------+-----+--------+------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+-----------+--------------------+------------+-------+-----+--------+------+-----------+----+--------------------+---------+-------------+
|Apple Pay Is Kill...|wAZZ-UWGVHI|  2022-08-23|   te
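
Conceptually, the inner equi-join above pairs every video row with every comment row that has the same `Video ID`. The sketch below is a plain-Python model of a hash join (build a dictionary on one side, probe it with the other), assuming small in-memory lists of dicts with hypothetical rows; it is not Spark's implementation. Because `SELECT *` keeps the columns of both sides, the key appears twice, just as `Video ID` (and the other shared metadata columns) appear twice in the output header above.

```python
from collections import defaultdict

def hash_inner_join(left, right, key):
    """Inner equi-join of two lists of dicts on `key` (build on left, probe with right)."""
    # Build phase: index the left rows by join key
    index = defaultdict(list)
    for row in left:
        index[row[key]].append(row)
    # Probe phase: emit one output row per (right row, matching left row) pair
    result = []
    for r in right:
        for l in index.get(r[key], []):
            # Keep both sides' columns like SELECT *; prefixes make the duplicate key visible
            result.append({**{f"v.{k}": val for k, val in l.items()},
                           **{f"c.{k}": val for k, val in r.items()}})
    return result

# Hypothetical sample rows (not taken from the real dataset)
videos = [
    {"Video ID": "a1", "Title": "Video A"},
    {"Video ID": "b2", "Title": "Video B"},
]
comments = [
    {"Video ID": "a1", "Comment": "great"},
    {"Video ID": "a1", "Comment": "thanks"},
    {"Video ID": "zz", "Comment": "orphan"},
]

rows = hash_inner_join(videos, comments, "Video ID")
for row in rows:
    print(row)
```

Rows without a match on the other side (`b2` and `zz`) are dropped, which is the inner-join semantics of `JOIN ... ON`.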

## Step 5: Using repartition and coalesce

In [8]:
# Repartition both DataFrames by the join key
df_video_rep = df_video.repartition("Video ID")
df_comments_rep = df_comments.repartition("Video ID")

# Register views
df_video_rep.createOrReplaceTempView("videos_rep")
df_comments_rep.createOrReplaceTempView("comentarios_rep")

# Join the repartitioned data (backticks are required because the column name contains a space)
join_video_comments_rep = spark.sql("""
    SELECT *
    FROM videos_rep v
    JOIN comentarios_rep c
    ON v.`Video ID` = c.`Video ID`
""")

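# Coalesce to a single partition (one output file if this DataFrame is written);
# a drastic coalesce like this also makes the join stage run as a single task
join_video_comments_rep = join_video_comments_rep.coalesce(1)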
join_video_comments_rep.show(5)


+--------------------+-----------+------------+-------+------+--------+--------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+-----------+--------------------+------------+-------+------+--------+--------+-----------+----+--------------------+---------+-------------+
|               Title|   Video ID|Published At|Keyword| Likes|Comments|   Views|Interaction|Year|Month|Keyword Index|        Features PCA|     Features Normal|            Features|   Video ID|               Title|Published At|Keyword| Likes|Comments|   Views|Interaction|Year|             Comment|Sentiment|Likes Comment|
+--------------------+-----------+------------+-------+------+--------+--------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+-----------+--------------------+------------+-------+------+--------+--------+-----------+----+--------------------+---------+-------------+
|How To Fix a Wate...|115amzVdV44|
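
`repartition("Video ID")` hash-partitions rows by the join key, so rows with the same `Video ID` from both DataFrames land in partitions with the same index and can be joined partition by partition. The plain-Python model below illustrates that idea. It is a simplified sketch: it uses `zlib.crc32` modulo 4 partitions, whereas Spark uses a Murmur3-based hash and `spark.sql.shuffle.partitions` partitions (200 in the execution plan of Steps 6 and 7); the keys are hypothetical.

```python
import zlib

NUM_PARTITIONS = 4  # Spark would use spark.sql.shuffle.partitions (200 in this notebook's plan)

def partition_of(key, num_partitions=NUM_PARTITIONS):
    """Deterministic stand-in for Spark's hash partitioning (crc32 instead of Murmur3)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def repartition_by(rows, key, num_partitions=NUM_PARTITIONS):
    """Split rows into num_partitions lists according to the hash of row[key]."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[partition_of(row[key], num_partitions)].append(row)
    return parts

# Hypothetical keys: every comment refers to an existing video
videos = [{"Video ID": vid} for vid in ["a1", "b2", "c3", "d4"]]
comments = [{"Video ID": vid} for vid in ["a1", "a1", "c3", "d4", "d4"]]

video_parts = repartition_by(videos, "Video ID")
comment_parts = repartition_by(comments, "Video ID")

# Join partition i only with partition i: no row needs to look into another partition
pairs = 0
for v_part, c_part in zip(video_parts, comment_parts):
    v_keys = {r["Video ID"] for r in v_part}
    pairs += sum(1 for r in c_part if r["Video ID"] in v_keys)

print(pairs)  # 5: every comment finds its video in the same partition index
```

The same hash function is applied to both sides, so co-location holds regardless of which partition a given key ends up in; that shared layout is what a sort-merge join can reuse.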

## Steps 6 and 7: Real optimization with column pruning, filtering, and execution-plan analysis

In [13]:
# 1. Read only the columns that are needed
df_video_opt = spark.read.parquet("videos-preparados.snappy.parquet") \
    .select("Video ID", "Title", "Published At")

df_comments_opt = spark.read.parquet("/content/videos-comments-tratados.snappy.parquet") \
    .select("Video ID", "Comment", "Likes Comment")

# 2. Keep only videos published in 2020
df_video_opt = df_video_opt.filter(col("Published At").startswith("2020"))

# 3. Repartition by the join key
df_video_opt = df_video_opt.repartition("Video ID")
df_comments_opt = df_comments_opt.repartition("Video ID")

# 4. Register temporary views
df_video_opt.createOrReplaceTempView("videos_opt")
df_comments_opt.createOrReplaceTempView("comentarios_opt")

# 5. EXPLAIN: inspect the physical execution plan
spark.sql("""
    SELECT *
    FROM videos_opt v
    JOIN comentarios_opt c
    ON v.`Video ID` = c.`Video ID`
""").explain()



== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [Video ID#605], [Video ID#636], Inner, BuildLeft, false
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [plan_id=315]
   :  +- Exchange hashpartitioning(Video ID#605, 200), REPARTITION_BY_COL, [plan_id=306]
   :     +- Project [Video ID#605, Title#604, Published At#606]
   :        +- Filter ((isnotnull(Published At#606) AND StartsWith(cast(Published At#606 as string), 2020)) AND isnotnull(Video ID#605))
   :           +- FileScan parquet [Title#604,Video ID#605,Published At#606] Batched: true, DataFilters: [isnotnull(Published At#606), StartsWith(cast(Published At#606 as string), 2020), isnotnull(Video..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/videos-preparados.snappy.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(`Published At`), IsNotNull(`Video ID`)], ReadSchema: struct<Title:string,Video ID:string,Published At:date>
   +- Ex
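
The plan above shows that Spark chose a `BroadcastHashJoin` with `BuildLeft`: the filtered video side is small enough to be copied to every executor, so each partition of the comments side can be joined locally without shuffling it to match the other side. Spark considers broadcasting a side when its estimated size is at most `spark.sql.autoBroadcastJoinThreshold` (10 MB by default). The sketch below is a deliberately simplified plain-Python model of that decision and of the broadcast join itself; the size estimates and rows are hypothetical, and real join selection weighs more factors (join type, hints, statistics).

```python
AUTO_BROADCAST_THRESHOLD = 10 * 1024 * 1024  # Spark's default threshold: 10 MB

def choose_join_strategy(left_bytes, right_bytes, threshold=AUTO_BROADCAST_THRESHOLD):
    """Simplified model: broadcast the smaller side if it fits under the threshold."""
    if min(left_bytes, right_bytes) <= threshold:
        return "broadcast_left" if left_bytes <= right_bytes else "broadcast_right"
    return "sort_merge"

def broadcast_hash_join(small_rows, big_partitions, key):
    """Build one hash table from the small side and probe it inside each big partition."""
    table = {}
    for row in small_rows:
        table.setdefault(row[key], []).append(row)
    # Each big partition is processed independently, so the big side is not shuffled
    output = []
    for part in big_partitions:
        for row in part:
            for match in table.get(row[key], []):
                output.append((match["Title"], row["Comment"]))
    return output

# Hypothetical data: a small filtered video side and a partitioned comment side
videos_2020 = [{"Video ID": "a1", "Title": "Video A"}]
comment_partitions = [
    [{"Video ID": "a1", "Comment": "great"}, {"Video ID": "x9", "Comment": "other"}],
    [{"Video ID": "a1", "Comment": "thanks"}],
]

print(choose_join_strategy(50_000, 80_000_000))  # broadcast_left
print(broadcast_hash_join(videos_2020, comment_partitions, "Video ID"))
# [('Video A', 'great'), ('Video A', 'thanks')]
```

This also explains why the `Exchange hashpartitioning` under `BroadcastExchange` buys nothing here: the broadcast side is collected and sent whole to every executor, however it was partitioned beforehand.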

In [15]:
# Final optimized join with explicit column selection and clean aliases
join_otimizado = spark.sql("""
    SELECT
        v.`Video ID` AS video_id,
        v.`Title` AS title,
        c.`Comment` AS comment,
        c.`Likes Comment` AS likes_comment
    FROM videos_opt v
    JOIN comentarios_opt c
    ON v.`Video ID` = c.`Video ID`
""")

join_otimizado.show(5)


+-----------+--------------------+--------------------+-------------+
|   video_id|               title|             comment|likes_comment|
+-----------+--------------------+--------------------+-------------+
|115amzVdV44|How To Fix a Wate...|Please make anoth...|        76769|
|115amzVdV44|How To Fix a Wate...|I love how his vi...|           48|
|115amzVdV44|How To Fix a Wate...|The amount of eff...|            4|
|115amzVdV44|How To Fix a Wate...|Thank you! I was ...|            8|
|115amzVdV44|How To Fix a Wate...|I’m so glad I wat...|            5|
+-----------+--------------------+--------------------+-------------+
only showing top 5 rows



In [16]:
# Step 7 - Save the optimized result
join_otimizado.coalesce(1).write.mode("overwrite").parquet("join-videos-comments-otimizado")
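
`coalesce(1)` merges the existing partitions without a full shuffle, so the write produces a single Parquet part file. The plain-Python model below sketches that merge under a simplifying assumption (contiguous input partitions are concatenated into `n` groups); Spark's actual coalescer also takes data locality into account. With `n = 1` all data flows through one task, which is the parallelism cost of single-file output.

```python
def coalesce(partitions, n):
    """Merge partitions into n groups by concatenating contiguous partitions (no per-row hashing)."""
    if n <= 0:
        raise ValueError("n must be positive")
    if n >= len(partitions):
        # Like DataFrame.coalesce, asking for more partitions keeps the current count
        return [list(p) for p in partitions]
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        # Input partition i goes to output group i * n // len(partitions)
        merged[i * n // len(partitions)].extend(part)
    return merged

parts = [[1, 2], [3], [], [4, 5, 6]]  # hypothetical partition contents
print(coalesce(parts, 2))        # [[1, 2, 3], [4, 5, 6]]
print(coalesce(parts, 1))        # [[1, 2, 3, 4, 5, 6]]
print(len(coalesce(parts, 10)))  # 4
```

When parallel writing matters more than a single file, `repartition(n)` spreads the work with a shuffle instead.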

In [None]:
# Stop the SparkSession to release its resources
spark.stop()


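## Step 8: Notes on the optimizations applied

- **Selecting only the needed columns** reduces the amount of data read and processed: Parquet is columnar, so the scan reads just those columns (the `ReadSchema` in the plan lists only `Title`, `Video ID`, and `Published At`). It also avoids the duplicated metadata columns that `SELECT *` produced in Steps 1 to 5.
- **Filtering by date** shrinks the video side to 2020 before the join, so fewer rows are joined. In this plan, however, the `StartsWith(cast(Published At as string), 2020)` predicate is evaluated by the `Filter` operator after the scan; only the `IsNotNull` checks reach `PushedFilters`, so the Parquet reader does not skip data because of the year filter.
- **Repartition** hash-partitions both sides by `Video ID` so that a sort-merge join can reuse that partitioning instead of adding another shuffle. `repartition` is itself a shuffle (`Exchange hashpartitioning(..., 200), REPARTITION_BY_COL` in the plan), and because Spark chose a broadcast join here, repartitioning the broadcast (video) side brings no benefit.
- **`coalesce(1)`** is applied before saving so that a single Parquet file is produced, which makes later use easier; the trade-off is that the final stage runs as a single task.
- **EXPLAIN** shows whether Spark is optimizing the execution plan, for example through filter pushdown (`PushedFilters`) and the choice of a broadcast join (`BroadcastHashJoin`).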
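
A range predicate on the date itself (`Published At >= 2020-01-01 AND Published At < 2021-01-01`) selects the same rows as the string-prefix check, but it is the kind of comparison that Parquet min/max statistics per row group can evaluate without reading the rows. The plain-Python model below, with hypothetical row groups, shows both properties; it models the idea of statistics-based skipping, not Spark's reader. In PySpark the range could be written with `col("Published At") >= "2020-01-01"` and `< "2021-01-01"`; whether it then shows up under `PushedFilters` should be confirmed with `explain()` on your Spark version.

```python
from datetime import date

# Hypothetical Parquet row groups: (min, max) statistics plus the stored values
row_groups = [
    {"min": date(2019, 1, 5), "max": date(2019, 12, 30),
     "rows": [date(2019, 1, 5), date(2019, 12, 30)]},
    {"min": date(2019, 11, 2), "max": date(2020, 3, 1),
     "rows": [date(2019, 11, 2), date(2020, 3, 1)]},
    {"min": date(2021, 2, 1), "max": date(2022, 8, 23),
     "rows": [date(2021, 2, 1), date(2022, 8, 23)]},
]

START, END = date(2020, 1, 1), date(2021, 1, 1)  # half-open range [START, END)

def starts_with_2020(d):
    # Mirrors StartsWith(cast(Published At as string), 2020): needs each actual value
    return d.isoformat().startswith("2020")

def in_2020(d):
    # Range predicate on the date itself
    return START <= d < END

def can_skip(group):
    # Using only min/max: skip the group if its range cannot overlap [START, END)
    return group["max"] < START or group["min"] >= END

# Both predicates keep exactly the same rows
all_rows = [d for g in row_groups for d in g["rows"]]
assert [d for d in all_rows if starts_with_2020(d)] == [d for d in all_rows if in_2020(d)]

# Only the range form lets whole row groups be skipped from their statistics
skipped = [i for i, g in enumerate(row_groups) if can_skip(g)]
print(skipped)  # [0, 2]
```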
