In [0]:
import time
import pandas as pd
import re
from pyspark.sql import SparkSession

def extract_accident_type(desc):
    """Classify a description by the first accident keyword found in it.

    The value is converted with ``str`` before searching, so non-string
    inputs (``None``, NaN, numbers) are accepted. Matching ignores case and
    the keyword is returned in lower case; ``"outros"`` is returned when no
    keyword is present.
    """
    text = str(desc)
    found = re.search(
        r"(colisão|capotamento|atropelamento|batida)",
        text,
        flags=re.IGNORECASE,
    )
    if found is None:
        return "outros"
    return found.group(1).lower()




In [0]:
# Location of the accidents CSV handed to the Spark reader.
input_path = "/FileStore/tables/US_Accidents_March23_reduzido-2.csv"

# Row fractions benchmarked, one process_sample run per entry.
fractions = [0.1, 0.25, 0.5, 0.75, 1.0]

# Thread count stored alongside each metrics row.
threads = 1

# File the collected metrics table is written to.
output_path = "/dbfs/FileStore/tables/metrics_sequencial.csv"

In [0]:
# Reuse the active Spark session if one exists, otherwise start one.
spark = SparkSession.builder.getOrCreate()

# Read the CSV with a header row and let Spark infer the column types.
df_spark = spark.read.options(header="true", inferSchema="true").csv(input_path)

# Collect the whole table into a pandas DataFrame for the sequential benchmark.
df = df_spark.toPandas()

In [0]:
def process_sample(df, fraction, threads):
    """Run the five benchmark aggregations on a sample of ``df`` and time each.

    Parameters
    ----------
    df : pandas.DataFrame
        Source table. The columns read are ``ID``, ``State``, ``Severity``,
        ``Weather_Condition``, ``Start_Time``, ``Crossing``,
        ``Traffic_Signal`` and ``Description``. The frame is not modified;
        all work happens on a sampled or copied frame.
    fraction : float
        Share of rows to process. Values below 1.0 draw a random sample with
        a fixed seed (42); any other value uses a full copy of ``df``.
    threads : int
        Stored unchanged in the result; it does not influence the computation.

    Returns
    -------
    dict
        Keys, in order: ``threads``, ``fraction``, ``n_rows``, ``n_cols``,
        ``t_acidentes_estado``, ``t_clima_grave``, ``t_severidade_hora``,
        ``t_condicoes_via``, ``t_tipo_acidente`` and ``t_total``. The ``t_*``
        values are elapsed seconds. The aggregation results themselves are
        discarded; only their timings are reported.
    """
    # perf_counter is monotonic and high resolution, so measured intervals
    # cannot be skewed by system clock adjustments as with time.time().
    clock = time.perf_counter

    if fraction < 1.0:
        sample_df = df.sample(frac=fraction, random_state=42)
    else:
        # Copy so the helper columns added below never reach the caller's frame.
        sample_df = df.copy()

    metrics = {
        "threads": threads,
        "fraction": fraction,
        "n_rows": len(sample_df),
        "n_cols": df.shape[1],
    }
    total_start = clock()

    # 1) Accidents per state
    t0 = clock()
    _ = (sample_df.groupby("State")["ID"]
             .count().reset_index().rename(columns={"ID": "Qtd_Acidentes"}))
    metrics["t_acidentes_estado"] = clock() - t0

    # 2) Weather conditions of severe accidents (Severity >= 4)
    t0 = clock()
    _ = (sample_df[sample_df["Severity"] >= 4]
             .groupby("Weather_Condition")["ID"]
             .count().reset_index().rename(columns={"ID": "Qtd_Grave"}))
    metrics["t_clima_grave"] = clock() - t0

    # 3) Mean severity per hour of day
    t0 = clock()
    sample_df["hora"] = pd.to_datetime(sample_df["Start_Time"]).dt.hour
    _ = (sample_df.groupby("hora")["Severity"]
             .mean().reset_index().rename(columns={"Severity": "Media_Severidade"}))
    metrics["t_severidade_hora"] = clock() - t0

    # 4) Road conditions: accidents at crossings and at traffic signals
    t0 = clock()
    qtd_cruz = int(sample_df["Crossing"].sum())
    qtd_sinal = int(sample_df["Traffic_Signal"].sum())
    _ = pd.DataFrame([{"Qtd_Cruzamentos": qtd_cruz, "Qtd_Sinais": qtd_sinal}])
    metrics["t_condicoes_via"] = clock() - t0

    # 5) Accident type extracted from the description via regex
    t0 = clock()
    sample_df["tipo_acidente"] = sample_df["Description"].apply(extract_accident_type)
    # Name the columns explicitly: the labels produced by
    # value_counts().reset_index() differ between pandas 1.x and 2.x, so
    # renaming "index"/"tipo_acidente" afterwards mislabels them on 2.x.
    _ = (sample_df["tipo_acidente"]
             .value_counts()
             .rename_axis("Tipo")
             .reset_index(name="Qtd"))
    metrics["t_tipo_acidente"] = clock() - t0

    # Total elapsed time across all five steps
    metrics["t_total"] = clock() - total_start
    return metrics

In [0]:
# One metrics record per configured fraction, in the order listed in `fractions`.
all_metrics = [process_sample(df, frac, threads) for frac in fractions]

# Tabulate the records: one row per run.
metrics_df = pd.DataFrame(all_metrics)

In [0]:
import os

# Make sure the destination folder exists on the driver (DBFS fuse mount).
# Derive it from output_path so the two cannot drift apart when the
# configured path changes; skip when the path has no directory component.
output_dir = os.path.dirname(output_path)
if output_dir:
    os.makedirs(output_dir, exist_ok=True)

# Write the metrics table without the pandas index column.
metrics_df.to_csv(output_path, index=False)

# Show the table in the notebook
display(metrics_df)
print(f"Métricas salvas em: {output_path}")

threads,fraction,n_rows,n_cols,t_acidentes_estado,t_clima_grave,t_severidade_hora,t_condicoes_via,t_tipo_acidente,t_total
1,0.1,472839,46,0.1908490657806396,0.061612844467163,0.0909464359283447,0.0276031494140625,2.5941083431243896,2.965127229690552
1,0.25,1182098,46,0.2960846424102783,0.0629291534423828,0.1442503929138183,0.0044732093811035,6.214601278305054,6.722347497940064
1,0.5,2364197,46,0.5928852558135986,0.1187303066253662,1.6477365493774414,0.0058743953704833,12.73218274116516,15.097417831420898
1,0.75,3546296,46,0.9238123893737792,0.1593358516693115,0.2836236953735351,0.0089216232299804,19.152840614318848,20.52854323387146
1,1.0,4728394,46,0.8006985187530518,0.2281477451324463,0.5265469551086426,0.0178959369659423,24.81726622581482,26.390568017959595


Métricas salvas em: /dbfs/FileStore/tables/metrics_sequencial.csv
