In [1]:
# 1. IMPORTACIÓN DE LIBRERÍAS NECESARIAS
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, split, element_at, col, udf, monotonically_increasing_id
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.sql.types import DoubleType

In [2]:
# ==============================================================================
# FUNCIÓN 2: Obtener vectores representativos (TF-IDF)
# ==============================================================================

In [3]:
# Iniciar la sesión de Spark (Configuración de memoria para procesar textos grandes)
sesion_spark = (SparkSession.builder
    .appName("Proyecto-TF-IDF-Libros")
    .master("local[*]")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.maxResultSize", "4g")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/07 15:10:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Leer los archivos de texto de la carpeta 'libros' y agregar la ruta del archivo
df_lectura = sesion_spark.read.text("libros/*.txt").withColumn("ruta_archivo", input_file_name())

25/12/07 15:10:52 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: libros/*.txt.
java.io.FileNotFoundException: File libros/*.txt does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:56)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$

In [5]:
# Procesar el RDD para agrupar todo el texto por archivo (en caso de que venga en múltiples líneas)
rdd_documentos = df_lectura.rdd \
    .map(lambda fila: (fila["ruta_archivo"], fila["value"])) \
    .groupByKey() \
    .map(lambda x: (x[0], " ".join(x[1]))) \
    .toDF(["ruta_archivo", "texto_completo"])

                                                                                

In [6]:
# Tokenización: Convertir el texto en una lista de palabras
tokenizador = Tokenizer(inputCol="texto_completo", outputCol="tokens")
datos_tokenizados = tokenizador.transform(rdd_documentos)

In [7]:
# HashingTF: Calcular la frecuencia de términos (TF)
tf_hashing = HashingTF(inputCol="tokens", outputCol="features_tf", numFeatures=20000)
datos_tf = tf_hashing.transform(datos_tokenizados)

In [8]:
# IDF: Calcular la frecuencia inversa de documento
idf = IDF(inputCol="features_tf", outputCol="features_tfidf")
modelo_idf = idf.fit(datos_tf)
datos_tfidf = modelo_idf.transform(datos_tf)

                                                                                

In [9]:
# Limpiar el nombre del archivo (quitar toda la ruta y dejar solo ejemplo.txt)
datos_tfidf = datos_tfidf.withColumn(
    "nombre_archivo",
    element_at(split("ruta_archivo", "/"), -1)
)

In [10]:
print("--- Muestra de los vectores TF-IDF generados ---")
datos_tfidf.select("nombre_archivo", "features_tfidf").limit(20).show(truncate=50)

--- Muestra de los vectores TF-IDF generados ---
+---------------+--------------------------------------------------+
| nombre_archivo|                                    features_tfidf|
+---------------+--------------------------------------------------+
|  libro_100.txt|(20000,[0,1,2,6,8,9,11,12,13,15,16,17,18,20,22,...|
| libro_1023.txt|(20000,[0,1,2,3,6,9,11,13,15,16,17,18,20,22,23,...|
|  libro_145.txt|(20000,[1,2,4,6,7,8,9,11,12,13,14,15,16,17,18,2...|
| libro_6130.txt|(20000,[2,9,11,13,14,15,16,18,20,23,25,27,29,30...|
| libro_2591.txt|(20000,[2,13,14,15,17,23,32,37,39,45,47,52,54,5...|
| libro_2852.txt|(20000,[2,6,9,13,15,20,22,23,29,30,32,37,42,44,...|
|libro_72679.txt|(20000,[0,15,16,20,22,30,32,39,42,45,47,48,73,7...|
|libro_25162.txt|(20000,[14,15,32,73,78,86,132,160,274,282,364,3...|
| libro_1080.txt|(20000,[15,23,29,42,53,90,120,130,148,157,189,2...|
|  libro_135.txt|(20000,[0,2,5,7,9,11,12,13,15,16,17,18,20,21,22...|
|libro_28054.txt|(20000,[1,2,13,14,15,16,17,18,20,22,2

In [11]:
# ==============================================================================
# PUNTO 3: Obtener matriz de similitudes (Similitud del Coseno)
# ==============================================================================

In [12]:
# Indexar los datos para poder comparar libros (ID único)
datos_indexados = datos_tfidf.select("nombre_archivo", "features_tfidf") \
                             .withColumn("id_unico", monotonically_increasing_id())

In [13]:
# Producto cartesiano (comparar todos contra todos)
cruce_libros = datos_indexados.alias("libro_a").crossJoin(datos_indexados.alias("libro_b"))

In [14]:
# Funciones definidas por el usuario (UDF) para matemáticas de vectores
# Calcular producto punto
udf_producto_punto = udf(lambda x, y: float(x.dot(y)), DoubleType())

In [15]:
# Calcular norma (magnitud del vector)
udf_norma = udf(lambda x: float(x.norm(2)), DoubleType())

In [16]:
# Calcular la similitud del coseno: (A . B) / (||A|| * ||B||)
df_similitud = cruce_libros.withColumn(
    "producto_punto", udf_producto_punto("libro_a.features_tfidf", "libro_b.features_tfidf")
).withColumn(
    "norma_a", udf_norma("libro_a.features_tfidf")
).withColumn(
    "norma_b", udf_norma("libro_b.features_tfidf")
).withColumn(
    "similitud_coseno",
    (col("producto_punto") / (col("norma_a") * col("norma_b")))
)

In [17]:
# Seleccionar columnas relevantes para el resultado inicial
resultado_preliminar = df_similitud.select(
    col("libro_a.nombre_archivo").alias("doc1"),
    col("libro_b.nombre_archivo").alias("doc2"),
    "similitud_coseno"
)

In [18]:
# Mostrar los más similares (incluyendo duplicados por ahora)
print("--- Similitudes preliminares ---")
resultado_preliminar.orderBy(col("similitud_coseno").desc()).show(truncate=False)

--- Similitudes preliminares ---




+---------------+---------------+------------------+
|doc1           |doc2           |similitud_coseno  |
+---------------+---------------+------------------+
|libro_2591.txt |libro_2591.txt |1.0000000000000002|
|libro_36034.txt|libro_36034.txt|1.0000000000000002|
|libro_161.txt  |libro_161.txt  |1.0000000000000002|
|libro_45.txt   |libro_45.txt   |1.0000000000000002|
|libro_43.txt   |libro_43.txt   |1.0000000000000002|
|libro_36965.txt|libro_36965.txt|1.0000000000000002|
|libro_345.txt  |libro_345.txt  |1.0000000000000002|
|libro_37106.txt|libro_37106.txt|1.0000000000000002|
|libro_4363.txt |libro_4363.txt |1.0000000000000002|
|libro_514.txt  |libro_514.txt  |1.0000000000000002|
|libro_8492.txt |libro_8492.txt |1.0000000000000002|
|libro_730.txt  |libro_730.txt  |1.0000000000000002|
|libro_34901.txt|libro_34901.txt|1.0000000000000002|
|libro_135.txt  |libro_135.txt  |1.0000000000000002|
|libro_72679.txt|libro_72679.txt|1.0000000000000002|
|libro_6761.txt |libro_6761.txt |1.00000000000

                                                                                

In [19]:
# Limpieza de la matriz:
# 1. Eliminar comparaciones de un libro consigo mismo (doc1 != doc2)
# 2. Eliminar duplicados inversos (A vs B es lo mismo que B vs A) usando los IDs
df_similitud_con_ids = df_similitud.select(
    col("libro_a.nombre_archivo").alias("doc1"),
    col("libro_b.nombre_archivo").alias("doc2"),
    col("libro_a.id_unico").alias("id1"),
    col("libro_b.id_unico").alias("id2"),
    "similitud_coseno"
)

In [20]:
# Filtrar para dejar solo una combinación única (triángulo superior de la matriz)
resultado_unico = df_similitud_con_ids.filter("id1 < id2")

In [21]:
# Seleccionar columnas finales
resultado_final = resultado_unico.select("doc1", "doc2", "similitud_coseno")

In [22]:
print("--- Matriz de similitudes final (limpia) ---")
resultado_final.orderBy(col("similitud_coseno").desc()).show(truncate=False)

--- Matriz de similitudes final (limpia) ---




+---------------+---------------+-------------------+
|doc1           |doc2           |similitud_coseno   |
+---------------+---------------+-------------------+
|libro_24022.txt|libro_46.txt   |0.9981523991634758 |
|libro_514.txt  |libro_37106.txt|0.9949470553307366 |
|libro_41445.txt|libro_84.txt   |0.9771499352528935 |
|libro_27509.txt|libro_27558.txt|0.8699657885508798 |
|libro_25162.txt|libro_17135.txt|0.8576226100208258 |
|libro_26.txt   |libro_3296.txt |0.6267090152275689 |
|libro_26.txt   |libro_8800.txt |0.5464832453034972 |
|libro_8800.txt |libro_3296.txt |0.5375796202666467 |
|libro_100.txt  |libro_3296.txt |0.5280672952137621 |
|libro_100.txt  |libro_26.txt   |0.4715037776196598 |
|libro_100.txt  |libro_8800.txt |0.45912600798761816|
|libro_76.txt   |libro_74.txt   |0.4298719769053569 |
|libro_3296.txt |libro_1998.txt |0.42209595698161756|
|libro_1661.txt |libro_244.txt  |0.41936330119126186|
|libro_8800.txt |libro_228.txt  |0.3997522450214314 |
|libro_1080.txt |libro_46.tx

                                                                                

In [23]:
# ==============================================================================
# PUNTO 4: Función de recomendación (Top 10 libros)
# ==============================================================================

In [24]:
def recomendar_libros(libro_base):
    # Buscar coincidencias donde el libro base es el doc1
    recomendaciones_1 = resultado_final.filter(col("doc1") == libro_base) \
                        .select(col("doc2").alias("libro_recomendado"), "similitud_coseno")

    # Buscar coincidencias donde el libro base es el doc2
    recomendaciones_2 = resultado_final.filter(col("doc2") == libro_base) \
                        .select(col("doc1").alias("libro_recomendado"), "similitud_coseno")

    # Unir ambos resultados (porque A->B es igual que B->A)
    total_recomendaciones = recomendaciones_1.union(recomendaciones_2)

    # Ordenar por mayor similitud y tomar los 10 mejores
    return total_recomendaciones.orderBy(col("similitud_coseno").desc()).limit(10)

In [25]:
print(f"--- Recomendaciones para libro_X.txt ---")
recomendar_libros("libro_46.txt").show(truncate=False)

--- Recomendaciones para libro_X.txt ---




+-----------------+--------------------+
|libro_recomendado|similitud_coseno    |
+-----------------+--------------------+
|libro_24022.txt  |0.9981523991634758  |
|libro_1080.txt   |0.3979155797187435  |
|libro_17135.txt  |0.10426004912624162 |
|libro_25162.txt  |0.0944696544494817  |
|libro_2701.txt   |0.0771094212783218  |
|libro_2591.txt   |0.04430401704343562 |
|libro_34901.txt  |0.04227661693997056 |
|libro_4363.txt   |0.04178167435288526 |
|libro_1661.txt   |0.036057843284959104|
|libro_408.txt    |0.034625795747214146|
+-----------------+--------------------+



                                                                                

In [26]:
# ==============================================================================
# FUNCIÓN 5: Resumir libro (Contar palabras más frecuentes)
# ==============================================================================

In [27]:
def resumir_libro(nombre_archivo, cantidad_palabras=20):
    # Buscar el libro específico dentro del DataFrame original 'rdd_documentos'
    df_libro_encontrado = rdd_documentos.filter(col("ruta_archivo").endswith(nombre_archivo))

    if df_libro_encontrado.count() == 0:
        print("Error: Libro no encontrado:", nombre_archivo)
        return

    print(f"Resumen del libro {nombre_archivo} ({cantidad_palabras} palabras más comunes):")

    # Extraer el texto completo
    texto_raw = df_libro_encontrado.collect()[0]["texto_completo"]

    # Convertir a RDD para usar MapReduce
    rdd_procesamiento = sesion_spark.sparkContext.parallelize([texto_raw])

    # Contar frecuencia de palabras (MapReduce)
    frecuencias = (
        rdd_procesamiento.flatMap(lambda linea: linea.split())  # Separar por espacios
           .map(lambda palabra: palabra.lower())                # Convertir a minúsculas
           .map(lambda palabra: (palabra, 1))                   # Asignar valor 1
           .reduceByKey(lambda a, b: a + b)                     # Sumar ocurrencias
    )

    # Convertir a DataFrame para visualizar mejor
    df_frecuencias = frecuencias.toDF(["palabra", "frecuencia"])

    # Ordenar de mayor a menor frecuencia
    df_ordenado = df_frecuencias.orderBy(col("frecuencia").desc())

    # Mostrar resultado
    df_ordenado.show(cantidad_palabras, truncate=False)

    return df_ordenado

In [28]:
# Ejecutar la función de resumen
resumir_libro("libro_46.txt", 20)

Resumen del libro libro_46.txt (20 palabras más comunes):
+---------+----------+
|palabra  |frecuencia|
+---------+----------+
|scrooge  |362       |
|said     |221       |
|upon     |120       |
|one      |111       |
|gutenberg|97        |
|ghost    |96        |
|christmas|91        |
|spirit   |90        |
|project  |88        |
|would    |84        |
|man      |79        |
|old      |72        |
|time     |68        |
|good     |67        |
|little   |64        |
|could    |62        |
|know     |61        |
|like     |61        |
|tm       |57        |
|work     |56        |
+---------+----------+
only showing top 20 rows


DataFrame[palabra: string, frecuencia: bigint]