In [12]:
from pyspark.sql import SparkSession
from delta import *
from pyspark.sql.types import *
from pyspark.sql.functions import col

In [2]:
# Session-level Spark settings, applied to the builder in the order listed.
# The first two register Delta Lake's SQL extension and catalog; the last two
# tune how Parquet files are read (nanosecond timestamps surfaced as longs,
# vectorized reader turned off).
spark_settings = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.sql.legacy.parquet.nanosAsLong": "true",
    "spark.sql.parquet.enableVectorizedReader": "false",
}

builder = SparkSession.builder.appName("test").enableHiveSupport()
for setting_key, setting_value in spark_settings.items():
    builder = builder.config(setting_key, setting_value)

# configure_spark_with_delta_pip wraps the builder before the session is created.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [None]:
# --- Ingestion parameters ---------------------------------------------------
catalog = 'bronze'                  # prefix of the target table name
database = 'postgres'               # source database; part of the raw path and of the table name
table = 'financeiro.clientes'       # source table as "<schema>.<table>"; part of the raw path
file_format = 'parquet'             # format used when reading the full-load files
id_field = 'cliente_id'             # key column used to de-duplicate and to match rows in the merge
timestamp_field = 'modificado_em'   # column cast to timestamp and used to pick the latest CDC event

# Optional explicit schema, currently disabled (the readers rely on the file schema).
# schema = StructType([
#     StructField("cliente_id", LongType(), True),
#     StructField("nome", StringType(), True),
#     StructField("cidade", StringType(), True),
#     StructField("modificado_em", StringType(), True)
# ])

# Dots in the source table name are replaced by underscores so it can be
# embedded in a single identifier: "<catalog>.<database>_<schema>_<table>".
table_formatted = "_".join(table.split("."))
table_name = catalog + "." + database + "_" + table_formatted

In [59]:
def ingest_full_load():
    """Rebuild the target Delta table from the full-load snapshot.

    Reads everything under ``s3a://raw/{database}/full_load/{table}/`` using
    ``file_format``, casts ``timestamp_field`` to a timestamp, and writes the
    result to ``table_name`` as a Delta table in ``overwrite`` mode.

    All inputs are taken from the notebook-level configuration variables.
    Returns nothing.
    """
    source_path = f's3a://raw/{database}/full_load/{table}/'

    snapshot = spark.read.format(file_format).load(source_path)
    snapshot = snapshot.withColumn(
        timestamp_field,
        col(timestamp_field).cast("timestamp"),
    )

    # "overwrite" replaces whatever the table currently holds.
    delta_writer = snapshot.write.mode("overwrite").format("delta")
    delta_writer.saveAsTable(table_name)

In [62]:
# Run the full load. The write uses mode("overwrite"), so each execution of
# this cell replaces the current contents of `table_name`.
ingest_full_load()

In [70]:
# Load the CDC files for the table and keep only the latest event per key.
raw = f's3a://raw/{database}/cdc/{table}/'
cdc_view_name = f"view_{table_formatted}"

cdc_events = spark.read.format("parquet").load(raw)
cdc_events = cdc_events.withColumn(
    timestamp_field,
    col(timestamp_field).cast("timestamp"),
)

# Exposed as a global temp view so the SQL below can read it through `global_temp`.
cdc_events.createOrReplaceGlobalTempView(cdc_view_name)

# Rank the events of each `id_field` from newest to oldest and keep rank 1.
# NOTE(review): rows sharing the same key and timestamp are ranked in an
# arbitrary order by ROW_NUMBER — confirm whether a tie-breaker column exists.
query = f'''
WITH ranked_data AS (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY {id_field} ORDER BY {timestamp_field} DESC) AS rn
  FROM global_temp.view_{table_formatted}
)
SELECT *
FROM ranked_data
WHERE rn = 1;
'''

# The helper ranking column is dropped so it does not reach the merge.
df_cdc = spark.sql(query).drop("rn")


In [71]:
# Handle to the existing Delta table `table_name`; it is the target of the merge.
deltatable = DeltaTable.forName(spark,table_name)

In [72]:
# Apply the de-duplicated CDC batch (`df_cdc`, alias "d") to the Delta table
# (alias "b"), matching rows on `id_field`:
#   * matched   + DELETE          -> remove the row from the table
#   * matched   + INSERT / UPDATE -> overwrite the row with the CDC values
#   * unmatched + INSERT / UPDATE -> insert the row
#   * unmatched + DELETE          -> nothing to do
#
# The matched branch accepts INSERT as well as UPDATE on purpose: if a key that
# already exists in the table has INSERT as its latest event (for example a
# DELETE followed by a re-INSERT inside the same batch, or a replayed batch),
# treating it as an upsert keeps the table in sync instead of silently leaving
# the old row in place.
merge_condition = f"b.{id_field} = d.{id_field}"
upsert_condition = "d.operacao IN ('INSERT', 'UPDATE')"

(deltatable
    .alias("b")
    .merge(df_cdc.alias("d"), merge_condition)
    .whenMatchedDelete(condition = "d.operacao = 'DELETE'")
    .whenMatchedUpdateAll(condition = upsert_condition)
    .whenNotMatchedInsertAll(condition = upsert_condition)
    .execute()
 )

In [73]:
# Sanity check: print the current number of rows in `table_name`.
spark.sql(f"select count(*) from {table_name}").show()

+--------+
|count(1)|
+--------+
|   14100|
+--------+

