### **2. Tarefas e Aplicações Práticas com Dados**
*(Notebook : `02_Manipulacao_e_IO_Basico`)*

1 - Manipulação Básica (Tarefas que envolvem Listas e Dicionários)

2 - Tratar Dados (Lógica de limpeza e validação de informações)

3 - Funções (O ato de usar as funções criadas)

4 - Ler/Escrever Arquivos (Operações de I/O para carregar e salvar dados)

5 - Pandas (Uso da biblioteca para análise tabular)

6 - ETL (Extrair, Transformar, Carregar) (Processo de engenharia de dados que usa todas as ferramentas)




#### 1 - Manipulação Básica (Tarefas que envolvem Listas e Dicionários)

**O que é**

São as estruturas mais simples para guardar dados em Python.
Quase tudo que você faz com dados começa com listas e dicionários.

- Lista: coleção ordenada → [1,2,3]
- Dicionário: dados em formato chave/valor → {"nome": "Ana"}


In [0]:
# Exemplo 1
alunos = ["Ana", "Pedro", "João"]
print(alunos[0])      # Ana

cliente = {"id": 1, "nome": "Maria"}
print(cliente["nome"])  # Maria


Ana
Maria


In [0]:
# Exemplo 2 (prático (Engenharia de Dados))
# Você recebe um JSON de API:
registro = {
    "id": 123,
    "nome": "João",
    "compras": ["pizza", "refrigerante", "doce"]
}


# A partir disso você:
# - extrai data
# - normaliza
# - transforma em tabela

for item in registro["compras"]:
    print(f"Cliente {registro['nome']} comprou {item}")


# Usado para:
# - entender payloads
# - quebrar arrays
# - organizar dados antes de enviar ao Spark, S3 etc.

Cliente João comprou pizza
Cliente João comprou refrigerante
Cliente João comprou doce


#### 2 - Tratar Dados (Lógica de limpeza e validação de informações)

**O que é**

É garantir que os dados estejam corretos, completos e confiáveis antes de seguir para o restante do pipeline.

Envolve:
- remover valores inválidos
- converter tipos
- padronizar textos
- remover duplicados
- validar regras


In [0]:
#Exemplo 1
idade = "25"
idade = int(idade)  # Converte o texto "25" para o número 25, permitindo cálculos.

nome = "  ana  "
nome = nome.strip().title()  # Remove espaços extras e coloca a primeira letra em maiúscula ("Ana"). 

#.strip() remove os espaços das pontas, e .title() coloca a primeira letra em maiúscula.

email = ""
if not email:
    print("Email inválido")  # Verifica se a variável está vazia; se sim, avisa que é inválido.

Email inválido


In [0]:
# Exemplo prático (Engenharia de Dados)
# Você recebe um CSV cheio de problemas:

# id;nome;idade
# 1;  JOAO ;20
# 2;ANA; 
# 3;Maria;abc
#arquivo: https://raw.githubusercontent.com/andressa-mangolin/pyspark-fundamentos-e-pratica/refs/heads/main/datasets/arquivo_teste.csv

#Você trata no Spark:

from pyspark.sql.functions import trim, col

df = df.withColumn("nome", trim(col("nome")))
df = df.filter(col("idade").cast("int").isNotNull())

# Problemas resolvidos:
# - espaços indesejados
# - idade não numérica
# - linhas vazias
# - Isso é “tratamento”.

In [0]:
# codigo completo
import requests
from pyspark.sql.functions import trim, col

# Caminhos 
url = "https://raw.githubusercontent.com/andressa-mangolin/pyspark-fundamentos-e-pratica/refs/heads/main/datasets/arquivo_teste.csv"
# Caminho de destino no Unity Catalog/DBFS:
dbfs_path = "/Volumes/workspace/lhdw/landingzone/vendas/processar/arquivo_teste.csv" 

# 1. Baixa o conteúdo do arquivo da URL (resolve o erro HTTPS)
response = requests.get(url)
response.raise_for_status() 

# 2. Salva o conteúdo no sistema de arquivos do Databricks (DBFS)
dbutils.fs.put(
    dbfs_path,
    response.text,
    overwrite=True
)

print(f"✅ Arquivo de teste salvo com sucesso em: {dbfs_path}")

# --- 2. LEITURA E TRATAMENTO (PYSPARK) ---

# 1. Lê o DataFrame a partir do local onde foi salvo
df = spark.read.csv(
    dbfs_path,  
    sep=";", 
    header=True
)

# 2. TRATAMENTO: Limpeza de espaços e remoção de linhas inválidas (Solução Definitiva)

# A. Limpa espaços extras da coluna 'nome'
df = df.withColumn("nome", trim(col("nome")))

# B. Limpa espaços e aplica a conversão segura (try_cast) na 'idade'.
# O try_cast resolve o erro 'CAST_INVALID_INPUT' retornando NULL se for 'abc' ou '""'.
df = df.withColumn("idade", trim(col("idade")))
df = df.withColumn("idade", col("idade").try_cast("int"))

# C. Filtra: Remove todas as linhas onde o try_cast falhou (ou seja, onde a idade é NULL).
df = df.filter(col("idade").isNotNull())

# --- 3. VALIDAÇÃO ---

print("\n--- DATAFRAME FINAL LIMPO (PROVA DE SUCESSO) ---")
df.show()

print("--- ESTRUTURA FINAL (Tipo de dado corrigido) ---")
df.printSchema()

Wrote 46 bytes.
✅ Arquivo de teste salvo com sucesso em: /Volumes/workspace/lhdw/landingzone/vendas/processar/arquivo_teste.csv

--- DATAFRAME FINAL LIMPO (PROVA DE SUCESSO) ---
+---+----+-----+
| id|nome|idade|
+---+----+-----+
|  1|JOAO|   20|
+---+----+-----+

--- ESTRUTURA FINAL (Tipo de dado corrigido) ---
root
 |-- id: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- idade: integer (nullable = true)



In [0]:
from pyspark.sql.functions import trim, col

# O DataFrame 'df' já está carregado do caminho dbfs_path.

# 1. Limpa o NOME (remove espaços)
df = df.withColumn("nome", trim(col("nome")))

# 2. Limpa e Valida a IDADE (remove lixo e converte para número)
# Limpa espaços e usa try_cast para converter string para INT de forma segura (sem travar).
df = df.withColumn("idade", trim(col("idade")).try_cast("int"))

# 3. Filtra e remove todas as linhas inválidas (onde a idade virou NULL)
df = df.filter(col("idade").isNotNull())

# Exibe o resultado limpo
df.show()
df.printSchema()

+---+----+-----+
| id|nome|idade|
+---+----+-----+
|  1|JOAO|   20|
+---+----+-----+

root
 |-- id: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- idade: integer (nullable = true)



#### 3 - Funções (O ato de usar as funções criadas)

**O que é:**

São blocos de código reaproveitáveis.
Engenharia de dados usa MUITO para evitar repetir lógica.

In [0]:
#Exemplo simples
# Define a função 'somar' que recebe dois valores (a e b).
def somar(a, b):
    # Devolve o resultado da soma de 'a' mais 'b'.
    return a + b

# Chama a função 'somar' com 5 e 3 e imprime o resultado (8).
print(somar(5, 3))


8


In [0]:
# Exemplo prático 
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
from datetime import datetime # Necessário para o exemplo da data

# --- PARTE 1: CPF ---limpar o CPF
def limpar_cpf(cpf):
    return cpf.replace(".", "").replace("-", "")

# PASSO CRÍTICO: Registra a função Python como UDF. 
# Isso cria a variável 'limpar_cpf_udf'.
limpar_cpf_udf = udf(limpar_cpf, StringType())


# E usa no pipeline:
# Aplica a regra (limpar_cpf_udf) à coluna 'cpf'.
df = df.withColumn("cpf_limpo", limpar_cpf_udf(col("cpf")))


# --- PARTE 2: DATAS ---função para padronizar datas:
# Define a regra Python para formatar a data
def normalizar_data(data):
    return datetime.strptime(data, "%d/%m/%Y").strftime("%Y-%m-%d")

# Para usar esta função em uma coluna, você também teria que registrá-la:
# normalizar_data_udf = udf(normalizar_data, StringType())
# df = df.withColumn("data_normalizada", normalizar_data_udf(col("data_original")))

In [0]:
# Ação: Define uma regra para limpar o CPF.
def limpar_cpf(cpf):
    # Regra: Remove pontos e hífens.
    return cpf.replace(".", "").replace("-", "")

# --- TESTE ---
cpf_sujo = "123.456.789-00"

# Executa a regra com o valor de teste e imprime o resultado.
print(limpar_cpf(cpf_sujo))

12345678900


In [0]:
# Caminhos 
url = "https://raw.githubusercontent.com/andressa-mangolin/pyspark-fundamentos-e-pratica/refs/heads/main/datasets/clientes_cadastro.csv"
# Caminho de destino no Unity Catalog/DBFS:
dbfs_path = "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro.csv" 

# 1. Baixa o conteúdo do arquivo da URL (resolve o erro HTTPS)
response = requests.get(url)
response.raise_for_status() 

# 2. Salva o conteúdo no sistema de arquivos do Databricks (DBFS)
dbutils.fs.put(
    dbfs_path,
    response.text,
    overwrite=True
)

print(f"✅ Arquivo de teste salvo com sucesso em: {dbfs_path}")



Wrote 255 bytes.
✅ Arquivo de teste salvo com sucesso em: /Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro.csv


In [0]:
from pyspark.sql.functions import col, regexp_replace, length
import requests

# --- 1. CONFIGURAÇÃO E LEITURA (SETUP) ---

# Define os caminhos do seu projeto
url = "https://raw.githubusercontent.com/andressa-mangolin/pyspark-fundamentos-e-pratica/refs/heads/main/datasets/clientes_cadastro.csv"
dbfs_path_leitura = "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro.csv" 
caminho_salvamento_final = "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro_corrigo.parquet"

# [Garante que o arquivo de leitura exista]
response = requests.get(url)
response.raise_for_status()
dbutils.fs.put(dbfs_path_leitura, response.text, overwrite=True)
print(f"✅ Arquivo de dados brutos salvo em: {dbfs_path_leitura}")

# Lê o DataFrame
df = spark.read.csv(
    dbfs_path_leitura,  
    sep=";", 
    header=True
)

# --- 2. CORREÇÃO DE ERRO E LIMPEZA ---

# CORRIGE O ERRO UNRESOLVED_COLUMN: Remove espaços em nomes de coluna.
for nome_coluna in df.columns:
    df = df.withColumnRenamed(nome_coluna, nome_coluna.strip())

# Cria a coluna 'cpf_limpo' removendo TUDO que não é dígito (a 'Peneira Mágica').
df = df.withColumn(
    "cpf_limpo", 
    regexp_replace(col("cpf_original"), "[^0-9]", "")
)

# --- 3. VALIDAÇÃO E FILTRAGEM ---

# REGRA 1: O CPF ORIGINAL NÃO pode ser nulo.
regra_nao_nulo = col("cpf_original").isNotNull()

# REGRA 2: O CPF LIMPO deve ter EXATAMENTE 11 dígitos.
regra_tamanho_11 = (length(col("cpf_limpo")) == 11)

# REGRA FINAL: Combina as duas: deve ser NÃO NULO E ter 11 dígitos.
REGRA_VALIDADE = regra_nao_nulo & regra_tamanho_11

# Filtra o DataFrame, mantendo APENAS os registros que passaram em TODAS as regras.
df_validos = df.where(REGRA_VALIDADE)

# --- 4. SALVAMENTO E PROVA FINAL ---

# Salva o DataFrame APENAS com os registros válidos no formato PARQUET.
df_validos.write.mode("overwrite").parquet(caminho_salvamento_final)
print(f"\n💾 Arquivo CORRIGIDO salvo com sucesso em: {caminho_salvamento_final}")

# Mostra o resultado final (o que foi salvo).
print("\n--- CONTEÚDO DO ARQUIVO SALVO (Válidos) ---")
df_validos.show()

Wrote 255 bytes.
✅ Arquivo de dados brutos salvo em: /Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro.csv

💾 Arquivo CORRIGIDO salvo com sucesso em: /Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro_corrigo.parquet

--- CONTEÚDO DO ARQUIVO SALVO (Válidos) ---
+---+-----------+--------------+-----+-------+-----------+
| id|       nome|  cpf_original|idade|  renda|  cpf_limpo|
+---+-----------+--------------+-----+-------+-----------+
|  1| João Silva|123.456.789-00|   30|2500.50|12345678900|
|  2|Maria Souza|   99911122233|   45|4800.00|99911122233|
|  3|Pedro Alves|   11122233344|   22|1500.00|11122233344|
|  5| Rui Santos|   66677788899|   55|7000.00|66677788899|
|  6| Paula Lima|   00000000000|   19| 900.00|00000000000|
+---+-----------+--------------+-----+-------+-----------+



## Desmontagem e Reformatação do CPF

A função **`substr(A, B)`** é utilizada para **cortar** o CPF limpo (sem pontos ou traços, ex: `12345678900`) em pedaços.

O objetivo é alimentar o comando **`format_string`** para adicionar a pontuação de volta (`XXX.XXX.XXX-XX`).

### Como a Função `substr(A, B)` Corta a String

* **A** = Posição onde o corte **começa** (contando do 1).
* **B** = **Quantidade** de caracteres a serem pegos a partir do início.

| Peça | Código `substr(A, B)` | Início (A) | Quantidade (B) | O que a Peça Contém (Exemplo: 12345678900) |
| :---: | :---: | :---: | :---: | :--- |
| **1ª Peça** | `substr(1, 3)` | 1 | 3 | Pega os 3 primeiros dígitos. Resultado: **123** |
| **2ª Peça** | `substr(4, 3)` | 4 | 3 | Começa no 4º dígito, pega 3. Resultado: **456** |
| **3ª Peça** | `substr(7, 3)` | 7 | 3 | Começa no 7º dígito, pega 3. Resultado: **789** |
| **4ª Peça** | `substr(10, 2)` | 10 | 2 | Pega os 2 últimos (dígitos verificadores). Resultado: **00** |

---

### Montagem Final

O comando **`format_string`** usa esses quatro pedaços e os une, inserindo os separadores:

`"%s.%s.%s-%s"` $\rightarrow$ **`123.456.789-00`**

In [0]:
from pyspark.sql.functions import col, regexp_replace, length, format_string
import requests

# --- 1. CONFIGURAÇÃO E LEITURA (SETUP) ---

# Define os caminhos. Assumimos que o arquivo está na URL original.
url = "https://raw.githubusercontent.com/andressa-mangolin/pyspark-fundamentos-e-pratica/refs/heads/main/datasets/clientes_cadastro.csv"
dbfs_path_leitura = "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro.csv" 
caminho_salvamento_final = "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro_corrigo.parquet"

# [Garante que o arquivo de leitura exista]
response = requests.get(url)
response.raise_for_status()
dbutils.fs.put(dbfs_path_leitura, response.text, overwrite=True)

# Lê o DataFrame
df = spark.read.csv(
    dbfs_path_leitura,  
    sep=";", 
    header=True
)

# --- 2. PREPARAÇÃO E VALIDAÇÃO ---

#  CORREÇÃO: Remove espaços em nomes de coluna (Resolve o erro UNRESOLVED_COLUMN).
for nome_coluna in df.columns:
    df = df.withColumnRenamed(nome_coluna, nome_coluna.strip())

# A) LIMPEZA: Cria 'cpf_limpo' removendo tudo que não for dígito.
df = df.withColumn(
    "cpf_limpo", 
    regexp_replace(col("cpf_original"), "[^0-9]", "")
)

# B) REGRA: O CPF tem que SER NÃO NULO E ter 11 dígitos.
REGRA_VALIDADE = (col("cpf_original").isNotNull()) & (length(col("cpf_limpo")) == 11)

# C) FILTRAGEM: Mantém APENAS os registros VÁLIDOS.
df_validos = df.where(REGRA_VALIDADE)

# --- 3. REFORMATAÇÃO VISUAL (PASSO NOVO) ---

# Reverte o CPF limpo (11 dígitos) para o formato de exibição (XXX.XXX.XXX-XX).
df_final = df_validos.withColumn(
    "cpf_formatado",
    format_string(
        "%s.%s.%s-%s",
        # Usa SUBSTRING para pegar as partes: 3 primeiros, 3 seguintes, 3 seguintes, 2 finais.
        col("cpf_limpo").substr(1, 3),
        col("cpf_limpo").substr(4, 3),
        col("cpf_limpo").substr(7, 3),
        col("cpf_limpo").substr(10, 2)
    )
)

# --- 4. SALVAMENTO FINAL ---

# Salva o DataFrame FINAL (validado e formatado) no formato PARQUET.
df_final.write.mode("overwrite").parquet(caminho_salvamento_final)
print(f"\nArquivo CORRIGIDO e FORMATADO salvo com sucesso em: {caminho_salvamento_final}")

# --- 5. VERIFICAÇÃO ---
print("\n--- REGISTROS SALVOS (Com Formato Visual) ---")
# Mostra o resultado final, incluindo a coluna formatada.
df_final.select("nome", "cpf_original", "cpf_formatado", "idade").show(truncate=False)

Wrote 255 bytes.

Arquivo CORRIGIDO e FORMATADO salvo com sucesso em: /Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro_corrigo.parquet

--- REGISTROS SALVOS (Com Formato Visual) ---
+-----------+--------------+--------------+-----+
|nome       |cpf_original  |cpf_formatado |idade|
+-----------+--------------+--------------+-----+
|João Silva |123.456.789-00|123.456.789-00|30   |
|Maria Souza|99911122233   |999.111.222-33|45   |
|Pedro Alves|11122233344   |111.222.333-44|22   |
|Rui Santos |66677788899   |666.777.888-99|55   |
|Paula Lima |00000000000   |000.000.000-00|19   |
+-----------+--------------+--------------+-----+



#### 4 - Ler/Escrever Arquivos (Operações de I/O para carregar e salvar dados)
**O que é**

São operações para interagir com arquivos:

- CSV
- JSON
- TXT
- Parquet (no Spark)
- S3, HDFS, Blob, etc.


In [0]:
# No PySpark, as operações de I/O são distribuídas e focam em formatos otimizados, como o Parquet, e sistemas de armazenamento em nuvem (como S3, que é muito usado no AWS).

# Lendo Dados Brutos (Input):
# Lê um arquivo CSV de um bucket S3 (fonte de dados brutos)
# df = spark.read.csv("s3://meu-bucket/raw/clientes.csv", header=True)

#Salvando Dados Limpos (Output):
# Escreve o resultado (df) no formato Parquet (otimizado e particionado)
# Este é o coração de um pipeline, movendo dados para a zona 'bronze' ou 'trusted'.
#df.write.parquet("s3://meu-bucket/bronze/clientes/")

In [0]:
from pyspark.sql.functions import lit

# --- 1. LEITURA SIMPLES (INPUT) ---
# Lê o arquivo CSV previamente salvo no DBFS
df_input = spark.read.csv(
    "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro.csv",  
    sep=";", 
    header=True
)

# Adiciona uma coluna de processamento mínima
df_processado = df_input.withColumn("status", lit("PROCESSADO"))


# --- 2. ESCRITA SIMPLES (OUTPUT) ---
# Salva o resultado no formato Parquet
df_processado.write.mode("overwrite").parquet(
    "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro_corrigo.parquet"
)

print("✅ Leitura e escrita completas.")

✅ Leitura e escrita completas.


In [0]:
%sh ls /Volumes/workspace/lhdw/landingzone/vendas/processar/

arquivo_teste.csv
clientes_cadastro_corrigo.parquet
clientes_cadastro.csv
customers.csv
geolocation.csv
order_items.csv
order_payments.csv
order_reviews.csv
orders.csv
product_category_name_translation.csv
products.csv
sellers.csv


#### 5 - Pandas (Uso da biblioteca para análise tabular)
**O que é:**
- Estrutura Principal: O Pandas organiza os dados em um objeto chamado DataFrame, que é uma tabela com linhas e colunas.
- Finalidade: Ideal para tratamento, limpeza, análise e preparação de dados.


In [0]:
# Exemplos de Uso Simples
# Para começar a usar, você importa a biblioteca (o alias pd é o padrão) e carrega seus dados.

import pandas as pd

# Define o caminho real do arquivo CSV no Unity Catalog/DBFS
caminho_csv = "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro.csv"

# LÊ O ARQUIVO USANDO O CAMINHO COMPLETO E O SEPARADOR CORRETO
df = pd.read_csv(caminho_csv, sep=';')

# Calcula a média da coluna 'idade' e a armazena em uma nova coluna
df["idade_media"] = df["idade"].mean()

# Exibe as primeiras 5 linhas do DataFrame
print(df.head())

   id         nome    cpf_original  idade   renda  idade_media
0   1   João Silva  123.456.789-00     30  2500.5    33.166667
1   2  Maria Souza     99911122233     45  4800.0    33.166667
2   3  Pedro Alves     11122233344     22  1500.0    33.166667
3   4    Ana Costa  444.555.666-AA     28  3200.0    33.166667
4   5   Rui Santos     66677788899     55  7000.0    33.166667


In [0]:
%sh ls -l /Volumes/workspace/lhdw/landingzone/vendas/processar/

total 114313
-rwxrwxrwx 1 nobody nogroup       46 Dec 10 20:38 arquivo_teste.csv
drwxrwxrwx 2 nobody nogroup     4096 Dec 10 18:18 clientes_cadastro_corrigo.parquet
-rwxrwxrwx 1 nobody nogroup      255 Dec 10 20:38 clientes_cadastro.csv
-rwxrwxrwx 1 nobody nogroup       46 Dec  9 17:55 customers.csv
-rwxrwxrwx 1 nobody nogroup 61273883 Dec  4 22:06 geolocation.csv
-rwxrwxrwx 1 nobody nogroup 15438671 Dec  4 22:06 order_items.csv
-rwxrwxrwx 1 nobody nogroup  5777138 Dec  4 22:06 order_payments.csv
-rwxrwxrwx 1 nobody nogroup 14346950 Dec  4 22:06 order_reviews.csv
-rwxrwxrwx 1 nobody nogroup 17654914 Dec  4 22:06 orders.csv
-rwxrwxrwx 1 nobody nogroup     2542 Dec  4 22:06 product_category_name_translation.csv
-rwxrwxrwx 1 nobody nogroup  2379446 Dec  4 22:06 products.csv
-rwxrwxrwx 1 nobody nogroup   174703 Dec  4 22:06 sellers.csv


####6 - ETL (Extrair, Transformar, Carregar) (Processo de engenharia de dados que usa todas as ferramentas)

**O que é**

É o processo principal de engenharia de dados e usa todos os tópicos acima.

E = Extrair → pegar dados da origem

T = Transformar → tratar / limpar / padronizar

L = Carregar → salvar em outro formato ou banco

As transformações mais usadas em ETL são sempre focadas em limpar, padronizar, enriquecer e estruturar os dados.
Vou te passar a lista completa — as que realmente aparecem todos os dias no trabalho de Engenharia de Dados.

## Resumo das Transformações mais Usadas no ETL

| **Categoria**        | **Exemplos de Transformações**                        |
|----------------------|--------------------------------------------------------|
| **Limpeza**          | Remover nulos, eliminar duplicados, corrigir erros     |
| **Casting**          | Converter `string → int`, `string → float`, `string → date` |
| **Padronização**     | Padronizar nomes, cidades, normalizar texto            |
| **Datas**            | Parse de datas, extrair ano/mês/dia, calcular idade    |
| **Agregação**        | Soma, média, mínimo, máximo, contagem                  |
| **Join**             | Unir tabelas com `inner`, `left`, `right`, `full`      |
| **Enriquecimento**   | Adicionar informações externas (ex: tabela de referência) |
| **Regras de negócio**| Criar flags, categorias, classificações                |
| **Pivot**            | Transformar linhas em colunas ou colunas em linhas     |


### 1. Limpeza de valores (Cleaning)
O que é:

Remover ou ajustar dados inválidos, nulos, duplicados ou inconsistentes.

Exemplos:

- Remover linhas com valores vazios
- Tratar “NULL”, “-”, “sem dado”
- Remover espaços em excesso
- Excluir duplicatas

In [0]:
from pyspark.sql.functions import col, trim 

# Recarrega o DataFrame com o PySpark.
# Isso garante que 'df' seja um objeto PySpark e reconheça o método '.withColumn'.
caminho_parquet = "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro_corrigo.parquet"
df = spark.read.parquet(caminho_parquet) 

# 1. TRATAMENTO (Limpar espaços do 'nome')
df = df.withColumn("nome", trim(col("nome")))

# 2. FILTRAGEM (Remover nulos na 'idade')
df = df.filter(col("idade").isNotNull())

# 3. DEDUPLICAÇÃO (Remover linhas duplicadas baseadas no 'id')
df = df.dropDuplicates(["id"])

# O DataFrame 'df' agora contém todas as transformações aplicadas.
df.show()

+---+-----------+--------------+-----+-------+----------+
| id|       nome|  cpf_original|idade|  renda|    status|
+---+-----------+--------------+-----+-------+----------+
|  2|Maria Souza|   99911122233|   45|4800.00|PROCESSADO|
|  6| Paula Lima|   00000000000|   19| 900.00|PROCESSADO|
|  4|  Ana Costa|444.555.666-AA|   28|3200.00|PROCESSADO|
|  5| Rui Santos|   66677788899|   55|7000.00|PROCESSADO|
|  1| João Silva|123.456.789-00|   30|2500.50|PROCESSADO|
|  3|Pedro Alves|   11122233344|   22|1500.00|PROCESSADO|
+---+-----------+--------------+-----+-------+----------+



In [0]:
from pyspark.sql.functions import col, trim

# Caminho do seu arquivo PARQUET corrigido
caminho_parquet = "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro_corrigo.parquet"

# Carrega o arquivo usando o Spark. Isso garante que 'df' seja um objeto PySpark.
df = spark.read.parquet(caminho_parquet)

# Cria um novo DataFrame 'df_corrigido' com todas as etapas em sequência
df_corrigido = df.withColumn("nome", trim(col("nome"))) \
                 .filter(col("idade").isNotNull()) \
                 .dropDuplicates(["id"])

print("✅ O código funcionou após recarregar com spark.read!")
df_corrigido.show()

✅ O código funcionou após recarregar com spark.read!
+---+-----------+--------------+-----+-------+----------+
| id|       nome|  cpf_original|idade|  renda|    status|
+---+-----------+--------------+-----+-------+----------+
|  2|Maria Souza|   99911122233|   45|4800.00|PROCESSADO|
|  6| Paula Lima|   00000000000|   19| 900.00|PROCESSADO|
|  4|  Ana Costa|444.555.666-AA|   28|3200.00|PROCESSADO|
|  5| Rui Santos|   66677788899|   55|7000.00|PROCESSADO|
|  1| João Silva|123.456.789-00|   30|2500.50|PROCESSADO|
|  3|Pedro Alves|   11122233344|   22|1500.00|PROCESSADO|
+---+-----------+--------------+-----+-------+----------+



### 2. Cast de Tipos (Type Casting)

**O que é:**

Converter tipos para os corretos, ex: string → int, string → date.

Exemplos:

- “25” → 25
- “2023-10-01” → formato data real
- “true” → boolean

In [0]:
# Assume-se que 'df' é um PySpark DataFrame carregado.

# 1. Converte o valor da coluna 'idade' (que era texto) para número inteiro (int).
# Isso é obrigatório para fazer qualquer operação de média ou soma.
#df = df.withColumn("idade", col("idade").cast("int"))

# 2. Converte o valor da coluna 'data' (que era texto) para o formato de calendário (date).
# Isso é necessário para usar a coluna em filtros de tempo (ex: "depois de Janeiro/2025").
#df = df.withColumn("data", col("data").cast("date"))


In [0]:
from pyspark.sql.functions import col, trim # Importa tudo que você precisará!
from pyspark.sql import SparkSession # Garante que a sessão Spark esteja ativa (caso não esteja no Databricks)

# Se você não estiver no Databricks, use isso para criar a sessão Spark:
# spark = SparkSession.builder.appName("TipoConverter").getOrCreate()

# --- CAMINHO ---
caminho_parquet = "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro_corrigo.parquet"

# --- CARREGAMENTO (Garante que 'df' é PySpark) ---
try:
    df = spark.read.parquet(caminho_parquet) 
except Exception as e:
    print(f"Erro ao ler o arquivo. Verifique se o caminho está 100% correto e se o arquivo existe: {e}")
    # Se o erro for aqui, o problema é no caminho ou na permissão!
    # Não execute o resto do código se o carregamento falhar.
    raise

# 1. CONVERTE IDADE: Texto -> Número Inteiro (int)
df = df.withColumn("idade", col("idade").cast("int"))

# 2. CONVERTE DATA: Texto -> Data de Calendário (date)
# (Assumindo que 'data_cadastro' seja o nome correto da sua coluna de data)
df = df.withColumn("data_cadastro", col("data_cadastro").cast("date")) 

# 3. CONVERTE RENDA: Texto -> Número Decimal (double)
# 'double' é usado para números com casas decimais (float)
df = df.withColumn("renda", col("renda").cast("double"))

# 4. CONVERTE STATUS: Texto -> Número Curto (short)
# 'short' é usado para números inteiros pequenos (códigos, IDs, etc.)
df = df.withColumn("status", col("status").cast("short"))

In [0]:
from pyspark.sql.functions import col, trim 
from pyspark.sql import SparkSession 

# --- CAMINHO ---
caminho_parquet = "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro_corrigo.parquet"

# --- CARREGAMENTO ---
try:
    df = spark.read.parquet(caminho_parquet) 
except Exception as e:
    print(f"Erro ao ler o arquivo: {e}")
    raise

# --- TRANSFORMAÇÕES ENCADEDAS ---

df_final = df.withColumn("nome", trim(col("nome"))) \
             .withColumn("idade", col("idade").cast("int")) \
             .withColumn("renda", col("renda").cast("double")) \
             .filter(col("idade").isNotNull()) \
             .dropDuplicates(["id"])


# Exibe os dados transformados
df_final.show(5)

# Mostra a nova estrutura para confirmar os tipos (int e double)
df_final.printSchema()


+---+-----------+--------------+-----+------+----------+
| id|       nome|  cpf_original|idade| renda|    status|
+---+-----------+--------------+-----+------+----------+
|  2|Maria Souza|   99911122233|   45|4800.0|PROCESSADO|
|  6| Paula Lima|   00000000000|   19| 900.0|PROCESSADO|
|  4|  Ana Costa|444.555.666-AA|   28|3200.0|PROCESSADO|
|  5| Rui Santos|   66677788899|   55|7000.0|PROCESSADO|
|  1| João Silva|123.456.789-00|   30|2500.5|PROCESSADO|
+---+-----------+--------------+-----+------+----------+
only showing top 5 rows
root
 |-- id: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- cpf_original: string (nullable = true)
 |-- idade: integer (nullable = true)
 |-- renda: double (nullable = true)
 |-- status: string (nullable = true)



### 3. Padronização de Texto
**O que é:**

Corrigir, formatar e deixar os textos uniformes.

Exemplos:

- UPPER / lower
- Remover acentos
- Corrigir variações (“Joao” → “João”)
- Normalizar nomes

In [0]:
from pyspark.sql.functions import upper

df = df.withColumn("cidade", upper(col("cidade")))

In [0]:
# Exemplo MAIÚSCULAS
from pyspark.sql.functions import col, upper

# --- CARREGAMENTO (Assumindo que 'df' é carregado) ---
caminho_parquet = "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro_corrigo.parquet"
df = spark.read.parquet(caminho_parquet) 

# 1. Padroniza a coluna 'nome' para MAIÚSCULAS para facilitar buscas e agrupamentos.
df = df.withColumn("nome", upper(col("nome")))

df.show(5)

+---+-----------+--------------+-----+-------+----------+
| id|       nome|  cpf_original|idade|  renda|    status|
+---+-----------+--------------+-----+-------+----------+
|  1| JOÃO SILVA|123.456.789-00|   30|2500.50|PROCESSADO|
|  2|MARIA SOUZA|   99911122233|   45|4800.00|PROCESSADO|
|  3|PEDRO ALVES|   11122233344|   22|1500.00|PROCESSADO|
|  4|  ANA COSTA|444.555.666-AA|   28|3200.00|PROCESSADO|
|  5| RUI SANTOS|   66677788899|   55|7000.00|PROCESSADO|
+---+-----------+--------------+-----+-------+----------+
only showing top 5 rows


In [0]:
# Exemplo minusculo
from pyspark.sql.functions import col, lower

caminho_parquet = "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro_corrigo.parquet"
df = spark.read.parquet(caminho_parquet) 

# 1. Padroniza a coluna 'nome' para MAIÚSCULAS para facilitar buscas e agrupamentos.
df = df.withColumn("nome", lower(col("nome")))

df.show(5)

+---+-----------+--------------+-----+-------+----------+
| id|       nome|  cpf_original|idade|  renda|    status|
+---+-----------+--------------+-----+-------+----------+
|  1| joão silva|123.456.789-00|   30|2500.50|PROCESSADO|
|  2|maria souza|   99911122233|   45|4800.00|PROCESSADO|
|  3|pedro alves|   11122233344|   22|1500.00|PROCESSADO|
|  4|  ana costa|444.555.666-AA|   28|3200.00|PROCESSADO|
|  5| rui santos|   66677788899|   55|7000.00|PROCESSADO|
+---+-----------+--------------+-----+-------+----------+
only showing top 5 rows


In [0]:
from pyspark.sql.functions import col, regexp_replace, upper, trim

caminho_parquet = "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro_corrigo.parquet"
df = spark.read.parquet(caminho_parquet) 

df_limpo = df.withColumn("nome_limpo", 
        regexp_replace(col("nome"), "[ÁÀÂÃ]", "A")) \
    .withColumn("nome_limpo", 
        regexp_replace(col("nome_limpo"), "[ÉÈÊ]", "E")) \
    .withColumn("nome_limpo", 
        regexp_replace(col("nome_limpo"), "[ÍÌÎ]", "I")) \
    .withColumn("nome_limpo", 
        regexp_replace(col("nome_limpo"), "[ÓÒÔÕ]", "O")) \
    .withColumn("nome_limpo", 
        regexp_replace(col("nome_limpo"), "[ÚÙÛÜ]", "U")) \
    .withColumn("nome_limpo", 
        regexp_replace(col("nome_limpo"), "[Ç]", "C")) \
    .withColumn("nome_limpo", 
        regexp_replace(col("nome_limpo"), "[áàâã]", "a")) \
    .withColumn("nome_limpo", 
        regexp_replace(col("nome_limpo"), "[éèê]", "e")) \
    .withColumn("nome_limpo", 
        regexp_replace(col("nome_limpo"), "[íìî]", "i")) \
    .withColumn("nome_limpo", 
        regexp_replace(col("nome_limpo"), "[óòôõ]", "o")) \
    .withColumn("nome_limpo", 
        regexp_replace(col("nome_limpo"), "[úùûü]", "u")) \
    .withColumn("nome_limpo", 
        regexp_replace(col("nome_limpo"), "[ç]", "c")) \
    .withColumn("nome_limpo", 
        trim(col("nome_limpo"))) \
    .withColumn("nome_limpo", 
        upper(col("nome_limpo"))) \
    .dropDuplicates(["id"]) # Suas outras transformações

# O resultado: 'João Silva' vira 'JOAO SILVA'
df_limpo.show(5)

+---+-----------+--------------+-----+-------+----------+-----------+
| id|       nome|  cpf_original|idade|  renda|    status| nome_limpo|
+---+-----------+--------------+-----+-------+----------+-----------+
|  2|Maria Souza|   99911122233|   45|4800.00|PROCESSADO|MARIA SOUZA|
|  6| Paula Lima|   00000000000|   19| 900.00|PROCESSADO| PAULA LIMA|
|  4|  Ana Costa|444.555.666-AA|   28|3200.00|PROCESSADO|  ANA COSTA|
|  5| Rui Santos|   66677788899|   55|7000.00|PROCESSADO| RUI SANTOS|
|  1| João Silva|123.456.789-00|   30|2500.50|PROCESSADO| JOAO SILVA|
+---+-----------+--------------+-----+-------+----------+-----------+
only showing top 5 rows


In [0]:
# CARACTERES_ORIGEM = "áàâãéèêíìîóòôõúùûüçÁÀÂÃÉÈÊÍÌÎÓÒÔÕÚÙÛÜÇ"
# CARACTERES_DESTINO = "aaaaeeeiiioooouuuucAAAAEEEIIIOOOOUUUUC"

# 1ª Linha: TIRA OS ACENTOS
# Cria a coluna 'nome_limpo'. Pega a coluna 'nome' original e troca todos os caracteres 
# acentuados (ã, é, ç) pelos seus equivalentes sem acento (a, e, c).

# 2ª Linha: COLOCA EM MAIÚSCULAS
# Pega o resultado sem acentos da linha anterior (nome_limpo) e converte TODAS as letras para MAIÚSCULAS.           

# 3ª Linha: REMOVE ESPAÇOS
# Pega o resultado em maiúsculas da linha anterior e remove qualquer espaço em branco 
# que tenha sobrado no início ou no fim do texto (limpeza final).
            

from pyspark.sql.functions import col, translate, upper, trim

# Defina as strings de origem e destino
CARACTERES_ORIGEM = "áàâãéèêíìîóòôõúùûüçÁÀÂÃÉÈÊÍÌÎÓÒÔÕÚÙÛÜÇ"
CARACTERES_DESTINO = "aaaaeeeiiioooouuuucAAAAEEEIIIOOOOUUUUC"

# --- TRANSFORMAÇÃO ---
df_final = df.withColumn("nome_limpo", translate(col("nome"), CARACTERES_ORIGEM, CARACTERES_DESTINO)) \
             .withColumn("nome_limpo", upper(col("nome_limpo"))) \
             .withColumn("nome_limpo", trim(col("nome_limpo")))

df_final.show(5)

+---+-----------+--------------+-----+-------+----------+-----------+
| id|       nome|  cpf_original|idade|  renda|    status| nome_limpo|
+---+-----------+--------------+-----+-------+----------+-----------+
|  1| João Silva|123.456.789-00|   30|2500.50|PROCESSADO| JOAO SILVA|
|  2|Maria Souza|   99911122233|   45|4800.00|PROCESSADO|MARIA SOUZA|
|  3|Pedro Alves|   11122233344|   22|1500.00|PROCESSADO|PEDRO ALVES|
|  4|  Ana Costa|444.555.666-AA|   28|3200.00|PROCESSADO|  ANA COSTA|
|  5| Rui Santos|   66677788899|   55|7000.00|PROCESSADO| RUI SANTOS|
+---+-----------+--------------+-----+-------+----------+-----------+
only showing top 5 rows


### 4. Normalização de Dados (Standardization)

**O que é:**

Padronizar unidades ou formatos.

Exemplos:

- 10 km → 10.000 
- R$ 1.200,00 → 1200.00
- "Sim/Não" → 1/0
- "F/M" → "Feminino/Masculino"

In [0]:
df = df.withColumn("valor", col("valor").cast("double"))


In [0]:
df_final = df.withColumn("genero_completo", # Cria uma nova coluna para o resultado
             when(col("genero") == "F", "Feminino") # SE 'genero' for "F", então use "Feminino"
             .when(col("genero") == "M", "Masculino") # SE 'genero' for "M", então use "Masculino"
             .otherwise(col("genero")) # Caso contrário (se for Nulo ou outro código), mantém o valor original.
             )

In [0]:
from pyspark.sql.functions import col, trim, upper, translate, when, lit

# Strings para a função translate (remoção de acentos)
CARACTERES_ORIGEM = "áàâãéèêíìîóòôõúùûüçÁÀÂÃÉÈÊÍÌÎÓÒÔÕÚÙÛÜÇ"
CARACTERES_DESTINO = "aaaaeeeiiioooouuuucAAAAEEEIIIOOOOUUUUC"

# Caminho do seu arquivo Parquet
caminho_parquet = "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro_corrigo.parquet"
df = spark.read.parquet(caminho_parquet)

df_final = df.withColumn("nome", 
             # Padroniza 'nome': Remove acentos (translate), coloca em MAIÚSCULAS (upper) e remove espaços (trim)
             trim(upper(translate(col("nome"), CARACTERES_ORIGEM, CARACTERES_DESTINO)))
             ) \
             .withColumn("idade", 
                 # Normalização de Tipo: Converte 'idade' para Número Inteiro (int)
                 col("idade").cast("int")) \
             .withColumn("renda", 
                 # Normalização de Tipo: Converte 'renda' para Número Decimal (double)
                 col("renda").cast("double")) \
             .withColumn("status_processado_flag", 
                 # Normalização de Status: Converte Status para FLAG numérica (1 para Processado, 0 caso contrário)
                 when(col("status") == "Processado", 1).otherwise(0)) \
             .filter(
                 # Filtragem: Remove linhas onde a idade é nula, garantindo a qualidade mínima dos dados
                 col("idade").isNotNull()) \
             .dropDuplicates(
                 # Deduplicação: Remove duplicatas do registro, usando o 'id' como chave
                 ["id"])

df_final.printSchema() # Confirma se as conversões (int, double, flag) funcionaram

df_final.show(5) # Mostra as 5 primeiras linhas do resultado

root
 |-- id: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- cpf_original: string (nullable = true)
 |-- idade: integer (nullable = true)
 |-- renda: double (nullable = true)
 |-- status: string (nullable = true)
 |-- status_processado_flag: integer (nullable = false)

+---+-----------+--------------+-----+------+----------+----------------------+
| id|       nome|  cpf_original|idade| renda|    status|status_processado_flag|
+---+-----------+--------------+-----+------+----------+----------------------+
|  2|MARIA SOUZA|   99911122233|   45|4800.0|PROCESSADO|                     0|
|  6| PAULA LIMA|   00000000000|   19| 900.0|PROCESSADO|                     0|
|  4|  ANA COSTA|444.555.666-AA|   28|3200.0|PROCESSADO|                     0|
|  5| RUI SANTOS|   66677788899|   55|7000.0|PROCESSADO|                     0|
|  1| JOAO SILVA|123.456.789-00|   30|2500.5|PROCESSADO|                     0|
+---+-----------+--------------+-----+------+----------+---------------

### 5. Tratamento de Datas
**O que é:**

Corrigir formatos e extrair partes da data.

Exemplos:

- Converter “01/10/2025” → “2025-10-01”
- Extrair mês, ano, dia
- Criar coluna de período (YYYY-MM)

In [0]:
# 1. Converte a coluna 'data' (que era texto) em um formato de calendário real (DATE).
df = df.withColumn("data", to_date(col("data"), "dd/MM/yyyy"))

# 2. Cria a coluna 'ano', extraindo o número do ano da coluna 'data'.
df = df.withColumn("ano", year(col("data")))

# 3. Cria a coluna 'mes', extraindo o número do mês da coluna 'data'.
df = df.withColumn("mes", month(col("data")))

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, try_to_date # Usamos try_to_date para ser seguro

dados_teste = [("15/12/2025",), ("01/01/2024",), ("202501",)] 
df_teste = spark.createDataFrame(dados_teste, ["data_original"])

df_resultado = df_teste.withColumn("data_convertida", 
             # TENTA converter de "dd/MM/yyyy" para tipo DATE.
             try_to_date(col("data_original"), "dd/MM/yyyy"))

df_resultado.show(truncate=False)

+-------------+---------------+
|data_original|data_convertida|
+-------------+---------------+
|15/12/2025   |2025-12-15     |
|01/01/2024   |2024-01-01     |
|202501       |NULL           |
+-------------+---------------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth

# 2. Cria uma tabela (DataFrame) com datas
dados_teste = [("2025-12-15",), ("2024-01-01",)]
colunas = ["data"]
df_teste = spark.createDataFrame(dados_teste, colunas).withColumn("data", col("data").cast("date")) 

# 3. APLICA SUA LINHA: Extrai o ano e coloca na coluna 'ano'
df_resultado = df_teste.withColumn("ano", year(col("data"))) \
                       .withColumn("mes", month(col("data"))) \
                       .withColumn("dia", dayofmonth(col("data")))

# 4. Mostra o resultado final
df_resultado.show()

+----------+----+---+---+
|      data| ano|mes|dia|
+----------+----+---+---+
|2025-12-15|2025| 12| 15|
|2024-01-01|2024|  1|  1|
+----------+----+---+---+



### 6. Agregações (Aggregation)
**O que é:**

Agrupar e sumarizar dados.

Exemplos:

- Contar quantos clientes
- Somar vendas por mês
- Calcular idade média por cidade

In [0]:
df.groupBy("cidade").agg({"idade": "avg", "id": "count"})


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count, col

caminho_parquet = "/Volumes/workspace/lhdw/landingzone/vendas/processar/clientes_cadastro_corrigo.parquet"
df_clientes = spark.read.parquet(caminho_parquet) # DataFrame carregado em df_clientes

# Calcula a Média de Renda, Média de Idade e o Total de Clientes
df_resultado = df_clientes.agg(
    avg(col("renda")).alias("media_renda_global"),
    avg(col("idade")).alias("media_idade_global"),
    count(col("id")).alias("total_clientes") # Usando 'id' para a contagem
)

print("--- Resultado da Média e Contagem Global de Clientes ---")
df_resultado.show(truncate=False)

--- Resultado da Média e Contagem Global de Clientes ---
+------------------+------------------+--------------+
|media_renda_global|media_idade_global|total_clientes|
+------------------+------------------+--------------+
|3316.75           |33.166666666666664|6             |
+------------------+------------------+--------------+



### 7. Junções (Joins)
**O que é:**

Combinar tabelas, algo ESSENCIAL em engenharia de dados.

Exemplos:

- Unir clientes + compras
- Tabela de produtos + preços
- Fato + dimensões


###Os 4 Tipos Principais de Junção (`Join`) em PySpark

Os Joins são cruciais para **combinar informações de duas tabelas** (DataFrames) com base em um campo em comum (a chave de ligação).

#### 1. `INNER JOIN` (Junção Interna)

* **O que faz:** Retorna **apenas as linhas que têm correspondência** nas duas tabelas.
* **Exemplo:** Quero ver só os clientes que **realmente fizeram compras**.
* **Sintaxe PySpark:** `df_A.join(df_B, "chave_comum", "inner")`

#### 2. `LEFT JOIN` (Junção Esquerda)

* **O que faz:** Retorna **TUDO** da Tabela A (esquerda) e só o que combina da Tabela B (direita).
* **Exemplo:** Quero ver **todos os clientes**, mesmo aqueles que **nunca compraram** (os campos da Tabela B ficam `null`).
* **Sintaxe PySpark:** `df_A.join(df_B, "chave_comum", "left")`

#### 3. `RIGHT JOIN` (Junção Direita)

* **O que faz:** Retorna **TUDO** da Tabela B (direita) e só o que combina da Tabela A (esquerda).
* **Exemplo:** Quero ver **todas as vendas**, mesmo que o cliente não esteja cadastrado (os campos da Tabela A ficam `null`).
* **Sintaxe PySpark:** `df_A.join(df_B, "chave_comum", "right")`

#### 4. `FULL OUTER JOIN` (Junção Externa Completa)

* **O que faz:** Retorna **TUDO** o que existe em **ambas** as tabelas. É a união total.
* **Exemplo:** Quero ver a união de todos os clientes e todas as vendas, mostrando onde eles se cruzam e onde estão separados.
* **Sintaxe PySpark:** `df_A.join(df_B, "chave_comum", "full")`


In [0]:
df_final = df_clientes.join(df_vendas, "id_cliente", "left")

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Tabelas de teste
df_clientes = spark.createDataFrame([(101, "Alice"), (102, "Bob")], ["id_cliente", "nome"])
df_vendas = spark.createDataFrame([(101, 50.0), (103, 75.0)], ["id_cliente", "valor"])

# APLICAÇÃO DO LEFT JOIN
df_final = df_clientes.join(df_vendas, "id_cliente", "left")

df_final.show(truncate=False)

+----------+-----+-----+
|id_cliente|nome |valor|
+----------+-----+-----+
|101       |Alice|50.0 |
|102       |Bob  |NULL |
+----------+-----+-----+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Tabelas de teste
df_clientes = spark.createDataFrame([(101, "Alice"), (102, "Bob")], ["id_cliente", "nome"])
df_vendas = spark.createDataFrame([(101, 50.0), (103, 75.0)], ["id_cliente", "valor"])

# APLICAÇÃO DO INNER JOIN
df_final = df_clientes.join(df_vendas, "id_cliente", "inner")

df_final.show(truncate=False)

+----------+-----+-----+
|id_cliente|nome |valor|
+----------+-----+-----+
|101       |Alice|50.0 |
+----------+-----+-----+



### 8. Enriquecimento de Dados
**O que é:**

Adicionar novas informações para melhorar o dataset.

Exemplos:

- Tabela de CEP → adicionar cidade e estado
- Tabela de câmbio → converter dólar para real
- Criar coluna de faixa etária

In [0]:
df = df.withColumn(
    "faixa_etaria",
    when(col("idade") < 18, "jovem")
    .when(col("idade") < 60, "adulto")
    .otherwise("idoso")
)


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# --- Testando todas as faixas) ---
dados_teste = [
    (15,),   # Jovem
    (25,),   # Adulto
    (59,),   # Adulto (Limite)
    (60,),   # Idoso (Começa aqui)
    (80,)    # Idoso
]
df_teste = spark.createDataFrame(dados_teste, ["idade"])

# --- APLICAÇÃO DA LÓGICA DE ENRIQUECIMENTO ---
df_resultado = df_teste.withColumn(
    "faixa_etaria",
    when(col("idade") < 18, "jovem")   # Se for menor que 18
    .when(col("idade") < 60, "adulto") # Senão, se for menor que 60
    .otherwise("idoso")                 # Senão (se for 60 ou mais)
)

df_resultado.show(truncate=False)

+-----+------------+
|idade|faixa_etaria|
+-----+------------+
|15   |jovem       |
|25   |adulto      |
|59   |adulto      |
|60   |idoso       |
|80   |idoso       |
+-----+------------+



### 9. Pivot/Unpivot (Transposição)
**O que é:**

Transformar linhas em colunas e vice-versa.

Exemplos:

- Vendas por mês → colunas Jan, Fev, Mar
- Transformar métricas em linhas para facilitar leitura

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# 2. Dados de Entrada: (Produto, Mês, Valor)
dados_teste = [
    ("A", "Jan", 100),
    ("A", "Fev", 50),
    ("B", "Jan", 200)
]
df_teste = spark.createDataFrame(dados_teste, ["produto", "mes", "valor"])

# Agrupa por 'produto', transforma 'mes' em colunas, soma 'valor'.
df_pivot = df_teste.groupBy("produto").pivot("mes").sum("valor")

print("--- Resultado do Pivot ---")
df_pivot.show(truncate=False)


--- Resultado do Pivot ---
+-------+----+---+
|produto|Fev |Jan|
+-------+----+---+
|A      |50  |100|
|B      |NULL|200|
+-------+----+---+



### 10. Deduplicação
**O que é:**

Remover registros repetidos, muito comum em arquivos de origem ruim.

In [0]:
df = df.dropDuplicates(["id"])


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# --- Duplicatas no ID 101) ---
dados_teste = [
    (101, "Alice", 5000),   # Linha que será MANTIDA
    (102, "Bob", 3000),     # Linha Única
    (101, "Alice", 5500),   # Duplicata que será REMOVIDA
    (103, "Carlos", 8000)   # Linha Única
]
df_teste = spark.createDataFrame(dados_teste, ["id", "nome", "renda"])

print("--- DataFrame Original (Com Duplicatas) ---")
df_teste.show(truncate=False)

# Remove duplicatas baseadas APENAS na coluna "id"
df_resultado = df_teste.dropDuplicates(["id"])


print("--- DataFrame Após dropDuplicates(['id']) ---")
df_resultado.show(truncate=False)

--- DataFrame Original (Com Duplicatas) ---
+---+------+-----+
|id |nome  |renda|
+---+------+-----+
|101|Alice |5000 |
|102|Bob   |3000 |
|101|Alice |5500 |
|103|Carlos|8000 |
+---+------+-----+

--- DataFrame Após dropDuplicates(['id']) ---
+---+------+-----+
|id |nome  |renda|
+---+------+-----+
|101|Alice |5000 |
|102|Bob   |3000 |
|103|Carlos|8000 |
+---+------+-----+



### 11. Regras de Negócio
**O que é:**

Criar colunas derivadas com lógicas definidas pelo time de negócios.

Exemplos:

- Valor > 10.000 → marcar como transação suspeita
- Cliente sem e-mail → marcar como “incompleto”
- Entrega atrasada → atraso = true

In [0]:
df = df.withColumn("suspeita", col("valor") > 10000)



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

dados_teste = [
    (5000.00,),   # Não suspeito
    (10000.01,),  # Suspeito (maior que 10000)
    (25000.00,)   # Suspeito
]
df_teste = spark.createDataFrame(dados_teste, ["valor"])

# REGRA DE NEGÓCIO
# Cria a coluna 'suspeita'
df_resultado = df_teste.withColumn("suspeita", col("valor") > 10000)

print("--- Resultado da Regra de Suspeita ---")
df_resultado.show(truncate=False)

--- Resultado da Regra de Suspeita ---
+--------+--------+
|valor   |suspeita|
+--------+--------+
|5000.0  |false   |
|10000.01|true    |
|25000.0 |true    |
+--------+--------+

