In [None]:
!pip install llama-index faiss-cpu redis fastapi uvicorn sentence-transformers openai

In [None]:
!pip install llama-index faiss-cpu redis fastapi uvicorn sentence-transformers openai

import os
import faiss
import redis
import fastapi
import uvicorn
from llama_index import VectorStoreIndex, SimpleDirectoryReader, ServiceContext, StorageContext, load_index_from_storage, QueryEngine
from llama_index.query_engine import RetrieverQueryEngine
from llama_index.llms import OpenAI
from sentence_transformers import SentenceTransformer
from llama_index.embeddings import HuggingFaceEmbedding
from fastapi import FastAPI, HTTPException
import json
from pydantic import BaseModel
from typing import List

Set API Keys

In [None]:
# The OpenAI client reads its key from OPENAI_API_KEY; a variable named
# "OPEN_API_KEY" is never consulted, so the original assignment had no effect.
# setdefault keeps a key that is already exported in the environment instead of
# overwriting it with the placeholder below.
os.environ.setdefault("OPENAI_API_KEY", "Your api key will be here")

1. Basic LlamaIndex Setup
● Task: Install and set up LlamaIndex in a new Python project.
● Use Case: A company wants to explore RAG-based AI chatbots, and the first step is setting up the basic LlamaIndex library.

In [None]:
# Load the files found under the sample-data directory, then build a
# VectorStoreIndex from the loaded documents.
documents = SimpleDirectoryReader(input_dir='/content/sample_data').load_data()
index = VectorStoreIndex.from_documents(documents=documents)


2. Constructing a Simple Index
● Task: Write a Python script that:
○ Loads a small text document (e.g., data.txt).
○ Creates a LlamaIndex VectorStoreIndex.
○ Saves the index and reloads it.
● Use Case: A law firm wants to build an internal document retrieval system where employees can search for legal references quickly.

In [None]:

# Write the index's storage context to disk.
# NOTE(review): despite the ".txt" suffix this path is used as a directory; the
# caching cell further down reloads from the same path, so keep them in sync.
index.storage_context.persist(
    persist_dir="/content/sample_data/data.txt",
)

# Rebuild a storage context from that directory and load the index back from it.
storage_context = StorageContext.from_defaults(
    persist_dir="/content/sample_data/data.txt",
)
index = load_index_from_storage(storage_context=storage_context)


3. Implementing a Basic RAG Pipeline
● Task: Create a simple RAG pipeline using:
○ OpenAI GPT as the LLM.
○ FAISS as the vector store.
○ LlamaIndex for retrieval.

In [None]:
# FaissVectorStore is not part of the notebook's import cell, so bring it in here.
from llama_index.vector_stores import FaissVectorStore

# Embedding model used for both indexing and querying.
embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/all-mpnet-base-v2")

# The FAISS index dimension must equal the embedding size. Measure it from the
# model instead of hard-coding it: 1536 is the size of OpenAI's ada-002 vectors
# and does not match this sentence-transformers model.
embedding_dim = len(embed_model.get_text_embedding("dimension probe"))

# Create a FAISS vector store
vector_store = FaissVectorStore(faiss_index=faiss.IndexFlatL2(embedding_dim))

# The vector store is attached through a StorageContext; ServiceContext only
# carries the LLM / embedding configuration and has no vector_store argument.
faiss_storage_context = StorageContext.from_defaults(vector_store=vector_store)
service_context = ServiceContext.from_defaults(
    llm=OpenAI(temperature=0, model="gpt-3.5-turbo"),
    embed_model=embed_model,
)

index = VectorStoreIndex.from_documents(
    documents,
    storage_context=faiss_storage_context,
    service_context=service_context,
)

query_engine = index.as_query_engine()

response = query_engine.query("What is in the documents?")
response


4. Customizing the Indexing Process
● Task: Modify the default LlamaIndex indexing process to:
○ Split documents into smaller chunks (e.g., 256 tokens).
○ Use a different embedding model (e.g., sentence-transformers/all-MiniLM-L6-v2).
● Use Case: A tech company needs to break down its API documentation into smaller, searchable segments to help developers find relevant code snippets.

In [None]:
from llama_index.text_splitter import TokenTextSplitter

# Indexing settings: chunk size handed to the splitter, and the embedding model.
chunk_size = 256
embed_model = HuggingFaceEmbedding(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
)

# Splitter configured with the chunk size above.
text_splitter = TokenTextSplitter(chunk_size=chunk_size)

# Bundle the LLM, the embedding model and the splitter into one service context.
service_context = ServiceContext.from_defaults(
    llm=OpenAI(temperature=0, model="gpt-3.5-turbo"),
    embed_model=embed_model,
    text_splitter=text_splitter,
)

# Rebuild the index with those settings and run a sample query against it.
index = VectorStoreIndex.from_documents(
    documents,
    service_context=service_context,
)
query_engine = index.as_query_engine()
response = query_engine.query("What is in the documents?")
response


5. Query Engine Customization
● Task: Implement a custom retriever that ranks retrieved documents based on keyword matching before passing them to the LLM.
● Use Case: A university wants to create a chatbot that helps students find course materials, prioritizing documents that contain course codes and professor names.

In [None]:
from llama_index.retrievers import BaseRetriever
from typing import List
from llama_index.schema import NodeWithScore

class KeywordMatchingRetriever(BaseRetriever):
    """Retriever that ranks every node in the index's docstore by keyword hits.

    The query is lower-cased and split on whitespace; a node's score is the
    number of those keywords that occur (as substrings) in its lower-cased
    content. All nodes are returned, best score first.
    """

    def __init__(self, index):
        """Remember the index to scan.

        Args:
            index: Index object exposing ``docstore.docs``.
        """
        self._index = index
        # BaseRetriever.__init__ sets up the callback manager that the public
        # retrieve() entry point uses; skipping it breaks retrieve().
        super().__init__()

    def _retrieve(self, query_str) -> List[NodeWithScore]:
        """Score and rank all docstore nodes against the query.

        Args:
            query_str: The query. ``BaseRetriever.retrieve`` passes a
                QueryBundle here; a plain ``str`` is accepted as well.

        Returns:
            One NodeWithScore per docstore node, sorted by descending score.
        """
        # Unwrap a QueryBundle to its text; leave plain strings untouched.
        query_text = getattr(query_str, "query_str", query_str)

        # Simple keyword matching (replace with more sophisticated logic)
        keywords = query_text.lower().split()
        scored_nodes = []
        for node in self._index.docstore.docs.values():
            content = node.get_content().lower()
            score = sum(1 for keyword in keywords if keyword in content)
            # NodeWithScore is a pydantic model: fields must be given by keyword.
            scored_nodes.append(NodeWithScore(node=node, score=float(score)))

        # Sort by score
        scored_nodes.sort(key=lambda item: item.score, reverse=True)
        return scored_nodes

# Plug the keyword retriever into a query engine that uses the current
# service context, then run a sample query.
retriever = KeywordMatchingRetriever(index)
query_engine = RetrieverQueryEngine.from_args(
    retriever=retriever,
    service_context=service_context,
)

response = query_engine.query("What is in the documents?")
response


6. LlamaIndex Query Transformation
● Task: Implement a query transformer that:
○ Expands the user’s query using synonyms before sending it to the index.
● Use Case: A medical information system needs to ensure that if a doctor searches for “cardiac issues,” results for “heart disease” and “cardiovascular conditions” are also retrieved.

In [None]:
import re

# The base class exported by this module is BaseQueryTransform.
from llama_index.indices.query.query_transform.base import BaseQueryTransform
from llama_index.query_engine import TransformQueryEngine
from llama_index.schema import QueryBundle
from typing import List


class SynonymQueryTransformer(BaseQueryTransform):
    """Query transform that appends known synonyms to the user's query.

    ``synonym_map`` maps a word or multi-word phrase to a list of alternative
    phrasings. Every key that occurs in the query (case-insensitive, on word
    boundaries) contributes its synonyms; the original query and the synonyms
    are joined with " OR ".
    """

    def __init__(self, synonym_map):
        """Store the phrase -> synonyms mapping."""
        self._synonym_map = synonym_map

    def _transform(self, query_str: str) -> str:
        """Return ``query_str`` followed by the synonyms of every matching key."""
        expanded_query = [query_str]
        for phrase, synonyms in self._synonym_map.items():
            # Match whole phrases rather than single tokens: a key such as
            # "cardiac issues" can never equal one element of query_str.split().
            pattern = r"\b" + re.escape(phrase) + r"\b"
            if re.search(pattern, query_str, flags=re.IGNORECASE):
                for synonym in synonyms:
                    if synonym not in expanded_query:
                        expanded_query.append(synonym)
        return " OR ".join(expanded_query)

    def transform(self, query_str: str) -> str:
        """Public string-in / string-out entry point."""
        return self._transform(query_str)

    def _run(self, query_bundle, metadata):
        """Hook called by BaseQueryTransform.run(); returns the expanded bundle."""
        expanded = self._transform(query_bundle.query_str)
        return QueryBundle(query_str=expanded, custom_embedding_strs=[expanded])

    def _get_prompts(self):
        """This transform uses no prompts."""
        return {}

    def _update_prompts(self, prompts):
        """Nothing to update: this transform uses no prompts."""


synonym_map = {
    "cardiac issues": ["heart disease", "cardiovascular conditions"],
    # Add more synonyms as needed
}
query_transformer = SynonymQueryTransformer(synonym_map)
# Example query
query_str = "cardiac issues"
transformed_query = query_transformer.transform(query_str)
print(f"Original query: {query_str}")
print(f"Transformed query: {transformed_query}")
# Integrate with your query engine: wrap the base engine so the transform is
# applied to each query before it reaches the index.
query_engine = TransformQueryEngine(
    index.as_query_engine(),
    query_transform=query_transformer,
)
response = query_engine.query("cardiac issues")
print(response)


7. Hybrid Search (Vector + Keyword)
● Task: Modify LlamaIndex to use a hybrid search strategy that combines:
○ Vector similarity search (e.g., FAISS).
○ BM25 keyword search (e.g., using llama_index.retrievers.BM25Retriever wrapped in a RetrieverQueryEngine).
● Use Case: A news aggregator wants users to find articles using semantic similarity and exact keyword matches, ensuring better coverage of trending topics.

In [None]:
from llama_index.retrievers import BaseRetriever, BM25Retriever
from llama_index.query_engine import RetrieverQueryEngine
from llama_index.schema import NodeWithScore

bm25_retriever = BM25Retriever.from_defaults(index)


# Combine retrievers
def hybrid_retrieve(query_str):
    """Merge vector-similarity and BM25 results into one ranked list.

    Args:
        query_str: Query text (or query bundle) forwarded unchanged to both
            retrievers' ``retrieve`` methods.

    Returns:
        A list of NodeWithScore, one per distinct node id, ordered by the sum
        of the scores the node received from the two retrievers.
    """
    vector_results = index.as_retriever().retrieve(query_str)
    bm25_results = bm25_retriever.retrieve(query_str)

    # Combine the results: add up the scores per node id and remember the node
    # object so no second docstore lookup is needed.
    # NOTE(review): the two score scales are summed as-is, exactly as before;
    # consider normalising them if one retriever dominates the ranking.
    combined_scores = {}
    nodes_by_id = {}
    for node_with_score in list(vector_results) + list(bm25_results):
        node_id = node_with_score.node.node_id
        nodes_by_id.setdefault(node_id, node_with_score.node)
        # A retriever may leave score unset; treat that as contributing nothing.
        combined_scores[node_id] = combined_scores.get(node_id, 0.0) + (node_with_score.score or 0.0)

    sorted_results = sorted(combined_scores.items(), key=lambda item: item[1], reverse=True)
    # NodeWithScore is a pydantic model: fields must be given by keyword.
    return [NodeWithScore(node=nodes_by_id[node_id], score=score) for node_id, score in sorted_results]


class HybridRetriever(BaseRetriever):
    """Adapter exposing hybrid_retrieve through the retriever interface.

    RetrieverQueryEngine calls ``retriever.retrieve(...)``, so a bare function
    cannot be handed to it directly.
    """

    def _retrieve(self, query_bundle):
        """Delegate to hybrid_retrieve with the incoming query bundle."""
        return hybrid_retrieve(query_bundle)


# Create a hybrid query engine
hybrid_query_engine = RetrieverQueryEngine.from_args(HybridRetriever(), service_context=service_context)
# Test the hybrid query engine
response = hybrid_query_engine.query("What is in the documents?")
response


8. Caching with LlamaIndex
● Task: Implement query caching using Redis or a local file-based approach to store previously queried results.
● Use Case: A customer service AI needs to improve response time by storing frequently asked questions so that repetitive queries do not trigger expensive LLM calls.

In [None]:
import hashlib
from llama_index.indices.vector_store.base import VectorStoreIndex
from llama_index import StorageContext, load_index_from_storage
from llama_index.response.schema import Response
import json


class CacheQueryEngine:
    """File-backed cache in front of any object that has a ``query`` method.

    This is a plain wrapper: it offers only ``query`` and does not subclass a
    llama-index engine type. Each distinct query string maps to one JSON file
    in ``cache_dir`` holding the answer text. Only the text is cached, so a
    cache hit comes back as a Response without source nodes.
    """

    def __init__(self, query_engine, cache_dir: str):
        """Wrap ``query_engine`` and make sure ``cache_dir`` exists.

        Args:
            query_engine: Object whose ``query(str)`` result has a ``response``
                attribute containing the answer text.
            cache_dir: Directory for the cache files; created if missing.
        """
        self._query_engine = query_engine
        self._cache_dir = cache_dir
        os.makedirs(self._cache_dir, exist_ok=True)

    def _cache_path(self, query_str: str) -> str:
        """Return the cache file path for ``query_str``."""
        # The built-in hash() of a str is salted per process, so file names
        # derived from it would change on every kernel restart. Use a stable digest.
        digest = hashlib.sha256(query_str.encode("utf-8")).hexdigest()
        return os.path.join(self._cache_dir, f"{digest}.json")

    def query(self, query_str: str):
        """Answer ``query_str`` from the cache, or compute and store it.

        Returns:
            An object with a ``response`` attribute on both the hit and the
            miss path, so callers can treat the two cases uniformly.
        """
        cache_file = self._cache_path(query_str)

        if os.path.exists(cache_file):
            try:
                with open(cache_file, "r", encoding="utf-8") as f:
                    cached_text = json.load(f)
            except (OSError, json.JSONDecodeError):
                # Unreadable or truncated entry: fall through and recompute.
                pass
            else:
                print("Returning from cache")
                return Response(response=cached_text)

        response = self._query_engine.query(query_str)
        with open(cache_file, "w", encoding="utf-8") as f:
            json.dump(response.response, f, indent=4)
        print("Storing in cache")
        return response
# Reload the persisted index and build a plain query engine on top of it.
storage_context = StorageContext.from_defaults(
    persist_dir="/content/sample_data/data.txt",
)
index = load_index_from_storage(storage_context=storage_context)
query_engine = index.as_query_engine()

# Create the cache query engine
cached_query_engine = CacheQueryEngine(query_engine, "/content/cache")

# Test the cached query engine by sending the same query twice.
response1 = cached_query_engine.query("What is in the documents?")
print(response1)

response2 = cached_query_engine.query("What is in the documents?")
response2


9. API Integration with FastAPI
● Task: Create a REST API with FastAPI that:
○ Accepts a query via a /query endpoint.
○ Uses a LlamaIndex-powered RAG system to fetch and generate responses.
● Use Case: A business intelligence tool needs to allow users to search company reports via an API, returning relevant insights dynamically.

In [None]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()


class QueryRequest(BaseModel):
    """Request body for POST /query."""

    query: str


@app.post("/query")
def query_endpoint(request: QueryRequest):
    """Run the submitted query through the cached query engine.

    Declared as a plain ``def`` so FastAPI executes it in its worker thread
    pool; the engine call is blocking and would otherwise stall the event loop.

    Returns:
        ``{"response": <answer text>}``.

    Raises:
        HTTPException: status 500 if the query engine raises.
    """
    query_str = request.query
    try:
        response = cached_query_engine.query(query_str)  # Use your existing query engine
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing query: {e}") from e
    # The engine may hand back either a response object or plain cached text.
    return {"response": getattr(response, "response", response)}


if __name__ == "__main__":
    import asyncio
    import threading
    import uvicorn

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # Plain script: no event loop is running, so block in uvicorn as usual.
        uvicorn.run(app, host="0.0.0.0", port=8000)
    else:
        # Notebook kernel: an event loop is already running and uvicorn.run()
        # cannot start a second one on this thread, so serve from a daemon thread.
        threading.Thread(
            target=uvicorn.run,
            args=(app,),
            kwargs={"host": "0.0.0.0", "port": 8000},
            daemon=True,
        ).start()


10. Fine-tuning the RAG Model
● Task: Fine-tune the retrieval process by:
○ Adjusting similarity thresholds.
○ Implementing a re-ranking model (e.g., Cohere’s reranker or BERT-based re-ranking).
● Use Case: An e-commerce company wants a product recommendation AI that retrieves the most relevant products based on user queries, improving search relevance.

In [None]:
from llama_index.indices.query.query_transform.base import QueryTransform
from llama_index.retrievers import BaseRetriever
from llama_index.schema import NodeWithScore
from typing import List

# Assuming 'index' and 'service_context' are defined from previous code blocks

class RerankingRetriever(BaseRetriever):
    """Retriever that drops vector hits scoring below a similarity threshold.

    Results come from the wrapped index's default retriever; hits whose score
    is missing or below ``similarity_threshold`` are discarded and the rest are
    returned in descending score order.
    """

    def __init__(self, index, similarity_threshold=0.7):
        """Store the index and the minimum score a hit must reach.

        Args:
            index: Index object providing ``as_retriever()``.
            similarity_threshold: Minimum score (inclusive) for a hit to be kept.
        """
        self._index = index
        self.similarity_threshold = similarity_threshold
        # BaseRetriever.__init__ sets up the callback manager that the public
        # retrieve() entry point uses; skipping it breaks retrieve().
        super().__init__()

    def _retrieve(self, query_str) -> List[NodeWithScore]:
        """Retrieve, filter by threshold, and sort.

        Args:
            query_str: The query, forwarded unchanged to the index retriever's
                ``retrieve``. ``BaseRetriever.retrieve`` passes a QueryBundle.

        Returns:
            The surviving NodeWithScore objects, highest score first.
        """
        vector_results = self._index.as_retriever().retrieve(query_str)

        # Keep only hits at or above the threshold. A hit without a score
        # cannot be compared against the threshold, so it is dropped.
        reranked_results = [
            node_with_score
            for node_with_score in vector_results
            if node_with_score.score is not None
            and node_with_score.score >= self.similarity_threshold
        ]
        # Sort by score again after filtering
        reranked_results.sort(key=lambda item: item.score, reverse=True)
        return reranked_results

# Example usage: build the retriever with a 0.8 threshold and query through it.
reranking_retriever = RerankingRetriever(index, similarity_threshold=0.8)
query_engine = RetrieverQueryEngine.from_args(
    retriever=reranking_retriever,
    service_context=service_context,
)
response = query_engine.query("What is in the documents?")
response
