#### Using GenAI Foundational Platform Endpoints for RAG (Using the SDK (accelerator.py))

The following sample shows how to build a RAG workflow using the GenAI Foundational Platform Endpoints. It uses the SDK file (accelerator.py).

Before you begin, create a .env file in the same folder as the notebook and define the following variables in it:

 
 COGNITO_CLIENT_ID='<replace_me>'

 COGNITO_CLIENT_SECRET='<replace_me>'

 COGNITO_USER_POOL_ID='<replace_me>'

 COGNITO_REGION='<replace_me>'

 COGNITO_DOMAIN='<replace_me>'
 
 PLATFORM_API_URL='<replace_me>'




***Note: the .env file is only needed when running a notebook. In a real application deployed to EC2 or a container, you can just create environment variables (for example, using the export command).***

***

Install the packages

In [None]:
pip install -r reqs.txt

Import the SDK from accelerator.

In [None]:
from accelerator import GenerativeAIAccelerator
import os
import dotenv
import pprint
# Load the environment variables. This is only necessary if you are using a
# .env file to store your credentials; it can be dropped when the variables
# are already set in the process environment.
dotenv.load_dotenv()

In [None]:
# Initialize the Accelerator SDK and keep a handle to each service it exposes.
# Later cells call the platform through these handles.
accelerator = GenerativeAIAccelerator()
_health = accelerator.health_service
_model = accelerator.model_service
_document = accelerator.document_service
_vectors = accelerator.vector_service
_prompt = accelerator.prompt_service

Getting the accelerator instance and checking the service status

In [None]:

# Print one "<service>:<status>" line per platform service.
services = ['model', 'document', 'prompt', 'vector']
for service in services:
    status = _health.check_health(service)['status']
    print(":".join((service, status)))

#### Listing Models

In [None]:
# Call the model service's list_models() and print whatever it returns.
list_models = _model.list_models()
print(list_models)

#### Invoke Model

Simple Text Prompt

In [None]:

# Request parameters for a simple text prompt. They are plain assignments
# (no trailing commas, which would silently turn each value into a
# one-element tuple) and are passed to the call below instead of repeating
# the literals.
model_name = "ANTHROPIC_CLAUDE_V2"
prompt = "Translate the following text to French: 'Hello, how are you?'"
max_tokens = 100
temperature = 0.7
top_p = 0.9
top_k = 50
# NOTE(review): "\\n" is a backslash followed by the letter n, not a newline
# character - confirm this is the intended stop sequence.
stop_sequences = ["\\n"]

response = _model.invoke_model(
    model_name=model_name,
    prompt=prompt,
    max_tokens=max_tokens,
    temperature=temperature,
    top_p=top_p,
    top_k=top_k,
    stop_sequences=stop_sequences,
)
print(response)


Messages API

In [None]:

# Conversation in messages form: a list of turns, each with a role and a list
# of content parts.
prompt = [
    {"role": "user", "content": [{"text": "What is the weather like today?"}]},
    {"role": "assistant", "content": [{"text": "The weather is sunny with a high of 25°C."}]},
]

# System prompt entries, passed to the model separately from the turns above.
# NOTE(review): "helful" looks like a typo for "helpful"; it is left unchanged
# here because it is part of the text sent to the model.
system_prompts = [{"text": "You are a helful assistant."}]

response = _model.invoke_model(
    model_name="ANTHROPIC_CLAUDE_V2",
    prompt=prompt,
    max_tokens=100,
    temperature=0.7,
    top_p=0.9,
    top_k=50,
    stop_sequences=["\\n"],
    system_prompts=system_prompts,
)
print(response)

#### Embed 

In [None]:
# Embedding request parameters. No trailing comma after the model name: a
# trailing comma would make it a one-element tuple instead of a string.
model_name = "TITAN_TEXT_EMBED_V2"
input_text = "Hello, how are you?"

# Pass the variables rather than repeating the literals.
response = _model.invoke_embed(model_name=model_name, input_text=input_text)
print(response)

#### Document Extraction

Create Extraction Job

In [None]:
# Create a new extraction job. Later cells read its 'extraction_job_id' key
# to register files, start the job and poll its status.
extraction_job = _document.create_extraction_job()
pprint.pprint(extraction_job)

Register Files to the Job

In [None]:
# Path of the local file to extract text from.
file_name = '<REPLACE_WITH_YOUR_FILE_PATH>' # eg. 'data/sample.pdf'
# Register the file with the extraction job. The upload cell reads the
# 'upload_url' key from this response.
response = _document.register_file_for_extraction(extraction_job_id=extraction_job['extraction_job_id'], file_name=file_name)
pprint.pprint(response)

Upload the files using presigned urls

In [None]:
import requests

## Upload the file to the S3 bucket
# The registration response carries the pre-signed URL to PUT the file to.
pre_signed_url = response['upload_url']
with open(file_name, 'rb') as f:
    # Stream the open file as the request body.
    response = requests.put(pre_signed_url, data=f)
print(response.status_code)

Start Extraction Job

In [None]:
# Start the extraction job created earlier and print the service's response.
response = _document.start_extraction_job(extraction_job_id=extraction_job['extraction_job_id'])
pprint.pprint(response)

Check Extraction Job Status

In [None]:
import time

# Poll the extraction job until it reaches one of the terminal statuses
# checked below, printing each status that is observed.
response = _document.get_extraction_job_status(extraction_job_id=extraction_job['extraction_job_id'])
job_status = response['status']
print(job_status)
while job_status not in ('COMPLETED', 'FAILED', 'COMPLETED_WITH_ERRORS'):
    # Wait *before* asking again: this avoids an immediate back-to-back poll
    # after the first request and a pointless 5 second sleep once a terminal
    # status has already been received.
    time.sleep(5)
    response = _document.get_extraction_job_status(extraction_job_id=extraction_job['extraction_job_id'])
    job_status = response['status']
    print(job_status)
pprint.pprint(response)

Get Extracted Text

In [None]:
# Look up this file's status within the extraction job, then download the
# document behind the response's 'result_url' and parse its body as JSON.
response = _document.get_file_status(extraction_job_id=extraction_job['extraction_job_id'], file_name=file_name)
text = requests.get(response['result_url']).json()
print(text)

#### Chunking

Create a chunking job

In [None]:
# Parameters handed to the 'fixed_size' chunking strategy.
# NOTE(review): the unit of chunk_size / chunk_overlap (characters vs. tokens)
# is not visible here - confirm against the SDK documentation.
chunking_params = {
    "chunk_size": 400,
    "chunk_overlap": 100
}
# Create a chunking job for the output of the extraction job. Later cells read
# the 'chunking_job_id' key of the returned value.
chunk_job = _document.create_chunking_job(extraction_job_id=extraction_job['extraction_job_id'], chunking_strategy='fixed_size', chunking_params=chunking_params)
pprint.pprint(chunk_job)

Check Chunking Job Status

In [None]:
# Poll the chunking job until it reaches one of the terminal statuses checked
# below, printing every status response along the way.
response = _document.get_chunking_job_status(job_id=chunk_job['chunking_job_id'])
pprint.pprint(response)
while response['status'] not in ('COMPLETED', 'FAILED', 'COMPLETED_WITH_ERRORS'):
    # Sleep before re-polling so the first re-check is not issued immediately
    # and no time is wasted sleeping after the job has already finished.
    time.sleep(5)
    response = _document.get_chunking_job_status(job_id=chunk_job['chunking_job_id'])
    pprint.pprint(response)

# Final status of the job.
pprint.pprint(response)

Get Chunks

In [None]:
# Fetch the chunking results for this file; the response's 'chunk_file_url'
# is downloaded below.
response = _document.get_chunking_results(chunking_job_id=chunk_job['chunking_job_id'], file_name=file_name)
pprint.pprint(response)

# Get the chunked file
chunk_file_text = requests.get(response['chunk_file_url'])
# Print the HTTP status of the download, then its body parsed as JSON.
print(chunk_file_text.status_code)
pprint.pprint(chunk_file_text.json())

#### Vectorization

Create Vector Store

In [None]:
# Create a vector store named "SolarSystem" with store_type
# "opensearchserverless". Later cells read the 'store_id' key of the result.
vector_store = _vectors.create_vector_store(store_name="SolarSystem", store_type="opensearchserverless", description="Collection for storing vectorized documents", tags=[{"key": "project", "value": "GenerativeAI"}])
pprint.pprint(vector_store)

Check Vector Store Status

In [None]:
# Poll the vector store until it reports 'ACTIVE', printing every status
# response along the way.
# NOTE(review): the loop exits only on 'ACTIVE'. If the platform can report a
# failure status for a store, this will poll forever - confirm the possible
# statuses and add the terminal ones to the condition.
response = _vectors.get_vector_store_status(store_id=vector_store['store_id'])
pprint.pprint(response)

while response['status'] != 'ACTIVE':
    # Sleep before re-polling so the first re-check is not issued immediately
    # and no time is wasted sleeping once the store is already active.
    time.sleep(5)
    response = _vectors.get_vector_store_status(store_id=vector_store['store_id'])
    pprint.pprint(response)

# Final status of the store.
pprint.pprint(response)

Create Index

In [None]:
# Name of the index to create inside the vector store.
index_name = "my_index"
# Create the index; later cells read the 'index_id' key of the result.
vector_index = _vectors.create_vector_index(store_id=vector_store['store_id'], index_name=index_name)
pprint.pprint(vector_index)

Check if index is ACTIVE

In [None]:

# Poll the index until it reports 'ACTIVE'.
# NOTE(review): the loop exits only on 'ACTIVE'. If index creation can end in
# a failure status, this will poll forever - confirm and handle it here.
index_status = _vectors.get_vector_index_status(index_id=vector_index['index_id'])
while index_status['status'] != 'ACTIVE':
    # Sleep before re-polling so the first re-check is not issued immediately
    # and no time is wasted sleeping once the index is already active.
    time.sleep(5)
    index_status = _vectors.get_vector_index_status(index_id=vector_index['index_id'])
    pprint.pprint(index_status)

# Final status of the index.
pprint.pprint(index_status)

Vectorize Chunks

In [None]:
# Start vectorizing the output of the chunking job into the index created
# earlier. The status cell reads the 'vectorize_job_id' key of the result.
vectorize_job = _vectors.vectorize(chunking_job_id=chunk_job['chunking_job_id'], index_id=vector_index['index_id'])
pprint.pprint(vectorize_job)

Check Vectorization Job Status

In [None]:
# Poll the vectorization job until it reaches one of the terminal statuses
# checked below.
# The status response is kept in its own variable instead of overwriting
# `vectorize_job`: the job id is always read from the object returned by
# vectorize(), so polling (and re-running this cell) does not depend on the
# status response also carrying a 'vectorize_job_id' key.
vectorize_status = _vectors.get_vectorize_job_status(vectorize_job_id=vectorize_job['vectorize_job_id'])
while vectorize_status['status'] not in ('COMPLETED', 'FAILED', 'COMPLETED_WITH_ERRORS'):
    # Sleep before re-polling so the first re-check is not issued immediately
    # and no time is wasted sleeping after the job has already finished.
    time.sleep(5)
    vectorize_status = _vectors.get_vectorize_job_status(vectorize_job_id=vectorize_job['vectorize_job_id'])
    pprint.pprint(vectorize_status)

# Final status of the job.
pprint.pprint(vectorize_status)

Semantic Search

In [None]:
# Run a semantic search for the query against the index and print the result.
response = _vectors.semantic_search(query="<REPLACE_WITH_YOUR_QUERY>", index_id=vector_index['index_id'])
pprint.pprint(response)

#### Retrieval Augmented Generation

In [None]:

question = "<REPLACE_WITH_YOUR_QUERY>"
# Prompt template; {context} and {question} are filled in by str.format below.
prompt = """
           You are a helpful assistant. Given a context, answer the following question.
           Context: {context}
           Question: {question}
           Answer:
           """
# Vector search: look the question up in the index.
response = _vectors.semantic_search(query=question, index_id=vector_index['index_id'])
print(response)

# Build the context from the 'text' of every hit, separated by single spaces.
# str.join replaces the repeated `+=` concatenation (and its redundant second
# space append) and yields an empty context when there are no hits.
context_text = " ".join(hit['text'] for hit in response)
final_prompt = prompt.format(context=context_text, question=question)

# Ask the model to answer the question using the retrieved context.
response = _model.invoke_model(
    model_name="ANTHROPIC_CLAUDE_V2",
    prompt=final_prompt,
    max_tokens=100,
    temperature=0.7,
    top_p=0.9,
    top_k=50,
    stop_sequences=["\\n"],
)
print(response)