# Multimodal RAG with Amazon Bedrock Knowledge Base and AOSS

In today's data-driven world, the ability to retrieve and generate relevant information from diverse sources is increasingly important. Retrieval-Augmented Generation (RAG) tackles this challenge by combining a retrieval system with a language model. The system first retrieves passages relevant to a question, then passes them to the model as context for generating the answer. In this blog post, we'll explore how to build a multimodal RAG solution using Amazon Bedrock Knowledge Base, Anthropic's Claude 3 models, and Amazon OpenSearch Serverless.

Amazon Bedrock Knowledge Base is a fully managed capability that lets you build private knowledge bases from your own data sources, such as documents stored in Amazon S3. Bedrock handles ingesting, chunking, and embedding that data and storing it in a vector store. This gives us a centralized repository of information tailored to our specific needs.

Anthropic's Claude 3, a family of vision-capable large language models available on Amazon Bedrock, brings both image understanding and natural language processing to the table. Claude 3 accepts images alongside text and generates fluent, human-like responses, so it can process and synthesize the information retrieved from the knowledge base.
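To make the multimodal part concrete, here is a minimal sketch of a request body for Claude 3 on Bedrock. It uses the Anthropic Messages format and pairs a base64-encoded image with a text question. The helper name `build_claude3_image_request` and the placeholder image bytes are hypothetical. The function only assembles JSON and does not call AWS.

```python
import base64
import json


def build_claude3_image_request(image_bytes: bytes, media_type: str, question: str,
                                max_tokens: int = 1024, temperature: float = 0.1) -> str:
    """Build an InvokeModel body for Claude 3 with one image block and one text block (hypothetical helper)."""
    body = {
        # Version string expected by Anthropic models on Bedrock
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "temperature": temperature,
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": media_type,  # e.g. "image/png" or "image/jpeg"
                            "data": base64.b64encode(image_bytes).decode("utf-8"),
                        },
                    },
                    {"type": "text", "text": question},
                ],
            }
        ],
    }
    return json.dumps(body)


# Placeholder bytes; a real call would read an actual image file
request_body = build_claude3_image_request(b"\x89PNG fake bytes", "image/png", "What does this chart show?")
```

With AWS credentials configured, you could send this body with `boto3.client("bedrock-runtime").invoke_model(modelId="anthropic.claude-3-haiku-20240307-v1:0", body=request_body)`.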

Amazon OpenSearch Serverless is a fully managed, serverless deployment option for OpenSearch (an open-source fork of Elasticsearch). It provides a scalable and cost-effective vector store for indexing and searching our knowledge base. Capacity scales automatically, so there are no clusters to provision or manage.
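As a rough illustration of how a vector store such as OpenSearch Serverless answers a similarity query, the sketch below builds a k-NN query body in the OpenSearch query DSL. The field names `embedding` and `text` and the toy 3-dimensional vector are assumptions. A real index uses whatever vector field and dimension it was created with. When a Bedrock knowledge base manages the collection, it runs this kind of vector search on your behalf.

```python
def build_knn_query(query_vector, k=5, vector_field="embedding", source_fields=("text",)):
    """Build an OpenSearch k-NN search body (field names are hypothetical)."""
    return {
        "size": k,  # number of hits to return
        "_source": list(source_fields),  # return the stored text, not the raw vector
        "query": {
            "knn": {
                vector_field: {
                    "vector": list(query_vector),  # embedding of the user question
                    "k": k,  # nearest neighbours to retrieve from the vector index
                }
            }
        },
    }


body = build_knn_query([0.12, -0.05, 0.33], k=3)
```

With the `opensearch-py` client, you would pass a body like this as `client.search(index=..., body=body)`.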

By combining these three components, we can create a multimodal RAG solution that can retrieve relevant information from our knowledge base, process it using Claude 3, and generate coherent and contextual responses. This approach has numerous applications, ranging from customer support and knowledge management to research and content creation.
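The retrieve-then-generate flow described above can map onto a single Knowledge Base call: `retrieve_and_generate` on the `bedrock-agent-runtime` client. The sketch below assembles that request and extracts the answer and its cited sources. The helper names, knowledge base ID, and model ARN are placeholders. The client is passed in, so the functions can be exercised without AWS credentials.

```python
def build_kb_request(query, knowledge_base_id, model_arn, num_results=5):
    """Assemble keyword arguments for bedrock-agent-runtime retrieve_and_generate (hypothetical helper)."""
    return {
        "input": {"text": query},
        "retrieveAndGenerateConfiguration": {
            "type": "KNOWLEDGE_BASE",
            "knowledgeBaseConfiguration": {
                "knowledgeBaseId": knowledge_base_id,
                "modelArn": model_arn,  # foundation model that writes the final answer
                "retrievalConfiguration": {
                    # How many chunks to pull from the vector store (e.g. OpenSearch Serverless)
                    "vectorSearchConfiguration": {"numberOfResults": num_results}
                },
            },
        },
    }


def ask_knowledge_base(client, query, knowledge_base_id, model_arn):
    """Call retrieve_and_generate; return the answer text and the locations of cited sources."""
    response = client.retrieve_and_generate(**build_kb_request(query, knowledge_base_id, model_arn))
    sources = [
        ref.get("location")
        for citation in response.get("citations", [])
        for ref in citation.get("retrievedReferences", [])
    ]
    return response["output"]["text"], sources
```

With credentials configured, a call might look like `ask_knowledge_base(boto3.client("bedrock-agent-runtime"), "What does Anyscale do?", "<your-kb-id>", "arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-haiku-20240307-v1:0")`.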

In the following sections, we'll walk through the steps to set up and configure each component, integrate them into a cohesive solution, and demonstrate its capabilities with a practical example.

## Prepare environment

In [1]:
%pip install streamlit --upgrade
%pip install streamlit_jupyter --upgrade

Note: you may need to restart the kernel to use updated packages.

In [2]:
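# Restart the kernel so the freshly installed packages are picked up
# (this JavaScript hook relies on the classic Jupyter Notebook UI)
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")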

In [3]:
%load_ext autoreload
%autoreload 2

## Import needed packages

In [4]:
import streamlit as st
from streamlit_jupyter import StreamlitPatcher, tqdm

In [5]:
sp = StreamlitPatcher()
sp.jupyter()  # register patcher with streamlit

## Start Streamlit UI

In [6]:
st.set_page_config(page_title="MM-RAG Demo", page_icon="🩺", layout="wide")
st.title("Multimodal Demo")


## Integrate with mmRAG

### Streamlit sidebar

In [11]:
# Model selection and inference-parameter widgets
st.title(':orange[Multimodal Config] :pencil2:')
option = st.selectbox('Choose Model', ('anthropic.claude-3-haiku-20240307-v1:0',
                                       'anthropic.claude-3-sonnet-20240229-v1:0',
                                       'mistral.mistral-large-2402-v1:0'))

st.write("------- Default parameters ----------")
temperature = st.number_input("Temperature", min_value=0.0, max_value=1.0, value=0.1, step=0.05)
# At least one output token is required
max_token = st.number_input("Maximum Output Token", min_value=1, value=1024, step=64)
# top_p is a probability, so cap it at 1.0
top_p = st.number_input("Top_p: The cumulative probability cutoff for token selection", min_value=0.1, max_value=1.0, value=0.85)
top_k = st.number_input("Top_k: Sample from the k most likely next tokens at each step", min_value=1, value=40)
stop_sequences = st.text_input("The set of character sequences (up to 5) that will stop output generation", value="\n\n\n")

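The sidebar values eventually have to become the inference parameters Claude 3 expects. A minimal sketch follows. It assumes the Anthropic Messages parameter names on Bedrock (`max_tokens`, `temperature`, `top_p`, `top_k`, `stop_sequences`). The helper name and the choice to split the stop-sequence field on commas are hypothetical.

```python
def build_model_kwargs(temperature, max_tokens, top_p, top_k, stop_text, max_stops=5):
    """Map sidebar inputs to Claude 3 inference parameters (hypothetical helper)."""
    # Split the free-text field into individual stop sequences, drop empty entries,
    # and keep at most `max_stops` of them (the UI label says "up to 5")
    stops = [s for s in stop_text.split(",") if s][:max_stops]
    kwargs = {
        "max_tokens": int(max_tokens),
        # Clamp sampling parameters into the [0, 1] range
        "temperature": min(max(float(temperature), 0.0), 1.0),
        "top_p": min(max(float(top_p), 0.0), 1.0),
        "top_k": int(top_k),
    }
    if stops:
        kwargs["stop_sequences"] = stops
    return kwargs


model_kwargs = build_model_kwargs(0.1, 1024, 0.85, 40, "\n\n\n")
```

The resulting dict has the same shape as the `model_kwargs` passed to `ChatBedrock` in `config_bedrock` later in this notebook, plus an optional `stop_sequences` list.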

### Streamlit body

In [10]:
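# Outside `streamlit run`, st.session_state does not persist between calls,
# which is why this cell raises the AttributeError below when run in Jupyter
if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "assistant", "content": "I am your assistant. How can I help today?"}]

for msg in st.session_state["messages"]:
    st.chat_message(msg["role"]).write(msg["content"])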

AttributeError: st.session_state has no attribute "messages". Did you forget to initialize it? More info: https://docs.streamlit.io/library/advanced-features/session-state#initialization

## Test ReAct Agent

In [1]:
import os

import boto3
from langchain import hub
from langchain.agents import AgentExecutor, AgentType, create_react_agent, initialize_agent, load_tools
from langchain_aws import ChatBedrock
from langchain_community.embeddings import BedrockEmbeddings
from langchain_community.utilities.serpapi import SerpAPIWrapper

# The SerpAPI key is read from the `serp_api_token` environment variable, which must be set beforehand
os.environ['SERPAPI_API_KEY'] = os.getenv('serp_api_token')

# SerpAPI query parameters: Google takes `num` for the result count,
# Bing takes `count` for the result count and `cc` for the country
g_params = {
    "engine": "google",
    "gl": "us",
    "hl": "en",
    "num": 5,
}
b_params = {
    "engine": "bing",
    "cc": "us",
    "count": 5,
}
google_search = SerpAPIWrapper(params=g_params)
bing_search = SerpAPIWrapper(params=b_params)

In [2]:
def config_bedrock(embedding_model_id, model_id, max_tokens, temperature, top_p, top_k):
    bedrock_client = boto3.client('bedrock-runtime')
    embedding_bedrock = BedrockEmbeddings(client=bedrock_client, model_id=embedding_model_id)
    model_kwargs =  { 
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_k": top_k,
        "top_p": top_p,
        #"stop_sequences": ["\n\nHuman"],
    }
    chat = ChatBedrock(
        model_id=model_id, client=bedrock_client, model_kwargs=model_kwargs
    )

    return chat, embedding_bedrock
    
# Get the prompt to use - you can modify this!
prompt = hub.pull("hwchase17/react")

# Choose the LLM to use
llm, embedding = config_bedrock('amazon.titan-embed-g1-text-02', 'anthropic.claude-3-haiku-20240307-v1:0', max_tokens=1024, temperature=0.01, top_p=0.90, top_k=40)

# Set up tools
#tool1 = [TavilySearchResults(max_results=3, api_wrapper=tavily_search)]
tool2 = load_tools(["serpapi"], llm=llm)

# Construct the ReAct agent
agent = create_react_agent(llm, tool2, prompt)

# Create an agent executor by passing in the agent and tools
agent_executor = AgentExecutor(agent=agent, tools=tool2, verbose=True, handle_parsing_errors=True)
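The `hwchase17/react` prompt asks the model to reply in a fixed `Thought` / `Action` / `Action Input` / `Final Answer` text format. The executor parses that text to decide whether to call a tool or stop. The sketch below is a simplified, hypothetical parser that only illustrates the idea. LangChain ships its own parser for this format, which handles more edge cases.

```python
import re


def parse_react_step(text):
    """Return ("final", answer) or ("action", tool_name, tool_input) for one ReAct completion."""
    # A final answer ends the loop
    final = re.search(r"Final Answer:\s*(.*)", text, re.DOTALL)
    if final:
        return ("final", final.group(1).strip())
    # Otherwise expect an Action line immediately followed by an Action Input line
    action = re.search(r"Action:\s*(.*?)\s*\nAction Input:\s*(.*)", text, re.DOTALL)
    if action:
        return ("action", action.group(1).strip(), action.group(2).strip())
    raise ValueError("Could not parse ReAct output")


step = parse_react_step(
    "Thought: I should look this up.\nAction: Search\nAction Input: Anyscale market cap"
)
```

The executor would then run the `Search` tool with `Anyscale market cap`, append the result as an `Observation`, and call the model again.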


In [8]:
%%time
query = "Does generative AI impact Anyscale on their market cap?"
results = agent_executor.invoke({"input": query})



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mQuestion: Does generative AI impact Anyscale on their market cap?
Thought: To answer this question, I will need to gather information about Anyscale and how generative AI might impact their business and market cap.
Action: Search
Action Input: Anyscale market cap[0m[36;1m[1;3m["Anyscale's valuation in December 2021 was $1,000M.", "The current revenue for Anyscale is 00000. How much funding has Anyscale raised over time? Anyscale has raised $260M. Who are Anyscale's investors?", 'Anyscale accelerates the development and productionization of any AI app, on any cloud, at any scale.', 'Very Low Risk. >$1B in revenue. Large Market Cap. +6. Repeat Unicorn Founder. Repeat Unicorn Founder. Category Leader. Fundraised Since Market Downturn. Backed ...', 'SAN FRANCISCO - Anyscale, the distributed programming platform company, today announced $20.6M in Series A funding, led by Andreessen Horowitz (a16z) with ...', 'Market cap. -. Net

In [10]:
results['output']

"Based on the information gathered, generative AI technologies could have a positive impact on Anyscale's market cap by improving the efficiency and productivity of AI application development and deployment. Generative AI models can automate many repetitive tasks, allowing developers to focus on more high-level, creative aspects of the development process. This could make Anyscale's platform and services more valuable to customers, potentially driving increased adoption and revenue growth. However, the actual impact on Anyscale's market cap would depend on a variety of factors and cannot be definitively determined from the information available."

In [196]:
# Initialize a structured-chat ReAct agent with the same SerpAPI tool
agent = initialize_agent(
    tool2,
    llm,
    agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,
    handle_parsing_errors=True,
)

# Execute the agent with a query
result = agent.run(query)
result



> Entering new AgentExecutor chain...
Okay, let's see what I can find about generative AI consultation companies in the San Francisco Bay Area.

Action:
{
  "action": "Search",
  "action_input": "generative AI consultation companies in SF Bay Area"
}


> Finished chain.


'Okay, let\'s see what I can find about generative AI consultation companies in the San Francisco Bay Area.\n\nAction:\n{\n  "action": "Search",\n  "action_input": "generative AI consultation companies in SF Bay Area"\n}\n\n'
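The structured-chat agent expects the model to emit a JSON blob with `action` and `action_input` keys, like the one in the transcript above. Here the chain finished with that raw text as its answer instead of executing the search. A minimal standard-library sketch of pulling such a blob out of model text follows. It is for illustration only; the function name is hypothetical and this is not LangChain's parser.

```python
import json


def extract_action_blob(text):
    """Find the first JSON object in the model output and return (action, action_input), or None."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode parses one JSON value starting at `start` and ignores trailing text
            blob, _ = decoder.raw_decode(text, start)
            return blob["action"], blob["action_input"]
        except (json.JSONDecodeError, KeyError, TypeError):
            # Not a usable blob; try the next opening brace
            start = text.find("{", start + 1)
    return None


output = 'Action:\n{\n  "action": "Search",\n  "action_input": "generative AI consultation companies in SF Bay Area"\n}\n\n'
parsed_action = extract_action_blob(output)
```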

In [143]:
import os
import re
import string

from langchain.agents import AgentType, Tool, initialize_agent
from langchain_community.utilities import GoogleSearchAPIWrapper


def extract_keywords(input_string):
    # Remove punctuation and convert to lowercase
    input_string = input_string.translate(str.maketrans('', '', string.punctuation)).lower()
    # Split the string into words
    words = input_string.split()
    # Keep only plain alphanumeric tokens as web-searchable keywords
    pattern = r'^[a-z0-9]+$'
    keywords = [word for word in words if re.match(pattern, word)]
    # Join the keywords with '+'
    return '+'.join(keywords)


# Google Programmable Search (expects google_api_key and google_cse_id environment variables)
gsearch = GoogleSearchAPIWrapper(google_api_key=os.getenv("google_api_key"), google_cse_id=os.getenv("google_cse_id"))
google_results = gsearch.results(query, num_results=3)  # list of dicts with title, link, snippet

# initialize_agent expects Tool objects, not raw API wrappers, so wrap each search backend
tools = [
    Tool(name="google_cse_search", func=gsearch.run, description="Search Google Programmable Search for current information."),
    Tool(name="serpapi_google_search", func=google_search.run, description="Search Google via SerpAPI."),
    Tool(name="serpapi_bing_search", func=bing_search.run, description="Search Bing via SerpAPI."),
]

agent = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True)
agent.run("What is the capital of France?")
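The `extract_keywords` helper above joins words with `+`, which mirrors how spaces are encoded in a URL query string. If the goal is simply a safe search URL, the standard library already handles this encoding. A minimal sketch follows; the Google URL is only an example target. Unlike `extract_keywords`, it keeps case and percent-escapes punctuation instead of dropping it.

```python
from urllib.parse import quote_plus, urlencode

query_text = "Does generative AI impact Anyscale?"
# quote_plus encodes spaces as '+' and percent-escapes characters such as '?'
encoded = quote_plus(query_text)
# urlencode builds a complete query string from a dict of parameters
search_url = "https://www.google.com/search?" + urlencode({"q": query_text, "num": 5})
```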

In [5]:
from serpapi import BingSearch, GoogleSearch

# Raw SerpAPI client (google-search-results package); `num` sets how many organic results Google returns
params = {"engine": "google", "q": query, "api_key": os.getenv("serp_api_token"), "num": 10}
goog_search = GoogleSearch(params)

In [7]:
%%time
data = goog_search.get_dict()
urls = []
# Keep up to the first 10 organic results, skipping YouTube links
for result in data.get('organic_results', [])[:10]:
    url = result['link']
    if 'youtube.com' not in url:
        print(url)
        urls.append(url)

https://www.anyscale.com/resources/case-study/how-canva-built-a-modern-ai-platform-using-anyscale
https://finance.yahoo.com/news/why-generative-ai-orchestration-startups-000000089.html
https://ashugarg.substack.com/p/incumbents-vs-startups-the-showdown?utm_source=profile&utm_medium=reader2
https://www.linkedin.com/pulse/reflection-how-businesses-actually-using-generative-ai-berkeleyhbsa-du6jc
https://pixelplex.io/blog/generative-ai-market-map/
https://www.v7labs.com/blog/generative-ai-guide
https://www.techtarget.com/searchenterpriseai/definition/generative-AI
https://foundationcapital.com/year-one-of-generative-ai-six-key-trends/
https://www.globenewswire.com/news-release/2024/03/18/2848229/0/en/Anyscale-Teams-Up-With-NVIDIA-to-Scale-Generative-AI-Models-Into-Production.html
CPU times: user 14.9 ms, sys: 0 ns, total: 14.9 ms
Wall time: 351 ms
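Filtering search hits by substring can misfire: a URL that merely mentions `youtube.com` in its path would also be dropped. The sketch below compares hostnames with `urllib.parse` instead. The helper name and sample data are hypothetical; the data is shaped like SerpAPI's `organic_results` entries.

```python
from urllib.parse import urlparse


def filter_links(organic_results, blocked_domains=("youtube.com",), limit=10):
    """Return links from the first `limit` results whose host is not a blocked domain or subdomain."""
    links = []
    for result in organic_results[:limit]:
        link = result.get("link")
        if not link:
            continue
        host = (urlparse(link).hostname or "").lower()
        # Block exact matches and subdomains such as www.youtube.com
        if any(host == d or host.endswith("." + d) for d in blocked_domains):
            continue
        links.append(link)
    return links


sample = [
    {"link": "https://www.youtube.com/watch?v=abc"},
    {"link": "https://www.anyscale.com/blog"},
    {"link": "https://example.com/about-youtube.com-clips"},
]
kept = filter_links(sample)
```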


In [17]:
%%time
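# Query Bing through SerpAPI's raw client; `count` sets the number of results
bing_params = {"q": query, "api_key": os.getenv("serp_api_token"), "count": 3}
data = BingSearch(bing_params).get_dict()
urls = []
for result in data.get('organic_results', [])[:3]:
    print(result['link'])
    urls.append(result['link'])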

https://www.builtinsf.com/companies/type/generative-ai-companies
https://www.builtinsf.com/articles/ai-companies-san-francisco-bay-area
https://www.ycombinator.com/companies/industry/generative-ai/san-francisco-bay-area
CPU times: user 9.47 ms, sys: 5.95 ms, total: 15.4 ms
Wall time: 360 ms


## Use AsyncHtmlLoader to scrape the web content
The AsyncHtmlLoader uses the aiohttp library to make asynchronous HTTP requests, which makes it suitable for simple, lightweight scraping.
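Conceptually, an asynchronous loader issues all page requests concurrently instead of one after another. The sketch below shows that pattern with `asyncio.gather` and a semaphore that caps how many requests are in flight. The `fake_fetch` coroutine is a hypothetical stand-in for a real aiohttp request, so the example runs offline. This is not AsyncHtmlLoader's actual implementation.

```python
import asyncio


async def fetch_all(urls, fetch, max_concurrency=5):
    """Fetch every URL concurrently, with at most `max_concurrency` requests in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(url):
        async with semaphore:
            return await fetch(url)

    # gather returns results in the same order as the input URLs
    return await asyncio.gather(*(bounded(u) for u in urls))


async def fake_fetch(url):
    # Stand-in for an HTTP GET; a real version might use an aiohttp ClientSession
    await asyncio.sleep(0.01)
    return f"<html>{url}</html>"


pages = asyncio.run(fetch_all(["https://a.example", "https://b.example"], fake_fetch))
```

Inside a notebook, an event loop is already running, so you would `await fetch_all(...)` directly rather than call `asyncio.run`. The same constraint is why the Chromium-based scraper later in this notebook calls `nest_asyncio.apply()`.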

In [27]:
from langchain_community.document_loaders import AsyncHtmlLoader
loader = AsyncHtmlLoader(urls)

In [28]:
%%time
docs = loader.load()

Fetching pages: 100%|##########| 10/10 [00:01<00:00,  7.44it/s]


CPU times: user 4.39 s, sys: 15.4 ms, total: 4.41 s
Wall time: 8.77 s
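The loaded documents still contain raw HTML, including markup, scripts, and navigation. This can make them far larger than the text they carry. Below is a minimal standard-library sketch that strips tags and skips `<script>`/`<style>` content before embedding. The class and function names are hypothetical. In LangChain, `Html2TextTransformer` or the `BeautifulSoupTransformer` used in the next cell serves a similar purpose.

```python
from html.parser import HTMLParser


class _TextExtractor(HTMLParser):
    """Collect visible text, ignoring anything inside <script> or <style>."""

    def __init__(self):
        super().__init__()
        self.parts = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        # Keep non-empty text that is not inside a skipped element
        if not self._skip_depth and data.strip():
            self.parts.append(data.strip())


def html_to_text(html):
    parser = _TextExtractor()
    parser.feed(html)
    parser.close()
    return " ".join(parser.parts)


text = html_to_text(
    "<html><head><style>p{color:red}</style></head>"
    "<body><p>Hello</p><script>x=1</script><p>world</p></body></html>"
)
```

Applied to this notebook's data, that would be `html_to_text(docs[0].page_content)`.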


In [40]:
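import pprint

import nest_asyncio
from langchain.chains import create_extraction_chain
# AsyncChromiumLoader drives headless Chromium through Playwright
# (requires `pip install playwright` and `playwright install chromium`)
from langchain_community.document_loaders.chromium import AsyncChromiumLoader
from langchain_community.document_transformers import BeautifulSoupTransformer
from langchain_text_splitters import RecursiveCharacterTextSplitter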

schema = {
    "properties": {
        "news_article_title": {"type": "string"},
        "news_article_summary": {"type": "string"},
    },
    "required": ["news_article_title", "news_article_summary"],
}


def extract(content: str, schema: dict):
    return create_extraction_chain(schema=schema, llm=llm).run(content)
    
def scrape_with_playwright(urls, schema):
    loader = AsyncChromiumLoader(urls)
    nest_asyncio.apply()
    docs = loader.load()
    bs_transformer = BeautifulSoupTransformer()
    docs_transformed = bs_transformer.transform_documents(
        docs, tags_to_extract=["span"]
    )
    print("Extracting content with LLM")

    # Grab the first 1000 tokens of the site
    splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=1000, chunk_overlap=0
    )
    splits = splitter.split_documents(docs_transformed)

    # Process the first split
    extracted_content = extract(schema=schema, content=splits[0].page_content)
    pprint.pprint(extracted_content)
    return extracted_content

In [41]:
extracted_content = scrape_with_playwright(urls, schema=schema)

Exception: Connection closed while reading from the driver
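Whatever the extraction chain returns, it is worth checking the records against the schema before using them, because an LLM can omit fields or return the wrong types. A minimal sketch follows. It checks only the `required` keys and the `string` types of a schema shaped like the one above. The helper is hypothetical and is not a full JSON Schema validator.

```python
def validate_records(records, schema):
    """Split records into (valid, invalid) using the schema's required keys and string types."""
    required = schema.get("required", [])
    props = schema.get("properties", {})
    valid, invalid = [], []
    for record in records:
        ok = isinstance(record, dict) and all(key in record for key in required)
        if ok:
            for key, spec in props.items():
                # Reject present fields whose declared type is string but whose value is not
                if key in record and spec.get("type") == "string" and not isinstance(record[key], str):
                    ok = False
                    break
        (valid if ok else invalid).append(record)
    return valid, invalid


news_schema = {
    "properties": {
        "news_article_title": {"type": "string"},
        "news_article_summary": {"type": "string"},
    },
    "required": ["news_article_title", "news_article_summary"],
}
records = [
    {"news_article_title": "A", "news_article_summary": "B"},
    {"news_article_title": "Missing summary"},
    {"news_article_title": 1, "news_article_summary": "B"},
]
valid, invalid = validate_records(records, news_schema)
```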

In [30]:
# Build a FAISS vector store over the scraped pages and query it with a reranked RAG chain
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import FlashrankRerank
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

doc_num = 5  # number of documents to retrieve (illustrative value; adjust as needed)

db = FAISS.from_documents(docs, embedding)
retriever = db.as_retriever(search_kwargs={"k": doc_num})


def format_docs(docs):
    # Concatenate the retrieved page contents into one context string
    return "\n\n".join(doc.page_content for doc in docs)


messages = [
    ("system", """You are a helpful assistant that provides comprehensive and truthful answers to questions,
drawing upon all relevant information contained in the following context: {context}
You add value by analyzing the situation and offering insights to enrich your answer.
If you cannot find any evidence that matches the question, simply say "I don't know".
"""),
    ("human", "{question}"),
]
prompt_template = ChatPromptTemplate.from_messages(messages)

# Rerank the retrieved documents with FlashRank before they reach the prompt
compression_retriever = ContextualCompressionRetriever(
    base_compressor=FlashrankRerank(), base_retriever=retriever
)

rag_chain = (
    RunnableParallel(context=compression_retriever | format_docs, question=RunnablePassthrough())
    | prompt_template
    | llm
    | StrOutputParser()
)

results = rag_chain.invoke(query)

ValueError: Error raised by inference endpoint: An error occurred (ValidationException) when calling the InvokeModel operation: Malformed input request: expected maxLength: 50000, actual: 190567, please reformat your input and try again.
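The `ValidationException` above reports an input of 190,567 characters against a 50,000-character limit. This is most likely because whole web pages were embedded as single documents. One way around it is to split every page into smaller chunks before building the index. The sketch below is a plain-Python character splitter with overlap, written only to show the idea; the function name and sizes are illustrative. In LangChain, `RecursiveCharacterTextSplitter(chunk_size=..., chunk_overlap=...).split_documents(docs)` does the same job with smarter boundaries.

```python
def split_text(text, chunk_size=4000, overlap=200):
    """Split text into chunks of at most `chunk_size` characters, each overlapping the previous by `overlap`."""
    if chunk_size <= overlap:
        raise ValueError("chunk_size must be larger than overlap")
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break
        # Step forward, repeating `overlap` characters so context carries across chunks
        start += chunk_size - overlap
    return chunks


# Synthetic page with the same length as the oversized input in the error above
page = "".join(str(i % 10) for i in range(190_567))
chunks = split_text(page)
```

Every chunk now stays well under the 50,000-character limit.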

In [23]:
import re
aa = 'how to add hub.pull("hwchase17/react") into ChatPromptTemplate?'
# Escape forward slashes; the raw string r'\/' avoids an invalid-escape warning
new_text = re.sub(r'/', r'\/', aa)
new_text

'how to add hub.pull("hwchase17\\/react") into ChatPromptTemplate?'