In [2]:
!pip install faker
!pip install psycopg2-binary

import os
import requests
import json
import random
import psycopg2
from psycopg2 import sql
from faker import Faker

## DEFINE SENSITIVE VARIABLES
WAREHOUSE = os.environ.get("WAREHOUSE") 
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID") 
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY") 
AWS_S3_ENDPOINT = os.environ.get("AWS_S3_ENDPOINT") ## MINIO ENDPOINT
PG_USER = os.environ.get("PG_USER")
PG_PASSWORD = os.environ.get("PG_PASSWORD")
PG_DB = os.environ.get("PG_DB")


def cria_conectores():
    postgres = {
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.dbname": PG_DB,
            "database.hostname": "postgres",
            "database.password": PG_PASSWORD,
            "database.port": "5432",
            "database.server.name": "postgres",
            "database.user": PG_USER,
            "delete.handling.mode": "rewrite",
            "plugin.name": "pgoutput",
            "table.include.list": "public.condominios, public.imoveis,public.moradores,public.transacoes",
            "table.whitelist": "public.condominios, public.imoveis,public.moradores,public.transacoes",
            "topic.prefix": "postgres"
        },
        "name": "postgres-source-connector"
    }
    requests.post('http://connect:8083/connectors', json=postgres)
    
    minio = {
        "name": "minio-sink-connector",
        "config": {
            "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
            "aws.access.key.id": AWS_ACCESS_KEY,
            "aws.secret.access.key": AWS_SECRET_KEY,
            "aws.s3.bucket.name": "condomanage",
            "aws.s3.endpoint": AWS_S3_ENDPOINT,
            "aws.s3.region": "us-east-1",
            "format.output.type": "parquet",
            "topics": "postgres.public.condominios, postgres.public.imoveis, postgres.public.moradores, postgres.public.transacoes",
            "file.compression.type": "none",
            "flush.size": "20",
            "file.name.template": "raw/cdc/{{topic}}/{{timestamp:unit=yyyy}}{{timestamp:unit=MM}}{{timestamp:unit=dd}}_{{partition:padding=true}}-{{start_offset:padding=true}}.parquet"
        }
    }
    
    requests.post('http://connect:8083/connectors', json=minio)




fake = Faker()

# Função para conectar ao banco de dados
def connect_to_db():
    return psycopg2.connect(
        dbname='db',
        user='user',
        password='admin',
        host='postgres',
        port='5432'
    )

# Função para executar comandos SQL
def execute_sql_commands(commands):
    conn = connect_to_db()
    try:
        with conn.cursor() as cursor:
            for command in commands:
                cursor.execute(command)
        conn.commit()
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        conn.close()

# Inserir condomínios e recuperar IDs
def insert_condominios():
    conn = connect_to_db()
    condominios = []
    condominio_map = {}
    try:
        with conn.cursor() as cursor:
            for i in range(1, 101):
                nome = fake.company()
                endereco = fake.address().replace('\n', ', ')
                cursor.execute("INSERT INTO condominios (nome, endereco) VALUES (%s, %s) RETURNING condominio_id;", (nome, endereco))
                condominio_id = cursor.fetchone()[0]
                condominio_map[condominio_id] = (nome, endereco)
                condominios.append(condominio_id)
            conn.commit()
    except Exception as e:
        print(f"An error occurred while inserting condominios: {e}")
    finally:
        conn.close()
    return condominios

# Inserir moradores, imóveis e transações
def insert_related_data(condominio_ids):
    conn = connect_to_db()
    moradores = []
    imoveis = []
    transacoes = []
    try:
        with conn.cursor() as cursor:
            # Inserir moradores
            for i in range(1, 101):
                nome = fake.name()
                condominio_id = random.choice(condominio_ids)
                data_registro = fake.date_this_decade()
                cursor.execute("INSERT INTO moradores (nome, condominio_id, data_registro) VALUES (%s, %s, %s) RETURNING morador_id;", (nome, condominio_id, data_registro))
                morador_id = cursor.fetchone()[0]
                moradores.append(morador_id)

            # Inserir imóveis
            for i in range(1, 101):
                tipo = random.choice(['Apartamento', 'Casa'])
                condominio_id = random.choice(condominio_ids)
                valor = round(random.uniform(100000, 1000000), 2)
                cursor.execute("INSERT INTO imoveis (tipo, condominio_id, valor) VALUES (%s, %s, %s) RETURNING imovel_id;", (tipo, condominio_id, valor))
                imovel_id = cursor.fetchone()[0]
                imoveis.append(imovel_id)

            # Inserir transações
            for i in range(1, 101):
                imovel_id = random.choice(imoveis)
                morador_id = random.choice(moradores)
                data_transacao = fake.date_this_year()
                valor_transacao = round(random.uniform(50000, 500000), 2)
                cursor.execute("INSERT INTO transacoes (imovel_id, morador_id, data_transacao, valor_transacao) VALUES (%s, %s, %s, %s);", (imovel_id, morador_id, data_transacao, valor_transacao))

            conn.commit()
    except Exception as e:
        print(f"An error occurred while inserting related data: {e}")
    finally:
        conn.close()

cria_conectores()
# Executar o processo
condominio_ids = insert_condominios()
insert_related_data(condominio_ids)



In [3]:
#INGESTÃO NO DATA LAKE
import os
import pyspark
from pyspark.sql import SparkSession

## DEFINE SENSITIVE VARIABLES
WAREHOUSE = os.environ.get("WAREHOUSE") 
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID") 
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY") 
AWS_S3_ENDPOINT = os.environ.get("AWS_S3_ENDPOINT") ## MINIO ENDPOINT
PG_USER = os.environ.get("PG_USER")
PG_PASSWORD = os.environ.get("PG_PASSWORD")
PG_DB = os.environ.get("PG_DB")

conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-core_2.12:2.4.0,org.postgresql:postgresql:42.6.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
        .set('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
        .set('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
        .set("spark.hadoop.fs.s3a.path.style.access", "true")
        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .set("spark.hadoop.fs.s3a..connection.ssl.enabled", "true")
        
        .set("spark.hadoop.fs.s3a.endpoint", AWS_S3_ENDPOINT)
        .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_KEY)
)

# Inicializa a sessão Spark com suporte ao Delta Lake
spark = SparkSession.builder \
    .appName("DataIngestion") \
    .master("local")\
    .config(conf=conf)\
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Configurações do PostgreSQL
pg_url = "jdbc:postgresql://postgres:5432/" + PG_DB
pg_properties = {
    "user": PG_USER,
    "password": PG_PASSWORD,
    "driver": "org.postgresql.Driver"
}

# Carrega dados das tabelas PostgreSQL
def load_table(table_name):
    return spark.read.jdbc(url=pg_url, table=table_name, properties=pg_properties)

# Salva os dados no MinIO em formato Parquet
def save_to_minio(df, path):
    df.write.mode("overwrite").parquet(f"{WAREHOUSE}/raw/full/{path}")

tables = ('condominios', 'moradores', 'transacoes', 'imoveis')

for table in tables:
    df = load_table(f"public.{table}")
    save_to_minio(df, table)

spark.stop()

In [62]:
# BRONZE
import os
import pandas as pd
import struct

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.sql.functions import col, expr, to_date
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, DecimalType, BinaryType


def bytes_to_decimal(byte_list):
    if not byte_list:
        return None
    
    # Converte a lista de bytes para um número inteiro
    integer_value = int.from_bytes(bytearray(byte_list), byteorder='big', signed=False)
    
    # Converte o inteiro para decimal com precisão e escala
    precision = 15
    scale = 2
    decimal_value = integer_value / (10 ** scale)
    
    # Ajusta o valor para a precisão desejada
    return round(decimal_value, scale)
    

## DEFINE SENSITIVE VARIABLES
WAREHOUSE = os.environ.get("WAREHOUSE") 
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID") 
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY") 
AWS_S3_ENDPOINT = os.environ.get("AWS_S3_ENDPOINT") ## MINIO ENDPOINT

conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-core_2.12:2.4.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
        .set('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
        .set('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
        .set("spark.hadoop.fs.s3a.path.style.access", "true")
        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
        .set("spark.hadoop.fs.s3a.endpoint", AWS_S3_ENDPOINT)
        .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_KEY)
)

# Inicializa a sessão Spark com suporte ao Delta Lake
spark = SparkSession.builder \
    .appName("DataIngestion") \
    .master("local")\
    .config(conf=conf)\
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

def q(query, n=30):
    return spark.sql(query).show(n=n, truncate=False)

def table_exists(namespace, table):
    count = (spark.sql(f'SHOW TABLES FROM {namespace}')
                .filter(f"namespace = '{namespace}' AND tableName = '{table}'")
                .count())
    return count == 1

q('CREATE DATABASE IF NOT EXISTS condomanage')
q('USE condomanage')

# Carrega dados do raw
def load_raw(table_name):
    return spark.read.format('parquet').load(f's3a://condomanage/raw/full/{table_name}')
    
# Salva os dados no MinIO em formato delta
def save_to_delta(df, path):
    df.write.format('delta').mode('overwrite').option("mergeSchema", "true").save(path)

tables = ('condominios', 'moradores', 'transacoes', 'imoveis')
namespace = 'condomanage'

for table in tables:
    if not table_exists(namespace, table):
        print(f'Criando tabela {table}...')
        df = load_raw(table)
        save_to_delta(df, f"{WAREHOUSE}/bronze/{table}")
        q(f"""
            CREATE  TABLE  IF NOT EXISTS {table}
            USING DELTA
            LOCATION '{WAREHOUSE}/bronze/{table}'
        """)
    else:
        print(f'Tabela {table} já existente...')

++
||
++
++

++
||
++
++

Criando tabela condominios...
++
||
++
++

Criando tabela moradores...
++
||
++
++

Criando tabela transacoes...
++
||
++
++

Criando tabela imoveis...
++
||
++
++



In [27]:
#AGREGA DADOS CDC moradores

schema = StructType([
    StructField('value', StructType([
        StructField('before', StructType([
            StructField('morador_id', IntegerType(), True), 
            StructField('nome', StringType(), True), 
            StructField('condominio_id', IntegerType(), True), 
            StructField('data_registro', IntegerType(), True)
        ]), True), 
        StructField('after', StructType([
            StructField('morador_id', IntegerType(), True), 
            StructField('nome', StringType(), True), 
            StructField('condominio_id', IntegerType(), True), 
            StructField('data_registro', IntegerType(), True)
        ]), True), 
        StructField('source', StructType([
            StructField('version', StringType(), True), 
            StructField('connector', StringType(), True), 
            StructField('name', StringType(), True), 
            StructField('ts_ms', LongType(), True), 
            StructField('snapshot', StringType(), True), 
            StructField('db', StringType(), True), 
            StructField('sequence', StringType(), True), 
            StructField('schema', StringType(), True), 
            StructField('table', StringType(), True), 
            StructField('txId', LongType(), True), 
            StructField('lsn', LongType(), True), 
            StructField('xmin', LongType(), True)
        ]), True), 
        StructField('op', StringType(), True), 
        StructField('ts_ms', LongType(), True), 
        StructField('transaction', StructType([
            StructField('id', StringType(), True), 
            StructField('total_order', LongType(), True), 
            StructField('data_collection_order', LongType(), True)
        ]), True)
    ]), True)
])

df_stream = spark.readStream.format('parquet')\
    .schema(schema).load(f's3a://condomanage/raw/cdc/postgres.public.moradores')

deltaTable = DeltaTable.forPath(spark, 's3a://condomanage/bronze/moradores')

stream_moradores = (df_stream.writeStream\
    .option("checkpointLocation", f"s3a://condomanage/raw/cdc/moradores_checkpoint/")\
    .foreachBatch(lambda df, batchID: upsert_moradores(df, deltaTable))
    .trigger(availableNow=True))
    
def upsert_moradores(df, deltaTable):
    df_selecionado = df.select(
        "value.after.condominio_id",
        "value.after.data_registro",
        when(df.value.op == 'd', df.value.before.morador_id).otherwise(df.value.after.morador_id).alias('morador_id'),
        "value.after.nome",
        "value.op",
        "value.before",
        "value.ts_ms"
    )
    
   # RENOMEIA AS COLUNAS
    df_selecionado = df_selecionado.withColumnRenamed("value.after.condominio_id", "condominio_id") \
                                   .withColumnRenamed("value.after.data_registro", "data_registro") \
                                   .withColumnRenamed("value.after.morador_id", "morador_id") \
                                   .withColumnRenamed("value.after.nome", "nome") \
                                   .withColumnRenamed("value.op", "op") \
                                   .withColumnRenamed("value.before", "before") \
                                   .withColumnRenamed("value.ts_ms", "ts_ms")
    
    # SELECIONA A ULTIMA ATUALIZAÇÃO DO DADO
    df_selecionado.createOrReplaceGlobalTempView('view_moradores')
    
    cdc_unique = spark.sql('''
    WITH 
        qualify as (select 
            *, 
            ROW_NUMBER() over(partition by morador_id order by ts_ms desc) as qualify 
        from global_temp.view_moradores)
    select * from qualify where qualify = 1''')
    
    # AJUSTA O TIPO DA COLUNA DATA_REGISTRO
    cdc_unique = cdc_unique.withColumn("data_registro", to_date(expr("date_add('1970-01-01', data_registro)")))
    
    # UPSERT
    (deltaTable.alias('b')
        .merge(cdc_unique.alias('d'), 
        'b.morador_id = d.morador_id')
        .whenMatchedUpdateAll(condition = "d.op = 'u'")
        .whenMatchedDelete(condition = "d.op = 'd'")
        .whenNotMatchedInsertAll(condition = "d.op = 'c' or d.op = 'u'")
        .execute()
    )

In [26]:
# AGREGA CDC CONDOMINIOS

schema = StructType([
    StructField('value', StructType([
        StructField('before', StructType([
            StructField('condominio_id', IntegerType(), True), 
            StructField('nome', StringType(), True), 
            StructField('endereco', StringType(), True)
        ]), True), 
        StructField('after', 
            StructType([
                StructField('condominio_id', IntegerType(), True), 
                StructField('nome', StringType(), True), 
                StructField('endereco', StringType(), True)
            ]), True), 
        StructField('source', 
                    StructType([
                        StructField('version', StringType(), True), 
                        StructField('connector', StringType(), True), 
                        StructField('name', StringType(), True), 
                        StructField('ts_ms', LongType(), True), 
                        StructField('snapshot', StringType(), True),
                        StructField('db', StringType(), True), 
                        StructField('sequence', StringType(), True), 
                        StructField('schema', StringType(), True), 
                        StructField('table', StringType(), True), 
                        StructField('txId', LongType(), True), 
                        StructField('lsn', LongType(), True), 
                        StructField('xmin', LongType(), True)
                    ]), True), 
        StructField('op', StringType(), True), 
        StructField('ts_ms', LongType(), True), 
        StructField('transaction', StructType([
            StructField('id', StringType(), True), 
            StructField('total_order', LongType(), True), 
            StructField('data_collection_order', LongType(), True)
        ]), True)
    ]), True)
])

df_stream = spark.readStream.format('parquet')\
    .schema(schema).load(f's3a://condomanage/raw/cdc/postgres.public.condominios')

deltaTable = DeltaTable.forPath(spark, 's3a://condomanage/bronze/condominios')

stream_condominios = (df_stream.writeStream\
    .option("checkpointLocation", f"s3a://condomanage/raw/cdc/condominios_checkpoint/")\
    .foreachBatch(lambda df, batchID: upsert_condominios(df, deltaTable))
    .trigger(availableNow=True))
    


def upsert_condominios(df, deltaTable):
    df_selecionado = df.select(
        "value.after.endereco",
        "value.after.nome",
        when(df.value.op == 'd', df.value.before.condominio_id).otherwise(df.value.after.condominio_id).alias('condominio_id'),
        "value.op",
        "value.before",
        "value.ts_ms"
    )
    
   # RENOMEIA AS COLUNAS
    df_selecionado = df_selecionado.withColumnRenamed("value.after.endereco", "endereco") \
                                   .withColumnRenamed("value.after.nome", "nome") \
                                   .withColumnRenamed("value.after.condominio_id", "condominio_id") \
                                   .withColumnRenamed("value.op", "op") \
                                   .withColumnRenamed("value.before", "before") \
                                   .withColumnRenamed("value.ts_ms", "ts_ms")
    
    # SELECIONA A ULTIMA ATUALIZAÇÃO DO DADO
    df_selecionado.createOrReplaceGlobalTempView('view_condominios')
    
    cdc_unique = spark.sql('''
    WITH 
        qualify as (select 
            *, 
            ROW_NUMBER() over(partition by condominio_id order by ts_ms desc) as qualify 
        from global_temp.view_condominios)
    select * from qualify where qualify = 1''')
    
    # UPSERT
    (deltaTable.alias('b')
        .merge(cdc_unique.alias('d'), 
        'b.condominio_id = d.condominio_id')
        .whenMatchedUpdateAll(condition = "d.op = 'u'")
        .whenMatchedDelete(condition = "d.op = 'd'")
        .whenNotMatchedInsertAll(condition = "d.op = 'c'")
        .execute()
    )
   

In [25]:
# IMOVEIS Stream
# Definindo o schema do PySpark
schema = StructType([
StructField('value', StructType([
    StructField("before", StructType([
        StructField("imovel_id", IntegerType(), nullable=False),
        StructField("tipo", StringType(), nullable=False),
        StructField("condominio_id", IntegerType(), nullable=False),
        StructField("valor", BinaryType(), nullable=False)  # Coluna binária para decimal
    ]), nullable=True),
    StructField("after", StructType([
        StructField("imovel_id", IntegerType(), nullable=False),
        StructField("tipo", StringType(), nullable=False),
        StructField("condominio_id", IntegerType(), nullable=False),
        StructField("valor", BinaryType(), nullable=False)  # Coluna binária para decimal
    ]), nullable=True),
    StructField("source", StructType([
        StructField("version", StringType(), nullable=False),
        StructField("connector", StringType(), nullable=False),
        StructField("name", StringType(), nullable=False),
        StructField("ts_ms", LongType(), nullable=False),
        StructField("snapshot", StringType(), nullable=True),
        StructField("db", StringType(), nullable=False),
        StructField("sequence", StringType(), nullable=True),
        StructField("schema", StringType(), nullable=False),
        StructField("table", StringType(), nullable=False),
        StructField("txId", LongType(), nullable=True),
        StructField("lsn", LongType(), nullable=True),
        StructField("xmin", LongType(), nullable=True)
    ]), nullable=False),
    StructField("op", StringType(), nullable=False),
    StructField("ts_ms", LongType(), nullable=True),
    StructField("transaction", StructType([
        StructField("id", StringType(), nullable=False),
        StructField("total_order", LongType(), nullable=False),
        StructField("data_collection_order", LongType(), nullable=False)
    ]), nullable=True)
]))
])

df_stream = spark.readStream.format('parquet')\
    .schema(schema).load(f's3a://condomanage/raw/cdc/postgres.public.imoveis')

deltaTable = DeltaTable.forPath(spark, 's3a://condomanage/bronze/imoveis')

stream_imoveis = (df_stream.writeStream\
    .option("checkpointLocation", f"s3a://condomanage/raw/cdc/imoveis_checkpoint/")\
    .foreachBatch(lambda df, batchID: upsert_imoveis(df, deltaTable))
    .trigger(availableNow=True))
    


def upsert_imoveis(df, deltaTable):
    df_selecionado = df.select(
        "value.after.tipo",
        "value.after.valor",
        "value.after.condominio_id",
        when(df.value.op == 'd', df.value.before.imovel_id).otherwise(df.value.after.imovel_id).alias('imovel_id'),
        "value.op",
        #"value.before",
        "value.ts_ms"
    )
    
    df_selecionado = df_selecionado.withColumnRenamed("value.after.tipo", "tipo") \
                                   .withColumnRenamed("value.after.valor", "valor") \
                                   .withColumnRenamed("value.after.condominio_id", "condominio_id") \
                                    .withColumnRenamed("value.after.imovel_id", "imovel_id") \
                                   .withColumnRenamed("value.op", "op") \
                                   .withColumnRenamed("value.ts_ms", "ts_ms")
    df =df_selecionado.toPandas()
    
    df['valor'] = df.apply(lambda x: bytes_to_decimal(x['valor']), axis=1) 
    
    schema = StructType([
        StructField("tipo", StringType(), nullable=False),
        StructField("valor", DecimalType(15, 2), nullable=False),
        StructField("condominio_id", IntegerType(), nullable=False),
        StructField("imovel_id", IntegerType(), nullable=False),
        StructField("condominio_id", IntegerType(), nullable=False),
        StructField("op", StringType(), nullable=False),
        StructField("ts_ms", LongType(), nullable=True)
    ])
    
    df_selecionado = spark.createDataFrame(df)
    
    # SELECIONA A ULTIMA ATUALIZAÇÃO DO DADO
    df_selecionado.createOrReplaceGlobalTempView('view_imoveis')
    
    cdc_unique = spark.sql('''
    WITH 
        qualify as (select 
            *, 
            ROW_NUMBER() over(partition by imovel_id order by ts_ms desc) as qualify 
        from global_temp.view_imoveis)
    select * from qualify where qualify = 1''')
    
    # UPSERT
    (deltaTable.alias('b')
        .merge(cdc_unique.alias('d'), 
        'b.imovel_id = d.imovel_id')
        .whenMatchedUpdateAll(condition = "d.op = 'u'")
        .whenMatchedDelete(condition = "d.op = 'd'")
        .whenNotMatchedInsertAll(condition = "d.op = 'c'")
        .execute()
    )

In [48]:
# AGREGA CDC TRANSACOES

# Definindo o schema do PySpark
schema = StructType([
StructField('value', StructType([
    StructField("before", StructType([
        StructField("transacao_id", IntegerType(), nullable=False),
        StructField("imovel_id", IntegerType(), nullable=False),
        StructField("morador_id", IntegerType(), nullable=False),
        StructField("data_transacao", IntegerType(), nullable=False),
        StructField("valor_transacao", BinaryType(), nullable=False),# Coluna binária para decimal
    ]), nullable=True),
    StructField("after", StructType([
        StructField("transacao_id", IntegerType(), nullable=False),
        StructField("imovel_id", IntegerType(), nullable=False),
        StructField("morador_id", IntegerType(), nullable=False),
        StructField("data_transacao", IntegerType(), nullable=False),
        StructField("valor_transacao", BinaryType(), nullable=False),# Coluna binária para decimal
    ]), nullable=True),
    StructField("source", StructType([
        StructField("version", StringType(), nullable=False),
        StructField("connector", StringType(), nullable=False),
        StructField("name", StringType(), nullable=False),
        StructField("ts_ms", LongType(), nullable=False),
        StructField("snapshot", StringType(), nullable=True),
        StructField("db", StringType(), nullable=False),
        StructField("sequence", StringType(), nullable=True),
        StructField("schema", StringType(), nullable=False),
        StructField("table", StringType(), nullable=False),
        StructField("txId", LongType(), nullable=True),
        StructField("lsn", LongType(), nullable=True),
        StructField("xmin", LongType(), nullable=True)
    ]), nullable=False),
    StructField("op", StringType(), nullable=False),
    StructField("ts_ms", LongType(), nullable=True),
    StructField("transaction", StructType([
        StructField("id", StringType(), nullable=False),
        StructField("total_order", LongType(), nullable=False),
        StructField("data_collection_order", LongType(), nullable=False)
    ]), nullable=True)
]))
])

df_stream = spark.readStream.format('parquet')\
    .schema(schema).load(f's3a://condomanage/raw/cdc/postgres.public.transacoes')

deltaTable = DeltaTable.forPath(spark, 's3a://condomanage/bronze/transacoes')

stream_transacoes = (df_stream.writeStream\
    .option("checkpointLocation", f"s3a://condomanage/raw/cdc/transacoes_checkpoint/")\
    .foreachBatch(lambda df, batchID: upsert_transacoes(df, deltaTable))
    .trigger(availableNow=True))
    


def upsert_transacoes(df, deltaTable):
    df_selecionado = df.select(
        "value.after.imovel_id",
        "value.after.morador_id",
        "value.after.data_transacao",
        "value.after.valor_transacao",
        when(df.value.op == 'd', df.value.before.transacao_id).otherwise(df.value.after.transacao_id).alias('transacao_id'),
        "value.op",
        "value.ts_ms"
    )
    
    df_selecionado = df_selecionado.withColumnRenamed("value.after.imovel_id", "imovel_id") \
                                   .withColumnRenamed("value.after.morador_id", "morador_id") \
                                   .withColumnRenamed("value.after.data_transacao", "data_transacao") \
                                   .withColumnRenamed("value.after.valor_transacao", "valor_transacao") \
                                   .withColumnRenamed("value.after.transacao_id", "transacao_id") \
                                   .withColumnRenamed("value.op", "op") \
                                   .withColumnRenamed("value.ts_ms", "ts_ms")
    df = df_selecionado.toPandas()

    df['valor_transacao'] = df.apply(lambda x: bytes_to_decimal(x['valor_transacao']), axis=1) 
    
    schema = StructType([
        StructField("imovel_id", IntegerType(), nullable=False),
        StructField("morador_id", IntegerType(), nullable=False),
        StructField("data_transacao", IntegerType(), nullable=False),
        StructField("valor_transacao", DecimalType(15, 2), nullable=False),
        StructField("transacao_id", IntegerType(), nullable=False),
        StructField("op", StringType(), nullable=False),
        StructField("ts_ms", LongType(), nullable=True)
    ])
    
    df_selecionado = spark.createDataFrame(df)
    
    # SELECIONA A ULTIMA ATUALIZAÇÃO DO DADO
    df_selecionado.createOrReplaceGlobalTempView('view_transacoes')
    
    cdc_unique = spark.sql('''
    WITH 
        qualify as (select 
            imovel_id, morador_id, cast(data_transacao as integer) as data_transacao, valor_transacao, transacao_id, op, ts_ms, 
            ROW_NUMBER() over(partition by transacao_id order by ts_ms desc) as qualify 
        from global_temp.view_transacoes)
    select * from qualify where qualify = 1''')

    # AJUSTA O TIPO DA COLUNA DATA_REGISTRO
    cdc_unique = cdc_unique.withColumn("data_transacao", to_date(expr("date_add('1970-01-01', data_transacao)")))
    
    # UPSERT
    (deltaTable.alias('b')
        .merge(cdc_unique.alias('d'), 
        'b.transacao_id = d.transacao_id')
        .whenMatchedUpdateAll(condition = "d.op = 'u'")
        .whenMatchedDelete(condition = "d.op = 'd'")
        .whenNotMatchedInsertAll(condition = "d.op = 'c' or d.op = 'u'")
        .execute()
    )


In [54]:
stream_transacoes.start()
stream_imoveis.start()
stream_moradores.start()
stream_condominios.start()


<pyspark.sql.streaming.query.StreamingQuery at 0x724d59af81d0>

In [89]:
# Calcular total de transacoes
spark.sql('''
select c.nome, sum(t.valor_transacao) as total from transacoes t 
    inner join imoveis i on i.imovel_id = t.imovel_id
    inner join condominios c on c.condominio_id = i.condominio_id
group by c.condominio_id, c.nome order by 2 desc''').write.format('delta').mode('overwrite').option("mergeSchema", "true").save('s3a://condomanage/gold/total_transacoes')

In [90]:
# Calcular valor de transacoes por morador
spark.sql('''
select 
    m.nome, 
    sum(t.valor_transacao) as total 
from transacoes t 
    inner join moradores m on t.morador_id = m.morador_id
group by m.nome order by 2 desc''').write.format('delta').mode('overwrite').option("mergeSchema", "true").save('s3a://condomanage/gold/transacoes_morador')

In [91]:
# Agregar as transações diárias por tipo de imóvel
spark.sql('''
SELECT 
    t.data_transacao,
    i.tipo,
    SUM(t.valor_transacao) AS total_valor_transacao
FROM 
    transacoes t
JOIN 
    imoveis i ON t.imovel_id = i.imovel_id
GROUP BY 
    t.data_transacao,
    i.tipo
ORDER BY 
    t.data_transacao,
    i.tipo;''').write.format('delta').mode('overwrite').option("mergeSchema", "true").save('s3a://condomanage/gold/transacoes_imovel')