##### Disclaimer 

The incremental load strategy follows the model presented in the notebook **1 - delta_lake_poc**: a landing zone continuously captures data from external sources, and a **bronze layer** ("as is") is the first layer, storing all the data found in the landing zone plus an insertion "flag" column. This layer holds a raw history of every ingestion, so data lineage can be traced by analyzing the data if needed. Next comes the **silver layer**, which provides an overview or snapshot of the latest ingestion; in other words, it is a staging layer for the subsequent load into the gold layer. Here we apply transformations, field typing, and business rules (when needed). Finally, the **gold layer** is the top, refined layer that holds the finished data product: business-level tables that give the client reliable answers.
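The layer semantics described above can be sketched in plain Python, without Spark, to make the data flow concrete. This is only an illustration under stated assumptions: the function names (`load_bronze`, `build_silver`, `upsert_gold`) and the sample rows are made up for this sketch, lists and dicts stand in for Delta tables, and the only business rule applied is casting `customer_id` to an integer.

```python
from datetime import datetime

def load_bronze(bronze, landing_rows, loaded_at):
    """Append every landing row to bronze, tagged with its load timestamp."""
    bronze.extend({**row, "dt_loading_stage": loaded_at} for row in landing_rows)
    return bronze

def build_silver(bronze):
    """Keep only the rows from the most recent ingestion (a snapshot)."""
    if not bronze:
        return []
    latest = max(row["dt_loading_stage"] for row in bronze)
    return [
        {**row, "customer_id": int(row["customer_id"])}  # field typing step
        for row in bronze
        if row["dt_loading_stage"] == latest
    ]

def upsert_gold(gold, silver):
    """Upsert silver rows into gold, keyed by customer_id."""
    for row in silver:
        gold[row["customer_id"]] = {"name": row["name"], "email": row["email"]}
    return gold

# Hypothetical sample data: two ingestions on consecutive days.
bronze, gold = [], {}
load_bronze(
    bronze,
    [{"customer_id": "1", "name": "Ana", "email": "ana@old.example"}],
    datetime(2024, 1, 1),
)
upsert_gold(gold, build_silver(bronze))

load_bronze(
    bronze,
    [
        {"customer_id": "1", "name": "Ana", "email": "ana@new.example"},
        {"customer_id": "2", "name": "Bruno", "email": "bruno@example.com"},
    ],
    datetime(2024, 1, 2),
)
upsert_gold(gold, build_silver(bronze))

print(len(bronze))       # 3 (bronze keeps every ingested row)
print(gold[1]["email"])  # ana@new.example (existing customer updated)
print(sorted(gold))      # [1, 2] (new customer inserted)
```

Bronze only ever grows, silver is rebuilt from the latest batch, and gold is updated in place, which mirrors the append, snapshot, and upsert roles of the three layers.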

PS:

1- Depending on the client's needs, one of the 6 types of Slowly Changing Dimensions (SCD) can be applied; for didactic purposes, however, we leave this feature aside.

2- Depending on the external data source, we can use Apache Spark's Streaming DataFrame approach. With continuous reads, with or without scheduled intervals, data can be loaded incrementally and automatically from the source to the destination. For example, whenever new data appears in the landing zone, the load flow (bronze, silver, and gold) is triggered. Apache Kafka can also be used as an external connector. Alongside the incremental load strategy, whether batch or streaming, Delta Live Tables (DLT) can be used.
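As a rough illustration of the streaming option in note 2, the sketch below wires a Structured Streaming read of the landing zone into an append-only Delta write. It is a minimal sketch under assumptions, not the notebook's method: the function name `start_bronze_stream`, its parameters, and the checkpoint location are hypothetical, the schema is passed as a DDL string, and the `availableNow` trigger assumes a reasonably recent Spark release with Delta Lake available.

```python
def start_bronze_stream(spark, source_dir, bronze_path, checkpoint_path, schema_ddl):
    """Incrementally copy new CSV files from the landing zone into a bronze Delta table.

    All parameter names are illustrative; `spark` is an existing SparkSession.
    """
    stream_df = (
        spark.readStream
        .format("csv")
        .schema(schema_ddl)  # streaming file sources need an explicit schema
        .load(source_dir)    # picks up files as they arrive in this directory
        .selectExpr("*", "current_timestamp() AS dt_loading_stage")
    )
    return (
        stream_df.writeStream
        .format("delta")
        .outputMode("append")                           # bronze keeps every batch
        .option("checkpointLocation", checkpoint_path)  # tracks files already processed
        .trigger(availableNow=True)                     # process pending files, then stop
        .start(bronze_path)
    )

# Example call (assumes a live SparkSession named `spark`; paths are placeholders):
# query = start_bronze_stream(
#     spark,
#     source_dir="dbfs:/FileStore/tables/rpe/stage/customers",
#     bronze_path="dbfs:/FileStore/tables/rpe/bronze/customers",
#     checkpoint_path="dbfs:/FileStore/tables/rpe/_checkpoints/customers",
#     schema_ddl="customer_id STRING, name STRING, email STRING, signup_date STRING",
# )
# query.awaitTermination()
```

The checkpoint is what makes the load incremental: on each run, only files not yet recorded there are read. Replacing the `availableNow` trigger with a processing-time trigger would keep the query running at scheduled intervals instead.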

Refs. 

https://www.linkedin.com/pulse/data-load-strategies-full-vs-incremental-janardhan-reddy-kasireddy/

https://mesum.medium.com/data-warehousing-historical-load-full-load-incremental-load-cb46f0d0c4f5

https://medium.com/@sharankarthick487/incremental-data-load-approaches-overview-11c90fd87fe6

In [0]:
import pandas
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("data-ingestion").getOrCreate()

# paths
pathStage = "dbfs:/FileStore/tables/rpe/stage"
pathBronze = "dbfs:/FileStore/tables/rpe/bronze"
pathSilver = "dbfs:/FileStore/tables/rpe/silver"
pathGold = "dbfs:/FileStore/tables/rpe/gold"


## External data source
customers = (
    spark.createDataFrame(
        pandas.read_csv("https://raw.githubusercontent.com/gabrielnascimentost/databricks_rpe/main/database/raw/customer.csv")
    )
)

transactions =  (
    spark.createDataFrame(
        pandas.read_csv("https://raw.githubusercontent.com/gabrielnascimentost/databricks_rpe/main/database/raw/transaction.csv")
    )
)

customers.write.format("csv").mode("overwrite").save(f"{pathStage}/customers/{datetime.datetime.now().year}/{datetime.datetime.now().month}/{datetime.datetime.now().day}/customers.csv")

transactions.write.format("csv").mode("overwrite").save(f"{pathStage}/transactions/{datetime.datetime.now().year}/{datetime.datetime.now().month}/{datetime.datetime.now().day}/transactions.csv")

## define schema
customers_schema = (
    StructType([
        StructField("customer_id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("signup_date", StringType(), True)
    ])
)

transactions_schema = (
    StructType([
        StructField("transaction_id", StringType(), True),
        StructField("customer_id", StringType(), True),
        StructField("amount", StringType(), True),
        StructField("currency", StringType(), True),
        StructField("transaction_date", StringType(), True)
    ])
)

## copy "as is" - landing to bronze
customersBronze = (
    spark
    .read
    .format("csv")
    .schema(customers_schema)
    .load(f"{pathStage}/customers/{datetime.datetime.now().year}/{datetime.datetime.now().month}/{datetime.datetime.now().day}/customers.csv")
    .withColumn("dt_loading_stage", current_timestamp())
)

# append so that bronze accumulates every ingestion batch (raw history)
customersBronze.write.format("delta").mode("append").save(f"{pathBronze}/customers")

transactionsBronze = (
    spark
    .read
    .format("csv")
    .schema(transactions_schema)
    .load(f"{pathStage}/transactions/{datetime.datetime.now().year}/{datetime.datetime.now().month}/{datetime.datetime.now().day}/transactions.csv")
    .withColumn("dt_loading_stage", current_timestamp())
)

# append so that bronze accumulates every ingestion batch (raw history)
transactionsBronze.write.format("delta").mode("append").save(f"{pathBronze}/transactions")

## copy - bronze to silver, keeping only the most recent records

customersSilver = (
    spark.sql(f'''
       SELECT
            CAST(customer_id AS INTEGER) AS customer_id,
            CAST(name AS STRING) AS name,
            CAST(email AS STRING) AS email,
            CAST(signup_date AS DATE) AS signup_date,
            CAST(dt_loading_stage AS TIMESTAMP) AS dt_insert_data
       FROM
          (
            SELECT 
                DENSE_RANK() OVER(ORDER BY dt_loading_stage DESC) AS rank, * 
            FROM delta.`{pathBronze}/customers`
          ) AS T
       WHERE
            T.rank = 1
    ''')
)

customersSilver.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(f"{pathSilver}/customers")

transactionsSilver = (
    spark.sql(f'''
     SELECT
            CAST(transaction_id AS INTEGER) AS transaction_id,
            CAST(customer_id AS INTEGER) AS customer_id,
            CAST(amount AS FLOAT) AS amount,
            CAST(currency AS STRING) AS currency,
            CAST(transaction_date as DATE) as transaction_date,
            CAST(dt_loading_stage AS TIMESTAMP) AS dt_insert_data
       FROM
          (
            SELECT 
                DENSE_RANK() OVER(ORDER BY dt_loading_stage DESC) AS rank, * 
            FROM delta.`{pathBronze}/transactions`
          ) AS T
       WHERE
            T.rank = 1
''')
)

transactionsSilver.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(f"{pathSilver}/transactions")


## Copy - silver to gold via upsert (insert and update)

spark.sql(f'''
       MERGE INTO delta.`{pathGold}/dim_customer` dest
        USING (
             select 
                c.customer_id as customer_id,
                c.name,
                c.email,
                c.signup_date
                from delta.`{pathSilver}/customers` c
        ) orig
        ON orig.customer_id = dest.customer_id
        WHEN MATCHED THEN
        UPDATE SET
            dest.customer_id = orig.customer_id,
            dest.name        = orig.name,
            dest.email       = orig.email,
            dest.signup_date = orig.signup_date
        WHEN NOT MATCHED
        THEN INSERT (
            customer_id,
            name,
            email,
            signup_date
        )
        VALUES (
            orig.customer_id,
            orig.name,
            orig.email,
            orig.signup_date
        )         
''')

spark.sql(f'''   
        insert into delta.`{pathGold}/fat_sales`
        (
            customer_sk,
            transaction_id,
            amount_sale,
            transaction_date
        )
        select 
            dc.customer_sk,
            t.transaction_id,
            case t.currency
            when 'USD' then concat('$ ', cast(t.amount as string))
            when 'BRL' then concat('R$ ', cast(t.amount as string))
            when 'EUR' then concat('€', cast(t.amount as string))
            else 'NA'
            end as amount_sale,
            t.transaction_date
        from delta.`{pathSilver}/transactions` t 
        join delta.`{pathSilver}/customers` c on c.customer_id = t.customer_id
        join delta.`{pathGold}/dim_customer` dc on dc.customer_id = c.customer_id
''')



Out[1]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]
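The cell above rebuilds the date-partitioned landing path four times, each with three separate `datetime.datetime.now()` calls. One possible tidy-up, shown here as a sketch rather than a required change, is a small helper that resolves the date once and reuses it. The helper name `stage_path` and its `day` parameter are hypothetical; the path layout (year/month/day without zero padding) copies the one used in the cell.

```python
from datetime import date

def stage_path(base, dataset, day=None):
    """Build the landing-zone path for one dataset and one ingestion day.

    `day` defaults to today; passing it explicitly keeps the write and the
    later read on the same partition even if a run crosses midnight.
    """
    day = day or date.today()
    return f"{base}/{dataset}/{day.year}/{day.month}/{day.day}/{dataset}.csv"

path_stage = "dbfs:/FileStore/tables/rpe/stage"
run_day = date(2024, 3, 7)  # fixed example date; omit the argument to use today

print(stage_path(path_stage, "customers", run_day))
# dbfs:/FileStore/tables/rpe/stage/customers/2024/3/7/customers.csv
```

With this in place, the write to the landing zone and the read into bronze could both call `stage_path(pathStage, "customers", run_day)` with the same `run_day` value.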