In [2]:
import requests
import os
import time
from pathlib import Path
import pandas as pd, pyarrow as pa
import glob 
import matplotlib.pyplot as plt
import duckdb
import numpy as np
from typing import Tuple, Dict, Any, List
import csv
import traceback

## 3. Benchmark de consultas :
* Receita por zona: TOP 10 zonas de pickup por receita total média
* Padrões temporais: Agregação por mês/semana (COUNT viagens, SUM receita, AVG distância)
* Horário de pico: Ranking de horários mais movimentados usando window functions (RANK/ROW_NUMBER)
* Análise de gorjeta: Taxa de gorjeta média por borough usando CASE WHEN para categorizar
* Viagens longas vs curtas: Comparativo usando CASE para categorizar distâncias

## 4. Para cada consulta escolhida, você deve:
* Implementar em SQL (DuckDB) e validar os resultados inspecionando os dados
* Implementar equivalente em Pandas e verificar se os resultados são consistentes
* Anotar quais consultas falharam no Pandas (memória, tempo, erro)
* Medir tempo de execução rodando cada consulta 5 vezes em cada ferramenta
* Registrar apenas consultas que executaram com sucesso para comparação de performance

### 4.1 Receita por zona: TOP 10 zonas de pickup por receita total média

In [4]:
PARQUET_PATH = "data/cleaned/yellow_tripdata_clean_snappy.parquet"  # ajuste se necessário
OUTPUT_DIR = "analysis_results"
os.makedirs(OUTPUT_DIR, exist_ok=True)

SQL_QUERY = """
-- Top 10 pickup zones by average revenue (total_amount)
SELECT
  pulocationid,
  AVG(total_amount) AS avg_total_amount,
  COUNT(*) AS trips
FROM parquet_scan('{parquet}')
GROUP BY pulocationid
ORDER BY avg_total_amount DESC
LIMIT 10;
""".format(parquet=PARQUET_PATH)

# If you want TOTAL revenue instead of average, use:
# SQL_QUERY_SUM = """
# SELECT pulocationid, SUM(total_amount) AS total_amount, COUNT(*) AS trips
# FROM parquet_scan('{parquet}')
# GROUP BY pulocationid
# ORDER BY total_amount DESC
# LIMIT 10;
# """.format(parquet=PARQUET_PATH)

REPEATS = 5
TIMEOUT_SECONDS = None  # ajuste se quiser impor timeout logic

def time_function(func, *args, repeats=5) -> Tuple[List[float], Any, Exception]:
    """
    Run func(*args) repeats times. Return list of successful times, last result (if any), and last exception (if failed).
    We stop collecting a failing run but continue trying subsequent repeats.
    """
    times = []
    last_result = None
    last_exc = None
    for i in range(repeats):
        start = time.perf_counter()
        try:
            res = func(*args)
            elapsed = time.perf_counter() - start
            times.append(elapsed)
            last_result = res
            last_exc = None
        except Exception as e:
            elapsed = time.perf_counter() - start
            last_exc = e
            # record failure but continue
            print(f"Run {i+1} failed: {e}")
            traceback.print_exc()
    return times, last_result, last_exc

##### 1) RUN WITH DUCKDB (SQL) #####
def duckdb_query_run(sql: str):
    # returns pandas.DataFrame result
    # Create a fresh in-memory connection each run to avoid caching side-effects
    conn = duckdb.connect(database=":memory:")
    try:
        # Use parquet_scan to let DuckDB stream parquet without fully materializing on Python side
        df = conn.execute(sql).df()
    finally:
        conn.close()
    return df

print("=== Executando consulta em DuckDB (SQL) ===")
duckdb_times, duckdb_result, duckdb_exc = time_function(duckdb_query_run, SQL_QUERY, repeats=REPEATS)
duckdb_successful = len(duckdb_times) > 0
print(f"DuckDB successful runs: {len(duckdb_times)} / {REPEATS}, times: {duckdb_times}")
if duckdb_successful:
    duckdb_result.to_csv(os.path.join(OUTPUT_DIR, "duckdb_top10_by_avg.csv"), index=False)

##### 2) RUN WITH PANDAS #####
# We implement Pandas equivalent: read needed columns from parquet and compute groupby.
# To reduce memory, we read only columns we need.
PANDAS_COLUMNS = ["pulocationid", "total_amount"]  # keep minimal set

def pandas_query_run(parquet_path: str):
    # This will attempt to load the necessary columns into memory and run the groupby
    # For large datasets this may raise MemoryError
    # We attempt to cast pulocationid to Int32 to keep memory smaller if possible
    df = pd.read_parquet(parquet_path, columns=PANDAS_COLUMNS, engine="pyarrow")
    # Drop missing pulocationid or total_amount
    df = df.dropna(subset=["pulocationid", "total_amount"])
    # Ensure types
    # If pulocationid is float due to earlier NaNs, convert to integer (nullable)
    if not pd.api.types.is_integer_dtype(df["pulocationid"].dtype):
        df["pulocationid"] = df["pulocationid"].astype("Int32")
    # Groupby average
    result = (
        df.groupby("pulocationid", observed=True, dropna=True)
          .agg(avg_total_amount=("total_amount", "mean"), trips=("total_amount","size"))
          .reset_index()
          .sort_values("avg_total_amount", ascending=False)
          .head(10)
    )
    return result

print("\n=== Executando consulta em Pandas ===")
pandas_times, pandas_result, pandas_exc = time_function(pandas_query_run, PARQUET_PATH, repeats=REPEATS)
pandas_successful = len(pandas_times) > 0
print(f"Pandas successful runs: {len(pandas_times)} / {REPEATS}, times: {pandas_times}")
if pandas_successful and pandas_result is not None:
    pandas_result.to_csv(os.path.join(OUTPUT_DIR, "pandas_top10_by_avg.csv"), index=False)

##### 3) VALIDAR CONSISTÊNCIA ENTRE RESULTADOS (se ambos sucederam) #####
def compare_topk(df1: pd.DataFrame, df2: pd.DataFrame, k=10, atol=1e-6) -> Dict[str, Any]:
    """Compare two top-k results by pulocationid and avg_total_amount within tolerance."""
    out = {"match_pulocationid": False, "max_abs_diff": None, "differences": None}
    if df1 is None or df2 is None:
        out["differences"] = "one of results is None"
        return out
    # Normalize ordering by pulocationid
    d1 = df1.set_index("pulocationid").sort_index()
    d2 = df2.set_index("pulocationid").sort_index()
    common = d1.index.intersection(d2.index)
    out["match_pulocationid"] = list(d1.index) == list(d2.index)
    # compute absolute differences for matching pulocationids
    diffs = {}
    max_abs = 0.0
    for pid in common:
        a = float(d1.at[pid, "avg_total_amount"])
        b = float(d2.at[pid, "avg_total_amount"])
        diff = abs(a - b)
        diffs[pid] = diff
        if diff > max_abs:
            max_abs = diff
    out["max_abs_diff"] = max_abs
    out["differences"] = diffs
    return out

comparison = None
if duckdb_successful and pandas_successful:
    comparison = compare_topk(duckdb_result, pandas_result)
    print("\n=== Comparação DuckDB x Pandas ===")
    print("Pulocationid order identical:", comparison["match_pulocationid"])
    print("Max abs difference in avg_total_amount:", comparison["max_abs_diff"])
    if comparison["max_abs_diff"] and comparison["max_abs_diff"] > 1e-6:
        print("Diferenças por pulocationid:", comparison["differences"])

##### 4) Montar relatório de timings e salvar #####
report_path = os.path.join(OUTPUT_DIR, "benchmark_report.csv")
with open(report_path, "w", newline='') as f:
    writer = csv.writer(f)
    writer.writerow(["tool", "run_index", "time_seconds", "success", "error"])
    # DuckDB runs
    for i in range(REPEATS):
        success = i < len(duckdb_times)
        t = duckdb_times[i] if i < len(duckdb_times) else None
        err = "" if success else (str(duckdb_exc) if duckdb_exc else "failed")
        writer.writerow(["duckdb", i+1, t, success, err])
    # Pandas runs
    for i in range(REPEATS):
        success = i < len(pandas_times)
        t = pandas_times[i] if i < len(pandas_times) else None
        err = "" if success else (str(pandas_exc) if pandas_exc else "failed")
        writer.writerow(["pandas", i+1, t, success, err])

print(f"\nRelatório salvo em: {report_path}")
if duckdb_successful:
    print("DuckDB top10 salvo em:", os.path.join(OUTPUT_DIR, "duckdb_top10_by_avg.csv"))
if pandas_successful:
    print("Pandas top10 salvo em:", os.path.join(OUTPUT_DIR, "pandas_top10_by_avg.csv"))

##### 5) Sumário final (apenas execuções bem-sucedidas)
def summarize_times(name: str, times: List[float]):
    if not times:
        return f"{name}: 0 successful runs"
    import statistics
    return (f"{name}: runs={len(times)}, mean={statistics.mean(times):.3f}s, "
            f"median={statistics.median(times):.3f}s, min={min(times):.3f}s, max={max(times):.3f}s")

print("\n=== Sumário de performance (apenas runs bem-sucedidas) ===")
print(summarize_times("DuckDB", duckdb_times))
print(summarize_times("Pandas", pandas_times))

# Nota sobre falhas
failures = []
if not duckdb_successful:
    failures.append("DuckDB failed")
if not pandas_successful:
    failures.append("Pandas failed")
if failures:
    print("\nObservações: Algumas ferramentas falharam. Anotei nas colunas do CSV.")

=== Executando consulta em DuckDB (SQL) ===
DuckDB successful runs: 5 / 5, times: [0.11354209994897246, 0.08971410000231117, 0.08790849999058992, 0.09429989999625832, 0.0890289000235498]

=== Executando consulta em Pandas ===
Pandas successful runs: 5 / 5, times: [1.426865000044927, 1.315702199935913, 1.2890115000773221, 1.2761596000054851, 1.2240158000495285]

=== Comparação DuckDB x Pandas ===
Pulocationid order identical: True
Max abs difference in avg_total_amount: 1.5802470443304628e-11

Relatório salvo em: analysis_results\benchmark_report.csv
DuckDB top10 salvo em: analysis_results\duckdb_top10_by_avg.csv
Pandas top10 salvo em: analysis_results\pandas_top10_by_avg.csv

=== Sumário de performance (apenas runs bem-sucedidas) ===
DuckDB: runs=5, mean=0.095s, median=0.090s, min=0.088s, max=0.114s
Pandas: runs=5, mean=1.306s, median=1.289s, min=1.224s, max=1.427s


### 4.2 Padrões temporais: Agregação por mês/semana (COUNT viagens, SUM receita, AVG distância)

In [None]:

df = pd.read_parquet(PARQUET_PATH, engine="pyarrow")

# ============================================================
# 1️⃣ Verificar colunas necessárias
# ============================================================

required_cols = ['tpep_pickup_datetime', 'trip_distance', 'total_amount']
missing = [c for c in required_cols if c not in df.columns]
if missing:
    raise ValueError(f"Colunas faltando no dataframe: {missing}")

# ============================================================
# 2️⃣ Consulta em DuckDB
# ============================================================
duckdb.register("trips", df)

sql_query = """
SELECT
    DATE_TRUNC('month', tpep_pickup_datetime) AS month,
    DATE_TRUNC('week', tpep_pickup_datetime) AS week,
    COUNT(*) AS total_viagens,
    SUM(total_amount) AS receita_total,
    AVG(trip_distance) AS distancia_media
FROM trips
GROUP BY 1, 2
ORDER BY 1
"""

tempos_duckdb = []
resultado_duckdb = None

for i in range(5):
    t0 = time.time()
    resultado_duckdb = duckdb.sql(sql_query).df()
    t1 = time.time()
    tempos_duckdb.append(t1 - t0)

tempo_medio_duckdb = sum(tempos_duckdb) / len(tempos_duckdb)

print(f"\n=== DuckDB ===")
print(resultado_duckdb.head())
print(f"Tempo médio DuckDB: {tempo_medio_duckdb:.3f}s")

# ============================================================
# 3️⃣ Consulta equivalente em Pandas
# ============================================================
tempos_pandas = []
resultado_pandas = None
pandas_ok = True
erro_pandas = None

try:
    for i in range(5):
        t0 = time.time()
        resultado_pandas = (
            df
            .assign(
                month=df["tpep_pickup_datetime"].dt.to_period("M").dt.to_timestamp(),
                week=df["tpep_pickup_datetime"].dt.to_period("W").dt.start_time
            )
            .groupby(["month", "week"], as_index=False)
            .agg(
                total_viagens=("tpep_pickup_datetime", "count"),
                receita_total=("total_amount", "sum"),
                distancia_media=("trip_distance", "mean")
            )
            .sort_values("month")
        )
        t1 = time.time()
        tempos_pandas.append(t1 - t0)

    tempo_medio_pandas = sum(tempos_pandas) / len(tempos_pandas)

    print(f"\n=== Pandas ===")
    print(resultado_pandas.head())
    print(f"Tempo médio Pandas: {tempo_medio_pandas:.3f}s")

except Exception as e:
    pandas_ok = False
    erro_pandas = str(e)
    print("\n❌ Erro ao executar com Pandas:", erro_pandas)

# ============================================================
# 4️⃣ Validação de consistência
# ============================================================
if pandas_ok:
    try:
        diff_viagens = abs(resultado_duckdb["total_viagens"].sum() - resultado_pandas["total_viagens"].sum())
        diff_receita = abs(resultado_duckdb["receita_total"].sum() - resultado_pandas["receita_total"].sum())

        print("\n=== Validação ===")
        print(f"Diferença total_viagens: {diff_viagens}")
        print(f"Diferença receita_total: {diff_receita}")
    except Exception as e:
        print("⚠️ Não foi possível comparar resultados:", e)

# ============================================================
# 5️⃣ Comparação final
# ============================================================
comparacao = pd.DataFrame([
    {
        "Consulta": "Padrões temporais (mês/semana)",
        "Ferramenta": "DuckDB",
        "Tempo Médio (s)": round(tempo_medio_duckdb, 3),
        "Sucesso": True, #Se chegar aqui vai ter dado certo mesmo 
        
    },
    {
        "Consulta": "Padrões temporais (mês/semana)",
        "Ferramenta": "Pandas",
        "Tempo Médio (s)": round(tempo_medio_pandas, 3) if pandas_ok else None,
        "Sucesso": pandas_ok,
        
    }
])

print("\n=== Comparação Final ===")
print(comparacao)

# (opcional) salvar resultados
comparacao.to_csv("analysis_results/comparacao_duckdb_pandas.csv", index=False)



=== DuckDB ===
       month       week  total_viagens  receita_total  distancia_media
0 2023-01-01 2023-01-30         175517   4.824456e+06         3.333357
1 2023-01-01 2023-01-16         678519   1.854441e+07         3.391944
2 2023-01-01 2023-01-02         610212   1.715389e+07         3.669232
3 2023-01-01 2022-12-26          71176   2.208902e+06         5.188594
4 2023-01-01 2023-01-09         688707   1.881083e+07         3.399218
Tempo médio DuckDB: 0.322s

=== Pandas ===
       month       week  total_viagens  receita_total  distancia_media
0 2023-01-01 2022-12-26          71176     2208901.82         5.188594
1 2023-01-01 2023-01-02         610212    17153888.42         3.669232
2 2023-01-01 2023-01-09         688707    18810826.76         3.399218
3 2023-01-01 2023-01-16         678519    18544410.30         3.391944
4 2023-01-01 2023-01-23         693696    18573693.40         3.236062
Tempo médio Pandas: 16.270s

=== Validação ===
Diferença total_viagens: 0
Diferença recei