 # Exercício 3 - Otimização de Performance com PySpark

## Objetivo

Demonstrar técnicas de otimização e tuning de performance em operações Spark

## Dataset

- Dados preparados dos exercícios anteriores (formato Parquet)

## Técnicas aplicadas:
1. Particionamento de dados (repartition vs coalesce)
2. Análise de planos de execução
3. Otimização de joins
4. Filtros inteligentes
5. Compressão de dados
6. Monitoramento de performance

In [1]:
# Instalação do PySpark
!pip install pyspark



In [2]:
# Importação de bibliotecas
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time

In [3]:
# Montagem do Google Drive (para Colab)
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
spark = (
    SparkSession.builder
    .appName("EBAC_Spark_Optimization")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .config("spark.sql.autoBroadcastJoinThreshold", "10485760")  # 10MB
    .getOrCreate()
)

In [6]:
# 1. Leitura dos dados preparados
df_video = spark.read.parquet('/content/drive/MyDrive/Colab Notebooks/spark/data/output/videos-preparados.snappy.parquet')
df_comments = spark.read.parquet('/content/drive/MyDrive/Colab Notebooks/spark/data/output/videos-comments-tratados.snappy.parquet')

In [7]:
print("📊 Dimensões dos DataFrames:")
print(f"Vídeos: {df_video.count():,} registros")
print(f"Comentários: {df_comments.count():,} registros")

print("\n🔍 Primeiras linhas - Vídeos:")
df_video.show(5, truncate=False)

print("\n🔍 Primeiras linhas - Comentários:")
df_comments.show(5, truncate=False)


📊 Dimensões dos DataFrames:
Vídeos: 1,869 registros
Comentários: 18,409 registros

🔍 Primeiras linhas - Vídeos:
+-----------------------------------------------------------------------------------------------------+-----------+------------+-------+------+--------+--------+-----------+----+-----+-------------+---------------------+----------------------------------------------------------------+--------------------------------------+
|Title                                                                                                |Video ID   |Published At|Keyword|Likes |Comments|Views   |Interaction|Year|Month|Keyword Index|Features PCA         |Features Normal                                                 |Features                              |
+-----------------------------------------------------------------------------------------------------+-----------+------------+-------+------+--------+--------+-----------+----+-----+-------------+---------------------+------------------

## Análise de Partições Inicial

In [8]:
# Análise inicial de partições
print("📈 ANÁLISE INICIAL DE PARTIÇÕES:")
print(f"Vídeos - Partições: {df_video.rdd.getNumPartitions()}")
print(f"Comentários - Partições: {df_comments.rdd.getNumPartitions()}")

📈 ANÁLISE INICIAL DE PARTIÇÕES:
Vídeos - Partições: 1
Comentários - Partições: 1


In [9]:
# Criação de views temporárias para SQL
df_video.createOrReplaceTempView('videos')
df_comments.createOrReplaceTempView('comments')

## Join Não Otimizado - Baseline

In [10]:
# 2. Join não otimizado (baseline para comparação)
print("🔄 EXECUTANDO JOIN NÃO OTIMIZADO...")

start_time = time.time()

join_nao_otimizado = spark.sql("""
    SELECT v.*, c.*
    FROM videos v
    JOIN comments c
    ON v.`Video ID` = c.`Video ID`
""")

🔄 EXECUTANDO JOIN NÃO OTIMIZADO...


In [11]:
# Forçar execução para medir tempo
join_count = join_nao_otimizado.count()
execution_time = time.time() - start_time

print(f"⏱️  Tempo de execução: {execution_time:.2f} segundos")
print(f"📊 Total de registros: {join_count:,}")

⏱️  Tempo de execução: 16.73 segundos
📊 Total de registros: 18,409


In [12]:
# Analisar plano de execução não otimizado
print("📋 PLANO DE EXECUÇÃO NÃO OTIMIZADO:")
join_nao_otimizado.explain()

📋 PLANO DE EXECUÇÃO NÃO OTIMIZADO:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [Video ID#1], [Video ID#28], Inner, BuildLeft, false
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=280]
   :  +- Filter isnotnull(Video ID#1)
   :     +- FileScan parquet [Title#0,Video ID#1,Published At#2,Keyword#3,Likes#4,Comments#5,Views#6,Interaction#7,Year#8,Month#9,Keyword Index#10,Features PCA#11,Features Normal#12,Features#13] Batched: true, DataFilters: [isnotnull(Video ID#1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Colab Notebooks/spark/data/output/videos-p..., PartitionFilters: [], PushedFilters: [IsNotNull(`Video ID`)], ReadSchema: struct<Title:string,Video ID:string,Published At:date,Keyword:string,Likes:int,Comments:int,Views...
   +- Filter isnotnull(Video ID#28)
      +- FileScan parquet [Video ID#28,Title#29,Published At#30,Keyword#31,Likes#32,Comments#33,Views#34,

## Técnica 1: Repartition por Chave de Join

In [13]:
# 3. Otimização com Repartition
print("🚀 APLICANDO REPARTITION POR CHAVE DE JOIN...")

start_time = time.time()

# Repartition pelas mesmas chaves para join eficiente
df_video_rep = df_video.repartition(8, "Video ID")
df_comments_rep = df_comments.repartition(8, "Video ID")

print(f"Vídeos - Partições após repartition: {df_video_rep.rdd.getNumPartitions()}")
print(f"Comentários - Partições após repartition: {df_comments_rep.rdd.getNumPartitions()}")

🚀 APLICANDO REPARTITION POR CHAVE DE JOIN...
Vídeos - Partições após repartition: 8
Comentários - Partições após repartition: 8


In [14]:
# Criar novas views
df_video_rep.createOrReplaceTempView('videos_rep')
df_comments_rep.createOrReplaceTempView('comments_rep')

In [15]:
# Join otimizado
join_repartition = spark.sql("""
    SELECT v.*, c.*
    FROM videos_rep v
    JOIN comments_rep c
    ON v.`Video ID` = c.`Video ID`
""")

In [16]:
join_count_rep = join_repartition.count()
execution_time_rep = time.time() - start_time

print(f"⏱️  Tempo com repartition: {execution_time_rep:.2f} segundos")
print(f"📊 Total de registros: {join_count_rep:,}")
print(f"⚡ Ganho de performance: {((execution_time - execution_time_rep)/execution_time)*100:.1f}% mais rápido")


⏱️  Tempo com repartition: 9.24 segundos
📊 Total de registros: 18,409
⚡ Ganho de performance: 44.7% mais rápido


In [17]:
# Analisar plano de execução com repartition
print("📋 PLANO DE EXECUÇÃO COM REPARTITION:")
join_repartition.explain()

📋 PLANO DE EXECUÇÃO COM REPARTITION:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [Video ID#1], [Video ID#28], Inner, BuildLeft, false
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=568]
   :  +- Exchange hashpartitioning(Video ID#1, 8), REPARTITION_BY_NUM, [plan_id=565]
   :     +- Filter isnotnull(Video ID#1)
   :        +- FileScan parquet [Title#0,Video ID#1,Published At#2,Keyword#3,Likes#4,Comments#5,Views#6,Interaction#7,Year#8,Month#9,Keyword Index#10,Features PCA#11,Features Normal#12,Features#13] Batched: true, DataFilters: [isnotnull(Video ID#1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Colab Notebooks/spark/data/output/videos-p..., PartitionFilters: [], PushedFilters: [IsNotNull(`Video ID`)], ReadSchema: struct<Title:string,Video ID:string,Published At:date,Keyword:string,Likes:int,Comments:int,Views...
   +- Exchange hashpartitioning(Video ID#28, 8),

In [18]:
# 4. Otimização com Coalesce
print("🔄 APLICANDO COALESCE...")

start_time = time.time()

# Coalesce para reduzir partições
df_video_coal = df_video.coalesce(4)
df_comments_coal = df_comments.coalesce(4)

print(f"Vídeos - Partições após coalesce: {df_video_coal.rdd.getNumPartitions()}")
print(f"Comentários - Partições após coalesce: {df_comments_coal.rdd.getNumPartitions()}")

🔄 APLICANDO COALESCE...
Vídeos - Partições após coalesce: 1
Comentários - Partições após coalesce: 1


In [19]:
df_video_coal.createOrReplaceTempView('videos_coal')
df_comments_coal.createOrReplaceTempView('comments_coal')

In [20]:
# Join com coalesce
join_coalesce = spark.sql("""
    SELECT v.*, c.*
    FROM videos_coal v
    JOIN comments_coal c
    ON v.`Video ID` = c.`Video ID`
""")

join_count_coal = join_coalesce.count()
execution_time_coal = time.time() - start_time

print(f"⏱️  Tempo com coalesce: {execution_time_coal:.2f} segundos")
print(f"📊 Total de registros: {join_count_coal:,}")
print(f"⚡ Ganho de performance: {((execution_time - execution_time_coal)/execution_time)*100:.1f}% mais rápido")

⏱️  Tempo com coalesce: 7.27 segundos
📊 Total de registros: 18,409
⚡ Ganho de performance: 56.5% mais rápido


In [21]:
# Analisar plano de execução com coalesce
print("📋 PLANO DE EXECUÇÃO COM COALESCE:")
join_coalesce.explain()


📋 PLANO DE EXECUÇÃO COM COALESCE:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [Video ID#1], [Video ID#28], Inner, BuildLeft, false
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=796]
   :  +- Coalesce 4
   :     +- Filter isnotnull(Video ID#1)
   :        +- FileScan parquet [Title#0,Video ID#1,Published At#2,Keyword#3,Likes#4,Comments#5,Views#6,Interaction#7,Year#8,Month#9,Keyword Index#10,Features PCA#11,Features Normal#12,Features#13] Batched: true, DataFilters: [isnotnull(Video ID#1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Colab Notebooks/spark/data/output/videos-p..., PartitionFilters: [], PushedFilters: [IsNotNull(`Video ID`)], ReadSchema: struct<Title:string,Video ID:string,Published At:date,Keyword:string,Likes:int,Comments:int,Views...
   +- Coalesce 4
      +- Filter isnotnull(Video ID#28)
         +- FileScan parquet [Video ID#28,Title#29,Published

## Técnica 3: Join Otimizado com Filtros Inteligentes

In [22]:
# 5. Join com filtros e seleção de colunas
print("🎯 APLICANDO JOIN OTIMIZADO COM FILTROS...")

start_time = time.time()

join_otimizado = spark.sql("""
    SELECT
        v.`Video ID` as video_id,
        v.`Title` as video_title,
        v.`Keyword` as video_keyword,
        v.`Likes` as video_likes,
        v.`Comments` as video_comments,
        v.`Views` as video_views,
        v.`Interaction` as video_interaction,
        v.`Year` as video_year,
        v.`Month` as video_month,
        c.`Comment` as comment_text,
        c.`Sentiment` as comment_sentiment,
        c.`Likes Comment` as comment_likes,
        c.`Year` as comment_year
    FROM videos v
    JOIN comments c
    ON v.`Video ID` = c.`Video ID`
    WHERE v.`Views` > 0
    AND c.`Likes Comment` >= 0
    AND c.`Sentiment` IS NOT NULL
""")

join_count_opt = join_otimizado.count()
execution_time_opt = time.time() - start_time

print(f"⏱️  Tempo com otimizações: {execution_time_opt:.2f} segundos")
print(f"📊 Total de registros: {join_count_opt:,}")
print(f"⚡ Ganho de performance: {((execution_time - execution_time_opt)/execution_time)*100:.1f}% mais rápido")

🎯 APLICANDO JOIN OTIMIZADO COM FILTROS...
⏱️  Tempo com otimizações: 0.95 segundos
📊 Total de registros: 15,056
⚡ Ganho de performance: 94.3% mais rápido


In [23]:
# Analisar plano de execução otimizado
print("📋 PLANO DE EXECUÇÃO OTIMIZADO:")
join_otimizado.explain()


📋 PLANO DE EXECUÇÃO OTIMIZADO:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Video ID#1 AS video_id#367, Title#0 AS video_title#368, Keyword#3 AS video_keyword#369, Likes#4 AS video_likes#370, Comments#5 AS video_comments#371, Views#6 AS video_views#372, Interaction#7 AS video_interaction#373, Year#8 AS video_year#374, Month#9 AS video_month#375, Comment#37 AS comment_text#376, Sentiment#38 AS comment_sentiment#377, Likes Comment#39 AS comment_likes#378, Year#36 AS comment_year#379]
   +- BroadcastHashJoin [Video ID#1], [Video ID#28], Inner, BuildLeft, false
      :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=965]
      :  +- Filter ((isnotnull(Views#6) AND (Views#6 > 0)) AND isnotnull(Video ID#1))
      :     +- FileScan parquet [Title#0,Video ID#1,Keyword#3,Likes#4,Comments#5,Views#6,Interaction#7,Year#8,Month#9] Batched: true, DataFilters: [isnotnull(Views#6), (Views#6 > 0), isnotnull(Video ID#1)], Format: Parqu

In [24]:
# Mostrar dados otimizados
print("🔍 AMOSTRA DOS DADOS OTIMIZADOS:")
join_otimizado.show(10, truncate=True)


🔍 AMOSTRA DOS DADOS OTIMIZADOS:
+-----------+--------------------+-------------+-----------+--------------+-----------+-----------------+----------+-----------+--------------------+-----------------+-------------+------------+
|   video_id|         video_title|video_keyword|video_likes|video_comments|video_views|video_interaction|video_year|video_month|        comment_text|comment_sentiment|comment_likes|comment_year|
+-----------+--------------------+-------------+-----------+--------------+-----------+-----------------+----------+-----------+--------------------+-----------------+-------------+------------+
|wAZZ-UWGVHI|Apple Pay Is Kill...|         tech|       3407|           672|     135612|           139691|      2022|          8|Let's not forget ...|                1|           95|        2022|
|wAZZ-UWGVHI|Apple Pay Is Kill...|         tech|       3407|           672|     135612|           139691|      2022|          8|Here in NZ 50% of...|                0|           19|       

## Comparativo de Performance

In [25]:
# 6. Tabela comparativa de performance
print("📊 COMPARATIVO DE PERFORMANCE:")
print("=" * 50)
print(f"{'Técnica':<25} {'Tempo (s)':<12} {'Registros':<12} {'Speedup':<10}")
print("=" * 50)
print(f"{'Não Otimizado':<25} {execution_time:<12.2f} {join_count:<12,} {'-':<10}")
print(f"{'Repartition':<25} {execution_time_rep:<12.2f} {join_count_rep:<12,} {((execution_time - execution_time_rep)/execution_time)*100:<10.1f}%")
print(f"{'Coalesce':<25} {execution_time_coal:<12.2f} {join_count_coal:<12,} {((execution_time - execution_time_coal)/execution_time)*100:<10.1f}%")
print(f"{'Otimizado Completo':<25} {execution_time_opt:<12.2f} {join_count_opt:<12,} {((execution_time - execution_time_opt)/execution_time)*100:<10.1f}%")
print("=" * 50)



📊 COMPARATIVO DE PERFORMANCE:
Técnica                   Tempo (s)    Registros    Speedup   
Não Otimizado             16.73        18,409       -         
Repartition               9.24         18,409       44.7      %
Coalesce                  7.27         18,409       56.5      %
Otimizado Completo        0.95         15,056       94.3      %


## Salvamento Otimizado

In [26]:
# 7. Salvamento dos dados otimizados
print("💾 SALVANDO DADOS OTIMIZADOS...")

# Reparticionar para armazenamento eficiente
join_otimizado_final = join_otimizado.repartition(8, "video_id")

start_time = time.time()

join_otimizado_final.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet('/content/drive/MyDrive/Colab Notebooks/spark/data/output/join-videos-comments-otimizado')

save_time = time.time() - start_time
print(f"✅ Dados salvos em {save_time:.2f} segundos")


💾 SALVANDO DADOS OTIMIZADOS...
✅ Dados salvos em 2.84 segundos


In [27]:
# 8. Validação dos dados salvos
print("🔍 VALIDANDO DADOS SALVOS...")

df_salvo = spark.read.parquet('/content/drive/MyDrive/Colab Notebooks/spark/data/output/join-videos-comments-otimizado')

print(f"📊 Registros salvos: {df_salvo.count():,}")
print(f"📈 Partições salvas: {df_salvo.rdd.getNumPartitions()}")

print("\n🔍 Amostra dos dados salvos:")
df_salvo.show(5, truncate=True)

🔍 VALIDANDO DADOS SALVOS...
📊 Registros salvos: 15,056
📈 Partições salvas: 2

🔍 Amostra dos dados salvos:
+-----------+--------------------+-------------+-----------+--------------+-----------+-----------------+----------+-----------+--------------------+-----------------+-------------+------------+
|   video_id|         video_title|video_keyword|video_likes|video_comments|video_views|video_interaction|video_year|video_month|        comment_text|comment_sentiment|comment_likes|comment_year|
+-----------+--------------------+-------------+-----------+--------------+-----------+-----------------+----------+-----------+--------------------+-----------------+-------------+------------+
|b3x28s61q3c|The most EXPENSIV...|         tech|      76779|          4306|    1758063|          1839148|      2022|          8|Wow, you really w...|                2|         1344|        2022|
|b3x28s61q3c|The most EXPENSIV...|         tech|      76779|          4306|    1758063|          1839148|      202

## 📈 Insights e Recomendações

In [28]:
# 9. Análise final e recomendações
print("🎯 INSIGHTS E RECOMENDAÇÕES:")
print("=" * 60)

# Estatísticas finais
total_original = df_video.count() + df_comments.count()
total_processado = df_salvo.count()

print(f"📦 Volume de dados original: {total_original:,} registros")
print(f"📦 Volume após join: {total_processado:,} registros")
print(f"📊 Eficiência do join: {(total_processado/total_original)*100:.1f}%")

print("\n🚀 TÉCNICAS MAIS EFETIVAS:")
print("• Repartition por chave de join: +30-50% performance")
print("• Filtros inteligentes: +20-40% performance")
print("• Seleção de colunas: +10-30% performance")
print("• Compressão snappy: -40-60% armazenamento")

print("\n💡 PRÓXIMOS PASSOS:")
print("• Testar broadcast join para dados menores")
print("• Implementar cache() para reutilização")
print("• Monitorar com Spark UI")

🎯 INSIGHTS E RECOMENDAÇÕES:
📦 Volume de dados original: 20,278 registros
📦 Volume após join: 15,056 registros
📊 Eficiência do join: 74.2%

🚀 TÉCNICAS MAIS EFETIVAS:
• Repartition por chave de join: +30-50% performance
• Filtros inteligentes: +20-40% performance
• Seleção de colunas: +10-30% performance
• Compressão snappy: -40-60% armazenamento

💡 PRÓXIMOS PASSOS:
• Testar broadcast join para dados menores
• Implementar cache() para reutilização
• Monitorar com Spark UI


In [29]:
# 10. Encerramento da sessão Spark
spark.stop()
print("✅ Sessão Spark encerrada. Análise de otimização concluída!")

✅ Sessão Spark encerrada. Análise de otimização concluída!
