In [1]:
# cell 1: Install dependencies
!pip install -q faiss-cpu sentence-transformers bitsandbytes accelerate transformers requests beautifulsoup4
!pip install -q torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

In [2]:
# cell 2: Import libraries
import torch
import requests
from bs4 import BeautifulSoup
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    pipeline
)
import json
import os
from typing import List, Dict, Any

In [3]:
# cell 3: Define SimpleRAGSystem class - Combined
import torch
import requests
from bs4 import BeautifulSoup
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    pipeline
)
import json
import os
import re
from typing import List, Dict, Any

class SimpleRAGSystem:
    """Small retrieval-augmented generation (RAG) pipeline.

    Text is split into fixed-size character chunks, embedded with a
    SentenceTransformer model and stored in a FAISS inner-product index over
    L2-normalised vectors (i.e. cosine similarity). Questions are answered by
    prompting a 4-bit quantized causal language model with the best-matching
    chunks.
    """

    def __init__(self, embedding_model: str = "sentence-transformers/all-mpnet-base-v2",
                 llm_model_name: str = "mistralai/Mistral-7B-Instruct-v0.2"):
        """Load the embedding model and the language model.

        Args:
            embedding_model: Name or path handed to ``SentenceTransformer``.
            llm_model_name: Name or path handed to the ``from_pretrained`` loaders.
        """
        self.embedding_model = SentenceTransformer(embedding_model)
        self.llm_model_name = llm_model_name
        self.documents = []
        self.embeddings = None
        self.index = None
        self.setup_llm()

    def setup_llm(self):
        """Initialize the tokenizer, the 4-bit quantized model and the generation pipeline."""
        self.tokenizer = AutoTokenizer.from_pretrained(self.llm_model_name)

        quantization_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_quant_type="nf4",
        )

        # SECURITY NOTE: trust_remote_code=True lets the model repository run
        # its own Python code at load time; only use it with repositories you trust.
        self.model = AutoModelForCausalLM.from_pretrained(
            self.llm_model_name,
            quantization_config=quantization_config,
            device_map="auto",
            trust_remote_code=True
        )

        self.generator = pipeline(
            "text-generation",
            model=self.model,
            tokenizer=self.tokenizer,
            max_new_tokens=256,
            temperature=0.7,
            do_sample=True
        )

    def load_documents(self, source: str, chunk_size: int = 500) -> List[str]:
        """Load text from a URL or a local file and split it into chunks.

        Args:
            source: ``http://`` / ``https://`` URL, or a path to a UTF-8 text file.
            chunk_size: Number of characters per chunk; must be positive.

        Returns:
            The list of chunks, which is also stored in ``self.documents``.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
            requests.HTTPError: If the URL responds with an error status.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")

        print("Loading and processing documents...")

        if source.startswith(("http://", "https://")):
            # A timeout keeps the cell from hanging forever on a stalled server.
            response = requests.get(source, timeout=30)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")

            paragraphs = [p.get_text() for p in soup.find_all("p")]
            if paragraphs:
                text = "\n".join(paragraphs)
            elif "html" in response.headers.get("Content-Type", "").lower():
                # HTML page without <p> tags: fall back to all of its text.
                text = soup.get_text(" ")
            else:
                # Plain-text resources (e.g. a raw .txt file) contain no <p>
                # tags at all; use the body as-is instead of yielding nothing.
                text = response.text
        else:
            with open(source, "r", encoding="utf-8") as f:
                text = f.read()

        # Collapse every run of whitespace into a single space.
        text = re.sub(r"\s+", " ", text).strip()

        chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

        self.documents = chunks
        print(f"✅ Loaded {len(chunks)} document chunks.")
        return chunks

    def create_embeddings(self):
        """Embed ``self.documents`` and build the FAISS index.

        Raises:
            ValueError: If no documents have been loaded.
        """
        if not self.documents:
            raise ValueError("No documents loaded")

        self.embeddings = self.embedding_model.encode(self.documents)
        self.embeddings = np.array(self.embeddings).astype('float32')

        # Inner product over L2-normalised vectors equals cosine similarity.
        dimension = self.embeddings.shape[1]
        self.index = faiss.IndexFlatIP(dimension)
        faiss.normalize_L2(self.embeddings)
        self.index.add(self.embeddings)

    def retrieve(self, query: str, k: int = 3) -> List[dict]:
        """Return up to ``k`` chunks most similar to ``query``.

        Each result is a dict with ``content`` (chunk text), ``score``
        (similarity as a float) and ``index`` (position in ``self.documents``).

        Raises:
            ValueError: If the index has not been built or loaded.
        """
        if self.index is None:
            raise ValueError("Index not built. Call create_embeddings first.")

        query_embedding = self.embedding_model.encode([query])
        query_embedding = np.array(query_embedding).astype('float32')
        faiss.normalize_L2(query_embedding)

        distances, indices = self.index.search(query_embedding, k)

        results = []
        for idx, distance in zip(indices[0], distances[0]):
            # FAISS pads missing results with the label -1 when fewer than k
            # vectors are indexed; without the lower bound that label would
            # silently select the last document.
            if 0 <= idx < len(self.documents):
                results.append({
                    'content': self.documents[idx],
                    'score': float(distance),
                    'index': int(idx)  # plain int so results stay JSON-serialisable
                })

        return results

    def generate_answer(self, query: str, context: str) -> str:
        """Generate an answer to ``query`` from ``context`` using the LLM."""
        prompt = f"""Based on the following context, please answer the question.
If the context doesn't contain relevant information, say you don't know.

Context: {context}

Question: {query}

Answer:"""

        response = self.generator(
            prompt,
            max_new_tokens=256,
            temperature=0.7,
            do_sample=True,
            pad_token_id=self.tokenizer.eos_token_id,
            return_full_text=False  # ask the pipeline for the completion only
        )

        generated = response[0]['generated_text']
        # Defensive: drop the prompt only when it is a leading prefix, never
        # when the same text happens to occur inside the answer.
        if generated.startswith(prompt):
            generated = generated[len(prompt):]
        return generated.strip()

    def ask(self, query: str, k: int = 3) -> Dict[str, Any]:
        """Run the whole pipeline: retrieve ``k`` chunks, then generate an answer.

        Returns:
            Dict with the keys ``question``, ``answer``, ``sources`` and ``context``.
        """
        retrieved_docs = self.retrieve(query, k)
        context = "\n\n".join([doc['content'] for doc in retrieved_docs])
        answer = self.generate_answer(query, context)

        return {
            'question': query,
            'answer': answer,
            'sources': retrieved_docs,
            'context': context
        }

    def save(self, path: str):
        """Write documents, embeddings and the FAISS index into directory ``path``."""
        os.makedirs(path, exist_ok=True)

        with open(os.path.join(path, 'documents.json'), 'w', encoding='utf-8') as f:
            json.dump(self.documents, f)

        embeddings_path = os.path.join(path, 'embeddings.npy')
        index_path = os.path.join(path, 'faiss.index')
        if self.embeddings is not None and self.index is not None:
            np.save(embeddings_path, self.embeddings)
            faiss.write_index(self.index, index_path)
        else:
            # Nothing to persist: remove leftovers from an earlier save so a
            # later load() cannot pair these documents with unrelated vectors.
            for stale in (embeddings_path, index_path):
                if os.path.exists(stale):
                    os.remove(stale)

    def load(self, path: str):
        """Restore documents, embeddings and the FAISS index from directory ``path``.

        When the directory holds no embeddings/index (save() was called before
        create_embeddings()), both attributes are reset to ``None``.
        """
        with open(os.path.join(path, 'documents.json'), 'r', encoding='utf-8') as f:
            self.documents = json.load(f)

        embeddings_path = os.path.join(path, 'embeddings.npy')
        index_path = os.path.join(path, 'faiss.index')
        if os.path.exists(embeddings_path) and os.path.exists(index_path):
            self.embeddings = np.load(embeddings_path)
            self.index = faiss.read_index(index_path)
        else:
            self.embeddings = None
            self.index = None

In [4]:
# cell 4: Define SimpleRAGSystem class - Part 2 (LLM setup)
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, pipeline
import torch

# NOTE: this statement rebinds the name SimpleRAGSystem, replacing whatever
# definition an earlier cell created.
class SimpleRAGSystem:
    """LLM-only slice of the RAG system: loads a quantized model and a generation pipeline."""

    def __init__(self, llm_model_name="mistralai/Mistral-7B-Instruct-v0.2"):
        """Remember the model name and immediately load the model."""
        self.llm_model_name = llm_model_name
        self.setup_llm()

    def setup_llm(self):
        """Create ``self.tokenizer``, ``self.model`` (4-bit NF4) and ``self.generator``."""
        model_id = self.llm_model_name
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)

        # 4-bit NF4 weights with float16 as the compute dtype.
        nf4_settings = dict(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_quant_type="nf4",
        )
        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            quantization_config=BitsAndBytesConfig(**nf4_settings),
            device_map="auto",
            trust_remote_code=True,
        )

        # Sampling defaults baked into the pipeline.
        generation_defaults = {
            "max_new_tokens": 256,
            "temperature": 0.7,
            "do_sample": True,
        }
        self.generator = pipeline(
            "text-generation",
            model=self.model,
            tokenizer=self.tokenizer,
            **generation_defaults,
        )


In [5]:
# cell 5: Define SimpleRAGSystem class - Part 3 (Document loading)
import requests
from bs4 import BeautifulSoup
from typing import List

# NOTE: this statement rebinds the name SimpleRAGSystem, replacing whatever
# definition an earlier cell created.
class SimpleRAGSystem:
    """LLM setup plus document loading with overlapping character chunks."""

    def __init__(self, llm_model_name="mistralai/Mistral-7B-Instruct-v0.2"):
        """Remember the model name, start with no documents and load the LLM."""
        self.llm_model_name = llm_model_name
        # Initialise so the attribute exists before load_documents() is called.
        self.documents = []
        self.setup_llm()

    def setup_llm(self):
        """Initialize the tokenizer, the 4-bit quantized model and the generation pipeline."""
        self.tokenizer = AutoTokenizer.from_pretrained(self.llm_model_name)
        quantization_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_quant_type="nf4",
        )
        # SECURITY NOTE: trust_remote_code=True lets the model repository run
        # its own Python code at load time; only use it with repositories you trust.
        self.model = AutoModelForCausalLM.from_pretrained(
            self.llm_model_name,
            quantization_config=quantization_config,
            device_map="auto",
            trust_remote_code=True
        )
        self.generator = pipeline(
            "text-generation",
            model=self.model,
            tokenizer=self.tokenizer,
            max_new_tokens=256,
            temperature=0.7,
            do_sample=True
        )

    def load_documents(self, source: str):
        """Load text from a URL or a local UTF-8 file into ``self.documents``.

        Args:
            source: ``http://`` / ``https://`` URL, or a path to a text file.

        Returns:
            The list of chunks stored in ``self.documents``.

        Raises:
            requests.HTTPError: If the URL responds with an error status.
        """
        if source.startswith(("http://", "https://")):
            # A timeout keeps the cell from hanging forever on a stalled server.
            response = requests.get(source, timeout=30)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            # Script and style bodies are not visible text; keep them out of the corpus.
            for tag in soup.find_all(["script", "style"]):
                tag.decompose()
            text = soup.get_text()
            # Drop blank lines and join the rest with single spaces.
            lines = [line.strip() for line in text.split('\n') if line.strip()]
            text = ' '.join(lines)
        else:
            with open(source, 'r', encoding='utf-8') as f:
                text = f.read()

        self.documents = self.split_text(text)
        return self.documents

    def split_text(self, text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
        """Split ``text`` into chunks of ``chunk_size`` characters.

        Consecutive chunks share ``chunk_overlap`` characters. Splitting stops
        with the first chunk that reaches the end of the text.

        Raises:
            ValueError: If ``chunk_size`` is not positive, or ``chunk_overlap``
                is negative or not smaller than ``chunk_size`` (the window
                would never advance).
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if not 0 <= chunk_overlap < chunk_size:
            raise ValueError("chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size")

        step = chunk_size - chunk_overlap
        chunks = []
        for start in range(0, len(text), step):
            chunks.append(text[start:start + chunk_size])
            if start + chunk_size >= len(text):
                break
        return chunks


In [6]:
# cell 6: Define SimpleRAGSystem class - Part 4 (Embeddings and retrieval)
import faiss
import numpy as np
from typing import List

# NOTE: this statement rebinds the name SimpleRAGSystem, replacing whatever
# definition an earlier cell created.
class SimpleRAGSystem:
    """Embedding and retrieval slice of the RAG system."""

    def __init__(self, embedding_model):
        """Store an already-constructed embedding model (anything with ``encode``)."""
        self.embedding_model = embedding_model
        self.documents = []
        self.embeddings = None
        self.index = None

    def create_embeddings(self):
        """Embed ``self.documents`` and build the FAISS index.

        Raises:
            ValueError: If no documents have been loaded.
        """
        if not self.documents:
            raise ValueError("No documents loaded")

        self.embeddings = self.embedding_model.encode(self.documents)
        self.embeddings = np.array(self.embeddings).astype('float32')

        # Inner product over L2-normalised vectors equals cosine similarity.
        dimension = self.embeddings.shape[1]
        self.index = faiss.IndexFlatIP(dimension)
        faiss.normalize_L2(self.embeddings)
        self.index.add(self.embeddings)

    def retrieve(self, query: str, k: int = 3) -> List[dict]:
        """Return up to ``k`` chunks most similar to ``query``.

        Each result is a dict with ``content`` (chunk text), ``score``
        (similarity as a float) and ``index`` (position in ``self.documents``).

        Raises:
            ValueError: If the index has not been built.
        """
        if self.index is None:
            raise ValueError("Index not built. Call create_embeddings first.")

        query_embedding = self.embedding_model.encode([query])
        query_embedding = np.array(query_embedding).astype('float32')
        faiss.normalize_L2(query_embedding)

        distances, indices = self.index.search(query_embedding, k)

        results = []
        for idx, distance in zip(indices[0], distances[0]):
            # FAISS pads missing results with the label -1 when fewer than k
            # vectors are indexed; without the lower bound that label would
            # silently select the last document.
            if 0 <= idx < len(self.documents):
                results.append({
                    'content': self.documents[idx],
                    'score': float(distance),
                    'index': int(idx)  # plain int so results stay JSON-serialisable
                })

        return results


In [7]:
# cell 7: Define SimpleRAGSystem class - Part 5 (Generation and main pipeline)
from typing import Dict, Any

# NOTE: this statement rebinds the name SimpleRAGSystem, replacing whatever
# definition an earlier cell created. When the cells are run in order this is
# the last class definition, so it is the one later cells instantiate.
class SimpleRAGSystem:
    """Retrieval and generation pipeline assembled from pre-built components."""

    def __init__(self, llm_model, tokenizer, generator, embedding_model):
        """Store already-constructed components.

        Args:
            llm_model: The language model object (stored as ``self.model``).
            tokenizer: Tokenizer; its ``eos_token_id`` is used for padding.
            generator: Callable text-generation pipeline.
            embedding_model: Object with an ``encode`` method.
        """
        self.model = llm_model
        self.tokenizer = tokenizer
        self.generator = generator
        self.embedding_model = embedding_model
        self.documents = []
        self.embeddings = None
        self.index = None

    def create_embeddings(self):
        """Embed ``self.documents`` and build the FAISS index.

        Raises:
            ValueError: If no documents have been loaded.
        """
        if not self.documents:
            raise ValueError("No documents loaded")

        self.embeddings = self.embedding_model.encode(self.documents)
        self.embeddings = np.array(self.embeddings).astype('float32')
        # Inner product over L2-normalised vectors equals cosine similarity.
        dimension = self.embeddings.shape[1]
        self.index = faiss.IndexFlatIP(dimension)
        faiss.normalize_L2(self.embeddings)
        self.index.add(self.embeddings)

    def retrieve(self, query: str, k: int = 3) -> List[dict]:
        """Return up to ``k`` chunks most similar to ``query``.

        Each result is a dict with ``content`` (chunk text), ``score``
        (similarity as a float) and ``index`` (position in ``self.documents``).

        Raises:
            ValueError: If the index has not been built.
        """
        if self.index is None:
            raise ValueError("Index not built. Call create_embeddings first.")

        query_embedding = self.embedding_model.encode([query])
        query_embedding = np.array(query_embedding).astype('float32')
        faiss.normalize_L2(query_embedding)

        distances, indices = self.index.search(query_embedding, k)

        results = []
        for idx, distance in zip(indices[0], distances[0]):
            # FAISS pads missing results with the label -1 when fewer than k
            # vectors are indexed; without the lower bound that label would
            # silently select the last document.
            if 0 <= idx < len(self.documents):
                results.append({
                    'content': self.documents[idx],
                    'score': float(distance),
                    'index': int(idx)  # plain int so results stay JSON-serialisable
                })

        return results

    def generate_answer(self, query: str, context: str) -> str:
        """Generate an answer to ``query`` from ``context`` using the LLM."""
        prompt = f"""Based on the following context, please answer the question.
If the context doesn't contain relevant information, say you don't know.

Context: {context}

Question: {query}

Answer:"""

        response = self.generator(
            prompt,
            max_new_tokens=256,
            temperature=0.7,
            do_sample=True,
            pad_token_id=self.tokenizer.eos_token_id,
            return_full_text=False  # ask the pipeline for the completion only
        )

        generated = response[0]['generated_text']
        # Defensive: drop the prompt only when it is a leading prefix, never
        # when the same text happens to occur inside the answer.
        if generated.startswith(prompt):
            generated = generated[len(prompt):]
        return generated.strip()

    def ask(self, query: str, k: int = 3) -> Dict[str, Any]:
        """Run the whole pipeline: retrieve ``k`` chunks, then generate an answer.

        Returns:
            Dict with the keys ``question``, ``answer``, ``sources`` and ``context``.
        """
        retrieved_docs = self.retrieve(query, k)
        context = "\n\n".join([doc['content'] for doc in retrieved_docs])
        answer = self.generate_answer(query, context)

        return {
            'question': query,
            'answer': answer,
            'sources': retrieved_docs,
            'context': context
        }


In [8]:
# cell 8: Define SimpleRAGSystem class - Part 6 (Save/load functionality)

def save(self, path: str):
    """Write the RAG state of ``self`` into directory ``path``.

    Written as a method (takes ``self``) but defined at module level; it has
    to be bound to the class before ``instance.save(...)`` can be called.

    Files written: ``documents.json`` always; ``embeddings.npy`` and
    ``faiss.index`` only when both embeddings and index exist.
    """
    # exist_ok avoids the check-then-create race and creates missing parents.
    os.makedirs(path, exist_ok=True)

    with open(os.path.join(path, 'documents.json'), 'w', encoding='utf-8') as f:
        json.dump(self.documents, f)

    embeddings_path = os.path.join(path, 'embeddings.npy')
    index_path = os.path.join(path, 'faiss.index')
    if self.embeddings is not None and self.index is not None:
        np.save(embeddings_path, self.embeddings)
        faiss.write_index(self.index, index_path)
    else:
        # Nothing to persist: remove leftovers from an earlier save so a later
        # load cannot pair these documents with unrelated vectors.
        for stale in (embeddings_path, index_path):
            if os.path.exists(stale):
                os.remove(stale)

def load(self, path: str):
    """Restore the RAG state of ``self`` from directory ``path``.

    Written as a method (takes ``self``) but defined at module level; it has
    to be bound to the class before ``instance.load(...)`` can be called.

    ``documents.json`` is required. ``embeddings.npy`` and ``faiss.index`` are
    optional because the matching save step only writes them when embeddings
    exist; when either is missing, ``embeddings`` and ``index`` are set to None.
    """
    with open(os.path.join(path, 'documents.json'), 'r', encoding='utf-8') as f:
        self.documents = json.load(f)

    embeddings_path = os.path.join(path, 'embeddings.npy')
    index_path = os.path.join(path, 'faiss.index')
    if os.path.exists(embeddings_path) and os.path.exists(index_path):
        self.embeddings = np.load(embeddings_path)
        self.index = faiss.read_index(index_path)
    else:
        self.embeddings = None
        self.index = None


In [None]:
# cell 9: Initialize RAG system

from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, BitsAndBytesConfig
from sentence_transformers import SentenceTransformer
import torch

print("Initializing RAG system...")

# Load the embedding model for document retrieval
embedding_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# Load the tokenizer and generator model (LLM)
llm_model_name = "tiiuae/falcon-7b-instruct"  # You can replace with a smaller one if Colab RAM is low

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4",
)

tokenizer = AutoTokenizer.from_pretrained(llm_model_name)
# SECURITY NOTE: trust_remote_code=True lets the model repository run its own
# Python code at load time; only use it with repositories you trust.
model = AutoModelForCausalLM.from_pretrained(
    llm_model_name,
    quantization_config=quantization_config,
    device_map="auto",
    trust_remote_code=True
)

generator = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=256,
    temperature=0.7,
    do_sample=True
)


# The SimpleRAGSystem definition in scope here is the most recent one (cell 7),
# whose constructor takes the already-loaded components. Passing
# llm_model_name=... raises TypeError against that signature, so hand the
# components over directly instead of patching attributes afterwards.
rag = SimpleRAGSystem(
    llm_model=model,
    tokenizer=tokenizer,
    generator=generator,
    embedding_model=embedding_model
)


print("✅ RAG system initialized successfully!")

Initializing RAG system...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.



Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [None]:
# cell 10: Define SimpleRAGSystem class - Part 7 (Document loading)
import requests
from bs4 import BeautifulSoup
import re
from typing import List

def load_documents(self, source: str, chunk_size: int = 500) -> List[str]:
    """Load text from a URL or a local file and split it into chunks.

    Written as a method (takes ``self``); it is attached to the class after
    its definition.

    Args:
        source: ``http://`` / ``https://`` URL, or a path to a UTF-8 text file.
        chunk_size: Number of characters per chunk; must be positive.

    Returns:
        The list of chunks, which is also stored in ``self.documents``.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
        requests.HTTPError: If the URL responds with an error status.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    print("Loading and processing documents...")

    if source.startswith(("http://", "https://")):
        # A timeout keeps the cell from hanging forever on a stalled server.
        response = requests.get(source, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")

        paragraphs = [p.get_text() for p in soup.find_all("p")]
        if paragraphs:
            text = "\n".join(paragraphs)
        elif "html" in response.headers.get("Content-Type", "").lower():
            # HTML page without <p> tags: fall back to all of its text.
            text = soup.get_text(" ")
        else:
            # Plain-text resources (e.g. a raw .txt file) contain no <p> tags
            # at all; use the body as-is instead of yielding zero chunks.
            text = response.text
    else:
        with open(source, "r", encoding="utf-8") as f:
            text = f.read()

    # Collapse every run of whitespace into a single space.
    text = re.sub(r"\s+", " ", text).strip()

    chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

    self.documents = chunks
    print(f"✅ Loaded {len(chunks)} document chunks.")
    return chunks

# Attach the loader to the class. The save/load helpers defined in cell 8 are
# module-level functions that were never bound to SimpleRAGSystem, so bind them
# here too; otherwise rag.save(...) and new_rag.load(...) in later cells raise
# AttributeError.
SimpleRAGSystem.load_documents = load_documents
SimpleRAGSystem.save = save
SimpleRAGSystem.load = load

# Re-create the rag object from the components loaded in cell 9. llm_model
# receives the loaded model object rather than its name string, since the
# constructor stores it as self.model.
rag = SimpleRAGSystem(
    llm_model=model,
    tokenizer=tokenizer,
    generator=generator,
    embedding_model=embedding_model
)

In [None]:
# cell 11: Create embeddings
print("Creating embeddings and FAISS index...")

# The corpus has to be loaded first: create_embeddings() raises when
# rag.documents is empty.
essay_url = "https://raw.githubusercontent.com/pinecone-io/examples/master/learn/generation/rag/rag-with-langchain/data/paul_graham_essay.txt"
rag.load_documents(essay_url)
rag.create_embeddings()

print("Embeddings and index created successfully!")

In [None]:
# cell 12: Test the system
print("Testing the RAG system...")

# Sample questions pushed through the full retrieve-then-generate pipeline.
questions = [
    "What is artificial intelligence?",
    "What are the main applications of AI?",
    "How does machine learning relate to AI?"
]

for question in questions:
    result = rag.ask(question)
    # Build the four report lines, then emit them in a single print call.
    report = [
        f"Q: {result['question']}",
        f"A: {result['answer']}",
        f"Retrieved {len(result['sources'])} documents",
        "=" * 60,
    ]
    print("\n".join(report))

In [None]:
# cell 13: Save the system
# Persist the current RAG state into the "saved_rag_system" directory
# (relative to the kernel's working directory).
# NOTE(review): the SimpleRAGSystem definition from cell 7 has no save method;
# this call requires the save() helper from cell 8 to be bound to the class.
print("Saving the system...")
rag.save("saved_rag_system")
print("System saved to 'saved_rag_system'")

In [None]:
# cell 14: Load and test saved system
print("Testing saved system loading...")

# The SimpleRAGSystem constructor in scope (cell 7) requires the pre-built
# components, so a bare SimpleRAGSystem() raises TypeError. Reuse the objects
# loaded in cell 9 instead of loading a second copy of the model.
new_rag = SimpleRAGSystem(
    llm_model=model,
    tokenizer=tokenizer,
    generator=generator,
    embedding_model=embedding_model
)
new_rag.load("saved_rag_system")

test_result = new_rag.ask("What is machine learning?")
# Print the question from the result so it cannot drift from what was asked.
print(f"Q: {test_result['question']}")
print(f"A: {test_result['answer']}")
print("System loaded and tested successfully!")