In [None]:
# Notebook 2: Gerar Totais por Partição

import os
from typing import List
import pandas as pd
import pyarrow.dataset as ds
import pyarrow as pa
from tqdm import tqdm
from datetime import timedelta

# --------------------------------
# Funções auxiliares
# --------------------------------

def listar_parquets(diretorio: str) -> List[str]:
    """Lista todos os arquivos Parquet dentro de um diretório recursivamente."""
    arquivos = []
    for raiz, _, files in os.walk(diretorio):
        for file in files:
            if file.endswith(".parquet"):
                arquivos.append(os.path.join(raiz, file))
    return arquivos

def ler_parquet_para_dataframe(caminho: str) -> pd.DataFrame:
    """Lê um arquivo Parquet e retorna um DataFrame."""
    dataset = ds.dataset(caminho, format="parquet")
    table = dataset.to_table()
    return table.to_pandas()

def criar_diretorio(caminho: str) -> None:
    """Garante que o diretório existe."""
    os.makedirs(caminho, exist_ok=True)

def salvar_dataframe_particionado(
    df: pd.DataFrame,
    caminho_destino: str,
    particoes: List[str]
) -> None:
    """Salva o DataFrame como Parquet particionado."""
    criar_diretorio(caminho_destino)
    tabela = pa.Table.from_pandas(df, preserve_index=False)
    ds.write_dataset(
        data=tabela,
        base_dir=caminho_destino,
        format="parquet",
        partitioning=particoes,
        existing_data_behavior="overwrite_or_ignore"
    )

# --------------------------------
# Funções específicas de negócios
# --------------------------------

def gerar_tabela_sazonalidade(
    df: pd.DataFrame,
    sazonalidade: str,
    colunas_agrupamento: List[str],
    col_data_inicio: str,
    col_data_fim: str,
    col_contagem: str
) -> pd.DataFrame:
    """Gera tabela de contagens por período sazonal."""
    resultados = []

    if sazonalidade == 'anual':
        ano_min = df[col_data_inicio].min().year
        ano_max = df[col_data_fim].max().year
        
        grupos = df[colunas_agrupamento].drop_duplicates()
        
        for _, grupo in grupos.iterrows():
            filtro = (df[colunas_agrupamento] == grupo.values).all(axis=1)
            df_grupo = df[filtro]
            
            for ano in range(ano_min, ano_max + 1):
                inicio = pd.Timestamp(year=ano, month=1, day=1)
                fim = pd.Timestamp(year=ano + 1, month=1, day=1)

                contagem = df_grupo[
                    (df_grupo[col_data_inicio] < fim) & 
                    (df_grupo[col_data_fim] >= inicio)
                ].shape[0]

                resultado = grupo.to_dict()
                resultado.update({
                    'data_inicio': inicio,
                    'data_fim': fim - timedelta(days=1),
                    'sazonalidade': 'anual',
                    col_contagem: contagem
                })
                resultados.append(resultado)

    elif sazonalidade == 'mensal':
        inicio = df[col_data_inicio].min().replace(day=1)
        fim = df[col_data_fim].max().replace(day=1) + pd.offsets.MonthBegin(1)
        periodos = pd.date_range(start=inicio, end=fim, freq='MS')

        grupos = df[colunas_agrupamento].drop_duplicates()
        
        for _, grupo in grupos.iterrows():
            filtro = (df[colunas_agrupamento] == grupo.values).all(axis=1)
            df_grupo = df[filtro]
            
            for periodo in periodos:
                fim_periodo = periodo + pd.offsets.MonthBegin(1)

                contagem = df_grupo[
                    (df_grupo[col_data_inicio] < fim_periodo) &
                    (df_grupo[col_data_fim] >= periodo)
                ].shape[0]

                resultado = grupo.to_dict()
                resultado.update({
                    'data_inicio': periodo,
                    'data_fim': fim_periodo - timedelta(days=1),
                    'sazonalidade': 'mensal',
                    col_contagem: contagem
                })
                resultados.append(resultado)
    
    else:
        raise ValueError("Sazonalidade deve ser 'anual' ou 'mensal'")

    return pd.DataFrame(resultados)

# --------------------------------
# Função principal do Notebook
# --------------------------------

def gerar_totais_por_particao(
    caminho_base_bruto: str,
    caminho_destino_totais: str,
    colunas_particao: List[str],
    coluna_data_inicio: str,
    coluna_data_fim: str,
    coluna_contagem: str,
    sazonalidade: str
) -> None:
    """Percorre a árvore bruta e gera totais por partição."""
    arquivos = listar_parquets(caminho_base_bruto)

    print(f"Encontrados {len(arquivos)} arquivos Parquet para processar.")

    for arquivo in tqdm(arquivos, desc="Gerando totais"):
        df = ler_parquet_para_dataframe(arquivo)

        if df.empty:
            continue
        
        # Gera os totais por sazonalidade
        df_totais = gerar_tabela_sazonalidade(
            df=df,
            sazonalidade=sazonalidade,
            colunas_agrupamento=colunas_particao,
            col_data_inicio=coluna_data_inicio,
            col_data_fim=coluna_data_fim,
            col_contagem=coluna_contagem
        )

        if df_totais.empty:
            continue

        # Monta o caminho de destino
        caminho_relativo = os.path.relpath(os.path.dirname(arquivo), caminho_base_bruto)
        caminho_final = os.path.join(caminho_destino_totais, caminho_relativo)

        salvar_dataframe_particionado(
            df=df_totais,
            caminho_destino=caminho_final,
            particoes=colunas_particao
        )

# --------------------------------
# Exemplo de uso
# --------------------------------

gerar_totais_por_particao(
    caminho_base_bruto="../../dados/dados_brutos_iceberg",
    caminho_destino_totais="../../dados/totais_iceberg",
    colunas_particao=["uf", "municipio"],
    coluna_data_inicio="data_inicio",
    coluna_data_fim="data_fim",
    coluna_contagem="quantidade_pessoas",
    sazonalidade="anual"  # ou "mensal"
)
