<a href="https://colab.research.google.com/github/araujobma/LangchainGeminiMultimodalMongoDB/blob/main/GeminiMUltimodal.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Gemini Multimodal RAG Applications with LangChain
## This notebook is a replication of the notebook from this video https://www.youtube.com/watch?v=vxF8-ay9Bzk (Google Cloud Events) with some adaptations.
Use at your own risk 😆. You won't need to pay for any service

The changes are:
  - The vector store is replaced with MongoDB Atlas.
  - Instead of Vertex AI, langchain-google-genai is used to access the models (chat LLM and embeddings).
  - The code is not exactly the same; it has some minor changes, such as variable names.

Installation step:

In [None]:
# Install/upgrade (-U) the packages used below, with reduced output (-q).
# NOTE(review): a later cell imports `langchain_community`, which is not listed here
# explicitly - confirm it is pulled in as a dependency of the installed versions.
!pip install -q -U langchain langchain-google-genai langchain-mongodb pymongo[srv] pypdf

First, you need to set the environment variable GOOGLE_API_KEY. You can generate this key for free at https://aistudio.google.com. It is required to use Gemini.

In [None]:
import os
from google.colab import userdata

# Copy the secret named GOOGLE_API_KEY from Colab into the process environment.
os.environ["GOOGLE_API_KEY"] = userdata.get("GOOGLE_API_KEY") #this is defined in colab Secrets

## First part of video: Langchain basics

In [None]:
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage
# Chat model shared by the cells of the first part of the notebook.
chat_model = ChatGoogleGenerativeAI(model="gemini-1.5-flash-latest", temperature=0.2)

In [None]:
# Ask a plain-text question and print the answer chunk by chunk as it streams in.
question1 = "What is BMW revenue for 2022 break it up per line of business and share the 2024 business prespective?"
for chunk in chat_model.stream([question1]):
    # end="" keeps the streamed chunks on one continuous line of output.
    print(chunk.content, end="", flush=True)

In [None]:
from langchain.prompts import PromptTemplate

# Same question as before, with the company and year turned into template variables.
template = "What is {company} revenue for {year} break it up per line of business and share the 2024 business perspective"

prompt = PromptTemplate.from_template(template)
# Pipe the prompt into the model; `a | b` is evaluated as a.__or__(b), i.e. prompt.__or__(chat_model).
chain = prompt | chat_model
# Last expression of the cell: the notebook displays the composed chain.
chain

In [None]:
# Stream the chain output; the dict supplies the two template variables.
for chunk in chain.stream({"company": "BMW", "year": 2022}):
    print(chunk.content, end="", flush=True)

In [None]:
# Same chain, but executed in one call instead of streaming.
result = chain.invoke({"company": "BMW", "year": 2022})
print(result.content)

In [None]:
# Stream the answer, asking the model to stop at the first occurrence of "growth".
# `chat_model` is used because it is the only model object this notebook defines
# (the previous `llm_model` name was never assigned), and streamed chunks expose
# their text through `.content`.
for chunk in chat_model.stream(question1, stop=["growth"]):
    print(chunk.content, end="", flush=True)

In [None]:
# Same question, wrapped explicitly in a HumanMessage and answered in one call.
answer = chat_model.invoke([HumanMessage(content=question1)])
print(answer.content)

In [None]:
# Streaming variant; without end="" every chunk is printed on its own line.
for chunk in chat_model.stream([HumanMessage(content=question1)]):
    print(chunk.content)

In [None]:
# Ask the model to introduce itself; the reply is printed in the next cell.
answer = chat_model.invoke([HumanMessage(content="Hello. Who are you?")])

In [None]:
# Show the text of the most recent `answer`.
print(answer.content)

In [None]:
answer = chat_model.invoke([HumanMessage(content="Are you multimodal? Or should I use another model")])
# When I ran this cell, the LLM answered that it is not multimodal,
# but in fact the gemini-1.5-flash model is multimodal.
print(answer.content)

In [None]:
from IPython.display import HTML, display
import base64

def encode_image(image_path):
    """Return the content of the file at ``image_path`` as a base64 text string."""
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    # b64encode yields bytes; decode them so callers get a regular str.
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')

def plt_img_base64(img_base64):
    """Render a base64 encoded image in the notebook output.

    The string is embedded in an HTML ``<img>`` tag as a ``data:image/jpeg``
    URL and handed to IPython's ``display``.
    """
    img_tag = '<img src="data:image/jpeg;base64,{}" />'.format(img_base64)
    display(HTML(img_tag))

In [None]:
img_url = "https://t0.gstatic.com/licensed-image?q=tbn:ANd9GcRKJRziB51kfnSanSzZLUIAPOUP4vhYSbs2MN8-CEGMzjHXGfOLpD-J7ORjvVdGskFXRYmc3l15" #ai studio example you can use another

import requests

# Download before opening the output file, and fail loudly on HTTP errors, so an
# error page (or nothing at all) is never saved under the name image.jpeg.
# The timeout keeps the cell from hanging forever on a stalled connection.
response = requests.get(img_url, timeout=30)
response.raise_for_status()

with open('image.jpeg', 'wb') as f:
    f.write(response.content)



In [None]:
# Display the downloaded file inline to check that it is the expected picture.
plt_img_base64(encode_image('image.jpeg'))

In [None]:
# Send the image inline as a base64 data URL, the same form the RAG helpers in
# this notebook use, rather than a bare local file path.
message = HumanMessage(
    content=[
        {
            "type": "text",
            "text": "What's in this image?",
        },  # You can optionally provide text parts
        {
            "type": "image_url",
            "image_url": {"url": f"data:image/jpeg;base64,{encode_image('image.jpeg')}"},
        },
    ]
)

answer = chat_model.invoke([message])
print(answer.content)

## Second part. The RAG part
You need this file: storage.googleapis.com/benchmarks-artifacts/langchain-docs-benchmarking/cj.zip (the cells below read `cj.pdf` and the `.jpg` images from the current working directory).

In [None]:
from langchain_community.document_loaders import PyPDFLoader
# Load the PDF from the working directory into a list of documents.
loader = PyPDFLoader("./cj.pdf")
docs = loader.load()
# No tables are extracted in this notebook, so the list stays empty.
tables =[]
# Keep only the text of each loaded document.
texts = [d.page_content for d in docs]

In [None]:
# Inspect the first extracted text.
texts[0]

In [None]:
from langchain_core.messages import AIMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

In [None]:
# Generate summaries of text elements
def generate_text_summaries(texts, tables, summarize_texts=False):
    """
    Build retrieval summaries for texts and tables.

    texts: List of str
    tables: List of str
    summarize_texts: Bool; when False the texts are returned unchanged
        in place of summaries.

    Returns a (text_summaries, table_summaries) tuple of lists.
    """

    #Prompt
    prompt_text = """
    You are an assistant tasked with summarizing tables and text for retrieval.
    These summaries will be embedded  and used  to retrieve the raw text or table elements.
    Give a concise  summary of the table or text that is well  optimized  for retrieval.
    Table or text: {element}
    """
    summary_prompt = PromptTemplate.from_template(prompt_text)

    #If the model call fails, this placeholder message is produced instead
    fallback = RunnableLambda(
        lambda _: AIMessage(content="Error processing document")
    )

    #Summary chain: prompt -> model (with fallback) -> plain string
    llm = ChatGoogleGenerativeAI(
        temperature=0,
        model='gemini-1.5-flash-latest',
        max_output_tokens=1024
    )
    summarize_chain = summary_prompt | llm.with_fallbacks([fallback]) | StrOutputParser()

    #Texts: summarized one at a time on request, otherwise passed through as-is
    if not texts:
        text_summaries = []
    elif summarize_texts:
        text_summaries = [summarize_chain.invoke(item) for item in texts]
    else:
        text_summaries = texts

    #Tables: always summarized when present
    if tables:
        table_summaries = summarize_chain.batch(tables, {"max_concurrency": 1})
    else:
        table_summaries = []

    return text_summaries, table_summaries


In [None]:
# This makes one API call per text, which may trigger "too many requests" errors
# along the way, but the cell will still finish.
text_summaries, table_summaries = generate_text_summaries(texts, tables, summarize_texts=True)

In [None]:
# Number of summaries produced, followed by a look at the first two.
print(len(text_summaries))
text_summaries[0:2]

In [None]:
def image_summirize(img_base64, prompt):
    """
    Ask the Gemini chat model to describe one image.

    img_base64: base64 encoded image, embedded in the request as a JPEG data URL
    prompt: text instruction sent together with the image

    Returns the content of the model's reply.
    """
    model = ChatGoogleGenerativeAI(model='gemini-1.5-flash-latest', max_output_tokens=1024)
    # `invoke` is the supported entry point; calling the model object directly
    # is the deprecated `__call__` path.
    msg = model.invoke(
        [
            HumanMessage(
                content=[
                    {"type": "text", "text": prompt},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"},
                    },
                ]
            )
        ]
    )
    return msg.content


In [None]:
def generate_img_summaries(path):
    """
    Encode and summarize every .jpg file found directly inside a directory.

    path: directory to scan; file names are processed in sorted order

    Returns (img_base64_list, image_summaries): two parallel lists holding the
    base64 string and the generated summary of each image.
    """

    #Prompt
    prompt = """
    You are an assistant tasked with summarizing images for retrieval.
    These summaries  will be embedded and used to retrieve the raw image.
    Give concise summary of the image that is well optimized for retrieval.
    """

    encoded_images = []
    summaries = []
    for file_name in sorted(os.listdir(path)):
        #Skip anything that is not a .jpg file
        if not file_name.endswith('.jpg'):
            continue
        encoded = encode_image(os.path.join(path, file_name))
        encoded_images.append(encoded)
        summaries.append(image_summirize(encoded, prompt))
    return encoded_images, summaries

#Image summaries: process the .jpg files located in the current working directory
img_base64_list, image_summaries = generate_img_summaries("./")

In [None]:
# Number of images encoded, followed by their summaries.
print(len(img_base64_list))
image_summaries


In [None]:
import uuid
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import InMemoryStore
from langchain_google_genai.embeddings import GoogleGenerativeAIEmbeddings
#from langchain_community.vectorstores import Chroma
from langchain_mongodb.vectorstores import MongoDBAtlasVectorSearch
from langchain_core.documents import Document

In [None]:
#You need to define this secret (the MongoDB connection string) in Colab Secrets
os.environ["MONGODB_CONNECTION_STRING"] = userdata.get("MONGODB_CONNECTION_STRING")
#You can create a free cluster in MongoDB Atlas, and you need to allow access
#from anywhere in order to reach the cluster from Colab

from pymongo import MongoClient
from pymongo.server_api import ServerApi
import os
mongo_client = MongoClient(os.environ["MONGODB_CONNECTION_STRING"], server_api=ServerApi('1'))
try:
    #Send a ping command to check that the connection actually works
    mongo_client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")

except Exception as e:
    #The error is only printed; execution continues after this cell
    print(e)

In [None]:
def create_multi_vector_retriever(
    vectorstore, store, text_summaries, texts, table_summaries, tables, image_summaries, images
    ):
    """
    Build a MultiVectorRetriever whose vector store holds the summaries while
    its docstore holds the raw contents (texts, tables, base64 images).

    Each (summaries, contents) pair is added only when its summaries list is
    non-empty. Returns the populated retriever.
    """

    id_key = "doc_id"
    retriever = MultiVectorRetriever(vectorstore=vectorstore, docstore=store, id_key=id_key)

    def index_group(summaries, contents):
        #One fresh id per raw content; the same id is stored in the summary metadata
        ids = [str(uuid.uuid4()) for _ in contents]
        linked_summaries = []
        for position, summary in enumerate(summaries):
            linked_summaries.append(
                Document(page_content=summary, metadata={id_key: ids[position]})
            )
        retriever.vectorstore.add_documents(linked_summaries)
        retriever.docstore.mset(list(zip(ids, contents)))

    #Texts first, then tables, then images
    groups = (
        (text_summaries, texts),
        (table_summaries, tables),
        (image_summaries, images),
    )
    for summaries, contents in groups:
        if summaries:
            index_group(summaries, contents)

    return retriever


Now you need to create the database and the collection, both with the same name, "mm_rag_cj_blog" (that is what is used below).
https://gist.github.com/araujobma/d495097fe3d19fd9e5735b25b5903ff0


In [None]:
#Use the definition below to create a vector search index in MongoDB Atlas (where you can create a free cluster; the local MongoDB Community edition doesn't have search indexes)
# {
#         "fields": [
#             {
#               "type": "vector",
#               "path": "embedding",
#               "numDimensions": 768,
#               "similarity": "dotProduct"
#             }
#        ]
#    }

#    name="mm_rag_cj_blog_vector_index"

In [None]:
#The vectorstore to use to index the summaries
# Database and collection are both named "mm_rag_cj_blog".
db_collection= mongo_client["mm_rag_cj_blog"]["mm_rag_cj_blog"]
# index_name must match the name of the vector search index created in Atlas.
vectostore = MongoDBAtlasVectorSearch(
    db_collection,
    GoogleGenerativeAIEmbeddings(model="models/embedding-001"),
    index_name='mm_rag_cj_blog_vector_index',
    relevance_score_fn='dotProduct'
)

#Initialize the storage layer
# The raw documents live in process memory only.
inmemory_store = InMemoryStore()

In [None]:
# Index all summaries and store the matching raw texts/tables/images.
retriever_multi_vector_image = create_multi_vector_retriever(
    vectostore,
    inmemory_store,
    text_summaries,
    texts,
    table_summaries,
    tables,
    image_summaries,
    img_base64_list
)

In [None]:
import io
import re

from PIL import Image

def looks_like_base64(sb):
    """Return True when the string consists only of base64 alphabet characters
    followed by at most two '=' padding characters."""
    found = re.match("^[A-Za-z0-9+/]+[=]{0,2}$", sb)
    return bool(found)

def is_image_data(b64data):
    """
    Check if the base64 data is an image by looking at the start of the data.

    b64data: base64 encoded payload (str or bytes)

    Returns True when the decoded payload starts with one of the known image
    signatures, False otherwise, including when the input cannot be decoded
    as base64.
    """

    image_signatures = (
        b"\xFF\xD8\xFF",          # JPEG (any variant, not only JFIF)
        b"\x89PNG\r\n\x1a\n",     # PNG
        b"\x47\x49\x46\x38",      # GIF ("GIF8": GIF87a and GIF89a)
        b"\x42\x4D",              # BMP
        b"\x49\x49\x2A\x00",      # TIFF
        b"\x52\x49\x46\x46",      # WebP ("RIFF" container prefix)
        b"\x00\x00\x01\x00",      # ICO
        b"\x49\x43\x43\x50",      # listed as ICNS in the original table
    )
    try:
        # Decode (not encode) the payload; 8 bytes cover the longest signature above.
        header = base64.b64decode(b64data)[:8]
    except (ValueError, TypeError):
        # binascii.Error is a ValueError: malformed base64 or a non str/bytes input.
        return False
    # bytes.startswith accepts a tuple and checks every signature in one call.
    return header.startswith(image_signatures)

def resize_base64_image(base64_string, size=(128,128)):
    """
    Resize a base64 encoded image and return the result as a base64 string.

    base64_string: base64 encoded image
    size: target (width, height) passed to the resize call

    The output is saved in the same format as the decoded input image.
    """
    source = Image.open(io.BytesIO(base64.b64decode(base64_string)))

    #Resize with the LANCZOS filter and write the result to an in-memory buffer
    out_buffer = io.BytesIO()
    source.resize(size, Image.LANCZOS).save(out_buffer, format=source.format)

    return base64.b64encode(out_buffer.getvalue()).decode('utf-8')


def split_image_text_types(docs):
    """
    Separate retrieved items into base64 images and plain texts.

    Items that are Document objects are replaced by their page_content.
    Every detected image is resized to (1300, 600). When at least one image
    was found, only the first image is returned and the texts are dropped;
    otherwise all texts are returned.
    """
    b64_images, texts = [], []
    for item in docs:
        content = item.page_content if isinstance(item, Document) else item
        if looks_like_base64(content) and is_image_data(content):
            b64_images.append(resize_base64_image(content, size=(1300,600)))
        else:
            texts.append(content)
    if b64_images:
        return {"images": b64_images[:1], "texts": []}
    return {"images": b64_images, "texts": texts}


def img_prompt_func(data_dict):
    """
    Build the multimodal prompt message for the RAG chain.

    data_dict: dict with a "question" entry and a "context" entry holding
        "texts" and "images" lists.

    Returns a one-element list containing a HumanMessage whose content is the
    text instructions (question plus joined texts) followed by one image part
    per context image.
    """
    context = data_dict["context"]
    formatted_texts = '\n'.join(context["texts"])

    #Text part: instructions, the user question and the retrieved texts
    prompt_text = f"""
      You are financial analyst tasking with providing investiment advice.\n
      You will be given a mixed of text, tables, and image(s) usually of charts or graphs.\n
      Use this information to provide investiment advice related to user question. \n
      User-provided question: {data_dict['question']}\n\n
      {formatted_texts}"""
    parts = [{"type": "text", "text": prompt_text}]

    #Image parts, if any, are appended after the text part
    images = context["images"]
    if images:
        parts.extend(
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{image}"},
            }
            for image in images
        )
    return [HumanMessage(content=parts)]

def multi_modal_rag_chain(retriever):
    """
    Assemble the multimodal RAG chain.

    The input is sent to the retriever (whose results are split into images and
    texts) and is also passed through unchanged as the question; both feed the
    prompt builder, then the Gemini chat model, then a string output parser.
    """
    #Multimodal LLM
    llm = ChatGoogleGenerativeAI(
        temperature=0,
        model='gemini-1.5-flash-latest',
        max_output_tokens=1024,
    )

    #First step: gather the retrieved context and the untouched question
    gather_inputs = {
        'context': retriever | RunnableLambda(split_image_text_types),
        'question': RunnablePassthrough(),
    }

    #RAG pipeline
    return gather_inputs | RunnableLambda(img_prompt_func) | llm | StrOutputParser()

#Create RAG chain on top of the retriever populated above
chain_multimodal_rag = multi_modal_rag_chain(retriever_multi_vector_image)



In [None]:
query = "What are the EV/NTM and NTM rev growth for MongoDB, Cloudflare, and DataDog?"
# `invoke` is the current retriever entry point (`get_relevant_documents` is deprecated).
# NOTE(review): `limit=1` is kept from the original call, but it does not appear to
# restrict the number of results for this retriever - confirm, and configure the
# retriever's search settings instead if a hard limit is needed.
docs = retriever_multi_vector_image.invoke(query, limit=1)

# Last expression of the cell: show what was retrieved.
docs

In [None]:
# Render the first retrieved item as an image.
# NOTE(review): this assumes docs[0] is a base64 image string - confirm for other queries.
plt_img_base64(docs[0])

In [None]:
# Run the full RAG chain on the same query.
result = chain_multimodal_rag.invoke(query)

In [None]:
from IPython.display import Markdown as md
# Render the answer as Markdown in the notebook output.
md(result)

In [None]:
# A second question answered by the same chain, rendered as Markdown.
result = chain_multimodal_rag.invoke("What are the EV / NTM rev growth for adobe")
md(result)