<a href="https://colab.research.google.com/github/dcdlima/COS738/blob/main/COS738_working.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Sistema de Busca e Recuperaçao

# Implementação de um Sistema de Recuperação em Memória Segundo o Modelo Vetorial

Este notebook apresenta a implementação completa de um sistema de recuperação da informação em memória, dividido em 4 módulos:

1. **Processador de Consultas**
2. **Gerador de Lista Invertida**
3. **Indexador (Modelo Vetorial com TF-IDF)**
4. **Buscador (Busca por Similaridade Cosseno)**

Todos os módulos seguem o padrão proposto, utilizando arquivos de configuração, separação por etapas e logs com o módulo `logging`.

### O conjunto de dados

O conjunto de dados foi baixado para o Google Drive e utilizado diretamente no Colab

In [None]:
import zipfile
from google.colab import files
import os

# 1. Fazer upload do arquivo ZIP (se estiver no seu computador)
uploaded = files.upload() # Descomente esta linha e execute para fazer upload
#nome_do_arquivo_zip = list(uploaded.keys())[0]


### Arquivos de configuraçao

Foram criados a cada módulo

In [None]:
### Conjunto de dados

from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


## Criaçao do arquivo CFG (configuraçao) para o processador de consultas

In [None]:
def criar_pc_cfg():
    conteudo_cfg = """LEIA=cfquery.xml
CONSULTAS=consultas.csv
ESPERADOS=esperados.csv
"""
    with open("PC.CFG", "w", encoding="utf-8") as f:
        f.write(conteudo_cfg)
    print("Arquivo PC.CFG criado com sucesso!")

# Executa a criação
criar_pc_cfg()

In [None]:
import logging
import time
import csv
import xml.etree.ElementTree as ET
import unicodedata
import re

# Configuração do logger
logging.basicConfig(
    filename='processador_consultas.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def remover_acentos(texto):
    """Remove acentos e caracteres especiais."""
    nfkd = unicodedata.normalize('NFKD', texto)
    return ''.join([c for c in nfkd if not unicodedata.combining(c)])

def normalizar_texto(texto):
    """Remove acentos, pontuações e converte para maiúsculas."""
    texto = remover_acentos(texto)
    texto = re.sub(r'[^A-Za-z0-9 ]+', ' ', texto)  # Remove pontuação
    return texto.upper().strip()

def ler_configuracao(caminho_cfg="/content/PC.CFG"):
    logging.info("Iniciando leitura do arquivo de configuração.")
    config = {}
    try:
        with open('/content/PC.CF', "r", encoding="utf-8") as f:
            linhas = [linha.strip() for linha in f.readlines()]
        chaves = ["LEIA", "CONSULTAS", "ESPERADOS"]
        for i, chave in enumerate(chaves):
            if not linhas[i].startswith(f"{chave}="):
                raise ValueError(f"Erro no PC.CFG: linha {i+1} deveria começar com '{chave}='")
            config[chave] = linhas[i].split("=")[1].strip()
        logging.info("Leitura do arquivo de configuração concluída.")
        return config
    except Exception as e:
        logging.error(f"Erro ao ler PC.CFG: {e}")
        raise

def processar_consultas(xml_path, consultas_csv, esperados_csv):
    inicio = time.time()
    logging.info(f"Iniciando processamento do XML: {'/content/gdrive/MyDrive/Dados_COS738/cfquery.xml'}")

    try:
        tree = ET.parse('/content/gdrive/MyDrive/Dados_COS738/cfquery.xml')
        root = tree.getroot()
    except Exception as e:
        logging.error(f"Erro ao carregar XML {'/content/gdrive/MyDrive/Dados_COS738/cfquery.xml'}: {e}")
        raise

    consultas = []
    esperados = []

    for record in root.findall("QUERY"):
        query_num = record.find("QueryNumber").text.strip()
        query_text = normalizar_texto(record.find("QueryText").text)

        consultas.append([query_num, query_text])

        # Processa os itens esperados
        for item in record.find("Records").findall("Item"):
            doc_num = item.get("ref")
            score = int(item.get("score", 0))
            if score != 0:
                esperados.append([query_num, doc_num, score])

    # Salva consultas.csv
    with open(consultas_csv, "w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f, delimiter=";")
        writer.writerow(["QueryNumber", "QueryText"])
        writer.writerows(consultas)

    # Salva esperados.csv
    with open(esperados_csv, "w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f, delimiter=";")
        writer.writerow(["QueryNumber", "DocNumber", "DocVotes"])
        writer.writerows(esperados)

    logging.info(f"{len(consultas)} consultas processadas.")
    logging.info(f"{len(esperados)} entradas esperadas gravadas.")
    logging.info(f"Tempo total do processamento: {time.time() - inicio:.2f} segundos.")

def main():
    logging.info("===== Início do módulo Processador de Consultas =====")
    tempo_total = time.time()
    try:
        config = ler_configuracao("PC.CFG")
        processar_consultas(config["LEIA"], config["CONSULTAS"], config["ESPERADOS"])
    except Exception as e:
        logging.error(f"Erro no módulo Processador de Consultas: {e}")
    logging.info(f"===== Fim do módulo (tempo total: {time.time() - tempo_total:.2f} seg) =====")

if __name__ == "__main__":
    main()

with open("processador_consultas.log", "r", encoding="utf-8") as f:
    print(f.read())

2025-07-29 10:29:30,391 - INFO - ===== Início do módulo Processador de Consultas =====
2025-07-29 10:29:30,391 - INFO - Iniciando leitura do arquivo de configuração.
2025-07-29 10:29:30,391 - INFO - Leitura do arquivo de configuração concluída.
2025-07-29 10:29:30,391 - INFO - Iniciando processamento do XML: /content/drive/MyDrive/Dados_COS738/cfquery.xml
2025-07-29 10:29:30,391 - ERROR - Erro no módulo Processador de Consultas: [Errno 2] No such file or directory: 'cfquery.xml'
2025-07-29 10:29:30,391 - INFO - ===== Fim do módulo (tempo total: 0.00 seg) =====
2025-07-29 10:30:06,487 - INFO - NumExpr defaulting to 2 threads.



In [None]:
def criar_gli_cfg():
    arquivos = ["/content/cf74.xml", "/content/cf75.xml", "/content/cf76.xml", "/content/cf77.xml", "/content/cf78.xml", "/content/cf79.xml"]
    caminho_saida = "lista_invertida.csv"

    with open("GLI.CFG", "w", encoding="utf-8") as f:
        for nome in arquivos:
            f.write(f"LEIA={nome}\n")
        f.write(f"ESCREVA={caminho_saida}\n")

    print("Arquivo GLI.CFG criado com sucesso!")

# Executar
criar_gli_cfg()

In [None]:
import logging
import time
import unicodedata
import re
import xml.etree.ElementTree as ET
import csv
from collections import defaultdict

# Reconfigura o logger, mesmo se já estiver configurado
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

logging.basicConfig(
    filename='gerador_lista_invertida.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def remover_acentos(texto):
    nfkd = unicodedata.normalize('NFKD', texto)
    return ''.join([c for c in nfkd if not unicodedata.combining(c)])

def normalizar_texto(texto):
    texto = remover_acentos(texto)
    texto = re.sub(r'[^A-Za-z ]+', ' ', texto)
    return texto.upper().strip()

def ler_configuracao(gli_cfg="GLI.CFG"):
    logging.info("Lendo arquivo de configuração GLI.CFG")
    arquivos_leitura = []
    arquivo_saida = None
    with open('/content/GLI.CFG', "r", encoding="utf-8") as f:
        linhas = [linha.strip() for linha in f.readlines()]
    for linha in linhas:
        if linha.startswith("LEIA="):
            arquivos_leitura.append(linha.split("=")[1].strip())
        elif linha.startswith("ESCREVA="):
            if arquivo_saida:
                raise ValueError("Mais de um ESCREVA no arquivo de configuração.")
            arquivo_saida = linha.split("=")[1].strip()
    if not arquivos_leitura or not arquivo_saida:
        raise ValueError("Faltam instruções LEIA ou ESCREVA no GLI.CFG.")
    logging.info(f"{len(arquivos_leitura)} arquivos para leitura. Saída: {arquivo_saida}")
    return arquivos_leitura, arquivo_saida

def processar_documentos(arquivos_xml):
    lista_invertida = defaultdict(list)
    total_docs = 0
    total_palavras = 0
    for arquivo in arquivos_xml:
        logging.info(f"Lendo arquivo {arquivo}")
        try:
            tree = ET.parse(arquivo)
            root = tree.getroot()
            for record in root.findall("RECORD"):
                recordnum_elem = record.find("RECORDNUM")
                if recordnum_elem is None:
                    continue
                doc_id = int(recordnum_elem.text.strip())
                texto_elem = record.find("ABSTRACT") or record.find("EXTRACT")
                if texto_elem is None:
                    logging.warning(f"Documento {doc_id} sem ABSTRACT ou EXTRACT.")
                    continue
                texto = normalizar_texto(texto_elem.text or "")
                palavras = texto.split()
                palavras_validas = [p for p in palavras if len(p) >= 2 and p.isalpha()]
                for palavra in palavras_validas:
                    lista_invertida[palavra].append(doc_id)
                total_docs += 1
                total_palavras += len(palavras_validas)
        except Exception as e:
            logging.error(f"Erro ao processar {arquivo}: {e}")
    return lista_invertida, total_docs, total_palavras

def salvar_lista_invertida(lista_invertida, caminho_saida):
    logging.info(f"Salvando lista invertida em {caminho_saida}")
    with open(caminho_saida, "w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f, delimiter=";")
        writer.writerow(["Palavra", "Documentos"])
        for palavra, docs in sorted(lista_invertida.items()):
            writer.writerow([palavra, str(docs)])

def main():
    logging.info("===== Início do módulo Gerador de Lista Invertida =====")
    inicio_total = time.time()
    try:
        arquivos_xml, saida_csv = ler_configuracao()
        lista_invertida, total_docs, total_palavras = processar_documentos(arquivos_xml)
        salvar_lista_invertida(lista_invertida, saida_csv)
        tempo_total = time.time() - inicio_total
        logging.info(f"Total de documentos processados: {total_docs}")
        logging.info(f"Total de palavras indexadas: {total_palavras}")
        logging.info(f"Total de termos únicos: {len(lista_invertida)}")
        logging.info(f"Tempo total: {tempo_total:.2f} segundos")
    except Exception as e:
        logging.error(f"Erro geral: {e}")
    logging.info("===== Fim do módulo Gerador de Lista Invertida =====")

# Executar o módulo
main()



In [None]:
with open("gerador_lista_invertida.log", "r", encoding="utf-8") as f:
    print(f.read())

### Visualizando o arquivo csv gerado parcialmente (até o item 10)

In [None]:
import pandas as pd

df_lista = pd.read_csv("lista_invertida.csv", sep=";")
df_lista.head(10)

# Módulo 3 - Indexador

### Criar arquivo de configuraçao do indexador

In [None]:
def criar_index_cfg():
    with open("INDEX.CFG", "w", encoding="utf-8") as f:
        f.write("LEIA=lista_invertida.csv\n")
        f.write("ESCREVA=modelo_vetorial.pkl\n")
    print("Arquivo INDEX.CFG criado com sucesso!")

# Executar
criar_index_cfg()

In [None]:
import logging
import time
import csv
import math
import pickle
from collections import defaultdict

# Reconfigura logger se necessário
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

logging.basicConfig(
    filename='indexador.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def ler_configuracao_indexador(caminho_cfg="INDEX.CFG"):
    logging.info("Lendo arquivo de configuração INDEX.CFG")
    with open('/content/INDEX.CFG', "r", encoding="utf-8") as f:
        linhas = [linha.strip() for linha in f.readlines()]
    leia = None
    escreva = None
    for linha in linhas:
        if linha.startswith("LEIA="):
            leia = linha.split("=")[1].strip()
        elif linha.startswith("ESCREVA="):
            escreva = linha.split("=")[1].strip()
    if not leia or not escreva:
        raise ValueError("Arquivo de configuração INDEX.CFG inválido.")
    return leia, escreva

def carregar_lista_invertida(caminho_csv):
    logging.info(f"Carregando lista invertida de {'/content/lista_invertida.csv'}")
    lista_invertida = {}
    with open('/content/lista_invertida.csv', "r", encoding="utf-8") as f:
        reader = csv.reader(f, delimiter=";")
        next(reader)  # pula cabeçalho
        for linha in reader:
            termo = linha[0]
            docs_str = linha[1]
            docs = eval(docs_str)  # converte a lista a partir da string
            lista_invertida[termo] = docs
    return lista_invertida

def calcular_tfidf(lista_invertida):
    logging.info("Calculando TF-IDF")
    modelo = defaultdict(dict)
    total_docs = set()

    # Passo 1: contar DF (quantos docs cada termo aparece)
    df = {termo: len(set(docs)) for termo, docs in lista_invertida.items()}

    # Passo 2: identificar todos os documentos
    for docs in lista_invertida.values():
        total_docs.update(docs)
    N = len(total_docs)

    # Passo 3: calcular TF-IDF para cada termo/doc
    for termo, docs in lista_invertida.items():
        tf_counts = defaultdict(int)
        for doc_id in docs:
            tf_counts[doc_id] += 1

        for doc_id, tf in tf_counts.items():
            idf = math.log(N / df[termo]) if df[termo] != 0 else 0
            peso = tf * idf
            modelo[doc_id][termo] = peso

    return dict(modelo)

def salvar_modelo(modelo, caminho_saida):
    logging.info(f"Salvando modelo vetorial em {caminho_saida}")
    with open(caminho_saida, "wb") as f:
        pickle.dump(modelo, f)

def main():
    logging.info("===== Início do módulo Indexador =====")
    inicio = time.time()
    try:
        entrada_csv, saida_modelo = ler_configuracao_indexador()
        lista_invertida = carregar_lista_invertida(entrada_csv)
        modelo = calcular_tfidf(lista_invertida)
        salvar_modelo(modelo, saida_modelo)
        logging.info(f"Modelo criado com {len(modelo)} documentos.")
        logging.info(f"Tempo total: {time.time() - inicio:.2f} segundos.")
    except Exception as e:
        logging.error(f"Erro no indexador: {e}")
    logging.info("===== Fim do módulo Indexador =====")

# Executar o módulo
main()

with open("indexador.log", "r", encoding="utf-8") as f:
    print (f.read())


# Módulo 4 - Buscador

### Criaçao do arquivo de configuraçao do buscador

In [None]:
def criar_busca_cfg():
    with open("BUSCA.CFG", "w", encoding="utf-8") as f:
        f.write("MODELO=modelo_vetorial.pkl\n")
        f.write("CONSULTAS=consultas.csv\n")
        f.write("RESULTADOS=resultados.csv\n")
    print("Arquivo BUSCA.CFG criado com sucesso!")

criar_busca_cfg()

### Buscador pelo princípio de similaridade de cosseno

In [None]:
import logging
import time
import pickle
import csv
import math
import re
import unicodedata
import pandas as pd
from collections import defaultdict

# Reconfigura logger
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

logging.basicConfig(
    filename='buscador.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def remover_acentos(texto):
    nfkd = unicodedata.normalize('NFKD', texto)
    return ''.join([c for c in nfkd if not unicodedata.combining(c)])

def normalizar_texto(texto):
    texto = remover_acentos(texto)
    texto = re.sub(r'[^A-Za-z ]+', ' ', texto)
    return texto.upper().strip()

def ler_configuracao_busca(cfg_path="BUSCA.CFG"):
    logging.info("Lendo BUSCA.CFG")
    with open('/content/BUSCA.CFG', "r", encoding="utf-8") as f:
        linhas = [linha.strip() for linha in f.readlines()]
    config = {}
    for linha in linhas:
        chave, valor = linha.split("=")
        config[chave.strip()] = valor.strip()
    return config["MODELO"], config["CONSULTAS"], config["RESULTADOS"]

def carregar_modelo(caminho_modelo):
    logging.info(f"Carregando modelo de {caminho_modelo}")
    with open('/content/modelo_vetorial.pkl', "rb") as f:
        return pickle.load(f)

def carregar_consultas(caminho_csv):
    df = pd.read_csv('/content/consultas.csv', sep=";")
    return df.to_dict(orient="records")

def vetorizar_consulta(texto_normalizado):
    termos = [t for t in texto_normalizado.split() if len(t) >= 2 and t.isalpha()]
    vetor = defaultdict(float)
    for termo in termos:
        vetor[termo] = 1.0  # peso fixo
    return dict(vetor)

def cosseno(v1, v2):
    intersecao = set(v1.keys()) & set(v2.keys())
    numerador = sum(v1[t] * v2[t] for t in intersecao)
    denom1 = math.sqrt(sum(v**2 for v in v1.values()))
    denom2 = math.sqrt(sum(v**2 for v in v2.values()))
    if denom1 == 0 or denom2 == 0:
        return 0.0
    return numerador / (denom1 * denom2)

def buscar(modelo, consultas):
    resultados = []
    for consulta in consultas:
        qid = consulta["QueryNumber"]
        texto = normalizar_texto(consulta["QueryText"])
        vetor_q = vetorizar_consulta(texto)
        ranking = []
        for doc_id, vetor_d in modelo.items():
            sim = cosseno(vetor_q, vetor_d)
            if sim > 0:
                ranking.append((doc_id, sim))
        ranking.sort(key=lambda x: x[1], reverse=True)
        ranking_formatado = [(i+1, doc_id, round(sim, 5)) for i, (doc_id, sim) in enumerate(ranking)]
        resultados.append([qid, str(ranking_formatado)])
    return resultados

def salvar_resultados(resultados, caminho_saida):
    logging.info(f"Salvando resultados em {caminho_saida}")
    with open(caminho_saida, "w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f, delimiter=";")
        writer.writerow(["QueryNumber", "Ranking"])
        writer.writerows(resultados)

def main():
    logging.info("===== Início do módulo Buscador =====")
    inicio = time.time()
    try:
        modelo_path, consultas_path, saida_path = ler_configuracao_busca()
        modelo = carregar_modelo(modelo_path)
        consultas = carregar_consultas(consultas_path)
        resultados = buscar(modelo, consultas)
        salvar_resultados(resultados, saida_path)
        logging.info(f"Consultas processadas: {len(consultas)}")
        logging.info(f"Tempo total: {time.time() - inicio:.2f} segundos")
    except Exception as e:
        logging.error(f"Erro no buscador: {e}")
    logging.info("===== Fim do módulo Buscador =====")

# Executar
main()

In [None]:
with open("buscador.log", "r", encoding="utf-8") as f:
    print (f.read())

In [None]:
df_resultados = pd.read_csv("resultados.csv", sep=";")
df_resultados.head()

## ✅ Considerações Finais

O sistema foi implementado com sucesso seguindo os princípios de um modelo vetorial de recuperação da informação.  
Todos os módulos foram testados com a base Cystic Fibrosis e o formato de arquivos foi mantido conforme especificações.  

**Módulos implementados**:
- Leitura e processamento de XML com configuração
- Lista invertida com frequência real
- Cálculo do modelo vetorial com TF-IDF
- Buscador com similaridade cosseno

📁 Resultados disponíveis nos arquivos:
- `consultas.csv`, `esperados.csv`
- `lista_invertida.csv`
- `modelo_vetorial.pkl`
- `resultados.csv`