## Data Quality com PySpark 
### 1. Inicialização do Spark e Criação da SparkSession
- Este código deve ser o primeiro bloco a ser executado no Jupyter Notebook ou Databricks. Ele configura a sessão do Spark.

In [None]:
# Importa o Spark e outras bibliotecas necessárias
import pyspark # O módulo principal
from pyspark.sql import SparkSession
from pyspark.sql import functions as F # O alias 'F' MAIÚSCULO é usado para todas as funções (F.col, F.when, etc.)
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType

# Cria ou obtém uma SparkSession
# Master: 'local[*]' indica que o Spark deve usar todos os núcleos disponíveis na máquina
# appName: Nome da aplicação (importante para monitoramento)
spark = (SparkSession.builder
    .master("local[*]")
    .appName("AulaPraticaPySpark_DataQuality")
    .config("spark.executor.memory", "4g") # Opcional: Configurações de memória
    .config("spark.driver.memory", "4g")  # Opcional: Configurações de memória
    .getOrCreate()
)

# Imprime o status da sessão para confirmação
print("SparkSession inicializada com sucesso!")
print(f"Versão do Spark: {spark.version}")
print(f"Aplicação: {spark.sparkContext.appName}")

**Explicando código com mais detalhes**
- **SparkSession:** É o principal ponto de entrada para interagir com o cluster Spark. É como ligar o motor do carro para começar a processar dados distribuídos.

- **.master("local[*]"):** Para o ambiente de aula, usamos o modo local. O * diz ao Spark para usar todos os núcleos de CPU disponíveis, simulando um processamento paralelo em um único nó. Em produção, isso seria o endereço do seu gerenciador de cluster (ex: YARN, Mesos ou K8s).

- **.config(...):** Permite configurar recursos como memória. Em Big Data, é crucial saber balancear memória (executor.memory) e núcleos (num-executors) para otimizar a performance.

### 2. Snippet de Carregamento de Dados (Raw Layer)<br>
- Para demonstrar as técnicas de Data Quality, usaremos um arquivo chamado vendas_brutas.csv com dados brutos, incluindo nulos e tipos incorretos e definiremos seu schema antes do carregamento.

*DEFINIÇÃO DO SCHEMA (CRUCIAL PARA DATA QUALITY E PERFORMANCE)*
- **Conexão com a Teoria:** Evita o inferSchema, que requer uma leitura completa do arquivo.
- Definir o schema antecipadamente garante que os tipos de dados sejam os esperados, prevenindo erros e otimizando a leitura.

In [None]:

schema_vendas = StructType([
    StructField("ID_VENDA", IntegerType(), nullable=False),
    StructField("DATA_REGISTRO_RAW", StringType(), nullable=True), # O tipo original é String, pois o formato é inconsistente ('2024-03-20' vs '2024/03/20')
    StructField("VALOR_BRUTO_RAW", StringType(), nullable=True),  # O tipo original é String, pois pode vir com vírgula (250,99) ou nulo
    StructField("PRODUTO", StringType(), nullable=True),
    StructField("STATUS_VENDA", StringType(), nullable=True)
])



- CARREGAMENTO DO DATAFRAME
    - Use um caminho real para o arquivo, ou simule-o usando a API createDataFrame

In [None]:
caminho_arquivo = "vendas_brutas.csv" 
df_vendas_raw = (
    spark.read
    .csv(
        caminho_arquivo,
        header=True,
        schema=schema_vendas,
        sep=",",
        # O PySpark lê a primeira linha como cabeçalho
        # e garante que os dados sigam o schema definido (Data Quality!)
    )
)

- INSPEÇÃO INICIAL

In [None]:
print("\n--- Schema Original (Raw) ---")
df_vendas_raw.printSchema()

print("\n--- Primeiros Registros ---")
df_vendas_raw.show(truncate=False)

### 3. **Limpeza (Cleaning):** Tratamento de Valores Nulos
- **Teoria:** Lidar com missing values (null, None, ou NaN) é o primeiro passo para garantir a Integridade dos dados. 
- **Prática (PySpark):** Usamos .fillna() ou funções condicionais com when/otherwise.

In [None]:
# Ação 1: Limpar/Preencher o campo STATUS_VENDA
# Se o status for nulo, vamos preencher com 'EM_PROCESSAMENTO' (regra de negócio)
df_limpeza = df_vendas_raw.withColumn(
    "STATUS_VENDA",
    F.coalesce(F.col("STATUS_VENDA"), F.lit("EM_PROCESSAMENTO")) # Coalesce pega o primeiro valor não nulo
)

# Ação 2: Tratamento de nulos em VALOR_BRUTO_RAW
# Se o valor for nulo, vamos preencher com 0 (regra de negócio para vendas não registradas)
df_limpeza = df_limpeza.na.fill(value="0.0", subset=['VALOR_BRUTO_RAW'])

print("\n--- 1. Limpeza (Cleaning): Nulos Tratados ---")
df_limpeza.show(truncate=False)

### 4. Transformação (Transformation): Padronização e Casting de Tipos
**Teoria:** Garantir a Consistência dos dados, convertendo para formatos e tipos padronizados (e.g., String para Date, remoção de caracteres inválidos). 
**Prática (PySpark):** Uso de funções como regexp_replace, to_date, cast, upper.

In [None]:
df_transformacao = (
    df_limpeza
    # Transformação 1: Padronizar STATUS_VENDA para caixa alta (CONSISTÊNCIA)
    .withColumn("STATUS_VENDA", F.upper(F.col("STATUS_VENDA")))

    # Transformação 2: Limpar e Converter VALOR_BRUTO_RAW para Decimal (TIPAGEM CORRETA)
    # 1. Substitui vírgulas (',') por ponto ('.')
    # 2. Converte o resultado para tipo Double
    .withColumn(
        "VALOR_VENDA",
        F.regexp_replace(F.col("VALOR_BRUTO_RAW"), ",", ".").cast(DoubleType())
    )

    # Transformação 3: Converter DATA_REGISTRO_RAW para Tipo Date (FORMATO CORRETO)
    # Tenta inferir o formato de data (yyyy-MM-dd, yyyy/MM/dd, etc.) - PySpark é flexível
    .withColumn(
        "DATA_VENDA",
        F.to_date(F.col("DATA_REGISTRO_RAW"), "yyyy-MM-dd") # Tenta formato padrão, o PySpark pode inferir variações simples
    )
    .withColumn("DATA_VENDA", F.coalesce(F.col("DATA_VENDA"), F.to_date(F.col("DATA_REGISTRO_RAW"), "yyyy/MM/dd"))) # Tenta o formato com barra, caso o anterior falhe
    .drop("DATA_REGISTRO_RAW", "VALOR_BRUTO_RAW") # Remove as colunas RAW
)

print("\n--- 2. Transformação: Padronização e Casting ---")
df_transformacao.printSchema()
df_transformacao.show(truncate=False)

### 5. Enriquecimento (Enrichment): Adição de Colunas de Negócio
**Teoria:** Adicionar valor ao dataset derivando novos campos de dados existentes, criando features úteis para análise.

**Prática (PySpark):** Uso de withColumn com expressões lógicas (when/otherwise).

In [None]:
# Ação: Criar uma nova coluna indicando se a venda é considerada de "Alto Valor" (> 200)
df_enriquecimento = df_transformacao.withColumn(
    "FLAG_ALTO_VALOR",
    F.when(F.col("VALOR_VENDA") >= 200, F.lit(True)).otherwise(F.lit(False))
)

print("\n--- 3. Enriquecimento: Nova Feature Adicionada ---")
df_enriquecimento.show(truncate=False)

### 6. Normalização (Normalization): Padronização de Categorias
**Teoria:** Reduzir a variância de valores categóricos para garantir que a mesma entidade tenha sempre a mesma representação.

**Prática (PySpark):** Aplicar lookups ou regras de substituição em campos como PRODUTO.

In [None]:
# Ação: Padronizar o nome do produto (ex: 'LAPTOP' pode vir como 'NoteBook', 'Laptop')
# Vamos usar o PRODUTO para garantir que todos sejam capitalizados
df_normalizacao = df_enriquecimento.withColumn(
    "PRODUTO_PADRAO",
    F.when(F.upper(F.col("PRODUTO")).like("%LAPTOP%"), F.lit("NOTEBOOK/LAPTOP"))
    .when(F.upper(F.col("PRODUTO")).like("%MOUSE%"), F.lit("PERIFERICO_SIMPLES"))
    .otherwise(F.upper(F.col("PRODUTO")))
).drop("PRODUTO").withColumnRenamed("PRODUTO_PADRAO", "PRODUTO") # Substitui a coluna original

print("\n--- 4. Normalização: Categorias Padronizadas ---")
df_normalizacao.show(truncate=False)

### 7. Desduplicação (Deduplication): Remoção de Registros Redundantes
**Teoria:** Garantir que cada evento/entidade única seja representada apenas uma vez no dataset final (fundamental para Precisão).

**Prática (PySpark):** Usar .dropDuplicates() ou Window Functions para cenários avançados (como manter o registro mais recente).
No nosso exemplo, as linhas 1004 e 1005 são duplicatas.

In [None]:
# Usamos apenas as colunas que definem a unicidade do registro (chave primária natural)
colunas_chave = ["ID_VENDA"]

# Ação: Remover duplicatas estritas
df_curated = df_normalizacao.dropDuplicates(subset=colunas_chave)

# Demonstração de Desduplicação Avançada (Teoria: Manter o Mais Recente)
# Isso é útil quando ID_VENDA pudesse se repetir com versões diferentes.
# w = Window.partitionBy("ID_VENDA").orderBy(F.col("DATA_VENDA").desc())
# df_curated_avancado = (df_normalizacao
#     .withColumn("row_number", F.row_number().over(w))
#     .filter(F.col("row_number") == 1)
#     .drop("row_number")
# )

print(f"\n--- 5. Desduplicação: Registros Únicos ({df_normalizacao.count()} vs {df_curated.count()}) ---")
df_curated.show(truncate=False)
df_curated.printSchema()

### 8. Armazenamento Otimizado (Parquet & ORC)
1. Escrita no Formato Parquet (Otimização Colunar)
- **Conexão com a Teoria:**
Parquet: É o formato padrão da indústria, otimizado para consultas analíticas (OLAP). O armazenamento colunar permite que o Spark (e o SerDe do Parquet) leia apenas as colunas que a query solicita, ignorando o resto.

- **Particionamento:** Usar partitionBy() organiza os dados fisicamente no disco/storage (e.g., por ano/mês ou por STATUS_VENDA). Isso permite ao Spark ignorar diretórios inteiros (Predicate Pushdown).

- **Compactação:** Usaremos a compactação Snappy (o padrão para Parquet), que oferece um bom equilíbrio entre taxa de compressão e velocidade de descompactação.

In [None]:
# Definindo o caminho de saída (simulação de um Data Lake)
caminho_parquet = "data/curated/vendas_parquet"

# Ação: Salvar o DataFrame tratado (df_curated) em Parquet
(
    df_curated.write
    .mode("overwrite") # Sobrescreve se o diretório já existir
    .partitionBy("STATUS_VENDA") # Particionamento físico: otimiza queries que filtram por status
    .option("compression", "snappy") # Snappy é o padrão (bom trade-off)
    .parquet(caminho_parquet)
)

print(f"\n--- 1. Escrita em Parquet concluída ---")
print(f"Dados salvos e particionados em: {caminho_parquet}")

2. Escrita no Formato ORC (Alternativa Colunar)
💾 Conexão com a Teoria:
ORC (Optimized Row Columnar): Outro formato colunar robusto, popular no ecossistema Hive. Possui recursos avançados como indexação de dados e Bloom Filters, que podem ser mais eficientes em certos cenários de leitura seletiva.

In [None]:
# Definindo o caminho de saída
caminho_orc = "data/curated/vendas_orc"

# Ação: Salvar o mesmo DataFrame em ORC
(
    df_curated.write
    .mode("overwrite")
    .partitionBy("STATUS_VENDA")
    .orc(caminho_orc)
)

print(f"\n--- 2. Escrita em ORC concluída ---")
print(f"Dados salvos e particionados em: {caminho_orc}")

### 9.Análise da Otimização (O Ponto Alto da Aula)
- Agora vamos provar a teoria do Predicate Pushdown e do SerDe na prática.

1. Prática do Predicate Pushdown (Ignorando Arquivos)
**Teoria:** Ao particionarmos por STATUS_VENDA, se pedirmos apenas vendas 'CANCELADO', o Spark ignora os diretórios 'CONCLUIDO' e 'EM_PROCESSAMENTO'.

In [None]:
# Leitura Otimizada - Filtro na partição
# Apenas os diretórios (partições) que contêm 'CANCELADO' serão lidos
df_leitura_filtrada = spark.read.parquet(caminho_parquet).filter(F.col("STATUS_VENDA") == "CANCELADO")

print("\n--- 3.1. Predicate Pushdown: Filtro no Disco ---")
print(f"Total de Registros lidos: {df_leitura_filtrada.count()}")
df_leitura_filtrada.show()


2. Prática do SerDe (Deserializando Apenas o Necessário)
Teoria: O SerDe (Serializer/Deserializer) do Parquet só desserializa as colunas que você projeta na sua query. Se lermos apenas ID_VENDA, o Spark não carrega os bytes das colunas VALOR_VENDA ou PRODUTO para a memória.

In [None]:
# Leitura Otimizada - Seleção de Colunas (Projeção)
# O Spark carrega apenas os bytes da coluna 'ID_VENDA' e 'VALOR_VENDA'
df_projecao = spark.read.parquet(caminho_parquet).select("ID_VENDA", "VALOR_VENDA")

print("\n--- 3.2. SerDe: Deserialização Seletiva (Projeção) ---")
print(f"Colunas carregadas: {df_projecao.columns}")
df_projecao.printSchema()

# Ponto de Discussão:
# Comparação 1: Se este fosse um arquivo CSV, o SerDe leria a linha inteira para depois descartar as colunas não pedidas.
# Comparação 2: No Parquet, o Spark/SerDe acessa apenas a 'faixa' vertical da coluna no arquivo físico.

# 🚀 Conclusão: PySpark - Da Teoria à Produção

Parabéns! 

Vocês completaram com sucesso a jornada prática de 4 horas, aplicando os fundamentos de Big Data em um pipeline PySpark funcional.

## 📝 Resumo do Aprendizado Essencial

Onde a Teoria e a Prática se Encontraram:

| Teoria (Aulas) | Prática (Código PySpark) | Conceito Reforçado |
| :--- | :--- | :--- |
| **Data Pipeline: Processing** | Inicialização da `SparkSession` | PySpark é o motor de **refino** do dado bruto (`Raw Layer`). |
| **Data Quality (5 Técnicas)** | `F.coalesce()`, `to_date()`, `dropDuplicates()` | A disciplina de **qualidade** (Limpeza, Transformação, Desduplicação) é implementada com funções *built-in* do Spark. |
| **Formatos Colunares** | `.write.parquet()`, `.write.orc()` | **Parquet** e **ORC** são os formatos de armazenamento otimizado para economia de I/O e *analytics*. |
| **SerDe & Otimização** | `.read.parquet().select("col")` | O Spark/SerDe acessa apenas as colunas necessárias (**Projeção**) e ignora diretórios (**Predicate Pushdown**). |

---

## 🧭 Próximos Passos e Conexão com o Projeto Final

Para que vocês transformem este conhecimento prático em uma solução de Engenharia de Dados de nível profissional, o foco deve ser na modernização e automação da arquitetura:

### 1. Data Lakehouse: Confiabilidade sobre o Storage

O `df_curated` salvo em Parquet é a nossa base. O próximo passo é garantir **Transações ACID** e **versionamento** sobre ele.

* **Próximo Tópico:** Formatos de Tabela Aberta como **Delta Lake** (ou Apache Iceberg).
* **Motivo:** Permitem funcionalidades cruciais de um Data Warehouse, como `UPSERT` (atualização/inserção) e `DELETE` direto no Data Lake.

### 2. Orquestração e Automação do Pipeline

Um *pipeline* de dados precisa ser executado de forma automática e monitorada.

* **Próximo Tópico:** **Apache Airflow**.
* **Objetivo:** Agendar o código PySpark desenvolvido hoje (as transformações e a escrita em Parquet) em um *workflow* (DAG) gerenciado, garantindo automação e observabilidade.

### 3. Preparação para o Projeto Final

Utilizem o `df_curated` e a lógica de Data Quality implementada hoje como o *core* da fase **Processing** do seu projeto. A próxima etapa é integrar o Delta Lake e construir a automação no Airflow.