# 🎯 Ejercicio PySpark 001: Procesamiento Distribuido de Logs

**Dificultad:** SSR (Semi-Senior)  
**Tiempo Límite:** 40 minutos  
**Dataset:** server_logs.parquet (simulado)

---

## 📋 Objetivo

Procesa logs de servidor usando PySpark para identificar patrones de uso.

## ✅ Tareas:

1. Lee logs en formato Parquet (100M registros simulados)
2. Parsea timestamps y extrae features temporales (hora, día semana, etc.)
3. Identifica top 100 IPs por número de requests
4. Detecta patrones de actividad sospechosa (>1000 req/min)
5. Calcula métricas de performance por endpoint
6. Guarda resultados en formato Delta Lake

**Optimiza para performance:** usa partitioning, caching estratégico.

---

## ⏱️ Cronómetro

Ejecuta esta celda para iniciar el cronómetro:

In [None]:
import time
from datetime import datetime, timedelta

START_TIME = time.time()
TIME_LIMIT_MINUTES = 40

def show_time_elapsed():
    elapsed = time.time() - START_TIME
    minutes = int(elapsed // 60)
    seconds = int(elapsed % 60)
    limit_seconds = TIME_LIMIT_MINUTES * 60
    
    if elapsed > limit_seconds:
        print(f"⚠️ TIEMPO EXCEDIDO: {minutes:02d}:{seconds:02d} / {TIME_LIMIT_MINUTES}:00")
    else:
        print(f"⏱️ Tiempo transcurrido: {minutes:02d}:{seconds:02d} / {TIME_LIMIT_MINUTES}:00")
    
    return elapsed

print("✅ Cronómetro iniciado!")
print(f"Tiempo límite: {TIME_LIMIT_MINUTES} minutos")
print(f"Hora de inicio: {datetime.now().strftime('%H:%M:%S')}")

## 🔧 Configuración de PySpark

Instalación y configuración automática:

In [None]:
# Instalar PySpark
!pip install -q pyspark==3.5.0

# Importar librerías
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
import pandas as pd
import numpy as np

# Crear Spark Session
spark = SparkSession.builder \
    .appName('LogProcessing_Exercise') \
    .config('spark.driver.memory', '4g') \
    .config('spark.executor.memory', '4g') \
    .getOrCreate()

print("✅ PySpark configurado exitosamente!")
print(f"Spark Version: {spark.version}")

## 📊 Generar Dataset Simulado

Generamos logs de servidor simulados:

In [None]:
from datetime import datetime, timedelta
import random

# Generar datos simulados
def generate_server_logs(num_records=100000):
    """
    Genera logs de servidor simulados
    """
    endpoints = ['/api/users', '/api/products', '/api/orders', '/api/search', '/api/checkout']
    methods = ['GET', 'POST', 'PUT', 'DELETE']
    status_codes = [200, 201, 400, 401, 404, 500]
    
    # Generar IPs (algunas maliciosas con alta frecuencia)
    normal_ips = [f'192.168.1.{i}' for i in range(1, 200)]
    suspicious_ips = [f'10.0.0.{i}' for i in range(1, 10)]
    
    data = []
    base_time = datetime(2024, 1, 1, 0, 0, 0)
    
    for i in range(num_records):
        # Determinar si es tráfico sospechoso
        is_suspicious = random.random() < 0.05
        
        if is_suspicious:
            ip = random.choice(suspicious_ips)
            # IPs sospechosas hacen muchos requests
            timestamp = base_time + timedelta(seconds=i//20)
        else:
            ip = random.choice(normal_ips)
            timestamp = base_time + timedelta(seconds=i)
        
        data.append({
            'timestamp': timestamp.strftime('%Y-%m-%d %H:%M:%S'),
            'ip_address': ip,
            'method': random.choice(methods),
            'endpoint': random.choice(endpoints),
            'status_code': random.choice(status_codes),
            'response_time_ms': random.randint(10, 5000),
            'bytes_sent': random.randint(100, 100000)
        })
    
    return data

# Generar dataset
print("Generando dataset de logs...")
logs_data = generate_server_logs(100000)

# Convertir a Spark DataFrame
logs_df = spark.createDataFrame(logs_data)

print(f"✅ Dataset generado: {logs_df.count()} registros")
print("\nEsquema:")
logs_df.printSchema()
print("\nPrimeras filas:")
logs_df.show(5, truncate=False)

## 💡 TU CÓDIGO AQUÍ

Implementa la función `process_server_logs()` que debe:

1. Parsear timestamps y extraer features temporales
2. Identificar top 100 IPs por requests
3. Detectar actividad sospechosa (>1000 req/min)
4. Calcular métricas de performance por endpoint

**Retorna un diccionario con:**
- `top_ips`: DataFrame con top 100 IPs
- `suspicious_activity`: DataFrame con actividad sospechosa
- `endpoint_metrics`: DataFrame con métricas por endpoint

In [None]:
def process_server_logs(logs_df):
    """
    Procesa logs de servidor
    
    Args:
        logs_df: Spark DataFrame con logs
    
    Returns:
        dict con DataFrames: top_ips, suspicious_activity, endpoint_metrics
    """
    
    # ========================
    # TU CÓDIGO AQUÍ
    # ========================
    
    # 1. Parsear timestamps y extraer features temporales
    # Hint: usa to_timestamp, hour, dayofweek, etc.
    
    
    # 2. Identificar top 100 IPs por número de requests
    # Hint: groupBy, count, orderBy, limit
    
    
    # 3. Detectar actividad sospechosa (>1000 req/min)
    # Hint: agrupa por IP y minuto, cuenta requests
    
    
    # 4. Calcular métricas de performance por endpoint
    # Hint: avg, min, max, stddev de response_time_ms
    
    
    # Retornar resultados
    return {
        'top_ips': None,  # Reemplaza con tu DataFrame
        'suspicious_activity': None,  # Reemplaza con tu DataFrame
        'endpoint_metrics': None  # Reemplaza con tu DataFrame
    }

# Ejecutar tu solución
print("\n🚀 Ejecutando solución...\n")
results = process_server_logs(logs_df)

# Verificar tiempo
show_time_elapsed()

## ✅ Validación Automática

**NO MODIFICAR ESTA CELDA**

In [None]:
def validate_solution(results):
    """
    Valida la solución del usuario
    """
    score = 0
    max_score = 100
    feedback = []
    
    # Test 1: Top IPs (25 puntos)
    try:
        top_ips = results['top_ips']
        if top_ips is not None:
            count = top_ips.count()
            if count <= 100:
                score += 25
                feedback.append("✅ Test 1: Top IPs correcto (25/25)")
            else:
                score += 15
                feedback.append(f"⚠️ Test 1: Debe retornar máximo 100 IPs, retornaste {count} (15/25)")
        else:
            feedback.append("❌ Test 1: top_ips es None (0/25)")
    except Exception as e:
        feedback.append(f"❌ Test 1: Error - {str(e)} (0/25)")
    
    # Test 2: Actividad Sospechosa (25 puntos)
    try:
        suspicious = results['suspicious_activity']
        if suspicious is not None:
            if suspicious.count() > 0:
                score += 25
                feedback.append("✅ Test 2: Actividad sospechosa detectada (25/25)")
            else:
                score += 10
                feedback.append("⚠️ Test 2: No se detectó actividad sospechosa (10/25)")
        else:
            feedback.append("❌ Test 2: suspicious_activity es None (0/25)")
    except Exception as e:
        feedback.append(f"❌ Test 2: Error - {str(e)} (0/25)")
    
    # Test 3: Métricas de Endpoints (25 puntos)
    try:
        endpoint_metrics = results['endpoint_metrics']
        if endpoint_metrics is not None:
            if endpoint_metrics.count() > 0:
                score += 25
                feedback.append("✅ Test 3: Métricas de endpoints calculadas (25/25)")
            else:
                feedback.append("❌ Test 3: Métricas vacías (0/25)")
        else:
            feedback.append("❌ Test 3: endpoint_metrics es None (0/25)")
    except Exception as e:
        feedback.append(f"❌ Test 3: Error - {str(e)} (0/25)")
    
    # Test 4: Performance (25 puntos)
    elapsed = show_time_elapsed()
    if elapsed <= TIME_LIMIT_MINUTES * 60:
        score += 25
        feedback.append(f"✅ Test 4: Completado en tiempo ({elapsed:.1f}s) (25/25)")
    else:
        penalty = min(25, int((elapsed - TIME_LIMIT_MINUTES * 60) / 60) * 5)
        score += max(0, 25 - penalty)
        feedback.append(f"⚠️ Test 4: Tiempo excedido - penalización (20/25)")
    
    return score, feedback

# Ejecutar validación
print("\n" + "="*50)
print("📊 RESULTADOS DE VALIDACIÓN")
print("="*50 + "\n")

score, feedback = validate_solution(results)

for item in feedback:
    print(item)

print("\n" + "="*50)
print(f"🎯 PUNTUACIÓN FINAL: {score}/100")
print("="*50)

if score >= 90:
    print("\n🏆 ¡EXCELENTE! Solución casi perfecta")
elif score >= 70:
    print("\n✅ ¡BIEN! Solución correcta con mejoras menores")
elif score >= 50:
    print("\n⚠️ ACEPTABLE - Revisa los tests fallidos")
else:
    print("\n❌ NECESITA MEJORAS - Revisa tu implementación")

# Mostrar resultados si existen
print("\n" + "="*50)
print("📋 VISTA PREVIA DE RESULTADOS")
print("="*50 + "\n")

if results['top_ips'] is not None:
    print("Top 10 IPs:")
    results['top_ips'].show(10)

if results['suspicious_activity'] is not None:
    print("\nActividad Sospechosa:")
    results['suspicious_activity'].show(10)

if results['endpoint_metrics'] is not None:
    print("\nMétricas por Endpoint:")
    results['endpoint_metrics'].show()