In [None]:
# Instalamos Spark para Python
!pip install pyspark

# Instalamos Java SDK 8
import os, sys

# !apt-get install -y openjdk-8-jdk-headless -qq > /dev/null      #install openjdk
# !apt-get update
!apt-get install -y openjdk-8-jdk -qq > /dev/null      #install openjdk
!echo $(/usr/libexec/java_home -v 1.8)
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"     #set environment variable
!echo 2 | update-alternatives --config java
!java -version       #check java version

In [None]:
#Lo configuramos para que lea desde el Drive
from google.colab import drive
drive.mount('/content/drive')

In [195]:
from math import log
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

input_path = '/content/drive/MyDrive/Colab/A3/input/'

# Creamos el contexto de Spark
sc = SparkContext("local", "TP3")

# Crear el StreamingContext para procesar documentos cada 10 segundos
# Estos documentos provienen de otro entorno, donde se mueve un archivo (review) de la carpeta "reviews" a la carpeta de entrada "input" cada 10 segundos.
# Esta configuración asegura que se procese un único documento por cada intervalo de tiempo de 10 segundos.
ssc = StreamingContext(sc, 10)

ssc.checkpoint("/content/drive/MyDrive/Colab/A3/buffer")

In [None]:
# Crear el DStream leyendo archivos de texto en el directorio
streamOriginal = ssc.textFileStream(input_path)

num_documentos = sc.accumulator(0)

# Función para actualizar el estado de los términos
def fUpdate(nuevos_valores, estado_actual):
    tf_acumulado = 0  # Acumulador del TF
    d_t = 0  # D(t)
    num_documentos = 0  # #D

    # Verificar si hay un estado previo para actualizar el estado actual
    if estado_actual:
        tf_acumulado = estado_actual[0]  # Obtener el valor acumulado del TF
        d_t = estado_actual[1]  # Obtener el D(t) del término

    # Verificar si hay nuevos valores para actualizar el estado actual
    if nuevos_valores:
        # Actualizar el TF acumulado sumando los nuevos valores
        tf_acumulado += nuevos_valores[0][0]
        # Incrementar D(t) considerando que esta funcion (fUpdate) se llama una vez por documento
        d_t += 1
        # Obtener la cantidad total de documentos
        num_documentos = nuevos_valores[0][1]
    else:
        # Si no hay nuevos valores, se usa el estado previo para actualizar el contador de documentos
        num_documentos = int(estado_actual[4] or 0) + 1

    # Calcular el IDF del término
    idf = log(float(num_documentos) / float(d_t)) if d_t > 0 else 0
    # Calcular el TF-IDF del término
    tf_idf = float(tf_acumulado) * float(idf)

    # Retornar el estado actualizado del término y sus valores: TF, D(t), IDF(t), TF-IDF, #D
    return (tf_acumulado, d_t, idf, tf_idf, num_documentos)


# Función para calcular el TF-IDF de cada palabra por documento
def calcular_tf_idf(documento):
  global num_documentos

  if documento.isEmpty():
    return

  # Obtener las palabras de cada documento
  palabras_en_documento = documento.map(lambda doc: doc.split())

  # Calcular la frecuencia de cada palabra por documento
  frecuencia_palabras = palabras_en_documento.flatMap(lambda palabras: [(palabra, 1) for palabra in palabras]).reduceByKey(lambda x, y: x + y)

  # Contar la cantidad total de palabras por documento
  total_palabras = palabras_en_documento.map(len).reduce(lambda x, y: x + y)

  # Incrementar el contador de documentos
  num_documentos.add(1)

  d = num_documentos.value

  # Calcular el TF de cada palabra
  tf_y_total_documentos = frecuencia_palabras.map(lambda t: (t[0], (t[1] / total_palabras, d))) # <termino, (TF, #D)>

  return tf_y_total_documentos


# Procesamiento de los datos en streaming
stream = streamOriginal.transform(calcular_tf_idf)
stream.pprint()  # Mostrar resultados

history = stream.updateStateByKey(fUpdate)
history.pprint()  # Mostrar los términos y sus valores TF, D(t), IDF(t), TF-IDF, #D

# Iniciar la recepción de datos y procesamiento
ssc.start()
ssc.awaitTermination()

In [194]:
ssc.stop()