<a href="https://colab.research.google.com/github/Zhenriquee/ANALISE_OPERADORAS/blob/main/Extra%C3%A7%C3%A3o_e_Tratamento_dos_Dados_ANS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Coleta e Tratamento de Dados da ANS

## Objetivo:
Realizar a extração dos dados da ANS com a finalidade de analisar o posicionamento da Unimed Caruaru com Relação as outras Operadoras, no final iremos armazenar esses dados em um arquivo .db e Utilizar o streamlit para projeção desses dados.

## Links Utilizados para Extração


*   [Qtd. Beneficiarios por Trimestre](https://dadosabertos.ans.gov.br/FTP/Base_de_dados/Microdados/dados_dbc/beneficiarios/operadoras/)
*   [Demonstração Contabeis](https://dadosabertos.ans.gov.br/FTP/PDA/demonstracoes_contabeis/)



### Bibliotecas Utilizadas

In [None]:
import os
import requests
import sqlite3
import pandas as pd
import uuid
import re
import io
import zipfile
import unicodedata
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from datasus_dbc import decompress
from dbfread import DBF
from concurrent.futures import ProcessPoolExecutor, as_completed

## Inicio Extração e Tratamento Qtd. Beneficiarios por Trimestre



In [None]:
# --- FUNÇÕES AUXILIARES (Worker) ---
# Precisam estar fora da classe para o multiprocessing funcionar bem no Windows

def _gerar_chave_trimestre(id_cmpt):
    try:
        s_cmpt = str(id_cmpt).strip()
        if len(s_cmpt) < 6: return None
        ano = s_cmpt[:4]
        mes = int(s_cmpt[4:6])
        trimestre = (mes - 1) // 3 + 1
        return f"{ano}-T{trimestre}"
    except:
        return None

def processar_arquivo_worker(link):
    """
    Função isolada que roda em um núcleo separado da CPU.
    Baixa, Converte, Filtra e Agrupa. Retorna um DataFrame pronto (ou None).
    """
    nome_arquivo = link.split('/')[-1]

    # Gera nomes únicos para evitar colisão entre processos
    id_unico = str(uuid.uuid4())
    temp_dbc = f"temp_{id_unico}.dbc"
    temp_dbf = f"temp_{id_unico}.dbf"

    colunas_desejadas = ['ID_CMPT', 'CD_OPERADO', 'NR_BENEF_T']
    resultado_df = None

    try:
        # 1. Download
        r = requests.get(link, stream=True, timeout=30)
        with open(temp_dbc, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)

        # 2. Descompressão
        decompress(temp_dbc, temp_dbf)

        # 3. Leitura e Pandas
        table = DBF(temp_dbf, encoding='iso-8859-1', load=True)
        df = pd.DataFrame(iter(table))

        if not df.empty:
            # Verifica colunas
            if all(col in df.columns for col in colunas_desejadas):
                df = df[colunas_desejadas].copy()
                df['NR_BENEF_T'] = pd.to_numeric(df['NR_BENEF_T'], errors='coerce').fillna(0)

                # Agrupa e Soma (Reduzindo drasticamente o tamanho dos dados antes de retornar)
                df_agrupado = df.groupby(['ID_CMPT', 'CD_OPERADO'], as_index=False)['NR_BENEF_T'].sum()

                # Cria chave Trimestre
                df_agrupado['ID_TRIMESTRE'] = df_agrupado['ID_CMPT'].apply(_gerar_chave_trimestre)

                resultado_df = df_agrupado
            else:
                print(f"   [Worker] Ignorado {nome_arquivo}: Colunas ausentes.")

    except Exception as e:
        print(f"   [Worker] Erro em {nome_arquivo}: {e}")

    finally:
        # Limpeza rigorosa dos arquivos temporários deste processo
        if os.path.exists(temp_dbc): os.remove(temp_dbc)
        if os.path.exists(temp_dbf): os.remove(temp_dbf)

    return resultado_df

# --- CLASSE PRINCIPAL ---

class ImportadorANSParalelo:
    def __init__(self, db_path='dados_ans.db'):
        self.db_path = db_path

    def etapa_1_e_2_obter_links(self, url_origem):
        print(f"--- Mapeando arquivos em: {url_origem} ---")
        try:
            response = requests.get(url_origem)
            soup = BeautifulSoup(response.content, 'html.parser')
            links = []
            for link in soup.find_all('a'):
                href = link.get('href')
                if href and href.lower().endswith('.dbc'):
                    links.append(urljoin(url_origem, href))
            print(f"Total de arquivos encontrados: {len(links)}")
            return links
        except Exception as e:
            print(f"Erro ao obter links: {e}")
            return []

    def etapa_3_processar_paralelo(self, lista_links, tabela_destino='beneficiarios_agrupados', max_workers=4):
        """
        Gerencia os workers e grava no banco sequencialmente.
        """
        conn = sqlite3.connect(self.db_path)
        total = len(lista_links)
        processados = 0

        print(f"--- Iniciando Processamento Paralelo ({max_workers} Workers) ---")

        # Inicia o Pool de Processos
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Submete todas as tarefas
            # future_to_link é um dicionário para rastrear qual link pertence a qual tarefa
            future_to_link = {executor.submit(processar_arquivo_worker, link): link for link in lista_links}

            for future in as_completed(future_to_link):
                processados += 1
                link = future_to_link[future]
                nome = link.split('/')[-1]

                try:
                    df_resultado = future.result()

                    if df_resultado is not None and not df_resultado.empty:
                        # O momento da escrita no banco é sequencial (Thread Principal)
                        df_resultado.to_sql(tabela_destino, conn, if_exists='append', index=False)
                        print(f"[{processados}/{total}] Salvo: {nome} ({len(df_resultado)} registros)")
                    else:
                        print(f"[{processados}/{total}] Vazio/Ignorado: {nome}")

                except Exception as exc:
                    print(f"[{processados}/{total}] Falha ao recuperar resultado de {nome}: {exc}")

        conn.close()
        print("--- Processo Paralelo Finalizado ---")

# --- EXECUÇÃO ---

if __name__ == "__main__":
    # URL da ANS
    url_ans = "https://dadosabertos.ans.gov.br/FTP/Base_de_dados/Microdados/dados_dbc/beneficiarios/operadoras/"

    # Define quantos núcleos do processador você quer usar
    # Se seu PC for potente, pode aumentar. Geralmente 4 ou 8 é um bom número.
    WORKERS = os.cpu_count() or 4

    bot = ImportadorANSParalelo(db_path='base_ans_paralela.db')

    links = bot.etapa_1_e_2_obter_links(url_ans)

    if links:
        # Executa em paralelo
        bot.etapa_3_processar_paralelo(links, max_workers=WORKERS)

## Inicio Extração e Tratamento Demonstração Contabeis

In [None]:
class ExtratorContabil:
    def __init__(self, db_path='dados_ans.db'):
        self.db_path = db_path
        self.url_base = "https://dadosabertos.ans.gov.br/FTP/PDA/demonstracoes_contabeis/"

    def _obter_links_anos(self):
        """Busca as pastas de anos (ex: 2022/, 2023/)"""
        print(f"--- Buscando pastas de anos em: {self.url_base} ---")
        try:
            response = requests.get(self.url_base)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')

            links_anos = []
            for link in soup.find_all('a'):
                href = link.get('href')
                # Procura por pastas que pareçam anos (4 digitos + /)
                if href and re.match(r'\d{4}/', href):
                    full_link = urljoin(self.url_base, href)
                    links_anos.append(full_link)

            return links_anos
        except Exception as e:
            print(f"Erro ao listar anos: {e}")
            return []

    def _obter_zips_do_ano(self, url_ano):
        """Dentro da pasta do ano, busca os arquivos .zip"""
        try:
            response = requests.get(url_ano)
            soup = BeautifulSoup(response.content, 'html.parser')

            links_zips = []
            for link in soup.find_all('a'):
                href = link.get('href')
                if href and href.lower().endswith('.zip'):
                    full_link = urljoin(url_ano, href)
                    links_zips.append(full_link)
            return links_zips
        except Exception as e:
            print(f"Erro ao listar zips de {url_ano}: {e}")
            return []

    def _extrair_trimestre_do_nome(self, nome_arquivo):
        """
        Analisa o nome '1T2024.zip' ou '2024_1T.zip' e retorna '2024-T1'
        """
        # Regex para capturar Trimestre (1 a 4) e Ano (20xx)
        # Padrão comum na ANS: 1T2022, 1t2022, etc.
        match = re.search(r'([1-4])t(20\d{2})', nome_arquivo.lower())

        if match:
            trimestre = match.group(1)
            ano = match.group(2)
            return f"{ano}-T{trimestre}"

        # Tentativa de padrão inverso (caso exista 2022_1t)
        match_inv = re.search(r'(20\d{2}).*([1-4])t', nome_arquivo.lower())
        if match_inv:
            ano = match_inv.group(1)
            trimestre = match_inv.group(2)
            return f"{ano}-T{trimestre}"

        return None

    def executar_extracao(self, tabela_destino='demonstracoes_contabeis'):
        conn = sqlite3.connect(self.db_path)

        pastas_anos = self._obter_links_anos()

        for pasta in pastas_anos:
            print(f"> Entrando na pasta: {pasta}")
            links_zips = self._obter_zips_do_ano(pasta)

            for link_zip in links_zips:
                nome_arquivo = link_zip.split('/')[-1]
                chave_trimestre = self._extrair_trimestre_do_nome(nome_arquivo)

                if not chave_trimestre:
                    print(f"   [Pular] Não foi possível identificar trimestre no nome: {nome_arquivo}")
                    continue

                print(f"   Processing: {nome_arquivo} -> Trimestre: {chave_trimestre}")

                try:
                    # 1. Download em Memória (Stream)
                    r = requests.get(link_zip)

                    # 2. Abrir ZIP da memória (io.BytesIO)
                    with zipfile.ZipFile(io.BytesIO(r.content)) as z:
                        # Procura o CSV dentro do zip
                        csvs = [n for n in z.namelist() if n.lower().endswith('.csv')]

                        if not csvs:
                            print("   [Erro] Nenhum CSV encontrado dentro do zip.")
                            continue

                        # Vamos assumir que o primeiro CSV é o correto
                        nome_csv = csvs[0]

                        # 3. Ler CSV direto do ZIP
                        with z.open(nome_csv) as f:
                            # ANS costuma usar separador ';' e encoding 'latin1' ou 'utf-8'
                            # 'CD_CONTA_CONTABIL' as vezes vem como string ou int, vamos forçar conversão depois
                            df = pd.read_csv(f, sep=';', encoding='iso-8859-1', dtype=str)

                            # Tratamento de colunas (Minúsculo para padronizar busca)
                            df.columns = [c.upper() for c in df.columns]

                            if 'CD_CONTA_CONTABIL' in df.columns:
                                # 4. Filtrar CD_CONTA_CONTABIL == 31
                                # Convertemos para string para garantir a comparação exata
                                df_filtrado = df[df['CD_CONTA_CONTABIL'] == '31'].copy()

                                if not df_filtrado.empty:
                                    # 5. Adicionar coluna de Trimestre
                                    df_filtrado['ID_TRIMESTRE'] = chave_trimestre

                                    # Conversão de tipos úteis (Ex: VL_SALDO_FINAL para float)
                                    # A ANS usa vírgula como decimal no CSV pt-br
                                    if 'VL_SALDO_FINAL' in df_filtrado.columns:
                                        df_filtrado['VL_SALDO_FINAL'] = df_filtrado['VL_SALDO_FINAL'].str.replace(',', '.', regex=False)
                                        df_filtrado['VL_SALDO_FINAL'] = pd.to_numeric(df_filtrado['VL_SALDO_FINAL'], errors='coerce')

                                    # 6. Salvar no Banco
                                    df_filtrado.to_sql(tabela_destino, conn, if_exists='append', index=False)
                                    print(f"      -> Sucesso! {len(df_filtrado)} registros conta 31 salvos.")
                                else:
                                    print("      -> Arquivo lido, mas sem registros da conta 31.")
                            else:
                                print(f"      -> Coluna CD_CONTA_CONTABIL não encontrada. Colunas: {list(df.columns)}")

                except Exception as e:
                    print(f"      -> Erro ao processar zip: {e}")

        conn.close()
        print("--- Extração Contábil Finalizada ---")

# --- EXECUÇÃO ---
if __name__ == "__main__":
    # Importante: Use o MESMO nome de banco que você usou no script anterior
    # para que as tabelas fiquem juntas.
    # Exemplo anterior: 'base_ans_paralela.db' ou 'base_ans_v2.db'

    db_nome = 'base_ans_paralela.db'

    print(f"Iniciando extração contábil para o banco: {db_nome}")
    extrator = ExtratorContabil(db_path=db_nome)
    extrator.executar_extracao()

## Inicio Extração Dimensão Operadora


In [None]:
class ImportadorCadop:
    def __init__(self, db_path, csv_path):
        self.db_path = db_path
        self.csv_path = csv_path

    def _limpar_nome_coluna(self, texto):
        """Limpa apenas o cabeçalho (nome da coluna) para o banco de dados"""
        txt = unicodedata.normalize('NFKD', str(texto)).encode('ASCII', 'ignore').decode('ASCII')
        return txt.strip().lower().replace(' ', '_').replace('.', '').replace('/', '')

    def processar_cadop(self, tabela_destino='dim_operadoras'):
        if not os.path.exists(self.csv_path):
            print(f"ERRO: Arquivo não encontrado: {self.csv_path}")
            return

        print(f"--- Atualizando Tabela CADOP (Correção de Acentos) ---")

        try:
            conn = sqlite3.connect(self.db_path)

            # 1. MUDANÇA CRÍTICA: encoding='utf-8-sig'
            # 'utf-8' resolve o problema do "SÃ£o".
            # O sufixo '-sig' é importante caso o arquivo tenha sido salvo pelo Excel (remove caracteres ocultos no início).
            # dtype=str: Lemos TUDO como texto para proteger CPNJ e Código ANS (zeros a esquerda).
            df = pd.read_csv(
                self.csv_path,
                sep=';',
                encoding='utf-8-sig',
                dtype=str
            )

            # 2. Padronizar nomes das colunas
            df.columns = [self._limpar_nome_coluna(c) for c in df.columns]

            # 3. TRATAMENTO INTELIGENTE DE COLUNAS
            # O usuário pediu para verificar string vs int.
            # Como carregamos tudo como 'str' para segurança, iteramos para limpar o texto.

            print("   Aplicando tratamento nas colunas de texto...")
            for coluna in df.columns:
                # Verifica se a coluna é do tipo objeto (string/texto)
                if df[coluna].dtype == 'object':
                    # .str.strip() remove espaços vazios no começo e fim que atrapalham SQL
                    # Ex: "São Paulo " vira "São Paulo"
                    df[coluna] = df[coluna].str.strip()

            print(f"   Colunas processadas: {list(df.columns[:5])} ...")

            # 4. Salvar no SQLite (DROP e CREATE com if_exists='replace')
            df.to_sql(tabela_destino, conn, if_exists='replace', index=False)

            # 5. Recriar Índice
            if 'registro_ans' in df.columns:
                cursor = conn.cursor()
                cursor.execute(f'CREATE INDEX IF NOT EXISTS idx_cadop_reg ON {tabela_destino}(registro_ans);')
                conn.commit()

            conn.close()

            # Validação visual
            exemplo_cidade = df['bairro'].iloc[0] if 'bairro' in df.columns else 'Coluna não achada'
            exemplo_razao = df['razao_social'].iloc[0]
            print(f"   -> Sucesso! Tabela atualizada.")
            print(f"   -> Teste de Acento: '{exemplo_razao}'")

        except UnicodeDecodeError:
            print("   -> ERRO DE ENCODING: O arquivo não é UTF-8. Tente trocar para 'latin-1' no código.")
        except Exception as e:
            print(f"   -> Erro ao processar: {e}")

# --- EXECUÇÃO ---
if __name__ == "__main__":
    db_nome = 'base_ans_paralela.db'
    arquivo_csv = 'Relatorio_cadop.csv'

    if os.path.exists(arquivo_csv):
        importador = ImportadorCadop(db_path=db_nome, csv_path=arquivo_csv)
        importador.processar_cadop()
    else:
        print(f"Arquivo {arquivo_csv} não encontrado.")