In [0]:
from pyspark.sql.functions import row_number, current_timestamp, col
from pyspark.sql.window import Window
from pyspark.sql import DataFrame
from delta import DeltaTable
import datetime

# Spark Structured Streaming config.
# NOTE(review): the original intent stated here was to NOT process micro-batches
# that contain no data, but per Spark's SQL configuration docs "true" ENABLES
# no-data micro-batches (used for eager state management of stateful queries);
# "false" would match the stated intent. Left unchanged because this is a
# session-wide setting — confirm with the pipeline owners before flipping it.
spark.conf.set("spark.sql.streaming.noDataMicroBatches.enabled", "true")

# Delta Lake config: enable automatic schema evolution (schema auto-merge).
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

In [0]:
class Silver_table:
    """Configuration and Delta MERGE helpers shared by the silver-table loaders.

    Every merge helper aliases the target as ``t`` and the source as ``s``, so the
    condition strings and the ``column_merge`` expressions must refer to those aliases.

    Args:
        source_table_name: Name of the table the data comes from.
        target_table_name: Name of the target table (used with ``DeltaTable.forName``
            and ``CREATE TABLE``).
        create_table_columns: Column definition list interpolated into ``CREATE TABLE``
            when the target table has to be created.
        column_merge: Mapping of target column -> expression, used both as the ``set``
            of updates and the ``values`` of inserts.
        merge_condition: Join condition of the MERGE.
        delete_condition: Extra condition for ``whenMatchedDelete``.
        update_condition: Extra condition for ``whenMatchedUpdate``.
        insert_condition: Extra condition for ``whenNotMatchedInsert``.
    """

    def __init__(self, source_table_name: str, target_table_name: str, create_table_columns: str, column_merge: dict, merge_condition: str,  delete_condition: str, update_condition: str, insert_condition: str):
        self.source_table_name = source_table_name
        self.target_table_name = target_table_name
        self.create_table_columns = create_table_columns
        self.column_merge = column_merge
        self.merge_condition = merge_condition
        self.delete_condition = delete_condition
        self.update_condition = update_condition
        self.insert_condition = insert_condition

    def merge_delete(self, source_df: DataFrame, stage_table: dict) -> None:
        """MERGE that only DELETEs matched rows (e.g. removing rows from a stage table).

        Args:
            source_df: Rows identifying what to delete (aliased ``s``).
            stage_table: Dict with "stage_name" (table to delete from) and
                "merge_condition" (join condition for this MERGE).
        """
        target_delta_df = DeltaTable.forName(spark, stage_table["stage_name"])

        (
            target_delta_df.alias("t")
                .merge(source_df.alias("s"), stage_table["merge_condition"])
                .whenMatchedDelete(self.delete_condition)
                .execute()
        )

    def merge_delete_update(self, target_delta_df: DeltaTable, source_df: DataFrame) -> None:
        """MERGE with DELETE and UPDATE only.

        Example use: satellite tables with a filter where, once the filtered value
        changes, the row must be deleted from the silver table.
        The delete clause is listed first, so a matched row satisfying
        ``delete_condition`` is deleted rather than updated.
        """
        (
            target_delta_df.alias("t")
                .merge(source_df.alias("s"), self.merge_condition)
                .whenMatchedDelete(self.delete_condition)
                .whenMatchedUpdate(
                    condition = self.update_condition,
                    set = self.column_merge)
                .execute()
        )

    def merge_update(self, target_delta_df: DeltaTable, source_df: DataFrame) -> None:
        """MERGE that only UPDATEs matched rows (e.g. descriptive columns in silver)."""
        (
            target_delta_df.alias("t")
                .merge(source_df.alias("s"), self.merge_condition)
                .whenMatchedUpdate(
                    condition = self.update_condition,
                    set = self.column_merge)
                .execute()
        )

    def merge_update_insert(self, target_delta_df: DeltaTable, source_df: DataFrame) -> None:
        """MERGE with UPDATE and INSERT (e.g. adding rows to a stage table)."""
        (
            target_delta_df.alias("t")
                .merge(source_df.alias("s"), self.merge_condition)
                .whenMatchedUpdate(
                    condition = self.update_condition,
                    set = self.column_merge)
                .whenNotMatchedInsert(
                    condition = self.insert_condition,
                    values = self.column_merge)
                .execute()
        )

    def merge_delete_update_insert(self, target_delta_df: DeltaTable, source_df: DataFrame) -> None:
        """MERGE with all operations: DELETE, UPDATE and INSERT."""
        (
            target_delta_df.alias("t")
                .merge(source_df.alias("s"), self.merge_condition)
                .whenMatchedDelete(self.delete_condition)
                .whenMatchedUpdate(
                    condition = self.update_condition,
                    set = self.column_merge)
                .whenNotMatchedInsert(
                    condition = self.insert_condition,
                    values = self.column_merge)
                .execute()
        )

    def merge_unmatched_delete_update_insert(self, target_delta_df: DeltaTable, source_df: DataFrame) -> None:
        """MERGE with UPDATE, INSERT and DELETE of target rows missing from the source.

        Warning: ``whenNotMatchedBySourceDelete()`` has no condition, so every target
        row without a match in ``source_df`` is deleted. An empty ``source_df``
        therefore empties the target table.
        """
        (
            target_delta_df.alias("t")
                .merge(source_df.alias("s"), self.merge_condition)
                .whenMatchedUpdate(
                    condition = self.update_condition,
                    set = self.column_merge)
                .whenNotMatchedInsert(
                    condition = self.insert_condition,
                    values = self.column_merge)
                .whenNotMatchedBySourceDelete()
                .execute()
        )

    def create_table(self) -> None:
        """Create the target table with ``create_table_columns`` and fixed Delta
        table properties, then transfer its ownership to ``UnityAdmin``."""
        spark.sql(f"""
            CREATE TABLE {self.target_table_name} (
                {self.create_table_columns}
            )
            TBLPROPERTIES (
                delta.enableDeletionVectors = true,
                delta.enableChangeDataFeed = true, 
                delta.autoOptimize.autoCompact=true,
                delta.autoOptimize.optimizeWrite=true,
                delta.columnMapping.mode = 'name',
                delta.minReaderVersion = '3',
                delta.minWriterVersion = '7',
                delta.feature.allowColumnDefaults = 'supported'
            )
        """)

        # Set the table owner to the UnityAdmin group
        spark.sql(f"ALTER TABLE {self.target_table_name} SET OWNER TO UnityAdmin")

    def handling_exceptions(self, error) -> None:
        """Create the target table when ``error`` reports it is not a Delta table.

        Args:
            error: Exception raised while resolving the target table.

        Raises:
            AssertionError: For any other error, carrying the original message and
                chained to ``error``. An explicit ``raise`` is used instead of
                ``assert False`` so the failure is not stripped under ``python -O``.
        """
        type_error = str(error)

        if 'is not a Delta table' in type_error:
            self.create_table()
        else:
            raise AssertionError(type_error) from error


In [0]:
class Stream(Silver_table):
    """Stream a source table's Change Data Feed and MERGE each micro-batch into silver.

    The source is read with ``readChangeData`` through Structured Streaming, and
    every micro-batch is handed to :meth:`merge_changes` via ``foreachBatch``.

    Args (in addition to those of ``Silver_table``):
        table_id: Column used to partition the dedup window (one row kept per value).
        checkpoint_location: Checkpoint location of the streaming query.
        query_name: Name of the streaming query.
        merge_type: One of "update", "delete_update", "update_insert" or
            "delete_update_insert"; selects the MERGE helper.
        tvw_table_changes: Temp view name the deduplicated batch is registered as;
            required when ``join_query`` is set.
        join_query: Optional SQL run in the batch's session; its result replaces the
            batch before merging.
        stage_table: Optional dict with "stage_name" and "merge_condition"; used with
            merge_type "delete_update" to also delete the matched rows from the stage.
        stream_filter: Optional filter (SQL string or Column) applied to the change
            stream; ``None`` (default) applies no filter.
        window_orderby: Column ordering the dedup window, latest row first.
    """

    def __init__(self, source_table_name: str, target_table_name: str, create_table_columns: str, column_merge: dict, merge_condition: str,  delete_condition: str, update_condition: str, insert_condition: str, table_id: str, checkpoint_location: str, query_name: str, merge_type: str, tvw_table_changes: str = None, join_query: str = None,  stage_table: dict = None, stream_filter = None, window_orderby: str = "_commit_timestamp"):
        super().__init__(source_table_name, target_table_name, create_table_columns, column_merge, merge_condition, delete_condition, update_condition, insert_condition)
        self.table_id = table_id
        self.checkpoint_location = checkpoint_location
        self.query_name = query_name
        self.merge_type = merge_type
        self.tvw_table_changes = tvw_table_changes
        self.join_query = join_query
        self.stage_table = stage_table
        self.stream_filter = stream_filter
        self.window_orderby = window_orderby

    def merge_changes(self, batch_df, batch_id):
        """``foreachBatch`` handler: deduplicate the batch and MERGE it into the target.

        Args:
            batch_df: Micro-batch of change rows.
            batch_id: Micro-batch id supplied by Structured Streaming (unused).

        Raises:
            AssertionError: If ``join_query`` is set without ``tvw_table_changes``,
                if the target must be created but ``create_table_columns`` is None,
                or if ``merge_type`` is not one of the supported values.
        """
        # Use the batch's own session (as the join below already does) so the target
        # lookup and the temp view live in the same session as the micro-batch.
        batch_session = batch_df.sparkSession

        # Keep only the most recent row per table_id, ordered by window_orderby desc.
        # NOTE(review): in Change Data Feed, update_preimage and update_postimage rows
        # can share the same _commit_timestamp, so row_number() may keep either one
        # unless stream_filter excludes preimages — confirm the callers' filters.
        window = Window.partitionBy(self.table_id).orderBy(col(self.window_orderby).desc())
        batch_df = (
            batch_df.withColumn("rank", row_number().over(window))
                .withColumn("_silverUpdateDate", current_timestamp())
                .withColumnRenamed("_change_type", "_operation")
                .where("rank = 1")
                .drop("_commit_timestamp", "rank")
        )

        # Tables that need a join: expose the batch as a temp view and run the query
        if self.join_query is not None:
            if self.tvw_table_changes is None:
                raise AssertionError(f'{datetime.datetime.now()} - WARNING - The variable "tvw_table_changes: string" cannot be empty')

            batch_df.createOrReplaceTempView(self.tvw_table_changes)
            batch_df = batch_session.sql(self.join_query)

        # Resolve the target Delta table; if that fails, create the table and retry
        try:
            target_delta_df = DeltaTable.forName(batch_session, self.target_table_name)
        except Exception as error:
            if self.create_table_columns is None:
                raise AssertionError(f'{datetime.datetime.now()} - WARNING - The target table does not exist! Set the variable "create_table_columns: str"') from error
            self.handling_exceptions(error)

            # Resolve the freshly created target table
            target_delta_df = DeltaTable.forName(batch_session, self.target_table_name)

        # Merge into the target table according to merge_type
        if self.merge_type == "update":
            # satellite tables
            self.merge_update(target_delta_df, batch_df)

        elif self.merge_type == "delete_update":
            # dependent tables
            self.merge_delete_update(target_delta_df, batch_df)

            # Also delete the processed rows from the stage table, when configured
            if self.stage_table:
                self.merge_delete(batch_df, self.stage_table)

        elif self.merge_type == "update_insert":
            # stage updates
            self.merge_update_insert(target_delta_df, batch_df)

        elif self.merge_type == "delete_update_insert":
            self.merge_delete_update_insert(target_delta_df, batch_df)

        else:
            raise AssertionError('The "merge_type" variable (merge_type: str) was not filled in correctly. \nCheck if it is among the following options: \n"update"               -> to update only \n"delete_update"        -> to delete and update \n"update_insert"        -> to update and insert \n"delete_update_insert" -> to delete, update and insert')

    def streaming_table_changes(self):
        """Run the Change Data Feed stream once (``availableNow``) and block until done."""
        print(f'{datetime.datetime.now()} - INFO - Start Streaming Table Changes: {self.source_table_name}')

        changes_df = (
            spark.readStream
                .option("readChangeData", True)
                .table(self.source_table_name)
        )

        # DataFrame.filter rejects None, so only filter when a condition was configured
        # (the constructor's default for stream_filter is None).
        if self.stream_filter is not None:
            changes_df = changes_df.filter(self.stream_filter)

        (
            changes_df
                .drop("_operation")
            .writeStream
                .foreachBatch(self.merge_changes)
                .option("checkpointLocation", self.checkpoint_location)
                .queryName(self.query_name)
                .trigger(availableNow = True)
                .outputMode("update")
            .start()
            .awaitTermination()
        )

        print(f'{datetime.datetime.now()} - INFO - End Streaming Table Changes: {self.source_table_name}')
    

In [0]:
class Join_table(Silver_table):
    """Enrich a silver table by MERGE-ing the result of a join against a stage table.

    Args (in addition to those of ``Silver_table``):
        stage_table: Dict with "stage_name" and "merge_condition" (used by
            ``merge_delete``) and "pk" (column list selected when deleting by flag).
        join_query: SQL whose result is merged into the target table.
    """

    def __init__(self, source_table_name: str, target_table_name: str, create_table_columns: str, column_merge: dict, merge_condition: str, delete_condition: str, update_condition: str, insert_condition: str, stage_table: dict, join_query: str):
        super().__init__(source_table_name, target_table_name, create_table_columns, column_merge, merge_condition,  delete_condition, update_condition, insert_condition)
        self.stage_table = stage_table
        self.join_query = join_query

    def _get_or_create_target(self) -> DeltaTable:
        """Return the target DeltaTable, creating the table first if it cannot be resolved.

        Raises:
            AssertionError: If the table must be created but ``create_table_columns``
                is None, or if the lookup failed for another reason.
        """
        try:
            return DeltaTable.forName(spark, self.target_table_name)
        except Exception as error:
            if self.create_table_columns is None:
                raise AssertionError(f'{datetime.datetime.now()} - WARNING - The target table does not exist! Set the variable "create_table_columns: str"') from error
            self.handling_exceptions(error)

        # Table was just created by handling_exceptions; resolve it again
        return DeltaTable.forName(spark, self.target_table_name)

    def delete_from_stage_with_flag(self, df: DataFrame) -> None:
        """Delete from the stage the rows flagged for removal in ``df``.

        Rows with ``flag = true``, or ``flag = false`` and ``_operation = 'delete'``,
        are reduced to DISTINCT (pk, _operation) and deleted from the stage.
        """
        # Temp view named after the target table's last name segment. The original
        # indexed a single character ([i]) instead of slicing ([i:]), so tables that
        # share an initial letter collided on the same view name.
        table_short_name = self.target_table_name[self.target_table_name.rfind(".") + 1:]
        tvw_stage_deletes = 'tvw_stage_deletes_' + table_short_name
        df.createOrReplaceTempView(tvw_stage_deletes)

        df = spark.sql(f"""
                SELECT DISTINCT {self.stage_table["pk"]}, _operation
                FROM {tvw_stage_deletes}
                WHERE flag = true OR (flag = false AND _operation = 'delete')
            """)

        self.merge_delete(df, self.stage_table)

    def enrich_join_with_stage(self) -> None:
        """MERGE the join result into the target, then delete flagged rows from the stage."""
        join_df = spark.sql(self.join_query)
        target_delta_df = self._get_or_create_target()

        # NOTE(review): join_df is lazy and is evaluated again by the stage delete
        # below, i.e. after the target MERGE — fine as long as join_query does not
        # read the target table; confirm.
        self.merge_delete_update_insert(target_delta_df, join_df)
        self.delete_from_stage_with_flag(join_df)

    def enrich_join_with_stage_unified(self) -> None:
        """MERGE the join result (unified rows) into the target, then delete every
        matched row from the stage."""
        join_df = spark.sql(self.join_query)
        target_delta_df = self._get_or_create_target()

        self.merge_delete_update_insert(target_delta_df, join_df)
        self.merge_delete(join_df, self.stage_table)

