In [53]:
! pip install pyspark



In [54]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [55]:
spark = SparkSession.builder.getOrCreate()

In [56]:
# Ler os arquivos em parquet
df_video = spark.read.parquet("videos-preparados.snappy.parquet")
df_comments = spark.read.parquet("videos-comments-tratados.snappy.parquet")

df_video.show(5)
df_comments.show(5)

+--------------------+-----------+------------+-------+------+--------+--------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+
|               Title|   Video ID|Published At|Keyword| Likes|Comments|   Views|Interaction|Year|Month|Keyword Index|        Features PCA|     Features Normal|            Features|
+--------------------+-----------+------------+-------+------+--------+--------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+
|ASMR MUKBANG DOUB...|--ZI0dSbbNU|  2020-04-18|mukbang|378858|   18860|17975269|   18372987|2020|    4|         30.0|[0.6985786560867407]|[0.02303716158264...|[378858.0,1.79752...|
|Deadly car bomb d...|--hxd1CrOqg|  2022-08-22|   news|  6379|    4853|  808787|     820019|2022|    8|         37.0|[0.8936407990235931]|[3.87946679100418...|[6379.0,808787.0,...|
|How Biden&#39;s s...|--ixiTypG8g|  2022-08-24|   news|  1029|    2347|   97434|     100810|202

In [57]:
# Criar tabelas temporárias
df_video.createOrReplaceTempView("videos")
df_comments.createOrReplaceTempView("comentarios")

# Realizar join
join_video_comments = df_video.join(df_comments, on = "Video ID")
join_video_comments.show(5)

+-----------+--------------------+------------+-------+-----+--------+------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+--------------------+------------+-------+-----+--------+------+-----------+----+--------------------+---------+-------------+
|   Video ID|               Title|Published At|Keyword|Likes|Comments| Views|Interaction|Year|Month|Keyword Index|        Features PCA|     Features Normal|            Features|               Title|Published At|Keyword|Likes|Comments| Views|Interaction|Year|             Comment|Sentiment|Likes Comment|
+-----------+--------------------+------------+-------+-----+--------+------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+--------------------+------------+-------+-----+--------+------+-----------+----+--------------------+---------+-------------+
|wAZZ-UWGVHI|Apple Pay Is Kill...|  2022-08-23|   tech| 3407|     672|135612|     139691

In [58]:
# Repartition para distribuir os dados entre os workers (bom para joins)
df_video_repartition = df_video.repartition("Video ID")
df_comments_repartition = df_comments.repartition("Video ID")

In [59]:
# Criar novas tabelas temporárias com dados reparticionados
df_video_repartition.createOrReplaceTempView("videos_repartition")
df_comments_repartition.createOrReplaceTempView("comments_repartition")

# Realizar join com dados reparticionados
join_video_comments_repartition = df_video_repartition.join(df_comments_repartition, on = "Video ID")
join_video_comments_repartition.show(5)

+-----------+--------------------+------------+-------+------+--------+--------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+--------------------+------------+-------+------+--------+--------+-----------+----+--------------------+---------+-------------+
|   Video ID|               Title|Published At|Keyword| Likes|Comments|   Views|Interaction|Year|Month|Keyword Index|        Features PCA|     Features Normal|            Features|               Title|Published At|Keyword| Likes|Comments|   Views|Interaction|Year|             Comment|Sentiment|Likes Comment|
+-----------+--------------------+------------+-------+------+--------+--------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+--------------------+------------+-------+------+--------+--------+-----------+----+--------------------+---------+-------------+
|115amzVdV44|How To Fix a Wate...|  2020-08-18| how-to|910553|   81975

In [60]:
# Reduzir número de partições com coalesce para gravação
join_video_comments_repartition = join_video_comments_repartition.coalesce(5)

In [61]:
#Otimização com análise de plano de execução
join_video_comments.explain()
join_video_comments_repartition.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Video ID#1414, Title#1413, Published At#1415, Keyword#1416, Likes#1417, Comments#1418, Views#1419, Interaction#1420, Year#1421, Month#1422, Keyword Index#1423, Features PCA#1424, Features Normal#1425, Features#1426, Title#1442, Published At#1443, Keyword#1444, Likes#1445, Comments#1446, Views#1447, Interaction#1448, Year#1449, Comment#1450, Sentiment#1451, Likes Comment#1452]
   +- BroadcastHashJoin [Video ID#1414], [Video ID#1441], Inner, BuildLeft, false
      :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=2389]
      :  +- Filter isnotnull(Video ID#1414)
      :     +- FileScan parquet [Title#1413,Video ID#1414,Published At#1415,Keyword#1416,Likes#1417,Comments#1418,Views#1419,Interaction#1420,Year#1421,Month#1422,Keyword Index#1423,Features PCA#1424,Features Normal#1425,Features#1426] Batched: true, DataFilters: [isnotnull(Video ID#1414)], Format: Parquet, Location: 

In [62]:
# Otimizando a analise com as colunas necessárias
df_video_otimizado = df_video.select(("Video ID"),("Likes"), ("Comments"), ("Views"), ("Interaction"), ("Keyword"))

df_video_likes = df_video_otimizado

df_video_otimizado.show(5)

+-----------+------+--------+--------+-----------+-------+
|   Video ID| Likes|Comments|   Views|Interaction|Keyword|
+-----------+------+--------+--------+-----------+-------+
|--ZI0dSbbNU|378858|   18860|17975269|   18372987|mukbang|
|--hxd1CrOqg|  6379|    4853|  808787|     820019|   news|
|--ixiTypG8g|  1029|    2347|   97434|     100810|   news|
|-64r1hcxtV4| 45628|   17264| 5283664|    5346556|mukbang|
|-6IgkG5yZfo| 10959|     525|  844015|     855499|physics|
+-----------+------+--------+--------+-----------+-------+
only showing top 5 rows



In [63]:
df_video_otimizado = df_video.select(("Video ID"),("Likes"), ("Comments"), ("Views"), ("Interaction"), ("Keyword"))
df_video_otimizado = df_video_otimizado.filter((col("Likes").isNotNull()) & (col("Comments").isNotNull()))

df_comments_otimizado = df_comments.select(("Video ID"),("Likes"), ("Comments"), ("Views"), ("Interaction"), ("Keyword"))
df_comments_otimizado = df_comments_otimizado.filter((col("Likes").isNotNull()) & (col("Comments").isNotNull()))

df_comments_otimizado.show(5)
df_video_otimizado.show(5)

+-----------+-----+--------+------+-----------+-------+
|   Video ID|Likes|Comments| Views|Interaction|Keyword|
+-----------+-----+--------+------+-----------+-------+
|wAZZ-UWGVHI| 3407|     672|135612|     139691|   tech|
|wAZZ-UWGVHI| 3407|     672|135612|     139691|   tech|
|wAZZ-UWGVHI| 3407|     672|135612|     139691|   tech|
|wAZZ-UWGVHI| 3407|     672|135612|     139691|   tech|
|wAZZ-UWGVHI| 3407|     672|135612|     139691|   tech|
+-----------+-----+--------+------+-----------+-------+
only showing top 5 rows

+-----------+------+--------+--------+-----------+-------+
|   Video ID| Likes|Comments|   Views|Interaction|Keyword|
+-----------+------+--------+--------+-----------+-------+
|--ZI0dSbbNU|378858|   18860|17975269|   18372987|mukbang|
|--hxd1CrOqg|  6379|    4853|  808787|     820019|   news|
|--ixiTypG8g|  1029|    2347|   97434|     100810|   news|
|-64r1hcxtV4| 45628|   17264| 5283664|    5346556|mukbang|
|-6IgkG5yZfo| 10959|     525|  844015|     855499|physics|

In [64]:
# Reparticionar pelos campos de maior uso em joins e buscas
df_video_otimizado = df_video_otimizado.repartition("Video ID", "Keyword")
df_comments_otimizado = df_comments_otimizado.repartition("Video ID", "Keyword")

In [65]:
# Verificar plano de execução
df_comments_otimizado.explain()
df_video_otimizado.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange hashpartitioning(Video ID#1441, Keyword#1444, 200), REPARTITION_BY_COL, [plan_id=2503]
   +- Project [Video ID#1441, Likes#1445, Comments#1446, Views#1447, Interaction#1448, Keyword#1444]
      +- Filter (isnotnull(Likes#1445) AND isnotnull(Comments#1446))
         +- FileScan parquet [Video ID#1441,Keyword#1444,Likes#1445,Comments#1446,Views#1447,Interaction#1448] Batched: true, DataFilters: [isnotnull(Likes#1445), isnotnull(Comments#1446)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/videos-comments-tratados.snappy.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(Likes), IsNotNull(Comments)], ReadSchema: struct<Video ID:string,Keyword:string,Likes:int,Comments:int,Views:int,Interaction:int>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange hashpartitioning(Video ID#1414, Keyword#1416, 200), REPARTITION_BY_COL, [plan_id=2515]
   +- Project [Video ID#1414, Likes#1417

In [66]:
# Fazer um join das tabelas otimizadas
join_video_comments_otimizado = df_video_otimizado.join(df_comments_otimizado, on = ["Video ID", "Keyword", "Comments", "Interaction", "Views", "Likes"])
join_video_comments_otimizado.show(5)

+-----------+-------+--------+-----------+-------+-----+
|   Video ID|Keyword|Comments|Interaction|  Views|Likes|
+-----------+-------+--------+-----------+-------+-----+
|JsOL9GBAugY| sports|      57|    1054392|1051751| 2584|
|JsOL9GBAugY| sports|      57|    1054392|1051751| 2584|
|JsOL9GBAugY| sports|      57|    1054392|1051751| 2584|
|JsOL9GBAugY| sports|      57|    1054392|1051751| 2584|
|JsOL9GBAugY| sports|      57|    1054392|1051751| 2584|
+-----------+-------+--------+-----------+-------+-----+
only showing top 5 rows



In [67]:
# Salvar a tabela otimizada
join_video_comments_otimizado.coalesce(5)

join_video_comments_otimizado.write.mode("overwrite").option('header', 'true').parquet("join-videos-comments-otimizado")

In [68]:
# Parar execução
spark.stop()