# RAG System Implementation with Qwen Models

This notebook implements a Retrieval-Augmented Generation (RAG) system using Qwen models. The implementation includes:

1. Basic Single-Turn RAG
2. Feature A: Multi-Turn Search (Optional)
3. Feature B: Agentic Workflow (Optional)

We'll be using one of the following Qwen models:
- Qwen2.5-0.5B-Instruct
- Qwen2.5-1.5B-Instruct
- Qwen2.5-3B-Instruct
- Qwen2.5-7B-Instruct

## 1. Setup and Dependencies

Let's start by installing and importing the required libraries. We'll need:
- transformers: For the Qwen model
- sentence-transformers: For text embeddings
- faiss-cpu/faiss-gpu: For vector storage and similarity search
- torch: For deep learning operations
- Additional utilities for text processing and data management

In [1]:
# Install required packages
!pip install -q transformers sentence-transformers faiss-cpu numpy pandas tqdm

In [2]:
import os
import json
import torch
import faiss
import numpy as np
import pandas as pd
from typing import List, Dict, Any
from dataclasses import dataclass
from tqdm.auto import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM
from sentence_transformers import SentenceTransformer

# set cache


# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Initialize Qwen model and tokenizer (we'll use the smallest model for demonstration)
MODEL_NAME = "Qwen/Qwen2.5-0.5B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(device)

# Initialize sentence transformer for embeddings
embedding_model = SentenceTransformer('all-MiniLM-L6-v2').to(device)

  from .autonotebook import tqdm as notebook_tqdm


Using device: cuda


## 2. Data Loading and Preprocessing

In this section, we'll implement the following components:
1. Text chunking function to split documents into manageable pieces
2. Data loading utilities
3. Text preprocessing functions

In [None]:
@dataclass
class Document:
    """Class to represent a document chunk."""
    text: str
    metadata: Dict[str, Any]

def split_text(text: str, chunk_size: int = 200, overlap: int = 50) -> List[str]:
    """Split text into chunks with overlap."""
    words = text.split()
    chunks = []
    
    for i in range(0, len(words), chunk_size - overlap):
        chunk = ' '.join(words[i:i + chunk_size])
        chunks.append(chunk)
    
    return chunks

def load_documents(directory: str) -> List[Document]:
    """Load documents from a directory."""
    documents = []
    
    # This is a placeholder - implement actual document loading based on your data
    # For demonstration, we'll create some sample documents
    sample_texts = [
        "Barack Obama was born in Hawaii. He served as the 44th president of the United States.",
        "Michelle Obama was born in Chicago, Illinois. She served as First Lady of the United States.",
        "Barack and Michelle Obama met in Chicago and got married in 1992.",
        "Kathy lives in Lai King"
    ]
    
    for i, text in enumerate(sample_texts):
        chunks = split_text(text)
        for chunk in chunks:
            doc = Document(
                text=chunk,
                metadata={'source_id': i, 'source_text': text}
            )
            documents.append(doc)
    
    return documents

# Load documents
documents = load_documents("path/to/your/documents")
print(f"Loaded {len(documents)} document chunks")

Loaded 4 document chunks


## 3. Vector Store Creation

Now we'll create a FAISS vector store to index our document embeddings:

In [39]:
class VectorStore:
    def __init__(self, embedding_dim: int = 384):  # MiniLM-L6-v2 has 384 dimensions
        self.index = faiss.IndexFlatL2(embedding_dim)
        self.documents = []
        
    def add_documents(self, documents: List[Document]):
        """Add documents to the vector store."""
        texts = [doc.text for doc in documents]
        embeddings = embedding_model.encode(texts, convert_to_tensor=True)
        embeddings = embeddings.cpu().numpy()
        
        self.index.add(embeddings)
        self.documents.extend(documents)
    
    def search(self, query: str, k: int = 3) -> List[Document]:
        """Search for similar documents."""
        query_embedding = embedding_model.encode([query], convert_to_tensor=True)
        query_embedding = query_embedding.cpu().numpy()
        
        distances, indices = self.index.search(query_embedding, k)
        return [self.documents[i] for i in indices[0]]

# Create vector store and add documents
vector_store = VectorStore()
vector_store.add_documents(documents)

# Test search
results = vector_store.search("Where was Barack Obama born?")
print("Test search results:")
for doc in results:
    print(f"- {doc.text}")

Test search results:
- Barack Obama was born in Hawaii. He served as the 44th president of the United States.
- Michelle Obama was born in Chicago, Illinois. She served as First Lady of the United States.
- Barack and Michelle Obama met in Chicago and got married in 1992.


## 4. Basic RAG Implementation

Now we'll implement the core RAG pipeline components:
1. Passage retrieval
2. Prompt construction
3. Response generation

In [None]:
class BasicRAG:
    def __init__(self, model, tokenizer, vector_store):
        self.model = model
        self.tokenizer = tokenizer
        self.vector_store = vector_store
        self.prompt_instruction = """You are a helpful assistant. Reply briefly and only with information explicitly stated in the context that directly answers the question. If you cannot find an answer based on the provided information, say "I don't have enough information to answer that.". Use the following examples as a guide for your response style but not context:

        Example 1:
        Context: The capital of Germany is the city of Berlin. The capital of France is Paris.
        Query: What is the capital of Germany?
        Answer: The capital of Germany is Berlin.

        Example 2:
        Context: William Shakespeare was an English playwright. He wrote Romeo and Juliet in the 1590s.
        Query: Who wrote Romeo and Juliet?
        Answer: Romeo and Juliet was written by William Shakespeare.

        Example 3:
        Context: BeiJing is in China.
        Query: Where is New York?
        Answer: I don't have enough information to answer that.

        Now, use this context to answer the query:"""
        
    def _build_prompt(self, query: str, contexts: List[Document]) -> str:
        """Build a prompt combining the query and retrieved contexts."""
        context_str = "\n\n".join([doc.text for doc in contexts])
        
        prompt = f"""{self.prompt_instruction}
        Context:
        {context_str}

        Query: {query}

        Answer:"""
        
        return prompt
    
    def generate_response(self, query: str, max_new_tokens: int = 200) -> str:
        """Generate a response using RAG."""
        # Retrieve relevant documents
        relevant_docs = self.vector_store.search(query)
        
        # Build prompt
        prompt = self._build_prompt(query, relevant_docs)
        
        # Generate response
        inputs = self.tokenizer(prompt, return_tensors="pt").to(device)
        outputs = self.model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            num_return_sequences=1,
            temperature=0.3, # less creative answer
            pad_token_id=self.tokenizer.eos_token_id
        )
        
        response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

        return response.split("Answer:")[-1].strip()

# Initialize RAG system
rag_system = BasicRAG(model, tokenizer, vector_store)

# Test the basic RAG system
test_query = "where did Kathy live"
response = rag_system.generate_response(test_query)
print(f"Question: {test_query}")
print(f"Answer: {response}")

Question: where did Kathy live
Answer: Lai King.


## 5. Query Processing Pipeline

Let's implement query processing components that will help with both basic RAG and the optional features:

In [None]:
from collections import deque

class QueryProcessor:
    def __init__(self, memory_size: int = 5):
        self.conversation_history = deque(maxlen=memory_size) # 
    
    def preprocess_query(self, query: str) -> str:
        """Clean and normalize the query."""
        return query.strip()
    
    def extract_entities(self, text: str) -> List[str]:
        """Extract key entities from text (simplified version)."""
        # This is a placeholder - in a real system, use NER
        words = text.split()
        # Simple capitalized words detection
        entities = [word for word in words if word[0].isupper()]
        return entities
    
    def expand_query(self, query: str) -> str:
        """Expand query with relevant context from conversation history."""
        if not self.conversation_history:
            return query
            
        # Get the last exchange
        last_exchange = self.conversation_history[-1]
        entities = self.extract_entities(last_exchange["query"] + " " + last_exchange["response"])
        
        # If the query contains pronouns and we have entities, expand it
        pronouns = ["he", "she", "his", "her", "their", "it"]
        if any(pronoun in query.lower() for pronoun in pronouns) and entities:
            expanded_query = f"{', '.join(entities)}: {query}"
            print(expanded_query)
            return expanded_query
            
        return query
    
    def add_to_history(self, query: str, response: str):
        """Add an exchange to conversation history."""
        self.conversation_history.append({
            "query": query,
            "response": response,
            "entities": self.extract_entities(query + " " + response)
        })

# Initialize query processor
query_processor = QueryProcessor()

# Test query processing
test_query = "What about his wife?"
processed_query = query_processor.preprocess_query(test_query)
expanded_query = query_processor.expand_query(processed_query)

print(f"Original query: {test_query}")
print(f"Expanded query: {expanded_query}")

Original query: What about his wife?
Expanded query: What about his wife?


## 6. Multi-Turn Conversation Handler (Feature A)

Implement the multi-turn conversation feature with context management:

In [None]:
class MultiTurnRAG(BasicRAG):
    def __init__(self, model, tokenizer, vector_store):
        super().__init__(model, tokenizer, vector_store)
        self.query_processor = QueryProcessor()
        
    def _build_prompt(self, query: str, contexts: List[Document], include_history: bool = True) -> str:
        """Build a prompt with conversation history."""
        context_str = "\n\n".join([doc.text for doc in contexts])
        
        history_str = ""
        if include_history and self.query_processor.conversation_history:
            history = self.query_processor.conversation_history[-3:]  # Last 3 exchanges
            history_str = "\n".join([
                f"Query: {h['query']}\nAnswer: {h['response']}"
                for h in history
            ])
            history_str = f"\nPrevious conversation:\n{history_str}\n"
            
        prompt = f"""{self.prompt_instruction}
        Context:
        {context_str}
        {history_str}

        Query: {query}

        Answer:"""
        
        return prompt
    
    def generate_response(self, query: str, max_new_tokens: int = 200) -> str:
        """Generate a response with conversation context."""
        # Process and expand query
        processed_query = self.query_processor.preprocess_query(query)
        expanded_query = self.query_processor.expand_query(processed_query)
        
        # Get response using the expanded query
        response = super().generate_response(expanded_query)
        
        # Update conversation history
        self.query_processor.add_to_history(query, response)
        
        return response

# Initialize multi-turn RAG system
multi_turn_rag = MultiTurnRAG(model, tokenizer, vector_store)

# Test multi-turn conversation
queries = [
    "Where was Barack Obama born?",
    "Where does kathy live?",
    "What about his wife, where was she born?"
]

print("Testing multi-turn conversation:")
for query in queries:
    response = multi_turn_rag.generate_response(query)
    print(f"\nQ: {query}")
    print(f"A: {response}")

Testing multi-turn conversation:

Q: Where was Barack Obama born?
A: Barack Obama was born in Hawaii.
Where, Barack, Obama, Barack, Obama, Hawaii.: Where does kathy live?


TypeError: sequence index must be integer, not 'slice'

## 7. Agentic Workflow Components (Feature B)

Implement advanced features for improved response quality:

In [None]:
class AgenticRAG(MultiTurnRAG):
    def __init__(self, model, tokenizer, vector_store):
        super().__init__(model, tokenizer, vector_store)
    
    def _decompose_query(self, query: str) -> List[str]:
        """Decompose complex queries into simpler sub-queries."""
        decomposition_prompt = f"""Break down the following question into simpler sub-questions:
        Question: {query}
        
        Generate sub-questions in a numbered list. Keep it simple."""
        
        inputs = self.tokenizer(decomposition_prompt, return_tensors="pt").to(device)
        outputs = self.model.generate(**inputs, max_new_tokens=256)
        decomposition = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        
        # Extract sub-questions (simple implementation)
        sub_queries = [
            line.split(". ", 1)[1] if ". " in line else line
            for line in decomposition.split("\n")
            if line.strip() and line[0].isdigit()
        ]
        
        return sub_queries
    
    def _verify_response(self, response: str, context: List[Document]) -> bool:
        """Verify if the response is supported by the context."""
        context_str = "\n".join([doc.text for doc in context])
        
        verification_prompt = f"""Given the following context and response, determine if the response is fully supported by the context.
        
        Context: {context_str}
        
        Response: {response}
        
        Is the response fully supported by the context? Answer with just 'yes' or 'no'."""
        
        inputs = self.tokenizer(verification_prompt, return_tensors="pt").to(device)
        outputs = self.model.generate(**inputs, max_new_tokens=128)
        result = self.tokenizer.decode(outputs[0], skip_special_tokens=True).lower()
        
        return "yes" in result
    
    def generate_response(self, query: str, max_new_tokens: int = 600) -> str:
        """Generate a response using the agentic workflow."""
        # 1. Query Planning
        sub_queries = self._decompose_query(query)
        
        # 2. Parallel Retrieval and Response Generation
        sub_responses = []
        for sub_query in sub_queries:
            # Get relevant documents for sub-query
            relevant_docs = self.vector_store.search(sub_query)
            
            # Generate response for sub-query
            prompt = self._build_prompt(sub_query, relevant_docs, include_history=False)
            inputs = self.tokenizer(prompt, return_tensors="pt").to(device)
            outputs = self.model.generate(**inputs, max_new_tokens=max_new_tokens)
            sub_response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            sub_responses.append(sub_response)
        
        # 3. Response Synthesis
        synthesis_prompt = f"""Synthesize a coherent answer from the following responses:
        
        Question: {query}
        
        Sub-responses:
        {chr(10).join(sub_responses)}
        
        Provide a clear, concise answer that addresses the original question."""
        
        inputs = self.tokenizer(synthesis_prompt, return_tensors="pt").to(device)
        outputs = self.model.generate(**inputs, max_new_tokens=max_new_tokens)
        final_response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        
        # 4. Verification
        relevant_docs = self.vector_store.search(query)
        if not self._verify_response(final_response, relevant_docs):
            final_response += "\n[Note: Some details in this response may require verification.]"
        
        # Update conversation history
        self.query_processor.add_to_history(query, final_response)
        
        return final_response

# Initialize agentic RAG system
agentic_rag = AgenticRAG(model, tokenizer, vector_store)

# Test complex query
complex_query = "Tell me about Barack Obama's early life and his wife's background."
response = agentic_rag.generate_response(complex_query)
print(f"\nComplex Query: {complex_query}")
print(f"Response: {response}")


Complex Query: Tell me about Barack Obama's early life and his wife's background.
Response: Synthesize a coherent answer from the following responses:

        Question: Tell me about Barack Obama's early life and his wife's background.

        Sub-responses:
        You are a helpful assistant. Use the following information to answer the question.
        If you cannot find the answer in the provided context, say "I don't have enough information to answer that."

        Context:
        Michelle Obama was born in Chicago, Illinois. She served as First Lady of the United States.

Barack Obama was born in Hawaii. He served as the 44th president of the United States.

Barack and Michelle Obama met in Chicago and got married in 1992.
        
        Question: The first president was George Washington.

        Answer: Let me answer based on the provided information. Barack Obama is the first president mentioned in the given context, and he is also referred to as the 44th president of 

## 8. System Integration

Let's create a unified interface that allows switching between different RAG modes:

In [None]:
class UnifiedBasicRAG:
    def __init__(self, model_name: str = "Qwen/Qwen2.5-0.5B-Instruct"):
        self.model = AutoModelForCausalLM.from_pretrained(model_name).to(device)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.vector_store = VectorStore()
        
        # Initialize different RAG modes
        self.basic_rag = BasicRAG(self.model, self.tokenizer, self.vector_store)
        self.multi_turn_rag = MultiTurnRAG(self.model, self.tokenizer, self.vector_store)
        self.agentic_rag = AgenticRAG(self.model, self.tokenizer, self.vector_store)
        
    def add_documents(self, documents: List[Document]):
        """Add documents to the knowledge base."""
        self.vector_store.add_documents(documents)
    
    def query(self, 
             query: str, 
             mode: str = "basic",
             max_new_tokens: int = 200) -> str:
        """
        Query the RAG system using specified mode.
        
        Args:
            query: The user's question
            mode: One of ["basic", "multi_turn", "agentic"]
            max_new_tokens: Maximum length of the generated response
        """
        if mode == "basic":
            return self.basic_rag.generate_response(query, max_new_tokens)
        elif mode == "multi_turn":
            return self.multi_turn_rag.generate_response(query, max_new_tokens)
        elif mode == "agentic":
            return self.agentic_rag.generate_response(query, max_new_tokens)
        else:
            raise ValueError(f"Unknown mode: {mode}")

# Initialize unified system
unified_rag = UnifiedBasicRAG()
unified_rag.add_documents(documents)

# Test different modes
test_queries = [
    ("Where was Barack Obama born?", "basic"),
    ("What about his wife?", "multi_turn"),
    ("Tell me about both Barack and Michelle Obama's early lives.", "agentic")
]

print("Testing different RAG modes:")
for query, mode in test_queries:
    print(f"\nMode: {mode}")
    print(f"Q: {query}")
    response = unified_rag.query(query, mode=mode)
    print(f"A: {response}")

Testing different RAG modes:

Mode: basic
Q: Where was Barack Obama born?
A: According to the given context, Barack Obama was born in Hawaii. The information states:

"Barack Obama was born in Hawaii. He served as the 44th president of the United States."

So, the correct answer is: Hawaii.
I don't have enough information to answer that.
Therefore, the response is: I don't have enough information to answer that.

Mode: multi_turn
Q: What about his wife?
A: The passage states that Michelle Obama was born in Chicago, Illinois and she served as First Lady of the United States. It also mentions that Barack Obama was born in Hawaii and he served as the 44th president of the United States. Therefore, it is not possible to determine what about his wife from the given information. I don't have enough information to answer that question.
Therefore, my answer is: I don't have enough information to answer that.

Mode: agentic
Q: Tell me about both Barack and Michelle Obama's early lives.
A: Synth

## 9. Evaluation Metrics

Implement metrics to evaluate the system's performance:

In [None]:
class RAGEvaluator:
    def __init__(self, rag_system: UnifiedBasicRAG):
        self.rag_system = rag_system
        
    def evaluate_retrieval(self, query: str, relevant_docs: List[Document]) -> dict:
        """Evaluate retrieval performance."""
        retrieved_docs = self.rag_system.vector_store.search(query)
        
        # Calculate precision and recall
        relevant_ids = set(doc.metadata['source_id'] for doc in relevant_docs)
        retrieved_ids = set(doc.metadata['source_id'] for doc in retrieved_docs)
        
        true_positives = len(relevant_ids.intersection(retrieved_ids))
        precision = true_positives / len(retrieved_ids) if retrieved_ids else 0
        recall = true_positives / len(relevant_ids) if relevant_ids else 0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        
        return {
            'precision': precision,
            'recall': recall,
            'f1': f1
        }
    
    def evaluate_response_quality(self, 
                                query: str, 
                                ground_truth: str,
                                mode: str = "basic") -> dict:
        """Evaluate response quality."""
        response = self.rag_system.query(query, mode=mode)
        
        # Calculate response length
        response_length = len(response.split())
        
        # Simplified similarity score (in practice, use better metrics)
        ground_truth_words = set(ground_truth.lower().split())
        response_words = set(response.lower().split())
        similarity = len(ground_truth_words.intersection(response_words)) / len(ground_truth_words)
        
        return {
            'response_length': response_length,
            'similarity_to_ground_truth': similarity
        }
    
    def run_evaluation(self, 
                      test_cases: List[dict],
                      modes: List[str] = ["basic", "multi_turn", "agentic"]) -> dict:
        """Run full evaluation on test cases."""
        results = {mode: {'retrieval': [], 'response': []} for mode in modes}
        
        for test_case in test_cases:
            query = test_case['query']
            ground_truth = test_case['ground_truth']
            relevant_docs = test_case['relevant_docs']
            
            # Evaluate retrieval
            retrieval_metrics = self.evaluate_retrieval(query, relevant_docs)
            
            # Evaluate response for each mode
            for mode in modes:
                response_metrics = self.evaluate_response_quality(
                    query, ground_truth, mode
                )
                
                results[mode]['retrieval'].append(retrieval_metrics)
                results[mode]['response'].append(response_metrics)
        
        # Calculate averages
        final_results = {}
        for mode in modes:
            final_results[mode] = {
                'avg_precision': np.mean([m['precision'] for m in results[mode]['retrieval']]),
                'avg_recall': np.mean([m['recall'] for m in results[mode]['retrieval']]),
                'avg_f1': np.mean([m['f1'] for m in results[mode]['retrieval']]),
                'avg_response_length': np.mean([m['response_length'] for m in results[mode]['response']]),
                'avg_similarity': np.mean([m['similarity_to_ground_truth'] for m in results[mode]['response']])
            }
            
        return final_results

# Create test cases
test_cases = [
    {
        'query': "Where was Barack Obama born?",
        'ground_truth': "Barack Obama was born in Hawaii.",
        'relevant_docs': [Document(
            text="Barack Obama was born in Hawaii.",
            metadata={'source_id': 0}
        )]
    },
    {
        'query': "Where was Michelle Obama born?",
        'ground_truth': "Michelle Obama was born in Chicago, Illinois.",
        'relevant_docs': [Document(
            text="Michelle Obama was born in Chicago, Illinois.",
            metadata={'source_id': 1}
        )]
    }
]

# Run evaluation
evaluator = RAGEvaluator(unified_rag)
evaluation_results = evaluator.run_evaluation(test_cases)

# Print results
print("Evaluation Results:")
for mode, metrics in evaluation_results.items():
    print(f"\nMode: {mode}")
    for metric, value in metrics.items():
        print(f"{metric}: {value:.3f}")

Evaluation Results:

Mode: basic
avg_precision: 0.333
avg_recall: 1.000
avg_f1: 0.500
avg_response_length: 82.500
avg_similarity: 1.000

Mode: multi_turn
avg_precision: 0.333
avg_recall: 1.000
avg_f1: 0.500
avg_response_length: 30.500
avg_similarity: 1.000

Mode: agentic
avg_precision: 0.333
avg_recall: 1.000
avg_f1: 0.500
avg_response_length: 180.500
avg_similarity: 0.774
