# 🧪 Iris MLOps - Test Runner

Este notebook executa todos os testes do projeto Iris MLOps para validar:
- Qualidade dos dados
- Funcionalidade dos pipelines
- Integridade das tabelas

## 📋 Status dos Testes

In [None]:
# Imports necessários
import sys
import os
import pytest
import subprocess
from datetime import datetime
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Configurar paths
project_root = '/Workspace/Repos/iris_bundle'
sys.path.append(project_root)
sys.path.append(f'{project_root}/tests')

print(f"📁 Project root: {project_root}")
print(f"🐍 Python version: {sys.version}")
print(f"⏰ Test execution time: {datetime.now()}")

## 🔧 Configuração do Ambiente de Teste

In [None]:
# Inicializar Spark Session para testes
spark = SparkSession.builder \
    .appName("IrisMLOps_TestRunner") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

print(f"✅ Spark Session criada: {spark.version}")
print(f"📊 Catálogo atual: {spark.catalog.currentDatabase()}")

# Verificar se as tabelas existem
try:
    tables = spark.catalog.listTables("default")
    iris_tables = [t.name for t in tables if 'iris' in t.name]
    print(f"🗂️ Tabelas Iris encontradas: {iris_tables}")
except Exception as e:
    print(f"⚠️ Erro ao listar tabelas: {e}")

## 🧪 Teste 1: Validação de Dados Mock

In [None]:
# Teste do banco mock
print("🔍 Executando teste do banco mock...")

try:
    from tests.mock_db import create_mock_iris_db
    
    # Criar conexão mock
    conn = create_mock_iris_db()
    
    # Verificar dados
    df_mock = pd.read_sql_query("SELECT * FROM iris LIMIT 5", conn)
    
    print(f"✅ Mock DB criado com sucesso!")
    print(f"📊 Registros no mock: {len(pd.read_sql_query('SELECT * FROM iris', conn))}")
    print(f"🏷️ Colunas disponíveis: {list(df_mock.columns)}")
    
    # Mostrar amostra
    display(df_mock)
    
    conn.close()
    
except Exception as e:
    print(f"❌ Erro no teste mock: {e}")

## 🧪 Teste 2: Validação de Tabelas Unity Catalog

In [None]:
# Teste das tabelas Unity Catalog
print("🔍 Executando validação das tabelas Unity Catalog...")

# Definir tabelas para testar
tables_to_test = [
    "default.iris_bronze",
    "default.iris_silver", 
    "default.iris_gold"
]

test_results = {}

for table_name in tables_to_test:
    try:
        print(f"\n📋 Testando tabela: {table_name}")
        
        # Verificar se a tabela existe
        df = spark.table(table_name)
        count = df.count()
        
        # Verificar schema
        schema_info = [(field.name, field.dataType.simpleString()) for field in df.schema.fields]
        
        # Verificar nulls
        null_counts = {}
        for col in df.columns:
            null_count = df.filter(F.col(col).isNull()).count()
            null_counts[col] = null_count
        
        test_results[table_name] = {
            "status": "✅ PASSOU",
            "count": count,
            "schema": schema_info,
            "nulls": null_counts
        }
        
        print(f"  ✅ Registros: {count}")
        print(f"  📊 Colunas: {len(df.columns)}")
        print(f"  🚫 Nulls totais: {sum(null_counts.values())}")
        
    except Exception as e:
        test_results[table_name] = {
            "status": "❌ FALHOU",
            "error": str(e)
        }
        print(f"  ❌ Erro: {e}")

# Mostrar resumo
print("\n📊 RESUMO DOS TESTES:")
for table, result in test_results.items():
    print(f"{table}: {result['status']}")

## 🧪 Teste 3: Qualidade de Dados - Camada Silver

In [None]:
# Teste específico da qualidade de dados Silver
print("🔍 Executando teste de qualidade - Silver layer...")

try:
    # Carregar tabela silver
    df_silver = spark.table("default.iris_silver")
    
    # Testes de qualidade
    quality_tests = {
        "total_records": df_silver.count(),
        "duplicate_check": df_silver.count() - df_silver.dropDuplicates().count(),
        "null_check": {},
        "data_types": {},
        "value_ranges": {}
    }
    
    # Verificar nulls por coluna
    for col in df_silver.columns:
        null_count = df_silver.filter(F.col(col).isNull()).count()
        quality_tests["null_check"][col] = null_count
    
    # Verificar tipos de dados
    for field in df_silver.schema.fields:
        quality_tests["data_types"][field.name] = field.dataType.simpleString()
    
    # Verificar ranges para colunas numéricas
    numeric_cols = [f.name for f in df_silver.schema.fields if 'double' in f.dataType.simpleString()]
    
    for col in numeric_cols:
        stats = df_silver.select(
            F.min(col).alias('min'),
            F.max(col).alias('max'),
            F.mean(col).alias('mean'),
            F.stddev(col).alias('stddev')
        ).collect()[0]
        
        quality_tests["value_ranges"][col] = {
            "min": float(stats['min']),
            "max": float(stats['max']),
            "mean": float(stats['mean']),
            "stddev": float(stats['stddev'])
        }
    
    # Avaliar resultados
    print(f"\n📊 RESULTADOS DE QUALIDADE:")
    print(f"  📈 Total de registros: {quality_tests['total_records']}")
    print(f"  🔍 Duplicatas encontradas: {quality_tests['duplicate_check']}")
    print(f"  🚫 Total de nulls: {sum(quality_tests['null_check'].values())}")
    
    # Verificar se passou nos testes
    quality_passed = (
        quality_tests['total_records'] > 0 and
        quality_tests['duplicate_check'] == 0 and
        sum(quality_tests['null_check'].values()) == 0
    )
    
    if quality_passed:
        print("\n✅ TODOS OS TESTES DE QUALIDADE PASSARAM!")
    else:
        print("\n❌ ALGUNS TESTES DE QUALIDADE FALHARAM!")
    
    # Criar DataFrame com resultados para visualização
    quality_df = pd.DataFrame([
        {"Teste": "Total de Registros", "Resultado": quality_tests['total_records'], "Status": "✅" if quality_tests['total_records'] > 0 else "❌"},
        {"Teste": "Sem Duplicatas", "Resultado": quality_tests['duplicate_check'], "Status": "✅" if quality_tests['duplicate_check'] == 0 else "❌"},
        {"Teste": "Sem Nulls", "Resultado": sum(quality_tests['null_check'].values()), "Status": "✅" if sum(quality_tests['null_check'].values()) == 0 else "❌"}
    ])
    
    display(quality_df)
    
except Exception as e:
    print(f"❌ Erro no teste de qualidade: {e}")

## 🧪 Teste 4: Execução dos Testes Unitários com Pytest

In [None]:
# Executar pytest nos arquivos de teste
print("🔍 Executando testes unitários com pytest...")

try:
    # Mudar para o diretório do projeto
    os.chdir('/Workspace/Repos/iris_bundle')
    
    # Executar pytest
    result = subprocess.run(
        ['python', '-m', 'pytest', 'tests/', '-v', '--tb=short'],
        capture_output=True,
        text=True
    )
    
    print(f"📤 Exit code: {result.returncode}")
    print(f"\n📋 STDOUT:")
    print(result.stdout)
    
    if result.stderr:
        print(f"\n⚠️ STDERR:")
        print(result.stderr)
    
    if result.returncode == 0:
        print("\n✅ TODOS OS TESTES UNITÁRIOS PASSARAM!")
    else:
        print("\n❌ ALGUNS TESTES UNITÁRIOS FALHARAM!")
        
except Exception as e:
    print(f"❌ Erro ao executar pytest: {e}")

## 🧪 Teste 5: Verificação do Modelo MLflow

In [None]:
# Teste do modelo MLflow
print("🔍 Verificando modelo MLflow...")

try:
    import mlflow
    from mlflow import MlflowClient
    
    # Configurar MLflow
    client = MlflowClient()
    
    # Verificar se existe algum modelo registrado
    try:
        models = client.search_registered_models()
        iris_models = [m for m in models if 'iris' in m.name.lower()]
        
        if iris_models:
            print(f"✅ Modelos Iris encontrados: {len(iris_models)}")
            for model in iris_models:
                print(f"  📦 Modelo: {model.name}")
                latest_version = client.get_latest_versions(model.name, stages=["None", "Staging", "Production"])
                if latest_version:
                    print(f"    🏷️ Versão mais recente: {latest_version[0].version}")
                    print(f"    📊 Status: {latest_version[0].current_stage}")
        else:
            print("⚠️ Nenhum modelo Iris encontrado no registry")
            
    except Exception as e:
        print(f"⚠️ Registry não disponível, testando fallback: {e}")
        
        # Testar carregamento de modelo via runs
        experiments = client.search_experiments()
        print(f"📊 Experimentos encontrados: {len(experiments)}")
        
        # Buscar runs recentes
        if experiments:
            runs = client.search_runs(experiment_ids=[exp.experiment_id for exp in experiments[:3]])
            model_runs = [run for run in runs if any('model' in tag for tag in run.data.tags.keys())]
            print(f"🎯 Runs com modelos: {len(model_runs)}")
            
            if model_runs:
                print("✅ Fallback de modelo disponível")
            else:
                print("⚠️ Nenhum modelo encontrado em runs")
    
except Exception as e:
    print(f"❌ Erro na verificação do MLflow: {e}")

## 📊 Relatório Final dos Testes

In [None]:
# Gerar relatório final
print("📋 RELATÓRIO FINAL DE TESTES - IRIS MLOPS")
print("=" * 50)
print(f"⏰ Executado em: {datetime.now()}")
print(f"🖥️ Ambiente: Databricks Workspace")
print(f"📦 Spark Version: {spark.version}")
print()

# Resumo dos componentes testados
components = [
    "🗃️ Mock Database",
    "📊 Unity Catalog Tables", 
    "🔍 Data Quality (Silver)",
    "🧪 Unit Tests (Pytest)",
    "🤖 MLflow Models"
]

print("🧪 COMPONENTES TESTADOS:")
for i, component in enumerate(components, 1):
    print(f"  {i}. {component}")

print()
print("✅ PRÓXIMOS PASSOS:")
print("  1. 🚀 Deploy via CI/CD: git push para main")
print("  2. 📊 Executar pipeline: make run-pipeline")
print("  3. 🔄 Monitorar jobs no Databricks Workflows")
print()
print("💡 DICA: Execute este notebook antes de cada deploy para garantir qualidade!")

# Cleanup
spark.stop()
print("\n🧹 Spark session finalizada.")