# Log Analysis Pipeline - Notebook Test

Este notebook testa o pipeline de análise de logs localmente.

In [None]:
# Importar módulos necessários
import sys
sys.path.append('../src')

# Importação ultra simplificada - apenas um único import
from log_analyzer.etl import run_pipeline
from log_analyzer.core.spark import get_spark_session

## Framework de ETL
O framework de ETL foi projetado para ser simples e modular, permitindo fácil integração e reutilização. A estrutura do projeto é a seguinte:

```
log_analyzer/
  etl/
    extractor.py   - Funções para extração de logs
    transformer.py - Funções para transformação de dados
    analyzer.py    - Funções para análises e métricas
    load.py        - Funções para carregamento de dados
    simple_pipeline.py - Pipeline simplificado que orquestra tudo
```


```python
from log_analyzer.etl import run_pipeline, extract_logs, transform_logs, analyze_logs
```

In [None]:
# Verificar se o Spark está funcionando
try:
    spark = get_spark_session()
    print(f"Spark version: {spark.version}")
    print(f"Spark UI: http://localhost:4040")
    print(f"SparkContext ativo: {not spark._jsc.sc().isStopped()}")
except Exception as e:
    print(f"⚠️ Erro ao inicializar o Spark: {str(e)}")
    print("Tentando criar uma nova sessão Spark...")
    from log_analyzer.core.spark import get_spark_session
    spark = get_spark_session(app_name="log_analyzer_notebook_retry")

In [None]:
# Executar o pipeline completo usando a API simplificada
result = run_pipeline(
    input_path="../data/logs.txt",
    output_path="../data"
)

print(f"Status: {result['status']}")
if result['status'] == 'success':
    print(f"✅ Pipeline executado com sucesso!")
    print(f"Registros processados: {result.get('processed_records', 0)}")
    print(f"Output: {result.get('output_path', '../data')}")
    
    # Exibir métricas-chave
    metrics = result.get("metrics", {})
    if metrics:
        print("\n📊 Métricas principais:")
        for key, value in metrics.items():
            if not isinstance(value, (dict, list)):
                print(f"  - {key}: {value}")
else:
    print(f"❌ Falha no pipeline: {result.get('error', 'Erro desconhecido')}")

In [None]:
import os
import random
from datetime import datetime, timedelta
from pathlib import Path

# Criar diretório para testes se não existir
test_dir = Path("../data/test")
test_dir.mkdir(parents=True, exist_ok=True)

# Caminho do arquivo de logs de teste
test_log_path = test_dir / "dummy_logs.txt"

# Lista de IPs de exemplo
sample_ips = [
    "192.168.1." + str(i) for i in range(1, 20)
] + ["10.0.0." + str(i) for i in range(1, 10)]

# URLs de exemplo 
sample_urls = [
    "/home", "/about", "/login", "/logout", "/dashboard", 
    "/profile", "/settings", "/api/data", "/static/main.css", 
    "/static/logo.png", "/static/app.js"
]

# Códigos de status HTTP
status_codes = [200, 200, 200, 200, 200, 301, 302, 404, 403, 500]

# Formatos de User Agent
user_agents = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko)",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X)"
]

# Gerar logs aleatórios no formato Apache Combined
with open(test_log_path, "w") as f:
    # Gerar 1000 linhas de logs
    base_time = datetime.now() - timedelta(days=2)
    
    for i in range(1000):
        ip = random.choice(sample_ips)
        timestamp = base_time + timedelta(minutes=i//10)  # Incremento a cada 10 registros
        formatted_time = timestamp.strftime("%d/%b/%Y:%H:%M:%S +0000")
        method = random.choice(["GET", "POST", "PUT", "DELETE"])
        url = random.choice(sample_urls)
        status = random.choice(status_codes)
        size = random.randint(100, 10000) if status != 404 else 0
        referer = "https://example.com" if random.random() > 0.5 else "-"
        user_agent = random.choice(user_agents)
        
        log_line = f'{ip} - - [{formatted_time}] "{method} {url} HTTP/1.1" {status} {size} "{referer}" "{user_agent}"\n'
        f.write(log_line)

print(f"✅ Arquivo de logs de teste criado em: {test_log_path}")
print(f"Tamanho: {os.path.getsize(test_log_path)} bytes")
print(f"Primeiras 3 linhas:")

# Mostrar algumas linhas do arquivo gerado
with open(test_log_path, "r") as f:
    for _ in range(3):
        print(f.readline().strip())

In [None]:
# Executar o pipeline com o arquivo de logs fictício
test_output_path = test_dir / "output"

print(f"🚀 Executando pipeline com os logs fictícios...")
dummy_result = run_pipeline(
    input_path=str(test_log_path),
    output_path=str(test_output_path),
    save_to_db=False
)

print(f"\nStatus: {dummy_result['status']}")
if dummy_result['status'] == 'success':
    print(f"✅ Pipeline executado com sucesso!")
    print(f"Registros processados: {dummy_result.get('processed_records', 0)}")
    print(f"Output: {dummy_result.get('output_path', str(test_output_path))}")
    
    # Exibir métricas-chave
    metrics = dummy_result.get("metrics", {})
    if metrics:
        print("\n📊 Métricas principais:")
        for key, value in metrics.items():
            if not isinstance(value, (dict, list)):
                print(f"  - {key}: {value}")
else:
    print(f"❌ Falha no pipeline: {dummy_result.get('error', 'Erro desconhecido')}")

In [None]:
# Analisar os resultados gerados com os logs fictícios
import json
import os
import glob

# Verificar os diretórios criados
test_output_dir = test_output_path
print(f"\n📂 Estrutura de diretórios criada:")
for root, dirs, files in os.walk(test_output_dir):
    level = root.replace(str(test_output_dir), '').count(os.sep)
    indent = ' ' * 4 * level
    print(f"{indent}{os.path.basename(root)}/")
    sub_indent = ' ' * 4 * (level + 1)
    for file in files:
        print(f"{sub_indent}{file}")

# Verificar se temos métricas diretamente no resultado do pipeline
if 'metrics' in dummy_result and dummy_result['metrics']:
    print(f"\n📊 Métricas disponíveis no resultado do pipeline:")
    for key, value in dummy_result['metrics'].items():
        if isinstance(value, (dict, list)):
            print(f"  - {key}: (objeto complexo)")
        else:
            print(f"  - {key}: {value}")
else:
    # Caso contrário, tentar buscar métricas em arquivos
    gold_dir = test_output_dir / "gold"
    if gold_dir.exists():
        # Procurar por arquivos JSON com métricas
        json_files = glob.glob(f"{gold_dir}/summary_metrics.json")
        if not json_files:
            json_files = glob.glob(f"{gold_dir}/*.json")
        
        if json_files:
            with open(json_files[0], 'r') as f:
                try:
                    metrics = json.load(f)
                    print(f"\n📊 Métricas Geradas com Logs Fictícios:")
                    if isinstance(metrics, dict):
                        for key, value in metrics.items():
                            print(f"  - {key}: {value}")
                    else:
                        print(metrics)
                except json.JSONDecodeError:
                    # Tenta ler como JSON lines (cada linha um JSON)
                    f.seek(0)
                    metrics = [json.loads(line) for line in f]
                    print(f"\n📊 Métricas Geradas com Logs Fictícios:")
                    for metric in metrics:
                        print(f"  - {metric.get('metric', 'unknown')}: {metric.get('value', 'N/A')}")
        else:
            print(f"\nNenhum arquivo JSON encontrado em {gold_dir}")
    else:
        print(f"\nDiretório Gold não foi criado: {gold_dir}")

# Tratamento de erro para sessão Spark encerrada
try:
    spark
except NameError:
    print("\n⚠️ Erro: A sessão Spark foi encerrada. Por favor, inicie uma nova sessão Spark.")

In [None]:
# Carregando e visualizando os dados processados dos logs fictícios
import traceback
bronze_path = test_output_dir / "bronze" / "logs.parquet"
silver_path = test_output_dir / "silver" / "logs.parquet"

try:
    # Verificar se o Spark está ativo
    if spark._jsc.sc().isStopped():
        print("⚠️ SparkContext foi encerrado. Criando uma nova sessão...")
        from log_analyzer.core.spark import get_spark_session
        spark = get_spark_session()
    
    if bronze_path.parent.exists():
        df_test_bronze = spark.read.parquet(str(bronze_path))
        print(f"\n📦 Bronze Layer (Logs Fictícios):")
        print(f"Total de registros: {df_test_bronze.count()}")
        print(f"Schema:")
        df_test_bronze.printSchema()
        print(f"Primeiros 5 registros:")
        df_test_bronze.show(5, truncate=False)
    else:
        print(f"Bronze layer não encontrada: {bronze_path}")

    if silver_path.parent.exists():
        df_test_silver = spark.read.parquet(str(silver_path))
        print(f"\n🥈 Silver Layer (Logs Fictícios):")
        print(f"Total de registros: {df_test_silver.count()}")
        
        # Analisar distribuição de status HTTP
        print(f"\nDistribuição de códigos de status HTTP:")
        df_test_silver.groupBy("status_int").count().orderBy("status_int").show()
        
        # Analisar top IPs
        print(f"\nTop 5 IPs por número de requisições:")
        from pyspark.sql.functions import count, desc
        df_test_silver.groupBy("ip").agg(count("*").alias("requests")).orderBy(desc("requests")).limit(5).show()
        
        # Verificar URLs mais acessadas
        print(f"\nTop 5 URLs mais acessadas:")
        df_test_silver.groupBy("url").agg(count("*").alias("requests")).orderBy(desc("requests")).limit(5).show(truncate=False)
    else:
        print(f"Silver layer não encontrada: {silver_path}")
except Exception as e:
    print(f"⚠️ Erro ao processar dados Parquet: {str(e)}")
    print("Continuando com o restante do notebook...")

In [None]:
# Ler e exibir as métricas finais
import json
import os
import glob
from pathlib import Path

# Reimplementamos o pipeline, então vamos executá-lo novamente para ter os dados mais recentes
print("🔄 Executando o pipeline novamente para obter as métricas mais recentes...")
fresh_result = run_pipeline(
    input_path="../data/logs.txt",
    output_path="../data"
)

if fresh_result["status"] == "success" and "metrics" in fresh_result:
    print("\n📊 Métricas do resultado direto do pipeline:")
    metrics = fresh_result["metrics"]
    
    # Exibir métricas estruturadas
    for key, value in metrics.items():
        if isinstance(value, dict):
            print(f"\n  📈 {key}:")
            for sub_key, sub_value in value.items():
                print(f"    - {sub_key}: {sub_value}")
        elif isinstance(value, list):
            print(f"\n  📈 {key} (top 5):")
            for i, item in enumerate(value[:5]):
                if isinstance(item, dict):
                    print(f"    {i+1}. {item}")
                else:
                    print(f"    {i+1}. {item}")
        else:
            print(f"  - {key}: {value}")
else:
    # Método alternativo: procurar por métricas na pasta gold
    print("\nBuscando métricas em arquivos JSON...")
    gold_dir = Path("../data/gold")
    json_files = glob.glob(f"{gold_dir}/summary_metrics.json")

    if not json_files:
        json_files = glob.glob(f"{gold_dir}/*.json")

    if json_files:
        with open(json_files[0], 'r') as f:
            try:
                # Tentar primeiro como um único JSON
                metrics = json.load(f)
                print("\n📊 Métricas Geradas:")
                for key, value in metrics.items():
                    print(f"  - {key}: {value}")
            except json.JSONDecodeError:
                # Se falhar, tentar como JSON lines (cada linha um objeto)
                f.seek(0)  # Voltar ao início do arquivo
                metrics = [json.loads(line) for line in f]
                print("\n📊 Métricas Geradas (formato JSON lines):")
                for metric in metrics:
                    print(f"  - {metric.get('metric', 'unknown')}: {metric.get('value', 'N/A')}")
    else:
        print("Nenhuma métrica JSON encontrada na pasta Gold!")

In [None]:
# Visualizar dados Bronze
try:
    # Verificar se o Spark está ativo
    if spark._jsc.sc().isStopped():
        print("⚠️ SparkContext foi encerrado. Criando uma nova sessão...")
        from log_analyzer.core.spark import get_spark_session
        spark = get_spark_session()
        
    df_bronze = spark.read.parquet("../data/bronze/logs.parquet")
    print(f"\n📦 Bronze Layer:")
    print(f"Total de registros: {df_bronze.count()}")
    print(f"\nSchema:")
    df_bronze.printSchema()
    print(f"\nPrimeiros 5 registros:")
    df_bronze.show(5, truncate=False)
except Exception as e:
    print(f"⚠️ Erro ao processar dados Bronze: {str(e)}")
    print("Continuando com o restante do notebook...")

In [None]:
# Visualizar dados Silver
try:
    # Verificar se o Spark está ativo
    if spark._jsc.sc().isStopped():
        print("⚠️ SparkContext foi encerrado. Criando uma nova sessão...")
        from log_analyzer.core.spark import get_spark_session
        spark = get_spark_session()
        
    # Tentar reutilizar a variável df_bronze se existir
    if 'df_bronze' not in locals() or df_bronze is None:
        try:
            df_bronze = spark.read.parquet("../data/bronze/logs.parquet")
        except:
            df_bronze = None
            print("Não foi possível carregar os dados Bronze para comparação")
    
    df_silver = spark.read.parquet("../data/silver/logs.parquet")
    print(f"\n🥈 Silver Layer:")
    print(f"Total de registros: {df_silver.count()}")
    
    if df_bronze is not None:
        print(f"\nCampos adicionados na transformação:")
        silver_cols = set(df_silver.columns)
        bronze_cols = set(df_bronze.columns)
        new_cols = silver_cols - bronze_cols
        print(f"Novos campos: {new_cols}")
except Exception as e:
    print(f"⚠️ Erro ao processar dados Silver: {str(e)}")
    print("Continuando com o restante do notebook...")

In [None]:
# Análise adicional - Top 5 IPs
try:
    # Verificar se o Spark está ativo
    if spark._jsc.sc().isStopped():
        print("⚠️ SparkContext foi encerrado. Criando uma nova sessão...")
        from log_analyzer.core.spark import get_spark_session
        spark = get_spark_session()
        
    # Se a variável df_silver não existir ou for None, recarregá-la
    if 'df_silver' not in locals() or df_silver is None:
        df_silver = spark.read.parquet("../data/silver/logs.parquet")
    
    from pyspark.sql.functions import count, desc
    top_ips = df_silver.groupBy("ip").agg(count("*").alias("requests")).orderBy(desc("requests")).limit(5)
    print("\n🎯 Top 5 IPs por número de requisições:")
    top_ips.show()
except Exception as e:
    print(f"⚠️ Erro ao analisar Top 5 IPs: {str(e)}")
    print("Continuando com o restante do notebook...")
    # Criar uma variável vazia para evitar erros nas células subsequentes
    top_ips = None

In [None]:
# Não fechar a sessão Spark aqui, vamos movê-la para o final
# spark.stop()
print("✅ Mantendo a sessão Spark ativa para o restante do notebook")

In [None]:
# Exemplo de uso das funções individuais do pipeline
from log_analyzer.etl import extract_logs, transform_logs, analyze_logs, load_to_parquet

print("\n🚀 Demonstração de uso das funções individuais do pipeline:")
print("Este exemplo mostra como você pode usar cada componente separadamente")
print("ou combiná-los da forma que precisar")

# Exemplo simplificado - não executa de verdade para não duplicar o processamento
print("\nEtapas individuais:")
print("1. df_bronze = extract_logs('arquivo_logs.txt', log_format='common')")
print("2. df_silver = transform_logs(df_bronze)")
print("3. resultados = analyze_logs(df_silver)")
print("4. load_to_parquet(df_silver, 'caminho/saida')")

print("\n✅ Com esta abordagem modular, você pode facilmente:")
print("- Reimplementar apenas partes específicas do pipeline")
print("- Combinar componentes de forma flexível")
print("- Testar cada etapa separadamente")
print("- Entender melhor o fluxo de dados")

In [None]:
# Exemplo de uso direto da API simplificada
try:
    from log_analyzer.etl.load import load_to_parquet
    
    # Verificar se temos a variável top_ips disponível
    if 'top_ips' not in locals() or top_ips is None:
        print("⚠️ Dados de top_ips não estão disponíveis. Criando dados de exemplo...")
        
        # Verificar se o Spark está ativo
        if spark._jsc.sc().isStopped():
            print("⚠️ SparkContext foi encerrado. Criando uma nova sessão...")
            from log_analyzer.core.spark import get_spark_session
            spark = get_spark_session()
        
        # Criar um DataFrame de exemplo simples
        data = [("192.168.1.1", 100), ("192.168.1.2", 80), ("192.168.1.3", 50)]
        top_ips = spark.createDataFrame(data, ["ip", "requests"])
    
    # Vamos salvar o resultado da análise dos top IPs em um formato simples
    output_path = "../data/examples/top_ips_example"
    print(f"\n💡 Demonstração da API simplificada de carregamento:")
    print(f"Salvando top IPs em: {output_path}")

    # Uso direto da função load_to_parquet sem precisar instanciar classes
    load_to_parquet(
        df=top_ips,
        output_path=output_path,
        mode="overwrite",
        partition_by=None
    )

    print("✅ Dados salvos com sucesso usando a API simplificada!")
except Exception as e:
    print(f"⚠️ Erro ao demonstrar carregamento de dados: {str(e)}")
    print("Finalizando o notebook...")

In [None]:
# Demonstração do problema de tipo em PySpark e sua solução
try:
    # Verificar se o Spark está ativo
    if 'spark' not in locals() or spark._jsc.sc().isStopped():
        print("Criando nova sessão Spark para demonstração...")
        from log_analyzer.core.spark import get_spark_session
        spark = get_spark_session(app_name="boolean_cast_demo")
    
    # Criar um DataFrame simples para demonstração
    from pyspark.sql import Row
    from pyspark.sql.functions import col, sum as spark_sum
    
    # Dados de exemplo: status HTTP
    data = [
        Row(url="/home", status=200),
        Row(url="/about", status=200),
        Row(url="/not-found", status=404),
        Row(url="/login", status=200),
        Row(url="/admin", status=403),
        Row(url="/error", status=500)
    ]
    
    df = spark.createDataFrame(data)
    
    print("DataFrame de demonstração:")
    df.show()
    
    # Demonstração do problema
    print("\nCriando uma expressão booleana: status >= 400")
    df.withColumn("is_error", col("status") >= 400).show()
    
    print("\n1) Contando erros corretamente (com cast para integer):")
    result = df.agg(spark_sum((col("status") >= 400).cast("int")).alias("total_errors"))
    result.show()
    
except Exception as e:
    print(f"⚠️ Erro na demonstração: {str(e)}")
    print("Continuando com o notebook...")

In [None]:
spark.stop()
print("✅ Sessão Spark encerrada com sucesso")