In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg
import time

In [8]:
#iniciar sessão do Spark
spark = SparkSession.builder.appName("Exemplo proc.Distribuido").getOrCreate()
#carregar dados
df = spark.read.csv("br_rf_arrecadacao_uf.csv", header=True, inferSchema=True)

#Operações básicas
filtered_df = df.filter(col("ipi_automoveis") > 1000)
grouped_df = df.groupBy("sigla_uf").agg(sum("irpf").alias("total_irpf"))

# Oprerações complexas para comparação de desempenho
def complex_operation(df):
    start_time = time.time()
    result = df.groupBy("ano", "sigla_uf") \
        .agg(sum("irpf").alias("total_irpf"), avg("ipi_fumo").alias("media_ipi_fumo")) \
        .orderBy("ano", "sigla_uf")
    result.count()  # Forçar a execução
    end_time = time.time()
    return end_time - start_time

spark_time = complex_operation(df)
print(f"Tempo de execução no Spark: {spark_time:.2f} segundos")


# Comparação com Pandas (se o conjunto de dados for pequeno o suficiente para caber na memoria)

pandas_df = df.toPandas()

def pandas_complex_operation(df):
    start_time = time.time()
    
    result = df.groupby(["ano", "sigla_uf"]).agg({
        "irpf": "sum",
        "ipi_fumo": "mean"
    }).reset_index()

    result = result.sort_values(by=["ano", "sigla_uf"])
    
    end_time = time.time()
    return end_time - start_time



pandas_time = pandas_complex_operation(pandas_df)
print(f"Tempo de execução no Pandas: {pandas_time:.2f} segundos")

# Visualizar plano de execução
df.groupBy("ano").agg(sum("irpf").alias("totalr")).explain()

# Encerrar sessão do Spark
spark.stop()
           

Tempo de execução no Spark: 0.57 segundos
Tempo de execução no Pandas: 0.05 segundos
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[ano#499], functions=[sum(irpf#509)])
   +- Exchange hashpartitioning(ano#499, 200), ENSURE_REQUIREMENTS, [plan_id=265]
      +- HashAggregate(keys=[ano#499], functions=[partial_sum(irpf#509)])
         +- FileScan csv [ano#499,irpf#509] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/natha/OneDrive/Área de Trabalho/POS/spark/Aula3/br_rf_a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ano:int,irpf:double>


