# Bronze to Silver - Estabelecimento

Os dados de bronze agora passam por um tratamento de dados para ficarem padronizados e j치 estejam em uma estrutura confi치vel para ser utilizada.

Selecionando catalogo `saude_sus`

In [0]:
spark.sql('USE CATALOG saude_sus')

Imports e variavels globais:

In [0]:
import pyspark.sql.functions as sf
from pyspark.sql.types import *
from pyspark.sql.window import Window
from delta.tables import DeltaTable

MAX_INT = 2147483647

Funcoes para tratamento dos dados:

In [0]:
def apply_casting(df, max_int):
    for column_name in df.columns:
        
        if column_name.startswith('FLG_'):
            df = df.withColumn(
                column_name,
                sf.when(sf.col(column_name).isNull(), None)
                .when(sf.trim(sf.col(column_name)).isin('NAO', 'N', '0', 'no', 'false'), False)
                .when(sf.trim(sf.col(column_name)).isin('SIM', 'S', '1', 'yes', 'true'), True)
                .when(sf.expr(f"try_cast({column_name} as int)") == 0, False)
                .when(sf.expr(f"try_cast({column_name} as int)") == 1, True)
                .otherwise(None).cast(BooleanType())
            )

        elif column_name == 'NUM_LOGRADOURO_ESTABELECIMENTO':
            df = df.withColumn(column_name, sf.expr("try_cast(NUM_LOGRADOURO_ESTABELECIMENTO as long)"))
            
            df = df.withColumn(column_name, 
                sf.when(
                    (sf.col(column_name) > max_int) | 
                    (sf.col(column_name) <= 0),
                    None
                ).otherwise(sf.col(column_name))
            )

        elif column_name.startswith(("COD_", "NUM_")):
            df = df.withColumn(column_name, sf.expr(f"try_cast({column_name} as int)"))

        elif column_name.startswith("DAT_"):
            df = df.withColumn(column_name, sf.col(column_name).cast(DateType()))

        elif "LATITUDE" in column_name or "LONGITUDE" in column_name:
            df = df.withColumn(column_name, sf.expr(f"try_cast({column_name} as double)"))

        else:
            df = df.withColumn(column_name, sf.col(column_name).cast(StringType()))
            
    return df

def normalize_phone_number(df):
    for column_name in df.columns:
        if column_name.startswith('TEL_'):
            df = df.withColumn(column_name, sf.regexp_replace(sf.col(column_name), '[^0-9]', ''))
    return df

In [0]:
df_bronze = spark.read.table('saude_sus.raw.raw_estabelecimento')

In [0]:
df_bronze = df_bronze.drop("_airbyte_generation_id", "_airbyte_meta", "_airbyte_raw_id", "_airbyte_extracted_at")

In [0]:
new_columns = {
    'codigo_uf' : 'COD_UF',
    'codigo_cnes' : 'COD_CNES',
    'numero_cnpj' : 'DSC_DOCUMENTO',
    'tipo_gestao' : 'DSC_TIPO_GESTAO',
    'nome_fantasia' : 'NOM_FANTASIA',
    'codigo_municipio' : 'COD_MUNICIPIO',
    'data_atualizacao' : 'DAT_ATUALIZACAO',
    'nome_razao_social' : 'NOM_RAZAO_SOCIAL',
    'codigo_tipo_unidade' : 'COD_TIPO_UNIDADE',
    'bairro_estabelecimento': 'END_BAIRRO_ESTABELECIMENTO',
    'numero_estabelecimento': 'NUM_LOGRADOURO_ESTABELECIMENTO',
    'endereco_estabelecimento': 'END_LOGRADOURO_ESTABELECIMENTO',
    'codigo_cep_estabelecimento': 'END_CEP_ESTABELECIMENTO',
    'descricao_turno_atendimento': 'DSC_TURNO_ATENDIMENTO',
    'codigo_estabelecimento_saude': 'DSC_ESTABELECIMENTO_SAUDE',
    'endereco_email_estabelecimento': 'DSC_EMAIL_ESTABELECIMENTO',
    'codigo_atividade_ensino_unidade': 'COD_ATIVIDADE_ENSINO',
    'descricao_esfera_administrativa': 'DSC_ESFERA_ADMINISTRATIVA',
    'numero_telefone_estabelecimento': 'TEL_ESTABELECIMENTO',
    'codigo_esfera_administrativa_unidade': 'DSC_ESFERA_ADMINISTRATIVA_ABREVIADO',
    'estabelecimento_possui_servico_apoio': 'FLG_POSSUI_SERVICO_APOIO',
    'latitude_estabelecimento_decimo_grau': 'END_LATITUDE_ESTABELECIMENTO',
    'longitude_estabelecimento_decimo_grau': 'END_LONGITUDE_ESTABELECIMENTO',
    'codigo_identificador_turno_atendimento': 'COD_IDENTIFICADOR_TURNO_ATENDIMENTO',
    'estabelecimento_possui_centro_neonatal': 'FLG_POSSUI_CENTRO_NEONATAL',
    'estabelecimento_possui_centro_cirurgico': 'FLG_POSSUI_CENTRO_CIRURGICO',
    'estabelecimento_possui_centro_obstetrico': 'FLG_POSSUI_CENTRO_OBSTETRICO',
    'descricao_natureza_juridica_estabelecimento': 'DSC_NATUREZA_JURIDICA',
    'estabelecimento_possui_atendimento_hospitalar': 'FLG_POSSUI_ATENDIMENTO_HOSPITALAR',
    'estabelecimento_possui_atendimento_ambulatorial': 'FLG_POSSUI_ATENDIMENTO_AMBULATORIAL',
    'estabelecimento_faz_atendimento_ambulatorial_sus': 'FLG_POSSUI_ATENDIMENTO_AMBULATORIAL_SUS',
    'numero_cnpj_entidade' : 'COD_CNPJ_ENTIDADE'
}

df_renamed = df_bronze.withColumnsRenamed(new_columns)

In [0]:
df_silver = (
    df_renamed
    .transform(lambda df: apply_casting(df, MAX_INT))
    .transform(normalize_phone_number)
    .where(sf.col("COD_CNES").isNotNull())
    .dropDuplicates()
)

In [0]:
map_uf = {
    "11": "RO", "12": "AC", "13": "AM", "14": "RR", "15": "PA", "16": "AP", "17": "TO",
    "21": "MA", "22": "PI", "23": "CE", "24": "RN", "25": "PB", "26": "PE", "27": "AL", "28": "SE", "29": "BA",
    "31": "MG", "32": "ES", "33": "RJ", "35": "SP",
    "41": "PR", "42": "SC", "43": "RS",
    "50": "MS", "51": "MT", "52": "GO", "53": "DF"
}

mapping_expr = sf.create_map([sf.lit(x) for x in [val for pair in map_uf.items() for val in pair]])

df_silver = df_silver.withColumn(
    "DSC_UF", 
    mapping_expr[(sf.col("COD_UF").cast('string'))]
)

In [0]:
df_silver_hist = df_silver.withColumn("DAT_INICIO_VIGENCIA", sf.col("DAT_ATUALIZACAO").cast("timestamp")) \
                               .withColumn("DAT_FIM_VIGENCIA", sf.lit(None).cast("timestamp")) \
                               .withColumn("FLG_VIGENTE", sf.lit(True))


In [0]:
colunas = [
    'COD_UF', 'COD_CNES', 'DSC_DOCUMENTO', 'DSC_TIPO_GESTAO', 'NOM_FANTASIA', 
    'COD_MUNICIPIO', 'NOM_RAZAO_SOCIAL', 'COD_TIPO_UNIDADE', 
    'END_BAIRRO_ESTABELECIMENTO', 'NUM_LOGRADOURO_ESTABELECIMENTO', 
    'END_LOGRADOURO_ESTABELECIMENTO', 'END_CEP_ESTABELECIMENTO', 
    'DSC_TURNO_ATENDIMENTO', 'DSC_ESTABELECIMENTO_SAUDE', 
    'DSC_EMAIL_ESTABELECIMENTO', 'COD_ATIVIDADE_ENSINO', 
    'DSC_ESFERA_ADMINISTRATIVA', 'TEL_ESTABELECIMENTO', 
    'DSC_ESFERA_ADMINISTRATIVA_ABREVIADO', 'FLG_POSSUI_SERVICO_APOIO', 
    'END_LATITUDE_ESTABELECIMENTO', 'END_LONGITUDE_ESTABELECIMENTO', 
    'COD_IDENTIFICADOR_TURNO_ATENDIMENTO', 'FLG_POSSUI_CENTRO_NEONATAL', 
    'FLG_POSSUI_CENTRO_CIRURGICO', 'FLG_POSSUI_CENTRO_OBSTETRICO', 
    'DSC_NATUREZA_JURIDICA', 'FLG_POSSUI_ATENDIMENTO_HOSPITALAR', 
    'FLG_POSSUI_ATENDIMENTO_AMBULATORIAL', 'FLG_POSSUI_ATENDIMENTO_AMBULATORIAL_SUS', 
    'COD_CNPJ_ENTIDADE'
]

In [0]:
change_condition = " OR ".join([
    f"(target.{col} != source.{col} OR (target.{col} IS NULL AND source.{col} IS NOT NULL) OR (target.{col} IS NOT NULL AND source.{col} IS NULL))"
    for col in colunas
])

In [0]:
if not spark.catalog.tableExists("saude_sus.trusted.TRU_ESTABELECIMENTO"):
    print("Primeira carga: Carregando TODO o hist칩rico da RAW...")
    
    window_spec = Window.partitionBy("COD_CNES").orderBy("DAT_ATUALIZACAO")
    
    df_silver_with_rank = df_silver \
        .withColumn("rank", sf.row_number().over(
            Window.partitionBy("COD_CNES").orderBy(sf.col("DAT_ATUALIZACAO").desc())
        ))
    
    df_silver_scd2 = df_silver_with_rank \
        .withColumn("DAT_INICIO_VIGENCIA", sf.col("DAT_ATUALIZACAO").cast("timestamp")) \
        .withColumn("FLG_VIGENTE", sf.when(sf.col("rank") == 1, sf.lit(True)).otherwise(sf.lit(False))) \
        .drop("rank")
    
    df_silver_final = df_silver_scd2 \
        .withColumn("DAT_FIM_VIGENCIA", 
                    sf.lead("DAT_INICIO_VIGENCIA").over(window_spec))
    
    df_silver_final.write \
        .format("delta") \
        .saveAsTable("saude_sus.trusted.TRU_ESTABELECIMENTO")
    
    print("Tabela criada com hist칩rico completo da RAW")
    
else:
    target_table = DeltaTable.forName(spark, "saude_sus.trusted.TRU_ESTABELECIMENTO")
    
    target_table.alias("target").merge(
        df_silver_hist.alias("source"),
        "target.COD_CNES = source.COD_CNES AND target.FLG_VIGENTE = true"
    ).whenMatchedUpdate(
        condition = f"source.DAT_ATUALIZACAO > target.DAT_ATUALIZACAO AND ({change_condition})",
        set = {
            "DAT_FIM_VIGENCIA": "source.DAT_ATUALIZACAO",
            "FLG_VIGENTE": "false"
        }
    ).execute()

    df_to_load = df_silver_hist.alias("s").join(
        target_table.toDF().alias("t"),
        (sf.col("s.COD_CNES") == sf.col("t.COD_CNES")) & (sf.col("t.FLG_VIGENTE") == True),
        "left_anti"
    )

    df_to_load.write.format("delta") \
        .option("mergeSchema", "true") \
        .mode("append") \
        .saveAsTable("saude_sus.trusted.TRU_ESTABELECIMENTO")
    print('Job TRU_ESTABELECIMENTO finalizado com sucesso.')