In [None]:
!pip install -U --quiet langchain langchain_community langchain-google-vertexai langchain_chroma
!pip install --quiet "unstructured[all-docs]" pypdf pillow pydantic lxml pillow chromadb tiktoken wikipedia

In [None]:
# Google Cloud project and region passed to vertexai.init(); fill both in before running
PROJECT_ID = ""
REGION = ""

# Authenticate the current user (the google.colab module is only available inside Colab)
from google.colab import auth
auth.authenticate_user()

In [None]:
import vertexai
# Initialise the Vertex AI SDK with the PROJECT_ID and REGION values
vertexai.init(project = PROJECT_ID , location = REGION)

In [None]:
#Download and load wikipedia page data
import logging
from langchain_community.document_loaders import WikipediaLoader

logging.basicConfig(level=logging.INFO)

# Run one Wikipedia search per topic (load_max_docs=1 per search).
# `docs` ends up as a list of per-topic result lists, in topic order.
docs = [
    WikipediaLoader(
        query=topic, load_max_docs=1, doc_content_chars_max=100000000
    ).load()
    for topic in [
        "Solar System", "Sun", "Earth", "Mars", "Jupiter", "Saturn",
        "Uranus", "Neptune", "Venus", "Mercury planet", "Asteroid", "comet",
    ]
]


In [None]:
# Collect the page content and the title of the first article of every
# non-empty search result
texts, list_of_titles = [], []
for loaded in docs:
    if not loaded:
        continue
    texts.append(loaded[0].page_content)
    list_of_titles.append(loaded[0].metadata.get("title"))

In [None]:
#Generate Text summaries
from langchain_google_vertexai import VertexAI , ChatVertexAI , VertexAIEmbeddings
from langchain.prompts import PromptTemplate
from langchain_core.messages import AIMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

Read about `RunnableLambda` here: https://www.pinecone.io/learn/series/langchain/langchain-expression-language/

In [None]:
# Generate summaries of text elements
def generate_text_summaries(texts, summarize_texts=False):
   """
   Summarize text elements for retrieval.

   texts: List of str, the raw texts
   summarize_texts: Bool; True sends every text through the summary chain,
       False returns the texts unchanged

   Returns a list of str: the summaries (one per text), the original `texts`
   when summarization is not requested, or [] when `texts` is empty or None.
   """

   # Pass-through cases return before the prompt and the model client are
   # built, so they need neither credentials nor a Vertex AI client.
   if not texts:
       return []
   if not summarize_texts:
       return texts

   # Prompt
   prompt_text = """You are an assistant tasked with summarizing text for retrieval. \
   These summaries will be embedded and used to retrieve the raw text. \
   Give a concise summary of the text that is well optimized for retrieval. text: {element} """
   prompt = PromptTemplate.from_template(prompt_text)

   # Fallback used when the model call fails: the summary of that item
   # becomes the fixed text below instead of raising.
   empty_response = RunnableLambda(
       lambda x: AIMessage(content="Error processing document")
   )

   # Text summary chain
   model = VertexAI(
       temperature=0, model_name="gemini-pro", max_output_tokens=1024
   ).with_fallbacks([empty_response])

   summarize_chain = {"element": lambda x: x} | prompt | model | StrOutputParser()

   # At most two summarization requests run at the same time
   return summarize_chain.batch(texts, {"max_concurrency": 2})


# Get text summaries: one model-generated summary per entry of `texts`
text_summaries = generate_text_summaries(texts, summarize_texts=True)

# Code for multi-vector retrieval

In [None]:
import uuid
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import InMemoryStore
from langchain_chroma import Chroma
from langchain_core.documents import Document

In [None]:
# Builds a MultiVectorRetriever, adds the supplied documents to it (summaries
# go to the vector store, raw texts to the docstore), and returns the
# initialized retriever
def create_multi_vector_retriever(vectorstore, text_summaries, texts):
   """
   Create retriever that indexes summaries, but stores and returns the raw texts

   vectorstore: vector store that receives the summary documents
   text_summaries: List of str, one summary per entry of `texts`
   texts: List of str, the raw texts kept in the docstore

   Returns the MultiVectorRetriever. Nothing is added to it when
   `text_summaries` is empty.

   Raises ValueError when summaries are supplied and their number differs
   from the number of texts.
   """

   # Summaries and texts are paired by position through a shared id. A
   # length mismatch would either fail with an IndexError or leave texts
   # without a summary, so reject it up front.
   if text_summaries and len(text_summaries) != len(texts):
       raise ValueError(
           f"Expected one summary per text, got {len(text_summaries)} "
           f"summaries for {len(texts)} texts"
       )

   # Initialize the storage layer
   store = InMemoryStore()
   id_key = "doc_id"

   # Create the multi-vector retriever
   retriever = MultiVectorRetriever(
       vectorstore=vectorstore,
       docstore=store,
       id_key=id_key,
   )

   # Helper function to add documents to the vectorstore and docstore
   def add_documents(retriever, doc_summaries, doc_contents):
       # One unique id per raw document; the same id is written into the
       # metadata of its summary so the two can be matched
       doc_ids = [str(uuid.uuid4()) for _ in doc_contents]
       summary_docs = [
           Document(page_content=summary, metadata={id_key: doc_id})
           for doc_id, summary in zip(doc_ids, doc_summaries)
       ]
       retriever.vectorstore.add_documents(summary_docs)  # summaries -> vector store
       retriever.docstore.mset(list(zip(doc_ids, doc_contents)))  # raw texts -> docstore

   # Check that text_summaries is not empty before adding
   if text_summaries:
       add_documents(retriever, text_summaries, texts)

   return retriever


# The vector store used to embed and index the summaries
vectorstore = Chroma(
   collection_name="test-RAG-WikiPages",
   embedding_function=VertexAIEmbeddings(model_name="textembedding-gecko@latest"),
)

# Create and initialize the retriever using the function defined above
retriever_multi_vector_img = create_multi_vector_retriever(
   vectorstore,
   text_summaries,
   texts
)


Build the multi-modal RAG pipeline

In [None]:
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.messages import HumanMessage
import io

In [None]:
# convert_Doc_to_Dict() - converts the retriever output to a dict
def convert_Doc_to_Dict(docs):
   """
   Converts the retriever output to a dict

   docs: iterable of retrieved items; each one is either a raw string or an
       object exposing `page_content` (such as a langchain Document)

   Returns {"images": [], "texts": [...]}. "images" is always an empty list
   here; "texts" holds the text of every item, in the order received.
   """
   b64_images = []
   texts = []
   for doc in docs:
       # Unwrap Document-like objects and keep plain strings as they are.
       # The docstore in this notebook is filled with raw strings, so items
       # that are not Documents must be kept rather than skipped; otherwise
       # the prompt context ends up empty.
       texts.append(getattr(doc, "page_content", doc))
   return {"images": b64_images, "texts": texts}

In [None]:
# Builds the prompt message for the RAG task
def rag_prompt_func(data_dict):
   """
   Turn the retrieved context and the question into a single HumanMessage.

   data_dict: dict with "context" (a dict whose "texts" entry is a list of
       str) and "question"

   Returns a one-element list holding a HumanMessage whose content is a
   single text part: the instructions, the question, then the context texts
   joined by newlines.
   """
   context_text = "\n".join(data_dict["context"]["texts"])

   # Pieces of the prompt, concatenated in this order without separators
   prompt_parts = [
       "You are responsible AI assistant who is role playing a middle school science teacher. \n",
       "You will be given text. \n",
       "You can only Use this information to answer the question from your students. \n",
       "If the answer in not part of your context, just say that you don't know, don't try to make up an answer. \n\n",
       f"User-provided question: {data_dict['question']}\n\n",
       "Text and / or tables:\n",
       context_text,
   ]

   text_part = {"type": "text", "text": "".join(prompt_parts)}
   return [HumanMessage(content=[text_part])]


In [None]:
def multi_modal_rag_chain(retriever):
   """
   Assemble the RAG chain: retrieve context, build the prompt, call the model.

   retriever: runnable that receives the question and returns the retrieved
       documents

   Returns a runnable that takes the question and produces the model's
   answer after it has passed through StrOutputParser.
   """

   # Chat model that answers the question
   # (alternatives noted by the author: gemini-1.5-flash, gemma2)
   llm = ChatVertexAI(
       temperature=0,
       model_name="gemini-pro",
       max_output_tokens=1024,
   )

   # The incoming question feeds both branches: "context" runs the retriever
   # and reshapes its output, "question" forwards the input unchanged
   prompt_inputs = {
       "context": retriever | RunnableLambda(convert_Doc_to_Dict),
       "question": RunnablePassthrough(),
   }
   build_prompt = RunnableLambda(rag_prompt_func)

   return prompt_inputs | build_prompt | llm | StrOutputParser()


# Create the RAG chain on top of the multi-vector retriever
chain_multimodal_rag = multi_modal_rag_chain(retriever_multi_vector_img)

Test retrieval pipeline

In [None]:
query = "What updates do you have about Genrative AI technology space?"

# Call the retriever through the Runnable `invoke` API; `get_relevant_documents`
# is deprecated. The former `limit=3` keyword is dropped because this retriever
# takes its result count from `search_kwargs` (e.g. {"k": 3}), not from `limit`.
# The result gets its own name so the Wikipedia `docs` list loaded earlier is
# not overwritten.
retrieved_docs = retriever_multi_vector_img.invoke(query)

# Number of documents the retriever returned
len(retrieved_docs)

Calling the RAG pipeline

In [None]:
# Run the full RAG chain (retrieve, build prompt, call model) on one question
query = "What is the diameter of the pluto?"
result = chain_multimodal_rag.invoke(query)

# Render the answer string as Markdown in the cell output
from IPython.display import Markdown as md
md(result)

I'm sorry, but the text you provided does not contain information about the diameter of Pluto. Therefore, I cannot answer your question. 


In [None]:
# Question outside the Solar System topics loaded from Wikipedia
query = "What is the diameter a one rupee coin?"
result = chain_multimodal_rag.invoke(query)

# Render the answer string as Markdown in the cell output
from IPython.display import Markdown as md
md(result)

I'm sorry, but the text you provided does not contain information about the diameter of a one rupee coin. Therefore, I cannot answer your question. 

Would you like me to try to find the answer to your question from another source? 
