# Importação de bibliotecas e dependências



In [None]:
!apt-get install openjdk-11-jdk -y
pip install pyspark
pip install requests beautifulsoup4 tqdm


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java
  libatk-wrapper-java-jni libxt-dev libxtst6 libxxf86dga1 openjdk-11-jre
  x11-utils
Suggested packages:
  libxt-doc openjdk-11-demo openjdk-11-source visualvm mesa-utils
The following NEW packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java
  libatk-wrapper-java-jni libxt-dev libxtst6 libxxf86dga1 openjdk-11-jdk
  openjdk-11-jre x11-utils
0 upgraded, 10 newly installed, 0 to remove and 35 not upgraded.
Need to get 6,920 kB of archives.
After this operation, 16.9 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 fonts-dejavu-core all 2.37-2build1 [1,041 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/main amd64 fonts-dejavu-extra all 2.37-2build1 [2,041 kB]
Get:3 http://archive.ubuntu.com/ubuntu jam

In [None]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["PATH"] += ":/usr/lib/jvm/java-11-openjdk-amd64/bin"


# Extração dos dados do FTP da ANS

In [None]:
import os
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm

# Diretório base do FTP exposto via HTTP
BASE_URL = "https://dadosabertos.ans.gov.br/FTP/PDA/informacoes_consolidadas_de_beneficiarios-024/"
ESTADOS_SUL = ["SC", "RS", "PR"]
ANO = "2025"

# Pasta local para salvar arquivos
DOWNLOAD_DIR = "dados_ans_sul"
os.makedirs(DOWNLOAD_DIR, exist_ok=True)

def get_links(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')
    return [a['href'] for a in soup.find_all('a', href=True)]

def baixar_arquivo(url, destino):
    response = requests.get(url, stream=True)
    tamanho_total = int(response.headers.get('content-length', 0))

    with open(destino, 'wb') as f, tqdm(
        desc=destino,
        total=tamanho_total,
        unit='B',
        unit_scale=True,
        unit_divisor=1024,
    ) as barra:
        for data in response.iter_content(chunk_size=1024):
            f.write(data)
            barra.update(len(data))

def extracao_dados_benificiario():
    # Pega os subdiretórios de 2025 (ex: 202501, 202502)
    meses_links = [link for link in get_links(BASE_URL) if link.startswith(ANO)]

    for mes in meses_links:
        mes_url = f"{BASE_URL}{mes}"
        arquivos = get_links(mes_url)

        for arquivo in arquivos:
            if arquivo.endswith(".zip") and any(f"-{uf}-" in arquivo for uf in ESTADOS_SUL):
                url_arquivo = f"{mes_url}{arquivo}"
                destino_local = os.path.join(DOWNLOAD_DIR, arquivo)

                if not os.path.exists(destino_local):
                    print(f"Baixando {arquivo}...")
                    baixar_arquivo(url_arquivo, destino_local)
                else:
                    print(f"Arquivo {arquivo} já existe. Pulando.")

extracao_dados_benificiario()

Baixando pda-024-icb-PR-2025_01.zip...


dados_ans_sul/pda-024-icb-PR-2025_01.zip: 100%|██████████| 26.2M/26.2M [00:02<00:00, 12.3MB/s]


Baixando pda-024-icb-RS-2025_01.zip...


dados_ans_sul/pda-024-icb-RS-2025_01.zip: 100%|██████████| 22.6M/22.6M [00:01<00:00, 12.4MB/s]


Baixando pda-024-icb-SC-2025_01.zip...


dados_ans_sul/pda-024-icb-SC-2025_01.zip: 100%|██████████| 17.0M/17.0M [00:01<00:00, 10.8MB/s]


Baixando pda-024-icb-PR-2025_02.zip...


dados_ans_sul/pda-024-icb-PR-2025_02.zip: 100%|██████████| 25.9M/25.9M [00:02<00:00, 13.4MB/s]


Baixando pda-024-icb-RS-2025_02.zip...


dados_ans_sul/pda-024-icb-RS-2025_02.zip: 100%|██████████| 22.5M/22.5M [00:01<00:00, 13.4MB/s]


Baixando pda-024-icb-SC-2025_02.zip...


dados_ans_sul/pda-024-icb-SC-2025_02.zip: 100%|██████████| 17.0M/17.0M [00:01<00:00, 10.8MB/s]


Baixando pda-024-icb-PR-2025_03.zip...


dados_ans_sul/pda-024-icb-PR-2025_03.zip: 100%|██████████| 26.0M/26.0M [00:02<00:00, 10.4MB/s]


Baixando pda-024-icb-RS-2025_03.zip...


dados_ans_sul/pda-024-icb-RS-2025_03.zip: 100%|██████████| 22.6M/22.6M [00:01<00:00, 12.4MB/s]


Baixando pda-024-icb-SC-2025_03.zip...


dados_ans_sul/pda-024-icb-SC-2025_03.zip: 100%|██████████| 17.0M/17.0M [00:01<00:00, 10.1MB/s]


Baixando pda-024-icb-PR-2025_04.zip...


dados_ans_sul/pda-024-icb-PR-2025_04.zip: 100%|██████████| 26.2M/26.2M [00:02<00:00, 12.8MB/s]


Baixando pda-024-icb-RS-2025_04.zip...


dados_ans_sul/pda-024-icb-RS-2025_04.zip: 100%|██████████| 22.7M/22.7M [00:01<00:00, 12.5MB/s]


Baixando pda-024-icb-SC-2025_04.zip...


dados_ans_sul/pda-024-icb-SC-2025_04.zip: 100%|██████████| 17.1M/17.1M [00:01<00:00, 11.7MB/s]


Baixando pda-024-icb-PR-2025_05.zip...


dados_ans_sul/pda-024-icb-PR-2025_05.zip: 100%|██████████| 24.1M/24.1M [00:02<00:00, 12.5MB/s]


Baixando pda-024-icb-RS-2025_05.zip...


dados_ans_sul/pda-024-icb-RS-2025_05.zip: 100%|██████████| 20.7M/20.7M [00:01<00:00, 11.5MB/s]


Baixando pda-024-icb-SC-2025_05.zip...


dados_ans_sul/pda-024-icb-SC-2025_05.zip: 100%|██████████| 16.0M/16.0M [00:01<00:00, 9.45MB/s]


# Descompactação de Arquivos


In [None]:
import os
import zipfile

# Diretórios
CAMINHO_ZIPS = "dados_ans_sul"
CAMINHO_EXTRAIDOS = "dados_extraidos"
CAMINHO_PARQUET = "dados_parquet"
os.makedirs(CAMINHO_EXTRAIDOS, exist_ok=True)
os.makedirs(CAMINHO_PARQUET, exist_ok=True)

def descomprimir_arquivos(diretorio):

    # Extrair arquivos .zip
    arquivos_zip = [f for f in os.listdir(diretorio) if f.endswith(".zip")]

    for arquivo_zip in arquivos_zip:
        caminho_zip = os.path.join(diretorio, arquivo_zip)

        with zipfile.ZipFile(caminho_zip, 'r') as zip_ref:
            zip_ref.extractall(CAMINHO_EXTRAIDOS)

descomprimir_arquivos(CAMINHO_ZIPS)




# Tratamento dos dados


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trimfrom, split, sum as _sum, round
from pyspark.sql.types import IntegerType, StringType

# Inicializa Spark
spark = SparkSession.builder \
    .appName("ANS - Beneficiarios Sul") \
    .getOrCreate()

# Lendo os arquivos CSV extraídos
arquivos_csv = [os.path.join(CAMINHO_EXTRAIDOS, f) for f in os.listdir(CAMINHO_EXTRAIDOS) if f.endswith('.csv')]

dfs = []

for caminho_csv in arquivos_csv:
    try:
        df = spark.read \
            .option("header", "true") \
            .option("sep", ";") \
            .option("encoding", "latin1") \
            .csv(caminho_csv)

        # Remove espaços dos nomes das colunas
        for col_name in df.columns:
            df = df.withColumnRenamed(col_name, col_name.strip())

        # Tipos de colunas numéricas
        colunas_inteiras = [
            "CD_OPERADORA", "QT_BENEFICIARIO_ATIVO", "QT_BENEFICIARIO_ADERIDO",
            "QT_BENEFICIARIO_CANCELADO", "CD_PLANO", "CD_MUNICIPIO"
        ]

        for c in colunas_inteiras:
            if c in df.columns:
                df = df.withColumn(c, col(c).cast(IntegerType()))

        dfs.append(df)

    except Exception as e:
        print(f"Erro ao processar {caminho_csv}: {e}")

# Unir todos os DataFrames e validar o esquema
if dfs:
    df_final = dfs[0]
    for df in dfs[1:]:
        df_final = df_final.unionByName(df, allowMissingColumns=True)

    print("\n Esquema inferido:")
    df_final.printSchema()

else:
    print("Nenhum dado foi carregado.")


# Iremos analisar o churn de clientes nas unidades da UNIMED da região do Sul

df_churn = df_final.filter(col("NM_RAZAO_SOCIAL").startswith("UNIMED"))

# Agrupa por Competência (ID_CMPT_MOVEL), UF, Faixa Etária
df_churn = df_churn.filter(col("SG_UF").isin(["RS", "SC", "PR"])) \
    .groupBy("ID_CMPT_MOVEL", "SG_UF", "DE_FAIXA_ETARIA", "NM_RAZAO_SOCIAL", "DE_CONTRATACAO_PLANO") \
    .agg(
        _sum("QT_BENEFICIARIO_ATIVO").alias("BENEF_ATIVOS"),
        _sum("QT_BENEFICIARIO_ADERIDO").alias("BENEF_ADERIDOS"),
        _sum("QT_BENEFICIARIO_CANCELADO").alias("BENEF_CANCELADOS")
    )

# Calcula churn rate
df_churn = df_churn.withColumn(
    "CHURN_RATE",
    round((col("BENEF_CANCELADOS") / (col("BENEF_ATIVOS") + col("BENEF_ADERIDOS"))) * 100, 2)
)
df_churn.count()

df_churn = df_churn.withColumnRenamed("NM_RAZAO_SOCIAL", "RAZAO_SOCIAL")
df_churn = df_churn.withColumnRenamed("DE_FAIXA_ETARIA", "FAIXA_ETARIA")
df_churn = df_churn.withColumnRenamed("ID_CMPT_MOVEL", "COMPETENCIA")
df_churn = df_churn.withColumnRenamed("DE_CONTRATACAO_PLANO", "CONTRATACAO_PLANO")

df_churn = df_churn = df_churn.withColumn("ANO", split(col("COMPETENCIA"), "-").getItem(0))
df_churn = df_churn = df_churn.withColumn("MES", split(col("COMPETENCIA"), "-").getItem(1))

df_churn.write \
  .mode("overwrite") \
  .option("header", True) \
  .option("delimiter", ";") \
  .csv("dados")
