## Retrieval-Augmented Generation (RAG) Agent Development


### Setup and Imports

In [5]:
!pip install PyPDF2 python-docx streamlit torch sentence-transformers scikit-learn plotly unittest2 llama-index llama-index-embeddings-huggingface llama-index-llms-fireworks

Collecting llama-index-llms-fireworks
  Downloading llama_index_llms_fireworks-0.1.7-py3-none-any.whl.metadata (669 bytes)
Collecting argparse (from unittest2)
  Using cached argparse-1.4.0-py2.py3-none-any.whl.metadata (2.8 kB)
Downloading llama_index_llms_fireworks-0.1.7-py3-none-any.whl (4.5 kB)
Using cached argparse-1.4.0-py2.py3-none-any.whl (23 kB)
Installing collected packages: argparse, llama-index-llms-fireworks
Successfully installed argparse-1.4.0 llama-index-llms-fireworks-0.1.7


In [26]:
import asyncio
import csv
import io
from typing import List

import PyPDF2
import docx
import streamlit as st
import torch
from llama_index.core import VectorStoreIndex, Document, Settings
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.fireworks import Fireworks
from llama_index.core.retrievers import KeywordTableSimpleRetriever
from sentence_transformers import SentenceTransformer
from sklearn.cluster import HDBSCAN
import plotly.graph_objects as go
import unittest
from unittest.mock import Mock, patch
import nest_asyncio

### API Key Input

In [7]:
import getpass

fireworks_api_key = getpass.getpass("Enter your Fireworks API key: ")

Enter your Fireworks API key: ··········


In [27]:
nest_asyncio.apply()

print("All necessary modules imported successfully.")

All necessary modules imported successfully.


### 1. Document Preprocessing


In [31]:
class RAGSystem1:
    def __init__(self, fireworks_api_key, device="cuda"):
        self.device = device if torch.cuda.is_available() else "cpu"
        print(f"Using device: {self.device}")

        self.llm = Fireworks(
            api_key=fireworks_api_key,
            model="accounts/fireworks/models/firefunction-v1"
        )

        self.embed_model = HuggingFaceEmbedding(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            device=self.device
        )

        self.sentence_model = SentenceTransformer('all-MiniLM-L6-v2')

        Settings.llm = self.llm
        Settings.embed_model = self.embed_model

        self.document_cache = {}
        print("RAGSystem initialized.")

    @staticmethod
    def _extract_text_from_file(uploaded_file):
        file_extension = uploaded_file.name.split('.')[-1].lower()

        if file_extension == 'pdf':
            pdf_reader = PyPDF2.PdfReader(io.BytesIO(uploaded_file.read()))
            return ' '.join(page.extract_text() for page in pdf_reader.pages)

        elif file_extension == 'docx':
            doc = docx.Document(io.BytesIO(uploaded_file.read()))
            return ' '.join(paragraph.text for paragraph in doc.paragraphs)

        elif file_extension == 'csv':
            csv_content = uploaded_file.read().decode('utf-8')
            csv_reader = csv.reader(io.StringIO(csv_content))
            return ' '.join(' '.join(row) for row in csv_reader)

        elif file_extension == 'txt':
            return uploaded_file.read().decode('utf-8')

        else:
            raise ValueError(f"Unsupported file type: {file_extension}")

    def extract_text_from_file(self, uploaded_file):
        return self._extract_text_from_file(uploaded_file)

    async def generate_meaningful_chunks(self, text, chunk_size=512, chunk_overlap=50, min_cluster_size=5):
        sentences = text.split('.')
        sentence_embeddings = await asyncio.to_thread(
            self.sentence_model.encode, sentences, batch_size=64
        )

        clusterer = HDBSCAN(min_cluster_size=min_cluster_size, metric='euclidean')
        clusterer.fit(sentence_embeddings)
        labels = clusterer.labels_

        chunks = []
        for label in set(labels):
            cluster_sentences = [sentences[i] for i in range(len(sentences)) if labels[i] == label]
            if len(cluster_sentences) >= min_cluster_size:
                chunks.append(' '.join(cluster_sentences))

        return chunks

    async def process_file(self, uploaded_file, chunk_size, chunk_overlap, min_cluster_size):
        if uploaded_file.name in self.document_cache:
            return self.document_cache[uploaded_file.name]

        try:
            text = self.extract_text_from_file(uploaded_file)
            chunks = await self.generate_meaningful_chunks(text, chunk_size, chunk_overlap, min_cluster_size)
            documents = [Document(text=chunk) for chunk in chunks]
            self.document_cache[uploaded_file.name] = documents
            return documents
        except Exception as e:
            print(f"Error processing {uploaded_file.name}: {str(e)}")
            return []

# Demonstrate document preprocessing
rag_system = RAGSystem1(fireworks_api_key)
mock_file = Mock()
mock_file.name = 'test.txt'
mock_file.read.return_value = b'This is a test document about RAG systems. It contains multiple sentences. Each sentence should be processed.'

async def test_preprocessing():
    documents = await rag_system.process_file(mock_file, 512, 50, 2)
    print(f"Number of chunks generated: {len(documents)}")
    print(f"First chunk: {documents[0].text[:100]}...")

await test_preprocessing()

Using device: cpu
RAGSystem initialized.
Number of chunks generated: 1
First chunk: This is a test document about RAG systems  It contains multiple sentences  Each sentence should be p...


### 2. Retrieval System Development


In [39]:
class RAGSystem2(RAGSystem):  # Continuing the class definition
  async def create_index(self, documents):
      try:
          index = await asyncio.to_thread(VectorStoreIndex.from_documents, documents)
          return index
      except Exception as e:
          print(f"Error creating index: {str(e)}")
          return None

  def process_query(self, query, index):
      if not index:
          print("Index not available for query processing.")
          return None

      try:
          vector_retriever = index.as_retriever(similarity_top_k=5)
          vector_nodes = vector_retriever.retrieve(query)

          keyword_nodes = []
          for node in vector_nodes:
              doc_text = node.node.text
              if any(keyword in doc_text.lower() for keyword in query.lower().split()):
                  keyword_nodes.append(node)

          combined_nodes = list({node.node.node_id: node for node in vector_nodes + keyword_nodes}.values())
          print(f"Combined results before sorting: {len(combined_nodes)} nodes")

          combined_nodes.sort(
              key=lambda x: x.score + (1 if any(keyword in x.node.text.lower() for keyword in query.lower().split()) else 0),
              reverse=True
          )

          top_nodes = combined_nodes[:5]
          print(f"Top nodes: {top_nodes}")

          query_engine = index.as_query_engine()
          response = query_engine.query(query)

          return response

      except Exception as e:
          print(f"Error processing query: {str(e)}")
          return None

# Demonstrate retrieval system
async def test_retrieval():
    rag_system = RAGSystem2(fireworks_api_key)
    documents = await rag_system.process_file(mock_file, 512, 50, 2)
    index = await rag_system.create_index(documents)
    if index:
        print("Index created successfully.")
        response = rag_system.process_query("What is this document about?", index)
        if response:
            print(f"Query response: {response.response}")
    else:
        print("Failed to create index.")

await test_retrieval()

Using device: cpu
RAGSystem initialized.


  self._core_bpe = _tiktoken.CoreBPE(mergeable_ranks, special_tokens, pat_str)
  self._core_bpe = _tiktoken.CoreBPE(mergeable_ranks, special_tokens, pat_str)


Index created successfully.
Combined results before sorting: 1 nodes
Top nodes: [NodeWithScore(node=TextNode(id_='7e2a2de6-987a-4b95-8e87-95325780baf4', embedding=None, metadata={}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={<NodeRelationship.SOURCE: '1'>: RelatedNodeInfo(node_id='c78b01c3-17d3-4506-9644-c867e9d520a4', node_type=<ObjectType.DOCUMENT: '4'>, metadata={}, hash='0970eba4f9de5d3ac7c0e2a44096d897ea2640b887f3f5e708036a04ce62153f')}, text='This is a test document about RAG systems  It contains multiple sentences  Each sentence should be processed', mimetype='text/plain', start_char_idx=0, end_char_idx=108, text_template='{metadata_str}\n\n{content}', metadata_template='{key}: {value}', metadata_seperator='\n'), score=0.39294020255501316)]
Query response: This document is about RAG systems.


### 3. Generation Model Integration


In [41]:
class RAGSystem3(RAGSystem):  # Continuing the class definition
    async def run_rag(self, query, uploaded_files, chunk_size, chunk_overlap, min_cluster_size):
        documents = []
        total_files = len(uploaded_files)
        print("Processing uploaded files...")
        tasks = [self.process_file(file, chunk_size, chunk_overlap, min_cluster_size) for file in uploaded_files]
        documents = []

        for i, uploaded_file in enumerate(uploaded_files, start=1):
            print(f"Processing file {i}/{total_files}: {uploaded_file.name}")
            file_documents = await self.process_file(uploaded_file, chunk_size, chunk_overlap, min_cluster_size)
            if file_documents:
                documents.extend(file_documents)

        if documents:
            print("Creating index from documents...")
            index = await self.create_index(documents)
            if index:
                response = self.process_query(query, index)
                return response
            else:
                print("No valid documents were processed. Please check your file formats and content.")
                return None
        else:
            print("No documents were processed.")
            return None

    def display_results(self, response):
        if not response:
            return

        print("Generated Response:", response.response)

        print("Source Documents:")
        for node in response.source_nodes:
            print(f"- {node.node.get_content()[:100]}...")

        scores = [node.score for node in response.source_nodes]
        fig = go.Figure(data=[go.Bar(y=scores)])
        fig.update_layout(title="Relevance Scores of Retrieved Documents",
                          xaxis_title="Document",
                          yaxis_title="Relevance Score")
        fig.show()

# Demonstrate full RAG process
async def test_full_rag():
    rag_system = RAGSystem3(fireworks_api_key)
    response = await rag_system.run_rag("What is this document about?", [mock_file], 512, 50, 2)
    if response:
        rag_system.display_results(response)
    else:
        print("No response generated.")

asyncio.run(test_full_rag())

Using device: cpu
RAGSystem initialized.
Processing uploaded files...
Processing file 1/1: test.txt
Creating index from documents...
Combined results before sorting: 1 nodes
Top nodes: [NodeWithScore(node=TextNode(id_='7818c153-ceee-414b-93d6-f05f17fd2523', embedding=None, metadata={}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={<NodeRelationship.SOURCE: '1'>: RelatedNodeInfo(node_id='9539d047-e005-4385-899b-9cf39c0fd9a4', node_type=<ObjectType.DOCUMENT: '4'>, metadata={}, hash='0970eba4f9de5d3ac7c0e2a44096d897ea2640b887f3f5e708036a04ce62153f')}, text='This is a test document about RAG systems  It contains multiple sentences  Each sentence should be processed', mimetype='text/plain', start_char_idx=0, end_char_idx=108, text_template='{metadata_str}\n\n{content}', metadata_template='{key}: {value}', metadata_seperator='\n'), score=0.39294020255501316)]
Generated Response: This document is about RAG systems.
Source Documents:
- This is a test document a

  def hovertextsrc(self, val):


### 4. Testing and Evaluation


In [53]:
import unittest
from unittest.mock import patch, mock_open
import asyncio

# Mocking the SentenceTransformer for testing purposes
class MockSentenceTransformer:
    def __init__(self, *args, **kwargs):
        pass

    def encode(self, texts, **kwargs):
        return ["mock_embedding" for _ in texts]

class HuggingFaceEmbedding:
    def __init__(self, model_name):
        self._model = MockSentenceTransformer(model_name)

    def encode(self, texts):
        return self._model.encode(texts)

class RAGSystem:
    def __init__(self, api_key):
        self.api_key = api_key
        self.embed_model = HuggingFaceEmbedding("dummy_model_name")

    async def run_rag(self, query, uploaded_files, max_length, top_k, num_return_sequences):
        # Dummy response for testing purposes
        return "This is a test response."

class TestRAGSystem(unittest.TestCase):
    @patch('builtins.open', new_callable=mock_open, read_data="This is a test document.")
    def test_rag_system(self, mock_file):
        rag_system = RAGSystem("dummy_api_key")

        mock_file.name = 'test.txt'
        uploaded_files = [mock_file]
        query = "What is this document about?"

        response = asyncio.run(rag_system.run_rag(query, uploaded_files, 512, 50, 5))

        self.assertIsNotNone(response)

def evaluate_performance(responses, ground_truth):
    correct = sum(r == gt for r, gt in zip(responses, ground_truth))
    accuracy = correct / len(responses)
    return {"accuracy": accuracy}

def generate_evaluation_report(test_results, performance_metrics):
    report = "RAG System Evaluation Report\n"
    report += "===========================\n\n"

    report += "1. Unit Test Results:\n"
    for test, result in test_results.items():
        report += f"   - {test}: {'Passed' if result else 'Failed'}\n"

    report += "\n2. Performance Metrics:\n"
    for metric, value in performance_metrics.items():
        report += f"   - {metric}: {value}\n"

    report += "\n3. Simulated User Feedback:\n"
    report += "   [Include user feedback from testing sessions]\n"

    return report

# Run tests and evaluation
def run_tests_and_evaluation():
    # Run tests
    test_suite = unittest.TestLoader().loadTestsFromTestCase(TestRAGSystem)
    test_results = unittest.TextTestRunner(verbosity=2).run(test_suite)

    # Process test results
    test_results_dict = {}
    for test, error in test_results.errors + test_results.failures:
        test_results_dict[test._testMethodName] = False
    for test in test_suite:
        test_name = getattr(test, '_testMethodName', None)
        if test_name and test_name not in test_results_dict:
            test_results_dict[test_name] = True

    # Dummy implementation of rag_system for the evaluation
    rag_system = RAGSystem("dummy_api_key")
    mock_file = mock_open(read_data="This is a test document.").return_value

    # Evaluate performance (this is a simplified example)
    test_queries = ["What is RAG?"]
    test_ground_truth = ["RAG is a system that combines retrieval and generation."]
    responses = [rag_system.run_rag(query, [mock_file], 512, 50, 2) for query in test_queries]
    responses = [r if r else "" for r in asyncio.run(asyncio.gather(*responses))]
    metrics = evaluate_performance(responses, test_ground_truth)

    # Generate and print evaluation report
    report = generate_evaluation_report(test_results_dict, metrics)
    print(report)

run_tests_and_evaluation()


test_rag_system (__main__.TestRAGSystem) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.010s

OK


RAG System Evaluation Report

1. Unit Test Results:

2. Performance Metrics:
   - accuracy: 0.0

3. Simulated User Feedback:
   [Include user feedback from testing sessions]



### 5. Run RAG System


In [46]:
async def main():
    print("Running full RAG system demo...")

    # Example usage with multiple mock files
    mock_file1 = Mock()
    mock_file1.name = 'test1.txt'
    mock_file1.read.return_value = b'This is a test document about RAG systems.'

    mock_file2 = Mock()
    mock_file2.name = 'test2.txt'
    mock_file2.read.return_value = b'RAG systems combine retrieval and generation for better results.'

    query = "Explain what RAG systems are and how they work."
    rag_system = RAGSystem3(fireworks_api_key)
    response = await rag_system.run_rag(query, [mock_file1, mock_file2], 512, 50, 2)

    if response:
        rag_system.display_results(response)
    else:
        print("No response generated.")

asyncio.run(main())

Running full RAG system demo...
Using device: cpu
RAGSystem initialized.
Processing uploaded files...
Processing file 1/2: test1.txt
Processing file 2/2: test2.txt
Creating index from documents...
Combined results before sorting: 2 nodes
Top nodes: [NodeWithScore(node=TextNode(id_='019a5006-3285-4730-b366-2a56179ec106', embedding=None, metadata={}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={<NodeRelationship.SOURCE: '1'>: RelatedNodeInfo(node_id='03d3a8d4-23c4-4da8-86af-eaed74f99559', node_type=<ObjectType.DOCUMENT: '4'>, metadata={}, hash='82cf2869987e3e72bef2ed4eeb49c5920f26913ba6fe58a51bea524140c62144')}, text='This is a test document about RAG systems', mimetype='text/plain', start_char_idx=0, end_char_idx=41, text_template='{metadata_str}\n\n{content}', metadata_template='{key}: {value}', metadata_seperator='\n'), score=0.8703420561313896), NodeWithScore(node=TextNode(id_='7c3f22a8-1479-435a-b9e9-b0b73675fd9b', embedding=None, metadata={}, exclu