### Imports e Configurações

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *


In [0]:
%run ./aws_postgres_keys

### PostgreSQL JDBC URL

In [0]:
# Build the PostgreSQL JDBC URL. HOST and DATABASE_NAME are expected to be
# defined by the `%run ./aws_postgres_keys` cell.
jdbcURL = f"jdbc:postgresql://{HOST}/{DATABASE_NAME}"

### Create connection properties with username and password

In [0]:
# JDBC connection properties. USER_NAME and PASSWORD come from the
# `%run ./aws_postgres_keys` cell, so no credentials are hardcoded here.
connProperties = dict(
    user=USER_NAME,
    password=PASSWORD,
)

### Read the source tables from AWS PostgreSQL

In [0]:
def _read_postgres(table_name):
    """Load one PostgreSQL table into a Spark DataFrame over JDBC."""
    return spark.read.jdbc(url=jdbcURL, table=table_name, properties=connProperties)

# Note: the customer source table is named 'clientes' (plural) in PostgreSQL.
df_contrato = _read_postgres('contrato')
df_cliente = _read_postgres('clientes')
df_rating = _read_postgres('rating')
df_rating_cliente = _read_postgres('rating_cliente')

In [0]:
# Preview the raw contrato table read over JDBC.
display(df_contrato)



In [0]:
# Preview the raw clientes table read over JDBC.
# NOTE(review): this table holds personal data (primeiro_nome, sobrenome);
# clear this cell's output before sharing or committing the notebook.
display(df_cliente)


In [0]:
# Preview the raw rating table read over JDBC.
display(df_rating)


In [0]:
# Preview the raw rating_cliente table read over JDBC.
display(df_rating_cliente)

### Harmonizar nomes de colunas

In [0]:
def _dots_to_underscores(df):
    """Return `df` with every '.' in its column names replaced by '_'.

    Spark treats an unquoted dot in a column reference as nested-field access,
    so dotted names are rewritten before the tables are persisted.
    """
    return df.toDF(*(name.replace('.', '_') for name in df.columns))

df_contrato = _dots_to_underscores(df_contrato)
df_cliente = _dots_to_underscores(df_cliente)
df_rating = _dots_to_underscores(df_rating)
df_rating_cliente = _dots_to_underscores(df_rating_cliente)


### Criar tabelas Bronze

In [0]:
# Persist each source DataFrame as a Bronze table, overwriting any previous load.
for bronze_table, df_source in (
    ("`impacta-databricks`.bronze.contrato", df_contrato),
    ("`impacta-databricks`.bronze.cliente", df_cliente),
    ("`impacta-databricks`.bronze.rating", df_rating),
    ("`impacta-databricks`.bronze.rating_cliente", df_rating_cliente),
):
    df_source.write.mode("overwrite").saveAsTable(bronze_table)

### Carregar Tabelas Bronze

In [0]:
# Reload the Bronze tables from the catalog so downstream cells work from the
# persisted data rather than the in-memory JDBC DataFrames.
(
    df_bronze_contrato,
    df_bronze_cliente,
    df_bronze_rating,
    df_bronze_rating_cliente,
) = (
    spark.table(table_name)
    for table_name in (
        "`impacta-databricks`.bronze.contrato",
        "`impacta-databricks`.bronze.cliente",
        "`impacta-databricks`.bronze.rating",
        "`impacta-databricks`.bronze.rating_cliente",
    )
)


### Exibir Tabelas Bronze do Catálogo

In [0]:
# Preview the Bronze contrato table as stored in the catalog.
display(df_bronze_contrato)

In [0]:
# Preview the Bronze cliente table as stored in the catalog.
# NOTE(review): contains personal data (primeiro_nome, sobrenome); clear the
# output before sharing.
display(df_bronze_cliente)

In [0]:
# Preview the Bronze rating table as stored in the catalog.
display(df_bronze_rating)

In [0]:
# Preview the Bronze rating_cliente table as stored in the catalog.
display(df_bronze_rating_cliente)

### Normalização e joins

In [0]:
# Silver layer: enrich each contract with the client's name and rating attributes.
# Every join is a left join with contrato as the base, so contracts without a
# matching client or rating are kept, with nulls in the joined columns.
# NOTE(review): if rating_cliente holds more than one row per id_cliente, the
# second join duplicates contracts, and saldo_devedor would then be summed more
# than once in the Gold aggregation. Confirm that id_cliente is unique there.
# NOTE(review): the rating join uses the "rating" column of the joined result
# (presumably coming from rating_cliente). "r.rating" below presumably resolves
# to the rating-table side and is null when no match exists. Confirm whether
# rc.rating was intended instead.
df_prata = (
    df_bronze_contrato.alias("c")
    .join(df_bronze_cliente.alias("cl"), "id_cliente", "left")
    .join(df_bronze_rating_cliente.alias("rc"), "id_cliente", "left")
    .join(df_bronze_rating.alias("r"), "rating", "left")
    .select(
        "c.id_contrato",
        "c.id_cliente",
        "cl.primeiro_nome",
        "cl.sobrenome",
        "c.segmento_cliente",
        "c.valor_desembolsado",
        "c.saldo_devedor",
        "c.data_vencimento",
        "c.dias_atraso",
        "c.tipo_cliente",
        "r.rating",
        "r.descricao",
        "r.grau" 
    )
)


### Probabilidade de default

In [0]:
# Probability of default (pd) by days-past-due bucket.
# `when` branches are tested in order, so each branch only needs its upper
# bound: reaching it already implies the previous bound was exceeded.
# A null dias_atraso matches no branch and falls through to otherwise (0.60).
dias_atraso_col = F.col("dias_atraso")
pd_por_atraso = (
    F.when(dias_atraso_col <= 0, 0.01)
    .when(dias_atraso_col <= 30, 0.05)
    .when(dias_atraso_col <= 60, 0.15)
    .when(dias_atraso_col <= 90, 0.30)
    .otherwise(0.60)
)
df_prata = df_prata.withColumn("pd", pd_por_atraso)


### Cálculo de PDD (Prata)

In [0]:
# Provision for doubtful debts (pdd_calculada) = saldo_devedor * pd, rounded
# to 2 decimal places. Also derive a "yyyy-MM" key from data_vencimento for the
# monthly Gold aggregation.
pdd_expr = F.round(F.col("saldo_devedor") * F.col("pd"), 2)
ano_mes_expr = F.date_format("data_vencimento", "yyyy-MM")

df_prata = df_prata.withColumn("pdd_calculada", pdd_expr)
df_prata = df_prata.withColumn("ano_mes", ano_mes_expr)


### Persistência Prata (Delta)

In [0]:
# Persist the Silver layer as a Delta table, replacing its previous contents.
(
    df_prata.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("`impacta-databricks`.prata.pdd_contrato")
)
    


In [0]:
# Read the Silver table back from the catalog to inspect what was persisted.
# NOTE(review): includes primeiro_nome/sobrenome; clear the output before sharing.
df_prata_pdd_contrato = spark.table("`impacta-databricks`.prata.pdd_contrato")
display(df_prata_pdd_contrato)



## Camada Ouro

### Agregações para BI (Ouro)

In [0]:
# Gold layer: portfolio totals per month (ano_mes), client segment and rating.
# Aggregate from the persisted Silver table (df_prata_pdd_contrato), mirroring
# how the Bronze stage writes and then re-reads from the catalog. This keeps Gold
# derived from the Silver table it depends on and avoids re-executing the whole
# join/PD lineage behind the in-memory df_prata.
#   saldo_total   - sum of saldo_devedor (nulls ignored)
#   pdd_total     - sum of pdd_calculada (nulls ignored)
#   qtd_contratos - number of distinct id_contrato values in the group
df_ouro = (
    df_prata_pdd_contrato
    .groupBy("ano_mes", "segmento_cliente", "rating")
    .agg(
        F.sum("saldo_devedor").alias("saldo_total"),
        F.sum("pdd_calculada").alias("pdd_total"),
        F.countDistinct("id_contrato").alias("qtd_contratos")
    )
)


### Métricas derivadas

In [0]:
# Provision index per group: indice_pdd = pdd_total / saldo_total.
# Guard against saldo_total == 0. With ANSI SQL mode enabled, Spark raises
# DIVIDE_BY_ZERO instead of returning null, which would abort the pipeline.
# Zero (or null) balances therefore yield a null index in every mode.
df_ouro = df_ouro.withColumn(
    "indice_pdd",
    F.when(
        F.col("saldo_total") != 0,
        F.col("pdd_total") / F.col("saldo_total"),
    ),
)


### Persistência Ouro (Delta)

In [0]:
# Persist the Gold aggregates as a Delta table for the BI dashboard,
# replacing previous contents, then preview the result.
(
    df_ouro.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("`impacta-databricks`.ouro.pdd_dashboard")
)

display(df_ouro)
