In [0]:
from pyspark.sql.types import StructType
import delta

In [0]:
# Root directory holding the raw TabNews JSON files; every path below hangs off it.
path_datalake = '/Volumes/raw/tabnews/datalake'
# Glob matching every JSON file across all Ano_/Mês_/Dia_/Minuto_/Segundo_ folders (streaming source).
path_stream = f'{path_datalake}/Ano_*/Mês_*/Dia_*/Minuto_*/Segundo_*/*.json'
# A single one-second folder used for the initial full (batch) load.
path_batch = f'{path_datalake}/Ano_2022/Mês_05/Dia_06/Minuto_15/Segundo_20/*.json'
# The schema is inferred from the same sample folder as the batch load.
path_schema = path_batch
# Shared by the stream's checkpoint and Auto Loader's schema location.
path_checkpoint = f'{path_datalake}/checkpoint'

In [0]:
# Batch dedup: (re)creates the temp view `somente_ultima_atualizacao` containing, for each
# `id` in the `Raw_TabNews_LoadFull` view, only the row with the greatest UPDATED_AT.
# NOTE(review): ties on UPDATED_AT are resolved arbitrarily by ROW_NUMBER — confirm acceptable.
query_transformacao_batch = """CREATE OR REPLACE TEMPORARY VIEW somente_ultima_atualizacao AS
SELECT * 
FROM Raw_TabNews_LoadFull
QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY UPDATED_AT DESC) = 1"""
# Stream dedup: returns every row of the global temp view `table_temp` plus `row_num`,
# its rank within its `id` (1 = most recent UPDATED_AT). `upsert` keeps only row_num = 1.
query_transformacao_stream = """ SELECT *, 
       ROW_NUMBER() OVER (PARTITION BY id ORDER BY UPDATED_AT DESC) as row_num 
FROM global_temp.table_temp"""
# Lists the tables in schema `bronze.tabnews`; used to check whether the bronze table exists.
query_validador_tabela= """SHOW TABLES FROM bronze.tabnews"""

In [0]:
def definir_schema(path_schema=path_schema):
    """Infer and return the Spark schema of the JSON file(s) matched by ``path_schema``.

    The files are read as multi-line JSON (a single document may span several lines).
    The returned ``StructType`` is passed to both the batch and the streaming readers
    so they share one schema.
    """
    leitor_json = spark.read.option('multiline', 'true')
    df_amostra = leitor_json.json(path_schema)
    return df_amostra.schema


def executando_em_batch(path_batch=path_batch, query_transformacao_batch=query_transformacao_batch):
    """Full (batch) load that creates/overwrites ``bronze.tabnews.Bronze_TabNews_Stream``.

    Reads the multi-line JSON files at ``path_batch`` with the schema from
    ``definir_schema()``. Exposes them as the temp view ``Raw_TabNews_LoadFull`` and runs
    ``query_transformacao_batch``, which must create the view ``somente_ultima_atualizacao``.
    The contents of that view are then written as a Delta table. ``overwriteSchema``
    lets the schema of an existing table be replaced.
    """
    schema_json = definir_schema()
    df_raw = (spark.read
                   .option('multiline', 'true')
                   .schema(schema_json)
                   .json(path_batch))
    df_raw.createOrReplaceTempView('Raw_TabNews_LoadFull')

    # The SQL only (re)defines a view; the deduplicated rows are read back from it.
    spark.sql(query_transformacao_batch)
    df_ultima_versao = spark.table("somente_ultima_atualizacao")

    escritor = (df_ultima_versao.write
                                .format('delta')
                                .mode('overwrite')
                                .option('overwriteSchema', 'true')
                                .option('mergeSchema', 'true'))
    escritor.saveAsTable('bronze.tabnews.Bronze_TabNews_Stream')


def upsert(df, delta_table, query,
           condicao_atualizacao="d.updated_at IS NULL OR n.updated_at >= d.updated_at"):
    """Merge one (micro-)batch into ``delta_table`` keyed on ``id``; used as a foreachBatch sink.

    Args:
        df: incoming rows. They are registered as the global temp view ``table_temp``,
            so ``query`` reads them as ``global_temp.table_temp``.
        delta_table: target ``delta.DeltaTable`` (aliased ``d`` in the merge).
        query: SQL over ``global_temp.table_temp`` returning the input columns plus a
            ``row_num`` column. Only rows with ``row_num = 1`` are merged (aliased ``n``),
            and ``row_num`` is dropped first.
        condicao_atualizacao: condition an existing (matched) row must satisfy to be
            overwritten. The default only lets a record replace the stored one when it
            is not older, so a stale snapshot processed in a later micro-batch cannot
            roll a post back to an earlier version. Pass ``None`` to always overwrite,
            which was the previous behavior.
    """
    # NOTE(review): presumably a *global* temp view is used because foreachBatch supplies
    # a DataFrame bound to another SparkSession than the notebook's `spark`. Global temp
    # views are shared by the whole Spark application, so concurrent jobs on the same
    # cluster would also share the name `table_temp`.
    df.createOrReplaceGlobalTempView('table_temp')
    df_transformacao = (spark.sql(query)
                             .filter("row_num = 1")
                             .drop("row_num"))
    (delta_table.alias("d")
                .merge(df_transformacao.alias("n"), 'd.id = n.id')
                .whenMatchedUpdateAll(condition=condicao_atualizacao)
                .whenNotMatchedInsertAll()
                .execute())


def validador_existencia_tabela(query_validador_tabela=query_validador_tabela):
    """Return True when exactly one table ``tabnews.bronze_tabnews_stream`` is listed.

    Runs ``query_validador_tabela`` (a ``SHOW TABLES`` statement). It then counts the rows
    whose ``database`` is ``tabnews`` and whose ``tableName`` is ``bronze_tabnews_stream``.
    """
    tabelas_listadas = spark.sql(query_validador_tabela)
    correspondencias = tabelas_listadas.filter(
        "database == 'tabnews' AND tableName == 'bronze_tabnews_stream'"
    )
    total = correspondencias.count()
    return total == 1


def executando_streaming(path_stream=path_stream, path_checkpoint=path_checkpoint,
                         query_transformacao_stream=query_transformacao_stream):
    """Incrementally upsert new JSON files into ``bronze.tabnews.Bronze_TabNews_Stream``.

    Uses Auto Loader (``cloudFiles``) over ``path_stream`` with an ``availableNow``
    trigger. Each micro-batch is merged by ``upsert`` using ``query_transformacao_stream``
    to keep the latest version per ``id``. Blocks until the query has processed all
    files available at start time.

    Args:
        path_stream: glob of the JSON files to ingest.
        path_checkpoint: used both as the stream checkpoint and as Auto Loader's
            schema location.
        query_transformacao_stream: dedup SQL handed to ``upsert``.

    Returns:
        The terminated ``StreamingQuery`` (e.g. for inspecting ``lastProgress``).

    Raises:
        StreamingQueryException: if any micro-batch (including the merge) failed.
    """
    delta_table = delta.DeltaTable.forName(spark, 'bronze.tabnews.Bronze_TabNews_Stream')

    df_streaming = (spark.readStream.format("cloudFiles")
                                    .option("cloudFiles.format", "json")
                                    .option("cloudFiles.schemaLocation", path_checkpoint)
                                    .option("multiLine", "true")
                                    .schema(definir_schema())
                                    .load(path_stream))

    query_streaming = (df_streaming.writeStream
                                   .foreachBatch(lambda df, batch_id: upsert(df, delta_table, query_transformacao_stream))
                                   .option("checkpointLocation", path_checkpoint)
                                   .trigger(availableNow=True)
                                   .start())
    # availableNow stops on its own once the backlog is drained. Waiting here makes
    # failures inside foreachBatch raise in this run instead of being lost in the background.
    query_streaming.awaitTermination()
    return query_streaming

In [0]:
# Entry point. In a notebook kernel `__name__` is "__main__", so this cell always runs.
# The first execution creates the bronze table with a full batch load. Every execution
# then runs the incremental stream.
if __name__ == "__main__":
    if validador_existencia_tabela():
        print('Tabela já existente, executando streaming')
    else:
        # Announce before doing the (possibly long) full load, not after it.
        print('Criando a tabela Bronze_TabNews_Stream')
        executando_em_batch()
    executando_streaming()
