# ETL das Dimensões - Olist

Este notebook realiza a leitura dos dados brutos (camada Bronze),
armazenados no DBFS, e cria as tabelas dimensão do Data Warehouse
na camada Silver.



In [0]:
#Importações
from pyspark.sql.functions import col, to_timestamp, year, month, dayofmonth


In [0]:
#Paths
BRONZE_PATH = "dbfs:/Workspace/dados_mpv/"


In [0]:
#Leitura dos CSVs (Bronze)
orders_df = spark.read.option("header", "true").csv(f"{BRONZE_PATH}olist_orders_dataset.csv")
items_df = spark.read.option("header", "true").csv(f"{BRONZE_PATH}olist_order_items_dataset.csv")
products_df = spark.read.option("header", "true").csv(f"{BRONZE_PATH}olist_products_dataset.csv")
customers_df = spark.read.option("header", "true").csv(f"{BRONZE_PATH}olist_customers_dataset.csv")
sellers_df = spark.read.option("header", "true").csv(f"{BRONZE_PATH}olist_sellers_dataset.csv")
category_df = spark.read.option("header", "true").csv(f"{BRONZE_PATH}product_category_name_translation.csv")


In [0]:
# Criar Schema Silver e Gold
spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.silver")
spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.gold")

In [0]:
# Dimensão Produto
dim_produto = products_df.join(
    category_df,
    products_df.product_category_name == category_df.product_category_name,
    "left"
).select(
    col("product_id").alias("id_produto"),
    col("product_category_name_english").alias("nome_categoria"),
    col("product_weight_g").alias("peso_produto"),
    col("product_length_cm").alias("comprimento_produto"),
    col("product_height_cm").alias("altura_produto"),
    col("product_width_cm").alias("largura_produto")
).dropDuplicates()

dim_produto.write.mode("overwrite").saveAsTable("silver.dim_produto")


In [0]:
# Dimensão Cliente
dim_cliente = customers_df.select(
    col("customer_id").alias("id_cliente"),
    col("customer_city").alias("cidade_cliente"),
    col("customer_state").alias("estado_cliente")
).dropDuplicates()

dim_cliente.write.mode("overwrite").saveAsTable("silver.dim_cliente")


In [0]:
# Dimensão Vendedor
dim_vendedor = sellers_df.select(
    col("seller_id").alias("id_vendedor"),
    col("seller_city").alias("cidade_vendedor"),
    col("seller_state").alias("estado_vendedor")
).dropDuplicates()

dim_vendedor.write.mode("overwrite").saveAsTable("silver.dim_vendedor")


In [0]:
# Dimensão Data
dim_data = orders_df.withColumn(
    "data", to_timestamp(col("order_purchase_timestamp"))
).select(
    col("order_purchase_timestamp").alias("id_data"),
    col("data"),
    year("data").alias("ano"),
    month("data").alias("mes"),
    dayofmonth("data").alias("dia")
).dropDuplicates()

dim_data.write.mode("overwrite").saveAsTable("silver.dim_data")


In [0]:
# Validação
display(spark.table("silver.dim_produto"))
display(spark.table("silver.dim_cliente"))
display(spark.table("silver.dim_vendedor"))
display(spark.table("silver.dim_data"))


In [0]:
spark.sql("SHOW TABLES IN workspace.silver").show()