# Procesamiento de Logs en Tiempo Real

A continuación creearemos una simulación de cómo funcionaría esta arquitectura de priceamiento de logs en tiempo real. Para ello, utilizaremos un generador de logs que simulará los logs y cómo serán procesados bajo una lista de eventos simulados de seguridad.

In [21]:
import random
import time
import json

Creamos una lista de eventos de seguridad que se simularán en el generador de logs:

In [22]:
eventos = [
    'Inicio de sesión exitoso',
    'Intento de acceso no autorizado',
    'Escaneo de puertos detectado',
    'Malware encontrado',
    'Conexión SSH establecida',
    'Firewall activado',
    'Actualización de software',
    'Anomalía en el tráfico de red',
    'Usuario creado',
    'Error de autenticación'
]

Definimos la función `asignar_nivel` que asigna un nivel de severidad a cada evento de seguridad.

In [23]:
def asignar_nivel(evento):
    if evento in [
        'Intento de acceso no autorizado',
        'Escaneo de puertos detectado',
        'Malware encontrado',
        'Anomalía en el tráfico de red',
        'Error de autenticación'
    ]:
        return 'Alerta'
    else:
        return 'Información'

Con la función `generar_log` se simularán los logs generados por el sistema de manera alearoria, agarrando un evento de la lista de eventos y creando un objeto log con la información del evento y el nivel de severidad asignado.

In [24]:
def generar_log():
    evento = random.choice(eventos)
    log = {
        'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
        'evento': evento,
        'nivel': asignar_nivel(evento)
    }
    return log

Con un ciclo, generamos 20 logs, los cuales serán agregados a una lista. Cada log será generado con un intervalo de 1 ms.

In [25]:
logs = []
for _ in range(20):
    log = generar_log()
    logs.append(log)
    time.sleep(0.1)

Se imprime en consola todos los logs generados.

In [26]:
print("Logs generados:")
for log in logs:
    print(json.dumps(log, ensure_ascii=False))

Logs generados:
{"timestamp": "2024-10-14 17:22:16", "evento": "Escaneo de puertos detectado", "nivel": "Alerta"}
{"timestamp": "2024-10-14 17:22:16", "evento": "Conexión SSH establecida", "nivel": "Información"}
{"timestamp": "2024-10-14 17:22:16", "evento": "Actualización de software", "nivel": "Información"}
{"timestamp": "2024-10-14 17:22:16", "evento": "Firewall activado", "nivel": "Información"}
{"timestamp": "2024-10-14 17:22:17", "evento": "Actualización de software", "nivel": "Información"}
{"timestamp": "2024-10-14 17:22:17", "evento": "Escaneo de puertos detectado", "nivel": "Alerta"}
{"timestamp": "2024-10-14 17:22:17", "evento": "Escaneo de puertos detectado", "nivel": "Alerta"}
{"timestamp": "2024-10-14 17:22:17", "evento": "Error de autenticación", "nivel": "Alerta"}
{"timestamp": "2024-10-14 17:22:17", "evento": "Conexión SSH establecida", "nivel": "Información"}
{"timestamp": "2024-10-14 17:22:17", "evento": "Inicio de sesión exitoso", "nivel": "Información"}
{"timesta

Definimos la función `procesar_logs` que simula el procesamiento de logs en tiempo real. Esta función recibe una lista de logs y los procesa. y determina si alguno de estos logs tiene un nivel de alerta, en cuyo caso se registra en una lista.

In [27]:
def procesar_logs(logs):
    amenazas_detectadas = []
    for log in logs:
        if log['nivel'] == 'Alerta':
            amenazas_detectadas.append(log)
    return amenazas_detectadas

amenazas = procesar_logs(logs)

Imprimimos todas las amenazas detectadas.

In [28]:
print("\nAmenazas detectadas:")
for amenaza in amenazas:
    print(json.dumps(amenaza, ensure_ascii=False))


Amenazas detectadas:
{"timestamp": "2024-10-14 17:22:16", "evento": "Escaneo de puertos detectado", "nivel": "Alerta"}
{"timestamp": "2024-10-14 17:22:17", "evento": "Escaneo de puertos detectado", "nivel": "Alerta"}
{"timestamp": "2024-10-14 17:22:17", "evento": "Escaneo de puertos detectado", "nivel": "Alerta"}
{"timestamp": "2024-10-14 17:22:17", "evento": "Error de autenticación", "nivel": "Alerta"}
{"timestamp": "2024-10-14 17:22:17", "evento": "Malware encontrado", "nivel": "Alerta"}
{"timestamp": "2024-10-14 17:22:17", "evento": "Anomalía en el tráfico de red", "nivel": "Alerta"}
{"timestamp": "2024-10-14 17:22:18", "evento": "Malware encontrado", "nivel": "Alerta"}


Simulamos el almacenamiento de los reportes en S3, creando un archivo json local con los reportes.

In [29]:
with open('logs_simulados.json', 'w', encoding='utf-8') as f:
    json.dump(logs, f, ensure_ascii=False, indent=4)

In [30]:
with open('amenazas_detectadas.json', 'w', encoding='utf-8') as f:
    json.dump(amenazas, f, ensure_ascii=False, indent=4)

Simulamos el envío de las notificaciones al encontrar una amenaza con el reporte de seguridad

In [31]:
def enviar_notificaciones(amenazas):
    for amenaza in amenazas:
        mensaje = f"[ALERTA] {amenaza['evento']} detectado en {amenaza['timestamp']}"
        print(f"Enviando notificación: {mensaje}")

enviar_notificaciones(amenazas)

Enviando notificación: [ALERTA] Escaneo de puertos detectado detectado en 2024-10-14 17:22:16
Enviando notificación: [ALERTA] Escaneo de puertos detectado detectado en 2024-10-14 17:22:17
Enviando notificación: [ALERTA] Escaneo de puertos detectado detectado en 2024-10-14 17:22:17
Enviando notificación: [ALERTA] Error de autenticación detectado en 2024-10-14 17:22:17
Enviando notificación: [ALERTA] Malware encontrado detectado en 2024-10-14 17:22:17
Enviando notificación: [ALERTA] Anomalía en el tráfico de red detectado en 2024-10-14 17:22:17
Enviando notificación: [ALERTA] Malware encontrado detectado en 2024-10-14 17:22:18
