In [0]:
# import delta
# def table_exists(catalog, database, table):
#     count = (
#         spark.sql(f"SHOW TABLES FROM {catalog}.{database}")
#         .filter(f"database = '{database}' AND tableName = '{table}'")
#         .count()
#     )
#     return count == 1

In [0]:
# Outra abordagem para a função table_exists utilizando SHOW TABLES IN
# import delta
# def table_exists(catalog, database, table):
#     df_tables = spark.sql(f"SHOW TABLES IN {catalog}.{database}")
#     # Usando aspas no filtro para evitar erro de parsing
#     count = (
#         df_tables
#         .filter(f"database = '{database}' AND tableName = '{table}'")
#         .count()
#     )
#     return count > 0

# Vantagem: não precisa tentar acessar a tabela; funciona rápido se o catálogo já está populado.
# Observação: se a coluna `database` da listagem vier vazia (isso depende da versão/configuração), o filtro não encontra nenhuma linha e a função retorna False mesmo quando a tabela existe.


In [0]:
import delta
import sys

sys.path.insert(0, '../lib')

import utils
from pyspark.sql.functions import col, to_timestamp

In [0]:

# Target bronze table and the CDC key / ordering columns.
# In a parameterised job these would come from widgets instead:
#   tablename = dbutils.widgets.get('tablename')
#   id_field = dbutils.widgets.get('id_field')
#   timestamp_field = dbutils.widgets.get('timestamp_field')
catalog, schemaname = 'bronze', 'upsell'
tablename, id_field, timestamp_field = 'transactions', 'IdTransacao', 'DtCriacao'

# Auto Loader checkpoint directory for the CDC stream of this table.
checkpoint_location = "/Volumes/raw/{}/cdc/{}_checkpoints".format(schemaname, tablename)

In [0]:
class ingestor:
    """Batch ingestor: read source files with a fixed schema and overwrite a Delta table.

    The target table is ``{catalog}.{schemaname}.{tablename}``. The read schema is
    obtained from ``utils.import_schema(tablename)``.
    """

    def __init__(self, catalog, schemaname, tablename, data_format, timestamp_field=None):
        """
        Args:
            catalog: target catalog name.
            schemaname: target schema name.
            tablename: target table name; also the key passed to utils.import_schema.
            data_format: Spark reader format of the source files (e.g. 'parquet').
            timestamp_field: optional name of a column that ``load`` parses with
                ``to_timestamp(..., "yyyy-MM-dd HH:mm:ss")``. When None, the column
                is not converted. Callers/subclasses may also assign
                ``self.timestamp_field`` after construction.
        """
        self.catalog = catalog
        self.schemaname = schemaname
        self.tablename = tablename
        self.format = data_format
        self.timestamp_field = timestamp_field
        self.set_schema()

    def set_schema(self):
        """Fetch the source schema for this table from the project ``utils`` module."""
        self.data_schema = utils.import_schema(self.tablename)

    def load(self, path):
        """Read ``path`` with the fixed schema; parse the timestamp column if one is configured."""
        df = (
            spark.read
            .format(self.format)
            .schema(self.data_schema)
            .load(path)
        )
        # getattr: an instance that never had timestamp_field assigned (e.g. built
        # via __new__) skips the conversion instead of raising AttributeError.
        timestamp_field = getattr(self, "timestamp_field", None)
        if timestamp_field:
            df = df.withColumn(
                timestamp_field,
                to_timestamp(col(timestamp_field), "yyyy-MM-dd HH:mm:ss"),
            )
        return df

    def save(self, df):
        """Overwrite the target Delta table with ``df`` (written as a single file) and return True."""
        (df.coalesce(1)
            .write
            .format("delta")
            .mode("overwrite")
            .saveAsTable(f"{self.catalog}.{self.schemaname}.{self.tablename}"))
        return True

    def execute(self, path):
        """Load ``path`` and save it; returns whatever ``save`` returns."""
        df = self.load(path)
        return self.save(df)

In [0]:
class ingestorCDC(ingestor):
    """Incremental CDC ingestor: Auto Loader stream with a per-micro-batch MERGE.

    Each micro-batch is reduced to the newest row per ``id_field`` (ordered by
    ``timestamp_field``). It is then merged into the existing Delta table using the
    ``op`` column: 'D' deletes a matched row, 'U' updates a matched row (or
    inserts when no match exists), and 'I' inserts a new row.
    """

    def __init__(self, catalog, schemaname, tablename, data_format, id_field, timestamp_field):
        """
        Args:
            catalog, schemaname, tablename, data_format: as in ``ingestor``.
            id_field: key column used to match CDC rows against the target table.
            timestamp_field: column used to pick the latest CDC row per key.
        """
        super().__init__(catalog, schemaname, tablename, data_format)
        self.id_field = id_field
        self.timestamp_field = timestamp_field
        self.set_deltatable()

    def _full_tablename(self):
        """Three-part name of the target table."""
        return f"{self.catalog}.{self.schemaname}.{self.tablename}"

    def set_deltatable(self):
        """Bind the target table as a ``DeltaTable`` (the table must already exist)."""
        self.deltatable = delta.DeltaTable.forName(spark, self._full_tablename())

    def upsert(self, df, BatchId=None):
        """MERGE one CDC micro-batch into the target table.

        Args:
            df: micro-batch DataFrame; assumed to contain ``id_field``,
                ``timestamp_field`` and an ``op`` column ('I'/'U'/'D').
            BatchId: micro-batch id supplied by ``foreachBatch`` (unused).
        """
        df.createOrReplaceTempView("cdc_temp_view")

        merge_sql = f"""
        MERGE INTO {self._full_tablename()} AS b
        USING (
            SELECT * FROM cdc_temp_view
            QUALIFY row_number() OVER (PARTITION BY {self.id_field} ORDER BY {self.timestamp_field} DESC) = 1
        ) AS d
        ON b.{self.id_field} = d.{self.id_field}
        WHEN MATCHED AND d.op = 'D' THEN DELETE
        WHEN MATCHED AND d.op = 'U' THEN UPDATE SET *
        WHEN NOT MATCHED AND (d.op = 'I' OR d.op = 'U') THEN INSERT *
        """

        # The temp view above is registered on the micro-batch DataFrame's own
        # SparkSession. Run the MERGE on that same session, as the Delta Lake
        # foreachBatch upsert docs require; the notebook-global `spark` may not
        # see the view.
        df.sparkSession.sql(merge_sql)

    def load(self, path):
        """Open an Auto Loader (cloudFiles) stream over ``path`` using the fixed schema."""
        return (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", self.format)
            .schema(self.data_schema)
            .load(path)
        )

    def save(self, df):
        """Start an availableNow stream that upserts every micro-batch; returns the StreamingQuery."""
        checkpoint = f"/Volumes/raw/{self.schemaname}/cdc/{self.tablename}_checkpoints"
        return (
            df.writeStream
            .option("checkpointLocation", checkpoint)
            # foreachBatch calls upsert(batch_df, batch_id), matching its signature.
            .foreachBatch(self.upsert)
            .trigger(availableNow=True)
            .start()
        )


In [0]:
# Bootstrap: when the bronze table does not exist yet, build it from the full-load
# snapshot. Also reset the CDC checkpoint so the stream re-reads all CDC files.
if not utils.table_exists(spark, catalog, schemaname, tablename):

    print('Tabela não existe, criando...')

    # A leftover checkpoint would make Auto Loader skip CDC files it had
    # already processed for a previous version of the table.
    dbutils.fs.rm(checkpoint_location, True)

    ingest_full_load = ingestor(catalog=catalog,
                                schemaname=schemaname,
                                tablename=tablename,
                                data_format='parquet')
    # ingestor.load parses the column named by self.timestamp_field; set it
    # explicitly to the timestamp column configured above.
    ingest_full_load.timestamp_field = timestamp_field
    ingest_full_load.execute(f"/Volumes/raw/{schemaname}/full_load/{tablename}/")

    print('Tabela criada com sucesso!')

else:
    print('Tabela já existe, ignorando full-load')

In [0]:
# Incremental step: stream new CDC files with Auto Loader and MERGE them into the table.
# `stream` receives whatever ingest_cdc.execute returns (ingestorCDC.save starts the query).
ingest_cdc = ingestorCDC(
    catalog=catalog,
    schemaname=schemaname,
    tablename=tablename,
    data_format='parquet',
    id_field=id_field,
    timestamp_field=timestamp_field,
)
stream = ingest_cdc.execute("/Volumes/raw/{}/cdc/{}/".format(schemaname, tablename))

In [0]:
%sql
-- Inspect the column names and types of the bronze transactions table.
DESCRIBE TABLE bronze.upsell.transactions




In [0]:
%sql
-- Spot-check rows ordered by most recent DtAtualizacao.
-- NOTE(review): this queries bronze.upsell.customers, not the transactions
-- table configured in this notebook; confirm this is intentional.
SELECT *
FROM bronze.upsell.customers
ORDER BY DtAtualizacao DESC

In [0]:
# from pyspark.sql.window import Window
# from pyspark.sql import functions as F

# bronze = delta.DeltaTable.forName(spark, f'{catalog}.{schema}.{tablename}')

# def upsert(df, table_name):
#     df.createOrReplaceTempView("cdc_temp_view")

#     merge_sql = f"""
#     MERGE INTO {table_name} AS b
#     USING (
#         SELECT * FROM cdc_temp_view
#         QUALIFY row_number() OVER (PARTITION BY {id_field} ORDER BY {timestamp_field} DESC) = 1
#     ) AS d
#     ON b.{id_field} = d.{id_field}
#     WHEN MATCHED AND d.op = 'D' THEN DELETE
#     WHEN MATCHED AND d.op = 'U' THEN UPDATE SET *
#     WHEN NOT MATCHED AND (d.op = 'I' OR d.op = 'U') THEN INSERT *
#     """

#     spark.sql(merge_sql)


# df_stream = spark.readStream \
#       .format("cloudFiles") \
#       .option("cloudFiles.format", "parquet") \
#       .schema(df_schema) \
#       .load(f"/Volumes/raw/data/cdc/{tablename}/")

# stream = df_stream.writeStream \
#       .option("checkpointLocation", f"/Volumes/raw/data/cdc/{tablename}_checkpoints") \
#       .foreachBatch(lambda df, BatchId: upsert(df, f'{catalog}.{schema}.{tablename}')) \
#       .trigger(availableNow=True)

In [0]:
# Opção sugerida pelo Assistant do Databricks:
# You need to specify the cloudFiles.schemaLocation option to enable schema inference and evolution when using Auto Loader. This option points to a directory where Databricks will store schema information for your stream. Here is the fixed code:

# df_stream = (
#     spark.readStream
#     .format("cloudFiles")
#     .option("cloudFiles.format", "parquet")
#     .option("cloudFiles.schemaLocation", f"/Volumes/raw/data/cdc/{tablename}/_schemas")
#     .load(f"/Volumes/raw/data/cdc/{tablename}/")
# )

# This code adds the required schemaLocation option. The directory specified will track schema changes over time for your streaming source.

In [0]:
# `stream` is already a running StreamingQuery (ingestorCDC.save calls .start()),
# and StreamingQuery has no .start() method. Keep the `start` name bound to the
# query and block until the availableNow run finishes; this also re-raises any
# error the stream hit.
start = stream
start.awaitTermination()

In [0]:
# Stop the CDC streaming query returned by ingest_cdc.execute().
stream.stop()