
# Cat√°logo Clientes

##  Transforma√ß√µes aplicadas no DataFrame `df_clientes`
- **Coluna `nome`**
  - Remove caracteres invis√≠veis (`\u00A0`, `\u2000-\u200B`).
  - Aplica `trim` para retirar espa√ßos extras.
  - Converte para formato capitalizado (`initcap`).

- **Coluna `cidade`**
  - Remove caracteres invis√≠veis.
  - Elimina prefixos como `da`, `de`, `do`, `das` no in√≠cio.
  - Aplica `trim` e `initcap` para padronizar.

- **Coluna `email`**
  - Remove espa√ßos extras (`trim`).
  - Converte para mai√∫sculas (`upper`).

- **Tratamento de valores nulos**
  - Substitui `NULL` em:
    - `email` ‚Üí `"Desconhecido"`
    - `nome` ‚Üí `"Desconhecido"`
    - `cidade` ‚Üí `"N√£o Informado"`
  - Remove linhas sem `id` (`na.drop(subset=["id"])`).




In [0]:
# Forma correta para ler uma tabela Delta registrada no cat√°logo/schema
df1 = spark.table("hive_metastore.bronze.vendas_clientes")
display(df1)


# df.filter(col("nome").isNull()).show();

# #Filtra registros sem valores nulos 
# df.filter(col("nome").isNotNull()).show();

# #Conta quantos valores nulos existem em uma coluna 
# df.filter(col("nome").isNull()).count();

# # Filtra valrias colunas nulas de uma vez 
# df.filter(col("nome").isNull() | col("cidade").isNull()).show();




In [0]:
from pyspark.sql.functions import col, trim, initcap, regexp_replace, upper

# Fun√ß√£o para limpar espa√ßos invis√≠veis
def limpar_espacos(campo):
    return regexp_replace(col(campo), r"[\u00A0\u2000-\u200B]", " ")

# Fun√ß√£o para normalizar nomes pr√≥prios
def normalizar_nome(campo):
    return initcap(trim(limpar_espacos(campo)))

# Fun√ß√£o para normalizar cidades (removendo prefixos da/de/do/das)
def normalizar_cidade(campo):
    return initcap(
        trim(
            regexp_replace(
                limpar_espacos(campo),
                r"^(da|de|do|das)\s+", ""
            )
        )
    )

df_clientes = (
    df1.withColumn("nome", normalizar_nome("nome"))
      .withColumn("cidade", normalizar_cidade("cidade"))
      .withColumn("email", upper(trim(col("email"))))
      .na.fill({
          "email": "DESCONHECIDO",
          "nome": "DESCONHECIDO",
          "cidade": "N√ÉO INFORMADO"
      })
      .na.drop(subset=["id"])
)

display(df_clientes)



# df_clientes.write \
#     .format("delta") \
#     .mode("append") \
#     .saveAsTable("silver.vendas_clientes")

## üì¶ Tabela `vendas_produtos`

### Tratativas Aplicadas
1. **Separa√ß√£o de nome em categoria e n√∫mero**
   - `categoria`: primeira parte do campo `nome`.
   - `numero`: segunda parte do campo `nome`, convertido para inteiro.

2. **Corre√ß√£o de categorias**
   - Uso de dicion√°rio de corre√ß√µes para padronizar valores.
   - Exemplos: `"movel"` ‚Üí `"M√≥vel"`, `"eletronico"` ‚Üí `"Eletr√¥nico"`.
   - Se nulo ‚Üí `"Desconhecido"`.

3. **Reconstru√ß√£o do nome corrigido**
   - `nome_corrigido`: concatena√ß√£o de `categoria_corrigida` + `numero`.

4. **Limpeza de dados**
   - Remo√ß√£o de registros sem `id`.
   - Aplica√ß√£o de `trim` para remover espa√ßos extras.

5. **Sele√ß√£o final de colunas**
   - Mantidas: `id`, `nome_corrigido`, `categoria_corrigida`.

6. **Persist√™ncia**
   - Dados gravados em `silver.vendas_produtos`.
   - Formato: Delta Lake.
   - Modo: `append`.




In [0]:
from pyspark.sql.functions import split, col
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import when, col,trim, abs , round


df2 = spark.table("hive_metastore.bronze.vendas_produtos")
display(df2)

# separa em duas partes: antes e depois do espa√ßo
df2 = df2.withColumn("categoria", split(col("nome"), " ")[0]) \
         .withColumn("numero", split(col("nome"), " ")[1].cast("int"))

df2 = df2.select("id","nome","categoria", "numero", "preco")

df2.select('categoria').distinct().show()


correcoes = {
    
    "movel": "M√≥vel",
    "m√≥vei": "M√≥vel",   # corrige o erro "M√≥vei"
    "eletronico": "Eletr√¥nico",
    "brinquedo": "Brinquedo",
    "alimento": "Alimento",
    "roupa": "Roupa",
    "livro": "Livro"


}

def corrigir_categoria(cat):
    if cat is None:
        return "Desconhecido"
    return correcoes.get(cat.lower(), cat)

corrigir_udf = udf(corrigir_categoria, StringType())

df2 = df2.withColumn("categoria_corrigida", corrigir_udf(col("categoria")))

df2 = df2.select("id","nome","categoria_corrigida", "numero", "preco")


df2 = df2.withColumn(
    "nome_corrigido",
    concat_ws(" ", col("categoria_corrigida"), col("numero"))
)

df2 = df2.select("id", "nome_corrigido","categoria_corrigida")



df2 = df2.select("id", "nome_corrigido","categoria_corrigida")

df2 = df2.na.drop(subset=["id"])

df2 = df2.withColumn("nome_corrigido", trim(col("nome_corrigido"))) \
         .withColumn("categoria_corrigida", trim(col("categoria_corrigida")))

display(df2)

df2.write \
    .format("delta") \
    .mode("append") \
    .saveAsTable("silver.vendas_produtos")


display(df2)



# Cat√°logo de vendas_itens__pedidos

Este documento descreve as regras de tratamento aplicadas √† tabela `vendas_itens__pedidos` no processo de ETL (Extract, Transform, Load) utilizando **PySpark**.

---

## üéØ Objetivo
- Garantir integridade dos dados.
- Corrigir valores inconsistentes (nulos ou negativos).
- Padronizar colunas para an√°lise.
- Persistir os dados limpos na camada **Silver**.

---

## üîß Tratativas Aplicadas

### 1. Remo√ß√£o de Registros Inv√°lidos
- Linhas com `id` nulo s√£o descartadas.
- Isso garante que cada item de pedido tenha uma chave prim√°ria v√°lida.

---

### 2. Corre√ß√£o de Pre√ßo Unit√°rio
- Coluna: `preco_unitario_corrigido`
- Regras:
  - Se `preco_unitario` for **nulo**, substitui por `0`.
  - Se `preco_unitario` for **negativo**, aplica valor absoluto.
  - Arredondamento para **2 casas decimais**.

---

### 3. Corre√ß√£o de Quantidade
- Coluna: `quantidade_corrigida`
- Regras:
  - Se `quantidade` for **nula**, substitui por `0`.
  - Caso contr√°rio, mant√©m o valor original.

---

### 4. Sele√ß√£o de Colunas Relevantes
Somente as seguintes colunas s√£o mantidas:
- `id`
- `pedido_id`
- `produto_id`
- `quantidade_corrigida`
- `preco_unitario_corrigido`

---

### 5. Persist√™ncia dos Dados
- Os dados tratados s√£o gravados na tabela **Silver**:
  - Nome: `silver.vendas_itens__pedidos`
  - Formato: **Delta Lake**
  - Modo: `append` (adiciona registros sem sobrescrever os existentes)

---



In [0]:
from pyspark.sql.functions import col, when, abs

df3 = spark.table("hive_metastore.bronze.vendas_itens__pedidos")


df3 = df3.na.drop(subset=["id"])


df3 = (
    df3
    .withColumn(
        "preco_unitario_corrigido",
        when(col("preco_unitario").isNull(), 0)
        .otherwise(abs(col("preco_unitario")))
    )
    .withColumn(
        "quantidade_corrigida",
        when(col("quantidade").isNull(), 0)
        .otherwise(col("quantidade"))
    )
)

df3 = df3.withColumn("preco_unitario_corrigido", round(col("preco_unitario_corrigido"), 2))

df3 = df3.select("id", "pedido_id","produto_id","quantidade_corrigida","preco_unitario_corrigido")

display(df3)


df3.write \
    .format("delta") \
    .mode("append") \
    .saveAsTable("silver.vendas_itens__pedidos")

# Cat√°logo pedidos

# Altera√ß√µes realizadas no DataFrame `df3`

## 1. Leitura da Tabela Bronze
- Carregamento da tabela **`hive_metastore.bronze.vendas_itens__pedidos`** como fonte inicial.

---

## 2. Remo√ß√£o de Registros Inv√°lidos
- Exclus√£o de linhas com `id` nulo utilizando `na.drop`.

---

## 3. Corre√ß√£o da Coluna `preco_unitario`
- Criada a coluna **`preco_unitario_corrigido`**:
  - Valores nulos substitu√≠dos por **0**.
  - Valores negativos transformados em positivos com `abs`.
  - Arredondamento aplicado para **2 casas decimais**.

---

## 4. Corre√ß√£o da Coluna `quantidade`
- Criada a coluna **`quantidade_corrigida`**:
  - Valores nulos substitu√≠dos por **0**.
  - Valores v√°lidos mantidos.

---

## 5. Sele√ß√£o de Colunas Relevantes
- Mantidas apenas as colunas necess√°rias:
  - `id`
  - `pedido_id`
  - `produto_id`
  - `quantidade_corrigida`
  - `preco_unitario_corrigido`

---

## 6. Escrita na Tabela Silver
- Persist√™ncia dos dados transformados na tabela **`silver.vendas_itens__pedidos`**.
- Formato: **Delta**.
- Modo: **append** (acr√©scimo de registros).

---




In [0]:
df4 = spark.table("hive_metastore.bronze.vendas_pedidos")

df4 = df4.select("id", "cliente_id", "data")

df4 = df4.na.drop(subset=["id"])

df4 = df4.dropDuplicates(["id"])

display(df4)

df4.write \
    .format("delta") \
    .mode("append") \
    .saveAsTable("silver.vendas_pedidos")
