In [None]:
import time
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, count, rank, rand, udf
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

# ===================================================================
# 1. SETUP DA SESSÃO E CONFIGURAÇÃO
# ===================================================================

spark = SparkSession.builder \
    .appName("BenchmarkCompleto") \
    .master("local[*]") \
    .getOrCreate()

print("Sessão Spark pronta em modo local!")

# Lista de arquivos para testar
arquivos_para_testar = [
    "convertido_100.csv", "convertido_100.json", "convertido_100.parquet",
    "convertido_500000.csv", "convertido_500000.json", "convertido_500000.parquet",
    "convertido_1000000.csv", "convertido_1000000.json", "convertido_1000000.parquet",
    "convertido_2000000.csv", "convertido_2000000.json", "convertido_2000000.parquet",
    "convertido_5000000.csv", "convertido_5000000.json", "convertido_5000000.parquet"
]

resultados_finais = []

# ===================================================================
# 2. DEFINIÇÃO DAS FUNÇÕES DE TESTE
# ===================================================================

def teste_agregacao_complexa(df):
    """
    TESTE 1: Agrupa por categoria e calcula várias métricas (soma, média, contagem).
    """
    df.groupBy("categoria").agg(
        F.sum("valor").alias("valor_total"),
        F.avg("valor").alias("valor_medio"),
        F.count("*").alias("qtd_transacoes")
    ).collect()

def teste_join(df):
    """
    TESTE 2: Simula um JOIN com uma tabela de "metadados" das categorias.
    """
    # Cria uma tabela de dimensão sintética para o join
    df_categorias = df.select("categoria").distinct().withColumn("meta_vendas", (rand() * 1000).cast("integer"))
    
    # Executa o join e uma ação para forçar a computação
    df.join(df_categorias, "categoria", "inner").count()

def teste_funcao_janela(df):
    """
    TESTE 3: Calcula o ranking das vendas mais valiosas dentro de cada categoria.
    """
    windowSpec  = Window.partitionBy("categoria").orderBy(col("valor").desc())
    
    # Adiciona a coluna de ranking e força a computação
    df.withColumn("rank", rank().over(windowSpec)).collect()

def teste_udf(df):
    """
    TESTE 4: Aplica uma função Python (lenta) para classificar o valor.
    """
    # Define uma função Python simples
    def classificar_valor(valor):
        if valor is None:
            return "N/A"
        if valor > 500:
            return "ALTO"
        elif valor > 100:
            return "MEDIO"
        else:
            return "BAIXO"

    # Registra a função como uma UDF
    classificar_valor_udf = udf(classificar_valor, StringType())

    # Aplica a UDF e força a computação
    df.withColumn("classificacao", classificar_valor_udf(col("valor"))).collect()


# ===================================================================
# 3. LOOP PRINCIPAL DE TESTES
# ===================================================================

for nome_arquivo in arquivos_para_testar:
    try:
        print(f"\n--- Processando arquivo: {nome_arquivo} ---")
        caminho_completo = f"/home/jovyan/work/data/{nome_arquivo}"
        formato = nome_arquivo.split('.')[-1]
        num_linhas = int(nome_arquivo.split('_')[1].split('.')[0])

        # Carrega o DataFrame
        if formato == 'csv':
            df = spark.read.csv(caminho_completo, header=True, inferSchema=True).cache()
        elif formato == 'json':
            df = spark.read.json(caminho_completo).cache()
        elif formato == 'parquet':
            df = spark.read.parquet(caminho_completo).cache()
        else:
            continue
        
        # Força o cache
        df.count()

        # Dicionário base para os resultados
        base_resultado = {"arquivo": nome_arquivo, "formato": formato, "linhas": num_linhas}

        # Executa e cronometra cada teste
        for teste_func, nome_teste in [(teste_agregacao_complexa, "Agregação Complexa"), 
                                        (teste_join, "Join"), 
                                        (teste_funcao_janela, "Função Janela"),
                                        (teste_udf, "UDF")]:
            print(f"  > Executando teste: {nome_teste}...")
            start_time = time.time()
            teste_func(df)
            tempo_execucao = time.time() - start_time
            
            # Cria uma cópia do resultado base e adiciona os dados do teste específico
            resultado_teste = base_resultado.copy()
            resultado_teste["tipo_teste"] = nome_teste
            resultado_teste["tempo_execucao_s"] = round(tempo_execucao, 4)
            resultados_finais.append(resultado_teste)
            print(f"    ...concluído em {tempo_execucao:.4f}s")
        
        # Limpa o cache para o próximo arquivo
        df.unpersist()

    except Exception as e:
        print(f"Ocorreu um erro ao processar {nome_arquivo}: {e}")

# ===================================================================
# 4. EXIBIÇÃO E ENCERRAMENTO
# ===================================================================
print("\n\n--- TODOS OS TESTES CONCLUÍDOS! ---")

if resultados_finais:
    df_resultados = pd.DataFrame(resultados_finais)
    display(df_resultados.sort_values(by=['linhas', 'formato', 'tipo_teste']))
else:
    print("Nenhum resultado foi coletado.")

spark.stop()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/04 00:20:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Sessão Spark pronta em modo local!

--- Processando arquivo: convertido_100.csv ---
  > Executando teste: Agregação Complexa...


                                                                                

    ...concluído em 1.8306s
  > Executando teste: Join...
    ...concluído em 1.6074s
  > Executando teste: Função Janela...
    ...concluído em 0.9615s
  > Executando teste: UDF...


                                                                                

    ...concluído em 1.5628s

--- Processando arquivo: convertido_100.json ---
  > Executando teste: Agregação Complexa...
    ...concluído em 0.4113s
  > Executando teste: Join...
    ...concluído em 0.5501s
  > Executando teste: Função Janela...
    ...concluído em 0.3288s
  > Executando teste: UDF...
    ...concluído em 0.4423s

--- Processando arquivo: convertido_100.parquet ---
  > Executando teste: Agregação Complexa...
    ...concluído em 0.2220s
  > Executando teste: Join...
    ...concluído em 0.3923s
  > Executando teste: Função Janela...
    ...concluído em 0.2995s
  > Executando teste: UDF...
    ...concluído em 0.3238s

--- Processando arquivo: convertido_500000.csv ---


                                                                                

  > Executando teste: Agregação Complexa...
    ...concluído em 0.5342s
  > Executando teste: Join...
    ...concluído em 0.8469s
  > Executando teste: Função Janela...


                                                                                

    ...concluído em 8.2121s
  > Executando teste: UDF...


                                                                                

    ...concluído em 9.9753s

--- Processando arquivo: convertido_500000.json ---


                                                                                

  > Executando teste: Agregação Complexa...
    ...concluído em 0.4705s
  > Executando teste: Join...
    ...concluído em 1.3370s
  > Executando teste: Função Janela...


                                                                                

    ...concluído em 5.6979s
  > Executando teste: UDF...


                                                                                

    ...concluído em 5.8314s

--- Processando arquivo: convertido_500000.parquet ---


                                                                                

  > Executando teste: Agregação Complexa...
    ...concluído em 0.4576s
  > Executando teste: Join...
    ...concluído em 0.6121s
  > Executando teste: Função Janela...


                                                                                

    ...concluído em 5.6540s
  > Executando teste: UDF...


                                                                                

    ...concluído em 4.9255s

--- Processando arquivo: convertido_1000000.csv ---


                                                                                

  > Executando teste: Agregação Complexa...
    ...concluído em 0.2848s
  > Executando teste: Join...
    ...concluído em 1.1105s
  > Executando teste: Função Janela...


                                                                                

    ...concluído em 13.3812s
  > Executando teste: UDF...


                                                                                

    ...concluído em 12.0892s

--- Processando arquivo: convertido_1000000.json ---


                                                                                

  > Executando teste: Agregação Complexa...
    ...concluído em 0.3351s
  > Executando teste: Join...
    ...concluído em 0.8717s
  > Executando teste: Função Janela...


                                                                                

    ...concluído em 9.3413s
  > Executando teste: UDF...


                                                                                

    ...concluído em 10.5622s

--- Processando arquivo: convertido_1000000.parquet ---


                                                                                

  > Executando teste: Agregação Complexa...
    ...concluído em 0.2339s
  > Executando teste: Join...
    ...concluído em 0.9867s
  > Executando teste: Função Janela...


                                                                                

    ...concluído em 8.7157s
  > Executando teste: UDF...


                                                                                

    ...concluído em 9.6303s

--- Processando arquivo: convertido_2000000.csv ---


                                                                                

  > Executando teste: Agregação Complexa...
    ...concluído em 0.5313s
  > Executando teste: Join...
    ...concluído em 1.2116s
  > Executando teste: Função Janela...


                                                                                

    ...concluído em 26.9335s
  > Executando teste: UDF...


                                                                                

    ...concluído em 26.0702s

--- Processando arquivo: convertido_2000000.json ---


                                                                                

  > Executando teste: Agregação Complexa...
    ...concluído em 0.3709s
  > Executando teste: Join...


                                                                                

    ...concluído em 1.6044s
  > Executando teste: Função Janela...


                                                                                

    ...concluído em 19.2009s
  > Executando teste: UDF...


                                                                                

    ...concluído em 25.0875s

--- Processando arquivo: convertido_2000000.parquet ---


                                                                                

  > Executando teste: Agregação Complexa...
    ...concluído em 0.3671s
  > Executando teste: Join...


                                                                                

    ...concluído em 1.2735s
  > Executando teste: Função Janela...


                                                                                

    ...concluído em 18.5901s
  > Executando teste: UDF...


                                                                                

    ...concluído em 20.7714s

--- Processando arquivo: convertido_5000000.csv ---


                                                                                