# Carregamento, tratamento e Análise dos dados presentes na camada Gold

### Procedimentos

Para que nossa camada <b>Gold</b> seja construida e carregada corratamente, precisamos primeiramente realizar alguns passos, sendo eles:

- Configurar Spark e Acesso ao banco 
- Consultar os dados da camada Silver
- Montar a estrutura Gold com os dados da silver
- Inserir no banco
- Tratar Movimentacoes

### Configurar Spark e Acesso ao banco 

Primeira coisa a se fazer é configurar a ferramente que será utilizada para conectar ao banco de dados e interagir com o mesmo, sendo ela o PySpark. Para configurar o PySpark primeiramente temos que importar sua biblioteca e carregar nossas variaveis de ambiente para que ela possa acessar nosso banco de dados.

In [20]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, when, monotonically_increasing_id, lit
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType

import glob
import warnings
import os

BASE_URL = "jdbc:postgresql://"
HOST = "imoveis.dos.sonhos.db"
PORT = "5432"
USER = "usrImoveisDosSonhos"
PASSWORD = "S3nh@F0rte"
DATABASE = "ImoveisDosSonhosDB"

JDBC_URL = f'{BASE_URL}{HOST}:{PORT}/{DATABASE}'
TABELA = "silver.imovelcaixa"
PROPRIEDADES = { "user": USER, "password": PASSWORD, "driver": "org.postgresql.Driver" }


warnings.filterwarnings('ignore')

spark = SparkSession.builder \
    .appName("ImovelDosSonhosETL") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.5.0") \
    .config("spark.sql.debug.maxToStringFields", 1000) \
    .config("spark.ui.showConsoleProgress", "false") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
print("SparkSession iniciada com o driver PostgreSQL.")

print("Bibliotecas importadas e Variáveis configuradas")

SparkSession iniciada com o driver PostgreSQL.
Bibliotecas importadas e Variáveis configuradas


### Consultar os dados da camada Silver

In [30]:
imoveis_silver = spark.read.jdbc(
    url=JDBC_URL,
    table=TABELA,
    properties=PROPRIEDADES
)

### Montar a estrutura Gold com os dados da silver

In [31]:
## Localizacao
df_localizacao_unique = imoveis_silver.select("UF", "Cidade", "Bairro", "Endereco", "NumeroImovel").distinct()
df_movimentacao_unique = imoveis_silver.select("ValorAvaliacao", "Desconto", "NumeroImovel").distinct()
df_fato_unique = imoveis_silver.select("NumeroImovel", "ValorAvaliacao", "Desconto", "UF", "Cidade", "Bairro", "Endereco",).distinct();

df_dim_localizacao = df_localizacao_unique \
    .withColumn("num_localizacao", monotonically_increasing_id()) \
    .select(
        col("NumeroImovel").alias("num_imovel_Bk"),
        col("num_localizacao"),
        col("UF").alias("sig_estado"),
        col("Cidade").alias("nom_cidade"),
        col("Bairro").alias("nom_bairro"),
        col("Endereco").alias("nom_endereco")
    )

df_dim_movimentacao = df_movimentacao_unique \
    .withColumn("num_movimentacao", monotonically_increasing_id()) \
    .select(
        col("NumeroImovel").alias("num_imovel_Bk"),
        col("num_movimentacao")
    )

df_dim_localizacao_existente = spark.read.jdbc(
    url=JDBC_URL,
    table="dw.dim_localizacao",
    properties=PROPRIEDADES
)

df_localizacao_cadastro = df_dim_localizacao.alias("candidata").join(
    df_dim_localizacao_existente.alias("existente"),
    on=[
        col("candidata.sig_estado") == col("existente.sig_estado"),
        col("candidata.nom_cidade") == col("existente.nom_cidade"),
        col("candidata.nom_bairro") == col("existente.nom_bairro"),
        col("candidata.nom_endereco") == col("existente.nom_endereco")
    ],
    how="left_anti"
).select(
    col("candidata.num_imovel_Bk"),
    col("candidata.num_localizacao"),
    col("candidata.sig_estado"),
    col("candidata.nom_cidade"),
    col("candidata.nom_bairro"),
    col("candidata.nom_endereco")
)


df_localizacao_cadastro.select(
    col("num_localizacao"),
    col("sig_estado"),
    col("nom_cidade"),
    col("nom_bairro"),
    col("nom_endereco")
).write.jdbc(
    url=JDBC_URL,
    table="dw.dim_localizacao", 
    mode="append",
    properties=PROPRIEDADES
)

df_dim_movimentacao.select(
    col("num_movimentacao"),
    lit("CADASTRO").alias("nom_tipo_movimentacao"),
    lit("SEM VALOR REGISTRADO").alias("dsc_de"),
    lit("PREÇO BASE").alias("dsc_para")
).write.jdbc(
    url=JDBC_URL,
    table="dw.dim_movimentacao", 
    mode="append",
    properties=PROPRIEDADES
)


## Imovel
df_imovel_unique = imoveis_silver.select(
        "NumeroImovel",
        "Descricao",
        "LinkDeAcesso",
        "ModalidadeDeVenda"
    ).distinct()

df_dim_imovel = df_imovel_unique \
    .select(
        col("NumeroImovel").alias("num_imovel"),
        col("LinkDeAcesso").alias("url_acesso"),
        col("Descricao").alias("dsc"),
        col("ModalidadeDeVenda").alias("nom_modalidade_venda")
    )

df_dim_imovel_existente = spark.read.jdbc(
    url=JDBC_URL,
    table="dw.dim_imovel",
    properties=PROPRIEDADES
)


df_imovel_cadastro = df_dim_imovel.alias("candidata").join(
    df_dim_imovel_existente.alias("existente"),
    on=[
        col("candidata.num_imovel") == col("existente.num_imovel"),
        col("candidata.url_acesso") == col("existente.url_acesso"),
        col("candidata.dsc") == col("existente.dsc"),
        col("candidata.nom_modalidade_venda") == col("existente.nom_modalidade_venda")
    ],
    how="left_anti"
).select(
    col("candidata.num_imovel"),
    col("candidata.url_acesso"),
    col("candidata.dsc"),
    col("candidata.nom_modalidade_venda")
)

df_imovel_cadastro.write.jdbc(
    url=JDBC_URL,
    table="dw.dim_imovel", 
    mode="append",
    properties=PROPRIEDADES
)
print("DW Importado")

DW Importado


In [34]:
df_fato_unico = imoveis_silver.select("ValorAvaliacao", "Desconto", "NumeroImovel").distinct()

df_fato = df_fato_unico.alias("silver") \
    .withColumn("num_venda_imovel", monotonically_increasing_id()) \
    .join(
        df_dim_imovel.alias("imovel"),
        on=[col("silver.NumeroImovel") == col("imovel.num_imovel")],
        how="inner"
    ) \
    .join(
        df_dim_localizacao.alias("localizacao"),
        on=[col("silver.NumeroImovel") == col("localizacao.num_imovel_BK")],
        how="inner"
    ) \
    .join(
        df_dim_movimentacao.alias("movimentacao"),
        on=[col("silver.NumeroImovel") == col("movimentacao.num_imovel_BK")],
        how="inner"
    ) \
    .select(
        col("num_venda_imovel"),
        col("imovel.num_imovel").alias("FK_num_imovel"),
        col("movimentacao.num_movimentacao").alias("FK_num_movimentacao"),
        col("localizacao.num_localizacao").alias("FK_num_localizacao"),
        col("silver.ValorAvaliacao").alias("val_avaliacao"),
        col("silver.Desconto").alias("val_desconto")
    )


df_fato.write.jdbc(
        url=JDBC_URL,
        table="dw.Fato_VendaImovel", 
        mode="append",
        properties=PROPRIEDADES
    )

print("dw.Fato_VendaImovel importada.")

dw.Fato_VendaImovel importada.
