<a href="https://colab.research.google.com/github/CeHaga-UFRJ/bmt-ufrj-2024.P1/blob/main/BMT_Trab1_Carlos.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Busca e Mineração de Texto - 2024.1
Carlos Bravo - 124066176

## Bibliotecas e Dados

In [1]:
!git clone https://github.com/CeHaga-UFRJ/bmt-ufrj-2024.P1.git repo
!mv repo/data data
!mv repo/config config
!mkdir results

!pip3 install nltk
!pip3 install lxml
!pip3 install unidecode
!pip3 install numpy

fatal: destination path 'repo' already exists and is not an empty directory.
mv: cannot stat 'repo/data': No such file or directory
mv: cannot stat 'repo/config': No such file or directory
mkdir: cannot create directory ‘results’: File exists


In [2]:
import nltk
from nltk.tokenize import word_tokenize
from lxml import etree
from unidecode import unidecode
from math import log, sqrt
import numpy as np

# Fetch the NLTK data packages this notebook relies on: "punkt" (tokenizer
# data, presumably what word_tokenize needs) and "stopwords" (the English
# stopword list read in text_to_tokens).
nltk.download("punkt")
nltk.download('stopwords')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [3]:
def get_xml_root(file_name):
    """Parse the XML document at ``file_name`` and return its root element."""
    return etree.parse(file_name).getroot()

def text_normalize(text):
    """Return a canonical single-line form of ``text``.

    The text is upper-cased, passed through ``unidecode`` (accent removal),
    stripped of surrounding whitespace and of every semicolon, and all
    whitespace runs (newlines included) are collapsed to one space.
    """
    cleaned = unidecode(text.upper()).strip()
    cleaned = cleaned.replace(";", "").replace("\n", " ")
    # split() without arguments breaks on any whitespace run, so joining the
    # pieces back with one space collapses repeated spaces.
    return " ".join(cleaned.split())

# Stopword set and stemmer are created on first use and then reused: loading
# the stopword corpus again for every document and query is pure overhead.
_TOKEN_RESOURCES = {}


def _get_token_resources():
    """Return the cached ``(stopwords, stemmer)`` pair, building it on first use."""
    if not _TOKEN_RESOURCES:
        _TOKEN_RESOURCES["stopwords"] = frozenset(nltk.corpus.stopwords.words('english'))
        _TOKEN_RESOURCES["stemmer"] = nltk.stem.PorterStemmer()
    return _TOKEN_RESOURCES["stopwords"], _TOKEN_RESOURCES["stemmer"]


def text_to_tokens(text):
    """Tokenize ``text`` and return its filtered, stemmed, upper-cased terms.

    A token is dropped when it has a single character, contains any digit,
    or is an English stopword (compared in lower case). Surviving tokens are
    stemmed with the Porter stemmer and returned in upper case, in their
    original order.

    Args:
        text: The string to tokenize.

    Returns:
        A list of upper-case stemmed terms.
    """
    stopwords, stemmer = _get_token_resources()

    kept = [
        word for word in word_tokenize(text)
        if len(word) > 1
        and not any(char.isdigit() for char in word)
        and word.lower() not in stopwords
    ]

    return [stemmer.stem(word).upper() for word in kept]

## Processador

In [4]:
class Processer:
  """Query processor: turns the queries XML into two CSV files.

  The configuration file holds one ``COMMAND=path`` instruction per line:
  ``LEIA`` (XML file to read), ``CONSULTAS`` (queries CSV to write) and
  ``ESPERADOS`` (expected-results CSV to write).
  """

  def __init__(self, cfg_file):
    """Read ``cfg_file`` and load the XML it points to."""
    self.xml_root = None
    self.query_file = ""
    self.expected_file = ""
    self._read_cfg(cfg_file)

  def _read_cfg(self, cfg_file):
    """Parse the configuration file.

    Blank lines are ignored. Unknown commands are ignored as well.

    Raises:
      ValueError: if a non-blank line has no ``=`` separator.
    """
    with open(cfg_file, 'r') as config:
      for line in config:
        line = line.strip()
        if not line:
          continue
        if "=" not in line:
          raise ValueError(f"Invalid configuration line: {line!r}")
        # Split on the first "=" only, so a path may itself contain "=".
        command, file_name = (part.strip() for part in line.split("=", 1))
        if command == 'LEIA':
          self.xml_root = get_xml_root(file_name)
        elif command == 'CONSULTAS':
          self.query_file = file_name
        elif command == 'ESPERADOS':
          self.expected_file = file_name

  def process(self):
    """Write the queries file and then the expected-results file."""
    self._create_query_file()
    self._create_expected_file()

  def _create_query_file(self):
    """Write one ``QueryNumber;QueryText`` row per QUERY element."""
    print('Criando arquivo de consultas...')
    with open(self.query_file, 'w') as query_file:
      # Write headers
      query_file.write("QueryNumber;QueryText\n")

      # Process each query
      for query in self.xml_root.iter('QUERY'):
        query_number, query_text = self._get_query_data(query)
        query_file.write(f"{query_number};{query_text}\n")

    print('Arquivo de consultas criado\n')

  def _create_expected_file(self):
    """Write one ``QueryNumber;DocNumber;DocVotes`` row per Item of every QUERY."""
    print('Criando arquivo de esperados...')
    with open(self.expected_file, 'w') as expected_file:
      # Write headers
      expected_file.write("QueryNumber;DocNumber;DocVotes\n")

      # Process each query
      for query in self.xml_root.iter('QUERY'):
        for query_number, doc_num, doc_votes in self._get_expected_data(query):
          expected_file.write(f"{query_number};{doc_num};{doc_votes}\n")

    print('Arquivo de esperados criado\n')

  def _get_query_data(self, query):
    """Return ``(query_number, normalized_query_text)`` for a QUERY element."""
    query_number = query.find('QueryNumber').text
    query_text = text_normalize(query.find('QueryText').text)

    return query_number, query_text

  def _get_expected_data(self, query):
    """Yield ``(query_number, doc_number, votes)`` for each Item of a QUERY.

    ``votes`` is the number of characters of the ``score`` attribute that
    are not ``'0'``. A query without a Records element yields nothing, and
    an Item without a ``score`` attribute counts as zero votes.
    """
    query_number = query.find('QueryNumber').text
    records = query.find('Records')
    if records is None:
      return
    for item in records.iter('Item'):
      score = item.get('score') or ""
      doc_votes = sum(1 for digit in score if digit != '0')
      yield query_number, item.text, doc_votes


In [5]:
# Run the query processor with the settings in config/pc.cfg; process() writes
# the queries file and the expected-results file.
processer = Processer('config/pc.cfg')
processer.process()

Criando arquivo de consultas...
Arquivo de consultas criado

Criando arquivo de esperados...
Arquivo de esperados criado



## Gerador de Lista Invertida

In [6]:
class InvertedList:
  """Inverted-list generator: maps every term to the documents containing it.

  The configuration file holds one ``COMMAND=path`` instruction per line:
  any number of ``LEIA`` lines (XML files to read) and one ``ESCREVA`` line
  (output file).
  """

  def __init__(self, cfg_file):
    """Read ``cfg_file`` and load every XML file it lists."""
    self.inverted_list = {}
    self.xml_root = []
    self.output_file = ""
    self._read_cfg(cfg_file)

  def invert(self):
    """Build the inverted list from all loaded XML files and write it out.

    Each output line is ``TERM;[doc, doc, ...]``; a document appears once
    per occurrence of the term in its abstract. Records without any
    abstract text are skipped.
    """
    print('Criando lista invertida...')
    # Start from scratch so calling invert() again does not duplicate postings.
    self.inverted_list = {}

    # Process each XML file
    for root in self.xml_root:
      for record in root.iter('RECORD'):
        abstract = self._get_abstract_from_record(record)
        if not abstract:
          continue

        words = text_to_tokens(text_normalize(abstract))
        doc_num = record.find('RECORDNUM').text.strip()

        for word in words:
          self.inverted_list.setdefault(word, []).append(doc_num)

    # Write inverted list to file
    with open(self.output_file, 'w') as output_file:
      for word, doc_list in self.inverted_list.items():
        output_file.write(f"{word};{doc_list}\n")

    print('Lista invertida criada\n')

  def _read_cfg(self, cfg_file):
    """Parse the configuration file.

    Blank lines are ignored. Unknown commands are ignored as well.

    Raises:
      ValueError: if a non-blank line has no ``=`` separator.
    """
    with open(cfg_file, 'r') as config:
      for line in config:
        line = line.strip()
        if not line:
          continue
        if "=" not in line:
          raise ValueError(f"Invalid configuration line: {line!r}")
        # Split on the first "=" only, so a path may itself contain "=".
        command, file_name = (part.strip() for part in line.split("=", 1))
        if command == 'LEIA':
          self.xml_root.append(get_xml_root(file_name))
        elif command == 'ESCREVA':
          self.output_file = file_name

  def _get_abstract_from_record(self, record):
    """Return the stripped ABSTRACT text of ``record``, else its EXTRACT text.

    An element that is missing, empty (its ``text`` is None) or
    whitespace-only is treated as absent, so EXTRACT is used whenever
    ABSTRACT has no usable text. Returns None when neither has text.
    """
    for tag in ('ABSTRACT', 'EXTRACT'):
      element = record.find(tag)
      if element is None or element.text is None:
        continue
      text = element.text.strip()
      if text:
        return text

    return None

In [7]:
# Build the inverted list with the settings in config/gli.cfg; invert() writes
# one "TERM;[documents]" line per term to the configured output file.
inverted_list = InvertedList('config/gli.cfg')
inverted_list.invert()

Criando lista invertida...
Lista invertida criada



## Indexador

In [8]:
class Indexer:
    """Indexer: turns an inverted-list file into a tf-idf model file.

    The configuration file holds one ``COMMAND=path`` instruction per line:
    ``LEIA`` (inverted-list file to read) and ``ESCREVA`` (model file to
    write). The model file has one ``TERM;DOC;WEIGHT`` line per posting.
    """

    def __init__(self, cfg_file):
        """Read ``cfg_file``; no data is loaded until ``index()`` is called."""
        # word_freq[term][doc] -> number of occurrences of term in doc
        self.word_freq = {}
        # max_freq_doc[doc] -> highest occurrence count of any term in doc
        self.max_freq_doc = {}
        self.inverted_list_file = ""
        self.output_model_file = ""
        self._read_cfg(cfg_file)

    def index(self):
        """Load the inverted list and write the tf-idf model."""
        print('Indexando...')
        # Reset the counters so running index() twice does not double them.
        self.word_freq = {}
        self.max_freq_doc = {}
        self._read_inverted_list(self.inverted_list_file)
        self._create_model()
        print('Indexação concluída\n')

    def _read_inverted_list(self, inverted_list_file):
        """Accumulate term/document counts from ``inverted_list_file``.

        Each line is ``TERM;[doc, doc, ...]``. Document ids may be written
        bare or quoted (as in a Python list repr); surrounding quotes and
        blanks are stripped so the ids are stored clean.
        """
        with open(inverted_list_file, 'r') as postings_file:
            for line in postings_file:
                line = line.strip()
                if not line:
                    continue
                word, doc_list = line.split(";", 1)
                doc_counts = self.word_freq.setdefault(word, {})
                for doc in doc_list.strip().strip("[]").split(","):
                    doc = doc.strip().strip("'\"")
                    if not doc:
                        # An empty list ("[]") yields one empty piece.
                        continue
                    doc_counts[doc] = doc_counts.get(doc, 0) + 1
                    if doc_counts[doc] > self.max_freq_doc.get(doc, 0):
                        self.max_freq_doc[doc] = doc_counts[doc]

    def _create_model(self):
        """Write ``TERM;DOC;tf*idf`` for every term/document pair."""
        with open(self.output_model_file, 'w') as output_file:
            for word, doc_counts in self.word_freq.items():
                idf = self._get_idf(word)
                for doc in doc_counts:
                    tf_idf = self._get_tf(word, doc) * idf
                    output_file.write(f"{word};{doc};{tf_idf}\n")

    def _read_cfg(self, cfg_file):
        """Parse the configuration file.

        Blank lines are ignored. Unknown commands are ignored as well.

        Raises:
            ValueError: if a non-blank line has no ``=`` separator.
        """
        with open(cfg_file, 'r') as config:
            for line in config:
                line = line.strip()
                if not line:
                    continue
                if "=" not in line:
                    raise ValueError(f"Invalid configuration line: {line!r}")
                # Split on the first "=" only, so a path may contain "=".
                command, file_name = (part.strip() for part in line.split("=", 1))
                if command == 'LEIA':
                    self.inverted_list_file = file_name
                elif command == 'ESCREVA':
                    self.output_model_file = file_name

    def _get_tf(self, word, doc):
        """Return the frequency of ``word`` in ``doc`` divided by the highest
        term frequency in ``doc``; 0 when the pair is unknown."""
        freq = self.word_freq.get(word, {}).get(doc, 0)
        max_freq = self.max_freq_doc.get(doc, 0)
        if not freq or not max_freq:
            return 0
        return freq / max_freq

    def _get_n(self):
        """Return N, the number of distinct documents in the collection.

        max_freq_doc has exactly one entry per document seen, whereas
        word_freq is keyed by term (its length is the vocabulary size).
        """
        return len(self.max_freq_doc)

    def _get_ni(self, word):
        """Return the number of documents that contain ``word``."""
        return len(self.word_freq.get(word, ()))

    def _get_idf(self, word):
        """Return ``log(N / n_i)``, or 0.0 for a term found in no document."""
        ni = self._get_ni(word)
        if ni == 0:
            return 0.0
        return log(self._get_n() / ni)

In [9]:
# Run the indexer with the settings in config/index.cfg; index() reads the
# inverted list and writes the "TERM;DOC;WEIGHT" model file.
indexer = Indexer('config/index.cfg')
indexer.index()

Indexando...
Indexação concluída



## Buscador

In [10]:
class Searcher:
    """Searcher: ranks documents for each query by cosine similarity.

    The configuration file holds one ``COMMAND=path`` instruction per line:
    ``MODELO`` (tf-idf model to read), ``CONSULTAS`` (queries CSV to read)
    and ``RESULTADOS`` (ranking file to write).
    """

    def __init__(self, cfg_file):
        """Read ``cfg_file``, then load the model and the queries."""
        self.model_file = ""
        self.query_file = ""
        self.output_file = ""

        self._read_cfg(cfg_file)

        self._get_model()
        self._get_queries()

    def search(self):
        """Rank the matching documents of every query and write the results.

        Each output line is ``QUERY;[position,doc,similarity]`` with
        positions starting at 1 and similarities in decreasing order. Only
        documents sharing at least one term with the query are listed.
        """
        print('Buscando...')
        with open(self.output_file, 'w') as output_file:
            for query_number, query_text in self.queries.items():
                # Binary query weights: every distinct query term counts 1.
                query_vector = {word: 1 for word in query_text}

                # Gather, per candidate document, its weights on query terms.
                doc_vectors = {}
                for word in query_vector:
                    for doc, weight in self.model.get(word, {}).items():
                        doc_vectors.setdefault(doc, {})[word] = weight

                scores = {
                    doc: self._cosine_similarity(
                        query_vector, vector, self.doc_norms.get(doc))
                    for doc, vector in doc_vectors.items()
                }
                ranking = sorted(scores.items(), key=lambda x: x[1], reverse=True)
                for position, (doc, similarity) in enumerate(ranking, start=1):
                    output_file.write(f"{query_number};[{position},{doc},{similarity}]\n")
        print('Busca concluída\n')

    def _cosine_similarity(self, query_vector, doc_vector, doc_norm=None):
        """Return the cosine of the angle between a query and a document.

        Args:
            query_vector: term -> weight mapping for the query.
            doc_vector: term -> weight mapping for the document; terms
                absent from the query do not affect the dot product.
            doc_norm: Euclidean norm of the *whole* document vector. When
                omitted it is computed from ``doc_vector`` alone, which is
                only exact if ``doc_vector`` holds every term of the
                document.

        Returns:
            The similarity, or 0.0 when either vector has zero norm.
        """
        dot_product = sum(
            weight * doc_vector[word]
            for word, weight in query_vector.items()
            if word in doc_vector
        )
        query_norm = sqrt(sum(value**2 for value in query_vector.values()))
        if doc_norm is None:
            doc_norm = sqrt(sum(value**2 for value in doc_vector.values()))
        denominator = query_norm * doc_norm
        if denominator == 0:
            # All-zero weights (e.g. idf == 0): no direction to compare.
            return 0.0
        return dot_product / denominator

    def _read_cfg(self, cfg_file):
        """Parse the configuration file.

        Blank lines are ignored. Unknown commands are ignored as well.

        Raises:
            ValueError: if a non-blank line has no ``=`` separator.
        """
        with open(cfg_file, 'r') as config:
            for line in config:
                line = line.strip()
                if not line:
                    continue
                if "=" not in line:
                    raise ValueError(f"Invalid configuration line: {line!r}")
                # Split on the first "=" only, so a path may contain "=".
                command, file_name = (part.strip() for part in line.split("=", 1))
                if command == 'MODELO':
                    self.model_file = file_name
                elif command == 'CONSULTAS':
                    self.query_file = file_name
                elif command == 'RESULTADOS':
                    self.output_file = file_name

    def _get_model(self):
        """Load the ``TERM;DOC;WEIGHT`` model file.

        Fills ``self.model`` (term -> {doc -> weight}) and
        ``self.doc_norms`` (doc -> Euclidean norm over all of its terms).
        The norms are taken over the full document so that the ranking
        reflects the tf-idf weights even when only one query term matches.
        """
        self.model = {}
        squared_norms = {}
        with open(self.model_file, 'r') as model_file:
            for line in model_file:
                line = line.strip()
                if not line:
                    continue
                word, doc, tf_idf = line.split(";")
                weight = float(tf_idf)
                self.model.setdefault(word, {})[doc] = weight
                squared_norms[doc] = squared_norms.get(doc, 0.0) + weight * weight
        self.doc_norms = {doc: sqrt(total) for doc, total in squared_norms.items()}

    def _get_queries(self):
        """Load the queries CSV into ``self.queries`` (number -> token list).

        The ``QueryNumber;QueryText`` header row and blank lines are
        skipped. The query text goes through the same normalization and
        tokenization as the documents.
        """
        self.queries = {}
        with open(self.query_file, 'r') as query_file:
            for line in query_file:
                line = line.replace("\n", "")
                if not line.strip():
                    continue
                query_number, query_text = line.split(";", 1)
                if query_number == "QueryNumber":
                    # Header row of the queries CSV, not a real query.
                    continue
                self.queries[query_number] = text_to_tokens(text_normalize(query_text))

In [11]:
# Run the searcher with the settings in config/busca.cfg; the constructor loads
# the model and the queries, and search() writes the ranked results file.
searcher = Searcher('config/busca.cfg')
searcher.search()

Buscando...
Busca concluída

