# Manage Bedrock Knowledge Bases with Boto3

## Overview of Steps

### Permissions

> [AWS Docs]()

1. Configure Permissions for OpenSearch Serverless
2. Configure Permissions for Bedrock (create policies and service role)
3. Configure Model Invocation Logging
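
As a rough illustration of step 2, the service role needs a trust policy that lets Bedrock assume it. The sketch below builds such a document as a plain dictionary. The function name `build_bedrock_trust_policy`, the account ID `111122223333`, and the region are placeholders chosen for illustration, not values from this notebook, and the condition block is an assumption modeled on the usual knowledge base setup.

```python
import json


def build_bedrock_trust_policy(account_id: str, region: str) -> dict:
    """Return a trust policy that allows Bedrock to assume the service role.

    The conditions restrict the role to knowledge bases in one account and region.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "bedrock.amazonaws.com"},
                "Action": "sts:AssumeRole",
                "Condition": {
                    "StringEquals": {"aws:SourceAccount": account_id},
                    "ArnLike": {
                        "aws:SourceArn": f"arn:aws:bedrock:{region}:{account_id}:knowledge-base/*"
                    },
                },
            }
        ],
    }


# Placeholder account ID and region, for illustration only
trust_policy = build_bedrock_trust_policy("111122223333", "us-east-1")
print(json.dumps(trust_policy, indent=2))
```

The resulting JSON string could then be supplied as the `AssumeRolePolicyDocument` argument of the IAM client's `create_role` call.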

### OpenSearch Serverless Collection

[Docs](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/opensearchserverless/client/create_collection.html)

1. Add description
2. Add name
3. Set standby replicas (enabled or disabled)
4. (Optional) Add tags
5. Set type ('VECTORSEARCH')
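
The parameters listed above map onto keyword arguments of `create_collection`. As a minimal sketch, the hypothetical helper `build_collection_request` below assembles them into a dictionary that could be unpacked into the call; the helper name and the example tag are assumptions made for illustration.

```python
def build_collection_request(
    name: str,
    description: str,
    standby_replicas: str = "DISABLED",
    tags: dict = None,
) -> dict:
    """Assemble keyword arguments for the OpenSearch Serverless create_collection call."""
    if standby_replicas not in ("ENABLED", "DISABLED"):
        raise ValueError("standby_replicas must be 'ENABLED' or 'DISABLED'")
    request = {
        "name": name,
        "description": description,
        "standbyReplicas": standby_replicas,
        "type": "VECTORSEARCH",
    }
    if tags:
        # OpenSearch Serverless expects tags as a list of key/value dictionaries
        request["tags"] = [{"key": key, "value": value} for key, value in tags.items()]
    return request


collection_request = build_collection_request(
    "collection-poc-1", "OpenSearch PoC Collection", tags={"project": "poc"}
)
print(collection_request)
```

With a client in hand, the request could be sent as `opensearch_serverless_client.create_collection(**collection_request)`.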


### Bedrock Knowledge Base

#### Create Knowledge Base and attach service role

[Docs](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agent/client/create_knowledge_base.html)

1. Add name
2. Add description
3. Add Bedrock service role ARN
4. Add embedding model ARN to the knowledge base configuration
5. Add collection ARN to the storage configuration
6. Add vector index name to the storage configuration
7. Add the vector field, text field, and metadata field to the storage configuration's field mapping
8. (Optional) Add tags

#### Set Up Knowledge Base

> [AWS Docs]()


#### Model Invocation Logging

> [AWS Docs](https://docs.aws.amazon.com/bedrock/latest/userguide/settings.html#model-invocation-logging)

##### Option 1: Set Up S3 Bucket for Invocation Logging

1. Create an S3 bucket as the destination for invocation logging
2. Attach an S3 bucket policy

##### Option 2: Set Up CloudWatch for Invocation Logging

1. Create CloudWatch log group
2. Create IAM Role with permissions to access CloudWatch logs (Trusted entity)
3. Attach IAM Role policy (Role policy)
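
Whichever destination is chosen, the logging settings are passed to Bedrock as a single `loggingConfig` object. The sketch below builds that object for an S3 destination, a CloudWatch destination, or both. The helper name `build_logging_config` and the bucket, log group, and role values are hypothetical, and the sketch assumes all three data types (text, image, embedding) should be delivered.

```python
def build_logging_config(
    bucket_name: str = None,
    key_prefix: str = "",
    log_group_name: str = None,
    role_arn: str = None,
) -> dict:
    """Build a loggingConfig dictionary for model invocation logging."""
    if not bucket_name and not log_group_name:
        raise ValueError("Provide an S3 bucket, a CloudWatch log group, or both")
    config = {
        "textDataDeliveryEnabled": True,
        "imageDataDeliveryEnabled": True,
        "embeddingDataDeliveryEnabled": True,
    }
    if bucket_name:
        config["s3Config"] = {"bucketName": bucket_name, "keyPrefix": key_prefix}
    if log_group_name:
        if not role_arn:
            raise ValueError("A role ARN is required for CloudWatch delivery")
        config["cloudWatchConfig"] = {"logGroupName": log_group_name, "roleArn": role_arn}
    return config


# Hypothetical bucket name and prefix, for illustration only
s3_logging_config = build_logging_config(bucket_name="my-logging-bucket", key_prefix="bedrock")
print(s3_logging_config)
```

The dictionary could then be applied with `boto3.client('bedrock').put_model_invocation_logging_configuration(loggingConfig=s3_logging_config)`.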

#### Set Up Data for Ingestion

> [AWS Docs](https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-setup.html)

#### Set Up and Verify the Bedrock API


1. Configure the data sources to add to your knowledge base.
2. Upload your data to an Amazon S3 bucket.
3. Ingest your data by generating embeddings with a foundation model and storing them in a supported vector store.
4. Set up your application or agent to query the knowledge base and return augmented responses.
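
Steps 1 and 3 above can be sketched with the `bedrock-agent` client: register the S3 bucket as a data source, then start an ingestion job. This is a minimal sketch, not the notebook's own implementation. The function name `start_s3_ingestion` and its arguments are assumptions, and the client is passed in as a parameter so the function does not depend on a global.

```python
def start_s3_ingestion(agent_client, knowledge_base_id: str, bucket_arn: str, data_source_name: str) -> str:
    """Register an S3 bucket as a data source and start ingesting it.

    agent_client is expected to be boto3.client('bedrock-agent').
    Returns the ID of the ingestion job that was started.
    """
    data_source_response = agent_client.create_data_source(
        knowledgeBaseId=knowledge_base_id,
        name=data_source_name,
        dataSourceConfiguration={
            "type": "S3",
            "s3Configuration": {"bucketArn": bucket_arn},
        },
    )
    data_source_id = data_source_response["dataSource"]["dataSourceId"]

    # The ingestion job generates embeddings and writes them to the vector store
    job_response = agent_client.start_ingestion_job(
        knowledgeBaseId=knowledge_base_id,
        dataSourceId=data_source_id,
    )
    return job_response["ingestionJob"]["ingestionJobId"]


# Example call (requires AWS credentials and an existing knowledge base):
# import boto3
# job_id = start_s3_ingestion(boto3.client('bedrock-agent'), '<knowledge-base-id>',
#                             'arn:aws:s3:::<bucket-name>', 'poc-data-source')
```

The job's progress could afterwards be checked with the client's `get_ingestion_job` call.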


4. Create a Collection


### Configure Setup

#### Import Dependencies

In [None]:
import logging
import boto3
import json
from botocore.exceptions import ClientError

#### Set Variables

In [None]:
collection_description = 'OpenSearch PoC Collection'
collection_name = 'collection-poc-1'
embed_model_arn = 'arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1'
kb_name = 'knowledge-base-poc-1'
kb_description = 'OpenSearch PoC Knowledge Base'
security_policy_description = 'OpenSearch Serverless PoC Security Policy'
security_policy_name = 'security-policy-poc-1'
# Encryption policy for the collection. An encryption policy must name a key:
# either an AWS-owned key ("AWSOwnedKey": True) or a customer-managed key ("KmsARN").
security_policy = {
    "Rules": [
        {
            "ResourceType": "collection",
            "Resource": [
                f"collection/{collection_name}"
            ]
        }
    ],
    "AWSOwnedKey": True,
}
security_policy_string = json.dumps(security_policy)
# Service role that Bedrock assumes when it works with the knowledge base
service_role_arn = 'arn:aws:iam::860100747351:role/service-role/AmazonBedrockExecutionRoleForKnowledgeBase_1nor9'
kb_config = {
    'type': 'VECTOR',
    'vectorKnowledgeBaseConfiguration': {
        'embeddingModelArn': embed_model_arn,
    }
}

#### Instantiate the Bedrock Agent Client

In [None]:
bedrock_agent_client = boto3.client('bedrock-agent')

#### Create an S3 Bucket

In [None]:
# Attempts to create an S3 bucket. If the request fails, the error is logged and False is returned.
# ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/create_bucket.html
def create_bucket(bucket_name: str, region: str = None) -> bool:
    """Create an S3 bucket in a specified region

    If a region is not specified, the bucket is created in the S3 default
    region (us-east-1).

    :param bucket_name: Bucket to create
    :param region: String region to create bucket in, e.g., 'us-west-2'
    :return: True if bucket created, else False
    """
    try:
        if region is None:
            s3_client = boto3.client('s3')
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            s3_client = boto3.client('s3', region_name=region)
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name,
                                    CreateBucketConfiguration=location)
    except ClientError as e:
        logging.error(e)
        return False
    return True

In [None]:
bucket_name = 'bedrock-greg'
create_bucket(bucket_name)

#### List Buckets

In [None]:
s3 = boto3.client('s3')
response = s3.list_buckets()

# Output the bucket names
print('Does the bucket exist?')
bucket_exists = False
for bucket in response['Buckets']:
    if bucket['Name'] == bucket_name:
        print(f'It does! The bucket name is: {bucket["Name"]}')
        bucket_exists = True
        break

if not bucket_exists:
    print(f'Bucket {bucket_name} does not exist.')
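
Once the bucket exists, the source documents need to be uploaded to it. The following is a minimal sketch of that step, assuming the documents sit in a local directory. The helper name `upload_documents`, the `key_prefix` argument, and the `./docs` path in the usage comment are illustrative assumptions.

```python
from pathlib import Path


def upload_documents(s3_client, bucket_name: str, local_dir: str, key_prefix: str = "") -> list:
    """Upload every file under local_dir to the bucket and return the object keys.

    s3_client is expected to be boto3.client('s3').
    """
    uploaded_keys = []
    for path in sorted(Path(local_dir).rglob("*")):
        if path.is_file():
            # Use the path relative to local_dir as the object key
            key = f"{key_prefix}{path.relative_to(local_dir).as_posix()}"
            s3_client.upload_file(str(path), bucket_name, key)
            uploaded_keys.append(key)
    return uploaded_keys


# Example call (requires AWS credentials):
# import boto3
# upload_documents(boto3.client('s3'), bucket_name, './docs', key_prefix='kb/')
```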

### Create OpenSearch Serverless Collection

#### Instantiate the OpenSearch Serverless Client

In [None]:
opensearch_serverless_client = boto3.client('opensearchserverless')

#### List Security Policies

In [None]:
response = opensearch_serverless_client.list_security_policies(
    type='encryption'
)
print(json.dumps(response, indent=2))

In [None]:
# One-off calls used while developing the helper in the next cell.
# security_policy_response = opensearch_serverless_client.create_security_policy(
#     description=security_policy_description,
#     name=security_policy_name,
#     policy=security_policy_string,
#     type='encryption',
# )
# print(security_policy_response)

# update_security_policy requires the policy's current policyVersion, which
# list_security_policies returns for each policy. Replace the value below with your own.
security_policy_response = opensearch_serverless_client.update_security_policy(
    description=security_policy_description,
    name=security_policy_name,
    policy=security_policy_string,
    policyVersion='MTcwMTcyNzc2MDg3Nl8x',
    type='encryption',
)
print(security_policy_response)

In [None]:
# Attempts to create a security policy. If it already exists, it updates the existing policy.
# Note: the docs don't say so, but the policy name must be unique when updating the policy.
# Updating also requires the policy's current version, so the existing policy is looked up by name.
# ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/opensearchserverless/client/create_security_policy.html
# ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/opensearchserverless/client/update_security_policy.html
from botocore.exceptions import ClientError


class SecurityPolicyNotFoundError(Exception):
    def __init__(self, policy_name: str):
        self.message = json.dumps({
            "error": f"No security policy found with name {policy_name}. Unable to update security policy."
        })
        super().__init__(self.message)


def create_or_update_security_policy(
    description: str,
    name: str,
    policy: str,
    type: str = 'encryption',
    ) -> str:
    """
    This function creates or updates a security policy for a given OpenSearch Serverless collection.

    Parameters:
    description (str): A description of the security policy.
    name (str): The name of the security policy.
    policy (str): The JSON policy string that defines the security policy.
    type (str, optional): The type of the security policy. Defaults to 'encryption'.

    Returns:
    str: The security policy detail from the create or update operation, as a JSON string.
    """
    try:
        security_policy_response = opensearch_serverless_client.create_security_policy(
            description=description,
            name=name,
            policy=policy,
            type=type,
        )
        return json.dumps(security_policy_response["securityPolicyDetail"], indent=4)
    except opensearch_serverless_client.exceptions.ConflictException:
        print("Security policy already exists. Updating the existing policy...")
        # Look up the existing policy by name to get its current version
        list_response = opensearch_serverless_client.list_security_policies(
            type=type
        )
        sp_policy_version = None
        for sp_policy in list_response["securityPolicySummaries"]:
            if sp_policy['name'] == name:
                sp_policy_version = sp_policy['policyVersion']
                break
        if sp_policy_version is None:
            raise SecurityPolicyNotFoundError(name)
        try:
            security_policy_response = opensearch_serverless_client.update_security_policy(
                description=description,
                name=name,
                policy=policy,
                policyVersion=sp_policy_version,
                type=type,
            )
        except ClientError as e:
            raise RuntimeError("Unable to update security policy.") from e
        return json.dumps(security_policy_response["securityPolicyDetail"], indent=4)

# The function already returns a formatted JSON string, so print it directly
create_or_update_security_policy_response = create_or_update_security_policy(
    security_policy_description, security_policy_name, security_policy_string)
print(create_or_update_security_policy_response)

#### Verify OpenSearch Serverless Security Policy Details

> This step is optional: the cell above already prints the security policy details returned by the create or update call.

In [None]:
import json

security_policy_response = opensearch_serverless_client.get_security_policy(
    name=security_policy_name,
    type='encryption'
)
print(json.dumps(security_policy_response["securityPolicyDetail"], indent=4))
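
An encryption policy is one kind of OpenSearch Serverless security policy; the other kind is a network policy, created through the same `create_security_policy` call with `type='network'`. The sketch below builds a network policy document that allows public access to the collection endpoint and its dashboard. The helper name `build_public_network_policy` is hypothetical, and public access is an assumption made to keep the example short.

```python
import json


def build_public_network_policy(collection_name: str) -> str:
    """Return a network policy JSON string that allows public access to a collection."""
    policy = [
        {
            "Rules": [
                {"ResourceType": "collection", "Resource": [f"collection/{collection_name}"]},
                {"ResourceType": "dashboard", "Resource": [f"collection/{collection_name}"]},
            ],
            "AllowFromPublic": True,
        }
    ]
    return json.dumps(policy)


network_policy_string = build_public_network_policy("collection-poc-1")
print(network_policy_string)
```

Unlike an encryption policy, a network policy document is a JSON array of rule sets rather than a single object.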

#### Create OpenSearch Serverless Collection

In [None]:
# Look up the collection's details (ID, ARN, status) by name.
# The list is empty if the collection has not been created yet.
collection_details_response = opensearch_serverless_client.batch_get_collection(
    names=[collection_name]
)

In [None]:
# Attempts to create a collection. If it already exists, it updates the existing collection.
# Only collection descriptions can be updated.
# ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/opensearchserverless/client/create_collection.html
# ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/opensearchserverless/client/update_collection.html

# Declare the collection not found error
class CollectionNotFoundError(Exception):
    def __init__(self, collection_name: str):
        self.message = json.dumps({
            "error": f"No collection found with name {collection_name}. Unable to update collection."
        })
        super().__init__(self.message)


def create_or_update_collection(
    description: str,
    name: str,
    stand_by_replicas: str = 'DISABLED',
    type: str = 'VECTORSEARCH',
    ) -> str:
    """
    This function creates or updates an OpenSearch Serverless collection.

    Parameters:
    description (str): The description of the collection.
    name (str): The name of the collection.
    stand_by_replicas (str, optional): The standby replica setting for the collection. Defaults to 'DISABLED'.
    type (str, optional): The type of the collection. Defaults to 'VECTORSEARCH'.

    Returns:
    str: The collection detail from the create or update operation, as a JSON string.
    """
    try:
        print("Creating a new collection...")
        collection_response = opensearch_serverless_client.create_collection(
            description=description,
            name=name,
            standbyReplicas=stand_by_replicas,
            type=type,
        )
        return json.dumps(collection_response["createCollectionDetail"], indent=4)
    except opensearch_serverless_client.exceptions.ConflictException:
        print("Collection already exists. Updating the collection...")
        # Collections are updated by ID, so look up the ID by name
        collection_response = opensearch_serverless_client.batch_get_collection(
            names=[name],
        )
        collection_id = None
        for collection in collection_response["collectionDetails"]:
            if collection['name'] == name:
                collection_id = collection['id']
                break
        if collection_id is None:
            raise CollectionNotFoundError(name)
        collection_response = opensearch_serverless_client.update_collection(
            description=description,
            id=collection_id,
        )
        return json.dumps(collection_response["updateCollectionDetail"], indent=4)

create_or_update_collection_response = create_or_update_collection(collection_description, collection_name)
print(json.loads(create_or_update_collection_response))
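
A newly created collection takes some time to become usable, and the knowledge base can only be created once the collection is in the ACTIVE state. One way to handle this is to poll `batch_get_collection` until the status changes. The sketch below is an assumption about how that could be done; the function name `wait_for_collection_active`, the timeout, and the polling interval are illustrative choices.

```python
import time


def wait_for_collection_active(aoss_client, name: str, timeout_seconds: int = 600,
                               poll_seconds: int = 10, sleep=time.sleep) -> dict:
    """Poll until the named collection is ACTIVE and return its details.

    aoss_client is expected to be boto3.client('opensearchserverless').
    Raises RuntimeError if creation fails and TimeoutError if the deadline passes.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        details = aoss_client.batch_get_collection(names=[name])["collectionDetails"]
        status = details[0]["status"] if details else None
        if status == "ACTIVE":
            return details[0]
        if status == "FAILED":
            raise RuntimeError(f"Collection {name} failed to create.")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Collection {name} was not ACTIVE after {timeout_seconds} seconds.")
        sleep(poll_seconds)


# Example call (requires AWS credentials):
# import boto3
# collection_detail = wait_for_collection_active(boto3.client('opensearchserverless'), 'collection-poc-1')
```

The `sleep` parameter exists only so the wait can be replaced in tests; in normal use the default is fine.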

#### Create Data Access Policy

In [None]:
# Attempts to create a data access policy. If it already exists, it updates the existing policy.
# Updating requires the policy's current version, so the existing policy is looked up by name.
# ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/opensearchserverless/client/create_access_policy.html
# ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/opensearchserverless/client/update_access_policy.html

# Declare the access policy not found error
class AccessPolicyNotFoundError(Exception):
    def __init__(self, policy_name: str):
        self.message = json.dumps({
            "error": f"No access policy found with name {policy_name}. Unable to update the access policy."
        })
        super().__init__(self.message)


def create_or_update_access(
    description: str,
    name: str,
    policy: str,
    type: str = 'data',
    ) -> str:
    """
    This function creates or updates an OpenSearch Serverless access policy for a collection.
    Access policies limit access to collections and the resources within them, and allow a
    user to access that data irrespective of the access mechanism or network source.

    Parameters:
    description (str): The description of the access policy.
    name (str): The name of the access policy.
    policy (str): The JSON policy string that defines the access policy.
    type (str, optional): The type of the access policy. Defaults to 'data'.

    Returns:
    str: The access policy detail from the create or update operation, as a JSON string.
    """
    try:
        print("Creating a new data access policy...")
        access_response = opensearch_serverless_client.create_access_policy(
            description=description,
            name=name,
            policy=policy,
            type=type,
        )
        return json.dumps(access_response["accessPolicyDetail"], indent=4)
    except opensearch_serverless_client.exceptions.ConflictException:
        print("Access policy already exists. Updating the access policy...")
        list_response = opensearch_serverless_client.list_access_policies(type=type)
        policy_version = None
        for access_policy_summary in list_response["accessPolicySummaries"]:
            if access_policy_summary['name'] == name:
                policy_version = access_policy_summary['policyVersion']
                break
        if policy_version is None:
            raise AccessPolicyNotFoundError(name)
        access_response = opensearch_serverless_client.update_access_policy(
            description=description,
            name=name,
            policy=policy,
            policyVersion=policy_version,
            type=type,
        )
        return json.dumps(access_response["accessPolicyDetail"], indent=4)

# Assumption: this example policy grants the Bedrock service role access to the collection
# and its indexes. The policy name and permissions are illustrative; adjust them as needed.
access_policy_name = 'access-policy-poc-1'
access_policy = [{
    "Rules": [
        {"ResourceType": "collection", "Resource": [f"collection/{collection_name}"],
         "Permission": ["aoss:DescribeCollectionItems", "aoss:CreateCollectionItems", "aoss:UpdateCollectionItems"]},
        {"ResourceType": "index", "Resource": [f"index/{collection_name}/*"],
         "Permission": ["aoss:CreateIndex", "aoss:DescribeIndex", "aoss:UpdateIndex", "aoss:ReadDocument", "aoss:WriteDocument"]},
    ],
    "Principal": [service_role_arn],
}]
create_or_update_access_response = create_or_update_access(
    'OpenSearch Serverless PoC Data Access Policy', access_policy_name, json.dumps(access_policy))
print(create_or_update_access_response)

### Create a Bedrock Knowledge Base

> **NOTE**: The storage configuration for the knowledge base requires the collection ARN, so it is defined after the collection is created or updated. The shape of the object is shown here for reference.

```python
storageConfiguration = {
    'type': 'OPENSEARCH_SERVERLESS',
    'opensearchServerlessConfiguration': {
        'collectionArn': 'string',
        'vectorIndexName': 'string',
        'fieldMapping': {
            'vectorField': 'string',
            'textField': 'string',
            'metadataField': 'string'
        }
    },
}
```

#### Set variables needed for the Storage Configuration

In [None]:
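# Fetch the collection ARN by name from the collection details
collection_arn = opensearch_serverless_client.batch_get_collection(
    names=[collection_name]
)["collectionDetails"][0]["arn"]
vector_index_name = ''  # TODO: set to the name of the vector index in the collection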
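
With the collection ARN and the vector index name in hand, the storage configuration can be assembled in the shape shown above. The sketch below is one way to do that. The helper name `build_storage_config`, the example ARN, and the index and field names in the usage lines are all hypothetical placeholders; the real field names must match the fields of the vector index, which is assumed to exist already.

```python
def build_storage_config(
    collection_arn: str,
    vector_index_name: str,
    vector_field: str,
    text_field: str,
    metadata_field: str,
) -> dict:
    """Build the storageConfiguration dictionary for an OpenSearch Serverless vector store."""
    return {
        "type": "OPENSEARCH_SERVERLESS",
        "opensearchServerlessConfiguration": {
            "collectionArn": collection_arn,
            "vectorIndexName": vector_index_name,
            "fieldMapping": {
                "vectorField": vector_field,
                "textField": text_field,
                "metadataField": metadata_field,
            },
        },
    }


# Placeholder ARN, index name, and field names, for illustration only
example_storage_config = build_storage_config(
    "arn:aws:aoss:us-east-1:111122223333:collection/example-id",
    "example-index",
    vector_field="embedding",
    text_field="text",
    metadata_field="metadata",
)
print(example_storage_config)
```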

In [None]:

# Attempts to create a knowledge base. If it already exists, it updates the existing knowledge base.
# Note: The collection must be in the ACTIVE state before you can create a knowledge base.
# ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agent/client/create_knowledge_base.html
# ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agent/client/update_knowledge_base.html

# Hard code role arn for simplicity. This role is created by the OpenSearch Serverless console. We could probably create this role
# using the Terraform setup in the Infrastructure as Code repo.
role_arn = 'arn:aws:iam::860100747351:role/service-role/AmazonBedrockExecutionRoleForKnowledgeBase_1nor9'

# Declare the knowledge base not found error
class KnowledgeBaseNotFoundError(Exception):
    def __init__(self, kb_name: str):
        self.message = json.dumps({
            "error": f"No knowledge base found with name {kb_name}. Unable to update knowledge base."
        })
        super().__init__(self.message)


def create_or_update_kb(
    name: str,
    description: str,
    role_arn: str,
    kb_config: dict,
    storage_config: dict,
    ) -> str:
    """
    This function creates or updates a Bedrock knowledge base.

    Parameters:
    name (str): The name of the knowledge base.
    description (str): The description of the knowledge base.
    role_arn (str): The ARN of the service role that Bedrock assumes.
    kb_config (dict): The knowledge base configuration, including the embedding model ARN.
    storage_config (dict): The storage configuration for the vector store.

    Returns:
    str: The knowledge base detail from the create or update operation, as a JSON string.
    """
    try:
        kb_response = bedrock_agent_client.create_knowledge_base(
            name=name,
            description=description,
            roleArn=role_arn,
            knowledgeBaseConfiguration=kb_config,
            storageConfiguration=storage_config,
        )
    except bedrock_agent_client.exceptions.ConflictException:
        print("Knowledge base already exists. Updating the knowledge base...")
        # Knowledge bases are updated by ID, so look up the ID by name
        list_response = bedrock_agent_client.list_knowledge_bases()
        kb_id = None
        for kb in list_response["knowledgeBaseSummaries"]:
            if kb['name'] == name:
                kb_id = kb['knowledgeBaseId']
                break
        if kb_id is None:
            raise KnowledgeBaseNotFoundError(name)
        kb_response = bedrock_agent_client.update_knowledge_base(
            knowledgeBaseId=kb_id,
            name=name,
            description=description,
            roleArn=role_arn,
            knowledgeBaseConfiguration=kb_config,
            storageConfiguration=storage_config,
        )
    # default=str converts any datetime values in the response to strings
    return json.dumps(kb_response["knowledgeBase"], indent=4, default=str)

# storage_config must follow the storageConfiguration shape shown above
create_or_update_kb_response = create_or_update_kb(kb_name, kb_description, service_role_arn, kb_config, storage_config)
print(create_or_update_kb_response)
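
After the knowledge base has been created and its data ingested, it can be queried through the `bedrock-agent-runtime` client. The following is a minimal sketch of a retrieval call, not part of the original notebook. The function name `retrieve_passages`, the `top_k` parameter, and the sample query in the usage comment are assumptions made for illustration.

```python
def retrieve_passages(runtime_client, knowledge_base_id: str, query: str, top_k: int = 3) -> list:
    """Return (text, score) pairs for the passages most relevant to the query.

    runtime_client is expected to be boto3.client('bedrock-agent-runtime').
    """
    response = runtime_client.retrieve(
        knowledgeBaseId=knowledge_base_id,
        retrievalQuery={"text": query},
        retrievalConfiguration={
            "vectorSearchConfiguration": {"numberOfResults": top_k}
        },
    )
    return [
        (result["content"]["text"], result.get("score"))
        for result in response["retrievalResults"]
    ]


# Example call (requires AWS credentials and an ingested knowledge base):
# import boto3
# passages = retrieve_passages(boto3.client('bedrock-agent-runtime'), '<knowledge-base-id>',
#                              'What is a vector index?')
```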