<a href="https://colab.research.google.com/github/chakrateja70/AIChatBot/blob/main/ClaudeAgent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# *Required Pacakges*

In [None]:
!pip install langchain langchain_anthropic pinecone pinecone-client anthropic voyageai langchain-voyageai python-dotenv pypdf langchain-community datasets langchain_openai


***Retrieving API Keys from Userdata***

In [None]:
from google.colab import userdata
userdata.get('PINECONE_API_KEY')
userdata.get('ANTHROPIC_API_KEY')
userdata.get('VOYAGE_API_KEY')

In [None]:
from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from typing import List
from langchain_core.documents import Document
import os

def load_documents(folder_path: str) -> List[Document]:
    pages = []
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        if filename.endswith('.pdf'):
            loader = PyPDFLoader(file_path)
        elif filename.endswith('.docx'):
            loader = Docx2txtLoader(file_path)
        else:
            print(f"Unsupported file type: {filename}")
            continue
        pages.extend(loader.load())
    return pages

folder_path = "/content/"
pages = load_documents(folder_path)
print(f"Loaded {len(pages)} documents from the folder.")


In [None]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=100,
    chunk_overlap=30,
    length_function=len,
    is_separator_regex=False,
)
splits = text_splitter.split_documents(pages)
print(f"split the document into  {len(splits)} chunks.")

In [None]:
from langchain_community.embeddings.sentence_transformer import SentenceTransformerEmbeddings

embedding_function = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")
document_embeddings = embedding_function.embed_documents([split.page_content for split in splits])

document_embeddings

***NOTE:*** If you get error regarding index, create index manually ( dimensions= 384)


In [None]:
import os
from langchain.vectorstores import Pinecone
from langchain_community.embeddings import HuggingFaceEmbeddings
from pinecone import Pinecone, ServerlessSpec

pinecone_api_key = "your pinecone api_key"
os.environ["PINECONE_API_KEY"] = pinecone_api_key

pc = Pinecone(api_key=pinecone_api_key)

index_name = "agentindexx"


if index_name not in pc.list_indexes().names():
    print(f"Index '{index_name}' does not exist. Creating a new index...")

    pc.create_index(
        name=index_name,
        dimension=384,
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )
    print(f"Index '{index_name}' created successfully!")
else:
    print(f"Index '{index_name}' already exists.")

from langchain_community.embeddings import HuggingFaceEmbeddings
embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

from langchain.vectorstores import Pinecone
vectordb = Pinecone.from_documents(
    documents=splits,
    embedding=embedding_function,
    index_name=index_name,
)

print(f"Data successfully stored in Pinecone under index '{index_name}'!")

In [None]:
import requests
from langchain.chains import RetrievalQA
from langchain.llms import BaseLLM
from pydantic import BaseModel
from langchain.schema import LLMResult, Generation

class GeminiAPI(BaseLLM, BaseModel):
    model_name: str
    api_key: str
    api_url: str = "https://api.gemini.com/v2/flash-model"

    def __init__(self, api_key: str, model_name: str = "gemini-2.0-flash"):
        super().__init__(api_key=api_key, model_name=model_name)

    def _call(self, prompt: str, **kwargs) -> str:
        response = requests.post(
            self.api_url,
            json={"model": self.model_name, "input": prompt},
            headers={"Authorization": f"Bearer {self.api_key}"}
        )
        response_data = response.json()
        return response_data.get("output", "")

    def _generate(self, prompt: str, **kwargs):
        response_text = self._call(prompt, **kwargs)

        if not response_text.strip():
            response_text = "No valid response received from the model."

        generation = Generation(text=response_text)
        return LLMResult(generations=[[generation]])

    @property
    def _llm_type(self):
        return "GeminiAPI"

api_key = "your_gemini_api_key"

gemini_model = GeminiAPI(api_key=api_key)

retriever = vectordb.as_retriever()

rag_chain = RetrievalQA.from_chain_type(
    llm=gemini_model,
    retriever=retriever,
    return_source_documents=True
)

print("RAG pipeline successfully set up with Gemini 2.0 model!")




In [None]:
from transformers import pipeline
from langchain.chains import RetrievalQA
from langchain.llms import HuggingFacePipeline

model_name = "google/flan-t5-large"
llm_pipeline = pipeline("text2text-generation", model=model_name, tokenizer=model_name)

llm = HuggingFacePipeline(pipeline=llm_pipeline)

retriever = vectordb.as_retriever()

rag_chain = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=retriever,
    return_source_documents=True
)

print("RAG pipeline successfully set up with Hugging Face model!")

In [None]:

query = "your query here ?"
response = rag_chain.invoke({"query": query})

answer = response.get("result", "No relevant information found in the knowledge base.")

print("Answer:", answer)
