# üöÄ Lab DataOps: Governan√ßa e Qualidade de Dados com PySpark e Great Expectations

## Objetivos do Laborat√≥rio

Neste laborat√≥rio pr√°tico, voc√™ ir√°:
- Implementar **testes de qualidade de dados** automatizados com **Great Expectations**
- Aplicar as **6 dimens√µes da qualidade** na pr√°tica
- Criar um **pipeline DataOps** com valida√ß√µes profissionais
- Simular cen√°rios reais de **governan√ßa de dados**
- Gerar **relat√≥rios de qualidade** automatizados

### Conceitos Aplicados
- **DataOps**: Automa√ß√£o e monitoramento cont√≠nuo
- **Governan√ßa**: Regras, pol√≠ticas e responsabilidades
- **Qualidade**: Acur√°cia, Completude, Consist√™ncia, Pontualidade, Unicidade e Validade
- **Great Expectations**: Framework profissional para valida√ß√£o de dados

## 1. Configura√ß√£o do Ambiente PySpark e Great Expectations

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
from datetime import datetime, timedelta
import os

# Great Expectations imports
try:
    import great_expectations as gx
    print(f"‚úÖ Great Expectations vers√£o: {gx.__version__}")
except ImportError:
    print("‚ùå Great Expectations n√£o encontrado. Instalando...")
    !pip install great-expectations==0.18.8 sqlalchemy==1.4.46
    import great_expectations as gx
    print(f"‚úÖ Great Expectations instalado: {gx.__version__}")

# Inicializar Spark Session
spark = SparkSession.builder \
    .appName("DataOps_Governanca_Lab_GX") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

print(f"‚úÖ Spark Session iniciada: {spark.version}")
print(f"üìä Contexto: {spark.sparkContext.appName}")
print(f"üéØ Great Expectations vers√£o: {gx.__version__}")

## 2. Cria√ß√£o de Dados de Exemplo com PySpark

In [None]:
# Dados simulados com problemas de qualidade intencionais
dados_clientes = [
    (1, "Jo√£o Silva", "joao@email.com", "11999887766", "2023-01-15", "Ativo", 25, "SP"),
    (2, "Maria Santos", "maria.santos@gmail.com", "11888776655", "2023-02-20", "Ativo", 32, "RJ"),
    (3, "Pedro", "pedro@invalid", "119999", "2023-03-10", "Inativo", 150, "MG"),  # Problemas: email inv√°lido, telefone incompleto, idade imposs√≠vel
    (4, "Ana Costa", "ana@email.com", "11777665544", "2023-04-05", "Ativo", 28, "SP"),
    (1, "Jo√£o Silva", "joao@email.com", "11999887766", "2023-01-15", "Ativo", 25, "SP"),  # Duplicata
    (5, "", "carlos@email.com", "11666554433", "2023-05-12", "Pendente", None, "RS"),  # Nome vazio, idade nula
    (6, "Lucia Oliveira", "lucia@email.com", "11555443322", "2022-12-01", "Ativo", 45, "BA"),
    (7, "Roberto Lima", "roberto@email.com", "11444332211", "2023-06-18", "Cancelado", 38, "PR"),  # Status n√£o padr√£o
    (8, "Fernanda", None, "11333221100", "2023-07-22", "Ativo", 29, "SC"),  # Email nulo
    (9, "Marcos Pereira", "marcos@email.com", "11222110099", "2023-08-30", "Ativo", -5, "GO")  # Idade negativa
]

# Schema definido (Governan√ßa: Padr√µes de Dados)
schema_clientes = StructType([
    StructField("id_cliente", IntegerType(), False),
    StructField("nome", StringType(), True),
    StructField("email", StringType(), True),
    StructField("telefone", StringType(), True),
    StructField("data_cadastro", StringType(), True),
    StructField("status", StringType(), True),
    StructField("idade", IntegerType(), True),
    StructField("estado", StringType(), True)
])

df_clientes_spark = spark.createDataFrame(dados_clientes, schema_clientes)
print("üìã Dataset de clientes criado com problemas de qualidade intencionais")
df_clientes_spark.show()

# Converter para pandas para Great Expectations (evita problemas de serializa√ß√£o)
df_clientes_pandas = df_clientes_spark.toPandas()
print(f"üìä Dataset convertido para pandas: {len(df_clientes_pandas)} registros")

## 3. Configura√ß√£o do Great Expectations

In [None]:
# Configurar diret√≥rio do Great Expectations
project_dir = "/tmp/gx_lab_dataops"
os.makedirs(project_dir, exist_ok=True)
os.chdir(project_dir)

# Inicializar Data Context do Great Expectations
try:
    context = gx.get_context()
    print("üìÅ Contexto Great Expectations carregado")
except:
    context = gx.get_context(project_root_dir=project_dir)
    print("üìÅ Novo contexto Great Expectations criado")

print(f"‚úÖ Great Expectations configurado")
print(f"üìä Dataset preparado: {len(df_clientes_pandas)} registros")

## 4. Cria√ß√£o de Expectativas de Qualidade

In [None]:
# Criar validator usando pandas (evita problemas de serializa√ß√£o do Spark)
try:
    validator = context.sources.pandas_default.read_dataframe(df_clientes_pandas)
    print("‚úÖ Validator criado com sucesso")
except Exception as e:
    print(f"‚ö†Ô∏è Erro: {e}")
    print("üîß Tentando m√©todo alternativo...")
    # M√©todo alternativo mais b√°sico
    validator = gx.validator.validator.Validator(
        execution_engine=gx.execution_engine.PandasExecutionEngine(),
        interactive_evaluation=True
    )
    validator.load_batch_list([gx.core.batch.Batch(data=df_clientes_pandas)])
    print("‚úÖ Validator criado com m√©todo alternativo")

## 5. Definindo Expectativas das 6 Dimens√µes da Qualidade

In [None]:
print("üéØ CRIANDO EXPECTATIVAS DAS 6 DIMENS√ïES DA QUALIDADE")
print("=" * 60)

# 1. COMPLETUDE (Completeness) - Campos n√£o podem ser nulos
print("\nüìä 1. COMPLETUDE - Campos obrigat√≥rios")
validator.expect_column_values_to_not_be_null("id_cliente")
validator.expect_column_values_to_not_be_null("nome")
validator.expect_column_values_to_not_be_null("email")
print("   ‚úÖ Expectativas de completude criadas")

# 2. UNICIDADE (Uniqueness) - Valores √∫nicos
print("\nüîë 2. UNICIDADE - Chaves √∫nicas")
validator.expect_column_values_to_be_unique("id_cliente")
validator.expect_column_values_to_be_unique("email")
print("   ‚úÖ Expectativas de unicidade criadas")

# 3. VALIDADE (Validity) - Formatos e dom√≠nios
print("\n‚úÖ 3. VALIDADE - Formatos e dom√≠nios")
# Email v√°lido
validator.expect_column_values_to_match_regex(
    "email", 
    r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
)
# Idade v√°lida (0-120)
validator.expect_column_values_to_be_between("idade", min_value=0, max_value=120)
# Status v√°lido
validator.expect_column_values_to_be_in_set("status", ["Ativo", "Inativo", "Pendente"])
print("   ‚úÖ Expectativas de validade criadas")

# 4. CONSIST√äNCIA (Consistency) - Regras de neg√≥cio
print("\nüîÑ 4. CONSIST√äNCIA - Regras de neg√≥cio")
# Telefone deve ter 11 d√≠gitos
validator.expect_column_value_lengths_to_equal("telefone", 11)
# Data de cadastro n√£o pode ser futura
validator.expect_column_values_to_be_dateutil_parseable("data_cadastro")
print("   ‚úÖ Expectativas de consist√™ncia criadas")

# 5. PONTUALIDADE (Timeliness) - Dados atuais
print("\n‚è∞ 5. PONTUALIDADE - Dados atuais")
# Data de cadastro deve ser dos √∫ltimos 2 anos
data_limite = (datetime.now() - timedelta(days=730)).strftime("%Y-%m-%d")
validator.expect_column_values_to_be_between(
    "data_cadastro", 
    min_value=data_limite, 
    max_value=datetime.now().strftime("%Y-%m-%d")
)
print("   ‚úÖ Expectativas de pontualidade criadas")

# 6. ACUR√ÅCIA (Accuracy) - Dados corretos
print("\nüéØ 6. ACUR√ÅCIA - Dados corretos")
# Nome n√£o pode ser vazio
validator.expect_column_values_to_not_match_regex("nome", r"^\s*$")
# Estado deve ser sigla v√°lida (2 caracteres)
validator.expect_column_value_lengths_to_equal("estado", 2)
print("   ‚úÖ Expectativas de acur√°cia criadas")

print("\nüíæ Todas as expectativas criadas com sucesso!")

## 6. Execu√ß√£o das Valida√ß√µes

In [None]:
# Executar valida√ß√µes
print("üîç EXECUTANDO VALIDA√á√ïES DE QUALIDADE")
print("=" * 50)

# Validar os dados
validation_result = validator.validate()

# Analisar resultados
print(f"\nüìä RESULTADOS DA VALIDA√á√ÉO:")
print(f"   Total de expectativas: {validation_result.statistics['evaluated_expectations']}")
print(f"   Expectativas que passaram: {validation_result.statistics['successful_expectations']}")
print(f"   Expectativas que falharam: {validation_result.statistics['unsuccessful_expectations']}")
print(f"   Taxa de sucesso: {validation_result.statistics['success_percent']:.1f}%")

# Status geral
if validation_result.success:
    print("\n‚úÖ VALIDA√á√ÉO PASSOU - Dados dentro dos padr√µes de qualidade")
else:
    print("\n‚ùå VALIDA√á√ÉO FALHOU - Problemas de qualidade detectados")
    print("\nüö® PROBLEMAS ENCONTRADOS:")
    
    for result in validation_result.results:
        if not result.success:
            expectation_type = result.expectation_config.expectation_type
            column = result.expectation_config.kwargs.get('column', 'N/A')
            print(f"   ‚ùå {expectation_type} - Coluna: {column}")
            if 'partial_unexpected_count' in result.result:
                print(f"      Registros com problema: {result.result['partial_unexpected_count']}")

## 7. Corre√ß√£o de Dados com PySpark

In [None]:
def corrigir_dados_qualidade_spark(df_spark):
    """
    Aplica corre√ß√µes usando PySpark baseadas nos resultados das valida√ß√µes
    """
    print("üîß APLICANDO CORRE√á√ïES DE QUALIDADE COM PYSPARK")
    print("=" * 50)
    
    # 1. Remover duplicatas (manter primeira ocorr√™ncia)
    df_corrigido = df_spark.dropDuplicates(["id_cliente"])
    print(f"‚úÖ Duplicatas removidas: {df_spark.count() - df_corrigido.count()} registros")
    
    # 2. Corrigir idades inv√°lidas
    df_corrigido = df_corrigido.withColumn(
        "idade",
        when((col("idade") < 0) | (col("idade") > 120), None)
        .otherwise(col("idade"))
    )
    print("‚úÖ Idades inv√°lidas corrigidas")
    
    # 3. Padronizar status
    df_corrigido = df_corrigido.withColumn(
        "status",
        when(col("status") == "Cancelado", "Inativo")
        .otherwise(col("status"))
    )
    print("‚úÖ Status padronizados")
    
    # 4. Remover registros com campos cr√≠ticos vazios
    df_corrigido = df_corrigido.filter(
        col("nome").isNotNull() & 
        (col("nome") != "") &
        col("email").isNotNull()
    )
    print("‚úÖ Registros com campos cr√≠ticos vazios removidos")
    
    print(f"\nüìä Registros ap√≥s corre√ß√£o: {df_corrigido.count()}")
    return df_corrigido

# Aplicar corre√ß√µes
df_clientes_corrigido_spark = corrigir_dados_qualidade_spark(df_clientes_spark)
df_clientes_corrigido_pandas = df_clientes_corrigido_spark.toPandas()

print("\nüìã Dados ap√≥s corre√ß√£o:")
df_clientes_corrigido_spark.show()

## 8. Re-valida√ß√£o com Dados Corrigidos

In [None]:
print("üîÑ RE-VALIDA√á√ÉO COM DADOS CORRIGIDOS")
print("=" * 50)

# Criar novo validator com dados corrigidos
try:
    validator_corrigido = context.sources.pandas_default.read_dataframe(df_clientes_corrigido_pandas)
    print("‚úÖ Novo validator criado")
except Exception as e:
    print(f"‚ö†Ô∏è Usando m√©todo alternativo: {str(e)[:50]}...")
    validator_corrigido = gx.validator.validator.Validator(
        execution_engine=gx.execution_engine.PandasExecutionEngine(),
        interactive_evaluation=True
    )
    validator_corrigido.load_batch_list([gx.core.batch.Batch(data=df_clientes_corrigido_pandas)])

# Aplicar as mesmas expectativas
print("üéØ Aplicando expectativas aos dados corrigidos...")

# Completude
validator_corrigido.expect_column_values_to_not_be_null("id_cliente")
validator_corrigido.expect_column_values_to_not_be_null("nome")
validator_corrigido.expect_column_values_to_not_be_null("email")

# Unicidade
validator_corrigido.expect_column_values_to_be_unique("id_cliente")
validator_corrigido.expect_column_values_to_be_unique("email")

# Validade
validator_corrigido.expect_column_values_to_match_regex("email", r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
validator_corrigido.expect_column_values_to_be_between("idade", min_value=0, max_value=120)
validator_corrigido.expect_column_values_to_be_in_set("status", ["Ativo", "Inativo", "Pendente"])

# Consist√™ncia
validator_corrigido.expect_column_value_lengths_to_equal("telefone", 11)
validator_corrigido.expect_column_values_to_be_dateutil_parseable("data_cadastro")

# Pontualidade
data_limite = (datetime.now() - timedelta(days=730)).strftime("%Y-%m-%d")
validator_corrigido.expect_column_values_to_be_between("data_cadastro", min_value=data_limite, max_value=datetime.now().strftime("%Y-%m-%d"))

# Acur√°cia
validator_corrigido.expect_column_values_to_not_match_regex("nome", r"^\s*$")
validator_corrigido.expect_column_value_lengths_to_equal("estado", 2)

# Executar valida√ß√£o
validation_result_corrigido = validator_corrigido.validate()

# Comparar resultados
print("\nüìä COMPARA√á√ÉO DE RESULTADOS")
print("=" * 40)
print(f"DADOS ORIGINAIS:")
print(f"   Taxa de sucesso: {validation_result.statistics['success_percent']:.1f}%")
print(f"   Expectativas que falharam: {validation_result.statistics['unsuccessful_expectations']}")

print(f"\nDADOS CORRIGIDOS:")
print(f"   Taxa de sucesso: {validation_result_corrigido.statistics['success_percent']:.1f}%")
print(f"   Expectativas que falharam: {validation_result_corrigido.statistics['unsuccessful_expectations']}")

if validation_result_corrigido.success:
    print("\nüéâ SUCESSO! Dados corrigidos passaram em todas as valida√ß√µes")
else:
    print("\n‚ö†Ô∏è Ainda existem problemas nos dados corrigidos")
    
melhoria = validation_result_corrigido.statistics['success_percent'] - validation_result.statistics['success_percent']
print(f"\nüéØ Melhoria alcan√ßada: +{melhoria:.1f} pontos percentuais")

## 9. An√°lise com PySpark - Estat√≠sticas de Qualidade

In [None]:
print("üìä AN√ÅLISE DE QUALIDADE COM PYSPARK")
print("=" * 50)

# An√°lise dos dados originais
print("\nüîç DADOS ORIGINAIS:")
print(f"Total de registros: {df_clientes_spark.count()}")
print(f"Duplicatas por ID: {df_clientes_spark.count() - df_clientes_spark.dropDuplicates(['id_cliente']).count()}")
print(f"Valores nulos em 'nome': {df_clientes_spark.filter(col('nome').isNull() | (col('nome') == '')).count()}")
print(f"Valores nulos em 'email': {df_clientes_spark.filter(col('email').isNull()).count()}")
print(f"Idades inv√°lidas: {df_clientes_spark.filter((col('idade') < 0) | (col('idade') > 120)).count()}")

# An√°lise dos dados corrigidos
print("\n‚úÖ DADOS CORRIGIDOS:")
print(f"Total de registros: {df_clientes_corrigido_spark.count()}")
print(f"Duplicatas por ID: {df_clientes_corrigido_spark.count() - df_clientes_corrigido_spark.dropDuplicates(['id_cliente']).count()}")
print(f"Valores nulos em 'nome': {df_clientes_corrigido_spark.filter(col('nome').isNull() | (col('nome') == '')).count()}")
print(f"Valores nulos em 'email': {df_clientes_corrigido_spark.filter(col('email').isNull()).count()}")
print(f"Idades inv√°lidas: {df_clientes_corrigido_spark.filter((col('idade') < 0) | (col('idade') > 120)).count()}")

# Distribui√ß√£o por status
print("\nüìà DISTRIBUI√á√ÉO POR STATUS (Dados Corrigidos):")
df_clientes_corrigido_spark.groupBy("status").count().orderBy("count", ascending=False).show()

# Distribui√ß√£o por estado
print("\nüó∫Ô∏è DISTRIBUI√á√ÉO POR ESTADO (Dados Corrigidos):")
df_clientes_corrigido_spark.groupBy("estado").count().orderBy("count", ascending=False).show()

## 10. Gera√ß√£o de Relat√≥rios de Qualidade

In [None]:
# Gerar Data Docs (relat√≥rios HTML)
print("üìà GERANDO RELAT√ìRIOS DE QUALIDADE (DATA DOCS)")
print("=" * 50)

try:
    # Construir Data Docs
    context.build_data_docs()
    
    # Obter URLs dos relat√≥rios
    data_docs_sites = context.get_docs_sites_urls()
    
    print("‚úÖ Relat√≥rios de qualidade gerados com sucesso!")
    print("\nüìä RELAT√ìRIOS DISPON√çVEIS:")
    
    for site_name, url in data_docs_sites.items():
        print(f"   üìã {site_name}: {url}")
        
    print("\nüí° Abra os URLs acima no navegador para visualizar os relat√≥rios detalhados")
    
except Exception as e:
    print(f"‚ö†Ô∏è Erro ao gerar Data Docs: {str(e)}")

# Resumo executivo
print("\nüìã RESUMO EXECUTIVO DE QUALIDADE")
print("=" * 50)
print(f"üìä Dataset Original (PySpark):")
print(f"   - Registros: {df_clientes_spark.count()}")
print(f"   - Taxa de qualidade: {validation_result.statistics['success_percent']:.1f}%")
print(f"   - Status: {'‚úÖ APROVADO' if validation_result.success else '‚ùå REPROVADO'}")

print(f"\nüìä Dataset Corrigido (PySpark):")
print(f"   - Registros: {df_clientes_corrigido_spark.count()}")
print(f"   - Taxa de qualidade: {validation_result_corrigido.statistics['success_percent']:.1f}%")
print(f"   - Status: {'‚úÖ APROVADO' if validation_result_corrigido.success else '‚ùå REPROVADO'}")

melhoria = validation_result_corrigido.statistics['success_percent'] - validation_result.statistics['success_percent']
print(f"\nüéØ Melhoria alcan√ßada: +{melhoria:.1f} pontos percentuais")
print(f"üìâ Redu√ß√£o de registros: {df_clientes_spark.count() - df_clientes_corrigido_spark.count()} registros removidos")

## 11. Pipeline DataOps Completo com PySpark + Great Expectations

In [None]:
def pipeline_dataops_pyspark_gx(df_spark, context):
    """
    Pipeline DataOps completo usando PySpark + Great Expectations
    """
    print("üöÄ PIPELINE DATAOPS PYSPARK + GREAT EXPECTATIONS")
    print("=" * 60)
    
    pipeline_result = {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "status": "EXECUTANDO",
        "etapas_concluidas": [],
        "metricas_qualidade": {},
        "spark_stats": {}
    }
    
    try:
        # Etapa 1: An√°lise inicial com PySpark
        print("\nüìä Etapa 1: An√°lise inicial com PySpark")
        pipeline_result["spark_stats"]["registros_originais"] = df_spark.count()
        pipeline_result["spark_stats"]["duplicatas"] = df_spark.count() - df_spark.dropDuplicates(['id_cliente']).count()
        pipeline_result["etapas_concluidas"].append("analise_spark")
        
        # Etapa 2: Valida√ß√£o com Great Expectations
        print("\nüéØ Etapa 2: Valida√ß√£o com Great Expectations")
        df_pandas = df_spark.toPandas()
        validator = context.sources.pandas_default.read_dataframe(df_pandas)
        
        # Aplicar expectativas
        validator.expect_column_values_to_not_be_null("id_cliente")
        validator.expect_column_values_to_be_unique("id_cliente")
        validator.expect_column_values_to_be_between("idade", min_value=0, max_value=120)
        
        validation_result = validator.validate()
        pipeline_result["metricas_qualidade"]["inicial"] = validation_result.statistics
        pipeline_result["etapas_concluidas"].append("validacao_gx")
        
        # Etapa 3: Corre√ß√£o com PySpark (se necess√°rio)
        if not validation_result.success:
            print("\nüîß Etapa 3: Corre√ß√£o com PySpark")
            df_corrigido = corrigir_dados_qualidade_spark(df_spark)
            pipeline_result["spark_stats"]["registros_corrigidos"] = df_corrigido.count()
            pipeline_result["etapas_concluidas"].append("correcao_spark")
            
            # Etapa 4: Re-valida√ß√£o
            print("\nüîç Etapa 4: Re-valida√ß√£o")
            df_corrigido_pandas = df_corrigido.toPandas()
            validator_final = context.sources.pandas_default.read_dataframe(df_corrigido_pandas)
            
            # Aplicar expectativas novamente
            validator_final.expect_column_values_to_not_be_null("id_cliente")
            validator_final.expect_column_values_to_be_unique("id_cliente")
            validator_final.expect_column_values_to_be_between("idade", min_value=0, max_value=120)
            
            validation_result_final = validator_final.validate()
            pipeline_result["metricas_qualidade"]["final"] = validation_result_final.statistics
            pipeline_result["etapas_concluidas"].append("revalidacao")
            
            if validation_result_final.success:
                pipeline_result["status"] = "‚úÖ SUCESSO"
            else:
                pipeline_result["status"] = "‚ö†Ô∏è SUCESSO PARCIAL"
        else:
            pipeline_result["status"] = "‚úÖ SUCESSO"
        
        # Etapa 5: Relat√≥rios
        print("\nüìà Etapa 5: Gerando relat√≥rios")
        context.build_data_docs()
        pipeline_result["etapas_concluidas"].append("relatorios")
        
    except Exception as e:
        pipeline_result["status"] = "‚ùå ERRO"
        pipeline_result["erro"] = str(e)
    
    return pipeline_result

# Executar pipeline completo
resultado_pipeline = pipeline_dataops_pyspark_gx(df_clientes_spark, context)

print(f"\nüìã Status final do pipeline: {resultado_pipeline['status']}")
print(f"üìÖ Timestamp: {resultado_pipeline['timestamp']}")
print(f"‚úÖ Etapas conclu√≠das: {', '.join(resultado_pipeline['etapas_concluidas'])}")
print(f"üìä Estat√≠sticas Spark: {resultado_pipeline['spark_stats']}")

## 12. Conclus√µes

### üéØ O que Aprendemos com PySpark + Great Expectations

Neste laborat√≥rio, implementamos um **pipeline DataOps profissional** combinando PySpark e Great Expectations:

#### ‚úÖ Vantagens da Combina√ß√£o:
- **PySpark**: Processamento distribu√≠do e escal√°vel
- **Great Expectations**: Valida√ß√µes padronizadas e relat√≥rios
- **Integra√ß√£o**: Convers√£o pandas para compatibilidade
- **Performance**: Corre√ß√µes em larga escala com Spark
- **Governan√ßa**: Controle completo da qualidade

#### ‚úÖ 6 Dimens√µes Implementadas:
- **Completude**: `expect_column_values_to_not_be_null`
- **Unicidade**: `expect_column_values_to_be_unique`
- **Validade**: `expect_column_values_to_match_regex`, `expect_column_values_to_be_between`
- **Consist√™ncia**: `expect_column_value_lengths_to_equal`
- **Pontualidade**: `expect_column_values_to_be_between` (datas)
- **Acur√°cia**: `expect_column_values_to_not_match_regex`

### üí° Li√ß√µes Principais

> **"PySpark + Great Expectations = Pipeline DataOps escal√°vel e profissional"**

- **Escalabilidade**: PySpark para grandes volumes
- **Padroniza√ß√£o**: Great Expectations para valida√ß√µes
- **Automa√ß√£o**: Pipeline completo automatizado
- **Relat√≥rios**: Data Docs profissionais

## üßπ Limpeza do Ambiente

In [None]:
# Finalizar Spark Session
spark.stop()
print("‚úÖ Spark Session finalizada")
print("üéì Laborat√≥rio PySpark + Great Expectations conclu√≠do com sucesso!")
print("\nüìö Continue explorando DataOps com PySpark e Great Expectations!")
print("\nüîó Recursos adicionais:")
print("   - PySpark: https://spark.apache.org/docs/latest/api/python/")
print("   - Great Expectations: https://docs.greatexpectations.io/")
print("   - DataOps: https://dataops.live/")