In [None]:
import os
import shutil
import time
import threading
import glob
import sys

# --- 1. Configuración ---
try:
    import pyspark
except ImportError:
    print("Instalando PySpark...")
    !pip install pyspark
    !apt-get install openjdk-17-jdk -qq > /dev/null

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["PYSPARK_PYTHON"] = "python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python"

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# --- 2. Limpieza Total ---
try:
    from pyspark.streaming import StreamingContext
    active_ssc = StreamingContext.getActive()
    if active_ssc is not None:
        active_ssc.stop(stopSparkContext=True, stopGracefully=False)
except:
    pass

try:
    SparkContext.getOrCreate().stop()
except:
    pass

# --- 3. Preparación de Directorios ---
BASE_DIR = "/content"
SOURCE_DIR = os.path.join(BASE_DIR, "origen_datos")
INPUT_DIR = os.path.join(BASE_DIR, "datos_streaming")
CHECKPOINT_DIR = "./checkpoint_tp3"

if os.path.exists(CHECKPOINT_DIR): shutil.rmtree(CHECKPOINT_DIR)
if os.path.exists(INPUT_DIR): shutil.rmtree(INPUT_DIR)
os.makedirs(INPUT_DIR)

if not os.path.exists(SOURCE_DIR):
    os.makedirs(SOURCE_DIR)
    print(f"Se creó el directorio {SOURCE_DIR}")
    print(">>> Subí los archivos ahí y volvé a ejecutar.")

# --- 4. Lógica Spark Streaming ---

BATCH_INTERVAL = 5
A = 1.0

sc = SparkContext("local[*]", "TP3_Final_Fix")
ssc = StreamingContext(sc, BATCH_INTERVAL)
ssc.checkpoint(CHECKPOINT_DIR)

# Hacemos que Spark mire la carpeta
stream = ssc.textFileStream(INPUT_DIR)

def parsear_linea(linea):
    try:
        parts = linea.split("\t")
        if len(parts) >= 3:
            return (int(parts[1]), float(parts[2]))
        return None
    except:
        return None

def actualizar_state(nuevos_valores, estado_anterior):
    score = estado_anterior if estado_anterior is not None else 0.0
    promedio_ventana = sum(nuevos_valores) if nuevos_valores else 0.0

    # Se descuenta A en todas las ventanas, haya combate o no

    nuevo_score = score + promedio_ventana - A
    return nuevo_score

# Pipeline
parsed = stream.map(parsear_linea).filter(lambda x: x is not None)

# Promedio local por ventana
promedios_ventana = parsed.mapValues(lambda x: (x, 1))\
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))\
    .mapValues(lambda v: v[0] / v[1])

# Actualización Global
scores_globales = promedios_ventana.updateStateByKey(actualizar_state)

# Ordenar Descendente
scores_ordenados = scores_globales.transform(
    lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False)
)

# Mostrar
scores_ordenados.pprint(num=100)

# --- 5. INYECTOR ---
# El inyector busca simular la llegada de datos de manera progresiva, copia uno a uno los archivos del dataset hacia el directorio de procesamiento
def mover_archivos_uno_a_uno():
    time.sleep(5)


    while(True):
      with os.scandir(SOURCE_DIR) as it:
        for i, archivo in enumerate(it):
          if archivo.is_file():
            nombre = os.path.basename(archivo)
            destino = os.path.join(INPUT_DIR, nombre)

            # 1. Copia del archivo
            shutil.move(archivo, destino)

            # 2. Se actualiza la fecha de última modificación de los archivos, de esta manera Spark cree que son nuevos (la fecha debe ser posterior a la ejecución)"
            os.utime(destino, None)

            print(f">>> [MOVIDO] Archivo '{nombre}' ingresado.")

            time.sleep(4)

hilo = threading.Thread(target=mover_archivos_uno_a_uno)
hilo.start()

# --- 6. Ejecución ---
print(">>> Spark Streaming iniciado...")
ssc.start()

# Dejar descomentada una sola forma:

#ssc.awaitTermination() # Ejecución infinita

try:
    time.sleep(300) # Cuánto tiempo durará la ejecución
except KeyboardInterrupt:
    pass
finally:
    ssc.stop(stopSparkContext=True)
    print(">>> Fin.")