<a href="https://colab.research.google.com/github/WeydisonAndrade/ans-data-pipeline/blob/main/ans_database_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Extraindo e descompactando os dados

In [16]:
import zipfile
from pathlib import Path

# Caminho onde estão os ZIPs
# Alterado de '/content/.dockerenv' para '/content/' para arquivos enviados pelo usuário.
base_path = Path("/content/")

# Lista apenas os ZIPs esperados
zip_files = [
    base_path / "1T2025.zip",
    base_path / "2T2025.zip",
    base_path / "3T2025.zip"
]

for zip_file in zip_files:
    if not zip_file.exists():
        print(f"Arquivo não encontrado: {zip_file.name}")
        continue

    extract_path = base_path / zip_file.stem  # 1T2025, 2T2025, 3T2025
    extract_path.mkdir(exist_ok=True)

    with zipfile.ZipFile(zip_file, "r") as zip_ref:
        zip_ref.extractall(extract_path)

    print(f"Extração concluída: {zip_file.name}")

Extração concluída: 1T2025.zip
Extração concluída: 2T2025.zip
Extração concluída: 3T2025.zip


In [17]:
for pasta in ["1T2025", "2T2025", "3T2025"]:
    path = base_path / pasta
    print(f"\nConteúdo de {pasta}:")
    for arquivo in path.iterdir():
        print(" -", arquivo.name)



Conteúdo de 1T2025:
 - 1T2025.csv

Conteúdo de 2T2025:
 - 2T2025.csv

Conteúdo de 3T2025:
 - 3T2025.csv


In [None]:
#IMPORTANDO A BIBLIOTECA PANDAS

In [18]:
import pandas as pd
from pathlib import Path


In [None]:
#PROCESSAMENTO DE ARQUIVOS


In [29]:
#Identificando e processando arquivos que contém dados de Despesas com Eventos/Sinistros
import pandas as pd
from pathlib import Path

# Caminho onde estão as pastas extraídas
BASE_PATH = Path("/content/")

# Palavras-chave que indicam despesas / sinistros
COLUMN_KEYWORDS = [
    "valor", "vl_", "despesa", "sinistro", "evento", "custo"
]

def file_has_expense_data(file_path: Path) -> bool:
    """
    Inspeciona o conteúdo do arquivo para verificar se há colunas
    relacionadas a despesas, eventos ou sinistros.
    """
    try:
        if file_path.suffix.lower() in [".csv", ".txt"]:
            df = pd.read_csv(
                file_path,
                sep=None,
                engine="python",
                nrows=5,
                header=None,
                encoding="latin1"
            )

        elif file_path.suffix.lower() == ".xlsx":
            df = pd.read_excel(file_path, nrows=5, header=None)

        else:
            return False

        # Primeira linha como possível header
        possible_header = df.iloc[0].astype(str).str.lower()

        return any(
            any(keyword in col for keyword in COLUMN_KEYWORDS)
            for col in possible_header
        )

    except Exception:
        return False


def read_file_incremental(file_path: Path):
    """
    Lê arquivos de forma incremental quando possível.
    """
    if file_path.suffix.lower() in [".csv", ".txt"]:
        return pd.read_csv(
            file_path,
            sep=None,
            engine="python",
            chunksize=100_000,
            encoding="latin1"
        )

    elif file_path.suffix.lower() == ".xlsx":
        df = pd.read_excel(file_path)
        return [df]

    return []


def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Normaliza nomes de colunas.
    """
    df.columns = (
        df.columns
        .astype(str)
        .str.strip()
        .str.lower()
        .str.replace(" ", "_")
    )
    return df


# ================= PIPELINE =================

processed_chunks = []

for trimestre_folder in BASE_PATH.iterdir():
    if not trimestre_folder.is_dir():
        continue

    for file in trimestre_folder.rglob("*"):
        if not file.is_file():
            continue

        if not file_has_expense_data(file):
            continue

        print(f"✔ Arquivo relevante identificado: {file.name}")

        chunks = read_file_incremental(file)

        for chunk in chunks:
            chunk = normalize_columns(chunk)
            processed_chunks.append(chunk)

# Consolidação final
if processed_chunks:
    df_final = pd.concat(processed_chunks, ignore_index=True)
    print("\nProcessamento concluído com sucesso")
    print(df_final.head())
else:
    print("\nNenhum arquivo com dados de despesas/sinistros foi identificado.")

✔ Arquivo relevante identificado: 3T2025.csv
✔ Arquivo relevante identificado: 2T2025.csv
✔ Arquivo relevante identificado: 1T2025.csv

Processamento concluído com sucesso
         data  reg_ans  cd_conta_contabil  \
0  2025-07-01   344800                 35   
1  2025-07-01   344800                351   
2  2025-07-01   344800               3511   
3  2025-07-01   344800              35111   
4  2025-07-01   344800             351119   

                                           descricao vl_saldo_inicial  \
0                               RECEITAS FINANCEIRAS       3094590,67   
1               RECEITAS DE APLICAÃÃES FINANCEIRAS       1222303,16   
2       RECEITAS COM TÃTULOS DE RENDA FIXA PRIVADOS       1222303,16   
3  RECEITAS COM DEPÃSITOS BANCÃRIOS A PRAZO - C...                0   
4  Receitas com DepÃ³sitos BancÃ¡rios a Prazo - C...                0   

  vl_saldo_final  
0     4212815,67  
1     1953149,18  
2     1953149,18  
3              0  
4              0  


In [31]:
#Corrigido, limpando, filtrando e organizando
df_final["descricao"] = (
    df_final["descricao"]
    .astype(str)
    .str.encode("latin1")
    .str.decode("utf-8", errors="ignore")
)

for col in ["vl_saldo_inicial", "vl_saldo_final"]:
    df_final[col] = (
        df_final[col]
        .astype(str)
        .str.replace(".", "", regex=False)
        .str.replace(",", ".", regex=False)
        .astype(float)
    )



In [32]:
#Filtragem de despesas

KEYWORDS_DESPESA = [
    "despesa",
    "sinistro",
    "evento",
    "indeniza",
    "assist",
    "custo"
]

mask = df_final["descricao"].str.lower().apply(
    lambda x: any(k in x for k in KEYWORDS_DESPESA)
)

df_despesas = df_final[mask].copy()


In [33]:
#Validação
print(f"Total de registros: {len(df_final)}")
print(f"Registros de despesas/sinistros: {len(df_despesas)}")

df_despesas.head()


Total de registros: 2113924
Registros de despesas/sinistros: 734261


Unnamed: 0,data,reg_ans,cd_conta_contabil,descricao,vl_saldo_inicial,vl_saldo_final
63,2025-07-01,344800,352,RECEITAS FINANCEIRAS COM OPERAES DE ASSISTNCIA...,510622.42,822366.99
90,2025-07-01,344800,35231,RECEITAS COM DEPSITOS DE CONTRAPRESTAES DE ASS...,0.0,0.0
91,2025-07-01,344800,352319,Receitas com Depsitos de Contraprestaes de Ass...,0.0,0.0
92,2025-07-01,344800,35231901,Receitas com Depsitos de Contraprestaes de Ass...,0.0,0.0
95,2025-07-01,344800,35232,RECEITAS COM DEPSITOS DE CONTRAPRESTAES DE ASS...,0.0,0.0


In [35]:
#Selecionando e padronizando colunas
import pandas as pd
from pathlib import Path
import zipfile

# Copiar para não alterar o dataframe original
df = df_despesas.copy()

# Padronizar nomes de colunas
df = df.rename(columns={
    "cnpj": "CNPJ",
    "razao_social": "RazaoSocial",
    "vl_saldo_final": "ValorDespesas",
    "data": "Data"
})


In [36]:
#Trimestre e ano, datas inconsistentes
# Converter datas com segurança
df["Data"] = pd.to_datetime(df["Data"], errors="coerce")

# Extrair ano e trimestre
df["Ano"] = df["Data"].dt.year
df["Trimestre"] = df["Data"].dt.to_period("Q").astype(str)


In [37]:
#Tratando valores zerados
# Criar flag de valor suspeito
df["ValorSuspeito"] = df["ValorDespesas"] <= 0

# Manter os registros, mas marcar
df_validos = df[df["ValorDespesas"] > 0]


In [40]:
#Resolvendo CNPJs duplicadas com razões sociais diferentes

# Verifica se as colunas 'CNPJ' e 'RazaoSocial' existem no DataFrame
if "CNPJ" in df_validos.columns and "RazaoSocial" in df_validos.columns:
    # Para cada CNPJ, manter a Razão Social mais frequente
    razao_padrao = (
        df_validos
        .groupby("CNPJ")["RazaoSocial"]
        .agg(lambda x: x.value_counts().idxmax())
    )

    df_validos["RazaoSocial"] = df_validos["CNPJ"].map(razao_padrao)
    print("CNPJs duplicados com razões sociais diferentes foram tratados.")
else:
    print("As colunas 'CNPJ' ou 'RazaoSocial' não foram encontradas. Não foi possível resolver CNPJs duplicados.")

As colunas 'CNPJ' ou 'RazaoSocial' não foram encontradas. Não foi possível resolver CNPJs duplicados.


In [42]:
#Selecionando colunas finais
# As colunas 'CNPJ' e 'RazaoSocial' não foram encontradas nos dados processados.
# Selecionando apenas as colunas disponíveis para continuar o processamento.
df_final = df_validos[[
    # "CNPJ", # Coluna não encontrada nos dados originais
    # "RazaoSocial", # Coluna não encontrada nos dados originais
    "Trimestre",
    "Ano",
    "ValorDespesas"
]].copy()

df_final.head()

Unnamed: 0,Trimestre,Ano,ValorDespesas
63,2025Q3,2025,822367.0
177,2025Q3,2025,292715600.0
178,2025Q3,2025,230068800.0
179,2025Q3,2025,231556200.0
180,2025Q3,2025,228901600.0


In [43]:
#Gerando CSV consolidado
output_dir = Path("/content/output")
output_dir.mkdir(exist_ok=True)

csv_path = output_dir / "consolidado_despesas.csv"

df_final.to_csv(csv_path, index=False, encoding="utf-8")


In [44]:
#Compactando CSV em Zip
zip_path = output_dir / "consolidado_despesas.zip"

with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
    zipf.write(csv_path, arcname="consolidado_despesas.csv")

zip_path


PosixPath('/content/output/consolidado_despesas.zip')