In [None]:
from pyspark.sql import SparkSession

# Criando a Spark Session
spark = SparkSession.builder.appName("VideoCommentsJoin").getOrCreate()

# Passo 1: Ler os arquivos Parquet
# Carregando os dados de vídeos
df_video = spark.read.parquet("videos-preparados.snappy.parquet")
# Carregando os dados de comentários
df_comments = spark.read.parquet("video-comments-tratados.snappy.parquet")

# Passo 2: Criar tabelas temporárias
# Criando tabelas temporárias para SQL
df_video.createOrReplaceTempView("videos")
df_comments.createOrReplaceTempView("comments")

# Passo 3: Realizar o JOIN utilizando SQL
join_video_comments = spark.sql("""
    SELECT v.*, c.*
    FROM videos v
    INNER JOIN comments c
    ON v.video_id = c.video_id
""")

# Passo 4: Repetindo os passos com repartition e coalesce
# Reparticionando os dataframes para otimizar a distribuição de dados
df_video_repartitioned = df_video.repartition(10)
df_comments_repartitioned = df_comments.repartition(10)

# Criando tabelas temporárias novamente
df_video_repartitioned.createOrReplaceTempView("videos_rep")
df_comments_repartitioned.createOrReplaceTempView("comments_rep")

# Fazendo o JOIN novamente
join_video_comments_rep = spark.sql("""
    SELECT v.*, c.*
    FROM videos_rep v
    INNER JOIN comments_rep c
    ON v.video_id = c.video_id
""")

# Passo 5: Utilizando explain para entender a diferença entre as estratégias
print("Explicação do JOIN normal:")
join_video_comments.explain(True)
print("Explicação do JOIN com repartition:")
join_video_comments_rep.explain(True)

# Passo 6: Realizando o JOIN de forma otimizada
# Utilizando coalesce para otimizar o número de partições antes de criar tabelas
df_video_optimized = df_video.coalesce(5)
df_comments_optimized = df_comments.coalesce(5)

# Criando tabelas temporárias otimizadas
df_video_optimized.createOrReplaceTempView("videos_opt")
df_comments_optimized.createOrReplaceTempView("comments_opt")

# Realizando o JOIN otimizado filtrando apenas colunas necessárias
join_video_comments_opt = spark.sql("""
    SELECT v.video_id, v.title, v.category, c.comment_text, c.user_id, c.like_count
    FROM videos_opt v
    INNER JOIN comments_opt c
    ON v.video_id = c.video_id
""")

# Passo 7: Salvando o resultado final no formato Parquet
join_video_comments_opt.write.mode("overwrite").parquet("join-videos-comments-otimizado")

# Comentários sobre as otimizações:
# 1. Utilizamos `repartition(10)` para distribuir os dados igualmente entre os nós antes do JOIN.
# 2. Utilizamos `coalesce(5)` para reduzir o número de partições antes do processamento final, otimizando a escrita dos dados.
# 3. Reduzimos as colunas selecionadas no JOIN para evitar leitura e processamento desnecessários.
# 4. `explain(True)` foi usado para analisar o plano de execução e identificar melhorias na estratégia de processamento.

print("Processo concluído com otimização!")
