<div style="border-radius: 5px; padding: 1rem; margin-bottom: 1rem">
<img src="https://www.prototypesforhumanity.com/wp-content/uploads/2022/11/LOGO_UTEC_.png" alt="Banner" width="150" />   
 </div>

# Laboratorio 5.2: Similitud de Coseno e Indice Invertido

> **Prof. Heider Sanchez**  
> **ACLs:** Ana María Accilio, Sebastián Loza

## Introducción

Este laboratorio tiene como objetivo que los estudiantes implementen un sistema de análisis y búsqueda de documentos utilizando Procesamiento de Lenguaje Natural (NLP) y una base de datos PostgreSQL. Se trabajará a partir de los Bag of Words generados en el laboratorio 5.1, agregando dos funcionalidades principales: búsquedas rankeadas mediante similitud de coseno e índices invertidos para una recuperación eficiente de documentos.


### Objetivos
- Implementar la lectura de Bags of Words desde PostgreSQL utilizando Python.
- Desarrollar un sistema de búsqueda de documentos similares mediante similitud de coseno que:
  - Procese consultas en lenguaje natural
  - Filtre documentos relevantes según palabras clave
  - Ordene resultados por relevancia mediante similitud de coseno
- Diseñar e implementar un índice invertido para:
  - Optimizar las búsquedas booleanas
  - Mejorar el rendimiento de la similitud de coseno
  - Reducir el tiempo de recuperación de documentos

### Requisitos previos

- Haber cargado la tabla de noticias en PostgreSQL y generado el bag of word.
- Tener instalado las dependencias de NLTK en Python.
- Completar la función para conectarte a PostgreSQL y leer los datos:

In [None]:
import psycopg2
import pandas as pd
import nltk
import os
from dotenv import load_dotenv
from collections import Counter
import numpy as np
import math

nltk.download('punkt')
nltk.download('punkt_tab')
load_dotenv()

dbname = os.getenv("DBNAME")
dbuser = os.getenv("DBUSER")
dbpass = os.getenv("DBPASS")
dbhost = os.getenv("DBHOST")

def connect_db():
    conn = psycopg2.connect(
        dbname=dbname,
        user=dbuser,
        password=dbpass,
        host=dbhost
    )
    return conn

def fetch_data():
    conn = connect_db()
    query = "SELECT id, contenido, bag_of_words FROM noticias;"
    df = pd.read_sql(query, conn)
    df['bag_of_words'] = df['bag_of_words']
    conn.close()
    return df

def fetch_query(query):
    conn = connect_db()
    df = pd.read_sql(query, conn)
    conn.close()
    return df

## 1. (5 puntos) Bag of Words y Similitud de Coseno 

El proceso de búsqueda se realiza en dos etapas:

1. **Filtrado inicial**:
   - Recibe una consulta en texto natural
   - Procesa la consulta para extraer palabras clave
   - Utiliza operadores OR en SQL para recuperar documentos que contengan al menos una palabra clave

2. **Ordenamiento por relevancia**:
   - Convierte los Bag of Words de los documentos filtrados en vectores numéricos (usando solo la frecuencia)
   - Calcula la similitud de coseno entre la consulta y cada documento
   - Ordena los resultados por similitud descendente
   - Retorna los top-k documentos más relevantes

In [None]:
def load_stopwords():
    conn = connect_db()
    query = "SELECT word FROM stopwords;"
    df = pd.read_sql(query, conn)
    conn.close()
    return set(df["word"])

stopwords_set = load_stopwords()
stemmer = nltk.SnowballStemmer("spanish")

def preprocess(text: str) -> list:
    text = text.lower()
    tokens = nltk.word_tokenize(text) 
    filtered = [t for t in tokens if t.isalpha() and t not in stopwords_set]
    stemmed = [stemmer.stem(t) for t in filtered]
    return stemmed

def compute_bow(text: str) -> dict:
    tokens = preprocess(text)
    freq = Counter(tokens)
    return dict(freq)

def bow_to_vector(bow: dict, vocab: list) -> list:
    return [bow.get(word, 0) for word in vocab]

def cosine_similarity(vec1, vec2):
    a = np.array(vec1)
    b = np.array(vec2)
    if np.linalg.norm(a) == 0 or np.linalg.norm(b) == 0:
        return 0.0
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

######################################################################

def search(query, top_k=5):
    # --- Etapa 1: Filtrado inicial ---
    consulta_bow = compute_bow(query)
    if not consulta_bow:
        return pd.DataFrame(columns=["id", "contenido", "similarity"])
    
    # Palabras clave
    palabras_clave = list(consulta_bow.keys())
    condiciones_sql = " OR ".join([f"contenido ILIKE '%{pal}%'" for pal in palabras_clave])

    # Usando operadores OR para buscar documentos con alguna de estas palabras clave
    sql = f"""
        SELECT id, contenido FROM noticias 
        WHERE {condiciones_sql}
    """
    df = fetch_query(sql)

    if df.empty:
        return pd.DataFrame(columns=["id", "contenido", "similarity"])

    # --- Etapa 2: Ordenamiento por relevancia ---
    document_bows = [compute_bow(texto) for texto in df["contenido"]]

    vocab = set(consulta_bow.keys())
    for bow in document_bows:
        vocab.update(bow.keys())
    vocab = sorted(list(vocab))

    consulta_vector = bow_to_vector(consulta_bow, vocab)
    document_vectors = [bow_to_vector(bow, vocab) for bow in document_bows]

    # Similitud de coseno
    similitudes = [cosine_similarity(consulta_vector, doc_vector) for doc_vector in document_vectors]

    #Ordenando y resultado final
    df["similarity"] = similitudes
    df_ordenado = df.sort_values(by="similarity", ascending=False).head(top_k)

    return df_ordenado

####  Pruebas funcionales

In [None]:
test_queries = [
    "¿Cuáles son las últimas innovaciones en la banca digital y la tecnología financiera?",
    "evolución de la inflación y el crecimiento de la economía en los últimos años",
    "avances sobre sostenibilidad y energías renovables para el medio ambiente"
]

for query in test_queries:
    results = search(query, top_k=3)
    print(f"Probando consulta: '{query}'")
    for _, row in results.iterrows():
        print(f"\nID: {row['id']}")
        print(f"Similitud: {row['similarity']:.3f}")
        print(f"Texto: {row['contenido'][:200]}...")
    print("-" * 50)

## 2. (5 puntos) Construcción del Indice Invertido 

A partir de los  `bag of words` almacenados en la base de datos, se debe construir un índice invertido y conservarlo en un diccionario de Python para su eficiente recuperación.

In [None]:
class InvertedIndex:
    def __init__(self):
        self.index = {}
        self.idf = {}
        self.length = {}

    def build_from_db(self):
        # leer los datos desde la base de datos
        noticias_df = fetch_data()
        noticias_id = noticias_df["id"].to_list()
        bag_of_words_list = noticias_df["bag_of_words"].tolist()

        n = len(noticias_id) # total de documentos
        
        df = {} # cantidad de documentos donde aparece cada palabr
        # inicializar los diccionarios del indice invertido y de las normas (vacios)
        self.index = {}
        self.length = {}

        # recorrer cada documento con su bag of words
        for doc_id, bow in zip(noticias_id, bag_of_words_list):
            norm = 0 # acumulador para la norma del documento
            for word, tf in bow.items(): # recorrer cada palabra y su frecuencia en el documento
                tf = 1 + math.log10(tf)
                if word not in self.index: # si la palabra no estaen el indice, se agrega
                    self.index[word] = [] 
                self.index[word].append((doc_id, tf)) # agregar el documento y su frecuencia al indice invertido
                df[word] = df.get(word, 0) + 1 #contar que esta palabra aparece en este documento
                norm += tf ** 2 # acumular el cuadrado del tf para calcular la norma

            self.length[doc_id] = math.sqrt(norm) # guardar la norma del documento

        for word, doc_freq in df.items(): # calcular el idf para cada palabra
            self.idf[word] = math.log10(n / doc_freq) #idf
    
    def L(self, word):
        word = stemmer.stem(word)
        return self.index.get(word, [])
  
    def cosine_search(self, query, top_k=5):  
        score = {}
        
        tokens = preprocess(query)

        query_tf = Counter(tokens)

        for token, qtf in query_tf.items():
            if token not in self.index:
                continue
            documents = self.index[token]
            idf = self.idf[token]
            wtq = (1 + math.log10(qtf))*idf
            for doc, tf in documents:
                if doc not in score:
                    score[doc] = 0
                wtd = tf*idf
                score[doc] += wtd*wtq
        
        for key, value in score.items():
            score[key] = value/self.length[key]
        
        # Ordenar el score resultante de forma descendente
        result = sorted(score.items(), key= lambda tup: tup[1], reverse=True)
        # retornamos los k documentos mas relevantes (de mayor similitud a la query)
        return result[:top_k]
    
    def showDocument(self, doc_id):
        noticia = fetch_query(f"SELECT id, contenido FROM noticias WHERE id = {doc_id}")
        return noticia

    def showDocuments(self, doc_ids):
        if not doc_ids:
            return ""
        condition = " OR ".join([f"id = {id[0]}" for id in doc_ids])
        noticias = fetch_query(f"SELECT id, contenido FROM noticias WHERE {condition}")
        return noticias

## 3. (4 puntos) Consultas Booleanas usando el indice invertido

Implementar búsquedas booleanas utilizando el índice invertido construido anteriormente. La búsqueda debe:

- Soportar los operadores básicos:
    - AND: intersección de documentos
    - OR: unión de documentos
    - AND-NOT: diferencia de documentos
- Procesar consultas como:
    - "sostenibilidad AND ambiente AND renovable"
    - "tecnología AND (banca OR finanzas)"
    - "economía AND-NOT inflación"    

####  Pruebas funcionales

In [None]:
idx = InvertedIndex()
idx.build_from_db()

def AND(list1, list2):
    # Implementar la intersección de dos listas O(n +m)
    res = []
    p1 = 0
    p2 = 0
    while p1 < len(list1) and p2 < len(list2):
        e1 = list1[p1]
        e2 = list2[p2]
        if e1[0] == e2[0]:
            res.append(e1)
            p1 += 1
            p2 += 1
        elif e1[0] > e2[0]:
            p2 += 1
        else:
            p1 += 1
    return res
        

def OR(list1, list2):
    # Implementar la unión de dos listas O(n +m)
    res = []
    p1 = 0
    p2 = 0
    while p1 < len(list1) and p2 < len(list2):
        e1 = list1[p1]
        e2 = list2[p2]
        if e1[0] == e2[0]:
            res.append(e1)
            p1 += 1
            p2 += 1
        elif e1[0] > e2[0]:
            res.append(e2)
            p2 += 1
        else:
            res.append(e1)
            p1 += 1
    while p1 < len(list1):
        e1 = list1[p1]
        res.append(e1)
        p1 += 1
    while p2 < len(list2):
        e2 = list2[p2]
        res.append(e2)
        p2 += 2
    return res

def AND_NOT(list1, list2):
    # Implementar la diferencia de dos listas O(n +m)
    res = []
    p1 = 0
    p2 = 0
    while p1 < len(list1) and p2 < len(list2):
        e1 = list1[p1]
        e2 = list2[p2]
        if e1[0] == e2[0]:
            p1 += 1
            p2 += 1
        elif e1[0] > e2[0]:
            p2 += 1
        else:
            res.append(e1)
            p1 += 1
    while p1 < len(list1):
        e1 = list1[p1]
        res.append(e1)
        p1 += 1
    return res

# Prueba 1
result = AND(idx.L("sostenibilidad"), AND(idx.L("ambiente"), idx.L("renovables")))
print("sostenibilidad AND ambiente AND renovable:")
print(idx.showDocuments(result))

# Prueba 2
result = AND(idx.L("tecnología"), OR(idx.L("banca"), idx.L("finanzas")))
print("tecnología AND (banca OR finanzas):")
print(idx.showDocuments(result))

# Prueba 3
result = AND_NOT(idx.L("economía"), idx.L("inflación"))
print("economía AND-NOT inflación:")
print(idx.showDocuments(result))

## 4. (6 puntos) Similitud de coseno usando el indice invertido
Implementar búsqueda por similitud de coseno aprovechando el índice invertido:

- Proceso de búsqueda:
    - Recibe una consulta en lenguaje natural y un parámetro top_k
    - Utiliza el índice invertido para identificar documentos candidatos
    - Calcula similitud de coseno solo con los documentos relevantes utilizando los pesos TF-IDF
    - Retorna los top-k documentos más similares

<img src="imagenes/image-20250506-162949.png" width="500" align="" />

####  Pruebas funcionales

In [None]:
test_queries = [
    {
        "query": "¿Cuáles son las últimas innovaciones en la banca digital y la tecnología financiera?",
        "top_k": 10
    },
    {
        "query": "evolución de la inflación y el crecimiento de la economía en los últimos años",
        "top_k": 20
    },
    {
        "query": "avances sobre sostenibilidad y energías renovables para el medio ambiente",
        "top_k": 15
    }
]

for test in test_queries:    
    results = idx.cosine_search(test['query'], test['top_k'])
    print(f"Consulta: {test['query']}")
    print(f"Top {test['top_k']} documentos más similares:") 
    for doc_id, score in results:
        print(f"Doc {doc_id}, Score: {score:.3f}:")
        print(idx.showDocument(doc_id))

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=0cac3f27-ab57-45e1-94d7-eb1f84dca7ec' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>