## Pipeline: Bronze to Silver

This pipeline ingests raw data from the Bronze layer and applies cleansing, normalization, and validation rules to produce a standardized and reliable Silver dataset, ready for analytical and downstream consumption.

In [51]:
# Storage locations of the Delta tables for each medallion layer.
bronze_path = "abfss://bronze@lablicitacoessa.dfs.core.windows.net/Licitacoes_Gov/"
silver_path = "abfss://silver@lablicitacoessa.dfs.core.windows.net/Licitacoes_Gov/"

# Read the Bronze Delta table; this frame is the input of every later cell.
df_bronze = spark.read.load(bronze_path, format="delta")

# Preview the raw Bronze data.
display(df_bronze)

StatementMeta(sparkpool1, 1, 52, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0f74b638-5056-4150-9972-d703d9b8d5ca)

In [52]:
from pyspark.sql.functions import col, cast


# One entry per Silver column: (Bronze column, Spark type to cast to, Silver column name).
# The order of this list is the column order of the resulting DataFrame.
silver_column_specs = [
    ("airbyte_id", "string", "DSC_SOURCE_ID"),
    ("codigo_municipio_uasg", "string", "COD_MUNICIPIO_UASG"),
    ("data_abertura_proposta", "date", "DAT_ABERTURA_PROPOSTA"),
    ("data_entrega_edital", "date", "DAT_ENTREGA_EDITAL"),
    ("data_entrega_proposta", "date", "DAT_ENTREGA_PROPOSTA"),
    ("data_publicacao", "date", "DAT_PUBLICACAO"),
    ("dt_alteracao", "date", "DAT_ALTERACAO"),
    ("endereco_entrega_edital", "string", "END_ENTREGA_EDITAL"),
    ("funcao_responsavel", "string", "DSC_FUNCAO_RESPONSAVEL"),
    ("id_compra", "string", "COD_COMPRA"),
    ("identificador", "string", "DSC_IDENTIFICADOR"),
    ("informacoes_gerais", "string", "DSC_INFORMACOES_GERAIS"),
    ("modalidade", "long", "NUM_MODALIDADE"),
    ("nome_modalidade", "string", "NOM_MODALIDADE"),
    ("nome_responsavel", "string", "NOM_RESPONSAVEL"),
    ("numero_aviso", "long", "NUM_AVISOS"),
    ("numero_processo", "string", "NUM_PROCESSO"),
    ("objeto", "string", "DSC_OBJETO"),
    ("pertence14133", "boolean", "IDT_PERTENCE_14133"),
    ("situacao_aviso", "string", "DSC_SITUACAO_AVISO"),
    ("tipo_pregao", "string", "DSC_TIPO_PREGAO"),
    ("tipo_recurso", "string", "DSC_TIPO_RECURSO"),
    ("uasg", "string", "COD_UASG"),
    ("valor_estimado_total", "double", "VAL_ESTIMADO_TOTAL"),
]

# Project the Bronze frame onto the Silver schema: cast each source column to
# its target type and give it its Silver name in a single select.
df_silver = df_bronze.select(
    *[
        col(source_name).cast(spark_type).alias(silver_name)
        for source_name, spark_type, silver_name in silver_column_specs
    ]
)

StatementMeta(sparkpool1, 1, 53, Finished, Available, Finished)

In [53]:
from pyspark.sql import functions as sf

# Character-for-character translation table: position i in `accents` is
# replaced by position i in `without_accents`.
accents = "áàâãäéèêëíìîïóòôõöúùûüçÁÀÂÃÄÉÈÊËÍÌÎÏÓÒÔÕÖÚÙÛÜÇ"
without_accents = "aaaaaeeeeiiiiooooouuuucAAAAAEEEEIIIIOOOOOUUUUC"

# Columns with these prefixes are normalized; DSC_SOURCE_ID is excluded so
# the source identifier is left exactly as ingested.
text_prefixes = ("DSC_", "NOM_", "END_")

df_silver_cols = df_silver.columns

# Build every replacement expression first (trim -> upper-case -> strip
# accents), then apply them all in one projection.
normalized_columns = {
    name: sf.translate(sf.upper(sf.trim(sf.col(name))), accents, without_accents)
    for name in df_silver_cols
    if name.startswith(text_prefixes) and name != "DSC_SOURCE_ID"
}

if normalized_columns:
    df_silver = df_silver.withColumns(normalized_columns)


StatementMeta(sparkpool1, 1, 54, Finished, Available, Finished)

In [54]:
from pyspark.sql.functions import coalesce, lit

# Replacement value used when a column is NULL.
null_defaults = {
    "DSC_INFORMACOES_GERAIS": "",
    "VAL_ESTIMADO_TOTAL": 0,
}

# Substitute the default for NULLs in each listed column, in one projection.
df_silver = df_silver.withColumns(
    {
        column_name: coalesce(col(column_name), lit(default_value))
        for column_name, default_value in null_defaults.items()
    }
)

StatementMeta(sparkpool1, 1, 55, Finished, Available, Finished)

In [55]:
# Render df_silver to inspect the result of the transformations applied so far.
display(df_silver)

StatementMeta(sparkpool1, 1, 56, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 093d18be-75dc-47f4-931b-c7b5b5618fdc)

In [56]:
# Persist df_silver to the Silver Delta table at `silver_path`.
#
# Fixes over the previous version of this cell:
#   * DeltaTable was never imported, so DeltaTable.forPath raised NameError;
#     the broad `except Exception` hid that and the cell always fell back to a
#     full overwrite.
#   * Even when the merge succeeded, the table was immediately overwritten
#     with df_silver, discarding the upsert.
#   * Any genuine merge failure was swallowed and answered with an overwrite
#     of the target. Such errors now propagate.
from delta.tables import DeltaTable

if DeltaTable.isDeltaTable(spark, silver_path):
    # The table already exists: upsert on the source identifier so existing
    # rows are updated and new rows are inserted.
    # NOTE(review): MERGE fails if several source rows match the same target
    # row — assumes DSC_SOURCE_ID is unique in df_silver; confirm upstream.
    (
        DeltaTable.forPath(spark, silver_path)
        .alias("target")
        .merge(
            df_silver.alias("source"),
            "target.DSC_SOURCE_ID = source.DSC_SOURCE_ID",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run (no Delta table at this path yet): create it from df_silver.
    print(f"\n Salvando nova tabela Delta em: {silver_path}")
    (
        df_silver.write.mode("overwrite")
        .format("delta")
        .option("overwriteSchema", "true")
        .save(silver_path)
    )

StatementMeta(sparkpool1, 1, 57, Finished, Available, Finished)


 Salvando nova tabela Delta em: abfss://bronze@lablicitacoessa.dfs.core.windows.net/Licitacoes_Gov/
