# Bedrock Knowledge Base

In [1]:
# Installing the necessary Python modules on Kaggle
PIP_INSTALLS = [
    'langchain',
    'langchain-core',
    'langchain-community',
    'langchain-openai',
    'ragas',
    'boto3',
    'botocore'
]

from pip_install import perform_pip_install
# perform_pip_install(pip_installs = PIP_INSTALLS)

In [2]:
# Importing the necessary Python libraries
import os
import json
import time
from datetime import date
from pathlib import Path

import boto3
import pandas as pd
from ragas.metrics import *
from ragas.metrics.critique import *
from botocore.exceptions import ClientError

from load_api_keys import load_aws_creds

In [3]:
# Loading my personal AWS credentials from Kaggle Secrets
aws_creds = load_aws_creds()
os.environ['AWS_ACCESS_KEY_ID'] = aws_creds['AWS_ACCESS_KEY']
os.environ['AWS_SECRET_ACCESS_KEY'] = aws_creds['AWS_SECRET_KEY_ID']

# Setting the default AWS region
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'
os.environ['AWS_REGION'] = 'us-east-1'

In [4]:
# Loading the knowledge items (KIs) as a Pandas DataFrame
df_kis = pd.read_csv('/kaggle/input/synthetic-it-related-knowledge-items/synthetic_knowledge_items.csv')
df_kis = df_kis[['ki_topic', 'ki_text']]

In [5]:
# Setting a prefix for any resource we create
resource_prefix = 'bedrock-kb-for-kis'

## Data Preparation

In [6]:
# Instantiating a directory to hold the files we'll upload to AWS
aws_files_dir = 'aws_files'
if not os.path.isdir(aws_files_dir):
    os.mkdir(aws_files_dir)

In [7]:
# Getting today's date formatted as string
# todays_date = date.today().strftime('%Y_%m_%d')
todays_date = '2024_07_05'
todays_date

'2024_07_05'

In [8]:
# Iterating over each KI in the DataFrame
for index, row in df_kis.iterrows():
    
    # Extracting the KI information from the row
    ki_text = row['ki_text']
    ki_topic = row['ki_topic']
    
    # Converting the KI topic name to be used as a filename
    formatted_ki_topic = ki_topic.lower().replace(' ', '_')
    ki_filename = f'{formatted_ki_topic}_{todays_date}'
    
    # Setting the metadata associated to the KI
    ki_metadata = {
        'metadataAttributes': {
            'is_current': True,
            'created_date': todays_date,
            'last_updated': todays_date,
            'ki_topic': ki_topic,
            'department': 'information_technology'
        }
    }
    
    # Writing the KI text to a .txt file
    if not Path(f'{aws_files_dir}/{ki_filename}.txt').is_file():
        with open(f'{aws_files_dir}/{ki_filename}.txt', 'w') as txt_file:
            txt_file.write(ki_text)
    
    # Writing the KI metadata to a .metadata.json
    if not Path(f'{aws_files_dir}/{ki_filename}.metadata.json').is_file():
        with open(f'{aws_files_dir}/{ki_filename}.metadata.json', 'w') as json_file:
            json.dump(ki_metadata, json_file, indent = 4)

# Upload to AWS

In [9]:
# Instantiating the S3 client from boto3
s3_client = boto3.client(service_name = 's3')

# Setting a name for our S3 bucket
s3_bucket_name = 'dkhundley-ki-docs-for-bedrock-knowledge-base'

In [None]:
# Creating the S3 bucket if it does not already exist
try:
    s3_client.head_bucket(Bucket = s3_bucket_name)
    print('Bucket already exists')
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == '404':
        print(f'{s3_bucket_name} does not exist. Creating...')
        s3_client.create_bucket(Bucket = s3_bucket_name)
    else:
        print(f'Error checking bucket status. Error details: {e}')

In [10]:
# Uploading each file to AWS
for file in os.listdir(aws_files_dir):
    
    # Setting the file path
    filepath = f'{aws_files_dir}/{file}'
    
    # Checking to see if the file already exists in AWS
    try:
        s3_client.head_object(Bucket = s3_bucket_name, Key = file)
        
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            
            # Uploading the file to S3
            print(f'Uploading {file} to {s3_bucket_name} Bucket...')
            s3_client.upload_file(
                Filename = filepath,
                Bucket = s3_bucket_name,
                Key = file
            )
            print(f'{file} upload successful!')
            

Uploading troubleshooting_issues_with_skype_for_business_2024_07_05.metadata.json to dkhundley-ki-docs-for-bedrock-knowledge-base Bucket...
troubleshooting_issues_with_skype_for_business_2024_07_05.metadata.json upload successful!
Uploading configuring_a_network_scanner_2024_07_05.txt to dkhundley-ki-docs-for-bedrock-knowledge-base Bucket...
configuring_a_network_scanner_2024_07_05.txt upload successful!
Uploading creating_a_new_it_change_request_2024_07_05.metadata.json to dkhundley-ki-docs-for-bedrock-knowledge-base Bucket...
creating_a_new_it_change_request_2024_07_05.metadata.json upload successful!
Uploading creating_a_new_distribution_list_in_exchange_2024_07_05.metadata.json to dkhundley-ki-docs-for-bedrock-knowledge-base Bucket...
creating_a_new_distribution_list_in_exchange_2024_07_05.metadata.json upload successful!
Uploading setting_up_a_new_printer_on_the_network_2024_07_05.txt to dkhundley-ki-docs-for-bedrock-knowledge-base Bucket...
setting_up_a_new_printer_on_the_network

## Creating the Bedrock Knowledge Base

### Creating a Custom IAM Role

In [None]:
# Setting the name of the IAM role
role_name = f'{resource_prefix}-role'

# Instantiating an IAM boto3 client
iam_client = boto3.client('iam')

# Defining the trust relationship policy information
trust_relationship_info = {
    'Version': '2012-10-17',
    'Statement': [
        {
            'Effect': 'Allow',
            'Principal': {
                'Service': 'bedrock.amazonaws.com'
            },
            'Action': 'sts:AssumeRole'
        }
    ]
}

# Defining the IAM policy information
policy_info = {
    'Version': '2012-10-17',
    'Statement': [
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:CreateKnowledgeBase",
                "bedrock:GetKnowledgeBase",
                "bedrock:UpdateKnowledgeBase",
                "bedrock:DeleteKnowledgeBase",
                "bedrock:CreateDataSource",
                "bedrock:GetDataSource",
                "bedrock:UpdateDataSource",
                "bedrock:DeleteDataSource",
                "bedrock:ListKnowledgeBases",
                "bedrock:ListDataSources"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "aoss:CreateCollection",
                "aoss:DeleteCollection",
                "aoss:GetCollection",
                "aoss:ListCollections",
                "aoss:BatchGetCollection",
                "aoss:CreateSecurityPolicy",
                "aoss:GetSecurityPolicy",
                "aoss:ListSecurityPolicies"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeVpcs",
                "ec2:DescribeSecurityGroups"
            ],
            "Resource": "*"
        }

    ]
}

In [None]:
# Creating the IAM role / policy if they do not already exist
try:
    
    # Creating the IAM role
    response = iam_client.create_role(
        RoleName = role_name,
        AssumeRolePolicyDocument = json.dumps(trust_relationship_info)
    )
    role_arn = response['Role']['Arn']
    print(f'IAM Role created: {role_arn}')
    
    # Creating the IAM policy
    policy_name = f'{role_name}-policy'
    policy_response = iam_client.create_policy(
        PolicyName = policy_name,
        PolicyDocument = json.dumps(policy_info)
    )
    policy_arn = policy_response['Policy']['Arn']
    print(f'IAM Policy created: {policy_arn}')
    
    # Atttaching the IAM policy to the ARN
    iam_client.attach_role_policy(
        RoleName = role_name,
        PolicyArn = policy_arn
    )
    time.sleep(10)
    print(f'Policy attached to role {role_name}')
    
except iam_client.exceptions.EntityAlreadyExistsException:
    print(f'IAM role {role_name} already exists')
    role = iam_client.get_role(RoleName = role_name)
    role_arn = role['Role']['Arn']

### Creating an OpenSearch Serverless Collection

In [None]:
# Instantiating a boto3 client for OpenSearch Serverless 
opensearch_serverless_client = boto3.client('opensearchserverless')

# Setting the name for the OpenSearch Serverless collection
opensearch_serverless_collection_name = f'{resource_prefix}-ossc'
opensearch_serverless_collection_name

In [None]:
def get_or_create_oss_security_policy(collection_name):
    '''
    Gets or creates the OpenSearch Serverless collection's security policy
    
    Inputs:
        - collection_name (str): The name of the collection
        
    Returns:
        - policy_name (str): The name of the policy
    '''
    
    # Setting the name of the policy
    policy_name = f'{collection_name}-policy'
    
    # Creating the policy if it does not already exist
    try:
        response = opensearch_serverless_client.create_security_policy(
            name = policy_name,
            policy = """
            {
                "Rules": [
                    {
                        "ResourceType": "collection",
                        "Resource": [
                            "collection/"""+ collection_name + """"
                        ]
                    }
                ],
                "AWSOwnedKey": true
            }
            """,
            type = 'encryption'
        )
        
        print(f'Security policy created: {policy_name}')
        
    except opensearch_serverless_client.exceptions.ConflictException:
        print(f'Security policy {policy_name} already exists')
        
    return policy_name

In [None]:
def create_oss_collection_for_bedrock_kb(collection_name, policy_name):
    '''
    Creates the OpenSearch Serverless collection to serve as the backend for Bedrock Knowledge Base
    
    Inputs:
        - collection_name (str): The name we want for the OpenSearch Serverless collection
        - policy_name (str): The name of policy that will be associated to the OpenSearch Serverless collection
    
    Returns:
        - oss_collection_arn (str): The OpenSearch Serverless collection ARN
    '''
    
    # Creating the OpenSearch Serverless Collection
    response = opensearch_serverless_client.create_collection(
        name = collection_name,
        description = 'Collection for Bedrock Knowledge Base',
        type = 'SEARCH',
    )
    
    # Getting the OpenSearch Serverless Collection ARN
    oss_collection_arn = response['createCollectionDetail']['arn']
    print('OpenSearch Serverless collection created. Waiting to be ready...')
    
    # Waiting the for the collection to become active
    if wait_for_collection_active(collection_name = collection_name):
        
        return oss_collection_arn

In [None]:
def get_or_create_opensearch_serverless_collection(collection_name):
    '''
    Gets or creates the OpenSearch Collection
    
    Inputs:
        - collection_name (str): The name we want for the OpenSearch Serverless collection
        
    Returns
        - oss_collection_arn (str): The ARN of the OpenSearch Serverless colection
    '''
    
    # Getting or creating the security policy to be associated to the OpenSearch Serverless Collection
    policy_name = get_or_create_oss_security_policy(collection_name = collection_name)
    
    # Checking to see that the OpenSearch Serverless collection has not yet been created
    existing_collections = opensearch_serverless_client.list_collections()

    if len(existing_collections['collectionSummaries']) == 0:
        print('No OpenSearch Serverless collections found. Creating...')

        # Creating the OpenSearch Serverless Collection
        oss_collection_arn = create_oss_collection_for_bedrock_kb(
            collection_name = collection_name,
            policy_name = policy_name
        )
        
        return oss_collection_arn

    else:

        # Checking through all the existing collections to check for any matching names
        for oss_collection in existing_collections['collectionSummaries']:

            if oss_collection['name'] == opensearch_serverless_collection_name:
                print(f'Collection already exists. ARN: {oss_collection["arn"]}')
                oss_collection_arn = oss_collection['arn']
                return oss_collection_arn
        
        # Creating the OpenSearch Serverless Collection
        oss_collection_arn = create_oss_collection_for_bedrock_kb(
            collection_name = collection_name,
            policy_name = policy_name
        )
        
        return oss_collection_arn

In [None]:
def wait_for_collection_active(collection_name, max_attempts = 20, delay = 30):
    '''
    Checks to see if an OpenSearch Serverless collection is active (ready)
    
    Inputs:
        - collection_name (str): The name of the collection,
        - max_attempts (int): The total number of attempts to try polling
        - delay (int): The number of seconds in between each poll
        
    Returns:
        - (boolean): A boolean value where True indicates that the collection is active
    '''
    
    # Iterating through each attempt
    for attempt in range(max_attempts):
        
        try:
            
            # Checking the status using the OpenSearch Serverless client
            response = opensearch_serverless_client.batch_get_collection(names = [collection_name])
            
            # Extracting the specific status
            status = response['collectionDetails'][0]['status']
            
            if status == 'ACTIVE':
                print(f'Collection {collectiion_name} is now active!')
                return True
            
            elif status in ['FAILED', 'DELETED']:
                print(f'Collection {collection_name} enetered {status} state')
                return False
            
            else:
                print(f'Collection {collection_name} is in {status} state. Waiting...')
                
        except Exception as e:
            print(f'Error checking colelction status: {str(e)}')
            
        
        # Waiting between each attempt
        time.sleep(delay)
        
        # Giving up if maximum attempts have been reached
        print(f'Timeout waiting for collection {collection_name} to become active')
        
        return False

In [None]:
# Getting the ARN associated to the newly created or already existing OpenSearch Serverless Collection
oss_collection_arn = get_or_create_opensearch_serverless_collection(collection_name = opensearch_serverless_collection_name)

### Building Bedrock Knowledge Base