In [None]:
%python
%pip install faker

In [None]:
%python
# Restart the notebook's Python process. Presumably this is so the `faker`
# package installed by the %pip cell above can be imported — confirm.
dbutils.library.restartPython() 

In [None]:
%python
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from faker import Faker
import random

# Seed value shared by Faker and the stdlib random module.
_SEED = 42

# Get the current Spark session, creating one if none exists yet.
spark = SparkSession.builder.getOrCreate()

# Faker instance for the Brazilian Portuguese locale.
fake = Faker("pt_BR")

# Seed both random sources with the same value.
Faker.seed(_SEED)
random.seed(_SEED)

In [None]:
%python
from pyspark.sql.types import StructType, StructField, StringType

# Load the client records from the two bronze tables.
df_clientes = spark.table("corebank_catalog.bronze_corebank.clientes")
df_clientes_outros = spark.table("corebank_catalog.bronze_corebank.clientes_outros")

# Project both sources onto the same three columns and stack them,
# matching columns by name.
_colunas_cliente = ["id_cliente", "id_banco", "chave_pix"]
df_all_clientes = df_clientes.select(*_colunas_cliente).unionByName(
    df_clientes_outros.select(*_colunas_cliente)
)

# Materialize every client row as a Python list on the driver.
clientes_list = df_all_clientes.collect()

# Field names of a generated PIX transaction, in record order.
# Every field is declared as a string.
_campos_transacao_pix = (
    "id_cliente_origem",
    "id_transacao",
    "id_banco_origem",
    "id_banco_destino",
    "id_cliente_destino",
    "valor",
    "data_hora",
    "chave_pix_destino",
)
schema_transacao_pix = StructType(
    [StructField(nome_campo, StringType()) for nome_campo in _campos_transacao_pix]
)

import queue

# Simulated streaming source: an in-memory FIFO queue with no size limit.
# generate_transacao_pix_streaming's producer thread puts records on it and
# its main loop takes them off.
transacao_pix_queue = queue.Queue()

def generate_transacao_pix_streaming():
    """Simulate a stream of PIX transactions and append them to a Delta table.

    A background daemon thread repeatedly picks two different clients at
    random from ``clientes_list``, builds one transaction record and puts it
    on ``transacao_pix_queue``, pausing 0.5-2 seconds between records. The
    calling thread takes the queued records, turns them into a DataFrame
    using ``schema_transacao_pix`` and appends them to
    ``corebank_catalog.bronze_corebank.transacao_pix``.

    The function does not return on its own: it runs until it is interrupted
    or a write fails. When the loop exits for any reason the producer thread
    is signalled to stop, so it does not keep filling the queue.

    Raises:
        ValueError: if ``clientes_list`` holds fewer than two distinct
            ``id_cliente`` values, since an origin and a different
            destination could never be chosen.
    """
    import queue
    import random
    import threading
    import time

    # Fail fast instead of crashing inside the thread (empty list) or
    # spinning forever while looking for a destination that differs from
    # the origin (a single distinct client).
    if len({cliente.id_cliente for cliente in clientes_list}) < 2:
        raise ValueError(
            "clientes_list must contain at least two distinct id_cliente values"
        )

    stop_event = threading.Event()

    def producer():
        # Runs until the consumer loop below exits and sets stop_event.
        while not stop_event.is_set():
            origem = random.choice(clientes_list)
            destino = random.choice(clientes_list)
            while destino.id_cliente == origem.id_cliente:
                destino = random.choice(clientes_list)
            # Each queue item is a one-row list, in schema_transacao_pix order.
            data = [
                (
                    origem.id_cliente,
                    f"T{random.randint(1,1000000):08d}",
                    origem.id_banco,
                    destino.id_banco,
                    destino.id_cliente,
                    str(round(random.uniform(10, 1000), 2)),
                    time.strftime("%Y-%m-%d %H:%M:%S"),
                    destino.chave_pix
                )
            ]
            transacao_pix_queue.put(data)
            # Interruptible pause: returns early once stop_event is set.
            stop_event.wait(random.uniform(0.5, 2))

    threading.Thread(target=producer, daemon=True).start()

    try:
        while True:
            # Block for up to a second waiting for the next record rather
            # than polling empty() and sleeping.
            try:
                rows = list(transacao_pix_queue.get(timeout=1))
            except queue.Empty:
                continue

            # Drain whatever else is already queued so one write covers the
            # whole backlog; otherwise the queue grows without bound when a
            # write takes longer than the producer's interval.
            while True:
                try:
                    rows.extend(transacao_pix_queue.get_nowait())
                except queue.Empty:
                    break

            df = spark.createDataFrame(rows, schema_transacao_pix)
            # Rename columns to match the Delta table schema.
            # NOTE(review): chave_pix_destino is generated but not selected
            # here, so it is never written — confirm this is intended.
            df = df.selectExpr(
                "id_cliente_origem as id_cliente",
                "id_transacao",
                "id_banco_origem as id_banco",
                "id_banco_destino",
                "id_cliente_destino",
                "valor as valor_transacao",
                "data_hora as dt_hr_transacao"
            )
            # Plain batch append. No checkpointLocation is passed: that
            # option belongs to Structured Streaming writers and has no
            # effect on a batch DataFrameWriter.
            df.write.format("delta") \
                .mode("append") \
                .saveAsTable("corebank_catalog.bronze_corebank.transacao_pix")
    finally:
        # Stop the producer on interrupt or write failure.
        stop_event.set()

# Start the simulator. This call blocks: the function loops indefinitely and
# does not return on its own, so the cell keeps running until it is interrupted.
generate_transacao_pix_streaming()