In [0]:
%sql
use catalog database_mvp;

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS bronze COMMENT 'Camada para dados brutos ingeridos';
CREATE SCHEMA IF NOT EXISTS silver COMMENT 'Camada para dados limpos, filtrados e enriquecidos';
CREATE SCHEMA IF NOT EXISTS gold COMMENT 'Camada para dados agregados e prontos para análise';
     

In [0]:
%python

path = "/Volumes/database_mvp/bronze/tabelas-dimensoes/Lojas.csv"

df_lojas = (
    spark.read
        .option("header", "true")
        .option("encoding", "ISO-8859-1")   # Excel Brasil
        .option("delimiter", ";")           # tente ; também!
        .option("escape", '"')              # permite aspas dentro de campos
        .csv(path)
)
df_lojas.display()

In [0]:
%python
df_lojas.write.format("delta").mode("overwrite").saveAsTable("database_mvp.bronze.lojas_raw")

In [0]:

path = "/Volumes/database_mvp/bronze/tabelas-dimensoes/Produtos.csv"

df_produtos = (
    spark.read
        .option("header", "true")
        .option("encoding", "ISO-8859-1")   # Excel Brasil
        .option("delimiter", ";")           # tente ; também!
        .option("escape", '"')              # permite aspas dentro de campos
        .csv(path)
)
df_lojas.display()

In [0]:
df_produtos.write.format("delta").mode("overwrite").saveAsTable("database_mvp.bronze.produtos_raw")

In [0]:
path = "/Volumes/database_mvp/bronze/tabelas-dimensoes/Faturamento.csv"

df_faturamento = (
    spark.read
        .option("header", "true")
        .option("encoding", "ISO-8859-1")   # Excel Brasil
        .option("delimiter", ";")           # tente ; também!
        .option("escape", '"')              # permite aspas dentro de campos
        .csv(path)
)
df_faturamento.display()

In [0]:
df_faturamento.write.format("delta").mode("overwrite").saveAsTable("database_mvp.bronze.faturamento_raw")

In [0]:
from pyspark.sql.functions import col, trim, upper, regexp_replace

df_produtos_bronze = spark.table("database_mvp.bronze.produtos_raw")

df_produtos_silver = (
    df_produtos_bronze
        .select(
            col("COD_SKU").cast("string").alias("COD_SKU"),
            trim(col("DESC_SKU")).cast("string").alias("DESC_SKU"),
            trim(col("DESC_COR")).cast("string").alias("DESC_COR"),
            trim(col("DESC_AGRUPAMENTO")).cast("string").alias("DESC_AGRUPAMENTO"),
            trim(col("DESC_GRUPO")).cast("string").alias("DESC_GRUPO"),
            trim(col("DESC_LINHA")).cast("string").alias("DESC_LINHA"),
            trim(col("FORNECEDOR")).cast("string").alias("FORNECEDOR"),
            trim(col("DESC_COLECAO")).cast("string").alias("DESC_COLECAO"),
            # primeiro limpamos a vírgula -> ponto
            regexp_replace(col("PRECO_IMP"), ",", ".").alias("PRECO_IMP_LIMPO")
        )
        # convertemos o texto limpo em decimal
        .withColumn("PRECO_IMP", col("PRECO_IMP_LIMPO").cast("decimal(18,2)"))
        .drop("PRECO_IMP_LIMPO")
        # padronizar textos em maiúsculo nas descrições/categorias
        .withColumn("DESC_SKU", upper(col("DESC_SKU")))
        .withColumn("DESC_AGRUPAMENTO", upper(col("DESC_AGRUPAMENTO")))
        .withColumn("DESC_GRUPO", upper(col("DESC_GRUPO")))
        .withColumn("DESC_LINHA", upper(col("DESC_LINHA")))
)

df_produtos_silver.write.format("delta").mode("overwrite") \
    .saveAsTable("database_mvp.silver.dim_produto_base")

In [0]:
%sql
--Checando a qualidade

SELECT  *
FROM database_mvp.silver.dim_produto_base
LIMIT 50;


In [0]:
%sql
DESCRIBE TABLE database_mvp.silver.dim_produto_base;

In [0]:
df = spark.table("database_mvp.silver.dim_produto_base")

df_limpo = (
    df
        .withColumn("COD_SKU", trim(col("COD_SKU")))
        .withColumn("DESC_SKU", trim(col("DESC_SKU")))
        .withColumn("DESC_COR", trim(col("DESC_COR")))
        .withColumn("DESC_AGRUPAMENTO", trim(col("DESC_AGRUPAMENTO")))
        .withColumn("DESC_GRUPO", trim(col("DESC_GRUPO")))
        .withColumn("DESC_LINHA", trim(col("DESC_LINHA")))
        .withColumn("FORNECEDOR", trim(col("FORNECEDOR")))
        .withColumn("DESC_COLECAO", trim(col("DESC_COLECAO")))
)

df_limpo.write.format("delta").mode("overwrite") \
    .saveAsTable("database_mvp.silver.dim_produto_base")

In [0]:
%sql
SELECT 
  COD_SKU,
  LENGTH(COD_SKU) AS tam_cod,
  DESC_SKU,
  LENGTH(DESC_SKU) AS tam_desc
FROM database_mvp.silver.dim_produto_base
LIMIT 20;

In [0]:
%sql
-- Procurando valores errados

SELECT *
FROM database_mvp.silver.dim_produto_base
WHERE PRECO_IMP IS NULL

In [0]:
%sql
-- verificação PRECO_IMP foi convertido corretamente
SELECT COD_SKU, PRECO_IMP
FROM database_mvp.silver.dim_produto_base
WHERE PRECO_IMP IS NOT NULL
LIMIT 20;

In [0]:
%sql
SELECT COUNT(*) 
FROM database_mvp.silver.dim_produto_base;


In [0]:
display(spark.table("database_mvp.silver.dim_produto_base"))

In [0]:
from pyspark.sql.functions import (
    col,
    trim,
    when,
    regexp_replace,
    to_date,
    substring,
    concat,
    lit
)


df_lojas_bronze = spark.table("database_mvp.bronze.lojas_raw")

df_lojas_silver = (
    df_lojas_bronze
        # tipos básicos e remoção de espaços nas colunas principais
        .withColumn("LOJA", trim(col("LOJA")).cast("string"))
        .withColumn("TIPO", trim(col("TIPO")).cast("string"))
        .withColumn("NOME", trim(col("NOME")).cast("string"))
        .withColumn("SUPERVISOR", trim(col("SUPERVISOR")).cast("string"))
        .withColumn("ESTADO", trim(col("ESTADO")).cast("string"))
        # ajustar formato da data conforme vier no CSV
        # se estiver dd/MM/yyyy, use "dd/MM/yyyy"
        .withColumn("PRI_COMPRA_DT", to_date(col("PRI_COMPRA"), "dd/MM/yyyy"))
        
        # flag de loja ativa (nome NÃO começa com "X")
        .withColumn(
            "LOJA_ATIVA",
            (~col("NOME").startswith("X")).cast("boolean")
        )
        # nome limpo, sem o "X " do início
        .withColumn(
            "NOME_LIMPO",
            regexp_replace(col("NOME"), "^X\\s*", "")
        )
        # descrição do tipo da loja
        .withColumn(
            "TIPO_DESC",
            when(col("TIPO") == "I", "Importadora")
            .when(col("TIPO").isin("A", "P"), "Loja Própria")
            .when(col("TIPO") == "F", "Franquia")
            .when(col("TIPO") == "E", "E-commerce")
            .otherwise("Outro")
        )
        .select(
            "LOJA",
            "TIPO",
            "TIPO_DESC",
            "NOME",
            "NOME_LIMPO",
            "SUPERVISOR",
            "ESTADO",
            "PRI_COMPRA_DT",
            "LOJA_ATIVA"
        )
)

df_lojas_silver.write.format("delta").mode("overwrite") \
    .saveAsTable("database_mvp.silver.dim_loja_base")

In [0]:
from pyspark.sql.functions import (
    col, trim, when, regexp_replace, try_to_date
)

df_lojas_bronze = spark.table("database_mvp.bronze.lojas_raw")

df_lojas_silver = (
    df_lojas_bronze
        .withColumn("LOJA", trim(col("LOJA")).cast("string"))
        .withColumn("TIPO", trim(col("TIPO")).cast("string"))
        .withColumn("NOME", trim(col("NOME")).cast("string"))
        .withColumn("SUPERVISOR", trim(col("SUPERVISOR")).cast("string"))
        .withColumn("ESTADO", trim(col("ESTADO")).cast("string"))
        
        # tenta converter data; se for inválida, vira NULL
        .withColumn(
            "PRI_COMPRA_DT",
            try_to_date(trim(col("PRI_COMPRA")), "dd/MM/yyyy")
        )
        
        # flag de loja ativa
        .withColumn(
            "LOJA_ATIVA",
            (~col("NOME").startswith("X")).cast("boolean")
        )

        # nome limpo (sem X no início)
        .withColumn(
            "NOME_LIMPO",
            regexp_replace(col("NOME"), "^X\\s*", "")
        )

        # descrição do tipo
        .withColumn(
            "TIPO_DESC",
            when(col("TIPO") == "I", "Importadora")
            .when(col("TIPO").isin("A", "P"), "Loja Própria")
            .when(col("TIPO") == "F", "Franquia")
            .when(col("TIPO") == "E", "E-commerce")
            .otherwise("Outro")
        )

        .select(
            "LOJA",
            "TIPO",
            "TIPO_DESC",
            "NOME",
            "NOME_LIMPO",
            "SUPERVISOR",
            "ESTADO",
            "PRI_COMPRA_DT",
            "LOJA_ATIVA"
        )
)

df_lojas_silver.write.format("delta").mode("overwrite") \
    .saveAsTable("database_mvp.silver.dim_loja_base")


In [0]:
%sql
DESCRIBE TABLE database_mvp.silver.dim_loja_base;

In [0]:
%sql
SELECT *
FROM database_mvp.silver.dim_loja_base
LIMIT 50;

In [0]:
%sql
SELECT
  LOJA,
  LENGTH(LOJA) AS len_loja,
  NOME,
  LENGTH(NOME) AS len_nome,
  NOME_LIMPO,
  LENGTH(NOME_LIMPO) AS len_nome_limpo
FROM database_mvp.silver.dim_loja_base
WHERE   LENGTH(NOME) > LENGTH(NOME_LIMPO)
LIMIT 20;


In [0]:
%sql
SELECT *
FROM database_mvp.silver.dim_loja_base
WHERE LENGTH(NOME) > LENGTH(NOME_LIMPO)
LIMIT 50;


In [0]:
%sql
SELECT TIPO, TIPO_DESC, COUNT(*) AS qtde
FROM database_mvp.silver.dim_loja_base
GROUP BY TIPO, TIPO_DESC
ORDER BY qtde DESC;

In [0]:
%sql
SELECT 
  COUNT(*) AS total,
  SUM(CASE WHEN PRI_COMPRA_DT IS NULL THEN 1 ELSE 0 END) AS datas_invalidas
FROM database_mvp.silver.dim_loja_base;

In [0]:
%sql
SELECT LOJA, COUNT(*) AS qtde
FROM database_mvp.silver.dim_loja_base
GROUP BY LOJA
HAVING COUNT(*) > 1;

In [0]:
%sql
SELECT *
FROM database_mvp.silver.dim_loja_base
WHERE NOME_LIMPO LIKE 'X%';


In [0]:
# percebi durante o check de qualidade de dados que todas as data estavam NULL. Vou tratar:

from pyspark.sql.functions import col, trim, try_to_date

# 1) Ler Bronze e Silver
bronze = spark.table("database_mvp.bronze.lojas_raw")
silver = spark.table("database_mvp.silver.dim_loja_base")

# 2) Recalcular PRI_COMPRA_DT a partir da PRI_COMPRA da Bronze (yyyyMMdd)
df_corrigido = (
    silver.alias("s")
        .join(
            bronze.select(
                trim(col("LOJA")).cast("string").alias("LOJA_BZ"),
                trim(col("PRI_COMPRA")).alias("PRI_COMPRA_BZ")
            ),
            col("s.LOJA") == col("LOJA_BZ"),
            "left"
        )
        .withColumn(
            "PRI_COMPRA_DT",
            try_to_date(col("PRI_COMPRA_BZ"), "yyyyMMdd")  # ex: 20251207 → 2025-12-07
        )
        .drop("LOJA_BZ", "PRI_COMPRA_BZ")
)

# 3) Salvar de volta na Silver
df_corrigido.write.format("delta").mode("overwrite") \
    .saveAsTable("database_mvp.silver.dim_loja_base")

In [0]:
%sql
SELECT LOJA, PRI_COMPRA_DT
FROM database_mvp.silver.dim_loja_base
WHERE PRI_COMPRA_DT IS NOT NULL
LIMIT 20;

In [0]:
# CAMADA SILVER DA TABELA DE FATURAMENTO FATURAMENTO
from pyspark.sql.functions import col, substring, concat, lit, to_date, regexp_replace, trim

df_fat_bronze = spark.table("database_mvp.bronze.faturamento_raw")

df_vendas_silver = (
    df_fat_bronze
        # garantir ANOMES como texto e tirar espaços
        .withColumn("ANOMES", trim(col("ANOMES")).cast("string"))

        # derivar ano e mês
        .withColumn("ANO", substring(col("ANOMES"), 1, 4).cast("int"))
        .withColumn("MES", substring(col("ANOMES"), 5, 2).cast("int"))

        # primeiro dia do mês → para DIM_TEMPO
        .withColumn(
            "DATA_MES",
            to_date(concat(col("ANOMES"), lit("01")), "yyyyMMdd")
        )

        # padronizar nome: FILIAL → LOJA (string)
        .withColumn("LOJA", trim(col("FILIAL")).cast("string"))

        # padronizar nome: SKU → COD_SKU (string)
        .withColumn("COD_SKU", trim(col("SKU")).cast("string"))

        # quantidade numérica
        .withColumn("QTD", col("QTD").cast("int"))

        # tratar VALOR com vírgula decimal → ponto → DECIMAL
        .withColumn(
            "VALOR",
            regexp_replace(trim(col("VALOR")).cast("string"), ",", ".") \
                .cast("decimal(18,2)")
        )

        # selecionar somente colunas finais
        .select(
            "DATA_MES",
            "ANOMES",
            "ANO",
            "MES",
            "LOJA",
            "COD_SKU",
            "QTD",
            "VALOR"
        )
)

df_vendas_silver.write.format("delta").mode("overwrite") \
    .saveAsTable("database_mvp.silver.vendas_base")

In [0]:
%sql
SELECT *
FROM database_mvp.silver.vendas_base
LIMIT 20;

In [0]:
%sql
SELECT VALOR
FROM database_mvp.silver.vendas_base
WHERE VALOR IS NULL
LIMIT 20;

In [0]:
%sql
SELECT *
FROM database_mvp.silver.vendas_base
WHERE QTD < 0
LIMIT 20;

In [0]:
%sql
SELECT LOJA
FROM database_mvp.silver.vendas_base
WHERE LOJA NOT IN (SELECT LOJA FROM database_mvp.silver.dim_loja_base)
GROUP BY LOJA;