In [1]:
import sys
import os

# Agregar la ruta del proyecto a sys.path
sys.path.append(os.path.abspath(".."))       # si el notebook está dentro de /notebooks
sys.path.append(os.path.abspath("."))        # si el notebook está en la raíz

# Mostrar rutas para confirmar
sys.path


['/usr/lib/python312.zip',
 '/usr/lib/python3.12',
 '/usr/lib/python3.12/lib-dynload',
 '',
 '/home/arturo/venv/lib/python3.12/site-packages',
 '/home/arturo/project_gutenberg',
 '/home/arturo/project_gutenberg/notebooks']

In [2]:
import nltk
nltk.download('punkt_tab')
nltk.download('punkt')
nltk.download('stopwords')


[nltk_data] Downloading package punkt_tab to /home/arturo/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package punkt to /home/arturo/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /home/arturo/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [3]:
from pyspark.sql import SparkSession
import os

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType, IntegerType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, Normalizer
import numpy as np
import os, json, pickle
from src.utils import strip_gutenberg_headers, preprocess_text, read_txt, load_all_books

spark = SparkSession.builder \
    .appName("ProyectoLibros") \
    .master("local[*]") \
    .config("spark.driver.memory","4g") \
    .getOrCreate()

spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/05 22:26:10 WARN Utils: Your hostname, arturo-VirtualBox, resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/12/05 22:26:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/05 22:26:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/05 22:26:12 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
data_dir = "../data"  # ajusta la ruta si es necesario
books = []
for fname in sorted(os.listdir(data_dir)):
    if not fname.endswith(".txt"):
        continue
    book_id = os.path.splitext(fname)[0]
    path = os.path.join(data_dir, fname)
    raw = read_txt(path)
    main = strip_gutenberg_headers(raw)
    # NOTA: aquí guardamos texto completo y tokens para Spark
    tokens = preprocess_text(main, language='english')
    books.append((int(book_id) if book_id.isdigit() else book_id, fname, main, tokens))

len(books)


10

In [6]:
# Celda 3: DataFrame Spark con (book_id, title, text, tokens)
df = spark.createDataFrame([(b[0], b[1], b[2], b[3]) for b in books],
                           schema=["book_id","title","text","tokens"])
df.show(4, truncate=60)


25/12/05 22:43:55 WARN TaskSetManager: Stage 1 contains a task of very large size (10072 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+-------+--------+------------------------------------------------------------+------------------------------------------------------------+
|book_id|   title|                                                        text|                                                      tokens|
+-------+--------+------------------------------------------------------------+------------------------------------------------------------+
|     11|  11.txt|[Illustration]\n\n\n\n\nAlice’s Adventures in Wonderland\...|[illustration, alice, adventures, wonderland, lewis, carr...|
|   1342|1342.txt|[Illustration:\n\n                             GEORGE ALL...|[illustration, george, allen, publisher, 156, charing, cr...|
|   1661|1661.txt|﻿The Project Gutenberg eBook of The Adventures of Sherloc...|[project, gutenberg, ebook, adventures, sherlock, holmes,...|
|   2554|2554.txt|CRIME AND PUNISHMENT\n\nBy Fyodor Dostoevsky\n\n\n\nTrans...|[crime, punishment, fyodor, dostoevsky, translated, const...|
+-------+----

In [7]:
# Celda 5: StopWordsRemover
remover = StopWordsRemover(inputCol="tokens", outputCol="tokens_nostop")
# si quieres usar tu propia lista: remover.setStopWords(my_list)
df = remover.transform(df)
df.select("book_id","title","tokens_nostop").show(3, truncate=60)


25/12/05 22:44:56 WARN TaskSetManager: Stage 2 contains a task of very large size (10072 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+-------+--------+------------------------------------------------------------+
|book_id|   title|                                               tokens_nostop|
+-------+--------+------------------------------------------------------------+
|     11|  11.txt|[illustration, alice, adventures, wonderland, lewis, carr...|
|   1342|1342.txt|[illustration, george, allen, publisher, 156, charing, cr...|
|   1661|1661.txt|[project, gutenberg, ebook, adventures, sherlock, holmes,...|
+-------+--------+------------------------------------------------------------+
only showing top 3 rows


In [8]:
# Celda 6: CountVectorizer y IDF
vocabSize = 20000
cv = CountVectorizer(inputCol="tokens_nostop", outputCol="raw_features", vocabSize=vocabSize, minDF=2.0)
cv_model = cv.fit(df)
df = cv_model.transform(df)

idf = IDF(inputCol="raw_features", outputCol="tfidf")
idf_model = idf.fit(df)
df = idf_model.transform(df)

# Normalizar tfidf
normalizer = Normalizer(inputCol="tfidf", outputCol="tfidf_norm", p=2.0)
df = normalizer.transform(df)

df.select("book_id","title","tfidf_norm").show(3, truncate=60)


25/12/05 22:45:52 WARN TaskSetManager: Stage 3 contains a task of very large size (10072 KiB). The maximum recommended task size is 1000 KiB.
25/12/05 22:46:02 WARN TaskSetManager: Stage 7 contains a task of very large size (10072 KiB). The maximum recommended task size is 1000 KiB.
25/12/05 22:46:32 WARN TaskSetManager: Stage 8 contains a task of very large size (10072 KiB). The maximum recommended task size is 1000 KiB.
[Stage 8:>                                                          (0 + 1) / 1]

+-------+--------+------------------------------------------------------------+
|book_id|   title|                                                  tfidf_norm|
+-------+--------+------------------------------------------------------------+
|     11|  11.txt|(16730,[0,1,2,3,4,5,6,7,8,9,11,12,14,15,16,17,18,19,20,21...|
|   1342|1342.txt|(16730,[0,1,2,3,4,5,6,7,8,9,10,11,12,14,15,16,17,18,19,20...|
|   1661|1661.txt|(16730,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19...|
+-------+--------+------------------------------------------------------------+
only showing top 3 rows


                                                                                

In [9]:
# Celda 7: Convertir vectores de Spark a numpy en memoria
rows = df.select("book_id","title","tfidf_norm").collect()
book_ids = []
id_to_title = {}
id_to_vec = {}
for r in rows:
    bid = r["book_id"]
    book_ids.append(bid)
    id_to_title[bid] = r["title"]
    vec = np.array(r["tfidf_norm"].toArray())
    id_to_vec[bid] = vec

len(book_ids)


25/12/05 22:51:25 WARN TaskSetManager: Stage 9 contains a task of very large size (10072 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

10

In [32]:
# Celda 8: Matriz de similitud (dot product porque vectores normalizados)
n = len(book_ids)
sim_matrix = np.zeros((n, n), dtype=float)
for i in range(n):
    vi = id_to_vec[book_ids[i]]
    for j in range(i, n):
        vj = id_to_vec[book_ids[j]]
        s = float(np.dot(vi, vj))
        sim_matrix[i, j] = s
        sim_matrix[j, i] = s

# guardar índice -> book_id mapping
index_to_id = {i: book_ids[i] for i in range(n)}
id_to_index = {book_ids[i]: i for i in range(n)}


In [31]:
# Celda 9: recomendador
def recomendar(libro_id, N=5):
    if libro_id not in id_to_index:
        raise ValueError("book_id no encontrado")
    idx = id_to_index[libro_id]
    sims = sim_matrix[idx]
    pairs = [(index_to_id[i], float(sims[i])) for i in range(len(sims)) if i != idx]
    pairs_sorted = sorted(pairs, key=lambda x: x[1], reverse=True)
    results = [(pid, id_to_title[pid], score) for pid, score in pairs_sorted[:N]]
    return results

# Ejemplo (reemplaza por un book_id real):
ejemplo_id = book_ids[4]
print("Libro base:", ejemplo_id, id_to_title[ejemplo_id])
print("Recomendados:")
for bid, title, score in recomendar(ejemplo_id, N=5):
    print(bid, title, round(score, 4))


Libro base: 2600 2600.txt
Recomendados:
2554 2554.txt 0.2064
1661 1661.txt 0.0938
98 98.txt 0.0849
84 84.txt 0.0671
1342 1342.txt 0.0331


In [19]:
# Celda 10: top palabras por documento
vocab = cv_model.vocabulary  # lista de tokens

def top_palabras(libro_id, M=10):
    # buscar fila en df con tfidf (no normalizada — usamos tfidf model output)
    row = df.filter(col("book_id") == libro_id).select("tfidf").collect()
    if not row:
        raise ValueError("book_id no encontrado en DataFrame")
    vec = row[0]["tfidf"]  # SparseVector
    items = list(zip(vec.indices, vec.values))
    items_sorted = sorted(items, key=lambda x: x[1], reverse=True)
    top = [(vocab[idx], float(val)) for idx, val in items_sorted[:M]]
    return top

# Ejemplo:
print(top_palabras(ejemplo_id, M=7))


25/12/05 23:10:11 WARN TaskSetManager: Stage 15 contains a task of very large size (10072 KiB). The maximum recommended task size is 1000 KiB.

[('elizabeth', 508.5549974349543), ('jane', 392.38346120733877), ('mrs', 214.57207446389168), ('mr', 162.14192193341822), ('catherine', 133.53132034155934), ('illustration', 128.51854973937606), ('gardiner', 126.03044946063531)]


                                                                                

In [20]:
df.select("book_id", "title").show(200, truncate=False)


25/12/05 23:13:52 WARN TaskSetManager: Stage 16 contains a task of very large size (10072 KiB). The maximum recommended task size is 1000 KiB.
25/12/05 23:14:03 WARN TaskSetManager: Stage 17 contains a task of very large size (5477 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+-------+--------+
|book_id|title   |
+-------+--------+
|11     |11.txt  |
|1342   |1342.txt|
|1661   |1661.txt|
|2554   |2554.txt|
|2600   |2600.txt|
|2701   |2701.txt|
|3296   |3296.txt|
|43     |43.txt  |
|84     |84.txt  |
|98     |98.txt  |
+-------+--------+



In [21]:
df.filter(df.title.contains("1342")).show(truncate=False)


25/12/05 23:14:30 WARN TaskSetManager: Stage 18 contains a task of very large size (10072 KiB). The maximum recommended task size is 1000 KiB.
25/12/05 23:14:34 WARN TaskSetManager: Stage 19 contains a task of very large size (5477 KiB). The maximum recommended task size is 1000 KiB.
IOPub data rate exceeded.                                                       
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [22]:
def top_palabras(libro_id, M=10):
    row = df.filter(col("book_id") == libro_id).select("tfidf").collect()
    if not row:
        raise ValueError("book_id no encontrado")
    vec = row[0]["tfidf"]  # SparseVector
    items = list(zip(vec.indices, vec.values))
    items_sorted = sorted(items, key=lambda x: x[1], reverse=True)
    vocab = cv_model.vocabulary
    top = [(vocab[idx], float(val)) for idx, val in items_sorted[:M]]
    return top


In [36]:
top_palabras(84, 7)


25/12/05 23:23:57 WARN TaskSetManager: Stage 30 contains a task of very large size (10072 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[('elizabeth', 72.53807715351286),
 ('geneva', 46.774187428689395),
 ('cottagers', 29.883508634996),
 ('lake', 27.31322461531896),
 ('cottage', 23.051241310895918),
 ('labours', 22.087810730214436),
 ('fiend', 21.214753124961042)]

In [39]:
!python src/download_books.py


python: can't open file '/home/arturo/project_gutenberg/notebooks/src/download_books.py': [Errno 2] No such file or directory
