In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

#  Criar a Spark Session
spark = SparkSession.builder \
    .appName("Join Video Comments") \
    .getOrCreate()

#  Ler os arquivos parquet
df_video = spark.read.parquet("/content/videos-preparados.snappy.parquet")
df_comments = spark.read.parquet("/content/videos-comments-tratados.snappy.parquet")

#  Imprimir os esquemas dos DataFrames
print("Esquema do DataFrame de vídeos:")
df_video.printSchema()

print("\nEsquema do DataFrame de comentários:")
df_comments.printSchema()

#  Imprimir as primeiras linhas de cada DataFrame
print("\nPrimeiras linhas do DataFrame de vídeos:")
df_video.show(15)

print("\nPrimeiras linhas do DataFrame de comentários:")
df_comments.show(15)

In [None]:
#  Criar tabelas temporárias para os DataFrames
df_video.createOrReplaceTempView("video_table")
df_comments.createOrReplaceTempView("comments_table")
# Exibindo as mensagens de confirmação
print("Tabela temporária 'video_table' criada com sucesso.")
print("Tabela temporária 'comments_table' criada com sucesso.")
# Visualizando o conteúdo das tabelas temporárias
print("Conteúdo da tabela 'video_table':")
spark.sql("SELECT * FROM video_table").show()

print("Conteúdo da tabela 'comments_table':")
spark.sql("SELECT * FROM comments_table").show()

In [None]:
#  Mostrar os esquemas dos DataFrames para verificar os nomes das colunas
print("Esquema de df_video:")
df_video.printSchema()

print("\nEsquema de df_comments:")
df_comments.printSchema()

#  Renomear colunas para evitar espaços e inconsistências
df_video_cleaned = df_video.withColumnRenamed("Video ID", "video_id")
df_comments_cleaned = df_comments.withColumnRenamed("Video ID", "video_id")

#  Criar tabelas temporárias com os nomes padronizados
df_video_cleaned.createOrReplaceTempView("video_table")
df_comments_cleaned.createOrReplaceTempView("comments_table")

#  Realizar o JOIN usando Spark SQL
join_video_comments = spark.sql("""
    SELECT v.*, c.*
    FROM video_table v
    JOIN comments_table c
    ON v.video_id = c.video_id
""")

#  Mostrar algumas linhas do resultado para validar o JOIN
print("\nResultado do JOIN:")
join_video_comments.show()

In [None]:
print("Esquema de df_video:")
df_video.printSchema()

print("\nEsquema de df_comments:")
df_comments.printSchema()

In [None]:
df_video_cleaned = df_video.withColumnRenamed("Video ID", "video_id")
print("Renamed 'Video ID' to 'video_id' in df_video")
df_comments_cleaned = df_comments.withColumnRenamed("Video ID", "video_id")
print("Renamed 'Video ID' to 'video_id' in df_comments")

In [None]:
#  Realizar o JOIN usando Spark SQL

join_video_comments = spark.sql("""
    SELECT v.*, c.*
    FROM video_table v
    JOIN comments_table c
    ON v.video_id = c.video_id
""")

# Exibir os resultados do JOIN
join_video_comments.show()

In [None]:
#  Verificar os esquemas dos DataFrames antes de realizar o JOIN
print("Esquema de df_video:")
df_video.printSchema()

print("\nEsquema de df_comments:")
df_comments.printSchema()

#  Padronizar os nomes das colunas removendo espaços e convertendo para minúsculas
# Renomear colunas de df_video
df_video_cleaned = df_video.select(
    *[col(c).alias(c.lower().replace(" ", "_")) for c in df_video.columns]
)

# Renomear colunas de df_comments
df_comments_cleaned = df_comments.select(
    *[col(c).alias(c.lower().replace(" ", "_")) for c in df_comments.columns]
)

#  Verificar os esquemas após padronizar os nomes
print("\nEsquema de df_video_cleaned:")
df_video_cleaned.printSchema()

print("\nEsquema de df_comments_cleaned:")
df_comments_cleaned.printSchema()

#  Criar tabelas temporárias
df_video_cleaned.createOrReplaceTempView("video_table")
df_comments_cleaned.createOrReplaceTempView("comments_table")

#  Realizar o JOIN utilizando as colunas padronizadas
join_video_comments = spark.sql("""
    SELECT v.*, c.*
    FROM video_table v
    JOIN comments_table c
    ON v.video_id = c.video_id
""")

#  Mostrar o resultado do JOIN
print("\nResultado do JOIN:")
join_video_comments.show()

#  Realizar o JOIN com reparticionamento
df_video_repartition = df_video_cleaned.repartition(10)
df_comments_repartition = df_comments_cleaned.repartition(10)

# Criar tabelas temporárias novamente
df_video_repartition.createOrReplaceTempView("video_table_repartition")
df_comments_repartition.createOrReplaceTempView("comments_table_repartition")

#  JOIN com reparticionamento
join_video_comments_repartition = spark.sql("""
    SELECT v.*, c.*
    FROM video_table_repartition v
    JOIN comments_table_repartition c
    ON v.video_id = c.video_id
""")

#  Mostrar o resultado do JOIN com reparticionamento
print("\nResultado do JOIN com reparticionamento:")
join_video_comments_repartition.show()

In [None]:
#  Verificar os esquemas originais (apenas para conferência)
print("Esquema original de df_video:")
df_video.printSchema()

print("\nEsquema original de df_comments:")
df_comments.printSchema()

#  Renomear colunas para padronizar os nomes
df_video_cleaned = df_video.select(
    *[col(c).alias(c.lower().replace(" ", "_")) for c in df_video.columns]
)
df_comments_cleaned = df_comments.select(
    *[col(c).alias(c.lower().replace(" ", "_")) for c in df_comments.columns]
)

#  Aplicar `coalesce` após renomear as colunas
df_video_coalesce = df_video_cleaned.coalesce(2)
df_comments_coalesce = df_comments_cleaned.coalesce(2)

#  Criar tabelas temporárias para os DataFrames coalesce
df_video_coalesce.createOrReplaceTempView("video_table_coalesce")
df_comments_coalesce.createOrReplaceTempView("comments_table_coalesce")

#  Realizar o JOIN utilizando as tabelas temporárias coalesce
join_video_comments_coalesce = spark.sql("""
    SELECT v.*, c.*
    FROM video_table_coalesce v
    JOIN comments_table_coalesce c
    ON v.video_id = c.video_id
""")

#  Exibir o resultado do JOIN
print("\nResultado do JOIN com coalesce:")
join_video_comments_coalesce.show()

In [None]:
#  Visualizar o plano de execução
print("Plano de execução com repartition:")
join_video_comments_repartition.explain()

print("Plano de execução com coalesce:")
join_video_comments_coalesce.explain()

In [None]:
#  Mostrar o esquema do DataFrame original para verificar os nomes das colunas
print("Esquema de df_video:")
df_video.printSchema()

print("\nEsquema de df_comments:")
df_comments.printSchema()

#  Renomear as colunas do DataFrame para padronizar
df_video_cleaned = df_video.select(
    *[col(c).alias(c.lower().replace(" ", "_")) for c in df_video.columns]
)

df_comments_cleaned = df_comments.select(
    *[col(c).alias(c.lower().replace(" ", "_")) for c in df_comments.columns]
)

#  Mostrar os esquemas após a renomeação
print("\nEsquema de df_video_cleaned:")
df_video_cleaned.printSchema()

print("\nEsquema de df_comments_cleaned:")
df_comments_cleaned.printSchema()

#  Selecionar apenas as colunas necessárias após a limpeza
df_video_selected = df_video_cleaned.select("video_id", "title", "keyword")
df_comments_selected = df_comments_cleaned.select("video_id", "comment", "likes")

#  Filtrar apenas os dados relevantes
df_video_filtered = df_video_selected.filter("video_id IS NOT NULL")
df_comments_filtered = df_comments_selected.filter("video_id IS NOT NULL")

#  Criar tabelas temporárias
df_video_filtered.createOrReplaceTempView("video_table")
df_comments_filtered.createOrReplaceTempView("comments_table")

#  Realizar o JOIN
join_video_comments = spark.sql("""
    SELECT v.video_id, v.title, v.keyword, c.comment, c.likes
    FROM video_table v
    JOIN comments_table c
    ON v.video_id = c.video_id
""")

#  Mostrar o resultado do JOIN
print("\nResultado do JOIN:")
join_video_comments.show()

In [None]:
#  Criar tabelas temporárias para garantir que as colunas foram padronizadas e filtradas
df_video_filtered.createOrReplaceTempView("video_table")
df_comments_filtered.createOrReplaceTempView("comments_table")

#  Realizar o JOIN otimizado
join_video_comments_optimized = spark.sql("""
    SELECT v.video_id, v.title, v.keyword, c.comment, c.likes
    FROM video_table v
    JOIN comments_table c
    ON v.video_id = c.video_id
""")

#  Verificar o resultado do JOIN
print("\nResultado do JOIN otimizado:")
join_video_comments_optimized.show()

#  Salvar o resultado otimizado
join_video_comments_optimized.write.mode("overwrite").parquet("join-videos-comments-otimizado")