### PySpark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, year, row_number, to_date
from pyspark.sql.window import Window
import time

# Inicializa Spark com paralelismo local máximo
spark = (
    SparkSession.builder
    .appName("Analises50Milhoes")
    .master("local[*]")  # Usa todos os núcleos disponíveis
    .config("spark.driver.memory", "6g")  # aumenta heap do driver
    .config("spark.executor.memory", "6g")  # aumenta heap do executor
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)

tempos = []

start_total = time.time()

# Leitura
t0 = time.time()
df = spark.read.csv("dados_500_milhoes.csv", header=True, inferSchema=True)
df = df.withColumn("data", to_date("data"))
df = df.repartition(8)  # paralelismo explícito
t1 = time.time()
tempos.append(("Leitura e preparação", t1 - t0))

# 1. Filtro + média
t0 = time.time()
res1 = df.filter((col("data") >= "2023-06-01") & (col("data") <= "2024-06-01")) \
         .groupBy("categoria").agg(avg("valor").alias("media_valor"))
res1.count()
t1 = time.time()
tempos.append(("Filtro + média", t1 - t0))

# 2. Contagem com condições
t0 = time.time()
res2 = df.filter((col("valor") > 500) & (col("flag") == True) & (col("categoria") != "C")).count()
t1 = time.time()
tempos.append(("Contagem com condições", t1 - t0))

# 3. Agrupamento por ano + categoria
t0 = time.time()
df = df.withColumn("ano", year("data"))
res3 = df.groupBy("ano", "categoria").agg(avg("valor").alias("media"), sum("valor").alias("soma"))
res3.count()
t1 = time.time()
tempos.append(("Agrupamento por ano + categoria", t1 - t0))

# 4. Média móvel
t0 = time.time()
windowSpec = Window.partitionBy("categoria").orderBy("data").rowsBetween(-6, 0)
res4 = df.withColumn("media_movel", avg("valor").over(windowSpec))
res4.select("media_movel").count()
t1 = time.time()
tempos.append(("Média móvel", t1 - t0))

# 5. Top 10 maiores por categoria
t0 = time.time()
rank_spec = Window.partitionBy("categoria").orderBy(col("valor").desc())
res5 = df.withColumn("rank", row_number().over(rank_spec)).filter(col("rank") <= 10)
res5.count()
t1 = time.time()
tempos.append(("Top 10 por categoria", t1 - t0))

# Tempo total
end_total = time.time()
tempos.append(("Tempo total", end_total - start_total))

# Criar DataFrame de tempos
schema = ["etapa", "tempo_segundos"]
df_tempos_spark = spark.createDataFrame(tempos, schema=schema)

df_tempos_spark.show(truncate=False)


                                                                                

+-------------------------------+------------------+
|etapa                          |tempo_segundos    |
+-------------------------------+------------------+
|Leitura e preparação           |37.36648511886597 |
|Filtro + média                 |35.93171310424805 |
|Contagem com condições         |21.732516527175903|
|Agrupamento por ano + categoria|48.21799921989441 |
|Média móvel                    |10.60341739654541 |
|Top 10 por categoria           |78.31700611114502 |
|Tempo total                    |232.16963577270508|
+-------------------------------+------------------+



#### Pandas

In [None]:
import pandas as pd
import numpy as np
import time

tempos = []

start_total = time.time()

# Leitura do CSV
t0 = time.time()
df = pd.read_csv("dados_500_milhoes.csv", parse_dates=['data'])
t1 = time.time()
tempos.append(("Leitura e preparação", t1 - t0))

# 1. Filtro + média por categoria
t0 = time.time()
filtro = df[(df['data'] >= '2023-06-01') & (df['data'] <= '2024-06-01')]
res1 = filtro.groupby('categoria')['valor'].mean()
t1 = time.time()
tempos.append(("Filtro + média", t1 - t0))

# 2. Contagem com condições
t0 = time.time()
res2 = df[(df['valor'] > 500) & (df['flag']) & (df['categoria'] != 'C')].shape[0]
t1 = time.time()
tempos.append(("Contagem com condições", t1 - t0))

# 3. Agrupamento por ano + categoria
t0 = time.time()
df['ano'] = df['data'].dt.year
res3 = df.groupby(['ano', 'categoria']).agg(media=('valor', 'mean'), soma=('valor', 'sum')).reset_index()
t1 = time.time()
tempos.append(("Agrupamento por ano + categoria", t1 - t0))

# 4. Média móvel de 7 dias por categoria
t0 = time.time()
df['data'] = pd.to_datetime(df['data'])
df = df.sort_values(['categoria', 'data'])
res4 = df.groupby('categoria').rolling('7D', on='data')['valor'].mean().reset_index()
t1 = time.time()
tempos.append(("Média móvel", t1 - t0))

# 5. Top 10 maiores por categoria
t0 = time.time()
res5 = df.sort_values('valor', ascending=False).groupby('categoria').head(10)
t1 = time.time()
tempos.append(("Top 10 por categoria", t1 - t0))

end_total = time.time()
tempos.append(("Tempo total", end_total - start_total))

# Criar DataFrame com tempos
df_tempos_pandas = pd.DataFrame(tempos, columns=["etapa", "tempo_segundos"])
print(df_tempos_pandas)


                             etapa  tempo_segundos
0             Leitura e preparação       44.281432
1                   Filtro + média        4.792721
2           Contagem com condições        6.184318
3  Agrupamento por ano + categoria       11.140221
4                      Média móvel       64.974075
5             Top 10 por categoria       57.383218
6                      Tempo total      188.756314


In [None]:
import duckdb
import time
import pandas as pd

tempos = []

start_total = time.time()
con = duckdb.connect()

# 1. Filtro + média
t0 = time.time()
res1 = con.execute("""
SELECT categoria, AVG(valor) AS media_valor
FROM 'dados_500_milhoes.csv'
WHERE data BETWEEN '2023-06-01' AND '2024-06-01'
GROUP BY categoria
""").df()
t1 = time.time()
tempos.append(("Filtro + média", t1 - t0))

# 2. Contagem com condições
t0 = time.time()
res2 = con.execute("""
SELECT COUNT(*) AS total
FROM 'dados_500_milhoes.csv'
WHERE valor > 500 AND flag = TRUE AND categoria != 'C'
""").fetchone()[0]
t1 = time.time()
tempos.append(("Contagem com condições", t1 - t0))

# 3. Agrupamento por ano + categoria
t0 = time.time()
res3 = con.execute("""
SELECT EXTRACT(YEAR FROM data) AS ano, categoria,
       AVG(valor) AS media, SUM(valor) AS soma
FROM 'dados_500_milhoes.csv'
GROUP BY ano, categoria
""").df()
t1 = time.time()
tempos.append(("Agrupamento por ano + categoria", t1 - t0))

# 4. Média móvel por categoria (janela de 7 linhas)
t0 = time.time()
res4 = con.execute("""
SELECT *, 
       AVG(valor) OVER (
           PARTITION BY categoria 
           ORDER BY data 
           ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
       ) AS media_movel
FROM 'dados_500_milhoes.csv'
""").df()
t1 = time.time()
tempos.append(("Média móvel", t1 - t0))

# 5. Top 10 maiores por categoria
t0 = time.time()
res5 = con.execute("""
SELECT *
FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY categoria ORDER BY valor DESC) AS rank
    FROM 'dados_500_milhoes.csv'
) WHERE rank <= 10
""").df()
t1 = time.time()
tempos.append(("Top 10 por categoria", t1 - t0))

end_total = time.time()
tempos.append(("Tempo total", end_total - start_total))

# Criar DataFrame com os tempos
df_tempos_duck = pd.DataFrame(tempos, columns=["etapa", "tempo_segundos"])
print(df_tempos_duck)


                             etapa  tempo_segundos
0                   Filtro + média        5.146259
1           Contagem com condições        4.181640
2  Agrupamento por ano + categoria        4.557449
3                      Média móvel       86.124369
4             Top 10 por categoria       61.358282
5                      Tempo total      161.373033
