In [2]:
! pip install langchain-community pinecone -q

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.5 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━[0m [32m2.1/2.5 MB[0m [31m64.0 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m44.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m427.3/427.3 kB[0m [31m34.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m87.5/87.5 kB[0m [31m9.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.6/49.6 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h

In [39]:
import logging
import os
from abc import ABC, abstractmethod
from pathlib import Path
from typing import List, Dict, Optional, Union, Any

import pinecone
from langchain.chains import LLMChain, RetrievalQA
from langchain.document_loaders import (
    PyPDFLoader,
    TextLoader,
    CSVLoader,
    UnstructuredWordDocumentLoader
)
from langchain.embeddings import (
    OpenAIEmbeddings,
    HuggingFaceEmbeddings,
    HuggingFaceHubEmbeddings
)
from langchain.llms import OpenAI, HuggingFaceHub
from langchain.memory import ConversationBufferMemory, ConversationBufferWindowMemory
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Pinecone

In [7]:
class ModelLocation(ABC):
    """Strategy interface describing where a model is hosted.

    Concrete strategies implement :meth:`get_model` and return a description
    of how the named model should be reached.
    """

    @abstractmethod
    def get_model(self, model_name: str):
        """Describe how to reach ``model_name``; subclasses must override.

        The base implementation does nothing and returns ``None``.
        """

class RemoteModelLocation(ModelLocation):
    """Location strategy for models reached through a remote API."""

    def get_model(self, model_name: str):
        """Return a descriptor dict marking ``model_name`` as remote."""
        descriptor = dict(location="remote", model_name=model_name)
        return descriptor

class LocalModelLocation(ModelLocation):
    """Location strategy for models loaded on the local machine."""

    def __init__(self, cache_dir: Optional[str] = None):
        # Optional directory passed through to consumers via get_model().
        self.cache_dir = cache_dir

    def get_model(self, model_name: str):
        """Return a descriptor dict marking ``model_name`` as local.

        The descriptor also carries the configured ``cache_dir`` (may be None).
        """
        descriptor = {"location": "local", "model_name": model_name}
        descriptor["cache_dir"] = self.cache_dir
        return descriptor

In [36]:
class BaseLLMProvider(ABC):
    """Common interface for objects that can supply an LLM and an embedding model.

    Each provider is constructed with a ``ModelLocation`` strategy, stored on
    ``self.model_location`` for subclasses to consult.
    """

    def __init__(self, model_location: ModelLocation):
        self.model_location = model_location

    @abstractmethod
    def get_llm(self):
        """Return the provider's LLM object; subclasses must override."""

    @abstractmethod
    def get_embeddings(self):
        """Return the provider's embedding object; subclasses must override."""

In [37]:
class OpenAIProvider(BaseLLMProvider):
    """Provider backed by OpenAI; always uses the remote location strategy."""

    def __init__(self, model_name: str = "gpt-3.5-turbo", temperature: float = 0.7):
        self.model_name = model_name
        self.temperature = temperature
        # OpenAI models are only reachable through the hosted API.
        super().__init__(RemoteModelLocation())

    def get_llm(self):
        """Build an ``OpenAI`` LLM using the configured model name and temperature."""
        # NOTE(review): the default "gpt-3.5-turbo" looks like a chat model while
        # `OpenAI` is the completion-style wrapper - confirm this pairing is intended.
        llm_options = {
            "temperature": self.temperature,
            "model_name": self.model_name,
        }
        return OpenAI(**llm_options)

    def get_embeddings(self):
        """Build an ``OpenAIEmbeddings`` object with its default settings."""
        embeddings = OpenAIEmbeddings()
        return embeddings

In [38]:
class HuggingFaceProvider(BaseLLMProvider):
    """Hugging Face implementation of the provider interface.

    Args:
        model_name: Hub repository id used for both the LLM and the embeddings.
        api_token: Hugging Face API token forwarded to the Hub wrappers.
        model_location: Location strategy; defaults to ``RemoteModelLocation``.
        temperature: Sampling temperature sent with LLM requests. Defaults to
            0.7, the value that was previously hard-coded.
    """

    def __init__(self,
                 model_name: str = "google/flan-t5-base",
                 api_token: Optional[str] = None,
                 model_location: Optional[ModelLocation] = None,
                 temperature: float = 0.7):
        super().__init__(model_location or RemoteModelLocation())
        self.model_name = model_name
        self.api_token = api_token
        self.temperature = temperature

    def get_llm(self):
        """Return a ``HuggingFaceHub`` LLM for ``model_name``.

        Both location strategies use the hosted endpoint, exactly as before;
        the only difference used to be an extra ``"use_api": True`` entry in
        ``model_kwargs`` for the local strategy. ``model_kwargs`` are sent to
        the endpoint as generation parameters and ``use_api`` is not one, so it
        is no longer sent.
        """
        # NOTE(review): genuinely local LLM loading is still not implemented here.
        return HuggingFaceHub(
            repo_id=self.model_name,
            huggingfacehub_api_token=self.api_token,
            model_kwargs={"temperature": self.temperature},
        )

    def get_embeddings(self):
        """Return an embeddings object matching the configured location.

        * remote: ``HuggingFaceHubEmbeddings`` so vectors are computed by the
          hosted endpoint, authenticated with ``api_token``.
        * local: ``HuggingFaceEmbeddings`` loaded on this machine, using the
          strategy's ``cache_dir`` as the download cache.
        """
        model_config = self.model_location.get_model(self.model_name)

        if model_config["location"] == "remote":
            # The previous code built HuggingFaceEmbeddings with
            # model_kwargs={"use_api": True}. That wrapper always loads the
            # model locally and hands model_kwargs to the model constructor,
            # so the call was neither remote nor expected to succeed.
            return HuggingFaceHubEmbeddings(
                repo_id=self.model_name,
                huggingfacehub_api_token=self.api_token,
            )

        return HuggingFaceEmbeddings(
            model_name=self.model_name,
            cache_folder=model_config.get("cache_dir"),
        )


In [32]:
# Read the Hugging Face token from the environment instead of embedding it in
# the notebook. Any token that was previously stored in this cell should be
# treated as leaked and revoked.
API_token = os.environ.get("HUGGINGFACEHUB_API_TOKEN")
if not API_token:
    raise RuntimeError(
        "Set the HUGGINGFACEHUB_API_TOKEN environment variable before running this cell."
    )

# Keep the provider and the LLM under separate names so re-running the cell
# does not depend on what `model` was bound to before.
hf_provider = HuggingFaceProvider(model_name="mistralai/Mistral-7B-v0.1", api_token=API_token)
model = hf_provider.get_llm()

In [34]:
# Smoke-test the LLM with a single prompt; the cell displays the returned text.
model("What is the strongest country?")

'What is the strongest country? As we strive to find the answer, let us not forget that the question itself is heavily biased. It is a question that naturally assumes a country is the most powerful and can be judged by the same criteria. But the world we live in is complex and not a single country can be pigeonholed into a single answer.\n\nTo understand this better, let us ask a few questions. Is the United States of America the most powerful country on earth? Is China the'

In [None]:
class VectorStoreManager:
    """Manages interactions with a Pinecone vector store.

    Args:
        index_name: Name of the existing Pinecone index to connect to.
        embedding_provider: Provider whose ``get_embeddings()`` supplies the
            embedding model used for queries.
    """

    # The provider annotation is a forward reference so this cell can be run
    # before or after the cell that defines BaseLLMProvider.
    def __init__(self, index_name: str, embedding_provider: "BaseLLMProvider"):
        self.index_name = index_name
        self.embedding_provider = embedding_provider
        # Populated by initialize_vectorstore(); None means "not connected yet".
        self.vectorstore = None

    def initialize_vectorstore(self):
        """Connect to the existing index using the provider's embeddings."""
        embeddings = self.embedding_provider.get_embeddings()
        self.vectorstore = Pinecone.from_existing_index(
            index_name=self.index_name,
            embedding=embeddings
        )

    def similarity_search(self, query: str, k: int = 5) -> List[Any]:
        """Return the ``k`` stored documents most similar to ``query``.

        The result is whatever the underlying vector store returns; callers in
        this notebook read ``.page_content`` from each item, so the items are
        document objects rather than plain dicts.

        Raises:
            ValueError: If ``initialize_vectorstore()`` has not been called.
        """
        if self.vectorstore is None:
            raise ValueError("Vector store not initialized")
        return self.vectorstore.similarity_search(query, k=k)

In [None]:
class DocumentProcessor:
    """Loads files with the matching LangChain loader and splits them into chunks."""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        splitter_options = dict(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            is_separator_regex=False,
        )
        self.text_splitter = RecursiveCharacterTextSplitter(**splitter_options)

    def _make_loader(self, file_path: str, suffix: str):
        """Pick the loader for a lower-cased file suffix, or raise ValueError."""
        if suffix == '.pdf':
            return PyPDFLoader(file_path)
        if suffix == '.txt':
            return TextLoader(file_path)
        if suffix == '.csv':
            return CSVLoader(file_path)
        if suffix in ['.doc', '.docx']:
            return UnstructuredWordDocumentLoader(file_path)
        raise ValueError(f"Unsupported file format: {suffix}")

    def load_documents(self, file_paths: List[str]) -> List[Any]:
        """Load every readable file in ``file_paths`` and return the documents.

        Loading is best-effort: a file that has an unsupported extension or
        whose loader fails is logged through ``logging.error`` and skipped, so
        the result may contain fewer documents than files.
        """
        loaded: List[Any] = []
        for file_path in file_paths:
            suffix = Path(file_path).suffix.lower()
            try:
                loaded.extend(self._make_loader(file_path, suffix).load())
            except Exception as e:
                logging.error(f"Error loading document {file_path}: {str(e)}")
        return loaded

    def process_documents(self, documents: List[Any]) -> List[Any]:
        """Run the configured text splitter over ``documents`` and return the chunks."""
        chunks = self.text_splitter.split_documents(documents)
        return chunks


In [46]:
# Display the kernel's current working directory.
os.getcwd()

'/content'

In [None]:

class RAGVectorStore:
    """Manages Pinecone vector store operations for RAG.

    Args:
        index_name: Name of the existing Pinecone index.
        embedding_provider: Object exposing ``get_embeddings()``.
        pinecone_api_key: Pinecone API key.
        pinecone_env: Pinecone environment name (only used by client v2).
    """

    def __init__(self,
                 index_name: str,
                 embedding_provider: Any,
                 pinecone_api_key: str,
                 pinecone_env: str):
        self.index_name = index_name
        self.embedding_provider = embedding_provider
        self.pinecone_api_key = pinecone_api_key
        self.pinecone_env = pinecone_env
        # Populated by initialize_vectorstore(); None means "not connected yet".
        self.vectorstore = None

    def initialize_vectorstore(self):
        """Authenticate with Pinecone and attach to the existing index.

        Pinecone client v3+ removed the module-level ``pinecone.init()``; it
        exposes a ``pinecone.Pinecone`` class instead. When that class is
        present the API key is exported as ``PINECONE_API_KEY`` (a process-wide
        side effect) for the LangChain wrapper to pick up.
        NOTE(review): confirm the installed LangChain Pinecone wrapper reads
        ``PINECONE_API_KEY`` when it builds the client.
        On older clients the original ``pinecone.init()`` call is still used.
        """
        if hasattr(pinecone, "Pinecone"):
            # Skip the assignment when no key was supplied so an existing
            # environment value is not clobbered (and None is never written).
            if self.pinecone_api_key:
                os.environ["PINECONE_API_KEY"] = self.pinecone_api_key
        else:
            pinecone.init(
                api_key=self.pinecone_api_key,
                environment=self.pinecone_env
            )

        self.vectorstore = Pinecone.from_existing_index(
            index_name=self.index_name,
            embedding=self.embedding_provider.get_embeddings()
        )

    def add_documents(self, documents: List[Any]):
        """Add ``documents`` to the vector store.

        Raises:
            ValueError: If ``initialize_vectorstore()`` has not been called.
        """
        if self.vectorstore is None:
            raise ValueError("Vector store not initialized")

        self.vectorstore.add_documents(documents)

    def similarity_search(self, query: str, k: int = 4) -> List[Any]:
        """Return the ``k`` stored documents most similar to ``query``.

        Raises:
            ValueError: If ``initialize_vectorstore()`` has not been called.
        """
        if self.vectorstore is None:
            raise ValueError("Vector store not initialized")

        return self.vectorstore.similarity_search(query, k=k)

In [41]:

class RAGLLMManager:
    """Manager for RAG operations with LLM integration.

    Args:
        llm_provider: Object exposing ``get_llm()``.
        vector_store: Object exposing ``initialize_vectorstore()`` and a
            ``vectorstore`` attribute that supports ``as_retriever()``.
        memory_size: Number of recent exchanges kept by ``self.memory``.
    """

    def __init__(
        self,
        llm_provider: Any,
        vector_store: Any,
        memory_size: int = 5
    ):
        self.llm_provider = llm_provider
        self.vector_store = vector_store
        # The windowed memory class is the one that has a `k` field; the plain
        # buffer memory used previously does not, so `memory_size` had no effect.
        self.memory = ConversationBufferWindowMemory(
            memory_key="chat_history",
            return_messages=True,
            k=memory_size
        )
        self.qa_chain = None

        # Default RAG prompt template
        self.default_template = """
        You are a helpful AI assistant. Use the following pieces of context to answer the question at the end.
        If you don't know the answer, just say that you don't know, don't try to make up an answer.

        Context: {context}

        Chat History: {chat_history}

        Question: {question}

        Helpful Answer:"""

    def initialize(self):
        """Initialize the vector store and build the retrieval QA chain.

        The chain is built with ``return_source_documents=True`` so that
        ``get_response`` can always report sources when asked.
        """
        self.vector_store.initialize_vectorstore()

        # NOTE(review): as before, the chain runs with LangChain's default
        # "stuff" prompt and without memory. `self.default_template` expects a
        # {chat_history} value that this chain does not supply, so wiring it in
        # (together with `self.memory`) needs a deliberate follow-up.
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm_provider.get_llm(),
            chain_type="stuff",
            retriever=self.vector_store.vectorstore.as_retriever(),
            return_source_documents=True
        )

    # def process_and_add_documents(self,
    #                             file_paths: List[str],
    #                             processor: DocumentProcessor):
    #     """Process and add documents to the vector store"""
    #     documents = processor.load_documents(file_paths)
    #     chunks = processor.process_documents(documents)
    #     self.vector_store.add_documents(chunks)

    async def get_response(self,
                          question: str,
                          return_source_documents: bool = False) -> Union[str, Dict]:
        """Answer ``question`` using the retrieval QA chain.

        Args:
            question: The user's question.
            return_source_documents: When True, also return source metadata.

        Returns:
            The answer string, or ``{"answer": str, "sources": [metadata, ...]}``
            when ``return_source_documents`` is True.

        Raises:
            ValueError: If ``initialize()`` has not been called.
        """
        if self.qa_chain is None:
            raise ValueError("RAG system not initialized")

        # Call the chain with a plain input mapping to get the full output
        # dict. The previous `arun(...)` call returned only the answer string,
        # so indexing it with ["result"] raised TypeError whenever sources
        # were requested.
        result = await self.qa_chain.acall({"query": question})

        if return_source_documents:
            return {
                "answer": result["result"],
                "sources": [doc.metadata for doc in result.get("source_documents", [])]
            }

        return result["result"]


In [None]:
class LLMManager:
    """Coordinates an LLM chain with a vector store for context-augmented answers."""

    def __init__(
        self,
        llm_provider: BaseLLMProvider,
        vector_store_manager: VectorStoreManager,
        default_prompt_template: Optional[str] = None
    ):
        fallback_template = """
        Context: {context}
        Question: {question}
        Please provide a detailed answer:
        """
        self.llm_provider = llm_provider
        self.vector_store_manager = vector_store_manager
        # Built lazily by initialize().
        self.llm_chain = None
        if default_prompt_template:
            self.default_prompt_template = default_prompt_template
        else:
            self.default_prompt_template = fallback_template

    def initialize(self):
        """Build the LLM chain from the current provider, then initialize the vector store."""
        chain_prompt = PromptTemplate(
            input_variables=["context", "question"],
            template=self.default_prompt_template,
        )
        self.llm_chain = LLMChain(llm=self.llm_provider.get_llm(), prompt=chain_prompt)
        self.vector_store_manager.initialize_vectorstore()

    def switch_llm_provider(self, new_provider: BaseLLMProvider):
        """Replace the LLM provider and rebuild everything via ``initialize()``.

        Note that ``initialize()`` also re-initializes the vector store.
        """
        self.llm_provider = new_provider
        self.initialize()

    async def get_response(self, question: str, use_context: bool = True) -> str:
        """Run the LLM chain on ``question``.

        When ``use_context`` is true, the page contents of the similarity-search
        hits are joined with newlines and supplied as context; otherwise the
        context is an empty string.

        Raises:
            ValueError: If ``initialize()`` has not been called.
        """
        if not self.llm_chain:
            raise ValueError("LLM chain not initialized")

        if use_context:
            hits = self.vector_store_manager.similarity_search(question)
            context = "\n".join(hit.page_content for hit in hits)
        else:
            context = ""

        return await self.llm_chain.arun(context=context, question=question)


In [47]:
async def main():
    """End-to-end demo: build the providers, vector store and RAG manager, then ask one question.

    Reads the module-level ``API_token`` plus the ``PINECONE_API_KEY`` and
    ``PINECONE_ENV`` environment variables, and prints the answer and sources.
    """
    # Providers: Hugging Face for generation, OpenAI for embeddings
    # (HuggingFaceProvider could be used for embeddings instead).
    llm_provider = HuggingFaceProvider(model_name="mistralai/Mistral-7B-v0.1", api_token=API_token)
    embedding_provider = OpenAIProvider()

    # RAG manager wired to a Pinecone-backed vector store.
    rag_manager = RAGLLMManager(
        llm_provider=llm_provider,
        vector_store=RAGVectorStore(
            index_name="my-rag-index",
            embedding_provider=embedding_provider,
            pinecone_api_key=os.getenv("PINECONE_API_KEY"),
            pinecone_env=os.getenv("PINECONE_ENV"),
        ),
    )
    rag_manager.initialize()

    # Document ingestion is currently disabled:
    # processor = DocumentProcessor(
    #     chunk_size=1000,
    #     chunk_overlap=200
    # )
    # file_paths = ["document1.pdf", "document2.txt"]
    # rag_manager.process_and_add_documents(file_paths, processor)

    # Ask a single question and request the supporting sources.
    demo_question = "What is the main topic discussed in the documents?"
    response = await rag_manager.get_response(demo_question, return_source_documents=True)

    print("Answer:", response["answer"])
    print("Sources:", response["sources"])
