# Bedrock Knowledge Base Retrieval and Generation for ReVIEW

In [1]:
import json

import sys

from pydantic import BaseModel

sys.path.append("../frontend/")
from components.bedrock_utils import get_bedrock_client


In [2]:
# Notebook-wide configuration.
# Model ID later handed to generate() as `model_id`.
FOUNDATION_MODEL = "anthropic.claude-3-sonnet-20240229-v1:0"
# Region passed to get_bedrock_client() when the clients are created.
REGION_NAME = "us-east-1"
# Read as a module-level global inside retrieve() (sent as `knowledgeBaseId`).
KNOWLEDGE_BASE_ID = "P1YXM9ZYUA"
# Sent as `numberOfResults` in the vector search configuration.
NUM_CHUNKS = 5
# retrieve() always filters on this username.
USERNAME = "demouser"
# A falsy value (None) means retrieve() adds no media_name filter.
MEDIA_NAME = None
# Question used for the first retrieval example.
QUERY = "Did they mention Nvidia?"


In [3]:
# Used for retrieval: created with agent=True and passed to retrieve() as
# `agent_client` (the cell output shows a bedrock-agent-runtime endpoint).
bedrock_agent_runtime_client = get_bedrock_client(region=REGION_NAME, agent=True)

# Used for generation: created with agent=False and passed to generate() as
# `br_client` (the cell output shows a bedrock-runtime endpoint).
bedrock_client = get_bedrock_client(region=REGION_NAME, agent=False)


Create new client
  Using region: us-east-1
Found credentials in shared credentials file: ~/.aws/credentials
boto3 Bedrock client successfully created!
bedrock_client._endpoint=bedrock-agent-runtime(https://bedrock-agent-runtime.us-east-1.amazonaws.com)
Create new client
  Using region: us-east-1
Found credentials in shared credentials file: ~/.aws/credentials
boto3 Bedrock client successfully created!
bedrock_client._endpoint=bedrock-runtime(https://bedrock-runtime.us-east-1.amazonaws.com)


In [4]:
def retrieve(
    agent_client,
    query,
    username,
    media_name,
    num_chunks,
    knowledge_base_id=None,
):
    """Retrieve transcript chunks from a knowledge base, scoped to one user.

    Args:
        agent_client: Client object exposing a ``retrieve`` method that accepts
            ``knowledgeBaseId``, ``retrievalConfiguration`` and
            ``retrievalQuery`` keyword arguments.
        query: Text of the retrieval query.
        username: Value the ``username`` metadata key must equal. This filter
            is always applied.
        media_name: If truthy, results are additionally restricted to chunks
            whose ``media_name`` metadata equals this value.
        num_chunks: Sent as ``numberOfResults`` in the search configuration.
        knowledge_base_id: Knowledge base to query. When ``None`` (the
            default) the module-level ``KNOWLEDGE_BASE_ID`` is used.

    Returns:
        The response of ``agent_client.retrieve`` unchanged.
    """
    if knowledge_base_id is None:
        # Fall back to the notebook-wide constant, as the function always did.
        knowledge_base_id = KNOWLEDGE_BASE_ID

    # Always filter on username to prevent people from querying other users' data
    username_filter = {"equals": {"key": "username", "value": username}}

    # Optionally filter on media name if user wants to chat with just one media file
    if media_name:
        retrieval_filter = {
            "andAll": [
                username_filter,
                {"equals": {"key": "media_name", "value": media_name}},
            ]
        }
    else:
        retrieval_filter = username_filter

    retrieval_config = {
        "vectorSearchConfiguration": {
            "numberOfResults": num_chunks,
            "filter": retrieval_filter,
        },
    }

    return agent_client.retrieve(
        knowledgeBaseId=knowledge_base_id,
        retrievalConfiguration=retrieval_config,
        retrievalQuery={"text": query},
    )

In [5]:
# Run retrieval with the notebook-wide configuration; `res` holds the raw
# response, whose "retrievalResults" list is inspected in the next cell.
res = retrieve(
    agent_client=bedrock_agent_runtime_client,
    query=QUERY,
    username=USERNAME,
    media_name=MEDIA_NAME,
    num_chunks=NUM_CHUNKS,
)

In [6]:
# Summarise the retrieval response and preview the top-ranked chunk.
chunks = res["retrievalResults"]
print(f"{len(chunks)} chunks retrieved.")

# The username/media filter can legitimately match nothing; only index into
# the list when there is at least one chunk to show.
if chunks:
    first_chunk = chunks[0]
    print("First chunk:")
    print(f"  Text: {first_chunk['content']['text'][:50]} ...")
    print(f"  Score: {first_chunk['score']}")
    print(f"  Location: {first_chunk['location']['s3Location']['uri']}")
    print(f"  Custom Meta: {first_chunk['metadata']['media_name']}")

5 chunks retrieved.
First chunk:
  Text: of the meeting. [117] If this is a recording of a  ...
  Score: 0.36510268
  Location: s3://review-dev-339712833620-assets/transcripts-txt/demouser/379ebdf7-53ab-4f4a-9a19-34b55bc33d1b.txt
  Custom Meta: test-vid1.mp4


In [7]:
def build_chunks_string(retrieve_response: dict) -> str:
    """Render every retrieved chunk as a numbered, XML-like block.

    Each entry of ``retrieve_response["retrievalResults"]`` becomes, in order
    (numbering starts at 1):

    <chunk_N>
    <media_name>
    the chunk's metadata media_name
    </media_name>
    <transcript>
    the chunk's content text
    </transcript>
    </chunk_N>

    Every block is followed by one blank line. An empty result list yields
    an empty string.
    """
    rendered = []
    for position, item in enumerate(retrieve_response["retrievalResults"], start=1):
        media = item["metadata"]["media_name"]
        transcript = item["content"]["text"]
        rendered.append(
            f"<chunk_{position}>\n"
            f"<media_name>\n{media}\n</media_name>\n"
            f"<transcript>\n{transcript}\n</transcript>\n"
            f"</chunk_{position}>\n\n"
        )
    return "".join(rendered)

In [8]:
def generate(br_client, model_id, query, retrieval_response, **kwargs) -> str:
    """Ask the model to answer ``query`` using only the retrieved chunks.

    The retrieved chunks are rendered with ``build_chunks_string`` and
    embedded, together with the question, in a prompt that instructs the
    model to reply with a JSON document wrapped in <json></json> tags.

    Args:
        br_client: Client object exposing ``invoke_model(modelId=..., body=...)``
            whose response carries a readable ``"body"`` stream.
        model_id: Passed to ``invoke_model`` as ``modelId``.
        query: The user's question; inserted into the <question> block.
        retrieval_response: Retrieval response containing "retrievalResults".
        **kwargs: Extra top-level fields merged into the request body (for
            example ``temperature`` or ``max_tokens``). They are applied last,
            so they may override the defaults set here.
            NOTE(review): the Messages API appears to require ``max_tokens``;
            callers are expected to supply it here - confirm.

    Returns:
        The ``text`` of the first content block of the model response.
    """
    SYSTEM_PROMPT = """You are an intelligent AI which attempts to answer questions based on retrieved chunks of automatically generated transcripts."""

    MESSAGE_TEMPLATE = """
I will provide you with retrieved chunks of transcripts. The user will provide you with a question. Using only information in the provided transcript chunks, you will attempt to answer the user's question.

Each chunk may or may not be relevant to answering the question. Each chunk will include a <media_name> block which contains the parent file that the transcript came from. Each line in the transcript chunk begins with an integer timestamp (in seconds) within square brackets, followed by a transcribed sentence. When answering the question, you will need to provide the timestamp you got the answer from.

Here are the retrieved chunks of transcripts in numbered order:

<transcript_chunks>
{chunks}
</transcript_chunks>

When you answer the question, your answer must include a parsable json string contained within <json></json> tags. The json should have one top level key, "answer", whose value is a list. Each element in the list represents a portion of the full answer, and should have two keys: "partial_answer", is a part of your answer to the user's question, and "citations" which is a list of dicts which contain a "media_name" key and a "timestamp" key, which correspond to the resources used to answer that part of the question. For example, if you got this partial_answer from only one chunk, then the "citations" list will be only one element long, with the media_name of the chunk from which you got the partial_answer, and the relevant timestamp within that chunk's transcript. If you used information from three chunks for this partial_answer, the "citations" list will be three elements long. For multi-part answers, the partial_answer list will be multiple elements long.

The final answer displayed to the user will be all of the partial_answers concatenated. Make sure that you format your partial answers appropriately. For example, if your response has two partial answers which are meant to be displayed as a comma separated list, the first partial_answer should be formatted like "partial_answer": "The two partial answers are this" and the second partial_answer should be formatted like "partial_answer": ", and this.". Similarly, if your partial answers are meant to be a bulleted list, the first partial answer may look like "partial_answer": "The partial answers are:\\n- First partial answer" and "partial_answer": "\\n- Second partial answer". Note the newline character at the beginning of the second partial_answer for final display purposes.

For example, if your answer is in two parts, the first part coming from two chunks, the second part coming from one chunk, your answer will have this structure:
<json>
{{"answer": [ {{"partial_answer": "This is the first part to the answer.", "citations": [{{"media_name": "media_file_foo.mp4", "timestamp": 123}}, {{"media_name": "media_file_bar.mp4", "timestamp": 345}}]}}, {{"partial_answer": " This is the second part to the answer.", "citations": [{{"media_name": "blahblah.wav", "timestamp": 83}}]}} ] }}
</json>

Notice the space at the beginning of the second partial_answer string, " This is...". That space is important so when the partial_answers get concatenated they will be readable, like "This is the first part to the answer. This is the second..."

If you are unable to answer the question using information provided in any of the chunks, your response should include no citations like this:
<json>
{{"answer": [ {{"partial_answer": "I am unable to answer the question based on the provided media file(s).", "citations": []}} ] }}
</json>

Here is the user's question:
<question>
{query}
</question>

Now write your json response in <json> </json> brackets like explained above. Make sure the content between the brackets is json parsable, e.g. escaping " marks inside of strings and so on.
    """

    chunks_str = build_chunks_string(retrieval_response)
    # Literal braces in the template are doubled ({{ }}) so that only
    # {chunks} and {query} are substituted here.
    message_content = MESSAGE_TEMPLATE.format(query=query, chunks=chunks_str)

    body = {
        "system": SYSTEM_PROMPT,
        "messages": [{"role": "user", "content": message_content}],
        # Bedrock documents this exact value as required for Anthropic
        # models; it was previously sent as an empty string.
        "anthropic_version": "bedrock-2023-05-31",
        # Caller-supplied fields go last so they can override the defaults.
        **kwargs,
    }
    raw_response = br_client.invoke_model(modelId=model_id, body=json.dumps(body))
    payload = json.loads(raw_response["body"].read().decode("utf-8"))

    return payload["content"][0]["text"]

In [9]:
from pydantic import BaseModel
from typing import List
import re
import json


class Citation(BaseModel):
    """One source reference backing a portion of an answer."""

    # Name of the media file the cited transcript chunk belongs to.
    media_name: str
    # Position in that media's transcript; the generation prompt describes
    # transcript timestamps as integers in seconds.
    timestamp: int


class PartialQAnswer(BaseModel):
    """One portion of an answer together with the citations that support it."""

    # Text fragment of the answer; fragments are concatenated for display.
    partial_answer: str
    # Sources used for this fragment; may be empty.
    citations: List[Citation]

    def pprint(self):
        """Print this fragment's text and its citations."""
        # The text lives in `partial_answer`; this model has no `answer`
        # field, so reading `self.answer` raised AttributeError.
        print(
            f"LLMAnswer:\n Answer={self.partial_answer}\n Citations={self.citations}"
        )


class FullQAnswer(BaseModel):
    """Complete answer parsed from an LLM response: an ordered list of parts."""

    # Ordered answer fragments; pprint() concatenates them for display.
    answer: List[PartialQAnswer]

    @classmethod
    def from_LLM_response(cls, generation_response: str) -> "FullQAnswer":
        """
        Create a FullQAnswer instance from an LLM response string containing JSON data.

        The JSON data should be enclosed between <json> and </json> tags. If
        several such blocks are present, only the first one is used.

        Raises:
            ValueError: If no <json> block is found, if its content is not
                valid JSON, if the JSON is not an object, or if the object
                cannot be turned into a FullQAnswer.
        """
        # DOTALL lets the payload span multiple lines; the surrounding \s*
        # already trims whitespace around the captured JSON text.
        found = re.search(r"<json>\s*(.*?)\s*</json>", generation_response, re.DOTALL)
        if found is None:
            raise ValueError("No JSON data found between <json> and </json> tags")

        try:
            data = json.loads(found.group(1))
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON data: {e}") from e

        # `cls(**data)` needs a mapping; report anything else with the same
        # exception type as the other malformed-input cases.
        if not isinstance(data, dict):
            raise ValueError(
                "Error creating FullQAnswer instance: expected a JSON object, "
                f"got {type(data).__name__}"
            )

        try:
            return cls(**data)
        except ValueError as e:
            raise ValueError(f"Error creating FullQAnswer instance: {e}") from e

    def pprint(self):
        """Print the concatenated answer with numbered citation markers.

        Each partial answer is printed followed by one ``[n]`` marker per
        citation and a newline; a "Citations:" section then lists one line
        per marker, numbered consecutively across the whole answer.
        """
        answer_lines = []
        citation_lines = []

        for partial in self.answer:
            markers = []
            for citation in partial.citations:
                # Numbering is global across all partial answers.
                number = len(citation_lines) + 1
                markers.append(f"[{number}]")
                citation_lines.append(
                    f"[{number}] http://fake_url.com/{citation.media_name}?start_time={citation.timestamp}"
                )
            answer_lines.append(partial.partial_answer + "".join(markers) + "\n")

        print("".join(answer_lines) + "\nCitations:\n" + "\n".join(citation_lines))

In [10]:
#########################
# Full workflow example #
#########################
# End-to-end run: configure -> create clients -> retrieve -> generate -> parse.
# Relies on get_bedrock_client, retrieve, generate and FullQAnswer being
# defined by earlier cells.

# Same values as the configuration cell near the top of the notebook,
# restated here next to the workflow that uses them.
FOUNDATION_MODEL = "anthropic.claude-3-sonnet-20240229-v1:0"
REGION_NAME = "us-east-1"
KNOWLEDGE_BASE_ID = "P1YXM9ZYUA"
NUM_CHUNKS = 5
USERNAME = "demouser"
MEDIA_NAME = None

# The same question is used for both retrieval and generation.
query = "What AWS services are mentioned?"
# query = "Do they mention Nvidia? Please provide exact quotes."

# Used for retrieval
bedrock_agent_runtime_client = get_bedrock_client(region=REGION_NAME, agent=True)
# Used for generation
bedrock_client = get_bedrock_client(region=REGION_NAME, agent=False)

# Step 1: fetch the chunks, filtered by USERNAME (and MEDIA_NAME when set).
retrieval_result = retrieve(
    agent_client=bedrock_agent_runtime_client,
    query=query,
    username=USERNAME,
    media_name=MEDIA_NAME,
    num_chunks=NUM_CHUNKS,
)

# Step 2: ask the model. temperature and max_tokens are not named parameters
# of generate(); they travel through **kwargs into the request body.
generate_result = generate(
    br_client=bedrock_client,
    model_id=FOUNDATION_MODEL,
    query=query,
    retrieval_response=retrieval_result,
    temperature=0.1,
    max_tokens=5000,
)

# Step 3: extract the <json>...</json> payload from the raw model text and
# validate it; raises ValueError if the payload is missing or malformed.
answer: FullQAnswer = FullQAnswer.from_LLM_response(generate_result)

Create new client
  Using region: us-east-1
Found credentials in shared credentials file: ~/.aws/credentials
boto3 Bedrock client successfully created!
bedrock_client._endpoint=bedrock-agent-runtime(https://bedrock-agent-runtime.us-east-1.amazonaws.com)
Create new client
  Using region: us-east-1
Found credentials in shared credentials file: ~/.aws/credentials
boto3 Bedrock client successfully created!
bedrock_client._endpoint=bedrock-runtime(https://bedrock-runtime.us-east-1.amazonaws.com)


In [11]:
# Print the concatenated partial answers with [n] citation markers, followed
# by the numbered list of citation links.
answer.pprint()

The following AWS services are mentioned:

- Amazon Transcribe[1]

- DynamoDB for job tracking[2]

- Amazon Bedrock for large language models[3]

- Amazon Cognito for user authentication[4]

- Amazon SageMaker (mentioned as a feature announcement)[5]

Citations:
[1] http://fake_url.com/test-vid1.mp4?start_time=29
[2] http://fake_url.com/test-vid1.mp4?start_time=31
[3] http://fake_url.com/test-vid1.mp4?start_time=31
[4] http://fake_url.com/test-vid1.mp4?start_time=31
[5] http://fake_url.com/test-vid1.mp4?start_time=208
