In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, window, desc, lag, avg, stddev, min, max, sum, approx_count_distinct
from pyspark.sql.window import Window
from kafka import KafkaProducer
from kafka import KafkaConsumer

import os
import csv
import random
import datetime
import json
import time

In [5]:
KAFKA_TOPIC = 'logs'
KAFKA_BROKER = 'localhost:9093'
LOG_FOLDER = 'logs'
GENERAL_LOG_FILE = os.path.join(LOG_FOLDER, 'kafka_logs.csv')

# Crear carpeta si no existe
if not os.path.exists(LOG_FOLDER):
    os.makedirs(LOG_FOLDER)

file_exists = os.path.isfile(GENERAL_LOG_FILE)
general_file = open(GENERAL_LOG_FILE, mode='a', newline='')
general_writer = csv.writer(general_file)
if not file_exists:
    general_writer.writerow(['timestamp', 'event_type', 'user', 'ip', 'message'])

# Diccionario para archivos por tipo de evento
event_files = {}
event_writers = {}

# Función para abrir (y crear si es necesario) archivo por tipo de evento
def get_event_writer(event_type):
    if event_type not in event_writers:
        file_path = os.path.join(LOG_FOLDER, f"{event_type}.csv")
        is_new = not os.path.exists(file_path)
        f = open(file_path, mode='a', newline='')
        writer = csv.writer(f)
        if is_new:
            writer.writerow(['timestamp', 'event_type', 'user', 'ip', 'message'])
        event_files[event_type] = f
        event_writers[event_type] = writer
    return event_writers[event_type]

# Consumidor Kafka
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=KAFKA_BROKER,
    auto_offset_reset='latest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

print(f"Escuchando mensajes en Kafka topic '{KAFKA_TOPIC}'...")

try:
    for message in consumer:
        log = message.value
        print("Recibido:", log)

        # Escribir en el log general
        general_writer.writerow([
            log['timestamp'],
            log['event_type'],
            log['user'],
            log['ip'],
            log['message']
        ])
        general_file.flush()

        # Escribir en el archivo por tipo de evento
        event_writer = get_event_writer(log['event_type'])
        event_writer.writerow([
            log['timestamp'],
            log['event_type'],
            log['user'],
            log['ip'],
            log['message']
        ])
        event_files[log['event_type']].flush()

except KeyboardInterrupt:
    print("Consumidor detenido por el usuario")
finally:
    general_file.close()
    consumer.close()
    for f in event_files.values():
        f.close()

Escuchando mensajes en Kafka topic 'logs'...
Recibido: {'timestamp': '2025-04-06 17:08:40', 'event_type': 'ERROR', 'user': 'guest', 'ip': '8.8.8.8', 'message': 'Error critico en el sistema detectado por el usuario guest'}
Recibido: {'timestamp': '2025-04-06 17:08:42', 'event_type': 'INFO', 'user': 'user1', 'ip': '172.16.0.2', 'message': 'Evento normal registrado para usuario user1 desde 172.16.0.2'}
Recibido: {'timestamp': '2025-04-06 17:08:43', 'event_type': 'LOGIN_FAILURE', 'user': 'admin', 'ip': '192.168.1.10', 'message': 'Intento de acceso fallido para usuario admin desde 192.168.1.10'}
Recibido: {'timestamp': '2025-04-06 17:08:44', 'event_type': 'INFO', 'user': 'guest', 'ip': '10.0.0.1', 'message': 'Evento normal registrado para usuario guest desde 10.0.0.1'}
Recibido: {'timestamp': '2025-04-06 17:08:45', 'event_type': 'INFO', 'user': 'guest', 'ip': '172.16.0.2', 'message': 'Evento normal registrado para usuario guest desde 172.16.0.2'}
Recibido: {'timestamp': '2025-04-06 17:08:46