In [None]:
import os
import json
from typing import List, Dict
import numpy as np
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
import redis
from redis.commands.search.field import TextField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from transformers import BitsAndBytesConfig

import warnings
import pandas as pd
from redisvl.utils.vectorize import HFTextVectorizer, BaseVectorizer
from redisvl.extensions.cache.embeddings import EmbeddingsCache

warnings.filterwarnings("ignore")
os.environ["TOKENIZERS_PARALLELISM"] = "false"

In [None]:
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_PASSWORD=''
REDIS_URL = f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}"
EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2"
LLM_MODEL = "Qwen/Qwen3-0.6B"
VECTOR_DIM = 768
INDEX_NAME = "rag_index"

In [3]:
RAG_DOCUMENTS = [
    "Machine learning is a subset of artificial intelligence that enables systems to learn and improve from experience without being explicitly programmed. It focuses on algorithms and statistical models.",
    "Deep learning uses neural networks with multiple layers to process data. It has revolutionized fields like computer vision, natural language processing, and speech recognition.",
    "Natural language processing (NLP) is a branch of AI that helps computers understand, interpret, and generate human language. It powers chatbots, translation services, and sentiment analysis.",
    "Vector databases store and retrieve data based on vector similarity. They are essential for semantic search, recommendation systems, and machine learning applications.",
    "Redis is an in-memory data structure store that provides high-performance caching and real-time data processing capabilities. RedisVL extends Redis with semantic search capabilities."
]

In [None]:
class EmbeddingEngine:
    def __init__(self, model_name: str = EMBEDDING_MODEL):
        """Initialize embedding engine with specified model"""
        self.model = SentenceTransformer(model_name)
    
    def embed(self, texts: List[str]) -> np.ndarray:
        """Generate embeddings for texts"""
        return self.model.encode(texts, convert_to_tensor=False)

In [None]:
class LLMEngine:
    def __init__(self, model_name: str = LLM_MODEL):
        """Initialize LLM with 4-bit quantization"""
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16
        )
        
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            quantization_config=bnb_config,
            device_map="auto"
        )
    
    def generate(self, prompt: str, max_tokens: int = 100, cache_key: str = None) -> str:
        """Generate text with optional caching"""
        # Simple LM-Cache simulation (for production, integrate actual LMCache)
        if cache_key:
            cached_result = self._get_cache(cache_key)
            if cached_result:
                return cached_result
        
        # Generate text
        inputs = self.tokenizer.encode(prompt, return_tensors="pt").to(self.model.device)
        # Ensure max_length accounts for input length
        outputs = self.model.generate(
            inputs,
            max_length=max_tokens,
            temperature=0.7,
            top_p=0.9,
            do_sample=True
        )
        # Decode output
        result = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        # Store in cache
        if cache_key:
            self._set_cache(cache_key, result)
        
        return result
    
    def _get_cache(self, key: str) -> str:
        """Placeholder for LM-Cache retrieval"""
        return None
    
    def _set_cache(self, key: str, value: str):
        """Placeholder for LM-Cache storage"""
        pass

In [None]:
class RedisSimpleVectorStore:
    def __init__(self, host: str = REDIS_HOST, port: int = REDIS_PORT):
        """Initialize Redis vector store and create index if not exists"""
        self.client = redis.Redis(host=host, port=port, decode_responses=False)
        self.embedding_engine = EmbeddingEngine()
        self._create_index()
    
    def _create_index(self):
        """Create Redis index for storing documents"""
        try:
            # self.client.ft(INDEX_NAME).info()
            self.client.flushdb()
            print(f"‚úì Index '{INDEX_NAME}' already exists, reusing...")
            return
        except:
            pass
        
        schema = (
            TextField("content"),
        )
        
        definition = IndexDefinition(index_type=IndexType.HASH)
        self.client.ft(INDEX_NAME).create_index(fields=schema, definition=definition)
        print(f"‚úì Created index '{INDEX_NAME}'")
    
    def add_documents(self, docs: List[str]):
        """Add documents with embeddings to vector store"""
        embeddings = self.embedding_engine.embed(docs)
        
        for idx, (doc, embedding) in enumerate(zip(docs, embeddings)):
            doc_id = f"doc:{idx}".encode()
            embedding_bytes = np.array(embedding, dtype=np.float32).tobytes()
            
            self.client.hset(
                doc_id,
                mapping={
                    b"content": doc.encode(),
                    b"embedding": embedding_bytes
                }
            )
        print(f"‚úì Added {len(docs)} documents to vector store")
    
    def semantic_search(self, query: str, top_k: int = 3) -> List[Dict]:
        """Semantic search with caching"""
        
        
        # Check semantic cache
        cache_key = f"semantic_cache:{query}".encode()
        cached_results = self.client.get(cache_key)
        
        if cached_results:
            print(f"‚úì Semantic cache hit for: '{query}'")
            return json.loads(cached_results.decode())
        
        query_embedding = self.embedding_engine.embed([query])[0]
        # Manual vector similarity search
        from sklearn.metrics.pairwise import cosine_similarity
        
        doc_scores = []
        
        # Get all documents
        keys = self.client.keys(b"doc:*")
        
        for key in keys:
            doc_data = self.client.hgetall(key)
            if b"embedding" in doc_data:
                # Deserialize embedding
                stored_embedding = np.frombuffer(
                    doc_data[b"embedding"], dtype=np.float32
                )
                # Calculate similarity
                similarity = cosine_similarity(
                    [query_embedding],
                    [stored_embedding]
                )[0][0]
                
                doc_scores.append({
                    "id": key.decode(),
                    "content": doc_data.get(b"content", b"").decode(),
                    "score": float(similarity)
                })
        
        # Sort by score and get top_k
        doc_scores.sort(key=lambda x: x["score"], reverse=True)
        formatted_results = doc_scores[:top_k]
        
        # Cache semantic results
        self.client.setex(
            cache_key,
            3600,  # 1 hour TTL
            json.dumps(formatted_results).encode()
        )
        
        print(f"‚úì Semantic search completed and cached for: '{query}'")
        return formatted_results

In [None]:
class RAGPipeline:
    def __init__(self):
        """Initialize RAG pipeline with vector store, LLM, and embedding engine"""
        self.vector_store = RedisSimpleVectorStore()
        self.llm = LLMEngine()
        self.embedding_engine = EmbeddingEngine()
    
    def initialize(self, documents: List[str]):
        """Initialize RAG with documents"""
        self.vector_store.add_documents(documents)
    
    def query(self, question: str, top_k: int = 3) -> Dict:
        """Execute RAG query with retrieval and generation"""
        # Retrieve relevant documents
        retrieved_docs = self.vector_store.semantic_search(question, top_k=top_k)
        
        # Build context from retrieved documents
        context = "\n".join([doc["content"] for doc in retrieved_docs])
        
        # Create prompt with context
        prompt = f"""Context:\n{context}\nQuestion:\n{question}\nAnswer:\n"""
        
        # Generate answer with LLM caching
        cache_key = f"llm_cache:{question}"
        answer = self.llm.generate(prompt, max_tokens=150, cache_key=cache_key)
        
        return {
            "question": question,
            "answer": answer,
            "retrieved_docs": retrieved_docs,
            "context": context
        }

In [11]:
rag = RAGPipeline()
rag.initialize(RAG_DOCUMENTS)

‚úì Index 'rag_index' already exists, reusing...
‚úì Added 5 documents to vector store


In [None]:
query="What is machine learning?"
print(f"\n‚ùì Query: {query}")
result = rag.query(query)
print(f"\nüìÑ Retrieved Documents ({len(result['retrieved_docs'])}): ")
for i, doc in enumerate(result['retrieved_docs'], 1):
    print(f"  {i}. {doc['content'][:80]}...")
print(f"\nüí¨ Answer:\n{result['answer']}")
print("-" * 60)

In [None]:
queries = [
    "What is machine learning?",
    "How does deep learning work?",
    "What is machine learning?"
]

print("\n" + "="*60)
for query in queries:
    print(f"\n‚ùì Query: {query}")
    result = rag.query(query)
    print(f"\nüìÑ Retrieved Documents ({len(result['retrieved_docs'])}): ")
    for i, doc in enumerate(result['retrieved_docs'], 1):
        print(f"  {i}. {doc['content'][:80]}...")
    print(f"\nüí¨ Answer:\n{result['answer']}")
    print("-" * 60)

The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.




‚ùì Query: What is machine learning?
‚úì Semantic search completed and cached for: 'What is machine learning?'


  attn_output = torch.nn.functional.scaled_dot_product_attention(



üìÑ Retrieved Documents (3): 
  1. Machine learning is a subset of artificial intelligence that enables systems to ...
  2. Deep learning uses neural networks with multiple layers to process data. It has ...
  3. Natural language processing (NLP) is a branch of AI that helps computers underst...

üí¨ Answer:
Context:
Machine learning is a subset of artificial intelligence that enables systems to learn and improve from experience without being explicitly programmed. It focuses on algorithms and statistical models.
Deep learning uses neural networks with multiple layers to process data. It has revolutionized fields like computer vision, natural language processing, and speech recognition.
Natural language processing (NLP) is a branch of AI that helps computers understand, interpret, and generate human language. It powers chatbots, translation services, and sentiment analysis.
Question:
What is machine learning?
Answer:
A machine learning is a type of learning process in which the syst

In [13]:
query="describe Machine learning"
print(f"\n‚ùì Query: {query}")
result = rag.query(query)
print(f"\nüìÑ Retrieved Documents ({len(result['retrieved_docs'])}): ")
for i, doc in enumerate(result['retrieved_docs'], 1):
    print(f"  {i}. {doc['content'][:80]}...")
print(f"\nüí¨ Answer:\n{result['answer']}")
print("-" * 60)


‚ùì Query: describe Machine learning
‚úì Semantic search completed and cached for: 'describe Machine learning'

üìÑ Retrieved Documents (3): 
  1. Machine learning is a subset of artificial intelligence that enables systems to ...
  2. Deep learning uses neural networks with multiple layers to process data. It has ...
  3. Natural language processing (NLP) is a branch of AI that helps computers underst...

üí¨ Answer:
Context:
Machine learning is a subset of artificial intelligence that enables systems to learn and improve from experience without being explicitly programmed. It focuses on algorithms and statistical models.
Deep learning uses neural networks with multiple layers to process data. It has revolutionized fields like computer vision, natural language processing, and speech recognition.
Natural language processing (NLP) is a branch of AI that helps computers understand, interpret, and generate human language. It powers chatbots, translation services, and sentiment analysis

# Semantic Vector Store

In [36]:
from redisvl.index import SearchIndex
from redisvl.extensions.llmcache import SemanticCache
from redisvl.query import VectorQuery

In [None]:
class RedisSemanticVectorStore:
    def __init__(self, host: str = REDIS_HOST, port: int = REDIS_PORT):
        """Initialize Redis vector store and create index"""
        self.client = redis.Redis(host=host, port=port, decode_responses=False)

        self._create_index()

    def _create_index(self):
        """Create Redis index for storing documents"""

        index_name = "redisvl"

        schema = {
                "index": {
                    "name": index_name,
                    "prefix": "doc"
                },
                "fields": [
                    {
                        "name": "doc_id",
                        "type": "tag",
                        "attrs": {
                            "sortable": True
                        }
                    },
                    {
                        "name": "content",
                        "type": "text"
                    },
                    {
                        "name": "embedding",
                        "type": "vector",
                        "attrs": {
                            "dims": 768,
                            "distance_metric": "cosine",
                            "algorithm": "hnsw",
                            "datatype": "float32"
                        }
                    }
                ]
                }

        self.client = SearchIndex.from_dict(schema, redis_url=REDIS_URL)
        self.client.create(overwrite=True, drop=True)
        print(f"‚úì Created index '{index_name}'")

    def add_documents(self, docs: List[str], embedding_engine):
        """Add documents with embeddings to vector store"""
        from redisvl.redis.utils import array_to_buffer
        embedding = embedding_engine.embed_many(docs)
        data = [
            {
                "doc_id": i,
                "content": chunk,
                "embedding": array_to_buffer(embedding[i], dtype="float32"),
            }
            for i, chunk in enumerate(docs)
        ]
        keys = self.client.load(data, id_field="doc_id")
        print(f"‚úì Added {len(keys)} documents to vector store")

    def semantic_search(self, query_embedding: str, top_k: int = 3) -> List[Dict]:
        """Semantic search with caching - using manual similarity"""

        
        # Check semantic cache
        vector_query = VectorQuery(
            vector=query_embedding,
            vector_field_name="embedding",
            num_results=top_k,
            return_fields=["doc_id", "content"],
            return_score=True,
        )

        print(str(vector_query))
        result=self.client.query(vector_query)
        return pd.DataFrame(result)

In [None]:
class SemaRAGPipeline:
    def __init__(self):
        """Initialize RAG pipeline with vector store, LLM, and embedding engine"""
        self.vector_store = RedisSemanticVectorStore()
        self.llm = LLMEngine()
        self.embedding_engine = HFTextVectorizer(
            model=EMBEDDING_MODEL,
            cache=EmbeddingsCache(name="embedcache", ttl=600, redis_url=REDIS_URL),
        )

        #  Initialize semantic cache
        self.llmcache = SemanticCache(
            name="cache",
            vectorizer=self.embedding_engine,
            redis_url=REDIS_URL,
            ttl=120,
            distance_threshold=0.2,
            overwrite=True,
        )

    def initialize(self, documents: List[str]):
        """Initialize RAG with documents"""

        self.vector_store.add_documents(
            docs=documents, embedding_engine=self.embedding_engine
        )

    def query(self, question: str, top_k: int = 3) -> Dict:
        """Execute RAG query with retrieval and generation"""
        # Retrieve relevant documents
        query_vector = self.llmcache._vectorizer.embed(question)
        if cache_result := self.llmcache.check(vector=query_vector):
            print("Cache hit!")
            return cache_result[0]["response"]
        query_embedding = self.embedding_engine.embed(question)
        retrieved_docs = self.vector_store.semantic_search(query_embedding, top_k=top_k)

        context = "\n".join([doc for doc in retrieved_docs["content"]])

        prompt = f"""Context:\n{context}\nQuestion:\n{question}\nAnswer:\n"""

        cache_key = f"llm_cache:{question}"
        answer = self.llm.generate(prompt, max_tokens=150, cache_key=cache_key)
        self.llmcache.store(question, answer, query_vector)
        return {
            "question": question,
            "answer": answer,
            "context": context,
        }

In [49]:
sem_rag = SemaRAGPipeline()
sem_rag.initialize(RAG_DOCUMENTS)

22:40:06 redisvl.index.index INFO   Index already exists, overwriting.


‚úì Created index 'redisvl'
22:40:58 sentence_transformers.SentenceTransformer INFO   Use pytorch device_name: cuda:0
22:40:58 sentence_transformers.SentenceTransformer INFO   Load pretrained SentenceTransformer: sentence-transformers/all-mpnet-base-v2
22:41:02 redisvl.index.index INFO   Index already exists, overwriting.
‚úì Added 5 documents to vector store


In [61]:
query = "describe Machine learning"
print(f"\n‚ùì Query: {query}")
result = sem_rag.query(query)
if isinstance(result,dict):
    print(f"\nüìÑ Retrieved Context {result['context']}")
    print(f"\nüí¨ Answer:\n{result['answer']}")
    print("-" * 60)
else:
    print(result)


‚ùì Query: describe Machine learning
Cache hit!
Context:
Machine learning is a subset of artificial intelligence that enables systems to learn and improve from experience without being explicitly programmed. It focuses on algorithms and statistical models.
Deep learning uses neural networks with multiple layers to process data. It has revolutionized fields like computer vision, natural language processing, and speech recognition.
Natural language processing (NLP) is a branch of AI that helps computers understand, interpret, and generate human language. It powers chatbots, translation services, and sentiment analysis.
Question:
describe Machine learning
Answer:
The answer is: Machine learning is a subset of artificial intelligence that enables systems to learn and improve from experience without being explicitly programmed. It focuses on algorithms and statistical models.
The answer is: Machine learning is a subset of artificial
