In [1]:
#Vamos a importar librerías para calcular TF-IDF
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors, DenseVector
import os
import shutil
import math

# Inicializar Spark
spark = SparkSession.builder \
    .appName("ProyectoSparkie-Similitudes") \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext

print(" - Spark inicializado - ")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/07 19:25:26 WARN Utils: Your hostname, maria-lopez-VirtualBox, resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/12/07 19:25:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/07 19:25:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


 - Spark inicializado - 


In [2]:
# Aquí se cargan las frecuencias sacadas de Vocabulario luego de aplicar los filtros
freq_path = "../data/processed/frecuencias_rdd"

# Leer el RDD guardado
rdd_freq = sc.textFile(freq_path)

# Función para parsear el formato para que sean tuplas: "(('documento.txt', 'palabra'), frecuencia)"
def parse_freq(line):
    """Convierte string del RDD a tupla Python"""
    # Formato ejemplo: "(('Doc.txt', 'palabra'), 123)", 123 son la frecuencia con la que aparece
    import ast
    parsed = ast.literal_eval(line)
    documento = parsed[0][0]
    palabra = parsed[0][1]
    freq = parsed[1]
    return (documento, palabra, freq)

rdd_parsed = rdd_freq.map(parse_freq)

print("- Primeros 5 registros:")
for item in rdd_parsed.take(5):
    print(item)
    
print(f"\n- Total de pares (documento, palabra): {rdd_parsed.count()}")

- Primeros 5 registros:


                                                                                

('The_Republic_by_Plato.txt', 'republic', 107)
('The_Republic_by_Plato.txt', 'jowett', 2)
('The_Republic_by_Plato.txt', 'note', 31)
('The_Republic_by_Plato.txt', 'also', 404)
('The_Republic_by_Plato.txt', 'ebook', 1)


[Stage 1:>                                                          (0 + 2) / 2]


- Total de pares (documento, palabra): 821702


                                                                                

In [3]:
# Aquí se calcula el TF

# 1. Calcular total de palabras por documento 
doc_totals = rdd_parsed.map(lambda x: (x[0], x[2])) \
                       .reduceByKey(lambda a, b: a + b)

print("- Total de palabras por documento (primeros 5):")
for item in doc_totals.take(5):
    print(f"  {item[0]}: {item[1]} palabras")

# 2. Calcular TF: freq / total_palabras_doc, la formula que nos dio el profesor de TF = Numero de apariciones del termino N en documento M / Numero de tockens en el doc j
# Para recordar, la estructura es así: ((doc, palabra), freq)
rdd_doc_word_freq = rdd_parsed.map(lambda x: ((x[0], x[1]), x[2]))

# Join con totales: ((doc, palabra), (freq, total))
rdd_with_totals = rdd_doc_word_freq.map(lambda x: (x[0][0], (x[0][1], x[1]))) \
                                   .join(doc_totals) \
                                   .map(lambda x: ((x[0], x[1][0][0]), (x[1][0][1], x[1][1])))

# Calcular con la formula del profe TF = freq / total
rdd_tf = rdd_with_totals.map(lambda x: (x[0], x[1][0] / x[1][1]))

print("\n- TF calculado (primeros 5):")
for item in rdd_tf.take(5):
    print(f"  {item[0]}: TF = {item[1]:.6f}")

- Total de palabras por documento (primeros 5):


                                                                                

  The_Republic_by_Plato.txt: 93584 palabras
  The_2006_CIA_World_Factbook_by_United_States._Central_Intelligence_Agency.txt: 884379 palabras
  Little_Women;_Or,_Meg,_Jo,_Beth,_and_Amy_by_Louisa_May_Alcott.txt: 92687 palabras
  Middlemarch_by_George_Eliot.txt: 147972 palabras
  The_King_in_Yellow_by_Robert_W._Chambers.txt: 34765 palabras

- TF calculado (primeros 5):




  ('Middlemarch_by_George_Eliot.txt', 'george'): TF = 0.000061
  ('Middlemarch_by_George_Eliot.txt', 'york'): TF = 0.000027
  ('Middlemarch_by_George_Eliot.txt', 'company'): TF = 0.000331
  ('Middlemarch_by_George_Eliot.txt', 'dear'): TF = 0.001656
  ('Middlemarch_by_George_Eliot.txt', 'henry'): TF = 0.000041


                                                                                

In [4]:
# Calcular ahora el IDF:
# IDF o peso = log(total_documentos / documentos_que_contienen_palabra)

# 1. Contar total de documentos para tenerlo en una variable
total_docs = doc_totals.count()
print(f"- Total de documentos: {total_docs}")

# 2. Contar en cuántos documentos aparece cada palabra
# Estructura así queda: (palabra, 1) -> (palabra, num_docs)
rdd_word_doc_count = rdd_parsed.map(lambda x: (x[1], x[0])) \
                               .distinct() \
                               .map(lambda x: (x[0], 1)) \
                               .reduceByKey(lambda a, b: a + b)

#Comprobar que se hizo bien:
print("\n- Documentos por palabra (primeros 10):")
for item in rdd_word_doc_count.take(10):
    print(f"  '{item[0]}': aparece en {item[1]} documentos")

# Ahora sí calcular IDF
# 3. Calcular IDF
rdd_idf = rdd_word_doc_count.map(lambda x: (x[0], math.log(total_docs / x[1])))

#Verificamos que todo bien:
print("\n- IDF calculado (primeros 10):")
for item in rdd_idf.take(10):
    print(f"  '{item[0]}': IDF = {item[1]:.4f}")


- Total de documentos: 99

- Documentos por palabra (primeros 10):


[Stage 10:>                                                         (0 + 2) / 2]

  'plato': aparece en 23 documentos
  'translated': aparece en 50 documentos
  'benjamin': aparece en 16 documentos
  'see': aparece en 97 documentos
  '150': aparece en 24 documentos
  'contents': aparece en 87 documentos
  'introduction': aparece en 67 documentos
  'analysis': aparece en 33 documentos
  'persons': aparece en 81 documentos
  'iii': aparece en 77 documentos

- IDF calculado (primeros 10):
  'plato': IDF = 1.4596
  'translated': IDF = 0.6831
  'benjamin': IDF = 1.8225
  'see': IDF = 0.0204
  '150': IDF = 1.4171
  'contents': IDF = 0.1292
  'introduction': IDF = 0.3904
  'analysis': IDF = 1.0986
  'persons': IDF = 0.2007
  'iii': IDF = 0.2513


                                                                                

In [5]:
# Ahora sí calculamos el TF-IDF = TF * IDF

# Preparar TF, lo transformamos: ((doc, palabra), tf) -> (palabra, (doc, tf))
rdd_tf_prep = rdd_tf.map(lambda x: (x[0][1], (x[0][0], x[1])))

# Join con IDF: (palabra, ((doc, tf), idf))
rdd_tf_idf = rdd_tf_prep.join(rdd_idf) \
                        .map(lambda x: ((x[1][0][0], x[0]), x[1][0][1] * x[1][1]))
# Verificamos
print("- TF-IDF calculado (primeros 10):")
for item in rdd_tf_idf.take(10):
    print(f"  Doc: {item[0][0][:40]}... | Palabra: '{item[0][1]}' | TF-IDF: {item[1]:.6f}")

# Guardar TF-IDF en nuestra carpetita de procesados
output_tfidf = "../data/processed/tfidf_rdd"
if os.path.exists(output_tfidf):
    shutil.rmtree(output_tfidf)
rdd_tf_idf.saveAsTextFile(output_tfidf)

print(f"\n- TF-IDF guardado en: {output_tfidf} con rotundo exito")

- TF-IDF calculado (primeros 10):


                                                                                

  Doc: Middlemarch_by_George_Eliot.txt... | Palabra: 'boston' | TF-IDF: 0.000010
  Doc: Golden_Days_for_Boys_and_Girls,_Vol._XII... | Palabra: 'boston' | TF-IDF: 0.000426
  Doc: Precious_balms.txt... | Palabra: 'boston' | TF-IDF: 0.000322
  Doc: War_and_Peace_by_graf_Leo_Tolstoy.txt... | Palabra: 'boston' | TF-IDF: 0.000051
  Doc: How_to_Observe__Morals_and_Manners_by_Ha... | Palabra: 'boston' | TF-IDF: 0.000049
  Doc: The_Great_Gatsby_by_F._Scott_Fitzgerald.... | Palabra: 'boston' | TF-IDF: 0.000067
  Doc: The_2003_CIA_World_Factbook_by_United_St... | Palabra: 'boston' | TF-IDF: 0.000100
  Doc: Narrative_of_the_Life_of_Frederick_Dougl... | Palabra: 'boston' | TF-IDF: 0.000246
  Doc: The_Scarlet_Letter_by_Nathaniel_Hawthorn... | Palabra: 'boston' | TF-IDF: 0.000407
  Doc: The_Souls_of_Black_Folk_by_W._E._B._Du_B... | Palabra: 'boston' | TF-IDF: 0.000128





- TF-IDF guardado en: ../data/processed/tfidf_rdd con rotundo exito


                                                                                

In [6]:
# Convertir TF-IDF a vectores por documento para usarlos en pasos posteriores

# 1. Crear vocabulario global con índices
vocab_global = rdd_tf_idf.map(lambda x: x[0][1]).distinct().zipWithIndex()
vocab_dict = vocab_global.collectAsMap()
vocab_size = len(vocab_dict)

#Imprimimos para ver que todo esté bien:
print(f"- Tamaño del vocabulario: {vocab_size} palabras únicas")
print(f"\n- Primeras 10 palabras del vocabulario:")
for word, idx in list(vocab_dict.items())[:10]:
    print(f"  '{word}': índice {idx}")

# 2. Convertir TF-IDF a formato (doc, [(idx, tfidf), ...])
#Formato: (documento, [(índice_palabra, tfidf), ...])
rdd_doc_vectors = rdd_tf_idf.map(lambda x: (x[0][0], (vocab_dict[x[0][1]], x[1]))) \
                            .groupByKey() \
                            .map(lambda x: (x[0], list(x[1])))

print(f"\n- Vectores creados para {rdd_doc_vectors.count()} documentos")
print("\n- Primer documento (primeras 10 dimensiones):")
first_doc = rdd_doc_vectors.first()
print(f"  Documento: {first_doc[0][:50]}...")
print(f"  Vector (primeros 10): {first_doc[1][:10]}")

                                                                                

- Tamaño del vocabulario: 120381 palabras únicas

- Primeras 10 palabras del vocabulario:
  'boston': índice 0
  'husband': índice 1
  'blessed': índice 2
  'miss': índice 3
  'iii': índice 4
  'old': índice 5
  'xxi': índice 6
  'xxv': índice 7
  'three': índice 8
  'love': índice 9





- Vectores creados para 99 documentos

- Primer documento (primeras 10 dimensiones):
  Documento: Golden_Days_for_Boys_and_Girls,_Vol._XII,_Jan._3,_...
  Vector (primeros 10): [(0, 0.00042555406127492234), (1, 1.5735175681184916e-05), (2, 2.717984851291704e-05), (3, 2.388931302330423e-05), (4, 1.9706298775261205e-05), (5, 0.00011965729363756082), (8, 8.127510054824112e-05), (9, 3.30354989483807e-05), (19, 4.7205527043554754e-05), (20, 2.8308111466896454e-05)]


                                                                                

In [7]:
#Calculamos ahora la similitud por coseno:

def cosine_similarity_sparse(vec1, vec2):
    """
    Calcula similitud coseno entre dos vectores dispersos
    vec1, vec2: lista de tuplas [(índice, valor), ...]
    """
    # Convertir a diccionarios para búsqueda rápida
    dict1 = dict(vec1)
    dict2 = dict(vec2)
    
    # Producto punto
    dot_product = sum(dict1.get(idx, 0) * dict2.get(idx, 0) 
                      for idx in set(dict1.keys()) | set(dict2.keys()))
    
    # Normas (La parte de abajo de la formula [||V||*||W||])
    norm1 = math.sqrt(sum(v**2 for v in dict1.values()))
    norm2 = math.sqrt(sum(v**2 for v in dict2.values()))
    
    if norm1 == 0 or norm2 == 0:
        return 0.0
    
    return dot_product / (norm1 * norm2)

# Broadcast de vectores para comparación eficiente
doc_vectors_list = rdd_doc_vectors.collect()
print(f"- Vectores recolectados: {len(doc_vectors_list)} documentos")

                                                                                

- Vectores recolectados: 99 documentos


In [8]:
# Crear todas las combinaciones de pares de documentos
from itertools import combinations

print("- Calculando similitudes entre todos los pares...")

# Generar pares y calcular similitudes
similarities = []
doc_names = [doc[0] for doc in doc_vectors_list]
total_pairs = len(doc_names) * (len(doc_names) - 1) // 2

print(f"- Total de pares a calcular: {total_pairs}")

#Comparamos cada documento con todos los demás
for i, (doc1, vec1) in enumerate(doc_vectors_list):
    for doc2, vec2 in doc_vectors_list[i+1:]:
        sim = cosine_similarity_sparse(vec1, vec2)
        similarities.append((doc1, doc2, sim))
    
    if (i + 1) % 10 == 0:
        print(f"  Procesados: {i+1}/{len(doc_vectors_list)} documentos...")

# Convertir a RDD para guardarlo después
rdd_similarities = sc.parallelize(similarities)

print(f"\n- Similitudes calculadas: {rdd_similarities.count()} pares")
print("\n- Top 10 pares más similares:")

#Guardamos: (doc1, doc2, similitud)
top_similar = rdd_similarities.sortBy(lambda x: x[2], ascending=False).take(10)
for doc1, doc2, sim in top_similar:
    print(f"  {sim:.4f} | {doc1[:40]}... ↔ {doc2[:40]}...")

- Calculando similitudes entre todos los pares...
- Total de pares a calcular: 4851
  Procesados: 10/99 documentos...
  Procesados: 20/99 documentos...
  Procesados: 30/99 documentos...
  Procesados: 40/99 documentos...
  Procesados: 50/99 documentos...
  Procesados: 60/99 documentos...
  Procesados: 70/99 documentos...
  Procesados: 80/99 documentos...
  Procesados: 90/99 documentos...

- Similitudes calculadas: 4851 pares

- Top 10 pares más similares:
  0.9990 | A_Christmas_Carol_in_Prose;_Being_a_Ghos... ↔ A_Christmas_Carol_by_Charles_Dickens.txt...
  0.9958 | Little_Women;_Or,_Meg,_Jo,_Beth,_and_Amy... ↔ Little_Women_by_Louisa_May_Alcott.txt...
  0.5293 | The_Confessions_of_St._Augustine_by_Bish... ↔ Paradise_Lost_by_John_Milton.txt...
  0.5235 | The_2003_CIA_World_Factbook_by_United_St... ↔ The_2006_CIA_World_Factbook_by_United_St...
  0.4415 | The_Adventures_of_Sherlock_Holmes_by_Art... ↔ The_Hound_of_the_Baskervilles_by_Arthur_...
  0.4249 | The_Confessions_of_St._Augustine_by_

In [9]:
# Guardar matriz ahora sí en nuestra carpeta de procesados
output_sim = "../data/processed/similarity_matrix_rdd"
if os.path.exists(output_sim):
    shutil.rmtree(output_sim)

rdd_similarities.saveAsTextFile(output_sim)
print(f"- Matriz de similitudes guardada en: {output_sim}")

- Matriz de similitudes guardada en: ../data/processed/similarity_matrix_rdd


In [10]:
sc.stop()