<a href="https://colab.research.google.com/github/Cehiim/TeoriaDosGrafos/blob/main/Projeto/grafos_RAG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup

## Integração dos pacotes

O pacote `vectordb2` é usado para armazenar e recuperar textos usando técnicas de *chunking* (segmentação de texto), *embedding* (conversão de texto para vetores numéricos) e busca vetorial.

In [16]:
%pip install vectordb2



O pacote requests pode ser usado para recuperar o arquivo por meio de requisição em HTTP (esse pacote é opcional).

In [17]:
%pip install requests



O pacote `networkx` é usado para a criação, manipulação e representação de grafos.

In [18]:
%pip install networkx



Importação das bibliotecas

In [19]:
from vectordb import Memory
import requests
import networkx as nx
import matplotlib.pyplot as plt # Será usado para apresentação visual do grafo
import os # Será usado métodos para limpar o terminal para atualizar a interface em cada iteração do sistema
import time # Será usado método de espera para atualizar a interface gradualmente

## Classe Grafo

In [65]:
# -*- coding: utf-8 -*-
"""
Created on Mon Feb 13 13:59:10 2023

@author: icalc
"""
class Grafo:
    TAM_MAX_DEFAULT = 100 # qtde de vértices máxima default
    # construtor da classe grafo
    def __init__(self, n=TAM_MAX_DEFAULT):
        self.n = n # número de vértices
        self.m = 0 # número de arestas
        # matriz de adjacência
        self.adj = [[0 for i in range(n)] for j in range(n)]

	# Insere uma aresta no Grafo tal que
	# v é adjacente a w
    def insereA(self, v, w):
        if self.adj[v][w] == 0:
            self.adj[v][w] = 1
            self.m+=1 # atualiza qtd arestas

# remove uma aresta v->w do Grafo
    def removeA(self, v, w):
        if(v == w):
            return
        # testa se temos a aresta
        if self.adj[v][w] == 1:
            self.adj[v][w] = 0
            self.m -= 1  # atualiza qtd arestas

	# Apresenta o Grafo contendo
	# número de vértices, arestas
	# e a matriz de adjacência obtida
    def show(self):
        print(f"\n n: {self.n:2d} ", end="")
        print(f"m: {self.m:2d}\n")
        for i in range(self.n):
            for w in range(self.n):
                if self.adj[i][w] == 1:
                    print(f"Adj[{i:2d},{w:2d}] = 1 ", end="")
                else:
                    print(f"Adj[{i:2d},{w:2d}] = 0 ", end="")
            print("\n")
        print("\nfim da impressao do grafo." )


	# Apresenta o Grafo contendo
	# número de vértices, arestas
	# e a matriz de adjacência obtida
    # Apresentando apenas os valores 0 ou 1
    def showMin(self):
        print(f"\n n: {self.n:2d} ", end="")
        print(f"m: {self.m:2d}\n")
        for i in range(self.n):
            for w in range(self.n):
                if self.adj[i][w] == 1:
                    print(" 1 ", end="")
                else:
                    print(" 0 ", end="")
            print("\n")
        print("\nfim da impressao do grafo." )

    def plota_grafo(self):
        # Criar um grafo dirigido usando a matriz de adjacência
        G = nx.DiGraph()  # Grafo dirigido

        # Adicionar vértices e arestas
        for i in range(self.n):
            for j in range(self.n):
                if self.adj[i][j] == 1:
                    G.add_edge(i, j)

        # Plotar o grafo
        plt.figure(figsize=(8, 8))
        pos = nx.spring_layout(G)  # Layout para a posição dos nós
        nx.draw(G, pos, with_labels=True, node_color='lightblue', node_size=500, font_size=10, font_color='black', arrowstyle='-|>', arrowsize=20)
        plt.title(f"Grafo com {self.n} vértices e {self.m} arestas")
        plt.show()

## Classe GrafoR (Grafo direcionado rotulado)

In [66]:
# Grafo como uma matriz de adjacência rotulado
class GrafoR(Grafo): # Ex 16
# Não bota o init, vai bugar a classe

    def insereA(self, v, w, p):
        if self.adj[v][w] == 0:
            self.adj[v][w] = p
            self.m += 1  # atualiza qtd arestas

    def show(self):
        print(f"\n n: {self.n:2d} ", end="")
        print(f"m: {self.m:2d}\n")
        for i in range(self.n):
            for w in range(self.n):
                print(f"Adj[{i:2d},{w:2d}] = {self.adj[i][w]:.2f} ", end="")
            print("\n")
        print("\nfim da impressao do grafo." )


	# Apresenta o Grafo contendo
	# número de vértices, arestas
	# e a matriz de adjacência obtida
    # Apresentando apenas os valores 0 ou 1
    def showMin(self):
        print(f"\n n: {self.n:2d} ", end="")
        print(f"m: {self.m:2d}\n")
        for i in range(self.n):
            for w in range(self.n):
                print(f" {self.adj[i][w]:.2f} ", end="")
            print("\n")
        print("\nfim da impressao do grafo." )

## Classe Memory

Aqui é utilizado a biblioteca VectorDB para criar uma memória virtual.

```
memoria = Memory(chunking_strategy={"mode": "sliding_window", "window_size": 1, "overlap": 0})
```

- `chunking_strategy`: Define a estratégia de fragmentação dos dados. No modo "sliding_window", os dados são divididos em *chunks* (pedaços de texto) de tamanho fixo.

- `window_size`: Define a quantidade de palavras que um *chunk* representa. Neste caso, cada *chunk* representa uma palavra.

- `overlap`: Define quantos elementos de sobreposição existirão entre os *chunks* adjacentes. Neste caso, não haverá sobreposição já que as palavras usadas não formam frases, logo são independentes uma das outras.

### INIT



```
    def __init__(
        self,
        memory_file: str = None,
        chunking_strategy: dict = None,
        embeddings: Union[BaseEmbedder, str] = "normal",
    ):
        """
        Initializes the Memory class.

        :param memory_file: a string containing the path to the memory file. (default: None)
        :param chunking_strategy: a dictionary containing the chunking mode (default: {"mode": "sliding_window"}).
        :param embedding_model: a string containing the name of the pre-trained model to be used for embeddings (default: "sentence-transformers/all-MiniLM-L6-v2").
        """
        self.memory_file = memory_file

        if memory_file is None:
            self.memory = []
            self.metadata_memory = []
        else:
            load = Storage(memory_file).load_from_disk()       
            self.memory = [] if len(load) != 1 else load[0]["memory"]
            self.metadata_memory = [] if len(load) != 1 else load[0]["metadata"]

        if chunking_strategy is None:
            chunking_strategy = {"mode": "sliding_window"}
        self.chunker = Chunker(chunking_strategy)

        self.metadata_index_counter = 0
        self.text_index_counter = 0

        if isinstance(embeddings, str):
            self.embedder = Embedder(embeddings)
        elif isinstance(embeddings, BaseEmbedder):
            self.embedder = embeddings
        else:
            raise TypeError("Embeddings must be an Embedder instance or string")

        self.vector_search = VectorSearch()
```



### SAVE



```
    def save(
        self,
        texts,
        metadata: Union[List, List[dict], None] = None,
        memory_file: str = None,
    ):
        """
        Saves the given texts and metadata to memory.

        :param texts: a string or a list of strings containing the texts to be saved.
        :param metadata: a dictionary or a list of dictionaries containing the metadata associated with the texts.
        :param memory_file: a string containing the path to the memory file. (default: None)
        """

        if not isinstance(texts, list):
            texts = [texts]

        if metadata is None:
            metadata = []
        elif not isinstance(metadata, list):
            metadata = [metadata]

        # Extend metadata to be the same length as texts, if it's shorter.
        metadata += [{}] * (len(texts) - len(metadata))

        for meta in metadata:
            self.metadata_memory.append(meta)

        meta_index_start = (
            self.metadata_index_counter
        )  # Starting index for this save operation
        self.metadata_index_counter += len(
            metadata
        )  # Update the counter for future save operations

        if memory_file is None:
            memory_file = self.memory_file

        text_chunks = [self.chunker(text) for text in texts]
        chunks_size = [len(chunks) for chunks in text_chunks]

        flatten_chunks = list(itertools.chain.from_iterable(text_chunks))

        embeddings = self.embedder.embed_text(flatten_chunks)

        text_index_start = (
            self.text_index_counter
        )  # Starting index for this save operation
        self.text_index_counter += len(texts)

        # accumulated size is end_index of each chunk
        for size, end_index, chunks, meta_index, text_index in zip(
            chunks_size,
            itertools.accumulate(chunks_size),
            text_chunks,
            range(meta_index_start, self.metadata_index_counter),
            range(text_index_start, self.text_index_counter),
        ):
            start_index = end_index - size
            chunks_embedding = embeddings[start_index:end_index]

            for chunk, embedding in zip(chunks, chunks_embedding):
                entry = {
                    "chunk": chunk,
                    "embedding": embedding,
                    "metadata_index": meta_index,
                    "text_index": text_index,
                }
                self.memory.append(entry)

        if memory_file is not None:
            Storage(self.memory_file).save_to_disk([{"memory": self.memory, "metadata" :self.metadata_memory}])
```



### SEARCH



```
    def search(
        self, query: str, top_n: int = 5, unique: bool = False, batch_results: str = "flatten"
    ) -> List[Dict[str, Any]]:
        """
        Searches for the most similar chunks to the given query in memory.

        :param query: a string containing the query text.
        :param top_n: the number of most similar chunks to return. (default: 5)
        :param unique: chunks are filtered out to unique texts (default: False)
        :param batch_results: if input is list of queries, results can use "flatten" or "diverse" algorithm
        :return: a list of dictionaries containing the top_n most similar chunks and their associated metadata.
        """

        if isinstance(query, list):
            query_embedding = self.embedder.embed_text(query)
        else:
            query_embedding = self.embedder.embed_text([query])[0]

        
        embeddings = [entry["embedding"] for entry in self.memory]

        indices = self.vector_search.search_vectors(query_embedding, embeddings, top_n, batch_results)
        if unique:
            unique_indices = []
            seen_text_indices = set()  # Change the variable name
            for i in indices:
                text_index = self.memory[i[0]][
                    "text_index"
                ]  # Use text_index instead of metadata_index
                if (
                    text_index not in seen_text_indices
                ):  # Use seen_text_indices instead of seen_meta_indices
                    unique_indices.append(i)
                    seen_text_indices.add(
                        text_index
                    )  # Use seen_text_indices instead of seen_meta_indices
            indices = unique_indices

        results = [
            {
                "chunk": self.memory[i[0]]["chunk"],
                "metadata": self.metadata_memory[self.memory[i[0]]["metadata_index"]],
                "distance": i[1],
            }
            for i in indices
        ]

        return results
```



# Métodos

## 1. Ler dados

### Aquisição dos dados

Os dados do documento são importados e guardados na variável `dados`.

In [71]:
def leArquivoHTTP():
  arquivo = requests.get('https://raw.githubusercontent.com/Cehiim/TeoriaDosGrafos/refs/heads/main/Projeto/palavras.txt').text

  lista = arquivo.split()
  n_palavras = int(lista.pop(0))
  vertices = []
  for i in range(n_palavras):
    vertice = {
        "palavra": lista[i],
        "indice": i,
        "proximos": []
    }
    vertices.append(vertice)

  dados = [n_palavras]
  dados.append(vertices)

  return dados

In [72]:
def leArquivo(origem):
  with open(origem, 'r', encoding='utf-8') as arquivo:
    n_palavras = int(arquivo.readline())

    vertices = []
    for i in range(n_palavras):
      vertice = {
          "palavra": arquivo.readline().strip(),
          "indice": i,
          "proximos": []
      }
      vertices.append(vertice)

  dados = [n_palavras]
  dados.append(vertices)

  return dados

In [73]:
d = leArquivoHTTP()
#d = leArquivo("./Projeto/palavras.txt")
print(d)

[50, [{'palavra': 'Ecossistema', 'indice': 0, 'proximos': []}, {'palavra': 'Sustentabilidade', 'indice': 1, 'proximos': []}, {'palavra': 'Biodiversidade', 'indice': 2, 'proximos': []}, {'palavra': 'Reciclagem', 'indice': 3, 'proximos': []}, {'palavra': 'Conservação', 'indice': 4, 'proximos': []}, {'palavra': 'Poluição', 'indice': 5, 'proximos': []}, {'palavra': 'Desmatamento', 'indice': 6, 'proximos': []}, {'palavra': 'Reflorestamento', 'indice': 7, 'proximos': []}, {'palavra': 'Erosão', 'indice': 8, 'proximos': []}, {'palavra': 'Compostagem', 'indice': 9, 'proximos': []}, {'palavra': 'Biodegradável', 'indice': 10, 'proximos': []}, {'palavra': 'Emissões', 'indice': 11, 'proximos': []}, {'palavra': 'Pegada', 'indice': 12, 'proximos': []}, {'palavra': 'Recursos', 'indice': 13, 'proximos': []}, {'palavra': 'Preservação', 'indice': 14, 'proximos': []}, {'palavra': 'Ecologia', 'indice': 15, 'proximos': []}, {'palavra': 'Habitat', 'indice': 16, 'proximos': []}, {'palavra': 'Fauna', 'indice':

### Embedding

Cada palavra é convertida para um vetor numérico e guardada na memória.

In [74]:
def embedding(memoria, vertices, n_palavras):
  for i in range(n_palavras):
    memoria.save(vertices[i]["palavra"])

### Busca vetorial

Quanto menor é a distância, maior é a proximidade semântica.

In [75]:
def buscaVetorial(memoria, palavra):
  busca = memoria.search(palavra, top_n=6)
  return busca

A palavra mais próxima armazenada na memória é ela mesma, portanto para encontrar as outras cinco palavras mais próximas foi recuperado as palavras de índice 1 até 6.

In [76]:
m = Memory(chunking_strategy={"mode": "sliding_window", "window_size": 1, "overlap": 0})
n = d[0]
vs = d[1]

embedding(m, vs, n)

b = buscaVetorial(m, "Biodiversidade")
print(b)
print(f"\n\nBusca: Biodiversidade\n")
for i in range(1,6):
  palavra = b[i]['chunk']
  distancia = b[i]['distance']
  print(f"Palavra: {palavra}\nDistância: {distancia:.2f}\n")

Initiliazing embeddings:  normal
OK.
[{'chunk': 'Biodiversidade', 'metadata': {}, 'distance': 0.0}, {'chunk': 'Sustentabilidade', 'metadata': {}, 'distance': 0.44970125}, {'chunk': 'Agroecologia', 'metadata': {}, 'distance': 0.492421}, {'chunk': 'Biotecnologia', 'metadata': {}, 'distance': 0.49487936}, {'chunk': 'Biodegradável', 'metadata': {}, 'distance': 0.5287854}, {'chunk': 'Bioma', 'metadata': {}, 'distance': 0.5513243}]


Busca: Biodiversidade

Palavra: Sustentabilidade
Distância: 0.45

Palavra: Agroecologia
Distância: 0.49

Palavra: Biotecnologia
Distância: 0.49

Palavra: Biodegradável
Distância: 0.53

Palavra: Bioma
Distância: 0.55



### Integração no grafo

In [79]:
def buscaIndice(n_palavras, vertices, palavra):
  for i in range(n_palavras):
    if(vertices[i]["palavra"] == palavra):
      return vertices[i]["indice"]
  return -1

In [80]:
def integraGrafo(memoria, n_palavras, vertices):
  grafo = GrafoR(n_palavras)
  for i in range(n_palavras):
    busca = buscaVetorial(memoria, vertices[i]["palavra"])
    proximos = []
    for j in range(1,6):
      palavra = busca[j]['chunk']
      distancia = busca[j]['distance']
      proximo = {
          "vizinho": palavra,
          "indice": buscaIndice(n_palavras, vertices, palavra),
          "distancia": distancia
      }
      proximos.append(proximo)
    vertices[i]["proximos"] = proximos

In [81]:
integraGrafo(m, d[0], d[1])
print(d[1])

[{'palavra': 'Ecossistema', 'indice': 0, 'proximos': [{'vizinho': 'Ecologia', 'indice': 15, 'distancia': 0.44486222}, {'vizinho': 'Economia', 'indice': 23, 'distancia': 0.48550734}, {'vizinho': 'Bioma', 'indice': 20, 'distancia': 0.57568085}, {'vizinho': 'Biodiversidade', 'indice': 2, 'distancia': 0.5811522}, {'vizinho': 'Sustentabilidade', 'indice': 1, 'distancia': 0.59003323}]}, {'palavra': 'Sustentabilidade', 'indice': 1, 'proximos': [{'vizinho': 'Biodiversidade', 'indice': 2, 'distancia': 0.44970125}, {'vizinho': 'Economia', 'indice': 23, 'distancia': 0.5571618}, {'vizinho': 'Ambiental', 'indice': 45, 'distancia': 0.5643711}, {'vizinho': 'Ecossistema', 'indice': 0, 'distancia': 0.59003323}, {'vizinho': 'Biotecnologia', 'indice': 31, 'distancia': 0.60961545}]}, {'palavra': 'Biodiversidade', 'indice': 2, 'proximos': [{'vizinho': 'Sustentabilidade', 'indice': 1, 'distancia': 0.44970125}, {'vizinho': 'Agroecologia', 'indice': 19, 'distancia': 0.492421}, {'vizinho': 'Biotecnologia', 'in

## 2. Gravar dados

## 3. Inserir vértice

## 4. Inserir aresta

## 5. Remover vértice

## 6. Remover aresta

## 7. Mostrar conteúdo

## 8. Mostrar grafo

## 9. Apresentar conexidade do grafo e o reduzido

# Menu

In [30]:
memoria = Memory(chunking_strategy={"mode": "sliding_window", "window_size": 1, "overlap": 0})
fim = False

while(fim == False):
    print(
'''
Menu:
    1) Ler dados do arquivo em python
    2) Gravar dados no arquivo grafo.txt
    3) Inserir vértice
    4) Inserir aresta
    5) Remove vértice
    6) Remove aresta
    7) Mostrar conteúdo do arquivo
    8) Mostrar grafo
    9) Apresentar a conexidade do grafo e o reduzido
    10) Encerrar a aplicação
''')
    choice = int(input())
    if choice == 1: # Lê grafo
        dados = leArquivoHTTP()
        #dados = leArquivo("palavras.txt")
        embedding(memoria, dados["palavras"], dados["n_palavras"])
        print("Grafo lido com sucesso!")

    elif choice == 2: # Grava dados no arquivo .txt
        print("Dados salvos com sucesso!")

    elif choice == 3: # Insere vértice
        print("Vértice inserido com sucesso!")

    elif choice == 4: # Insere aresta
        print("Arestas inseridas com sucesso!")

    elif choice == 5: # Remove vértice
        print("Vértice removido com sucesso!")

    elif choice == 6: # Remove varesta
        print("Aresta removida com sucesso!")

    elif choice == 7: # Imprime arquivo
        print("oi")

    elif choice == 8: # Exibe grafo
        print("oi")

    elif choice == 9: # Apresenta a conexidade do grafo e grafo reduzido
        print("oi")

    elif choice == 10: # Encerra
        fim = True
        print("Encerrando programa...")

    else:
        print("Opção inválida.")

    time.sleep(4) # Volta para o menu após 4 segundos

    if os.name == 'nt': # Limpa o terminal
        os.system('cls') # Caso o OS seja Windows
    else:
        os.system('clear') # Caso o OS seja Linux ou MacOS

Initiliazing embeddings:  normal
OK.

Menu:
    1) Ler dados do arquivo em python
    2) Gravar dados no arquivo grafo.txt
    3) Inserir vértice
    4) Inserir aresta
    5) Remove vértice
    6) Remove aresta
    7) Mostrar conteúdo do arquivo
    8) Mostrar grafo
    9) Apresentar a conexidade do grafo e o reduzido
    10) Encerrar a aplicação

1
Grafo lido com sucesso!

Menu:
    1) Ler dados do arquivo em python
    2) Gravar dados no arquivo grafo.txt
    3) Inserir vértice
    4) Inserir aresta
    5) Remove vértice
    6) Remove aresta
    7) Mostrar conteúdo do arquivo
    8) Mostrar grafo
    9) Apresentar a conexidade do grafo e o reduzido
    10) Encerrar a aplicação

10
Encerrando programa...
