# Ejemplo LSA sobre descriptores TF-IDF

**Curso**: CC5213 - Recuperación de Información Multimedia  
**Profesor**: Juan Manuel Barrios  
**Fecha**: 21 de mayo de 2025


Primero se usará el mismo ejemplo  de la semana anterior de crear descriptores tf-idf y calcular la similitud coseno entre todos.

Luego se reducirá la dimensión de los vectores con LSA usando dos métodos:
 1. Factorizar las matrices con un método iterativo aproximado (más rápido)
 2. Factorizar directamente las matrices (más lento)

Para cada uno se compara el resultado obtenido por LSA versus el obtenido por los descriptores originales.

## Calcular similitudes con tf-idf (igual a semana anterior)


In [None]:
import time
import numpy
import os

class File():
    def __init__(self):
        self.filename = ""
        self.titulo = ""
        self.autor = ""
        self.fecha = ""
        self.tipo = ""
        self.url = ""
        self.full_text = ""

    def load_full_text(self, dir_dataset):
        file = os.path.join(dir_dataset, self.filename)
        with open(file, mode="r", encoding="utf8") as f:
            self.full_text = f.read()
        # quitar la primera linea (es una url)
        self.full_text = self.full_text[self.full_text.find("\n") + 1 :]
  

def load_dataset(dir_dataset):
    files = list()
    header = dict()
    file_dataset = os.path.join(dir_dataset, 'dataset.txt')
    print("leyendo {}...".format(file_dataset))
    with open(file_dataset, mode="r", encoding="utf8") as f:
        for line in f:
            fields = line.strip().split("\t")
            # leer los nombres de la primera fila 
            if len(header) == 0:
                header = dict((c, i) for i, c in enumerate(fields))
                continue
            file = File()
            file.filename = fields[header["Filename"]]
            file.titulo = fields[header["Titulo"]]
            file.autor = fields[header["Por"]]
            file.fecha = fields[header["Fecha"]]
            file.tipo = fields[header["Tipo_documento"]]
            file.url = fields[header["URL"]]
            files.append(file)
    dir_dataset = os.path.join(dir_dataset, 'dataset')
    print("leyendo textos de {} archivos en {} ...".format(len(files), dir_dataset))
    for file in files:
        file.load_full_text(dir_dataset)
    return files

t1 = time.time()
dataset_files = load_dataset('dataset_ciperchile.21-05-2025')
t2 = time.time()

print("leídos {} documentos en {:.1f} segundos".format(len(dataset_files), t2-t1))

## Opcional, restringir los documentos a usar

In [None]:
# se podrían usar menos documentos, para que no sea tan lento
cantidad_documentos = 5174  #len(dataset_files)

dataset_files = dataset_files[0:cantidad_documentos]

print("Usando {} documentos".format(len(dataset_files)))

## Calcular vocabulario (igual a ejemplo semana anterior)

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

#un array con los textos para crear el vocabulario y descriptores
textos = list()
largo_total = 0
for doc in dataset_files:
    largo_total += len(doc.full_text)
    textos.append(doc.full_text)

print("Se van a usar {} documentos, con largo total {:.1f} MB".format(len(textos), largo_total/1024/1024))

#calcular el vocabulario: total de palabras a representar
t0 = time.time()
vectorizer = TfidfVectorizer(lowercase=True,          # por defecto es True 
                             strip_accents='unicode', # por defecto es None. unicode=eliminar acentos
                             sublinear_tf=True,       # por defecto es False. TRUE=usar 1+log(freq) 
                             use_idf=True,            # por defecto es True
                             norm='l2',               # por defecto es l2 (asi el coseno es solo la multiplicacion de valores)
                             ngram_range=(1,1),       # por defecto es (1,1). rango de ngramas a usar por ej: (1,1) o (1,2) o (1,3)
                             max_df=1.0, # Si una palabra aparece en más que max_df documentos, se ignora (probar con 0.9)
                                         #  Si es float -> porcentaje del total de documentos
                                         #  Si es int   -> cantidad de documentos
                                         #  Notar que es distinto 1.0 (el 100% de los documentos) vs 1 (solo 1 documento)
                             min_df=0.0  # Si una palabra aparece en menos que min_df documentos, se ignora (probar con 0.1)
                             )           #  También puede ser float o int
vectorizer.fit(textos)
t1 = time.time()

print("tiempo para crear vocabulario: {:.1f} segs".format(t1-t0))
print("vocabulario de {} palabras".format(len(vectorizer.vocabulary_)))
print("primeras 10 palabras: ", list(vectorizer.vocabulary_.keys())[0:10])
print("primeras 10 palabras ordenadas: ", list(sorted(vectorizer.vocabulary_.keys())[0:10]))

## Calcular descriptores (igual a ejemplo semana anterior)

In [None]:
#transform() entrega el descriptor tf-idf (un vector sparse con los pesos de las palabras)
t0 = time.time()
descriptores = vectorizer.transform(textos)
t1 = time.time()

print("tiempo descriptores: {:.1f} segs".format(t1-t0))
print("descriptores es matriz de {}".format(descriptores.shape))

#verificar que los descriptores es una matriz sparse
print("descriptores tipo {}".format(type(descriptores)))

## Calcular similares (igual a ejemplo semana anterior)

In [None]:
#usar la funcion toarray() para convertir los descriptores a una matriz densa
#multiplicar las matrices con matmul() para calcular la similitud coseno

t0 = time.time()
descriptores_denso = descriptores.toarray()
auto_similitud = numpy.matmul(descriptores_denso, descriptores_denso.T)
t1 = time.time()

print("Tiempo comparación todos contra todos: {:.1f} segs".format(t1-t0))

print(auto_similitud)

## Imprimir el resultado de similitud coseno (igual a ejemplo semana anterior)

In [None]:
import pandas

def imprimir_similares(matriz, min_score):
    #obtener el maximo por fila
    #primero llenar la diagonal con ceros para que el mas parecido no sea si mismo
    numpy.fill_diagonal(matriz, 0)
    
    filas = []
    for i in range(matriz.shape[0]):
        #obtener la posicion del maximo por fila
        posicion_mayor = numpy.argmax(matriz[i])
        #nombres de archivos
        nombre_documento = dataset_files[i].filename
        nombre_mas_similar = dataset_files[posicion_mayor].filename
        similitud = matriz[i, posicion_mayor]
        #imprimir documentos con score mayor al mínimo pedido
        if similitud >= min_score:
            fila = {'id':  i,
                    'query':  nombre_documento,
                    'documento': nombre_mas_similar,
                    'similitud coseno': similitud}
            filas.append(fila)
    
    print("documentos con similitud mayor a {}:".format(min_score))
    print()
    df = pandas.DataFrame(filas)
    print(df.to_string(index=False, justify='center'))

imprimir_similares(auto_similitud, 0)

# Latent Semantic Analysis (LSA)

LSA consiste en reducir la cantidad de términos del vocabulario, agrupando los términos que aparecen correlacionados. Conceptualmente es muy similar a Análisis de Componentes Principales (PCA) que veremos más adelante en el curso.

A continuación se define la cantidad de conceptos que se van a calcular para reducir de dimensionalidad los vectores.

# LSA opción 1: usar método aproximado de Scikit-Learn

El algoritmo de TruncatedSVD aproxima la descomposición con un método iterativo (demora como 1 minuto).   

 * `n_components` define la nueva dimensionalidad de los vectores
 * `n_iter` es la cantidad de iteraciones del método aproximado. Al aumentarlo mejora la aproximación al resultado real aunque toma más tiempo de calcular.

Ver documentacion en: https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.TruncatedSVD.html

In [None]:
from sklearn.decomposition import TruncatedSVD

# numero de dimensiones a los que se reducirán los descriptores
# con un numero pequeño funciona todo muy rápido aunque no muy buena calidad
num_conceptos = 50

t0 = time.time()
transformer_tsvd = TruncatedSVD(n_components=num_conceptos, n_iter=20, random_state=4)
transformer_tsvd.fit(descriptores)
t1 = time.time()

print("tiempo ajustar la transformacion: {:.1f} segs".format(t1-t0))

## Revisar el espacio reducido y valores singulares

In [None]:
#Se imprimen los valores singulares y su suma total con respecto al total
print("matriz proyeccion conceptos latentes={}".format(transformer_tsvd.components_.shape))
print("primer concepto latente={}".format(transformer_tsvd.components_[0]))
print("segundo concepto latente={}".format(transformer_tsvd.components_[1]))
print("tercer concepto latente={}".format(transformer_tsvd.components_[2]))
print("PORCENTAJE DE VARIANZA={:.1f}%".format(transformer_tsvd.explained_variance_ratio_.sum()*100))
print(transformer_tsvd.singular_values_)


## Mostrar los primeros conceptos latentes

In [None]:
def imprimir_conceptos_tsvd(conceptos_a_mostrar, palabras_por_concepto):
    vocabulario = numpy.array(vectorizer.get_feature_names_out())
    for i, pesos in enumerate(transformer_tsvd.components_):
        if i >= conceptos_a_mostrar:
            break
        indices_menor_a_mayor = numpy.argsort(pesos)
        indices_mayor_a_menor = indices_menor_a_mayor[::-1]
        pesos_mayor_a_menor = pesos[indices_mayor_a_menor]
        terminos_mayor_a_menor = vocabulario[indices_mayor_a_menor]
        lista=[]
        for j in range(palabras_por_concepto):
            lista.append("{}({:.2f})".format(terminos_mayor_a_menor[j],pesos_mayor_a_menor[j]))
        print("CONCEPTO LATENTE #{:2d}: {}".format(i, " ".join(lista)))

imprimir_conceptos_tsvd(10, 8)

## Reducir de dimensión los descriptores tf-idf
Se obtiene la matriz de descriptores de dimension dada en el parámetro `n_components`.

In [None]:
descriptores_tsvd = transformer_tsvd.transform(descriptores)
print("{} -> {} ".format(descriptores.shape, descriptores_tsvd.shape))

## Calcular la similitud en el espacio reducido

Como los descriptores tienen menos dimensiones, el tiempo de comparación es mucho menor al usado con los descriptores originales.

In [None]:
t0 = time.time()
similitudes_tsvd = numpy.matmul(descriptores_tsvd, descriptores_tsvd.T)
t1 = time.time()
print("tiempo comparacion todos contra todos: {:.1f} segs".format(t1-t0))

## Comparar con lo obtenido con los descriptores originales

In [None]:
import pandas

def evaluar_similitud(matriz_similitud, conceptos, matriz_ideal, imprimir=True):
    #completo la diagonal con cero para que el mas parecido no sea si mismo
    numpy.fill_diagonal(matriz_similitud, 0)
    numpy.fill_diagonal(matriz_ideal, 0)

    #obtener la posicion del maximo por fila
    posicion_max = numpy.argmax(matriz_similitud, axis=1)
    mayor = numpy.amax(matriz_similitud, axis=1)
    
    #posicion del mas similar por fila
    posicion_max_ideal = numpy.argmax(matriz_ideal, axis=1)

    #imprimir los documentos mas parecidos y su similitud
    filas = []
    total = matriz_ideal.shape[0]
    total_iguales = 0
    for i in range(total):
        nombre_documento = dataset_files[i].filename
        nombre_mas_similar = dataset_files[posicion_max[i]].filename
        nombre_ideal = dataset_files[posicion_max_ideal[i]].filename
        resultado = ""
        if posicion_max[i] == posicion_max_ideal[i]:
            resultado = "¡igual! :-) ok"
            total_iguales += 1
        else:
            resultado = "¡distinto! era {}".format(nombre_ideal)
        filas.append([nombre_documento, nombre_mas_similar, mayor[i], resultado])
    pct_coincidencias = total_iguales / total * 100
    if imprimir:
        print("mostrando resultados al usar {} conceptos".format(conceptos))
        print("iguales al original: {} / {} = {:.1f}%".format(total_iguales, total, pct_coincidencias))
        print()
        df = pandas.DataFrame(filas, columns=["query", "similar", "similitud", "resultado"])
        print(df.to_string(index=False,justify='center'))
    return pct_coincidencias


In [None]:
evaluar_similitud(similitudes_tsvd, descriptores_tsvd.shape[1], auto_similitud)

# LSA opción 2: Usar método exacto de factorización de matrices

Usaremos la implementación de Numpy para la SVD (Singular Value Decomposition).  
Ver documentacion en: https://numpy.org/doc/stable/reference/generated/numpy.linalg.svd.html

Es mucho más lento que el método anterior ya que es exacto. La ventaja es que es independiente del número de dimensiones a proyectar, por lo que se calcula una vez y luego se puede probar con distinta cantidad de conceptos latentes.

In [None]:
from numpy.linalg import svd

#demora unos 5 minutos para todo el dataset
t0 = time.time()
descriptores_denso = descriptores.toarray()
U, S, VT = numpy.linalg.svd(descriptores_denso, full_matrices=False, compute_uv=True)
t1 = time.time()
print("tiempo factorizacion USV: {:.1f} segs ({:.1f} mins)".format(t1-t0,(t1-t0)/60))

#notar que S se guarda como un array en vez de matriz
#con numpy.diag(S) obtenemos la matriz diagonal
print("A{} = U{} x S{} x V^T{}".format(descriptores_denso.shape, U.shape, S.shape, VT.shape))
print("array S: {} {} {} ... {} {}".format(S[0], S[1], S[3], S[-2], S[-1]))

## Seleccionar dimensiones

In [None]:
# numero de dimensiones a los que se reducirán los descriptores
# notar que ya se realizó el cálculo de la factorización, por lo que ahora es directo probar con varios numeros
cantidad_conceptos = 50

#probar con distinto numero de conceptos (es rapido, no requiere volver a factorizar los descriptores)
Uk = U[:, :cantidad_conceptos]
Sk = S[0:cantidad_conceptos]
VkT = VT[:cantidad_conceptos, :]

print("Uk{} x Sk{} x Vk^T{}".format(Uk.shape, Sk.shape, VkT.shape))

#se imprimen los valores singulares y su suma total con respecto al total
print("PORCENTAJE DE VARIANZA={:.1f}%".format(numpy.sum(Sk)/numpy.sum(S)*100))
print(Sk)


In [None]:
def imprimir_conceptos_svd(conceptos_a_mostrar, palabras_por_concepto):
    vocabulario = numpy.array(vectorizer.get_feature_names_out())
    for fila in range(conceptos_a_mostrar):
        pesos = numpy.abs(numpy.array(VkT[fila])) # pueden salir pesos negativos
        indices_menor_a_mayor = numpy.argsort(pesos)
        indices_mayor_a_menor = indices_menor_a_mayor[::-1]
        pesos_mayor_a_menor = pesos[indices_mayor_a_menor]
        terminos_mayor_a_menor = vocabulario[indices_mayor_a_menor]
        lista=[]
        for j in range(palabras_por_concepto):
            lista.append("{}({:.2f})".format(terminos_mayor_a_menor[j],pesos_mayor_a_menor[j]))
        print("CONCEPTO LATENTE #{:2d}: {}".format(fila, " ".join(lista)))

imprimir_conceptos_svd(10, 8)

## Reducir de dimensión los descriptores originales

In [None]:
#METODO 1: reducir dimensiones de los descriptores 
Ak = numpy.matmul(descriptores_denso, VkT.T)
print("Ak={}".format(Ak.shape))

#METODO 2: alternativamente podemos reducirlos multiplicando: U*S
Ak_alt = numpy.matmul(Uk, numpy.diag(Sk))
print("Ak_alt={}".format(Ak_alt.shape))

#comprobando la diferencia entre ambos metodos
diferencias = numpy.subtract(Ak, Ak_alt)
print("diferencias entre Ak - Ak_alt: max={} min={}".format(numpy.max(diferencias), numpy.min(diferencias)))


## Calcular la similitud de los descriptores reducidos y comparar resultados con descriptores originales

In [None]:
#se realiza la búsqueda igual que en el ejemplo anterior
t0 = time.time()
similitudes_svd= numpy.matmul(Ak, Ak.T)
t1 = time.time()
print("tiempo comparacion todos contra todos: {:.1f} segs".format(t1-t0))

evaluar_similitud(similitudes_svd, Ak.shape[1], auto_similitud)

# Ejercicios Propuestos

__1.__ Hacer un gráfico que muestre cómo auymenta la calidad de la respuesta al aumentar la cantidad de conceptos


In [None]:
# Hacer un ciclo con distintos número de conceptos. Para cada uno:
#  -Proyectar U, S y V a Uk, Sk y Vk (tomando las primeras columans)
#  -Reducir los descriptores originales (descriptores_denso) a Ak  (multiplicar matrices)
#  -Comparar los descriptores y encontrar la matriz de similityd
#  -Comparar con la matriz original llamando a evaluar_similitud()

__2.__ ¿Cómo buscar documentos similares cuando el documento de consulta no se usó para calcular LSA?

In [None]:
#Calcular el documento más similar para los siguientes documentos de consulta.
textos_q = [ "elecciones de alcalde", "estudiantes de universidad"]

#1.calcular la matriz de descriptores para textos_q (usar el vocabulario ya calculado)
#2.proyectar la matriz de descriptores al espacio reducido
#3.buscar el documento más similar en el espacio reducido
#4.comprobar si se obtiene el mismo resultado en el espacio original
