# Proyecto: Sistema de Recomendación de Libros Gutenberg con PySpark

Este Proyecto fue elaborado por Gerardo Gael Gallardo Garcia del grupo 5-2

## Paso 1: Descarga de Libros
Se descargaron 100 libros de la pagina de gutenberg

In [1]:
import re
import os
import requests

from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor

def get_links(n: int | list[int] = -1) -> tuple[list[str], list[str]]:
    """Gets the urls for books in the Gutenberg project with numbers `n`."""
    assert n == -1 or min(n) > 0, ( "n can only take values greater than zero "
                                    "or -1 if all links are desired" )
    url = "https://www.gutenberg.org/browse/scores/top"
    try:
        response = requests.get(url)
        parser = BeautifulSoup(response.text, 'html.parser')

        ordered_list = parser.find('ol')
        list_items = ordered_list.find_all('li')

        if(n != -1):
            list_filtered = [list_items[i-1] for i in list(n)]
        else:
            list_filtered = list_items

        prefix = "https://www.gutenberg.org"
        suffix = ".txt.utf-8"
        links, titles = [], []

        for li in list_filtered:
            links.append(prefix + li.find("a").get("href") + suffix)

            title = re.sub(r'\s+', '_', li.get_text())
            title = re.sub(r'_\(\d+\)$', '', title)
            title += '.txt'
            titles.append(title)

        return links, titles

    except requests.exceptions.RequestException:
        print("wrong url for Gutenberg project")


def download_file(url, name):
    response = requests.get(url, stream=True)
    with open(name, mode='wb') as file:
        for chunk in response.iter_content(chunk_size=10 * 1024):
            file.write(chunk)
    print(f"Downloaded file: {name}")


def store_files(links, names, folder):
    os.makedirs(folder, exist_ok=True)  # Ensure folder exists
    filepaths = [os.path.join(folder, name) for name in names]

    with ThreadPoolExecutor() as executor:
        executor.map(download_file, links, filepaths)


def store_files_slow(links, names, folder):
    os.makedirs(folder, exist_ok=True)
    for url, name in zip(links, names):
        download_file(url, os.path.join(folder, name))


def main(n=-1, folder="gutenberg_books"):
    links, titles = get_links(n)
    store_files(links, titles, folder)
    print("Done")


n = range(1, 101)
main(n, folder="Libros")

Downloaded file: Libros/The_Strange_Case_of_Dr._Jekyll_and_Mr._Hyde_by_Robert_Louis_Stevenson.txt
Downloaded file: Libros/Pride_and_Prejudice_by_Jane_Austen.txt
Downloaded file: Libros/Frankenstein;_Or,_The_Modern_Prometheus_by_Mary_Wollstonecraft_Shelley.txt
Downloaded file: Libros/The_King_in_Yellow_by_Robert_W._Chambers.txt
Downloaded file: Libros/A_Christmas_Carol_in_Prose;_Being_a_Ghost_Story_of_Christmas_by_Charles_Dickens.txt
Downloaded file: Libros/Little_Women;_Or,_Meg,_Jo,_Beth,_and_Amy_by_Louisa_May_Alcott.txt
Downloaded file: Libros/Moby_Dick;_Or,_The_Whale_by_Herman_Melville.txt
Downloaded file: Libros/A_Doll's_House_:_a_play_by_Henrik_Ibsen.txt
Downloaded file: Libros/The_Scarlet_Letter_by_Nathaniel_Hawthorne.txt
Downloaded file: Libros/Beowulf:_An_Anglo-Saxon_Epic_Poem.txt
Downloaded file: Libros/Alice's_Adventures_in_Wonderland_by_Lewis_Carroll.txt
Downloaded file: Libros/A_Room_with_a_View_by_E._M._Forster.txt
Downloaded file: Libros/Romeo_and_Juliet_by_William_Shakesp

## Paso 2: Limpieza de Datos
Este script elimina los datos no pertenecientes al libro, se podria considerar que son datos basuras implementados por gutenberg, los elimina al inicio y al final. Tambien elimina 3 libros no deseados pues estan en otro idioma o en un formato ilegible y elimina los caracteres especiales de los nombres de los libros pues Spark no puede leerlos

In [2]:
import os
import re


# Libros a eliminar 
LIBROS_A_ELIMINAR = {
    "冷眼观_by_Junqing_Wang.txt",
    "池北偶談_by_Shizhen_Wang.txt",
    "Tractatus_Logico-Philosophicus_by_Ludwig_Wittgenstein.txt"
}


def normalizar_nombre_archivo(folder, filename):
    """
    Elimina caracteres especiales del nombre del archivo
    para evitar problemas con Spark y URIs.
    """
    nombre_limpio = re.sub(r"[^\w\-.]", "_", filename)

    if nombre_limpio != filename:
        old_path = os.path.join(folder, filename)
        new_path = os.path.join(folder, nombre_limpio)
        os.rename(old_path, new_path)
        print(f"✏ Renombrado: {filename} → {nombre_limpio}")
        return nombre_limpio

    return filename


def clean_gutenberg_text(path):
    """
    Elimina encabezados y pies de Project Gutenberg
    sobrescribiendo el archivo original.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="ignore") as f:
            text = f.read()

        start = re.search(
            r"\*\*\*\s*START OF THE PROJECT GUTENBERG EBOOK.*?\*\*\*",
            text,
            re.IGNORECASE | re.DOTALL
        )
        end = re.search(
            r"\*\*\*\s*END OF THE PROJECT GUTENBERG EBOOK.*?\*\*\*",
            text,
            re.IGNORECASE | re.DOTALL
        )

        if not start or not end:
            print(f"⚠️  Sin marcadores Gutenberg: {os.path.basename(path)}")
            return False

        clean_text = text[start.end():end.start()].strip()

        with open(path, "w", encoding="utf-8") as f:
            f.write(clean_text)

        print(f"✔ Limpiado: {os.path.basename(path)}")
        return True

    except Exception as e:
        print(f"❌ Error en {os.path.basename(path)}: {e}")
        return False


def clean_all_books(folder):
    folder = os.path.expanduser(folder)

    for file in os.listdir(folder):

        # Eliminar libros prohibidos
        if file in LIBROS_A_ELIMINAR:
            os.remove(os.path.join(folder, file))
            continue

        if not file.endswith(".txt"):
            continue

        # Normalizar nombre del archivo
        file = normalizar_nombre_archivo(folder, file)

      
        path = os.path.join(folder, file)

        clean_gutenberg_text(path)
            




if __name__ == "__main__":
    clean_all_books("~/Sistdist/Libros")
    print("Finalizado!")


✔ Limpiado: Pride_and_Prejudice_by_Jane_Austen.txt
✔ Limpiado: Metamorphosis_by_Franz_Kafka.txt
✏ Renombrado: A_Doll's_House_:_a_play_by_Henrik_Ibsen.txt → A_Doll_s_House___a_play_by_Henrik_Ibsen.txt
✔ Limpiado: A_Doll_s_House___a_play_by_Henrik_Ibsen.txt
✔ Limpiado: Dracula_by_Bram_Stoker.txt
✔ Limpiado: The_Tragical_History_of_Doctor_Faustus_by_Christopher_Marlowe.txt
✔ Limpiado: The_Picture_of_Dorian_Gray_by_Oscar_Wilde.txt
✔ Limpiado: Heart_of_Darkness_by_Joseph_Conrad.txt
✏ Renombrado: The_Interesting_Narrative_of_the_Life_of_Olaudah_Equiano,_Or_Gustavus_Vassa,_The_African_by_Equiano.txt → The_Interesting_Narrative_of_the_Life_of_Olaudah_Equiano__Or_Gustavus_Vassa__The_African_by_Equiano.txt
✔ Limpiado: The_Interesting_Narrative_of_the_Life_of_Olaudah_Equiano__Or_Gustavus_Vassa__The_African_by_Equiano.txt
✏ Renombrado: Moby_Dick;_Or,_The_Whale_by_Herman_Melville.txt → Moby_Dick__Or__The_Whale_by_Herman_Melville.txt
✔ Limpiado: Moby_Dick__Or__The_Whale_by_Herman_Melville.txt
✔ Limp

## Paso 3: Creación de Vocabularios
Este script crea un vocabulario de cada 1 de los libros y coloca un contador de las palabras y que tanto se repiten

In [3]:
import os
import shutil

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    input_file_name, lower, regexp_replace,
    split, explode, regexp_extract,
    col, expr
)
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc


# SESIÓN SPARK
spark = (
    SparkSession.builder
    .appName("VocabulariosRapidosOrdenados")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    .getOrCreate()
)


spark.sparkContext.setLogLevel("ERROR")


# CARGAR LIBROS
ruta_libros = "/home/kuatrogs/Sistdist/Libros/*.txt"

df = (
    spark.read.text(ruta_libros)
    .withColumn("filename", input_file_name())
)


# LIMPIEZA Y TOKENIZACIÓN
df = df.withColumn("line", lower("value"))

df = df.withColumn(
    "clean_line",
    regexp_replace("line", r"[^a-z0-9\s]", " ")
)

df = df.withColumn("tokens", split("clean_line", r"\s+"))

remover = StopWordsRemover(
    inputCol="tokens",
    outputCol="filtered",
    stopWords=StopWordsRemover.loadDefaultStopWords("english")
)
df = remover.transform(df)

# Filtrar tokens ANTES del explode
df = df.withColumn(
    "filtered",
    expr("filter(filtered, x -> x IS NOT NULL AND length(x) > 1)")
)

df = df.withColumn("word", explode("filtered"))


# EXTRAER NOMBRE DEL LIBRO
df = df.withColumn(
    "libro",
    regexp_extract("filename", r"([^/]+)\.txt$", 1)
)


# CONTAR FRECUENCIAS
frecuencia = (
    df
    .repartition("libro")
    .groupBy("libro", "word")
    .count()
)


# ORDENAR POR FRECUENCIA (DESC) POR LIBRO
window_libro = Window.partitionBy("libro").orderBy(desc("count"))

frecuencia_ordenada = (
    frecuencia
    .withColumn("rank", row_number().over(window_libro))
    .orderBy("libro", "rank")
)


# ESCRIBIR TODO EN PARALELO
ruta_vocab = "/home/kuatrogs/Sistdist/Vocabularios"
os.makedirs(ruta_vocab, exist_ok=True)

(
    frecuencia_ordenada
    .select("libro", "word", "count")
    .write
    .mode("overwrite")
    .option("sep", "\t")
    .option("header", "false")
    .partitionBy("libro")
    .csv(ruta_vocab)
)


# RENOMBRAR LIBRO
for carpeta in os.listdir(ruta_vocab):
    if not carpeta.startswith("libro="):
        continue

    libro = carpeta.replace("libro=", "")
    carpeta_libro = os.path.join(ruta_vocab, carpeta)

    for f in os.listdir(carpeta_libro):
        if f.startswith("part-"):
            origen = os.path.join(carpeta_libro, f)
            destino = os.path.join(
                ruta_vocab,
                f"vocabulario_{libro}.txt"
            )
            if os.path.exists(destino):
                os.remove(destino)

            os.rename(origen, destino)
            break

    shutil.rmtree(carpeta_libro)


spark.stop()
print("Vocabularios creados!")


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/10 13:06:56 WARN Utils: Your hostname, kuatrogs-B450M-DS3H-V2, resolves to a loopback address: 127.0.1.1; using 192.168.1.10 instead (on interface enp5s0)
25/12/10 13:06:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/10 13:07:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Vocabularios creados!


## Paso 4: Creación de Matriz de Similitud
Este script genera la matriz de similitud en un archivo "csv" y limpia los nombres de los libros colocandolos en un formato similar al siguiente: `The_Brothers_Karamazov_by_Fyodor_Dostoyevsky`

In [7]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import input_file_name
import numpy as np
import pandas as pd
import math
import os
import warnings
import logging

# CONFIGURACIÓN
warnings.filterwarnings("ignore")
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

for logger in ["py4j", "pyspark", "org.apache.spark"]:
    logging.getLogger(logger).setLevel(logging.CRITICAL)

RUTA_VOCAB = "../Sistdist/Vocabularios/*.txt"
SALIDA_CSV = "../Sistdist/matriz_similitud_vocabularios.csv"

print("Iniciando proceso...")

# SPARK SESSION
spark = (
    SparkSession.builder
    .appName("SimilitudVocabulariosTFIDF_Manual")
    .config("spark.ui.showConsoleProgress", "false")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("OFF")


# 1. Cargar vocabularios 
df = (
    spark.read.text(RUTA_VOCAB)
    .withColumn("archivo", F.regexp_replace(input_file_name(), ".*/", ""))
    .withColumn("archivo", F.regexp_replace("archivo", "^vocabulario_", ""))
    .withColumn("archivo", F.regexp_replace("archivo", r"\.txt$", ""))
)

# Extraer solo la palabra
df = df.withColumn("token", F.split(F.col("value"), r"\s+")[0])



# 2. CONTAR FRECUENCIAS: (doc, palabra, freq)
df_freq = (
    df.groupBy("archivo", "token")
      .agg(F.count("*").alias("freq"))
)

# Convertir a RDD 
rdd_freq = df_freq.rdd.map(lambda x: ((x["archivo"], x["token"]), x["freq"]))



# 3. Convertir a formato (doc, palabra, freq)
def parse_freq_tuple(x):
    return (x[0][0], x[0][1], x[1])

rdd_parsed = rdd_freq.map(parse_freq_tuple)


# 4. TOTAL DE PALABRAS POR DOCUMENTO
doc_totals = (
    rdd_parsed
    .map(lambda x: (x[0], x[2]))
    .reduceByKey(lambda a, b: a + b)
)


# 5. CALCULAR TF 
rdd_doc_word_freq = rdd_parsed.map(lambda x: ((x[0], x[1]), x[2]))

rdd_with_totals = (
    rdd_doc_word_freq
    .map(lambda x: (x[0][0], (x[0][1], x[1])))
    .join(doc_totals)
    .map(lambda x: ((x[0], x[1][0][0]), (x[1][0][1], x[1][1])))
)

rdd_tf = rdd_with_totals.map(lambda x: (x[0], x[1][0] / x[1][1]))


# 6. CALCULAR IDF 
total_docs = doc_totals.count()

# CONTAR EN CUANTOS DOCUMENTOS SE ENCUENTRA LA MISMA PALABRA
rdd_word_doc_count = (
    rdd_parsed
    .map(lambda x: (x[1], x[0]))
    .distinct()
    .map(lambda x: (x[0], 1))
    .reduceByKey(lambda a, b: a + b)
)

rdd_idf = rdd_word_doc_count.map(
    lambda x: (x[0], math.log(total_docs / x[1]))
)


# 7. CALCULAR TF-IDF 
rdd_tf_prep = rdd_tf.map(lambda x: (x[0][1], (x[0][0], x[1])))

rdd_tf_idf = (
    rdd_tf_prep
    .join(rdd_idf)
    .map(lambda x: ((x[1][0][0], x[0]), x[1][0][1] * x[1][1]))
)


# 8. Construcción del vocabulario
vocab_global_indexed = (
    rdd_tf_idf.map(lambda x: x[0][1]).distinct().zipWithIndex()
)

vocab_dict = vocab_global_indexed.collectAsMap()


# 9. Convertir TF-IDF a vectores 
rdd_doc_vectors = (
    rdd_tf_idf
    .map(lambda x: (x[0][0], (vocab_dict[x[0][1]], x[1])))
    .groupByKey()
    .map(lambda x: (x[0], list(x[1])))
)


# 10. Recolectar para similitud coseno
doc_vectors_list = rdd_doc_vectors.collect()
doc_names = [doc[0] for doc in doc_vectors_list]

# convertir a vectores (numpy)
vocab_size = len(vocab_dict)
vectores = np.zeros((len(doc_vectors_list), vocab_size))

for i, (doc, vec) in enumerate(doc_vectors_list):
    for idx, value in vec:
        vectores[i, idx] = value


# 11. CALCULAR SIMILITUD COSENO
norms = np.linalg.norm(vectores, axis=1, keepdims=True)
norms[norms == 0] = 1.0

vectores_norm = vectores / norms
similitud = np.dot(vectores_norm, vectores_norm.T)


# 12. Exportar CSV
df_result = pd.DataFrame(similitud, index=doc_names, columns=doc_names)
df_result.to_csv(SALIDA_CSV, encoding="utf-8", float_format="%.6f")

print("MATRIZ DE SIMILITUD CREADA")

spark.stop()
print("Proceso finalizado correctamente.")


Iniciando proceso...
MATRIZ DE SIMILITUD CREADA (TF-IDF MANUAL)
Proceso finalizado correctamente.


## Paso 5: Recomendación de Libros
Este script te recomienda "N" cantidad de libros que sean colocados en la variable `recomendaciones` y para seleccionar el libro cambiar la variable `libro`, teniendo en cuenta el formato de los nombres de la matriz csv generada en el paso anterior

In [8]:
import pandas as pd
import os


RUTA_MATRIZ = "../Sistdist/matriz_similitud_vocabularios.csv"


def recomendar_libros(libro_preferido: str, n_recomendaciones: int):
    """
    Recomienda N libros basados en similitud coseno TF-IDF.

    Parámetros:
        libro_preferido (str): nombre exacto del libro (archivo)
        recomendaciones (int): número de libros a recomendar

    Retorna:
        DataFrame con libros recomendados y puntaje de similitud
    """

    if not os.path.exists(RUTA_MATRIZ):
        raise FileNotFoundError("No se encontró la bibliotecaS de similitud")

    # Cargar matriz
    df = pd.read_csv(RUTA_MATRIZ, index_col=0)

    if libro_preferido not in df.columns:
        raise ValueError(f"Libro '{libro_preferido}' no existe en la biblioteca")

    # Obtener similitudes del libro favorito
    similitudes = df[libro_preferido]

    # Quitar el mismo libro
    similitudes = similitudes.drop(libro_preferido)

    # Ordenar por similitud (descendente)
    recomendaciones = (
        similitudes
        .sort_values(ascending=False)
        .head(n_recomendaciones)
        .reset_index()
    )

    recomendaciones.columns = ["libro_recomendado", "similitud"]

    return recomendaciones


# EJECUCION
if __name__ == "__main__":

    libro = "Alice_s_Adventures_in_Wonderland_by_Lewis_Carroll"   #Elige un nombre del libro de la matriz
    recomendaciones = 5  #Cantidad de recomendaciones deseadas

    try:
        recs = recomendar_libros(libro, recomendaciones)
        print("\nLibros recomendados:")
        print(recs.to_string(index=False))
    except Exception as e:
        print("Error:", e)


Libros recomendados:
                          libro_recomendado  similitud
             Bleak_House_by_Charles_Dickens   0.098878
The_Wonderful_Wizard_of_Oz_by_L._Frank_Baum   0.096608
         Three_Men__A_Novel_by_Maksim_Gorky   0.096550
   Anne_of_Green_Gables_by_L._M._Montgomery   0.092971
      Great_Expectations_by_Charles_Dickens   0.092949


## Paso 6: Palabras Importantes
Este Script selecciona "N" palabras mas importantes de un libro para poder utilizarlo tienes que cambiar la variable `N` para seleccionar la cantidad de palabras importantes que deseas ver y para cambiar de libro tienes que cambiar la variable `libro` teniendo en cuenta la matriz csv creada en el paso 4

In [3]:
from pyspark.sql import SparkSession, functions as F
import os

BASE_VOCABULARIOS = "../Sistdist/Vocabularios"


def palabras_descriptivas(libro: str, M: int):
    """
    Regresa M palabras que describen un documento deseado.

    Input:
        libro (str): nombre del libro
        M (int): número de palabras características

    Output:
        Lista de tuplas (palabra, frecuencia)

    Error:
        Lanza ValueError si el libro no existe.
    """

    ruta_libro = f"{BASE_VOCABULARIOS}/vocabulario_{libro}.txt"

    if not os.path.exists(ruta_libro):
        raise ValueError(f"❌ El libro '{libro}' no existe")


    # LECTURA DE DOCUMENTOS
    df = spark.read.text(ruta_libro)

    # PROCESAMIENTO
    df = (
        df.withColumn("split", F.split(F.col("value"), "\t"))
          .withColumn("palabra", F.col("split").getItem(0))
          .withColumn("frecuencia", F.col("split").getItem(1).cast("int"))
          .filter(F.col("frecuencia").isNotNull())
          .select("palabra", "frecuencia")
    )

    resultado = (
        df.orderBy(F.desc("frecuencia"))
          .limit(M)
          .collect()
    )

    return [(r["palabra"], r["frecuencia"]) for r in resultado]



# EJECUCIÓN
if __name__ == "__main__":

    spark = (
        SparkSession.builder
        .appName("PalabrasImportantes")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("ERROR")

    libro = "Alice_s_Adventures_in_Wonderland_by_Lewis_Carroll"
    N = 5

    palabras = palabras_descriptivas(libro, N)

    print(f"\n{N} palabras más importantes de '{libro}':\n")
    for palabra, freq in palabras:
        print(f"({palabra}, {freq})")

    spark.stop()



5 palabras más importantes de 'Alice_s_Adventures_in_Wonderland_by_Lewis_Carroll':

(said, 462)
(alice, 399)
(little, 129)
(one, 104)
(know, 87)
