In [None]:
!pip install requests-aws4auth

In [1]:
import boto3
import requests
import json
from requests_aws4auth import AWS4Auth

### Creating a new index in our OpenSearch Serverless collection

In [None]:
aoss_client = boto3.client('opensearchserverless')

# list_collections returns results in pages; follow nextToken so that a
# collection listed on a later page is not missed.
collection_summaries = []
list_kwargs = {}
while True:
    coll_list_response = aoss_client.list_collections(**list_kwargs)
    collection_summaries.extend(coll_list_response.get("collectionSummaries", []))
    next_token = coll_list_response.get("nextToken")
    if not next_token:
        break
    list_kwargs["nextToken"] = next_token

matching_collections = [
    summary for summary in collection_summaries
    if summary['name'] == 'genai-vectors-collection'
]

assert len(matching_collections) == 1, "Expected exactly 1 collection with specified name"

collection = matching_collections[0]
collection_name = collection["name"]
collection_id = collection["id"]  # ID is what batch_get_collection is queried with

# The list call only returns summaries; the endpoint comes from batch_get_collection.
collection_details_response = aoss_client.batch_get_collection(ids=[collection_id])

collection_details = collection_details_response.get("collectionDetails", [])
if not collection_details:
    # Fail with a readable message instead of an IndexError on [0].
    raise RuntimeError(
        f"No details returned for collection '{collection_name}' ({collection_id}): "
        f"{collection_details_response.get('collectionErrorDetails')}"
    )

collection_detail = collection_details[0]
collection_endpoint = collection_detail.get("collectionEndpoint")
if not collection_endpoint:
    # Without an endpoint the index URL built later would start with "None/".
    raise RuntimeError(
        f"Collection '{collection_name}' has no endpoint "
        f"(status: {collection_detail.get('status')})"
    )

print(f"Found Collection: {collection['name']}")
print(f"Endpoint: {collection_endpoint}")

In [None]:
INDEX_NAME = "genai-vectors-collection"

# Seconds to wait for the HTTP call; without a timeout requests can block forever.
REQUEST_TIMEOUT_SECONDS = 30

# Index definition: k-NN enabled, one 1536-dimensional vector field (faiss/hnsw),
# plus a full-text field and a keyword metadata field.
index_mapping = {
  "settings": {
    "index.knn": True
  },
  "mappings": {
    "properties": {
      "vector-field": {
        "type": "knn_vector",
        "dimension": 1536,
        "method": {
          "engine": "faiss",
          "name": "hnsw"
        }
      },
      "text": {
        "type": "text"
      },
      "metadata": {
        "type": "keyword"
      }
    }
  }
}

# The index is created with a PUT to <collection endpoint>/<index name>.
index_url = f"{collection_endpoint}/{INDEX_NAME}"

headers = {
    "Content-Type": "application/json"
}

# The request is signed with AWS SigV4 for the "aoss" service using the
# credentials and region of the default boto3 session.
session = boto3.Session()
credentials = session.get_credentials()
if credentials is None:
    raise RuntimeError("No AWS credentials available in the default boto3 session.")

# Take one consistent snapshot so access key, secret key and token cannot come
# from two different refreshes of temporary credentials.
frozen_credentials = credentials.get_frozen_credentials()
aws_auth = AWS4Auth(
    frozen_credentials.access_key, frozen_credentials.secret_key,
    session.region_name, "aoss",
    session_token=frozen_credentials.token
)

# Send request to create index
response = requests.put(
    index_url,
    auth=aws_auth,
    headers=headers,
    data=json.dumps(index_mapping),
    timeout=REQUEST_TIMEOUT_SECONDS,
)

# Print response
print("Index Creation Response:", response.status_code, response.text)

# Stop here on a real failure so later cells do not run against a missing index.
# An "already exists" error is tolerated so the cell can be re-run.
if not response.ok:
    if "resource_already_exists_exception" in response.text:
        print(f"Index '{INDEX_NAME}' already exists; continuing.")
    else:
        response.raise_for_status()

### Creating a new CloudFormation stack

**Note:** the IAM identity running this notebook needs the `AWSCloudFormationFullAccess` policy for this step.

In [None]:
cf_client = boto3.client("cloudformation")

KB_STACK_NAME = "wshop-kb"
KB_YAML_FILE_PATH = "../gen-ai-cloudformation/knowledge-base.yaml"

# Read the whole CloudFormation template into memory; it is sent inline.
with open(KB_YAML_FILE_PATH, "r") as template_file:
    template_body = template_file.read()

# The IAM capabilities are only needed when the template defines IAM roles.
create_stack_args = {
    "StackName": KB_STACK_NAME,
    "TemplateBody": template_body,
    "Capabilities": ["CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
}
response = cf_client.create_stack(**create_stack_args)

print("Stack creation started:", response["StackId"])
response

#### Waiting for operation to complete

In [None]:
import time

# Maximum number of seconds wait_for_stack keeps polling before it gives up.
max_time = 30


def wait_for_stack(stack_name, timeout_seconds=None, poll_interval=5, client=None):
    """Poll a CloudFormation stack until it reaches a stop status or the timeout elapses.

    The stack is always polled at least once, and its status is printed after
    every poll.

    Args:
        stack_name: Name of the stack passed to ``describe_stacks``.
        timeout_seconds: Maximum time to keep polling. Defaults to the
            module-level ``max_time`` when omitted.
        poll_interval: Seconds to sleep between polls.
        client: Object exposing ``describe_stacks``. Defaults to the
            module-level ``cf_client`` when omitted.

    Returns:
        The last observed stack status string. If it is not one of the stop
        statuses, the wait timed out (a message is printed in that case).
    """
    if timeout_seconds is None:
        timeout_seconds = max_time
    if client is None:
        client = cf_client

    # Statuses at which polling stops: success, or creation has failed / is rolling back.
    stop_statuses = {
        "CREATE_COMPLETE",
        "CREATE_FAILED",
        "ROLLBACK_IN_PROGRESS",
        "ROLLBACK_COMPLETE",
        "ROLLBACK_FAILED",
    }

    # Monotonic clock: the deadline is unaffected by system clock adjustments.
    deadline = time.monotonic() + timeout_seconds

    while True:
        response = client.describe_stacks(StackName=stack_name)
        stack_status = response["Stacks"][0]["StackStatus"]
        print(f"Stack Status: {stack_status}")

        if stack_status in stop_statuses:
            return stack_status

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            # Make the timeout visible instead of returning silently.
            print(
                f"Timed out after {timeout_seconds}s waiting for stack "
                f"'{stack_name}' (last status: {stack_status})"
            )
            return stack_status

        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))


# Poll the stack created above until wait_for_stack sees a status it stops on,
# or until max_time seconds have passed.
wait_for_stack(KB_STACK_NAME)

### Uploading an example document to S3

In [None]:
!ls

In [11]:
s3_client = boto3.client("s3")
sts_client = boto3.client("sts")

# The bucket name is suffixed with the account ID of the calling identity.
caller_identity = sts_client.get_caller_identity()
account_id = caller_identity["Account"]

example_file_name = "wellarchitected-framework-pages-6-11.pdf"
BUCKET_NAME = f"genai-workshop-docs-{account_id}"
LOCAL_FILE_PATH = f"../{example_file_name}"
S3_OBJECT_KEY = f"docs/{example_file_name}"

# Upload the local PDF under the docs/ prefix of the bucket.
s3_client.upload_file(
    Filename=LOCAL_FILE_PATH,
    Bucket=BUCKET_NAME,
    Key=S3_OBJECT_KEY,
)

### Starting a data source synchronization

In [None]:
bedrock_client = boto3.client("bedrock-agent")

KB_NAME = "gen-ai-workshop-kb"
DATA_SOURCE_NAME = "genai-workshop-kb-datasource"

# list_knowledge_bases is paginated. The paginator follows every page and
# build_full_result() merges them into one response-shaped dict, so a
# knowledge base beyond the first page is still found.
kb_list = (
    bedrock_client.get_paginator("list_knowledge_bases")
    .paginate()
    .build_full_result()
)

# Resolve the knowledge base ID from its name.
kb_id = None
for kb in kb_list.get("knowledgeBaseSummaries", []):
    if kb["name"] == KB_NAME:
        kb_id = kb["knowledgeBaseId"]
        break

if not kb_id:
    raise ValueError(f"Knowledge Base '{KB_NAME}' not found.")

print(f"Knowledge Base '{KB_NAME}' found with ID: {kb_id}")

In [None]:
# list_data_sources is paginated. The paginator follows every page and
# build_full_result() merges them into one response-shaped dict, so a data
# source beyond the first page is still found.
ds_list = (
    bedrock_client.get_paginator("list_data_sources")
    .paginate(knowledgeBaseId=kb_id)
    .build_full_result()
)

# Resolve the data source ID from its name.
ds_id = None
for ds in ds_list.get("dataSourceSummaries", []):
    if ds["name"] == DATA_SOURCE_NAME:
        ds_id = ds["dataSourceId"]
        break

if not ds_id:
    raise ValueError(f"Data Source '{DATA_SOURCE_NAME}' not found in KB '{KB_NAME}'.")

print(f"Data Source '{DATA_SOURCE_NAME}' found")

# Start an ingestion job for this data source in the knowledge base.
sync_response = bedrock_client.start_ingestion_job(
    knowledgeBaseId=kb_id,
    dataSourceId=ds_id
)

print("Sync job started:", sync_response["ingestionJob"]["ingestionJobId"])