# 0. Configuración de librerias y requisitos previos

In [166]:
"""
Se importan las librerías necesarias para el procesamiento de texto y manejo de archivos.

- nltk.tokenize.regexp_tokenize: para tokenización basada en expresiones regulares.
- nltk.stem.SnowballStemmer: para aplicar stemming en inglés.
- xml.etree.ElementTree: para parseo de archivos en formato XML.
- collections: para estructuras de datos con valores por defecto y operaciones de conteo.
- os: para operaciones relacionadas con el sistema de archivos.
- typing: para anotaciones de tipo.
- math: para operaciones matemáticas.
- numpy: manejo del tipado de las métricas.
"""

from nltk.tokenize import regexp_tokenize
from nltk.stem import SnowballStemmer

import xml.etree.ElementTree as ET

from collections import Counter, defaultdict

from typing import List, Dict, Union, Set, Tuple

import numpy as np

import math

import os

In [167]:
"""
Carga de stopwords desde un archivo local.

- Evita la descarga de NLTK y permite usar una lista personalizada de palabras vacías.
- Se construye el conjunto `stop_words` para filtrar palabras vacías durante el preprocesamiento.
"""

# NO cambie esta ruta
path_stopwords = "../data/stopwords/english"

def load_stopwords(
        file_path: str
    ) -> Set[str]:
    
    """
    Carga stopwords desde un archivo de texto plano.

    Cada línea del archivo debe contener una palabra. La función convierte
    todas las palabras a minúsculas y elimina líneas vacías.

    Parámetros
    ----------
    file_path : str
        Ruta del archivo de stopwords.

    Retorna
    -------
    Set[str]
        Conjunto de palabras que deben considerarse stopwords.
    """
    
    with open(file_path, "r", encoding = "utf-8") as file:
        stop_words = {line.strip().lower() for line in file if line.strip()}
    return stop_words

stop_words = load_stopwords(path_stopwords)

In [168]:
"""
Configuración del stemmer y definición del patrón de tokenización.

- Se inicializa `SnowballStemmer("english")` para reducir las palabras a su raíz 
  durante el preprocesamiento.
- Se define la expresión regular `pattern` que controla cómo se segmenta el texto
  en tokens. Incluye reglas para:
    * Abreviaturas (e.g., U.S.A.)
    * Palabras con guiones internos (e.g., non-euclidean)
    * Monedas, números y porcentajes
    * Puntos suspensivos (...)
    * Signos de puntuación como tokens separados
"""

stemmer = SnowballStemmer("english")

pattern = r'''(?x)              
    (?:[A-Z]\.)+[A-Z]?          # abreviaturas, e.g. U.S.A.
  | [A-Za-z]+(?:-[A-Za-z]+)*    # palabras con guiones internos, e.g. non-euclidean
  | \$?\d+(?:\.\d+)?%?          # monedas, números, porcentajes
  | \.\.\.                      # puntos suspensivos
  | [][.,;"'?():_`-]            # puntuación como tokens separados
'''

# 1. Preprocesamiento del texto

In [169]:
def preprocess(
		text: str
	) -> List[str]:
    
	"""
    Preprocesa un texto aplicando tokenización, normalización, eliminación de stopwords y stemming.

    El proceso de preprocesamiento transforma un texto crudo en una lista de tokens
    limpios y estandarizados, listos para tareas de recuperación de información o NLP.

    Pasos:
        1. Tokenización: se divide el texto en palabras usando la expresión regular `pattern`.
        2. Normalización: se convierten todos los tokens a minúsculas.
        3. Eliminación de stopwords: se filtran palabras vacías presentes en `stop_words`.
        4. Stemming: se aplica `SnowballStemmer` para reducir los tokens a su raíz.

    Parámetros
    ----------
    text : str
        Texto de entrada a procesar.

    Retorna
    -------
    List[str]
        Lista de tokens preprocesados.
    """
      
	# Generación de tokens
	tokens = regexp_tokenize(text, pattern)
	# Normalización 
	tokens = [token.lower() for token in tokens]
	# Eliminación de palabras de parada
	tokens = [token for token in tokens if token not in stop_words]
	# Stemming 
	tokens = [stemmer.stem(token) for token in tokens]
	return tokens

In [170]:
"""
Ejemplo de ejecución del preprocesamiento de texto.

Se procesa un texto de ejemplo que incluye números, signos de puntuación y abreviaturas. 
"""

example_text = "Se deben invertir $12.000 millones en D.I.A ..."

print("--------** preprocess **--------\n")
print("Parametros:\n")
print("- Texto de entrada = ", example_text)
print("\nResultado:\n")
print("- Tokens procesados = ", preprocess(example_text))


--------** preprocess **--------

Parametros:

- Texto de entrada =  Se deben invertir $12.000 millones en D.I.A ...

Resultado:

- Tokens procesados =  ['se', 'deben', 'invertir', '$12.000', 'millon', 'en', 'd.i.a', '...']


# 2. Carga de documentos

In [171]:
"""
Ruta de los documentos a procesar.

- `path_docs` indica la carpeta donde se encuentran los archivos de texto crudos.
- Cambie esta ruta según la ubicación de sus documentos en su sistema.
"""

# CAMBIE esta ruta según la ubicación de los archivos en su sistema
path_docs = "../data/docs-raw-texts"

In [172]:
def load_documents(
		path_docs: str
	) -> Dict[str, List[str]]:

	"""
    Carga y preprocesa los documentos de una carpeta específica.

    La función recorre todos los archivos con extensión `.naf` en la ruta
    especificada, extrae el ID del documento, el título y el contenido crudo,
    concatena título y contenido, y aplica la función `preprocess` para 
    obtener los tokens preprocesados de cada documento.

    Parámetros
    ----------
    path_docs : str
        Ruta de la carpeta que contiene los archivos `.naf` a procesar.
        Cambie esta ruta según la ubicación de sus documentos.

    Retorna
    -------
    Dict[str, List[str]]
        Diccionario donde las claves son los `doc_id` de cada documento y
        los valores son listas de tokens preprocesados.
    """
	
	docs = {}
	for file_name in os.listdir(path_docs):
		# Solo se revisan los archivos con extensión correcta
		if not file_name.endswith(".naf"):
			continue
		tree = ET.parse(os.path.join(path_docs, file_name))
		root = tree.getroot()
		# Id recuperado desde el atributo pupblicId
		doc_id = root.find("nafHeader/public").attrib["publicId"]
		# Titulo desde el atributo title
		title = root.find("nafHeader/fileDesc").attrib.get("title", "")
		# Contenido desde <raw>
		raw_element = root.find("raw")
		content = raw_element.text if raw_element is not None else ""
		# concatenar titulo + contenido para preprocesar
		tokens = preprocess(title +  " " + content)
		docs[doc_id] = tokens
	return docs

In [173]:
"""
Ejemplo de carga y preprocesamiento de documentos.

Se utiliza la función `load_documents` para cargar todos los archivos `.naf`
de la carpeta especificada en `path_docs`. Cada documento se preprocesa y se almacena
en un diccionario donde la clave es el `doc_id` y el valor es la lista de tokens.

Luego se muestra el contenido preprocesado de un documento específico
con ID "d006".
"""

documents = load_documents(path_docs)
example_id = "d006"

print("--------** load documents **--------\n")
print("Parametros:\n")
print("- Ruta de los documentos = ", path_docs)
print("\nResultado:\n")
print("- Documentos cargados = ", len(documents))
print("- Ejemplo documento " + example_id + " = ", documents[example_id])

--------** load documents **--------

Parametros:

- Ruta de los documentos =  ../data/docs-raw-texts

Resultado:

- Documentos cargados =  331
- Ejemplo documento d006 =  ['eugenio', 'beltrami', 'non-euclidian', 'geometri', 'eugenio', 'beltrami', 'non-euclidian', 'geometri', '.', 'eugenio', 'beltrami', '(', '1835', '-', '1900', ')', '.', 'novemb', '16', ',', '1835', ',', 'italian', 'mathematician', 'eugenio', 'beltrami', 'born', '.', 'notabl', 'work', 'concern', 'differenti', 'geometri', 'mathemat', 'physic', '.', 'work', 'note', 'especi', 'clariti', 'exposit', '.', 'first', 'prove', 'consist', 'non-euclidean', 'geometri', 'model', 'surfac', 'constant', 'curvatur', ',', 'pseudospher', '.', 'eugenio', 'beltrami', 'born', 'cremona', 'lombardi', ',', 'part', 'austrian', 'empir', ',', 'part', 'itali', '.', 'son', 'artist', 'paint', 'miniatur', ',', 'young', 'eugenio', 'certain', 'inherit', 'artist', 'talent', 'famili', ',', 'case', 'addit', 'mathemat', 'talent', 'would', 'acquir', ',', 'm

# 3. Construcción del índice invertido

In [174]:
def build_inverted_index(
		docs: Dict[str, List[str]]
	) -> Dict[str, Dict[str, Union[int, List[str]]]]:
    
	"""
    Construye un índice invertido a partir de un conjunto de documentos preprocesados.

    El índice invertido permite recuperar rápidamente qué documentos contienen
    cada token. Para cada token, se almacena:
        - 'df': la frecuencia de documentos (número de documentos que contienen el token).
        - 'postings': la lista ordenada de IDs de documentos donde aparece el token.

    Parámetros
    ----------
    docs : Dict[str, List[str]]
        Diccionario de documentos preprocesados. Las claves son `doc_id` y los
        valores son listas de tokens obtenidos con la función `preprocess`.

    Retorna
    -------
    Dict[str, Dict[str, Union[int, List[str]]]]
        Diccionario donde cada clave es un token y su valor es un diccionario
        con la frecuencia de documentos ('df') y la lista de postings ('postings').
    """
     
	inverted = defaultdict(set)
	for doc_id, tokens in docs.items():
		for token in tokens:
			inverted[token].add(doc_id)
	# Convertir sets a listas ordenadas
	for token in inverted:
		inverted[token] = {"df" : len(inverted[token]), "postings" : sorted(inverted[token])}
	return inverted

In [175]:
"""
Ejemplo de construcción del índice invertido.

Se utiliza la función `build_inverted_index` para generar un índice invertido
a partir del diccionario `documents` preprocesados. El índice permite consultar
rápidamente en qué documentos aparece cada token y su frecuencia de documentos (df).

Luego se muestra un ejemplo de token presente en el índice.
"""

inverted_index = build_inverted_index(documents)
example_token = "play"

print("--------** build inverted index **--------\n")
print("Parámetros:\n")
print("- Documentos = ", documents)
print("\nResultado:\n")
print("- Número de tokens en el índice = ", len(inverted_index))
print(f"- Ejemplo token " + example_token + " = ", inverted_index.get(example_token))

--------** build inverted index **--------

Parámetros:

- Documentos =  {'d210': ['joseph', 'weizenbaum', 'famous', 'eliza', 'joseph', 'weizenbaum', 'famous', 'eliza', '.', 'joseph', 'weizenbaum', '(', '1923', '-', '2008', ')', 'photo', ':', 'ulrich', 'hansen', '.', 'januari', '8', ',', '1923', ',', 'comput', 'scientist', 'joseph', 'weizenbaum', ',', 'pioneer', 'natur', 'languag', 'process', 'artifici', 'intellig', ',', 'later', 'becam', 'one', 'artifici', 'intellig', 'lead', 'critic', ',', 'born', '.', '1966', 'publish', 'simpl', 'program', 'name', 'eliza', ',', 'involv', 'user', 'convers', 'bore', 'strike', 'resembl', 'one', 'psychologist', '.', 'joseph', 'weizenbaum', 'born', 'berlin', 'jewish', 'parent', 'januari', '8', ',', '1923', '.', 'abl', 'escap', 'nazi', 'germani', 'januari', '1936', ',', 'emigr', 'famili', 'unit', 'state', ',', 'start', 'studi', 'mathemat', 'wayn', 'state', 'univers', 'detroit', '1941', '.', 'howev', ',', 'studi', 'interrupt', 'war', ',', 'serv', 'militari

# 4. Representación vectorial Tf-IDf

In [176]:
def build_tf_index(
		documents: Dict[str, List[str]]
	) -> Dict[str, Dict[str, int]]:
    
	"""
    Construye un índice de frecuencia de términos (TF) a partir de los documentos preprocesados.

    La función recorre todos los documentos proporcionados y cuenta la frecuencia de
    cada token dentro de cada documento. El resultado es un diccionario donde cada
    documento tiene un subdiccionario que representa la frecuencia de cada término.

    Parámetros
    ----------
    documents : Dict[str, List[str]]
        Diccionario donde las claves son los `doc_id` de los documentos y los valores
        son listas de tokens preprocesados de cada documento, tal como devuelve
        la función `load_documents`.

    Retorna
    -------
    Dict[str, Dict[str, int]]
        Diccionario donde las claves son los `doc_id` de los documentos y los valores
        son diccionarios que mapean cada token a su frecuencia dentro del documento.
        Por ejemplo: { "doc1": {"palabra1": 3, "palabra2": 5}, ... }.
    """
     
	tf_index = {}
	# Conteo de la frecuencia de los términos dentro de cada documento
	for doc_id, tokens in documents.items():
		tf_index[doc_id] = dict(Counter(tokens))
	return tf_index

In [177]:
"""
Ejemplo de construcción del índice de frecuencia de términos (TF).

Se utiliza la función `build_tf_index` para generar un índice que contiene
la frecuencia de cada token dentro de cada documento preprocesado en `documents`.

Luego se muestra el subdiccionario correspondiente a un documento específico
con ID "d006", mostrando cuántas veces aparece cada token en ese documento.
"""

tf_index = build_tf_index(documents)
example_id = "d006"

print("--------** build TF index **--------\n")
print("Parametros:\n")
print("- documentos = ", documents)
print("\nResultado:\n")
print("- Índice TF generado = ", tf_index)
print("- Ejemplo de frecuencias del documento " + example_id + " = ", tf_index[example_id])


--------** build TF index **--------

Parametros:

- documentos =  {'d210': ['joseph', 'weizenbaum', 'famous', 'eliza', 'joseph', 'weizenbaum', 'famous', 'eliza', '.', 'joseph', 'weizenbaum', '(', '1923', '-', '2008', ')', 'photo', ':', 'ulrich', 'hansen', '.', 'januari', '8', ',', '1923', ',', 'comput', 'scientist', 'joseph', 'weizenbaum', ',', 'pioneer', 'natur', 'languag', 'process', 'artifici', 'intellig', ',', 'later', 'becam', 'one', 'artifici', 'intellig', 'lead', 'critic', ',', 'born', '.', '1966', 'publish', 'simpl', 'program', 'name', 'eliza', ',', 'involv', 'user', 'convers', 'bore', 'strike', 'resembl', 'one', 'psychologist', '.', 'joseph', 'weizenbaum', 'born', 'berlin', 'jewish', 'parent', 'januari', '8', ',', '1923', '.', 'abl', 'escap', 'nazi', 'germani', 'januari', '1936', ',', 'emigr', 'famili', 'unit', 'state', ',', 'start', 'studi', 'mathemat', 'wayn', 'state', 'univers', 'detroit', '1941', '.', 'howev', ',', 'studi', 'interrupt', 'war', ',', 'serv', 'militari', 'me

In [178]:
def compute_tfidf_vector(
		doc_id: str, 
		tf_index: Dict[str, Dict[str, int]], 
		inverted_index: Dict[str, Dict[str, Union[int, List[str]]]], 
		num_documents: int
	) -> Dict[str, float]:
	
	"""
	Construye el vector TF-IDF de un documento específico.

	La función recibe el identificador de un documento junto con el índice de 
	frecuencias (TF), el índice invertido y el número total de documentos. 
	Para cada token del documento, se calcula el peso TF-IDF aplicando la 
	fórmula:

		TF-IDF = log10(frecuencia + 1) * log10(N / df)

	donde N es el número total de documentos y df es la frecuencia de 
	documentos en que aparece el token.

	Parámetros
	----------
	doc_id : str
		Identificador del documento para el cual se construirá el vector TF-IDF.
	tf_index : Dict[str, Dict[str, int]]
		Índice de frecuencias de términos por documento. Las claves son los 
		`doc_id` y los valores son diccionarios de tokens y sus frecuencias.
	inverted_index : Dict[str, Dict[str, Union[int, List[str]]]]
		Índice invertido que contiene, para cada token, su frecuencia de 
		documento (`df`) y la lista de documentos en los que aparece.
	num_documents : int
		Número total de documentos en la colección.

	Retorna
	-------
	Dict[str, float]
		Vector TF-IDF representado como un diccionario disperso, donde las claves 
		son los tokens del documento y los valores son los pesos TF-IDF 
		correspondientes.
	"""

	# Frecuencia de términos del documento
	doc_tf = tf_index[doc_id]
	# Se modela el vector como un diccionario debido a que es muy dispero
	tf_idf = {}
	for token, frequency in doc_tf.items():
		# Para evitar errores, se revisa que el token este en el índice invertido
		if token in inverted_index:
			log_tf = math.log10(frequency + 1)
			idf = math.log10(num_documents / inverted_index[token]["df"])
			tf_idf[token] = log_tf * idf
	return tf_idf

In [179]:
"""
Ejemplo de construcción del vector TF-IDF de un documento.

Se utiliza la función `compute_tfidf_vector` para calcular el vector TF-IDF
del documento con ID "d006". El cálculo se realiza a partir del índice de 
frecuencias (TF), el índice invertido y el número total de documentos.

El resultado es un diccionario disperso donde las claves son los tokens del
documento y los valores son sus respectivos pesos TF-IDF.
"""

example_id = "d006"
num_documents = len(documents)

tfidf_vector = compute_tfidf_vector(example_id, tf_index, inverted_index, num_documents)

print("--------** compute TF-IDF vector **--------\n")
print("Parámetros:\n")
print("- Documento = ", example_id)
print("- Número total de documentos = ", num_documents)
print("\nResultado:\n")
print("- Vector TF-IDF del documento " + example_id + " (primeros 10 términos):\n")

# Mostrar solo los primeros 10 términos para mayor legibilidad
for token, weight in list(tfidf_vector.items())[:10]:
    print(f"  {token}: {weight:.4f}")


--------** compute TF-IDF vector **--------

Parámetros:

- Documento =  d006
- Número total de documentos =  331

Resultado:

- Vector TF-IDF del documento d006 (primeros 10 términos):

  eugenio: 1.8751
  beltrami: 2.9337
  non-euclidian: 1.4278
  geometri: 1.4872
  .: 0.0000
  (: 0.0508
  1835: 0.7054
  -: 0.1468
  1900: 0.4135
  ): 0.0508


In [180]:
def build_query_vector(
        query_tokens: List[str], 
        inverted_index: Dict[str, Dict[str, Union[int, List[str]]]],
        num_documents: int
    ) -> Dict[str, float]:
    
	"""
    Construye el vector TF-IDF de una consulta.

    La función recibe los tokens de una consulta y calcula un vector TF-IDF
    en el mismo espacio vectorial que los documentos de la colección. 
    Primero se cuentan las frecuencias de los tokens de la consulta, y luego 
    se aplica la fórmula:

        TF-IDF = log10(frecuencia + 1) * log10(N / df)

    donde N es el número total de documentos y df es la cantidad de documentos
    en los que aparece el término. Solo se incluyen en el vector los tokens
    presentes en el índice invertido de la colección.

    Parámetros
    ----------
    query_tokens : List[str]
        Lista de tokens preprocesados que forman la consulta.
    inverted_index : Dict[str, Dict[str, Union[int, List[str]]]]
        Índice invertido de la colección. Para cada token, contiene su 
        frecuencia de documento (`df`) y la lista de documentos en que aparece.
    num_documents : int
        Número total de documentos en la colección.

    Retorna
    -------
    Dict[str, float]
        Vector TF-IDF de la consulta, representado como un diccionario disperso.
        Las claves son tokens de la consulta que están en la colección, y los
        valores son los pesos TF-IDF correspondientes.
    """
    
	tf_query = {}
	# Calculo de las frecuencias de los tokens
	for token in query_tokens:
		tf_query[token] = tf_query.get(token, 0) + 1
	# Se modela el vector como un diccionario debido a que es muy disperso
	tfidf_vector = {}
	for token, freq in tf_query.items():
		# Manejo de palabras fuera del vocabulario
		if token in inverted_index:
			log_tf = math.log10(freq + 1)
			idf = math.log10(num_documents / inverted_index[token]["df"])
			tfidf_vector[token] = log_tf * idf
	return tfidf_vector

In [181]:
def compute_tfidf_index(
		tf_index: Dict[str, Dict[str, int]], 
		inverted_index: Dict[str, Dict[str, Union[int, List[str]]]], 
		num_documents: int
	) -> Dict[str, Dict[str, float]]:
	
	"""
	Construye el índice TF-IDF completo de la colección de documentos.

	La función recorre todos los documentos presentes en el índice de frecuencias
	(TF) y, para cada uno, calcula su vector TF-IDF utilizando la función
	`compute_tfidf_vector`. El resultado es un diccionario donde cada documento
	está representado por un subdiccionario de tokens y sus respectivos pesos
	TF-IDF.

	Parámetros
	----------
	tf_index : Dict[str, Dict[str, int]]
		Índice de frecuencias de términos (TF). Las claves son los `doc_id` de los
		documentos y los valores son diccionarios de tokens y sus frecuencias.
	inverted_index : Dict[str, Dict[str, Union[int, List[str]]]]
		Índice invertido que contiene, para cada token, su frecuencia de documento
		(`df`) y la lista de documentos en los que aparece.
	num_documents : int
		Número total de documentos en la colección.

	Retorna
	-------
	Dict[str, Dict[str, float]]
		Índice TF-IDF de la colección. Las claves son los `doc_id` de cada documento
		y los valores son diccionarios que representan los vectores TF-IDF de cada
		documento. Por ejemplo:
		{
			"d006": {"token1": 0.34, "token2": 0.12, ...},
			"d007": {"token3": 0.45, "token1": 0.05, ...},
			...
		}
	"""

	tfidf_index = {}
	for doc_id in tf_index.keys():
		# Calculo del vector para cada documento de la colección de documentos
		tfidf_index[doc_id] = compute_tfidf_vector(doc_id, tf_index, inverted_index, num_documents)
	return tfidf_index

In [182]:
"""
Ejemplo de construcción del índice TF-IDF completo.

Se utiliza la función `compute_tfidf_index` para generar el índice TF-IDF
de toda la colección de documentos. Este índice contiene, para cada documento,
su vector TF-IDF representado como un diccionario de tokens y pesos.

Luego se muestra el vector TF-IDF correspondiente a un documento específico
con ID "d006".
"""

tfidf_index = compute_tfidf_index(tf_index, inverted_index, num_documents)

print("--------** compute TF-IDF index **--------\n")
print("Parámetros:\n")
print("- Número total de documentos =", num_documents)
print("- Índice TF =", tf_index)
print("- Índice invertido =", inverted_index)
print("\nResultado:\n")
print("- Cantidad de vectores TF-IDF generados =", len(tfidf_index))
print("- Ejemplo del vector TF-IDF del documento d006 (primeros 10 términos):\n")

# Mostrar solo los primeros 10 términos para mayor legibilidad
for token, weight in list(tfidf_index["d006"].items())[:10]:
    print(f"  {token}: {weight:.4f}")


--------** compute TF-IDF index **--------

Parámetros:

- Número total de documentos = 331
- Índice TF = {'d210': {'joseph': 13, 'weizenbaum': 19, 'famous': 2, 'eliza': 9, '.': 34, '(': 4, '1923': 3, '-': 2, '2008': 2, ')': 4, 'photo': 1, ':': 10, 'ulrich': 1, 'hansen': 1, 'januari': 3, '8': 2, ',': 48, 'comput': 16, 'scientist': 2, 'pioneer': 2, 'natur': 2, 'languag': 3, 'process': 3, 'artifici': 3, 'intellig': 3, 'later': 2, 'becam': 2, 'one': 2, 'lead': 1, 'critic': 2, 'born': 2, '1966': 3, 'publish': 2, 'simpl': 3, 'program': 7, 'name': 2, 'involv': 1, 'user': 2, 'convers': 2, 'bore': 1, 'strike': 1, 'resembl': 1, 'psychologist': 1, 'berlin': 3, 'jewish': 1, 'parent': 1, 'abl': 4, 'escap': 1, 'nazi': 1, 'germani': 1, '1936': 1, 'emigr': 1, 'famili': 1, 'unit': 1, 'state': 3, 'start': 1, 'studi': 3, 'mathemat': 2, 'wayn': 2, 'univers': 6, 'detroit': 1, '1941': 1, 'howev': 3, 'interrupt': 1, 'war': 1, 'serv': 1, 'militari': 1, 'meteorolog': 1, 'servic': 1, 'air': 1, 'forc': 1, '1946

In [183]:
def compute_cosine_similarity(
		vector_1: Dict[str, float], 
		vector_2: Dict[str, float]
	) -> float:
    
	"""
    Calcula la similitud coseno entre dos vectores dispersos representados como diccionarios.

    La similitud coseno mide el grado de semejanza entre dos documentos (o vectores
    de características) calculando el coseno del ángulo entre ellos. Su valor
    oscila entre 0 y 1, donde 1 indica que los vectores son idénticos en dirección,
    y valores cercanos a 0 indican poca o ninguna similitud.

    La fórmula aplicada es:

        cos(θ) = (A · B) / (||A|| * ||B||)

    donde A · B es el producto punto de los vectores, y ||A|| y ||B|| son las
    magnitudes (normas euclidianas) de cada vector.

    Parámetros
    ----------
    vector_1 : Dict[str, float]
        Primer vector, representado como un diccionario donde las claves son tokens
        y los valores son los pesos (por ejemplo, TF-IDF).
    vector_2 : Dict[str, float]
        Segundo vector en el mismo formato que `vector_1`.

    Retorna
    -------
    float
        Valor de similitud coseno entre `vector_1` y `vector_2`. Un valor entre
        0.0 y 1.0.
    """
	
	# Calculo del producto punto
	dot_product = sum(weight * vector_2.get(token, 0.0) for token, weight in vector_1.items())
	# Calculo de las normas de los vectores
	norm_1 = math.sqrt(sum(weight**2 for weight in vector_1.values()))
	norm_2 = math.sqrt(sum(weight**2 for weight in vector_2.values()))
	# Para evitar la división por cero
	if norm_1 == 0 or norm_2 == 0:
		return 0.0
	return dot_product / (norm_1 * norm_2)

In [184]:
"""
Ejemplo de cálculo de similitud coseno entre dos vectores.

Se definen manualmente dos vectores dispersos `v1` y `v2`, que podrían 
representar los pesos TF-IDF de dos documentos distintos. Cada clave es un 
token y cada valor corresponde a su peso en el documento.

La función `compute_cosine_similarity` se utiliza para calcular el valor de 
similitud coseno entre ambos vectores. El resultado es un número entre 0 y 1, 
donde valores cercanos a 1 indican que los vectores son más similares en 
dirección.

En este caso, se comparan los vectores:
- v1 = {"data": 0.3, "mining": 0.7, "ai": 0.5}
- v2 = {"data": 0.4, "ai": 0.6, "ml": 0.9}
"""

# Asuma que todas las palabras pertenecen al lenguaje
v1 = {"data": 0.3, "mining": 0.7, "ai": 0.5}
v2 = {"data": 0.4, "ai": 0.6, "ml": 0.9}

similarity = compute_cosine_similarity(v1, v2)

print("--------** compute cosine similarity **--------\n")
print("Parámetros:\n")
print("- Vector 1 =", v1)
print("- Vector 2 =", v2)
print("\nResultado:\n")
print(f"- Similitud coseno = {similarity:.4f}")


--------** compute cosine similarity **--------

Parámetros:

- Vector 1 = {'data': 0.3, 'mining': 0.7, 'ai': 0.5}
- Vector 2 = {'data': 0.4, 'ai': 0.6, 'ml': 0.9}

Resultado:

- Similitud coseno = 0.3997


In [185]:
def process_queries_ranked(
		path_queries: str,
		inverted_index: Dict[str, Dict[str, Union[int, List[str]]]],
		tfidf_index: Dict[str, Dict[str, float]],
		num_documents: int,
		output_file: str
	) -> None:
    
	with open(output_file, "w", encoding = "utf-8") as out_file:
		# Recorrido ordenado por id de query
		for fname in sorted(os.listdir(path_queries)):
			# Solo se revisan archivos válidos
			if fname.endswith(".naf"):
				file_path = os.path.join(path_queries, fname)
				tree = ET.parse(file_path)
				root = tree.getroot()
				# Extracción del id referido dentro del documento
				public_elem = root.find(".//public")
				query_id = public_elem.attrib.get("publicId")
				# Texto de consulta
				raw_elem = root.find("raw")
				content = raw_elem.text if raw_elem is not None else ""
				query_tokens = preprocess(content)
				# Construcción del vector asociado a la consulta
				query_vec = build_query_vector(query_tokens, inverted_index, num_documents)
				# Calculo de puntajes de similitud
				scores = {}
				for doc_id, doc_vec in tfidf_index.items():
					sim = compute_cosine_similarity(query_vec, doc_vec)
					if sim > 0:
						scores[doc_id] = sim
				# Ordenar documentos por score
				ranked = sorted(scores.items(), key=lambda x: x[1], reverse = True)
				# Escribir resultados en el formato requerido
				out_file.write(f"{query_id}\n")
				if ranked:
					line = ",".join([f"{doc}:{score:.4f}" for doc, score in ranked])
					out_file.write(line + "\n")

In [186]:
"""
Ejemplo de procesamiento y ranking de consultas.

Se utiliza la función `process_queries_ranked` para:
1. Leer todas las consultas en formato `.naf` desde `path_queries`. Puede cambiar la ruta.
2. Construir el vector TF-IDF de cada consulta.
3. Calcular la similitud coseno entre cada consulta y los documentos de la colección,
   usando el índice invertido `inverted_index` y el índice TF-IDF `tfidf_index`.
4. Guardar el ranking de documentos relevantes en `output_file`.

Luego se muestra el contenido del archivo de salida para verificar los resultados.
"""

# CAMBIE esta ruta según la ubicación de los archivos en su sistema
path_queries = "../data/queries-raw-texts"
output_file = "../results/RRDV-consultas_resultados.tsv"

process_queries_ranked(path_queries, inverted_index, tfidf_index, num_documents, output_file)

print("--------** process queries ranked **--------\n")
print("Parámetros:\n")
print("- Carpeta de consultas =", path_queries)
print("- Archivo de resultados =", output_file)
print("\nResultado (primeras líneas):\n")

# Mostrar las primeras 5 líneas del archivo de salida
with open(output_file, "r", encoding="utf-8") as f:
    for i, line in enumerate(f):
        print(line.strip())
        if i >= 4:
            break


--------** process queries ranked **--------

Parámetros:

- Carpeta de consultas = ../data/queries-raw-texts
- Archivo de resultados = ../results/RRDV-consultas_resultados.tsv

Resultado (primeras líneas):

q01
d259:0.1080,d085:0.0858,d186:0.0747,d254:0.0723,d016:0.0699,d170:0.0645,d154:0.0538,d153:0.0503,d008:0.0473,d209:0.0449,d296:0.0448,d163:0.0439,d215:0.0433,d089:0.0376,d060:0.0331,d185:0.0313,d100:0.0312,d094:0.0308,d006:0.0289,d315:0.0284,d004:0.0284,d243:0.0278,d179:0.0251,d099:0.0243,d145:0.0240,d039:0.0236,d162:0.0229,d312:0.0209,d059:0.0204,d065:0.0202,d299:0.0190,d077:0.0188,d329:0.0186,d273:0.0186,d082:0.0181,d028:0.0178,d311:0.0173,d317:0.0170,d212:0.0169,d195:0.0165,d284:0.0163,d130:0.0162,d265:0.0161,d152:0.0160,d255:0.0159,d281:0.0154,d136:0.0154,d164:0.0153,d074:0.0152,d032:0.0151,d184:0.0148,d172:0.0146,d275:0.0145,d123:0.0144,d052:0.0139,d316:0.0138,d038:0.0138,d021:0.0135,d024:0.0134,d234:0.0128,d229:0.0126,d116:0.0126
q02
d147:0.1277,d149:0.1103,d283:0.0811,d291

# 5. Evaluación de los resultados

In [187]:
def load_relevance_judgments(
    	path: str
	) -> Dict[str, Dict[str, int]]:
    
	"""
    Carga los juicios de relevancia desde un archivo.

    La función lee un archivo de texto donde cada línea contiene un identificador
    de consulta (`query_id`) y un conjunto de pares `doc_id:score` separados por
    comas, que indican la relevancia de cada documento respecto a la consulta.
    Los valores de relevancia suelen expresarse como enteros (por ejemplo, 0 = no
    relevante, 1 = relevante).

    El archivo debe tener el siguiente formato:

        query1    doc1:1,doc2:0,doc3:1
        query2    doc2:1,doc4:1
        ...

    Parámetros
    ----------
    path : str
        Ruta del archivo de juicios de relevancia codificados en formato TSV.
        Cada línea debe contener: `query_id <tab> doc_id:score,doc_id:score,...`

    Retorna
    -------
    Dict[str, Dict[str, int]]
        Diccionario donde las claves son los `query_id` y los valores son
        diccionarios con los `doc_id` como claves y las puntuaciones de relevancia
        como valores. Por ejemplo:

        {
            "q001": {"d006": 1, "d010": 0},
            "q002": {"d007": 1, "d011": 1}
        }
    """
    
	judgments = {}
	with open(path, encoding = "utf-8") as f:
		for line in f:
			query_id, docs_str = line.strip().split("\t")
			docs = {}
			for pair in docs_str.split(","):
				doc_id, score = pair.split(":")
				docs[doc_id] = int(score)
			judgments[query_id] = docs
	return judgments

In [188]:
"""
Ejemplo de carga de juicios de relevancia.

Se utiliza la función `load_relevance_judgments` para leer un archivo de 
juicios de relevancia en formato TSV. Cada línea del archivo contiene un 
identificador de consulta (`query_id`) y una lista de pares `doc_id:score`, 
que indican la relevancia de los documentos respecto a la consulta.

El resultado es un diccionario donde las claves son los IDs de las consultas
y los valores son diccionarios con los documentos y su puntuación de relevancia.
"""

# CAMBIE esta ruta según la ubicación de los archivos en su sistema
path_judgments = "../data/relevance-judgments.tsv"

judgments = load_relevance_judgments(path_judgments)

print("--------** load relevance judgments **--------\n")
print("Parámetros:\n")
print("- Archivo de juicios de relevancia =", path_judgments)
print("\nResultado (primeras consultas):\n")

# Mostrar solo las primeras 3 consultas
for i, (query_id, docs) in enumerate(judgments.items()):
    print(f"- Consulta {query_id}: {docs}")
    if i >= 2:
        break


--------** load relevance judgments **--------

Parámetros:

- Archivo de juicios de relevancia = ../data/relevance-judgments.tsv

Resultado (primeras consultas):

- Consulta q01: {'d186': 4, 'd254': 5, 'd016': 5}
- Consulta q02: {'d136': 2, 'd139': 2, 'd143': 4, 'd283': 4, 'd228': 4, 'd164': 4, 'd318': 2, 'd291': 4, 'd293': 4, 'd147': 2, 'd149': 2}
- Consulta q03: {'d152': 3, 'd291': 4, 'd283': 4, 'd147': 3, 'd318': 2, 'd105': 2}


In [189]:
def load_results(
        filepath: str
    ) -> dict[str, dict[str, float]]:
    
	"""
    Carga los resultados de recuperación desde un archivo.

    La función lee un archivo donde cada consulta (`query_id`) está listada en
    una línea separada, seguida por una línea que contiene los documentos
    recuperados con sus puntuaciones de similitud o relevancia.  

    El formato esperado del archivo es el siguiente:

        q001
        d259:0.1080,d085:0.0858,d134:0.0501
        q002
        d010:0.2675,d006:0.2110

    Es decir:
    - Una línea con el identificador de consulta (`qXXX`).
    - Una línea con una lista de pares `doc_id:score` separados por comas.

    Parámetros
    ----------
    filepath : str
        Ruta al archivo de resultados. Cada consulta debe estar precedida por
        su `query_id` y seguida de los documentos con sus puntuaciones.

    Retorna
    -------
    dict[str, dict[str, float]]
        Diccionario donde las claves son los `query_id` y los valores son
        diccionarios con los documentos recuperados (`doc_id`) y sus puntuaciones.
        Por ejemplo:

        {
            "q001": {"d259": 0.1080, "d085": 0.0858, "d134": 0.0501},
            "q002": {"d010": 0.2675, "d006": 0.2110}
        }
    """
    
	results = {}
	current_query = None

	with open(filepath, "r", encoding="utf-8") as f:
		for line in f:
			line = line.strip()
			if not line:
				continue
			# Si la línea empieza con qXX entonces es una nueva query
			if line.startswith("q"):
				current_query = line
				results[current_query] = {}
			else:
				pairs = line.split(",")
				for pair in pairs:
					doc_id, score = pair.split(":")
					results[current_query][doc_id] = float(score)
	return results


In [190]:
"""
Ejemplo de carga de resultados de recuperación.

Se utiliza la función `load_results` para leer un archivo con los resultados
de recuperación de consultas. El archivo contiene, para cada consulta, una 
línea con su identificador (`qXXX`) seguida de una línea con los documentos 
recuperados y sus puntuaciones de similitud.

El resultado es un diccionario donde las claves son los IDs de las consultas 
y los valores son diccionarios con los documentos y sus puntuaciones.
"""

# CAMBIE esta ruta según la ubicación de los archivos en su sistema
path_results = "../results/RRDV-consultas_resultados.tsv"

results = load_results(path_results)

print("--------** load results **--------\n")
print("Parámetros:\n")
print("- Archivo de resultados =", path_results)
print("\nResultado (primeras consultas):\n")

# Mostrar solo las 2 primeras consultas 
for i, (query_id, docs) in enumerate(results.items()):
    print(f"- Consulta {query_id}:")
    # Solo cinco documentos por consulta
    for doc_id, score in list(docs.items())[:5]:  
        print(f"   {doc_id}: {score:.4f}")
    if i >= 1:
        break


--------** load results **--------

Parámetros:

- Archivo de resultados = ../results/RRDV-consultas_resultados.tsv

Resultado (primeras consultas):

- Consulta q01:
   d259: 0.1080
   d085: 0.0858
   d186: 0.0747
   d254: 0.0723
   d016: 0.0699
- Consulta q02:
   d147: 0.1277
   d149: 0.1103
   d283: 0.0811
   d291: 0.0692
   d293: 0.0691


In [191]:
def get_relevance_lists(
        query_id: str, 
        results: List[str], 
        judgments: Dict[str, Dict[str, int]]
    ) -> Tuple[List[int], List[int], int]:
    
	"""
	Construye listas de relevancia binaria y de grados de relevancia 
	para una consulta específica.

	Parámetros
	----------
	query_id : str
		Identificador de la consulta.
	results : List[str]
		Lista ordenada de identificadores de documentos recuperados para la consulta.
	judgments : Dict[str, Dict[str, int]]
		Juicios de relevancia en formato:
		{
			query_id: {doc_id: grado_relevancia, ...},
			...
		}

	Retorna
	-------
	Tuple[List[int], List[int], int]
		- relevance_binary : lista binaria (1 si el documento es relevante, 0 en caso contrario).
		- relevance_grades : lista de grados de relevancia (entero ≥ 0).
		- num_relevant : número total de documentos relevantes para la consulta.

	Notas
	-----
	La lista `results` define el orden en que se construyen las listas de relevancia.
	Si un documento no está en los juicios de relevancia, se considera no relevante (0).
	"""

	relevance_binary = []
	relevance_grades = []
	relevant_docs = judgments.get(query_id, {})
	# Se obtiene la lista para cada documento evaluado por el sistema
	for doc_id in results:
		if doc_id in relevant_docs:
			relevance_binary.append(1)
			relevance_grades.append(relevant_docs[doc_id])
		else:
			relevance_binary.append(0)
			relevance_grades.append(0)
	return relevance_binary, relevance_grades, len(relevant_docs)


In [192]:
"""
Ejemplo de construcción de listas de relevancia.

Se utiliza la función `get_relevance_lists` para comparar los resultados 
obtenidos de la consulta `q01` contra los juicios de relevancia cargados.

El proceso retorna tres valores:
1. `relevance_binary`: lista con 1 si el documento recuperado es relevante, 0 en caso contrario.
2. `relevance_grades`: lista con los grados de relevancia (entero ≥ 0).
3. `num_relevant`: número total de documentos relevantes según el juicio.

Esto permite usar la información en la evaluación de medidas como 
Precisión, Recall, MAP o nDCG.
"""

print("--------** get relevance lists **--------\n")
print("Parámetros:\n")
print("- Query = q01\n")

binary, grades, num_relevant = get_relevance_lists("q01", results["q01"].keys(), judgments)

print("Resultado:\n")
print("- Relevancia binaria:", binary)
print("- Grados de relevancia:", grades)
print("- Total relevantes según juicios:", num_relevant)


--------** get relevance lists **--------

Parámetros:

- Query = q01

Resultado:

- Relevancia binaria: [0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
- Grados de relevancia: [0, 0, 4, 5, 5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
- Total relevantes según juicios: 3


In [193]:
def precision_at_k(
		relevance: Union[List[int], np.ndarray], 
		k: int
	) -> float:

	"""
    Calcula la métrica de Precision@K en recuperación de información.

    Precision@K mide la proporción de documentos relevantes entre los 
    primeros K documentos recuperados. Se utiliza para evaluar la calidad 
    de los resultados en el top-K.

    Fórmula:
        Precision@K = (# documentos relevantes en top-K) / K

    Parámetros
    ----------
    relevance : List[int]
        Lista binaria donde cada posición representa un documento recuperado:
        - 1 indica que el documento es relevante.
        - 0 indica que el documento no es relevante.
    k : int
        Número de documentos del top-K a considerar. 
        Debe ser mayor que 0.

    Retorna
    -------
    float
        Valor de Precision@K en el rango [0.0, 1.0].
        Retorna 0.0 si la lista está vacía.

    Lanza
    -----
    ValueError
        Si el valor de `k` es menor o igual a 0.
    """

	# K debe ser positivo y mayor a cero para poder calcular la métrica
	if k <= 0:
		raise ValueError("k should be greater than 0")
	# Si no se recuperó ningún documento, la métrica es 0.0
	if len(relevance) == 0:
		return 0.0
	relevance = np.array(relevance, dtype = int)
	# Se limita K al número real de documentos recuperados
	k = min(k, len(relevance))
	return float(np.mean(relevance[:k]))

In [194]:
def recall_at_k(
		relevance: Union[List[int], np.ndarray], 
		number_relevant_documents: int, 
		k: int
	) -> float:

	"""
    Calcula la métrica de Recall@K en recuperación de información.

    Recall@K mide la fracción de documentos relevantes recuperados en el top-K,
    en comparación con el número total de documentos relevantes que existen en 
    la colección.

    Fórmula:
        Recall@K = (# documentos relevantes en top-K) / (total documentos relevantes en la colección)

    Parámetros
    ----------
    relevance : List[int]
        Lista binaria donde cada posición representa un documento recuperado:
        - 1 indica que el documento es relevante.
        - 0 indica que el documento no es relevante.
    number_relevant_documents : int
        Número total de documentos relevantes en la colección.
    k : int
        Número de documentos del top-K a considerar.
        Debe ser mayor que 0.

    Retorna
    -------
    float
        Valor de Recall@K en el rango [0.0, 1.0].precision(relevance_query_1)
        Retorna 0.0 si no existen documentos relevantes o si la lista está vacía.

    Lanza
    -----
    ValueError
        Si el valor de `k` es menor o igual a 0.
    """

	# K debe ser positivo y mayor a cero para poder calcular la métrica
	if k <= 0:
		raise ValueError("k should be greater than 0")
	# Si no existen documentos relevantes, la métrica es 0.0
	if number_relevant_documents <= 0:
		return 0.0
	# Si no se recuperó ningún documento, la métrica es 0.0
	if len(relevance) == 0:
		return 0.0
	relevance = np.array(relevance, dtype = int)
	# Se limita K al número real de documentos recuperados
	k = min(k, len(relevance))
	return float(np.sum(relevance[:k]) / number_relevant_documents)

In [195]:
def average_precision(
		relevance: Union[List[int], np.ndarray]
	) -> float:

	"""
    Calcula la métrica de Average Precision (AP) en recuperación de información.

    Average Precision mide la calidad de la recuperación considerando 
    no solo cuántos documentos relevantes aparecen, sino también en qué 
    posiciones se encuentran. 

    La métrica se obtiene calculando la precisión cada vez que se 
    recupera un documento relevante y luego promediando estos valores.

    Fórmula:
        AP = (1 / R) * Σ [ Precision@i * rel(i) ]
    donde:
        - R es el número total de documentos relevantes en la colección.
        - Precision@i es la precisión en la posición i.
        - rel(i) = 1 si el documento en la posición i es relevante, 0 en caso contrario.
    
	Parámetros
    ----------
    relevance : List[int]
        Lista binaria donde cada posición representa un documento recuperado:
        - 1 indica que el documento es relevante.
        - 0 indica que el documento no es relevante.
        Se asume que la lista incluye todos los documentos relevantes de la colección.

    Retorna
    -------
    float
        Valor de Average Precision en el rango [0.0, 1.0].
        Retorna 0.0 si no hay documentos relevantes en la colección.
    """

	relevance = np.array(relevance, dtype = int)
	total_relevant = np.sum(relevance)
	# Si no existen documentos relevantes, la métrica es 0.0
	if total_relevant == 0:
		return 0.0
	# Si no se recuperó ningún documento, la métrica es 0.0
	if len(relevance) == 0:
		return 0.0
	precisions = []
	# Recorrido del vector empezando las posiciones en 1
	for i, rel in enumerate(relevance, start = 1):  
		if rel == 1:
			# Calculo de Precision@i
			precisions.append(precision_at_k(relevance, i))  
	return float(np.sum(precisions) / total_relevant)

In [196]:
def mean_average_precision(
		queries_relevances: List[Union[List[int], np.ndarray]]
	) -> float:
    
	"""
    Calcula la métrica de Mean Average Precision (MAP) en recuperación de información.

    MAP es una extensión de Average Precision (AP) para múltiples consultas.
    Se obtiene calculando el Average Precision de cada consulta y luego
    promediando estos valores.

    Fórmula:	# Si no se recuperaron documentos, el recall@k es 0.0

        MAP = (1 / Q) * Σ [ AP(q) ]
    donde:
        - Q es el número de consultas.
        - AP(q) es el Average Precision de la consulta q.

    Parámetros
    ----------
    queries_relevances : List[List[int]]
        Lista de consultas, donde cada consulta es representada por un vector binario:
        - 1 indica que el documento es relevante.
        - 0 indica que el documento no es relevante.
        Se asume que cada vector incluye todos los documentos relevantes de la colección para esa consulta.

    Retorna
    -------
    float
        Valor de Mean Average Precision en el rango [0.0, 1.0].
        Retorna 0.0 si no se proporciona ninguna consulta.
    """

	# Si no existe ninguna consulta, la métrica es 0.0
	if len(queries_relevances) == 0:
		return 0.0
	# Valores de precision para todas las consultas
	average_precision_values = [average_precision(r) for r in queries_relevances]
	return float(np.mean(average_precision_values))

In [197]:
def dcg_at_k(
		relevance: Union[List[int], np.ndarray], 
		k: int
	) -> float:

	"""
    Calcula la métrica Discounted Cumulative Gain (DCG) hasta la posición K.

    DCG mide la ganancia acumulada de los documentos relevantes, ponderada
    por su posición en la lista de resultados. Documentos relevantes en
    posiciones más altas contribuyen más que aquellos en posiciones más bajas.

    Fórmula:
        DCG@K = Σ ( rel_i / log2(max(i,2)) ), para i = 1..K

    Parámetros
    ----------
    relevance : List[int]
        Lista de relevancias de los documentos ordenados por ranking.
        La relevancia es un número natural (ejemplo: 0, 1, 2, 3, ...).
    k : int
        Posición de corte K hasta la cual se calcula el DCG.

    Retorna
    -------
    float
        Valor de DCG@K (número real >= 0).
        Retorna 0.0 si no hay documentos o si K <= 0.
    """

	# K debe ser positivo y mayor a cero para poder calcular la métrica
	if k <= 0:
		raise ValueError("K should be greater than 0")
	# Si no se recuperó ningún documento, la métrica es 0.0
	if len(relevance) == 0:
		return 0.0
	# Se limita K al número real de documentos recuperados
	k = min(k, len(relevance))
	relevance = np.array(relevance[:k], dtype = float)
	dcg = 0.0
	for i in range(k):
		dcg = dcg + relevance[i] / np.log2(max(i + 1, 2))
	return float(dcg)

In [198]:
def ndcg_at_k(
		relevance: Union[List[int], np.ndarray], 
		k: int
	) -> float:

	"""
    Calcula la métrica Normalized Discounted Cumulative Gain (NDCG) hasta la posición K.

    NDCG es la versión normalizada de DCG, donde se divide el valor de DCG
    entre el DCG ideal (IDCG). De esta manera, NDCG siempre toma valores
    entre 0 y 1, independientemente de la distribución de relevancias.

    Fórmula:
        NDCG@K = DCG@K / IDCG@K

    Parámetros
    ----------
    relevance : List[int]
        Lista de relevancias de los documentos ordenados por ranking.
        La relevancia es un número natural (ejemplo: 0, 1, 2, 3, ...).
    k : int
        Posición de corte K hasta la cual se calcula el NDCG.

    Retorna
    -------
    float
        Valor de NDCG@K (número real entre 0 y 1).
        Retorna 0.0 si no hay documentos, si IDCG = 0 o si K <= 0.
    """
	
	# K debe ser positivo y mayor a cero para poder calcular la métrica
	if k <= 0:
		raise ValueError("K should be greater than 0")
	# Si no se recuperó ningún documento, la métrica es 0.0
	if len(relevance) == 0:
		return 0.0
	# Se calcula el dcg@k
	dcg = dcg_at_k(relevance, k)
	# Se ordena la query para lograr el mayor DCG
	ideal_relevance = sorted(relevance, reverse = True)
	idcg = dcg_at_k(ideal_relevance, k)
	if idcg == 0:
		return 0.0
	return float(dcg / idcg)

In [199]:
def evaluate_query(
        query_id: str, 
        results: list[str], 
        judgments: dict[str, dict[str, int]]
    ) -> dict[str, float]:
    
	"""
    Evalúa el desempeño de una consulta específica usando diferentes métricas
    de recuperación de información.

    Parámetros
    ----------
    query_id : str
        Identificador de la consulta (ej. "q01").
    results : list[str]
        Lista ordenada de identificadores de documentos recuperados por el sistema.
    judgments : dict[str, dict[str, int]]
        Diccionario con los juicios de relevancia. La clave es el ID de la consulta 
        y el valor es un diccionario donde cada clave es un ID de documento y su 
        valor es un entero indicando el grado de relevancia.

    Retorna
    -------
    dict[str, float]
        Diccionario con las métricas calculadas para la consulta:
        - `"P@M"` : Precisión en M (donde M es el número de documentos relevantes).
        - `"R@M"` : Recall en M.
        - `"NDCG@M"` : Normalized Discounted Cumulative Gain en M.
        - `"AP"` : Average Precision (precisión promedio).
    
    Notas
    -----
    Esta función utiliza `get_relevance_lists` para obtener las listas de relevancia 
    y luego aplica funciones auxiliares (`precision_at_k`, `recall_at_k`, `ndcg_at_k`, 
    `average_precision`) para calcular las métricas.
    """
    
	relevance_bin, relevance_grades, num_rel = get_relevance_lists(query_id, results, judgments)

	# Según enunciado se evalúa hasta el número de documentos relevantes
	M = num_rel  
	# Métricas calculadas
	p_at_m = precision_at_k(relevance_bin, M)
	r_at_m = recall_at_k(relevance_bin, num_rel, M)
	ndcg_at_m = ndcg_at_k(relevance_grades, M)
	ap = average_precision(relevance_bin) 
	return {
		"P@M": p_at_m,
		"R@M": r_at_m,
		"NDCG@M": ndcg_at_m,
		"AP": ap
	}

In [200]:
def evaluate_system(
        results: dict[str, list[str]], 
        judgments: dict[str, dict[str, int]]
    ) -> tuple[dict[str, dict[str, float]], float]:
    
	"""
    Evalúa el desempeño de todo el sistema de recuperación sobre un conjunto de consultas.

    Parámetros
    ----------
    results : dict[str, list[str]]
        Diccionario donde la clave es el identificador de la consulta (ej. "q01")
        y el valor es una lista ordenada de identificadores de documentos recuperados.
    judgments : dict[str, dict[str, int]]
        Diccionario de juicios de relevancia. 
        Cada clave es el ID de una consulta y su valor es un diccionario que asigna
        a cada documento un grado de relevancia (entero).

    Retorna
    -------
    tuple[dict[str, dict[str, float]], float]
        Una tupla compuesta por:
        - `per_query` : diccionario con las métricas por consulta, donde cada clave 
          es el ID de la consulta y cada valor es otro diccionario con:
            - `"P@M"` : Precisión en M
            - `"R@M"` : Recall en M
            - `"NDCG@M"` : NDCG en M
            - `"AP"` : Average Precision
        - `MAP` : valor agregado del Mean Average Precision sobre todas las consultas.

    Notas
    -----
    Para cada consulta se invoca `evaluate_query`, y los valores de Average Precision
    se promedian para obtener el MAP (Mean Average Precision).
    """
     
	per_query = {}
	all_ap = []
	# Métricas individuales
	for qid, docs in results.items():
		metrics = evaluate_query(qid, docs, judgments)
		per_query[qid] = metrics
		all_ap.append(metrics["AP"])
	# Métrica general
	MAP = float(np.mean(all_ap)) if all_ap else 0.0
	return per_query, MAP

In [201]:
"""
Ejemplo de evaluación completa del sistema de recuperación.

Se utiliza la función `evaluate_system` para:
1. Comparar los resultados obtenidos (`results`) contra los juicios de relevancia (`judgments`).
2. Calcular métricas por consulta: P@M, R@M, NDCG@M y AP.
3. Obtener el valor global de MAP (Mean Average Precision).

El resultado es una tupla `(per_query, MAP)` donde:
- `per_query` contiene las métricas por consulta.
- `MAP` es un único valor agregado.
"""

evaluation = evaluate_system(results, judgments)

print("--------** evaluate system **--------\n")
print("Métricas por consulta:\n")
for qid, metrics in evaluation[0].items():
    print(f"{qid}: {metrics}")

print("\nMAP global:")
print(evaluation[1])


--------** evaluate system **--------

Métricas por consulta:

q01: {'P@M': 0.3333333333333333, 'R@M': 0.3333333333333333, 'NDCG@M': 0.20151514190050246, 'AP': 0.4777777777777777}
q02: {'P@M': 0.5454545454545454, 'R@M': 0.5454545454545454, 'NDCG@M': 0.5804945971717436, 'AP': 0.6893900356025772}
q03: {'P@M': 1.0, 'R@M': 1.0, 'NDCG@M': 0.9946788263132377, 'AP': 1.0}
q04: {'P@M': 0.7142857142857143, 'R@M': 0.7142857142857143, 'NDCG@M': 0.7519920639782149, 'AP': 0.8444444444444443}
q06: {'P@M': 0.6666666666666666, 'R@M': 0.6666666666666666, 'NDCG@M': 0.8053569325627968, 'AP': 0.8551587301587301}
q07: {'P@M': 0.25, 'R@M': 0.25, 'NDCG@M': 0.41311732856427996, 'AP': 0.21565407772304324}
q08: {'P@M': 0.75, 'R@M': 0.75, 'NDCG@M': 0.8878603511782898, 'AP': 0.8490740740740742}
q09: {'P@M': 0.8333333333333334, 'R@M': 0.8333333333333334, 'NDCG@M': 0.8878789582207093, 'AP': 0.9761904761904762}
q10: {'P@M': 0.375, 'R@M': 0.375, 'NDCG@M': 0.4189348825219358, 'AP': 0.4001583749256621}
q12: {'P@M': 1.0,