In [1]:
# Instalação do PySpark
!pip install pyspark



In [2]:
# Importações
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [3]:
# Mount Google Drive at /content/drive so the project's Parquet inputs
# (under /content/drive/MyDrive/...) can be read by Spark.
import google.colab.drive as drive

drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [4]:
# Obtain the notebook's SparkSession (getOrCreate returns the existing session
# if one was already built in this kernel).
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [5]:
# Read the prepared videos dataset from the project folder on Google Drive.
# Parquet files embed their own schema, so CSV-style reader options such as
# `header` / `inferSchema` are not needed (the Parquet source does not use them).
df_video = spark.read.parquet(
    "/content/drive/MyDrive/Colab Notebooks/projeto/videos-comments-tratados-parquet/"
    "videos-preparados.snappy.parquet"
)

In [6]:
# Read the processed comments dataset from the same project folder on Google Drive.
# Parquet files embed their own schema, so CSV-style reader options such as
# `header` / `inferSchema` are not needed (the Parquet source does not use them).
df_comments = spark.read.parquet(
    "/content/drive/MyDrive/Colab Notebooks/projeto/videos-comments-tratados-parquet/"
    "videos-comments-tratados.snappy.parquet"
)

In [7]:
# Expose both DataFrames to Spark SQL as temporary views ("videos", "comments").
temp_views = {"videos": df_video, "comments": df_comments}
for view_name, frame in temp_views.items():
    frame.createOrReplaceTempView(view_name)

In [10]:
# Baseline: inner join of videos and comments on `Video ID` through Spark SQL,
# with no manual partitioning of either input.
# NOTE: `v.*, c.*` keeps every column from both sides, so names present in both
# tables (at least `Video ID`) appear twice in the result and would be ambiguous
# if selected by name later. In this notebook the result is only passed to .explain().
baseline_join_sql = """
    SELECT v.*, c.*
    FROM videos v
    INNER JOIN comments c
    ON v.`Video ID` = c.`Video ID`
"""
join_video_comments = spark.sql(baseline_join_sql)

In [12]:
# Variant 1: repartition both inputs into 10 partitions by `Video ID` (the join key),
# register them under their own view names and run the same join query.
df_video_rep, df_comments_rep = (
    frame.repartition(10, "Video ID") for frame in (df_video, df_comments)
)

for view_name, frame in (("videos_rep", df_video_rep), ("comments_rep", df_comments_rep)):
    frame.createOrReplaceTempView(view_name)

join_video_comments_rep = spark.sql("""
    SELECT v.*, c.*
    FROM videos_rep v
    INNER JOIN comments_rep c
    ON v.`Video ID` = c.`Video ID`
""")

In [13]:
# Variant 2: coalesce each input to 2 partitions, register them under their own
# view names and run the same join query.
df_video_coalesce, df_comments_coalesce = (
    frame.coalesce(2) for frame in (df_video, df_comments)
)

for view_name, frame in (("videos_coa", df_video_coalesce), ("comments_coa", df_comments_coalesce)):
    frame.createOrReplaceTempView(view_name)

join_video_comments_coa = spark.sql("""
    SELECT v.*, c.*
    FROM videos_coa v
    INNER JOIN comments_coa c
    ON v.`Video ID` = c.`Video ID`
""")

In [14]:
# Print the physical plan of each join variant, each preceded by a heading, to
# compare how Spark executes the baseline, repartition and coalesce versions.
plans_to_compare = [
    ("Plano de execução padrão:", join_video_comments),
    ("Plano de execução com repartition:", join_video_comments_rep),
    ("Plano de execução com coalesce:", join_video_comments_coa),
]
for heading, joined_df in plans_to_compare:
    print(heading)
    joined_df.explain()

Plano de execução padrão:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [Video ID#1], [Video ID#28], Inner, BuildLeft, false
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=39]
   :  +- Filter isnotnull(Video ID#1)
   :     +- FileScan parquet [Title#0,Video ID#1,Published At#2,Keyword#3,Likes#4,Comments#5,Views#6,Interaction#7,Year#8,Month#9,Keyword Index#10,Features PCA#11,Features Normal#12,Features#13] Batched: true, DataFilters: [isnotnull(Video ID#1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Colab Notebooks/projeto/videos-comments-tr..., PartitionFilters: [], PushedFilters: [IsNotNull(`Video ID`)], ReadSchema: struct<Title:string,Video ID:string,Published At:date,Keyword:string,Likes:int,Comments:int,Views...
   +- Filter isnotnull(Video ID#28)
      +- FileScan parquet [Video ID#28,Title#29,Published At#30,Keyword#31,Likes#32,Comments#33,Views#34,Interactio

In [18]:
# Optimized join: project and filter *before* joining
# --------------------------
# Shrink both inputs before the join to reduce the data volume involved:
# - keep only the columns the final result needs;
# - keep only the rows relevant to this analysis (thresholds below);
# - repartition both sides by the join key with the same partition count.
# NOTE(review): the baseline plan printed above used a BroadcastHashJoin. If the
# filtered videos side is still broadcast, the explicit repartition only adds
# shuffles; compare join_video_comments_opt.explain() with and without it.
from pyspark.sql import functions as F

VIDEO_KEYWORD = "Education"  # keep only videos whose Keyword equals this value
MIN_COMMENT_LIKES = 10       # keep only comment rows with strictly more Likes than this
N_JOIN_PARTITIONS = 10       # partitions per side after repartitioning by `Video ID`

# Videos: only the key, the title and the keyword used by the filter.
df_video_opt = (
    df_video
    .select("Video ID", "Title", "Keyword")
    .filter(F.col("Keyword") == VIDEO_KEYWORD)
    .repartition(N_JOIN_PARTITIONS, F.col("Video ID"))
)

# Comments: only the key and the comment-related columns kept in the result.
# NOTE(review): per the plan output above, the comments file also carries
# video-level columns (Title, Keyword, Likes, Views, ...); confirm that `Likes`
# here is the comment's like count and not the video's.
df_comments_opt = (
    df_comments
    .select("Comment", "Video ID", "Likes", "Sentiment")
    .filter(F.col("Likes") > MIN_COMMENT_LIKES)
    .repartition(N_JOIN_PARTITIONS, F.col("Video ID"))
)

# Temporary views for the reduced inputs.
df_video_opt.createOrReplaceTempView("videos_opt")
df_comments_opt.createOrReplaceTempView("comments_opt")

# Join returning only the relevant columns (no duplicated column names).
join_video_comments_opt = spark.sql("""
    SELECT v.`Video ID`, v.Title, c.Comment, c.Likes, c.Sentiment
    FROM videos_opt v
    INNER JOIN comments_opt c
    ON v.`Video ID` = c.`Video ID`
""")

In [19]:
# Save the optimized join result as Parquet with Snappy compression.
# The codec is passed explicitly so the output matches this intent regardless of
# the session's `spark.sql.parquet.compression.codec` setting.
# Spark writes a *directory* of part files at this path, despite the
# `.snappy.parquet` suffix; "overwrite" replaces it on every run.
# NOTE(review): the path is relative to the working directory (typically /content
# on Colab), not the mounted Drive folder the inputs come from. Confirm whether it
# should live under /content/drive/MyDrive/... so it survives the runtime.
(
    join_video_comments_opt.write
    .mode("overwrite")
    .parquet("join-videos-comments-otimizado.snappy.parquet", compression="snappy")
)