# Criação das tabelas Dimensão para o modelo Star Schema

## Carregando as bibliotecas necessárias

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import  col, lit, monotonically_increasing_id


StatementMeta(, 1e010dbc-6732-4525-a390-97a2c5088413, 8, Finished, Available, Finished)

## SPARK SESSION

In [7]:

spark = SparkSession.builder.appName("TransformationsGoldLayerDimensions").getOrCreate()
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")

StatementMeta(, 1e010dbc-6732-4525-a390-97a2c5088413, 9, Finished, Available, Finished)

## Importação dos dados da camada Silver
Aqui temos algumas opções: 
- Importar do storage do Data Lake (landing zone);
- Importar de um shortcut;
- Importar da Tabela (Table) propagada do Lakehouse no schema broze;

Nos exemplos abaixo usaremos a terceira opção por simplicidade. 


In [8]:
# Usando Variaveis para parametrizar os codigos:
INPUT_LAYER = "silver"
WORKSPACE_NAME = spark.conf.get("trident.workspace.name")
LAKEHOUSE_NAME = spark.conf.get("trident.lakehouse.name")
OUTPUT_LAYER = "gold"

# Diretorio de saida completo
INPUT_PATH = f"abfss://{WORKSPACE_NAME}@onelake.dfs.fabric.microsoft.com/{LAKEHOUSE_NAME}.Lakehouse/Tables/{INPUT_LAYER}"
OUTPUT_PATH = f"abfss://{WORKSPACE_NAME}@onelake.dfs.fabric.microsoft.com/{LAKEHOUSE_NAME}.Lakehouse/Tables/{OUTPUT_LAYER}"
# Diretorio de saida curto
# Tables/{OUTPUT_LAYER}/olist_customers_dataset.csv

StatementMeta(, 1e010dbc-6732-4525-a390-97a2c5088413, 10, Finished, Available, Finished)

## Carregando os dados da camada Silver

In [9]:
# Carregar customers_df na camada Silver
customers_df = spark.read.format("delta")\
        .load(f"{INPUT_PATH}/customers_silver")

# Carregar geolocation_df na camada Silver
geolocation_df = spark.read.format("delta")\
        .load(f"{INPUT_PATH}/geolocation_silver")

# Carregar products_df na camada Silver
products_df = spark.read.format("delta")\
        .load(f"{INPUT_PATH}/products_silver")

# Carregar sellers_df na camada Silver
sellers_df = spark.read.format("delta")\
        .load(f"{INPUT_PATH}/sellers_silver")

# Carregar product_category_name_translation_df na camada Silver
product_category_name_translation_df = spark.read.format("delta")\
        .load(f"{INPUT_PATH}/product_category_name_translation")



StatementMeta(, 1e010dbc-6732-4525-a390-97a2c5088413, 11, Finished, Available, Finished)

# Dimensões

## Objetivos:

Criar as tabelas Dimensão para o StarSchema ser feito na camada GOLD.  

## Informações sobre as tabelas
A tabela DimCustomer armazena atributos relacionados aos clientes.  
A tabela DimProduct armazena atributos relacionados ao produto, como categoria do produto, peso, dimensões e tradução do nome.  
A tabela DimSeller armazenará atributos relacionados ao vendedor, como localização do vendedor (CEP), ID único do vendedor e informações adicionais. 
A tabela DimGeolocation armazenará detalhes geográficos relacionados às localizações de clientes e vendedores. 

As tabelas dimensão serão usadas para junções com tabelas de fatos como a FactSales.

### Transformações Aplicadas:
- Gerando Chaves Substitutas → Chave única customer_sk para junções eficientes
- Juntando Múltiplas Tabelas → Enriquecendo os dados de outras tabelas para criar uma tabela dimensão mais robusta
- Tratando Valores Ausentes → Substituindo NULLs por 'Unknown'
- Formatando Colunas de Data → Convertendo para formatos padrão
- Otimizando o Desempenho da Consulta → Armazenando como Tabela Delta


### Explicação das Transformações
- monotonically_increasing_id() → Gera uma chave substituta única (customer_sk).
- fillna() → Substitui valores NULL por 'Unknown' para evitar perda de dados.
- Seleção e Renomeação de Colunas → Mantém colunas relevantes e renomeia customer_id para maior clareza.
- Salvando como Formato Delta → Garante desempenho eficiente nas consultas.



## Dimensão Clientes

In [10]:
# Gerar Chave Surrogate
customers_df = customers_df.withColumn("customer_sk", monotonically_increasing_id())

# Tratar Valores Ausentes
customers_df = customers_df.fillna(
    {
        'customer_unique_id': 'Unknown', 
        'customer_id': 'Unknown',
        'customer_zip_code_prefix': 'Unknown',
        'customer_city': 'Unknown', 
        'customer_state': 'Unknown'
    })

# Selecionar Colunas Necessárias
dim_customers_df = customers_df.select(
    col("customer_sk").cast("string"),
    col("customer_id").alias("natural_customer_key"),  # Chave Natural
    col("customer_unique_id"),
    col("customer_zip_code_prefix"),
    col("customer_city"),
    col("customer_state")
)

# Salvar como Tabela Delta na Camada Gold
dim_customers_df.write.format("delta")\
                  .mode("overwrite")\
                  .option("path", f"{OUTPUT_PATH}/dim_customers")\
                  .save()

StatementMeta(, 1e010dbc-6732-4525-a390-97a2c5088413, 12, Finished, Available, Finished)

## Dimensão Produtos

In [11]:
# Gerar Chave Surrogate
products_df = products_df.withColumn("product_sk", monotonically_increasing_id())

# Tratar Valores Ausentes
products_df = products_df.fillna(
    {
        "product_sk": "Unknown",
        "product_id": "Unknown",
        "product_category_name": "Unknown",
        "product_name_length": "0",
        "product_description_length": "0",
        "product_photos_qty": "0",
        "product_weight_g": 0,
        "product_length_cm": 0,
        "product_height_cm": 0,
        "product_width_cm": 0    
})

# Unir com a Tabela de Tradução de Nome da Categoria do Produto
dim_products_df = products_df.join(
    product_category_name_translation_df, 
    on="product_category_name", 
    how="left"
)

# Selecionar Colunas Necessárias
dim_products_df = dim_products_df.select(
    col("product_sk").cast("string"),
    col("product_id").alias("natural_product_key"),  # Chave Natural
    col("product_category_name").alias("category"),
    col("product_category_name_english").alias("category_english"),
    col("product_weight_g"),
    col("product_length_cm"),
    col("product_height_cm"),
    col("product_width_cm")
)

# Salvar como Tabela Delta na Camada Gold
dim_products_df.write.format("delta")\
                .mode("overwrite")\
                .option("path", f"{OUTPUT_PATH}/dim_product")\
                .save()


StatementMeta(, 1e010dbc-6732-4525-a390-97a2c5088413, 13, Finished, Available, Finished)

## Dimensão Vendedores

In [12]:
# Gerar Chave Surrogate
sellers_df = sellers_df.withColumn("seller_sk", monotonically_increasing_id())

# Tratar Valores Ausentes
sellers_df = sellers_df.fillna({
    "seller_sk": "Unknown",
    "seller_id": "Unknown",
    "seller_zip_code_prefix": "0",
    "seller_city": "Unknown",
    "seller_state": "Unknown"
})

# Selecionar Colunas Necessárias e Renomear para Clareza
dim_sellers_df = sellers_df.select(
    col("seller_sk").cast("string"),
    col("seller_id").alias("natural_seller_key"),  # Chave Natural
    col("seller_zip_code_prefix").alias("seller_zip_code"),
    col("seller_city").alias("seller_city"),
    col("seller_state").alias("seller_state")
)

# Salvar como Tabela Delta na Camada Gold
dim_sellers_df.write.format("delta")\
                    .mode("overwrite")\
                    .option("path", f"{OUTPUT_PATH}/dim_seller")\
                    .save()


StatementMeta(, 1e010dbc-6732-4525-a390-97a2c5088413, 14, Finished, Available, Finished)

## Dimensão Geolocalização

In [13]:
# Remover Duplicatas (Garantindo entradas de geolocalização únicas)
geolocation_df = geolocation_df.dropDuplicates(["geolocation_lat", "geolocation_lat"])

# Gerar Chave Surrogate
geolocation_df = geolocation_df.withColumn("geolocation_sk", monotonically_increasing_id())

# Tratar Valores Ausentes
geolocation_df = geolocation_df.fillna(
{
    "geolocation_sk": "Unknown",
    "geolocation_zip_code_prefix": "0",
    "geolocation_lat": "0",
    "geolocation_lng": "0",
    "geolocation_city": "Unknown",
    "geolocation_state": "Unknown"
})

# Selecionar Colunas Necessárias e Renomear para Clareza
dim_geolocation_df = geolocation_df.select(
    col("geolocation_sk").cast("string"),
    col("geolocation_zip_code_prefix").alias("zip_code"),
    col("geolocation_lat").alias("latitude"),
    col("geolocation_lng").alias("longitude"),
    col("geolocation_city").alias("city"),
    col("geolocation_state").alias("state")
)

# Salvar como Tabela Delta na Camada Gold
dim_geolocation_df.write.format("delta")\
                     .mode("overwrite")\
                     .option("path", f"{OUTPUT_PATH}/dim_geolocation")\
                     .save()


StatementMeta(, 1e010dbc-6732-4525-a390-97a2c5088413, 15, Finished, Available, Finished)