In [11]:
from urllib.request import urlopen, urlretrieve
from bs4 import BeautifulSoup
from datetime import datetime
import zipfile
import os
import numpy as np
import glob
import pandas as pd
import re


BASE_URL = "https://dadosabertos.ans.gov.br/FTP/PDA/demonstracoes_contabeis/"
DATA_DIR = "data"

html = urlopen(BASE_URL).read()
soup = BeautifulSoup(html, "html.parser")

ano_atual = datetime.now().year
anos = []

for link in soup.find_all("a"):
    href = link.get("href")
    if href and href.endswith("/") and href[:4].isdigit():
        ano = int(href[:4])
        if ano <= ano_atual:
            anos.append(ano)

ano_mais_recente = max(anos)
URL_ANO = f"{BASE_URL}{ano_mais_recente}/"

print(f"Ano mais recente: {ano_mais_recente}")
print(f"URL selecionada: {URL_ANO}")

os.makedirs(DATA_DIR, exist_ok=True)

ANO_DIR = os.path.join(DATA_DIR, str(ano_mais_recente))
os.makedirs(ANO_DIR, exist_ok=True)

html_ano = urlopen(URL_ANO).read()
soup_ano = BeautifulSoup(html_ano, "html.parser")

zips = []
for link in soup_ano.find_all("a"):
    href = link.get("href")
    if href and href.lower().endswith(".zip"):
        zips.append(href)

print("\nZIPs encontrados:")
for z in zips:
    print(" -", z)

for z in zips:
    zip_url = URL_ANO + z
    zip_path = os.path.join(ANO_DIR, z)
    extract_folder = os.path.join(ANO_DIR, z.replace('.zip', ''))

    if not os.path.exists(zip_path):
        print(f"\nBaixando: {z}")
        urlretrieve(zip_url, zip_path)
    else:
        print(f"\nArquivo ZIP já existe: {z}")

    if not os.path.exists(extract_folder):
        print(f"Extraindo: {z} para {extract_folder}")
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(extract_folder)
    else:
        print(f"Conteúdo de {z} já foi extraído anteriormente.")

Ano mais recente: 2025
URL selecionada: https://dadosabertos.ans.gov.br/FTP/PDA/demonstracoes_contabeis/2025/

ZIPs encontrados:
 - 1T2025.zip
 - 2T2025.zip
 - 3T2025.zip

Arquivo ZIP já existe: 1T2025.zip
Conteúdo de 1T2025.zip já foi extraído anteriormente.

Arquivo ZIP já existe: 2T2025.zip
Conteúdo de 2T2025.zip já foi extraído anteriormente.

Arquivo ZIP já existe: 3T2025.zip
Conteúdo de 3T2025.zip já foi extraído anteriormente.


In [12]:
print("Validando arquivos extraídos...")
pastas_extracao = glob.glob(os.path.join(ANO_DIR, "*/"))

for pasta in pastas_extracao:
    csvs = glob.glob(os.path.join(pasta, "*.csv"))
    if len(csvs) > 0:
        print(f"Sucesso: {os.path.basename(os.path.normpath(pasta))} contém {len(csvs)} arquivo(s) CSV.")
    else:
        print(f"Erro: A pasta {pasta} está vazia ou não contém CSVs.")

Validando arquivos extraídos...
Sucesso: 1T2025 contém 1 arquivo(s) CSV.
Sucesso: 2T2025 contém 1 arquivo(s) CSV.
Sucesso: 3T2025 contém 1 arquivo(s) CSV.


In [13]:
print("Verificando encoding dos arquivos...")
for pasta in pastas_extracao:
    csv_file = glob.glob(os.path.join(pasta, "*.csv"))[0]
    try:
        
        pd.read_csv(csv_file, sep=None, engine='python', nrows=5, encoding='utf-8')
        print(f"✅ {os.path.basename(csv_file)}: UTF-8")
    except:
        try:
        
            pd.read_csv(csv_file, sep=None, engine='python', nrows=5, encoding='latin-1')
            print(f"{os.path.basename(csv_file)}: ISO-8859-1 (Latin-1)")
        except Exception as e:
            print(f"Erro ao detectar encoding de {csv_file}: {e}")

Verificando encoding dos arquivos...
✅ 1T2025.csv: UTF-8
✅ 2T2025.csv: UTF-8
✅ 3T2025.csv: UTF-8


In [14]:
lista_para_merge = []
termo_gatilho = "Despesas com Eventos/Sinistros"

caminhos_csv = glob.glob(os.path.join(ANO_DIR, "**/*.csv"), recursive=True)

for caminho in caminhos_csv:
    if "output" in caminho:
        continue
        
    nome = os.path.basename(caminho)
    df_individual = pd.read_csv(caminho, sep=';', encoding='latin-1', low_memory=False)
    
    if df_individual['DESCRICAO'].astype(str).str.contains(termo_gatilho, case=False).any():
        print(f"Incluido: {nome}")
        lista_para_merge.append(df_individual)
    else:
        print(f"Ignorado: {nome}")

if lista_para_merge:
    df_consolidado = pd.concat(lista_para_merge, ignore_index=True)
    print(f"Linhas no merge: {len(df_consolidado)}")
else:
    df_consolidado = pd.DataFrame()
    print("Nenhum arquivo validado.")

Incluido: 1T2025.csv
Incluido: 2T2025.csv
Incluido: 3T2025.csv
Linhas no merge: 2113924


In [15]:
BASE_OUTPUT = "output"
SUBPASTA_CONSOLIDADO = os.path.join(BASE_OUTPUT, "massa_dados_trimestrais")
os.makedirs(SUBPASTA_CONSOLIDADO, exist_ok=True)

final_csv = os.path.join(SUBPASTA_CONSOLIDADO, "dados_contabeis_unificados.csv")
final_zip = os.path.join(SUBPASTA_CONSOLIDADO, "dados_contabeis_unificados.zip")

if os.path.exists(final_csv) and os.path.exists(final_zip):
    print("Arquivos ja existem. Pulando processamento.")
else:
    if not df_consolidado.empty:

        df_consolidado['CD_CONTA_CONTABIL'] = df_consolidado['CD_CONTA_CONTABIL'].astype(str)
        

        df_consolidado = df_consolidado[df_consolidado['CD_CONTA_CONTABIL'].str.startswith('4')].copy()

        df_consolidado['DATA'] = pd.to_datetime(df_consolidado['DATA'], errors='coerce')
        df_consolidado['REG_ANS'] = df_consolidado['REG_ANS'].astype(str).str.strip()
        
        for col in ['VL_SALDO_INICIAL', 'VL_SALDO_FINAL']:
            if df_consolidado[col].dtype == 'object':
                df_consolidado[col] = df_consolidado[col].astype(str).str.replace(',', '.')
            df_consolidado[col] = pd.to_numeric(df_consolidado[col], errors='coerce').fillna(0).astype(float)

        df_consolidado['DESCRICAO'] = df_consolidado['DESCRICAO'].fillna('NAO INFORMADO').str.strip().str.upper()
        
        df_consolidado.to_csv(final_csv, sep=';', index=False, encoding='utf-8')
        df_consolidado.to_csv(final_zip, sep=';', index=False, encoding='utf-8', compression={'method': 'zip', 'archive_name': 'dados_contabeis_unificados.csv'})
        print(f"Arquivos gerados em: {SUBPASTA_CONSOLIDADO}")
    else:
        print("DataFrame vazio.")

Arquivos ja existem. Pulando processamento.


In [16]:
SUBPASTA_CADASTRO = os.path.join("output", "dados_cadastrais")
os.makedirs(SUBPASTA_CADASTRO, exist_ok=True)

BASE_URL_CADASTRO = "https://dadosabertos.ans.gov.br/FTP/PDA/operadoras_de_plano_de_saude_ativas/"
html_cad = urlopen(BASE_URL_CADASTRO).read()
soup_cad = BeautifulSoup(html_cad, "html.parser")

arquivo_csv = ""
for link in soup_cad.find_all("a"):
    href = link.get("href")
    if href and href.endswith(".csv"):
        arquivo_csv = href
        break

if arquivo_csv:
    url_final_cadastro = BASE_URL_CADASTRO + arquivo_csv
    caminho_destino = os.path.join(SUBPASTA_CADASTRO, arquivo_csv)
    
    if not os.path.exists(caminho_destino):
        print(f"Baixando: {arquivo_csv}")
        urlretrieve(url_final_cadastro, caminho_destino)
    else:
        print(f"Arquivo {arquivo_csv} ja existe.")
    
    df_cadastro = pd.read_csv(caminho_destino, sep=';', encoding='latin-1', dtype={'CNPJ': str, 'Registro_ANS': str})

Arquivo Relatorio_cadop.csv ja existe.


In [17]:
df_contabil_validado = df_consolidado.copy()

df_contabil_validado['VL_SALDO_FINAL'] = pd.to_numeric(
    df_contabil_validado['VL_SALDO_FINAL'].astype(str).str.replace(',', '.'), 
    errors='coerce'
)

df_contabil_validado['valor_negativo'] = (df_contabil_validado['VL_SALDO_FINAL'] < 0).astype(int)
df_contabil_validado['valor_zero'] = (df_contabil_validado['VL_SALDO_FINAL'] == 0).astype(int)
df_contabil_validado['valor_nulo'] = df_contabil_validado['VL_SALDO_FINAL'].isna().astype(int)

df_contabil_validado['valor_suspeito'] = np.where(
    df_contabil_validado['valor_nulo'] | df_contabil_validado['valor_negativo'] | df_contabil_validado['valor_zero'], 
    1, 
    0
)

df_contabil_validado['DATA_CONVERTIDA'] = pd.to_datetime(
    df_contabil_validado['DATA'], 
    format='%Y-%m-%d', 
    errors='coerce'
)

df_contabil_validado['data_inconsistente'] = df_contabil_validado['DATA_CONVERTIDA'].isna().astype(int)
df_contabil_validado['Ano'] = df_contabil_validado['DATA_CONVERTIDA'].dt.year
df_contabil_validado['Trimestre'] = df_contabil_validado['DATA_CONVERTIDA'].dt.quarter

print(f"Relatório de Inconsistências (Tarefa 1.3):")
print(f"- Datas fora do padrão YYYY-MM-DD: {df_contabil_validado['data_inconsistente'].sum()}")
print(f"- Valores estritamente negativos: {df_contabil_validado['valor_negativo'].sum()}")
print(f"- Valores zerados: {df_contabil_validado['valor_zero'].sum()}")
print(f"- Valores não numéricos (NaN): {df_contabil_validado['valor_nulo'].sum()}")
print(f"- Total de valores suspeitos: {df_contabil_validado['valor_suspeito'].sum()}")

Relatório de Inconsistências (Tarefa 1.3):
- Datas fora do padrão YYYY-MM-DD: 0
- Valores estritamente negativos: 163802
- Valores zerados: 688365
- Valores não numéricos (NaN): 0
- Total de valores suspeitos: 852167


In [18]:
df_cadastro['casos_suspeitos'] = df_cadastro.groupby('CNPJ')['Razao_Social'].transform('nunique')
df_cadastro['casos_suspeitos'] = (df_cadastro['casos_suspeitos'] > 1).astype(int)

df_conflitos = df_cadastro[df_cadastro['casos_suspeitos'] == 1].groupby('CNPJ')['Razao_Social'].unique().reset_index()

if not df_conflitos.empty:
    print(f"Total de CNPJs com duplicidade de Razao Social: {len(df_conflitos)}")
    print("\nExemplos de conflitos (Top 10):")
    print(df_conflitos.head(10).to_string(index=False))
else:
    print("Nenhuma inconsistencia de CNPJ encontrada.")

Nenhuma inconsistencia de CNPJ encontrada.


In [19]:
output_dir = 'output/consolidado'
csv_path = os.path.join(output_dir, 'consolidado_despesas.csv')
zip_path = os.path.join(output_dir, 'consolidado_despesas.zip')

if os.path.exists(csv_path) and os.path.exists(zip_path):
    print(f"Arquivos já existem em {output_dir}. Pulando processamento.")
    df_final = pd.read_csv(csv_path, sep=';')
else:
    os.makedirs(output_dir, exist_ok=True)
    
    col_registro_cad = 'REGISTRO_OPERADORA'
    col_registro_cont = 'REG_ANS'

    df_contabil_validado[col_registro_cont] = df_contabil_validado[col_registro_cont].astype(str).str.strip()
    df_cadastro[col_registro_cad] = df_cadastro[col_registro_cad].astype(str).str.strip()

    df_final = pd.merge(
        df_contabil_validado,
        df_cadastro[[col_registro_cad, 'CNPJ', 'Razao_Social', 'casos_suspeitos', 'Modalidade', 'UF']],
        left_on=col_registro_cont,
        right_on=col_registro_cad,
        how='left'
    )

    df_entrega = df_final[[
        'CNPJ', 'Razao_Social', 'Trimestre', 'Ano', 'VL_SALDO_FINAL',
        'data_inconsistente', 'valor_suspeito', 'casos_suspeitos'
    ]].rename(columns={'Razao_Social': 'RazaoSocial', 'VL_SALDO_FINAL': 'ValorDespesas'})

    df_entrega.to_csv(csv_path, sep=';', index=False, encoding='utf-8')
    df_entrega.to_csv(
        zip_path, 
        sep=';', 
        index=False, 
        encoding='utf-8', 
        compression={'method': 'zip', 'archive_name': 'consolidado_despesas.csv'}
    )
    print("Processamento concluído e arquivos salvos.")

Arquivos já existem em output/consolidado. Pulando processamento.


In [20]:
def validar_cnpj(cnpj):
    cnpj = re.sub(r'\D', '', str(cnpj))
    if len(cnpj) != 14 or len(set(cnpj)) == 1:
        return False
    def calcular_digito(cnpj, pesos):
        soma = sum(int(a) * b for a, b in zip(cnpj, pesos))
        resto = soma % 11
        return 0 if resto < 2 else 11 - resto
    pesos1 = [5, 4, 3, 2, 9, 8, 7, 6, 5, 4, 3, 2]
    pesos2 = [6, 5, 4, 3, 2, 9, 8, 7, 6, 5, 4, 3, 2]
    return int(cnpj[12]) == calcular_digito(cnpj[:12], pesos1) and \
           int(cnpj[13]) == calcular_digito(cnpj[:13], pesos2)

output_dir_agregado = 'output/agregado'
agregado_path = os.path.join(output_dir_agregado, 'despesas_agregadas.csv')
zip_final_path = os.path.join(output_dir_agregado, 'Teste_Nathan.zip')

if os.path.exists(agregado_path):
    print(f"Arquivo {agregado_path} já existe. Pulando agregação.")
else:
    os.makedirs(output_dir_agregado, exist_ok=True)
    
    col_reg = 'REGISTRO_OPERADORA'
    df_validacao = pd.merge(
        df_contabil_validado,
        df_cadastro[[col_reg, 'CNPJ', 'Razao_Social', 'Modalidade', 'UF']],
        left_on='REG_ANS',
        right_on=col_reg,
        how='left'
    )

    df_limpo = df_validacao[
        (df_validacao['VL_SALDO_FINAL'] > 0) & 
        (df_validacao['Razao_Social'].notna()) & 
        (df_validacao['Razao_Social'].str.strip() != '')
    ].copy()

    df_limpo['cnpj_valido'] = df_limpo['CNPJ'].apply(validar_cnpj)
    df_limpo = df_limpo[df_limpo['cnpj_valido'] == True]

    df_agregado = df_limpo.groupby(
        ['CNPJ', 'Razao_Social', 'Modalidade', 'UF', 'Trimestre', 'Ano', col_reg]
    ).agg(
        ValorDespesas=('VL_SALDO_FINAL', 'sum')
    ).reset_index()

    stats = df_agregado.groupby(['CNPJ', 'Razao_Social']).agg(
        MediaTrimestral=('ValorDespesas', 'mean'),
        DesvioPadraoTrimestral=('ValorDespesas', 'std')
    ).reset_index()

    df_final_entrega = pd.merge(df_agregado, stats, on=['CNPJ', 'Razao_Social'], how='left')

    df_final_entrega = df_final_entrega.rename(columns={
        'Razao_Social': 'RazaoSocial',
        col_reg: 'RegistroANS'
    })

    df_final_entrega = df_final_entrega.sort_values(by=['ValorDespesas'], ascending=False)

    df_final_entrega.to_csv(agregado_path, sep=';', index=False, encoding='utf-8')
    
    df_final_entrega.to_csv(
        zip_final_path, 
        sep=';', 
        index=False, 
        encoding='utf-8', 
        compression={'method': 'zip', 'archive_name': 'despesas_agregadas.csv'}
    )
    print("Processamento concluído com métricas trimestrais.")

Arquivo output/agregado\despesas_agregadas.csv já existe. Pulando agregação.
