In [14]:
import gc  # Garbage Collector - Para gestión manual de memoria y liberación de recursos
import re  # Expresiones Regulares - Para manipulación y búsqueda de patrones en texto
import time  # Funciones de tiempo - Para medición de ejecución, delays y timestamps
import logging  # Sistema de logging - Para registro y monitorización de eventos de la aplicación
import ipaddress  # Manipulación de direcciones IP - Para validación y operaciones con IPs
import numpy as np  # NumPy - Librería fundamental para computación científica con arrays
import pandas as pd  # Pandas - Manipulación y análisis de datos estructurados (DataFrames)
from tqdm import tqdm  # Barras de progreso - Para mostrar progreso en loops iterativos
import geoip2.database  # GeoIP2 - Para geolocalización de direcciones IP
from datetime import datetime  # Manipulación de fechas y horas - Para manejo de timestamps
from sqlalchemy import create_engine  # SQLAlchemy - ORM y conexión a bases de datos SQL
from data.config import DB_CONNECTIONS  # Configuración personalizada - Conexiones a BD del proyecto
from ua_parser import user_agent_parser  # User Agent Parser - Para analizar strings de user agents

from Modulos.update_dimensions_table import actualizarTablaDimension
import sys
sys.path.append('..')

from data.config import DB_CONNECTIONS

# ----------------------------- SCRIPT HIPER-MEGA-ULTRA-OPTIMIZADO ------------------------------

In [3]:
# logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def liberar_memoria(*variables):
    """Libera memoria de variables"""
    for var in variables:
        if var is not None:
            del var
    gc.collect()
    
def mostrar_uso_memoria():
    """Muestra uso actual de memoria"""
    import psutil
    import os
    process = psutil.Process(os.getpid())
    memory_mb = process.memory_info().rss / 1024 / 1024
    logger.info(f"Memoria en uso: {memory_mb:.1f} MB")

def procesar_archivo_log(filepath, chunk_size=100000):
    """Procesa archivo de log completo sin límite de líneas"""
    logger.info("=" * 80)
    logger.info("                  PROCESAMIENTO DE LOGS")
    logger.info("=" * 80)
    logger.info(f"Archivo: {filepath}")
    logger.info(f"Tamaño de chunk: {chunk_size:,}")
    mostrar_uso_memoria()
    
    colz = ["IP", "timestamp", "status code", "bytes sent", "user agent"]
    all_chunks = []
    start_time = time.time()
    
    with open(filepath, 'r') as f:
        lines_processed = 0
        chunk_count = 0
        
        while True:
            li = []
            chunk_start_time = time.time()
            
            # Procesar chunk
            for _ in range(chunk_size):
                try:
                    x = f.readline()
                    if not x:  # EOF
                        break
                    log = x.split(" ")
                    if len(log) < 12:
                        continue
                    li.append([log[0], log[3].strip('['), int(log[8]), int(log[9]), ' '.join(log[11:-1]).strip('"')])
                    lines_processed += 1
                except Exception as e:
                    continue
            
            if not li:  # No más datos
                logger.info("Fin del archivo alcanzado")
                break
            
            # Crear DataFrame del chunk
            chunk_df = pd.DataFrame(li, columns=colz)
            chunk_df["timestamp"] = pd.to_datetime(chunk_df["timestamp"], format='%d/%b/%Y:%H:%M:%S')
            all_chunks.append(chunk_df)
            
            chunk_count += 1
            chunk_time = time.time() - chunk_start_time
            
            # Log cada 10 chunks para reducir verbosidad
            if chunk_count % 10 == 0 or chunk_count == 1:
                logger.info(f"Procesados {chunk_count} chunks | {lines_processed:,} líneas | {chunk_time:.2f}s último chunk")
                
                # Estimación de velocidad
                if lines_processed > 0:
                    elapsed_time = time.time() - start_time
                    speed = lines_processed / elapsed_time
                    logger.info(f"Velocidad promedio: {speed:.0f} líneas/segundo")
            
            # Liberar memoria del chunk individual
            liberar_memoria(li, chunk_df)
    
    # Combinar todos los chunks
    if all_chunks:
        logger.info("Combinando todos los chunks...")
        combine_start = time.time()
        logs_df = pd.concat(all_chunks, ignore_index=True)
        combine_time = time.time() - combine_start
        
        # Liberar memoria de chunks individuales
        liberar_memoria(all_chunks)
        
        total_time = time.time() - start_time
        logger.info("=" * 60)
        logger.info("                  PROCESAMIENTO COMPLETADO")
        logger.info("=" * 60)
        logger.info(f"Tiempo de combinación: {combine_time:.2f}s")
        logger.info(f"Tiempo total: {total_time/60:.2f} minutos")
        logger.info(f"Total de registros: {len(logs_df):,}")
        logger.info(f"Memoria del DataFrame: {logs_df.memory_usage(deep=True).sum()/1024**2:.2f} MB")
        mostrar_uso_memoria()
        
        return logs_df
    else:
        logger.warning("No se procesaron datos válidos")
        return pd.DataFrame(columns=colz)

def parse_ua(ua):
    """Parse para navegador y dispositivo"""
    try:
        parsed = user_agent_parser.Parse(str(ua))
        return {
            "nombre": parsed.get("user_agent", {}).get("family") or "desconocido",
            "so": parsed.get("os", {}).get("family") or "desconocido",
            "marca": parsed.get("device", {}).get("brand") or "desconocido",
            "modelo": parsed.get("device", {}).get("model") or "desconocido"
        }
    except Exception:
        return {"nombre": "desconocido", "so": "desconocido", "marca": "desconocido", "modelo": "desconocido"}

def cargar_DW(logs_df, engine_cubo):
    """Carga todas las dimensiones y tabla de hechos"""
    
    logger.info("=" * 80)
    logger.info("                  CARGA DE DIMENSIONES Y TABLA DE HECHOS")
    logger.info("=" * 80)
    
    # -----------------------------------------------------------
    # Dimensión d_navegador
    # -----------------------------------------------------------
    logger.info("─" * 60)
    logger.info("                  Procesando dimensión NAVEGADOR")
    logger.info("─" * 60)
    navegador_start = time.time()
    
    unique_uas = logs_df["user agent"].dropna().astype(str).unique()
    logger.info(f"Parseando {len(unique_uas):,} user agents únicos...")

    parsed_list = [parse_ua(ua) | {"user agent": ua} for ua in tqdm(unique_uas, desc="Parsing UA")]
    ua_df = pd.DataFrame(parsed_list)

    # merge con logs_df para traer columnas ya parseadas
    logs_df = logs_df.merge(ua_df, on="user agent", how="left")
    
    d_navegador = ua_df[["nombre"]].drop_duplicates().reset_index(drop=True)
    d_navegador.insert(0, "id_navegador", range(1, len(d_navegador) + 1))
    
    logger.info(f"Dimensión navegador creada: {len(d_navegador)} registros únicos")
    d_navegador = actualizarTablaDimension(engine_cubo, "d_navegador", d_navegador, pk="id_navegador")
    
    navegador_time = time.time() - navegador_start
    logger.info(f"Dimensión NAVEGADOR completada en {navegador_time:.2f}s")
    
    # Liberar memoria
    liberar_memoria(unique_uas, parsed_list)

    # -----------------------------------------------------------
    # Dimensión d_dispositivo
    # -----------------------------------------------------------
    logger.info("─" * 60)
    logger.info("                  Procesando dimensión DISPOSITIVO")
    logger.info("─" * 60)
    dispositivo_start = time.time()
    
    d_dispositivo = ua_df[["so", "marca", "modelo"]].drop_duplicates().reset_index(drop=True)
    d_dispositivo.insert(0, "id_dispositivo", range(1, len(d_dispositivo) + 1))
    
    # Limpiar modelos
    logger.info("Aplicando limpieza de modelos...")
    d_dispositivo['modelo'] = d_dispositivo['modelo'].apply(lambda x: x.split(' ', 1)[1] if x.lower().startswith('rola ') else x)
    d_dispositivo['modelo'] = d_dispositivo['modelo'].apply(lambda x: x.split('Build')[0].strip() if 'Build' in x else x)
    
    logger.info(f"Dimensión dispositivo creada: {len(d_dispositivo)} dispositivos únicos")
    d_dispositivo = actualizarTablaDimension(engine_cubo, "d_dispositivo", d_dispositivo, pk="id_dispositivo")
    
    dispositivo_time = time.time() - dispositivo_start
    logger.info(f"Dimensión DISPOSITIVO completada en {dispositivo_time:.2f}s")
    
    # Liberar ua_df ya que no se necesita más
    liberar_memoria(ua_df)

    # -----------------------------------------------------------
    # Dimensión d_estadoserver
    # -----------------------------------------------------------
    logger.info("─" * 60)
    logger.info("                  Procesando dimensión ESTADO SERVER")
    logger.info("─" * 60)
    estado_start = time.time()
    
    # Diccionario de status codes
    status_descriptions = {
        # Respuestas informativas (100-199)
        100: "Continue",
        101: "Switching Protocols",
        102: "Processing",
        103: "Early Hints",
        
        # Respuestas exitosas (200-299)
        200: "OK",
        201: "Created",
        202: "Accepted",
        203: "Non-Authoritative Information",
        204: "No Content",
        205: "Reset Content",
        206: "Partial Content",
        207: "Multi-Status",
        208: "Already Reported",
        226: "IM Used",
        
        # Redirecciones (300-399)
        300: "Multiple Choices",
        301: "Moved Permanently",
        302: "Found",
        303: "See Other",
        304: "Not Modified",
        305: "Use Proxy",
        307: "Temporary Redirect",
        308: "Permanent Redirect",
        
        # Errores del cliente (400-499)
        400: "Bad Request",
        401: "Unauthorized",
        402: "Payment Required",
        403: "Forbidden",
        404: "Not Found",
        405: "Method Not Allowed",
        406: "Not Acceptable",
        407: "Proxy Authentication Required",
        408: "Request Timeout",
        409: "Conflict",
        410: "Gone",
        411: "Length Required",
        412: "Precondition Failed",
        413: "Payload Too Large",
        414: "URI Too Long",
        415: "Unsupported Media Type",
        416: "Range Not Satisfiable",
        417: "Expectation Failed",
        418: "I'm a teapot",
        421: "Misdirected Request",
        422: "Unprocessable Entity",
        423: "Locked",
        424: "Failed Dependency",
        425: "Too Early",
        426: "Upgrade Required",
        428: "Precondition Required",
        429: "Too Many Requests",
        431: "Request Header Fields Too Large",
        451: "Unavailable For Legal Reasons",
        499: "Client Closed Request",
        
        # Errores del servidor (500-599)
        500: "Internal Server Error",
        501: "Not Implemented",
        502: "Bad Gateway",
        503: "Service Unavailable",
        504: "Gateway Timeout",
        505: "HTTP Version Not Supported",
        506: "Variant Also Negotiates",
        507: "Insufficient Storage",
        508: "Loop Detected",
        510: "Not Extended",
        511: "Network Authentication Required"
    }
    
    
    d_estadoserver = pd.DataFrame(logs_df["status code"].unique(), columns=["id_estadoserver"])
    d_estadoserver["descripcion"] = d_estadoserver["id_estadoserver"].map(status_descriptions).fillna("Unknown")
    d_estadoserver["tipo"] = pd.cut(d_estadoserver["id_estadoserver"], 
                            bins=[0, 200, 300, 400, 500, 600, float('inf')],
                            labels=["Informativo", "Exitoso", "Redirección", 
                                   "Error Cliente", "Error Servidor", "Otro"],
                            right=False)
    
    d_estadoserver = d_estadoserver.sort_values("id_estadoserver").reset_index(drop=True)
    
    logger.info(f"Dimensión estado server creada: {len(d_estadoserver)} códigos únicos")
    d_estadoserver = actualizarTablaDimension(engine_cubo, "d_estadoserver", d_estadoserver, pk="id_estadoserver")
    
    estado_time = time.time() - estado_start
    logger.info(f"Dimensión ESTADO SERVER completada en {estado_time:.2f}s")
    
    # -----------------------------------------------------------
    # Dimensión d_ubicacion
    # -----------------------------------------------------------
    logger.info("─" * 60)
    logger.info("                  Procesando dimensión UBICACIÓN")
    logger.info("─" * 60)
    ubicacion_start = time.time()
    
    logs_df['IP'] = logs_df['IP'].astype(str).str.strip()
    unique_ips = logs_df['IP'].unique()
    logger.info(f"Geolocalizando {len(unique_ips):,} IPs únicas...")
    
    reader = geoip2.database.Reader('../data/GeoLite2-City.mmdb')
    
    geo_data = []
    for ip in tqdm(unique_ips, desc="Geolocalizando"):
        try:
            response = reader.city(ip)
            geo_data.append({
                'IP': ip,
                'continente': response.continent.name or 'Privada/Local',
                'pais': response.country.name or 'Privada/Local',
                'ciudad': response.city.name or 'Privada/Local',
                'latitud': response.location.latitude,
                'longitud': response.location.longitude
            })
        except:
            geo_data.append({
                'IP': ip,
                'continente': 'Privada/Local',
                'pais': 'Privada/Local',
                'ciudad': 'Privada/Local',
                'latitud': None,
                'longitud': None
            })
    
    reader.close()
    
    # Merge con logs_df
    geo_df = pd.DataFrame(geo_data)
    logs_df = logs_df.merge(geo_df, on='IP')
    
    d_ubicacion = logs_df[["continente", "pais", "ciudad", "latitud", "longitud"]].drop_duplicates().reset_index(drop=True)
    d_ubicacion.insert(0, "id_ubicacion", range(1, len(d_ubicacion) + 1))
    
    logger.info(f"Dimensión ubicación creada: {len(d_ubicacion)} ubicaciones únicas")
    d_ubicacion = actualizarTablaDimension(engine_cubo, "d_ubicacion", d_ubicacion, pk="id_ubicacion")
    
    ubicacion_time = time.time() - ubicacion_start
    logger.info(f"Dimensión UBICACIÓN completada en {ubicacion_time/60:.2f} minutos")
    
    # Liberar memoria de geolocalización
    liberar_memoria(unique_ips, geo_data, geo_df)
    
    # -----------------------------------------------------------
    # Dimensión d_tiempo
    # -----------------------------------------------------------
    logger.info("─" * 60)
    logger.info("                  Procesando dimensión TIEMPO")
    logger.info("─" * 60)
    tiempo_start = time.time()
    
    logs_df["mes"] = logs_df["timestamp"].dt.month
    logs_df["dia"] = logs_df["timestamp"].dt.day
    
    d_tiempo = logs_df[["mes", "dia"]].drop_duplicates().sort_values(["mes", "dia"]).reset_index(drop=True)
    d_tiempo.insert(0, "id_tiempo", range(1, len(d_tiempo) + 1))
    
    logger.info(f"Dimensión tiempo creada: {len(d_tiempo)} fechas únicas")
    d_tiempo = actualizarTablaDimension(engine_cubo, 'd_tiempo', d_tiempo, pk='id_tiempo')
    
    tiempo_time = time.time() - tiempo_start
    logger.info(f"Dimensión TIEMPO completada en {tiempo_time:.2f}s")
    
    # -----------------------------------------------------------
    # Liberación de memoria
    # -----------------------------------------------------------
    logger.info("Liberando memoria de columnas no necesarias...")
    memory_before = logs_df.memory_usage(deep=True).sum() / 1024**2
    logs_df = logs_df.drop(columns=["IP", "timestamp", "continente", "pais", "ciudad", "user agent"])
    memory_after = logs_df.memory_usage(deep=True).sum() / 1024**2
    logger.info(f"Memoria liberada: {memory_before - memory_after:.2f} MB")
    
    # -----------------------------------------------------------
    # Tabla de Hechos
    # -----------------------------------------------------------
    logger.info("─" * 60)
    logger.info("                  Creando TABLA DE HECHOS")
    logger.info("─" * 60)
    hechos_start = time.time()
    
    # Merges
    logger.info("Realizando joins con dimensiones...")
    logs_df = logs_df.merge(d_navegador, on=["nombre"])
    logs_df = logs_df.merge(d_dispositivo, on=["so", "marca", "modelo"])
    logs_df = logs_df.merge(d_ubicacion, on=["latitud", "longitud"])
    logs_df = logs_df.merge(d_tiempo, on=["mes", "dia"])
    logs_df = logs_df.rename(columns={"status code": "id_estadoserver"})
    
    logger.info("Calculando métricas agregadas...")
    hechos = logs_df.groupby(
        ["id_tiempo", "id_ubicacion", "id_dispositivo", "id_navegador", "id_estadoserver"]
    ).agg(
        cant_pais_dia=("pais", "nunique"),
        cant_navegador_dia=("id_navegador", "nunique"),
        cant_dispositivo_dia=("id_dispositivo", "nunique"),
        cant_bytes_dispositivo=("bytes sent", "sum"),
        cant_estadoserver_dia=("id_estadoserver", "nunique")
    ).reset_index()
    
    hechos["id_hecho"] = hechos.index + 1
    
    logger.info(f"Tabla de hechos creada: {len(hechos):,} registros agregados")
    actualizarTablaDimension(engine_cubo, 'factable', hechos, pk='id_hecho')
    
    hechos_time = time.time() - hechos_start
    logger.info(f"Tabla de hechos cargada en {hechos_time:.2f}s")
    
    # Liberación final de memoria
    logger.info("Liberando memoria...")
    liberar_memoria(d_navegador, d_dispositivo, d_estadoserver, d_ubicacion, d_tiempo, logs_df, hechos)
    mostrar_uso_memoria()
    
    logger.info("=" * 80)
    logger.info("                    PROCESO ETL FINALIZADO")
    logger.info("=" * 80)

# Establecer conexiones a las bases de datos
engine_cubo = create_engine(DB_CONNECTIONS["engine_cubo"])

# Ejecución en Jupyter
filepath = "../logs/access_ssl_20230404.log"

# Procesar archivo
logs_df = procesar_archivo_log(filepath, chunk_size=100000)

# Crear dimensiones y hechos
cargar_DW(logs_df, engine_cubo)

# Liberación final completa
logger.info("Memoria final liberada")
liberar_memoria(logs_df)

2025-09-19 02:40:47,388 - INFO -                   PROCESAMIENTO DE LOGS
2025-09-19 02:40:47,392 - INFO - Archivo: ../logs/access.log
2025-09-19 02:40:47,394 - INFO - Tamaño de chunk: 100,000
2025-09-19 02:40:47,396 - INFO - Memoria en uso: 166.3 MB
2025-09-19 02:40:48,497 - INFO - Procesados 1 chunks | 99,997 líneas | 1.10s último chunk
2025-09-19 02:40:48,498 - INFO - Velocidad promedio: 90874 líneas/segundo
2025-09-19 02:40:57,261 - INFO - Procesados 10 chunks | 999,985 líneas | 0.82s último chunk
2025-09-19 02:40:57,262 - INFO - Velocidad promedio: 101371 líneas/segundo
2025-09-19 02:41:08,390 - INFO - Procesados 20 chunks | 1,999,964 líneas | 0.92s último chunk
2025-09-19 02:41:08,391 - INFO - Velocidad promedio: 95266 líneas/segundo
2025-09-19 02:41:18,442 - INFO - Procesados 30 chunks | 2,999,916 líneas | 0.83s último chunk
2025-09-19 02:41:18,443 - INFO - Velocidad promedio: 96629 líneas/segundo
2025-09-19 02:41:28,407 - INFO - Procesados 40 chunks | 3,999,887 líneas | 0.86s úl