# Imports

In [40]:
import os
import re
from typing import List, Dict
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import zipfile
from pathlib import Path
import pandas as pd
import numpy as np

# Acesso a API
- Identificação e download dos arquivos ZIP selecionados 
- Extração dos ZIPs 
- Identificação dos arquivos com Despesas com Eventos/Sinistros

### Identificação e download dos arquivos ZIP selecionados 

In [2]:
def get_years_from_home_page(URL: str, session: requests.Session) -> List[str]:
    """
    Obtém a lista de anos disponíveis na página base.
    """
    resp = session.get(URL, timeout=30)
    resp.raise_for_status()

    soup = BeautifulSoup(resp.text, "html.parser")

    padrao_ano = re.compile(r"^(20\d{2})/$")

    anos = []
    for link in soup.find_all("a"):
        href = link.get("href", "")
        match = padrao_ano.match(href)
        if match:
            anos.append(match.group(1))

    if not anos:
        raise RuntimeError("Nenhum ano encontrado no diretório base.")

    return anos

def get_trimesters_from_page(URL: str, ano: str, session: requests.Session) -> List[Dict[str, any]]:
    """
    Obtém os 3 ultimos trimestres disponível para o ano fornecido.
    """
    url_ano = urljoin(URL, f"{ano}/")
    print(f"\nProcessando ano {ano}...")
    resp_ano = session.get(url_ano, timeout=30)
    if resp_ano.status_code != 200:
        print(f"Falha ao acessar {url_ano}")
        return []

    soup_ano = BeautifulSoup(resp_ano.text, "html.parser")
    arquivos = []

    # Para cada ZIP
    for link in soup_ano.find_all("a"):
        href = link.get("href", "")
            
        if not href.lower().endswith(".zip"):
                continue
            
        trimestre = extrair_trimestres(href)

        arquivos.append({
            "ano": ano,
            "trimestre": trimestre,
            "nome": href,
            "url": urljoin(url_ano, href)
        })

    if not arquivos:
        print(f"Nenhum ZIP encontrado para {ano}")
        return []

    # Selecionar os 3 ultimos semestres
    arquivos.sort(key=lambda x: x["trimestre"], reverse=True)
    return arquivos[:3]

def extrair_trimestres(nome_arquivo: str) -> str:
    """
    Extrai os trimestre dos nomes de arquivos ZIP.
    """
    nome = nome_arquivo.lower()

    # Encontrar trimestre no nome
    padroes_trimestre = [
        r'([1-4])\s*t',              # 1T, 1t
        r'([1-4])\s*trim',           # 1trim
        r'([1-4])\s*trimestre',      # 1trimestre
        r'([1-4])\s*[-_ ]\s*trim',   # 1-trim
    ]

    trimestre = None
    for padrao in padroes_trimestre:
        match = re.search(padrao, nome)
        if match:
            trimestre = int(match.group(1))
            break

    if trimestre is None:
        raise ValueError(f"Trimestre não encontrado no nome do arquivo: {nome_arquivo}")

    return trimestre

def download_trimesters(pasta_destino: str, session: requests.Session, ano: str, trimestres: List[Dict[str, any]]) -> None:
    pasta_ano = os.path.join(pasta_destino, ano)
    os.makedirs(pasta_ano, exist_ok=True)

    for arq in trimestres:
        destino = os.path.join(pasta_ano, arq["nome"])

        if os.path.exists(destino):
            print(f"Já existe: {arq['nome']}")
            continue

        print(f"Baixando: {arq['nome']}")

        with session.get(arq["url"], stream=True, timeout=60) as r:
            r.raise_for_status()
            with open(destino, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

def baixar_ultimos_3_trimestres(pasta_destino: str):
    URL = "https://dadosabertos.ans.gov.br/FTP/PDA/demonstracoes_contabeis/"
    os.makedirs(pasta_destino, exist_ok=True)

    session = requests.Session()
    session.headers.update({"User-Agent": "ANS-Crawler/1.0"})

    # 1- Acessar lista de anos
    anos = get_years_from_home_page(URL, session)

    # 2- Para cada ano obter trimestres
    for ano in anos:
        trimestres = get_trimesters_from_page(URL, ano, session)

        # 3- Baixar os trimestres selecionados
        download_trimesters(pasta_destino, session, ano, trimestres)

In [None]:
baixar_ultimos_3_trimestres(pasta_destino= "downloads")

### Extração dos ZIPs

In [4]:
def extrair_zip(caminho_zip: str, pasta_destino: str):
    """ 
    Extrai um aquivo ZIP para uma pasta destino
    """
    pasta_destino = Path(pasta_destino)
    pasta_destino.mkdir(parents=True, exist_ok=True)

    with zipfile.ZipFile(caminho_zip, "r") as zip_ref:
        zip_ref.extractall(pasta_destino)

def extrair_todos_os_zips_da_pasta(pasta_origem: str, pasta_destino: str):
    """ 
    Extrai todos os ZIPs de uma pasta e coloca os arq extraidos para a pasta detino
    """
    pasta_origem = Path(pasta_origem)
    pasta_destino = Path(pasta_destino)

    print(f"Extraindo pasta {pasta_origem}")

    pasta_destino.mkdir(parents=True, exist_ok=True)

    for zip_path in pasta_origem.rglob("*.zip"):
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(pasta_destino)

In [None]:
for i in range(2007, 2026):
    extrair_todos_os_zips_da_pasta(f"downloads/{i}/", f"downloads/{i}/extraido")

# extrair_todos_os_zips_da_pasta(f"downloads/2025/", f"downloads/2025/extraido")

### Identificação dos arquivos com "Despesas com Eventos/Sinistros" 
- Incrementalmente - Economiza memória e caso a ocorrência desejada seja encontrada nos primeiros chunks, o processamento é interrompido.
- FILTRO: Textos que começam com ‘despesas com eventos / sinistros’, independentemente da quantidade de espaços entre as palavras, e podendo ter qualquer texto depois.
- Para os anos de 2010, 2011, 2012, 2013, 2014, 2015, 2016 não foram encontradas ocorrencias de "Despesas com Eventos/Sinistros"

In [8]:
def csv_contem(csv_path: str, coluna: str, texto_procurado: str, chunksize: int =100_000) -> bool:
    """
    Verifica se o csv contem o texto na coluna especificada
    """

    encontrou = False

    for chunk in pd.read_csv(
    csv_path,
    sep=';',
    dtype=str,
    chunksize=chunksize,
    encoding='latin1'
    ):
        if chunk[coluna].str.contains(texto_procurado, case=False, na=False).any():
            encontrou = True
            break
    
    return encontrou

In [9]:
dict = {}
for i in range(2007, 2026):
        pasta_origem = Path(f"downloads/{i}/extraido/")
        dict[i] = []
        for csv_path in pasta_origem.glob("*.csv"):
                tem = csv_contem(csv_path, 'DESCRICAO', r'^despesas\s+com\s+eventos\s*/\s*sinistros.*')
                dict[i].append(tem)

print(dict)

{2007: [True, True, True], 2008: [True, True, True], 2009: [True, True, True], 2010: [False, False, False], 2011: [False, False, False], 2012: [False, False, False], 2013: [False, False, False], 2014: [False, False, False], 2015: [False, False, False], 2016: [False, False, False], 2017: [True, True, True], 2018: [True, True, True], 2019: [True, True, True], 2020: [True, True, True], 2021: [True, True, True], 2022: [True, True, True], 2023: [True, True, True], 2024: [True, True, True], 2025: [True, True, True]}


### Consolidação CSV - 3 Trimestres
- CNPJs duplicados com razões sociais diferentes -> NÃO EXISTE
- Tem cnpj com razao duplicado apenas no Merge com o relatório, pois existem contas contabeis diferentes, que não foram trazidas para o df final
- ValorDespesas ->
    - Tem a coluna VL_SALDO_INICIAL: If (df['VL_SALDO_FINAL'] > df['VL_SALDO_INICIAL']) ? 'VL_SALDO_FINAL' - 'VL_SALDO_INICIAL' : 0
    - Não tem coluna VL_SALDO_INICIAL: If (df['VL_SALDO_FINAL'] > 0) ? 'VL_SALDO_FINAL' : 0
- Data formatada com o pandas = CORRIGIDO

- Merge/Join com left, havendo registros que tem REG_ANS, mas não tem REGISTRO_OPERADORA no relatório
- PQ O LEFT?

#### Prova que no relatorio, não existe CNPJ com mais de uma Razao Social

In [10]:
# b = pd.read_csv(f"/Relatorio_cadop.csv", sep=';', dtype=str, encoding='latin1', usecols=["REGISTRO_OPERADORA", "CNPJ", "Razao_Social"])
# eh_1_para_1 = (b.groupby('CNPJ')['Razao_Social'].nunique() == 1).all()
# print(eh_1_para_1)

#### Consolidação

In [45]:
def baixar_relatorio_ans(pasta_destino: Path=None) -> pd.DataFrame:
    """
    Baixa o relatório de operadoras de planos de saúde ativas na ANS e exporta como df
    """
    url = "https://dadosabertos.ans.gov.br/FTP/PDA/operadoras_de_plano_de_saude_ativas/Relatorio_cadop.csv"
    nome_arquivo = "Relatorio_cadop.csv"

    # Diretório do arquivo .py que está rodando
    base_dir = os.getcwd()

    # Se não passar pasta_destino, usa o diretório do script
    if pasta_destino is None:
        pasta_destino = base_dir
    else:
        os.makedirs(pasta_destino, exist_ok=True)

    caminho_arquivo = os.path.join(pasta_destino, nome_arquivo)

    # Verifica se o arquivo não existe 
    if os.path.exists(caminho_arquivo) == False:
        print("BAIXOU RELATORIO")
        with requests.Session() as session:
            response = session.get(url, timeout=60)
            response.raise_for_status()  
        with open(caminho_arquivo, "wb") as f:
            f.write(response.content)

    return pd.read_csv(f"{pasta_destino}/Relatorio_cadop.csv", sep=';', dtype=str, encoding='latin1', usecols=["REGISTRO_OPERADORA", "CNPJ", "Razao_Social"])
def processar_csvs(pasta_origem: Path) -> pd.DataFrame:
    """
    Processa todos os CSVs da pasta de um ano e retorna um DataFrame normalizado com: REG_ANS | Ano | Trimestre | ValorDespesas
    """
    dfs = []
    # Para cada CSV na pasta
    for csv_path in pasta_origem.glob("*.csv"):
        df = pd.read_csv(
            csv_path,
            sep=';',
            dtype=str,
            encoding='latin1'
        )
        # Normalização
        df.columns = [c.upper().strip() for c in df.columns]
        df['DESCRICAO'] = df['DESCRICAO'].str.lower().str.strip()
        df['DATA'] = pd.to_datetime(df['DATA'], errors='coerce') # -> Consistencia para as datas

        # Valor numérico STR -> FLOAT
        df['VL_SALDO_FINAL'] = (
            df['VL_SALDO_FINAL']
            .str.replace('.', '', regex=False)
            .str.replace(',', '.', regex=False)
            .astype(float)
        )


        if 'VL_SALDO_INICIAL' in df.columns:

            df['VL_SALDO_INICIAL'] = (
            df['VL_SALDO_INICIAL']
            .str.replace('.', '', regex=False)
            .str.replace(',', '.', regex=False)
            .astype(float)
            )

            df['ValorDespesas'] = np.where(
                df['VL_SALDO_FINAL'] > df['VL_SALDO_INICIAL'],
                df['VL_SALDO_FINAL'] - df['VL_SALDO_INICIAL'],
                0.0
            )
        else:
            df['ValorDespesas'] = np.where(
                df['VL_SALDO_FINAL'] > 0,
                df['VL_SALDO_FINAL'],
                0.0
            )

            
        df_normalizado = df[['REG_ANS', 'ValorDespesas']].copy()
        df_normalizado['Ano'] = df['DATA'].dt.year
        df_normalizado['Trimestre'] = df['DATA'].dt.quarter

        dfs.append(df_normalizado)

    if not dfs:
        return pd.DataFrame(
            columns=['REG_ANS', 'Ano', 'Trimestre', 'ValorDespesas']
        )

    df_ano = pd.concat(dfs, ignore_index=True)
    return df_ano
def consolidar_despesas(pasta_origem:str, pasta_destino:str=''):
    """
    Baixa o relatório da ANS consolida os CSVs de uma pasta e salva em uma pasta destino o csv e o zip com os dados consolidados
    """
    pasta_origem = Path(pasta_origem)
    pasta_destino = Path(pasta_destino)
    pasta_destino.mkdir(parents=True, exist_ok=True)

    relatorio = baixar_relatorio_ans()
    df_ano = processar_csvs(pasta_origem)
    df_final = df_ano.merge(
    relatorio,
    left_on="REG_ANS",
    right_on="REGISTRO_OPERADORA",
    how="left"   
    ).drop(columns=["REGISTRO_OPERADORA", "REG_ANS"])

    df_final.to_csv(pasta_destino / "consolidado_despesas.csv", encoding='iso-8859-1', index=False)

    with zipfile.ZipFile(pasta_destino / "consolidado_despesas.zip", "w", zipfile.ZIP_DEFLATED) as zipf:
        zipf.write(pasta_destino / "consolidado_despesas.csv", arcname="consolidado_despesas.csv")

In [46]:
# consolidar_despesas(f"downloads/2007/extraido", f"downloads/2007/extraido/consolidado")
for i in range(2007, 2026):
    consolidar_despesas(f"downloads/{i}/extraido", f"downloads/{i}/extraido/consolidado")

In [None]:
pasta_origem = Path("downloads/2025/extraido")
a = processar_csvs(pasta_origem)
a.head()

Unnamed: 0,REG_ANS,ValorDespesas,Ano,Trimestre
0,316458,1070.0,2025,1
1,316458,1070.0,2025,1
2,316458,1070.0,2025,1
3,316458,1070.0,2025,1
4,316458,99024.72,2025,1


In [None]:
teste = pd.read_csv(f"downloads/2025/extraido/consolidado/consolidado_despesas.csv", encoding="utf-8")
print(teste.info())
print(teste.isna().sum())

<class 'pandas.DataFrame'>
RangeIndex: 2113924 entries, 0 to 2113923
Data columns (total 5 columns):
 #   Column         Dtype  
---  ------         -----  
 0   ValorDespesas  float64
 1   Ano            int64  
 2   Trimestre      int64  
 3   CNPJ           float64
 4   Razao_Social   str    
dtypes: float64(2), int64(2), str(1)
memory usage: 80.6 MB
None
ValorDespesas        0
Ano                  0
Trimestre            0
CNPJ             18739
Razao_Social     18739
dtype: int64
