In [0]:
# Job parameters: target catalog, schema and table, read from the notebook widgets.
catalog, schema, table_name = (
    dbutils.widgets.get(widget) for widget in ("catalog", "schema", "table")
)

In [0]:
#catalog = "bronze"
#schema = "sistema_leitos"
#table_name = "leitos"
#table_id = "comp"

In [0]:
def table_exists(catalog, schema, tablename):
    """Return whether ``catalog.schema.tablename`` is known to the Spark catalog.

    The lookup goes through the catalog API with the fully qualified name
    rather than counting rows of a ``SHOW TABLES`` listing. The answer
    therefore does not depend on how many listing rows happen to carry the
    same name (for example a temporary view), and no filter expression is
    assembled from the arguments.

    Args:
        catalog: Name of the catalog that holds the schema.
        schema: Name of the schema (database) inside ``catalog``.
        tablename: Name of the table to look for.

    Returns:
        bool: True when the table exists, False otherwise.
    """
    return spark.catalog.tableExists(f"{catalog}.{schema}.{tablename}")

In [0]:
def upsert_delta_table(
    df, catalog, schema, table_name, on_condition, columns_to_update
):
    """Create the Delta table from ``df`` or merge ``df`` into the existing one.

    When the table does not exist yet, ``df`` is written as a new Delta table
    and the function returns. Otherwise ``df`` is merged into the table:

    * matched rows whose ``row_hash`` differs get ``columns_to_update``,
      ``row_hash`` and ``updated_at`` refreshed from the source;
    * source rows without a match are inserted with all their columns.

    After the merge, the inserted/updated row counts reported by the latest
    table history entry are printed.

    Args:
        df: Source DataFrame. It must contain a ``row_hash`` column, since the
            update condition compares it against the target's.
        catalog: Catalog of the target table.
        schema: Schema of the target table.
        table_name: Name of the target table.
        on_condition: SQL join condition for the merge, written against the
            ``target`` and ``source`` aliases.
        columns_to_update: Names of the columns copied from source to target
            when a matched row has changed.

    Returns:
        None.
    """
    full_name = f"{catalog}.{schema}.{table_name}"

    # First load: there is nothing to merge into, so just materialise the table.
    if not table_exists(catalog, schema, table_name):
        print(f"Criando tabela {full_name}!!!")
        (
            # coalesce(1) collapses the data into a single partition before writing.
            df.coalesce(1)
            .write.format("delta")
            .mode("overwrite")
            .saveAsTable(full_name)
        )

        return

    from delta.tables import DeltaTable

    print(f"Atualizando tabela {full_name}!!!")

    delta_table = DeltaTable.forName(spark, full_name)

    update_assignments = {
        **{c: f"source.{c}" for c in columns_to_update},
        # The hash must be refreshed together with the data. If it were left
        # stale, the condition below would stay true and the same row would
        # be rewritten (and its updated_at bumped) on every subsequent run.
        "row_hash": "source.row_hash",
        "updated_at": "current_timestamp()",
    }

    (
        delta_table.alias("target")
        .merge(df.alias("source"), on_condition)
        .whenMatchedUpdate(
            # Only touch rows whose content actually changed.
            condition="target.row_hash != source.row_hash",
            set=update_assignments,
        )
        .whenNotMatchedInsertAll()
        .execute()
    )

    # history(1) returns the most recent commit, which is assumed to be the
    # merge executed above.
    summary = (
        delta_table.history(1)
        .select(
            "operationMetrics.numTargetRowsInserted",
            "operationMetrics.numTargetRowsUpdated",
        )
        .collect()[0]
    )

    print(f"Linhas inseridas: {summary['numTargetRowsInserted']}")
    print(f"Linhas atualizadas: {summary['numTargetRowsUpdated']}")

In [0]:
from pyspark.sql import functions as F
import re

# Read every CSV file under the "leitos" raw volume, letting Spark infer the column types.
csv_options = {
    "header": True,
    "delimiter": ",",
    "inferSchema": True,
    "encoding": "latin1",
}
df = (
    spark.read.format("csv")
    .options(**csv_options)
    .load("/Volumes/raw-data/sistema_leitos/leitos/")
)

# Normalise the headers: each run of non-word characters becomes "_" and the
# resulting name is lower-cased.
renamed_columns = {name: re.sub(r"\W+", "_", name).lower() for name in df.columns}
df = df.withColumnsRenamed(renamed_columns)

# Only the columns from position 20 onwards take part in the row hash; the
# same list is later passed as the set of columns to update.
cols_to_hash = df.columns[20:]

# NULL values are replaced by empty strings before the "||"-separated
# concatenation that feeds the SHA-256 digest.
hash_inputs = [
    F.coalesce(F.col(name).cast("string"), F.lit("")) for name in cols_to_hash
]
row_hash = F.sha2(F.concat_ws("||", *hash_inputs), 256)

df = df.withColumns({"row_hash": row_hash, "updated_at": F.current_timestamp()})

# Rows are matched between target and source on the (comp, cnes) pair.
on_condition = "target.comp = source.comp and target.cnes = source.cnes"

upsert_delta_table(df, catalog, schema, table_name, on_condition, cols_to_hash)

# Como fazer a ingestão incremental em caso de CDC:
## 1 - Ler todos os arquivos do CDC
`spark.read.format("extensao").options(add_option).load("path_dos_arquivos")`
## 2 - Criar deduplicação baseado no id para pegar o arquivo mais recente
Ex: ordenar pela data de alteração (da mais recente para a mais antiga) e manter apenas o primeiro registro de cada id
## 3 - Carregar a tabela delta atual
```
import delta

tab_atual = delta.DeltaTable.forName(spark, "schema.nome_tabela")
```
## 4 - Fazer o UPSERT
merge da tabela atual com as novas informações designando o que fazer para cada tipo de operação
```
(
    tab_atual.alias("ta")
    .merge(tab_nova.alias("tn"), "ta.id = tn.id")
    .whenMatchedDelete(condition = "tn.operation = 'D'")
    .whenMatchedUpdateAll(condition = "tn.operation = 'U'")
    .whenNotMatchedInsertAll(condition = "tn.operation = 'I'")
    .execute()
)
```