## CNES Transformations (Curated Estabelecimentos and Serviços)

This notebook reproduces the transformations from legacy Spark code using Polars (fast, memory-efficient) with a pandas fallback. It reads CSVs from `local_storage/csv/cnes_extract_202503`, performs joins/filters, and writes curated outputs.

- Input period: `202503`
- State filter: `SP` (CO_ESTADO_GESTOR == 35)
- Outputs:
  - `exploration/output/curated/curated_servicos_202503.csv`
  - `exploration/output/curated/curated_estabelecimentos_202503.csv`

If Polars is not installed, the first code cell will install it automatically.


In [None]:
# Setup: imports and installs
import sys, subprocess

def ensure(package):
    try:
        __import__(package)
    except ImportError:
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])

# prefer polars for large CSVs
ensure('polars')
import polars as pl

from datetime import datetime
from pathlib import Path

# constants
ANO_MES = '202503'
BASE_DIR = Path('/Users/caio.maximiano/pessoal/cnes-project-analysis')
INPUT_DIR = BASE_DIR / 'local_storage' / 'csv' / f'cnes_extract_{ANO_MES}'
OUTPUT_DIR = BASE_DIR / 'exploration' / 'output' / 'curated'
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

STATE_CODE_SP = 35
STATE_UF_SP = "SP"


Collecting polars
  Downloading polars-1.32.3-cp39-abi3-macosx_11_0_arm64.whl.metadata (15 kB)
Downloading polars-1.32.3-cp39-abi3-macosx_11_0_arm64.whl (34.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m34.6/34.6 MB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: polars
Successfully installed polars-1.32.3



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [2]:
# Utilities

def read_cnes_csv(name: str) -> pl.DataFrame:
    """Reads a CNES CSV for the given logical name (without ANO_MES suffix)."""
    path = INPUT_DIR / f'{name}{ANO_MES}.csv'
    # CNES CSVs are semicolon-delimited, quoted with ", many big files
    return pl.read_csv(path, separator=';', quote_char='"', ignore_errors=True, infer_schema_length=1000)


def write_csv(df: pl.DataFrame, name: str):
    path = OUTPUT_DIR / f'{name}_{ANO_MES}.csv'
    df.write_csv(path)
    return path

# Small helpers analogous to Spark expressions
from datetime import date

def current_date_str():
    return date.today().isoformat()




In [5]:
# Load required tables

tbEstabelecimento = read_cnes_csv('tbEstabelecimento')
tbMunicipio = read_cnes_csv('tbMunicipio')
rlEstabServClass = read_cnes_csv('rlEstabServClass')
tbClassificacaoServico = read_cnes_csv('tbClassificacaoServico')
tbCargaHorariaSus = read_cnes_csv('tbCargaHorariaSus')
tbAtividadeProfissional = read_cnes_csv('tbAtividadeProfissional')
tbDadosProfissionalSus = read_cnes_csv('tbDadosProfissionalSus')

# Show schemas
for name, df in [
    ('tbEstabelecimento', tbEstabelecimento),
    ('tbMunicipio', tbMunicipio),
    ('rlEstabServClass', rlEstabServClass),
    ('tbClassificacaoServico', tbClassificacaoServico),
    ('tbCargaHorariaSus', tbCargaHorariaSus),
    ('tbAtividadeProfissional', tbAtividadeProfissional),
    ('tbDadosProfissionalSus', tbDadosProfissionalSus),
]:
    print(name, df.columns[:10])


ComputeError: invalid utf-8 sequence

In [None]:
# Normalize dtypes for join keys and filters

# Helper to cast only if column exists

def cast_if_present(df: pl.DataFrame, casts: dict) -> pl.DataFrame:
    present = [pl.col(col).cast(dtype) for col, dtype in casts.items() if col in df.columns]
    return df.with_columns(present) if present else df

# Keys as Utf8 to avoid dtype mismatch and preserve leading zeros

tbEstabelecimento = cast_if_present(
    tbEstabelecimento,
    {
        'CO_UNIDADE': pl.Utf8,
        'CO_MUNICIPIO_GESTOR': pl.Utf8,
        'CO_ESTADO_GESTOR': pl.Int64,
        'CO_SIGLA_ESTADO': pl.Utf8,
    },
)

tbMunicipio = cast_if_present(
    tbMunicipio,
    {
        'CO_MUNICIPIO': pl.Utf8,
        'NO_MUNICIPIO': pl.Utf8,
        'CO_SIGLA_ESTADO': pl.Utf8,
    },
)

rlEstabServClass = cast_if_present(
    rlEstabServClass,
    {
        'CO_UNIDADE': pl.Utf8,
        'CO_SERVICO': pl.Utf8,
        'CO_CLASSIFICACAO': pl.Utf8,
    },
)

tbClassificacaoServico = cast_if_present(
    tbClassificacaoServico,
    {
        'CO_SERVICO_ESPECIALIZADO': pl.Utf8,
        'CO_CLASSIFICACAO_SERVICO': pl.Utf8,
        'DS_CLASSIFICACAO_SERVICO': pl.Utf8,
    },
)

tbCargaHorariaSus = cast_if_present(
    tbCargaHorariaSus,
    {
        'CO_UNIDADE': pl.Utf8,
        'CO_PROFISSIONAL_SUS': pl.Utf8,
        'CO_CBO': pl.Utf8,
        'TP_SUS_NAO_SUS': pl.Utf8,
    },
)

tbAtividadeProfissional = cast_if_present(
    tbAtividadeProfissional,
    {
        'CO_CBO': pl.Utf8,
        'DS_ATIVIDADE_PROFISSIONAL': pl.Utf8,
    },
)

tbDadosProfissionalSus = cast_if_present(
    tbDadosProfissionalSus,
    {
        'CO_PROFISSIONAL_SUS': pl.Utf8,
        'NO_PROFISSIONAL': pl.Utf8,
    },
)


In [None]:
# Transform: estab_munic join with SP filter
# cond_sp: tbEstabelecimento.CO_MUNICIPIO_GESTOR == tbMunicipio.CO_MUNICIPIO and tbEstabelecimento.CO_ESTADO_GESTOR == 35

# Allow fallback by UF if numeric code is missing in some drops
estab_filtered = tbEstabelecimento.filter(
    (pl.col('CO_ESTADO_GESTOR') == STATE_CODE_SP) |
    (pl.col('CO_SIGLA_ESTADO') == STATE_UF_SP)
)

estab_munic = (
    estab_filtered
    .join(tbMunicipio, left_on='CO_MUNICIPIO_GESTOR', right_on='CO_MUNICIPIO', how='inner', suffix='_mun')
)

# Normalize expected right-side names to unsuffixed
rename_map = {}
for col in ['NO_MUNICIPIO', 'CO_MUNICIPIO', 'CO_SIGLA_ESTADO']:
    right = f"{col}_mun"
    if right in estab_munic.columns:
        rename_map[right] = col
if rename_map:
    estab_munic = estab_munic.rename(rename_map)

print('estab_munic', estab_munic.shape)



NameError: name 'tbEstabelecimento' is not defined

In [None]:
# Transform: curated_servicos
# cond_serv: rlEstabServClass.CO_SERVICO == tbClassificacaoServico.CO_SERVICO_ESPECIALIZADO AND
#            rlEstabServClass.CO_CLASSIFICACAO == tbClassificacaoServico.CO_CLASSIFICACAO_SERVICO

serv_join = (
    rlEstabServClass
    .join(
        tbClassificacaoServico,
        left_on=['CO_SERVICO', 'CO_CLASSIFICACAO'],
        right_on=['CO_SERVICO_ESPECIALIZADO', 'CO_CLASSIFICACAO_SERVICO'],
        how='inner',
        suffix='_cls'
    )
    .join(
        estab_munic,
        left_on='CO_UNIDADE',
        right_on='CO_UNIDADE',
        how='inner'
    )
)

curated_servicos = (
    serv_join
    .select([
        'CO_UNIDADE',
        'NO_MUNICIPIO',
        'CO_MUNICIPIO',
        'CO_SERVICO',
        'CO_CLASSIFICACAO',
        'DS_CLASSIFICACAO_SERVICO',
    ])
    .with_columns([
        (pl.col('CO_UNIDADE').cast(pl.Utf8) + '_' + pl.col('CO_SERVICO').cast(pl.Utf8) + '_' + pl.col('CO_CLASSIFICACAO').cast(pl.Utf8)).alias('SK_REGISTRO'),
        pl.lit(current_date_str()).alias('DATA_INGESTAO')
    ])
    .unique(subset=['SK_REGISTRO'])
)

print('curated_servicos', curated_servicos.shape)



In [None]:
# Transform: curated_estabelecimentos

joined = (
    tbCargaHorariaSus
    .join(tbAtividadeProfissional, left_on='CO_CBO', right_on='CO_CBO', how='inner')
    .join(estab_munic, left_on='CO_UNIDADE', right_on='CO_UNIDADE', how='inner')
    .join(tbDadosProfissionalSus, left_on='CO_PROFISSIONAL_SUS', right_on='CO_PROFISSIONAL_SUS', how='inner')
)

curated_estabelecimentos = (
    joined
    .select([
        'CO_UNIDADE',
        'CO_PROFISSIONAL_SUS',
        'NO_PROFISSIONAL',
        'CO_CBO',
        'TP_SUS_NAO_SUS',
        'DS_ATIVIDADE_PROFISSIONAL',
        'NO_FANTASIA',
        'NO_BAIRRO',
        'NO_MUNICIPIO',
        'CO_MUNICIPIO',
        'CO_SIGLA_ESTADO',
        'CO_CEP',
    ])
    .with_columns([
        (pl.col('CO_CEP').cast(pl.Utf8) + ',' + pl.col('NO_MUNICIPIO').cast(pl.Utf8) + ',' + pl.col('CO_SIGLA_ESTADO').cast(pl.Utf8) + ',Brasil').alias('ds_localidade'),
        (pl.col('CO_UNIDADE').cast(pl.Utf8) + '_' + pl.col('CO_PROFISSIONAL_SUS').cast(pl.Utf8) + '_' + pl.col('CO_CBO').cast(pl.Utf8)).alias('SK_REGISTRO'),
        pl.lit(current_date_str()).alias('DATA_INGESTAO')
    ])
    .unique(subset=['SK_REGISTRO'])
)

print('curated_estabelecimentos', curated_estabelecimentos.shape)



In [None]:
# Write outputs

path_serv = write_csv(curated_servicos, 'curated_servicos')
path_estab = write_csv(curated_estabelecimentos, 'curated_estabelecimentos')

print('Wrote:', path_serv)
print('Wrote:', path_estab)
