In [1]:
from pyspark.sql import SparkSession

In [2]:
# Build (or reuse) the SparkSession used by every later cell.
spark = (
    SparkSession.builder
    .appName("Join Videos e Comentários")
    .getOrCreate()
)

In [5]:
# Load the two Parquet inputs into DataFrames.
videos_parquet_path = "/content/videos-preparados.snappy (1).parquet"
comments_parquet_path = "/content/videos-comments-tratados.snappy (1).parquet"

df_video = spark.read.parquet(videos_parquet_path)
df_comments = spark.read.parquet(comments_parquet_path)


In [6]:
# Register each DataFrame as a temporary view so Spark SQL can query it by name.
for view_name, frame in (("videos", df_video), ("comments", df_comments)):
    frame.createOrReplaceTempView(view_name)

In [14]:
# Inner join of the `videos` and `comments` views on `Video ID`, written in Spark SQL.
# SELECT * keeps every column from both sides of the join.
baseline_join_sql = """
    SELECT *
    FROM videos v
    JOIN comments c
    ON v.`Video ID` = c.`Video ID`
"""
join_video_comments = spark.sql(baseline_join_sql)


In [16]:
# Repartition and coalesce (only repartition is applied in this cell).
#
# Repartition both DataFrames by the join key ahead of the join,
# aiming for better parallelism.
join_key = "Video ID"
df_video_rep, df_comments_rep = (
    frame.repartition(join_key) for frame in (df_video, df_comments)
)


In [19]:
# Register the repartitioned DataFrames as temporary views under their own names.
for view_name, frame in (("videos_rep", df_video_rep), ("comments_rep", df_comments_rep)):
    frame.createOrReplaceTempView(view_name)

In [23]:
# Inner join on `Video ID` over the repartitioned views (`videos_rep` / `comments_rep`).
repartitioned_join_sql = """
    SELECT *
    FROM videos_rep v
    JOIN comments_rep c
    ON v.`Video ID` = c.`Video ID` -- Added backticks around 'Video ID'
"""
join_video_comments_rep = spark.sql(repartitioned_join_sql)

In [24]:
# Print the extended execution plan of each join so the two can be compared.
plans_to_show = (
    ("Plano de execução do join original:", join_video_comments),
    ("Plano de execução do join com repartition:", join_video_comments_rep),
)
for heading, joined in plans_to_show:
    print(heading)
    joined.explain(True)

Plano de execução do join original:
== Parsed Logical Plan ==
'Project [*]
+- 'Join Inner, ('v.Video ID = 'c.Video ID)
   :- 'SubqueryAlias v
   :  +- 'UnresolvedRelation [videos], [], false
   +- 'SubqueryAlias c
      +- 'UnresolvedRelation [comments], [], false

== Analyzed Logical Plan ==
Title: string, Video ID: string, Published At: date, Keyword: string, Likes: int, Comments: int, Views: int, Interaction: int, Year: int, Month: int, Keyword Index: double, Features PCA: vector, Features Normal: vector, Features: vector, Video ID: string, Title: string, Published At: date, Keyword: string, Likes: int, Comments: int, Views: int, Interaction: int, Year: string, Comment: string, ... 2 more fields
Project [Title#28, Video ID#29, Published At#30, Keyword#31, Likes#32, Comments#33, Views#34, Interaction#35, Year#36, Month#37, Keyword Index#38, Features PCA#39, Features Normal#40, Features#41, Video ID#56, Title#57, Published At#58, Keyword#59, Likes#60, Comments#61, Views#62, Interactio

In [35]:
# Re-read the videos Parquet file and print its schema.
videos_parquet_path = "/content/videos-preparados.snappy (1).parquet"
df_video = spark.read.parquet(videos_parquet_path)
df_video.printSchema()


root
 |-- Title: string (nullable = true)
 |-- Video ID: string (nullable = true)
 |-- Published At: date (nullable = true)
 |-- Keyword: string (nullable = true)
 |-- Likes: integer (nullable = true)
 |-- Comments: integer (nullable = true)
 |-- Views: integer (nullable = true)
 |-- Interaction: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Keyword Index: double (nullable = true)
 |-- Features PCA: vector (nullable = true)
 |-- Features Normal: vector (nullable = true)
 |-- Features: vector (nullable = true)



In [39]:
# Read the videos data, keep only the listed columns, and rename the key to `video_id`.
video_columns = ["Video ID", "Title", "Published At", "Keyword", "Likes", "Comments", "Views"]
df_video_filtered = (
    spark.read.parquet("/content/videos-preparados.snappy (1).parquet")
    .select(*video_columns)
    .withColumnRenamed("Video ID", "video_id")
)

# Read the comments data with the same kind of column projection and key rename.
comment_columns = ["Video ID", "Comment", "Sentiment", "Likes Comment"]
df_comments_filtered = (
    spark.read.parquet("/content/videos-comments-tratados.snappy (1).parquet")
    .select(*comment_columns)
    .withColumnRenamed("Video ID", "video_id")
)


In [40]:
# Repartition both projected DataFrames by the join key before the join.
# The key column was renamed from "Video ID" to "video_id" when these
# DataFrames were built, so the renamed column must be referenced here;
# "Video ID" no longer exists on them and would fail to resolve.
df_video_filtered = df_video_filtered.repartition("video_id")
df_comments_filtered = df_comments_filtered.repartition("video_id")


In [41]:
# Register the projected DataFrames as temporary views for the optimized join.
for view_name, frame in (("videos_opt", df_video_filtered), ("comments_opt", df_comments_filtered)):
    frame.createOrReplaceTempView(view_name)


In [51]:
# Optimized join that selects only the needed columns.
# It reads from the projected views (`videos_opt` / `comments_opt`) rather than
# the full `videos` / `comments` views, so the column projection done when
# those DataFrames were built is actually used. Those views expose the join
# key as `video_id`.
join_video_comments_optimized = spark.sql("""
    SELECT
        v.video_id AS video_id,
        v.Title AS video_title,
        v.`Published At` AS video_published_at,
        v.Keyword AS video_keyword,
        v.Likes AS video_likes,
        v.Comments AS video_comments,
        v.Views AS video_views,
        c.Comment AS comment_text,
        c.Sentiment AS comment_sentiment,
        c.`Likes Comment` AS comment_likes
    FROM videos_opt v
    INNER JOIN comments_opt c
    ON v.video_id = c.video_id
""")


In [52]:
# 3. Save the optimized join in Parquet format, overwriting any existing output.
output_path = "join-videos-comments-otimizado"
join_video_comments_optimized.write.parquet(output_path, mode="overwrite")