<a href="https://colab.research.google.com/github/VictorL85/pyspark/blob/main/otimizacao.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [None]:
# Start (or reuse, if one is already active) the SparkSession used by every later cell.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [None]:
# Directory holding the input Parquet files (the Colab working directory by default).
# Change this single constant to run the notebook somewhere else.
DATA_DIR = "/content"

# Load the videos table and the per-comment table; both carry a "Video ID" join key.
df_video = spark.read.parquet(f"{DATA_DIR}/videos-preparados.snappy.parquet")
df_comments = spark.read.parquet(f"{DATA_DIR}/videos-comments-tratados.snappy.parquet")

In [None]:
# Print the schema of the videos table (column names, types and nullability).
df_video.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Video ID: string (nullable = true)
 |-- Published At: date (nullable = true)
 |-- Keyword: string (nullable = true)
 |-- Likes: integer (nullable = true)
 |-- Comments: integer (nullable = true)
 |-- Views: integer (nullable = true)
 |-- Interaction: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Keyword Index: double (nullable = true)
 |-- Features PCA: vector (nullable = true)
 |-- Features Normal: vector (nullable = true)
 |-- Features: vector (nullable = true)



In [None]:
# Print the schema of the comments table. Note that `Year` is a string here,
# while it is an integer in df_video, so the year filter further down casts it.
df_comments.printSchema()

root
 |-- Video ID: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Published At: date (nullable = true)
 |-- Keyword: string (nullable = true)
 |-- Likes: integer (nullable = true)
 |-- Comments: integer (nullable = true)
 |-- Views: integer (nullable = true)
 |-- Interaction: integer (nullable = true)
 |-- Year: string (nullable = true)
 |-- Comment: string (nullable = true)
 |-- Sentiment: integer (nullable = true)
 |-- Likes Comment: integer (nullable = true)



In [None]:
# Expose both DataFrames to Spark SQL under short view names used by the baseline query.
for _view, _frame in (("videos", df_video), ("comments", df_comments)):
    _frame.createOrReplaceTempView(_view)

In [None]:
# Baseline join in Spark SQL: every video column plus the comment-level fields.
# The adjacent literals concatenate into one single-line query string.
join_video_comments = spark.sql(
    "SELECT v.*, c.Comment, c.Sentiment, c.`Likes Comment` "
    "FROM videos v "
    "JOIN comments c ON v.`Video ID` = c.`Video ID`"
)

In [None]:
# Hash-partition both inputs on the join key before joining.
# NOTE: the plan printed further below still uses a BroadcastHashJoin, so this
# repartition adds a shuffle (Exchange hashpartitioning) ahead of the broadcast
# instead of avoiding one.
df_video_re, df_comments_re = (
    frame.repartition("Video ID") for frame in (df_video, df_comments)
)

# Register the repartitioned frames as their own temp views for Spark SQL.
df_video_re.createOrReplaceTempView("videos_repartitioned")
df_comments_re.createOrReplaceTempView("comments_repartitioned")

# Same join as the baseline query, but reading from the repartitioned views.
join_video_comments_re = spark.sql("""
    SELECT
        v.*,
        c.Comment,
        c.Sentiment,
        c.`Likes Comment`
    FROM videos_repartitioned v
    JOIN comments_repartitioned c
    ON v.`Video ID` = c.`Video ID`
""")

In [None]:
# Show the physical plan of the baseline SQL join (no repartition, no filter).
# Spark picks a BroadcastHashJoin that broadcasts the videos side. The plan is
# adaptive (isFinalPlan=false), so AQE may still change it at execution time.
join_video_comments.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Title#777, Video ID#778, Published At#779, Keyword#780, Likes#781, Comments#782, Views#783, Interaction#784, Year#785, Month#786, Keyword Index#787, Features PCA#788, Features Normal#789, Features#790, Comment#814, Sentiment#815, Likes Comment#816]
   +- BroadcastHashJoin [Video ID#778], [Video ID#805], Inner, BuildLeft, false
      :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=475]
      :  +- Filter isnotnull(Video ID#778)
      :     +- FileScan parquet [Title#777,Video ID#778,Published At#779,Keyword#780,Likes#781,Comments#782,Views#783,Interaction#784,Year#785,Month#786,Keyword Index#787,Features PCA#788,Features Normal#789,Features#790] Batched: true, DataFilters: [isnotnull(Video ID#778)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/videos-preparados.snappy.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(`Video ID`)], ReadSchema:

In [None]:
# Show the physical plan of the join over the repartitioned views. Compared with
# the baseline, it adds an `Exchange hashpartitioning` on the videos side before
# the broadcast, i.e. one extra shuffle.
join_video_comments_re.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Title#777, Video ID#778, Published At#779, Keyword#780, Likes#781, Comments#782, Views#783, Interaction#784, Year#785, Month#786, Keyword Index#787, Features PCA#788, Features Normal#789, Features#790, Comment#814, Sentiment#815, Likes Comment#816]
   +- BroadcastHashJoin [Video ID#778], [Video ID#805], Inner, BuildLeft, false
      :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=514]
      :  +- Exchange hashpartitioning(Video ID#778, 200), REPARTITION_BY_COL, [plan_id=510]
      :     +- Filter isnotnull(Video ID#778)
      :        +- FileScan parquet [Title#777,Video ID#778,Published At#779,Keyword#780,Likes#781,Comments#782,Views#783,Interaction#784,Year#785,Month#786,Keyword Index#787,Features PCA#788,Features Normal#789,Features#790] Batched: true, DataFilters: [isnotnull(Video ID#778)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/videos-pre

In [None]:
# Restrict both tables to a single year before joining so less data reaches the join.
# The year was chosen because an AI assistant recommended it as the "necessary data".
# TODO: document why this year is the relevant subset for the analysis.
TARGET_YEAR = 2023

df_video_fil = df_video.filter(col("Year") == TARGET_YEAR)
# `Year` is a string column in df_comments (integer in df_video), so cast before comparing.
df_comments_fil = df_comments.filter(col("Year").cast("int") == TARGET_YEAR)

# Inner join on "Video ID", keeping every video column plus the comment-level fields.
join_video_comments_op = df_video_fil.join(
    df_comments_fil,
    df_video_fil["Video ID"] == df_comments_fil["Video ID"],
    "inner",
).select(
    df_video_fil["*"],
    df_comments_fil["Comment"],
    df_comments_fil["Sentiment"],
    df_comments_fil["Likes Comment"],
)

In [None]:
# Show the physical plan of the year-filtered join. The Year predicate appears in
# the DataFilters of the videos-side FileScan, i.e. next to the Parquet read.
join_video_comments_op.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Title#777, Video ID#778, Published At#779, Keyword#780, Likes#781, Comments#782, Views#783, Interaction#784, Year#785, Month#786, Keyword Index#787, Features PCA#788, Features Normal#789, Features#790, Comment#814, Sentiment#815, Likes Comment#816]
   +- BroadcastHashJoin [Video ID#778], [Video ID#805], Inner, BuildLeft, false
      :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=543]
      :  +- Filter ((isnotnull(Year#785) AND (Year#785 = 2023)) AND isnotnull(Video ID#778))
      :     +- FileScan parquet [Title#777,Video ID#778,Published At#779,Keyword#780,Likes#781,Comments#782,Views#783,Interaction#784,Year#785,Month#786,Keyword Index#787,Features PCA#788,Features Normal#789,Features#790] Batched: true, DataFilters: [isnotnull(Year#785), (Year#785 = 2023), isnotnull(Video ID#778)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/videos-preparados.

In [None]:
# Persist the optimized join as Parquet, replacing any output from a previous run.
# The path is relative, so it resolves against the notebook's working directory.
(
    join_video_comments_op.write
    .mode("overwrite")
    .parquet("join-videos-comments-otimizado")
)

In [None]:
# Stop the SparkSession (and its underlying SparkContext) at the end of the notebook.
spark.stop()