In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:
# Set scope
scope = "b_coders"

# Read the secrets 
try:
    db_host = dbutils.secrets.get(scope=scope, key="db_host")
    db_port = dbutils.secrets.get(scope=scope, key="db_port")
    db_user = dbutils.secrets.get(scope=scope, key="db_username")
    db_password = dbutils.secrets.get(scope=scope, key="db_password")
    db_database = dbutils.secrets.get(scope=scope, key="db_database")

    print("✅ Segredos carregados com sucesso.")
except Exception as e:
    print(f"❌ Erro ao carregar os segredos: {e}")


In [0]:
# Monta a URL JDBC com as variáveis seguras
jdbc_url = f"jdbc:postgresql://{db_host}:{db_port}/{db_database}"

# Exibe a URL sem a senha (por segurança)
print("🔗 URL JDBC montada com sucesso.")


In [0]:
# Define as propriedades da conexão
connection_properties = {
    "user": db_user,
    "password": db_password,
    "driver": "org.postgresql.Driver"
}


In [0]:

tabela_clientes = "beraldcoders_tbqu.clientes"

df_clientes = spark.read.jdbc(
    url=jdbc_url,
    table=tabela_clientes,
    properties=connection_properties
)

# Exibir as primeiras linhas
df_clientes.count()


In [0]:
tabela_pedidos = "beraldcoders_tbqu.pedidos"

df_pedidos = spark.read.jdbc(
    url=jdbc_url,
    table=tabela_pedidos,
    properties=connection_properties
)

# Exibir as primeiras linhas
df_pedidos.display()

In [0]:
tabela_produtos = "beraldcoders_tbqu.produtos"

df_produtos = spark.read.jdbc(
    url=jdbc_url,
    table=tabela_produtos,
    properties=connection_properties
)

# Exibir as primeiras linhas
df_produtos.display()

In [0]:
df_clientes = df_clientes.withColumn("dt_carga", current_date())
df_pedidos = df_pedidos.withColumn("dt_carga", current_date())
df_produtos = df_produtos.withColumn("dt_carga", current_date())

In [0]:
%sql
drop table if exists raw_db.clientes;

In [0]:
df_clientes.write \
    .mode("append") \
    .option("format", "delta") \
    .option("mergeSchema", "true") \
    .option("clusterBy", "estado") \
    .saveAsTable("raw_db.clientes")

In [0]:
%sql
ALTER TABLE raw_db.clientes CLUSTER BY (estado);


In [0]:
%sql
-- ou, preferencialmente:
OPTIMIZE raw_db.clientes FULL;  -- aplica o Liquid Clustering em todos os dados



In [0]:
%sql
select * from raw_db.clientes

In [0]:
df_pedidos.write \
    .mode("append") \
    .option("format", "delta") \
    .option("mergeSchema", "true") \
    .option("clusterBy", "auto") \
    .saveAsTable("raw_db.pedidos")

In [0]:
%sql
select * from raw_db.pedidos;

In [0]:
df_produtos.write \
    .mode("append") \
    .option("format", "delta") \
    .option("mergeSchema", "true") \
    .option("clusterBy", "auto") \
    .saveAsTable("raw_db.produtos")

In [0]:
%sql
select * from raw_db.produtos;