In [0]:
from datetime import datetime
from delta.tables import DeltaTable
from pyspark.sql import Column, DataFrame, Row, SparkSession
from pyspark.sql.window import Window
from typing import Callable, Dict, List, Optional, Tuple

import pyspark.sql.functions as f
import pyspark.sql.types as t



def mt_get_most_recent_data(data_frame: DataFrame, key_columns: List[str]) -> DataFrame:
    """Keep, for every key, only the newest row that is not flagged as deleted.

    Rows whose ``catulz_reg`` equals ``"D"`` are discarded first. The remaining rows
    are grouped by ``key_columns``, ordered by ``digtao_ptcao`` descending, and only
    the first row of each group survives. The helper ``row_nmbr`` column is dropped
    before returning.

    NOTE(review): ``!=`` yields NULL when ``catulz_reg`` is NULL, so such rows are
    dropped too. Because deletes are removed *before* ranking, a key whose newest
    row is a delete falls back to its previous version instead of disappearing.
    Ties on ``digtao_ptcao`` are broken arbitrarily. Confirm all three are intended.
    """
    newest_first = Window.partitionBy(*key_columns).orderBy(data_frame["digtao_ptcao"].desc())
    is_not_deleted = data_frame["catulz_reg"] != f.lit("D")

    ranked = (
        data_frame
        .where(is_not_deleted)
        .withColumn("row_nmbr", f.row_number().over(newest_first))
    )
    latest_only = ranked.where(f.col("row_nmbr") == f.lit(1))

    return latest_only.drop("row_nmbr")

def mt_get_spark_session() -> SparkSession:
    """Return the active Spark session with Hive support, creating it if needed.

    Both ``db_tgt`` and ``db_stg`` are set to ``"default"`` on the builder. Other
    cells read them back through ``spark_session.conf.get`` to qualify table names.
    """
    session_builder = SparkSession.builder
    for database_setting in ("db_tgt", "db_stg"):
        session_builder = session_builder.config(database_setting, "default")
    return session_builder.enableHiveSupport().getOrCreate()

def mt_log(message: str) -> None:
    """Write ``message`` to stdout, prefixed with the current local time in brackets."""
    now = datetime.now()
    print(f"[{now}] - {message}")

def mt_table_exists(spark_session: SparkSession, table_path: str) -> bool:
    """Tell whether ``table_path`` (e.g. ``"db.table"``) is registered in the session catalog."""
    session_catalog = spark_session.catalog
    exists: bool = session_catalog.tableExists(table_path)
    return exists


In [0]:
def print_divider(length: int = 40, extra_line_break: bool = True) -> None:
    """Print a horizontal rule made of ``length`` dashes.

    Args:
        length: number of ``-`` characters in the rule.
        extra_line_break: when true, also print a newline string. Together with
            ``print``'s own line terminator, this leaves two line breaks after the rule.
    """
    print("-" * length)
    # A plain statement instead of a conditional expression evaluated only for its side effect.
    if extra_line_break:
        print("\n")

In [0]:
# Create (or reuse) the Spark session that every later cell uses as `spark_session`;
# the two log lines timestamp the start and end of this step.
mt_log("Início")

spark_session = mt_get_spark_session()

mt_log("Fim")

[2024-09-22 22:19:02.250175] - Início
[2024-09-22 22:19:02.387118] - Fim


In [0]:
def get_or_create_ctrl_table(spark_session: SparkSession) -> DataFrame:
    """Return the ``<db_stg>.tbleto_emtd_ctrl`` control table, creating it empty if missing.

    The table has two non-null string columns:
    - ``caminho_tabela``: a source table name; this is also the partition column.
    - ``digtao_ptcao``: a partition value of that source.

    ``run`` appends one row per partition it has merged.

    Args:
        spark_session: session whose ``db_stg`` setting names the control table's database.

    Returns:
        The control table as a DataFrame.
    """
    db_stg: str = spark_session.conf.get("db_stg")
    ctrl_table_path: str = f"{db_stg}.tbleto_emtd_ctrl"

    if not mt_table_exists(spark_session, ctrl_table_path):
        ctrl_schema = t.StructType([
            t.StructField("caminho_tabela", t.StringType(), False),
            t.StructField("digtao_ptcao", t.StringType(), False)
        ])
        # Writing an empty frame registers the table with its schema and partitioning.
        (
            spark_session.createDataFrame([], schema = ctrl_schema)
            .write.partitionBy("caminho_tabela")
            .saveAsTable(ctrl_table_path)
        )

    return spark_session.table(ctrl_table_path)

def _map_to_tbleto_emtd(data_frame: DataFrame, digtao_ptcao_alias: str) -> DataFrame:
    """Add the ``cidtfd_bleto`` key and rename ``digtao_ptcao`` to a source-specific column.

    ``cidtfd_bleto`` is ``pk_1`` .. ``pk_5`` joined with ``"-"``. Note that
    ``concat_ws`` skips NULL inputs, so a NULL key part shortens the identifier
    rather than nulling it.
    """
    key_parts = [data_frame[name] for name in ("pk_1", "pk_2", "pk_3", "pk_4", "pk_5")]
    return data_frame.withColumn(
        "cidtfd_bleto",
        f.concat_ws("-", *key_parts)
    ).withColumnRenamed("digtao_ptcao", digtao_ptcao_alias)

def map_boletos_mistos(data_frame: DataFrame) -> DataFrame:
    """Shape a boletos_mistos source for the tbleto_emtd merge.

    Adds ``cidtfd_bleto`` and renames ``digtao_ptcao`` to ``digtao_ptcao_boletos_mistos``.
    """
    return _map_to_tbleto_emtd(data_frame, "digtao_ptcao_boletos_mistos")

def map_titulos_pend(data_frame: DataFrame) -> DataFrame:
    """Shape a titulos_pend source for the tbleto_emtd merge.

    Adds ``cidtfd_bleto`` and renames ``digtao_ptcao`` to ``digtao_ptcao_titulos_pend``.
    """
    return _map_to_tbleto_emtd(data_frame, "digtao_ptcao_titulos_pend")

def merge_tables(
    spark_session: SparkSession,
    source_table: DataFrame,
    target_table_path: str = "default.tbleto_emtd"
) -> None:
    """Upsert ``source_table`` into the Delta table ``target_table_path``, keyed on ``cidtfd_bleto``.

    - Matched rows: every target column that also exists in ``source_table`` is
      overwritten; target columns the source lacks are left untouched.
    - Unmatched rows: inserted, with NULL for target columns the source lacks.

    Args:
        spark_session: active session.
        source_table: rows to upsert; must contain ``cidtfd_bleto``. Presumably it must
            be unique per ``cidtfd_bleto``: Delta rejects a MERGE where several source
            rows match the same target row.
        target_table_path: qualified name of the target Delta table.
    """
    target_table: DeltaTable = DeltaTable.forName(spark_session, target_table_path)
    source_columns = set(source_table.columns)

    insert_map: Dict[str, str] = {}
    update_map: Dict[str, str] = {}
    for target_column in target_table.toDF().columns:
        if target_column in source_columns:
            # Backticks (with embedded backticks doubled) keep the generated SQL
            # expression valid for column names containing special characters.
            quoted_name = target_column.replace("`", "``")
            source_expr = f"T1.`{quoted_name}`"
            insert_map[target_column] = source_expr
            update_map[target_column] = source_expr
        else:
            insert_map[target_column] = "NULL"

    (
        target_table.alias("T0")
        .merge(
            source_table.alias("T1"),
            "T0.cidtfd_bleto == T1.cidtfd_bleto"
        )
        .whenMatchedUpdate(set = update_map)
        .whenNotMatchedInsert(values = insert_map)
        .execute()
    )

def run(spark_session: SparkSession, partition_limit: int = 365) -> None:
    """Incrementally merge new source partitions into ``<db_tgt>.tbleto_emtd``.

    Returns early with a log message if the target table does not exist. Source
    tables that do not exist are skipped. For every other source table:

    1. Take up to ``partition_limit`` ``digtao_ptcao`` partitions not yet recorded in
       the control table for it, in ascending ``digtao_ptcao`` order.
    2. Keep the most recent non-deleted row per ``pk_1`` .. ``pk_5`` in those partitions.
    3. Reshape the rows with the source's mapper from ``mappers`` and MERGE them.
    4. Record the processed partitions in ``<db_stg>.tbleto_emtd_ctrl``.

    Args:
        spark_session: session providing the ``db_stg`` / ``db_tgt`` settings.
        partition_limit: maximum number of partitions merged per source table per call.
    """
    ctrl_table: DataFrame = get_or_create_ctrl_table(spark_session)
    db_stg: str = spark_session.conf.get("db_stg")
    db_tgt: str = spark_session.conf.get("db_tgt")
    ctrl_table_path: str = f"{db_stg}.tbleto_emtd_ctrl"
    tbleto_emtd_path: str = f"{db_tgt}.tbleto_emtd"

    if not mt_table_exists(spark_session, tbleto_emtd_path):
        mt_log(f"A tabela {tbleto_emtd_path} não existe. Contate o suporte para realizar a criação dela antes de prosseguir.")
        return

    for table_path in ["default.titulos_pend_hive", "default.titulos_pend", "default.boletos_mistos_hive", "default.boletos_mistos"]:
        mt_log(f"Realizando merge com a tabela {table_path}")
        if not mt_table_exists(spark_session, table_path):
            mt_log("A tabela não existe. Avançando para a próxima iteração.")
            continue

        already_merged: DataFrame = ctrl_table.where(ctrl_table["caminho_tabela"] == f.lit(table_path))
        source_partitions: DataFrame = show_partitions(spark_session, table_path)

        # Collect once (at most partition_limit small rows). The merge and the
        # control-table append below then use exactly the same partition list,
        # instead of re-evaluating a lazy anti-join against the control table
        # that this loop itself appends to.
        pending_rows: List[Row] = (
            source_partitions.join(
                already_merged,
                source_partitions["digtao_ptcao"] == already_merged["digtao_ptcao"],
                how = "leftanti"
            )
            .orderBy(source_partitions["digtao_ptcao"])
            .limit(partition_limit)
            .collect()
        )

        if not pending_rows:
            mt_log("Nenhuma partição a ser sincronizada. Avançando para a próxima iteração.")
            continue

        pending_values: List[str] = [row["digtao_ptcao"] for row in pending_rows]

        # A literal IN-list on the partition column lets Spark prune partitions statically.
        source_table: DataFrame = mt_get_most_recent_data(
            spark_session.table(table_path).where(f.col("digtao_ptcao").isin(pending_values)),
            ["pk_1", "pk_2", "pk_3", "pk_4", "pk_5"]
        )
        merge_tables(
            spark_session,
            mappers[table_path](source_table),
            tbleto_emtd_path
        )

        mt_log("Salvando dados na tabela de controle.")
        partitions_to_merge: DataFrame = spark_session.createDataFrame(
            pending_rows,
            schema = source_partitions.schema
        )
        partitions_to_merge.select(
            f.lit(table_path).alias("caminho_tabela"),
            partitions_to_merge["digtao_ptcao"]
        ).write.partitionBy("caminho_tabela").mode("append").saveAsTable(ctrl_table_path)
        mt_log("Dados salvos com sucesso.")

# Dispatch table used by run(): maps each source table name to the function that
# reshapes it for the tbleto_emtd merge. Each "_hive" variant shares its base table's mapper.
mappers: Dict[str, Callable] = dict([
    ("default.boletos_mistos", map_boletos_mistos),
    ("default.boletos_mistos_hive", map_boletos_mistos),
    ("default.titulos_pend", map_titulos_pend),
    ("default.titulos_pend_hive", map_titulos_pend),
])

In [0]:
def _parse_partition_spec(partition_spec: str) -> List[Tuple[str, str]]:
    """Split a spec such as ``"a=1/b=x"`` into ``[("a", "1"), ("b", "x")]``.

    Spark renders SHOW PARTITIONS specs path-escaped (``%XX``), so names and values
    are unescaped here to match what is actually stored in the partition columns.
    """
    from urllib.parse import unquote

    pairs: List[Tuple[str, str]] = []
    for fragment in partition_spec.split("/"):
        # Split on the first "=" only, so a value can never be truncated or mis-split.
        column, value = fragment.split("=", 1)
        pairs.append((unquote(column), unquote(value)))
    return pairs

def show_partitions(spark_session: SparkSession, table_path: str) -> DataFrame:
    """Return the partitions of ``table_path`` as a DataFrame with one string column per partition column.

    The result is built from ``SHOW PARTITIONS``. If the table currently has no
    partitions, an empty DataFrame is returned. Its schema holds the table's
    partition columns (as nullable strings), read from the catalog, so callers can
    still join on those columns.

    Args:
        spark_session: active session.
        table_path: qualified table name.

    Returns:
        One row per partition; all values are strings.

    Raises:
        Whatever ``SHOW PARTITIONS`` raises propagates. Presumably this is an
        AnalysisException for a non-partitioned table; verify.
    """
    partition_rows: List[Row] = spark_session.sql(f"show partitions {table_path}").collect()

    if not partition_rows:
        partition_fields: List[t.StructField] = [
            t.StructField(catalog_column.name, t.StringType(), True)
            for catalog_column in spark_session.catalog.listColumns(table_path)
            if catalog_column.isPartition
        ]
        return spark_session.createDataFrame(
            [],
            schema = t.StructType(partition_fields)
        )

    columns: List[str] = []
    partitions: List[Tuple] = []
    for partition_row in partition_rows:
        pairs = _parse_partition_spec(partition_row["partition"])
        for column, _ in pairs:
            if column not in columns:
                columns.append(column)
        partitions.append(tuple(value for _, value in pairs))

    return spark_session.createDataFrame(
        partitions,
        schema = columns
    )

In [0]:
# Merge pending partitions of every source table into tbleto_emtd, using run's default partition_limit (365).
run(spark_session)

[2024-09-22 23:45:21.196000] - Realizando merge com a tabela default.titulos_pend_hive
[2024-09-22 23:45:21.207594] - A tabela não existe. Avançando para a próxima iteração.
[2024-09-22 23:45:21.207627] - Realizando merge com a tabela default.titulos_pend
[2024-09-22 23:45:52.343177] - Salvando dados na tabela de controle.
[2024-09-22 23:45:56.568273] - Dados salvos com sucesso.
[2024-09-22 23:45:56.568376] - Realizando merge com a tabela default.boletos_mistos_hive
[2024-09-22 23:45:56.589051] - A tabela não existe. Avançando para a próxima iteração.
[2024-09-22 23:45:56.589092] - Realizando merge com a tabela default.boletos_mistos
[2024-09-22 23:46:21.297066] - Salvando dados na tabela de controle.
[2024-09-22 23:46:24.767732] - Dados salvos com sucesso.


In [0]:
# Inspect the merged target table. Its database comes from the session's "db_tgt" setting rather than a hard-coded name.
display(spark_session.table(f"{spark_session.conf.get('db_tgt')}.tbleto_emtd"))

cidtfd_bleto,pk_1,pk_2,pk_3,pk_4,pk_5,value,expire_date,qr_code,digtao_ptcao_boletos_mistos,digtao_ptcao_titulos_pend
-1758106074,237,3860,2053,2861,1798,561.85,2024-03-06,f93c910a-777f-4af2-af69-01f427224ac7,2024-03-02,2024-03-07
2113860051,237,6692,9628,4274,6671,7573.47,2024-03-06,,,2024-03-07
456478235,237,2239,3129,7138,3492,7071.39,2023-11-21,b0d05fa4-3e0a-448b-9d6d-3f6553aa16a6,2023-10-21,2023-11-22
-252113174,237,6745,3683,6310,5484,4079.39,2023-11-21,,,2023-11-22
-2133693855,237,2928,2151,2813,4724,2055.94,2024-01-21,15725912-0684-45fa-b98b-d036e71051d1,2024-04-27,2024-01-22
-1684286119,237,7534,3525,6307,4283,9545.92,2024-05-07,0bc7e4c7-8bde-4296-90b9-e045557cd87a,2023-12-14,2024-05-08
-1306641111,237,3934,9038,4562,7813,5349.17,2024-06-18,deaab73a-c089-4bbf-9447-852a6a2f50c4,2023-10-31,2024-06-19
-1914687904,237,7726,1501,3210,4223,6023.98,2024-05-30,8062e45e-3241-4b4f-90d4-aa699f176cb9,2024-05-26,2024-05-31
-1448806573,237,6692,1205,9504,1641,5917.79,2023-11-19,338153b9-9da3-4e32-8f08-d81b437edb93,2024-07-20,2023-11-20
-1051961637,237,2403,2662,9154,2787,3371.66,2024-08-03,525e3d2b-a86c-4d6c-b6a9-d66d0bfc2f06,2024-07-24,2024-08-04


In [0]:
# Inspect the control table in the same "db_stg" database where get_or_create_ctrl_table creates it and run appends to it.
display(spark_session.table(f"{spark_session.conf.get('db_stg')}.tbleto_emtd_ctrl"))

caminho_tabela,digtao_ptcao
default.titulos_pend,2023-09-24
default.titulos_pend,2023-09-27
default.titulos_pend,2023-09-30
default.titulos_pend,2023-10-01
default.titulos_pend,2023-10-04
default.titulos_pend,2023-10-06
default.titulos_pend,2023-10-12
default.titulos_pend,2023-10-13
default.titulos_pend,2023-10-17
default.titulos_pend,2023-10-20


In [0]:
mt_log("Início")

# One-time setup of the target table, named from the session's "db_tgt" setting.
tbleto_emtd_path = f"{spark_session.conf.get('db_tgt')}.tbleto_emtd"

if mt_table_exists(spark_session, tbleto_emtd_path):
    # saveAsTable's default mode raises when the table exists; skip so the cell can be re-run.
    mt_log(f"A tabela {tbleto_emtd_path} já existe. Criação ignorada.")
else:
    spark_session.createDataFrame(
        [],
        schema = t.StructType([
            t.StructField("cidtfd_bleto", t.StringType(), False),
            t.StructField("pk_1", t.IntegerType(), False),
            t.StructField("pk_2", t.IntegerType(), False),
            t.StructField("pk_3", t.IntegerType(), False),
            t.StructField("pk_4", t.IntegerType(), False),
            t.StructField("pk_5", t.IntegerType(), False),
            t.StructField("value", t.FloatType(), False),
            t.StructField("expire_date", t.DateType(), False),
            # Nullable: merge_tables inserts NULL for target columns a source lacks. Each source
            # therefore leaves the other source's digtao_ptcao_* column empty, and the displayed
            # data also shows rows with no qr_code.
            t.StructField("qr_code", t.StringType(), True),
            t.StructField("digtao_ptcao_boletos_mistos", t.StringType(), True),
            t.StructField("digtao_ptcao_titulos_pend", t.StringType(), True)
        ])
    ).write.partitionBy("expire_date").saveAsTable(tbleto_emtd_path)

mt_log("Fim")

[2024-09-22 23:37:42.874794] - Início
[2024-09-22 23:37:47.860957] - Fim


In [0]:
# Drop the target table; "if exists" keeps this cleanup cell re-runnable.
spark_session.sql(f"drop table if exists {spark_session.conf.get('db_tgt')}.tbleto_emtd")

DataFrame[]

In [0]:
# Reset the control table. Deleting only its directory would leave the table registered in the
# metastore: mt_table_exists would still return True, and get_or_create_ctrl_table would skip
# re-creating it. So drop it from the catalog first, then remove any leftover files.
spark_session.sql(f"drop table if exists {spark_session.conf.get('db_stg')}.tbleto_emtd_ctrl")
dbutils.fs.rm("dbfs:/user/hive/warehouse/tbleto_emtd_ctrl", True)