In [None]:
import boto3
import botocore.exceptions
import time

In [None]:
# Names shared by every cell below: the CloudFormation stack and the Kafka topic.
StackName, KafkaTopic = 'BedrockStreamIngest', 'streamtopic'

In [None]:
# Look up the physical ARN of the MSK cluster that the CloudFormation stack
# created under the logical resource ID 'MSKCluster'.

cf_client = boto3.client('cloudformation')

try:
    describe_stack_resource_response = cf_client.describe_stack_resource(
        StackName=StackName, LogicalResourceId='MSKCluster'
    )
except botocore.exceptions.ClientError as error:
    # Show the AWS error in the cell output, then re-raise so "Run All" stops here.
    print(error)
    raise
else:
    stack_resource_detail = describe_stack_resource_response['StackResourceDetail']
    MSKClusterArn = stack_resource_detail['PhysicalResourceId']
    print('MSK Cluster ARN:', MSKClusterArn)

In [None]:
# Fetch the bootstrap broker connection string of the MSK cluster found above;
# the Kafka CLI commands in the terminal steps below take it as --bootstrap-server.

kafka_client = boto3.client('kafka')

try:
    get_bootstrap_brokers_response = kafka_client.get_bootstrap_brokers(ClusterArn=MSKClusterArn)
except botocore.exceptions.ClientError as error:
    # Show the AWS error in the cell output, then re-raise so "Run All" stops here.
    print(error)
    raise
else:
    BootstrapBrokerString = get_bootstrap_brokers_response['BootstrapBrokerString']
    print(BootstrapBrokerString)

Now run the following commands in a terminal



Apache Kafka client installation from SageMaker Studio terminal window

- Navigate to SageMaker Studio environment.
- In the top menu, hit "File" and choose "New" -> "Terminal".
- Once the terminal window is ready, run the commands in the steps below.
- Replace `<cluster-connection-string>` in the commands below with the Kafka connection string (`BootstrapBrokerString`) printed by the cell above and saved to a temporary location.

1. Prepare the terminal environment.

```
sudo yum -y update
sudo yum -y install java-11
sudo yum -y install wget
```

If the system is Ubuntu (check the /etc/os-release file to find out), run these commands instead:

```
sudo apt -y update
sudo apt-get -y install openjdk-11-jdk
sudo apt -y install wget
```

2. Create directory for Apache Kafka client download

```
mkdir kafka
chmod 777 kafka
cd kafka
```

3. Download and install Apache Kafka client

```
sudo wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
chmod 777 kafka_2.13-3.6.0/libs
cd kafka_2.13-3.6.0/libs
sudo wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
cd ./../../kafka_2.13-3.6.0/
```

4. Create the source topic (replace `<cluster-connection-string>` with the MSK cluster connection string retrieved in the cell above)

```
bin/kafka-topics.sh --create --bootstrap-server <cluster-connection-string> --replication-factor 2 --partitions 1 --topic streamtopic
```

5. Verify the created topics (replace `<cluster-connection-string>` with the same connection string used in step 4)

```
bin/kafka-topics.sh --bootstrap-server <cluster-connection-string> --list
```

6. To simplify cleanup, remove the Kafka client. Reinstall it using the steps above if you need it again.

```
cd ..
rm -rf kafka_2.13-3.6.0
rm -rf kafka_2.13-3.6.0.tgz
```

7. Exit the terminal window

```
exit
```

Create a Bedrock Knowledge Base via the AWS console

- Navigate to the "Knowledge Bases" page of the Amazon Bedrock console.
- Choose the "Create" button, then the "Knowledge Base with vector store" option.
- For "Knowledge Base name", provide "BedrockStreamIngestKnowledgeBase"
- For Data Source, choose "Custom".
- In Data Source configuration page, provide the name as "BedrockStreamIngestKBCustomDS"
- For Embeddings Model, choose "Titan Text Embeddings v2"
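- Leave the rest as defaults and choose the "Create Knowledge Base" button.
- Wait until the Knowledge Base is created.
- Note: keep "BedrockStreamIngest" in both the Knowledge Base name and the data source name, because the cells below find them by searching for that substring.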

In [None]:
# Get Knowledge Base ID (Created manually)
#
# Any Knowledge Base whose name contains StackName matches, so the name chosen in
# the console must include 'BedrockStreamIngest'
# (e.g. 'BedrockStreamIngestKnowledgeBase').

bedrock_agent_client = boto3.client('bedrock-agent')
try:
    # Walk every result page; a single list call only returns one page, so a
    # matching Knowledge Base on a later page would otherwise be missed.
    matching_kb_ids = [
        knowledge_base['knowledgeBaseId']
        for page in bedrock_agent_client.get_paginator('list_knowledge_bases').paginate()
        for knowledge_base in page['knowledgeBaseSummaries']
        if StackName in knowledge_base['name']
    ]
except botocore.exceptions.ClientError as error:
    print(error)
    raise error

# Fail here with a clear message instead of passing a placeholder ID to later cells.
if not matching_kb_ids:
    raise RuntimeError(
        f"No Knowledge Base with '{StackName}' in its name was found; "
        "create it in the Bedrock console first (see the steps above)."
    )
if len(matching_kb_ids) > 1:
    print(f"Warning: {len(matching_kb_ids)} Knowledge Bases match '{StackName}'; "
          "using the last one listed.")
KBId = matching_kb_ids[-1]

print(KBId)

In [None]:
# Get the custom data source ID (Created manually)
#
# Any data source of Knowledge Base KBId whose name contains StackName matches,
# so the data source name must include 'BedrockStreamIngest'
# (e.g. 'BedrockStreamIngestKBCustomDS').

bedrock_agent_client = boto3.client('bedrock-agent')
try:
    # Walk every result page so that a matching data source is never missed
    # just because it is not on the first page.
    matching_ds_ids = [
        data_source['dataSourceId']
        for page in bedrock_agent_client.get_paginator('list_data_sources').paginate(
            knowledgeBaseId=KBId
        )
        for data_source in page['dataSourceSummaries']
        if StackName in data_source['name']
    ]
except botocore.exceptions.ClientError as error:
    print(error)
    raise error

# Fail here with a clear message instead of passing a placeholder ID to later cells.
if not matching_ds_ids:
    raise RuntimeError(
        f"No data source with '{StackName}' in its name was found in Knowledge Base "
        f"{KBId}; create it in the Bedrock console first (see the steps above)."
    )
if len(matching_ds_ids) > 1:
    print(f"Warning: {len(matching_ds_ids)} data sources match '{StackName}'; "
          "using the last one listed.")
DSId = matching_ds_ids[-1]

print(DSId)

In [None]:
# Resolve the physical name of the Kafka consumer Lambda function that the
# CloudFormation stack created under the logical resource ID
# 'KafkaConsumerLambdaFunction'.

cf_client = boto3.client('cloudformation')

try:
    describe_stack_resource_response = cf_client.describe_stack_resource(
        StackName=StackName, LogicalResourceId='KafkaConsumerLambdaFunction'
    )
except botocore.exceptions.ClientError as error:
    # Show the AWS error in the cell output, then re-raise so "Run All" stops here.
    print(error)
    raise
else:
    stack_resource_detail = describe_stack_resource_response['StackResourceDetail']
    LambdaFunctionName = stack_resource_detail['PhysicalResourceId']
    print('Lambda Function Name:', LambdaFunctionName)

In [None]:
# Set the KB ID and DS ID as Environment Variables of the consumer Lambda function
#
# update_function_configuration replaces the function's entire set of environment
# variables, so the current variables (e.g. any the CloudFormation template may
# define) are read first and merged, instead of being silently dropped.

lambda_client = boto3.client('lambda')

try:
    current_configuration = lambda_client.get_function_configuration(
        FunctionName=LambdaFunctionName
    )
    merged_variables = {
        **current_configuration.get('Environment', {}).get('Variables', {}),
        'KBID': KBId,
        'DSID': DSId
    }
    update_function_configuration_response = lambda_client.update_function_configuration(
        FunctionName=LambdaFunctionName,
        Environment={'Variables': merged_variables}
    )
    print(update_function_configuration_response)

    # The configuration update completes asynchronously; wait for it to finish so
    # later cells operate on the updated function.
    lambda_client.get_waiter('function_updated').wait(FunctionName=LambdaFunctionName)

except (botocore.exceptions.ClientError, botocore.exceptions.WaiterError) as error:
    print(error)
    raise error

In [None]:
# Read the consumer Lambda's configuration back and print its environment
# variables, to confirm that KBID and DSID were applied.

lambda_client = boto3.client('lambda')

try:
    get_function_response = lambda_client.get_function(FunctionName=LambdaFunctionName)
except botocore.exceptions.ClientError as error:
    # Show the AWS error in the cell output, then re-raise so "Run All" stops here.
    print(error)
    raise
else:
    function_configuration = get_function_response['Configuration']
    print(function_configuration['Environment']['Variables'])


In [None]:
# Add MSK topic trigger to the consumer Lambda function
#
# Re-running this cell is safe: if this function already has a mapping to this
# cluster for KafkaTopic, the existing mapping is reported and none is created.
# Creating a duplicate would otherwise fail on a second run.

lambda_client = boto3.client('lambda')

try:
    existing_mappings = [
        mapping
        for mapping in lambda_client.list_event_source_mappings(
            FunctionName=LambdaFunctionName,
            EventSourceArn=MSKClusterArn
        )['EventSourceMappings']
        if KafkaTopic in mapping.get('Topics', [])
    ]

    if existing_mappings:
        for mapping in existing_mappings:
            print('Event source mapping already exists, UUID:', mapping['UUID'])
    else:
        create_event_source_mapping_response = lambda_client.create_event_source_mapping(
            EventSourceArn=MSKClusterArn,
            FunctionName=LambdaFunctionName,
            StartingPosition='LATEST',
            Enabled=True,
            # Use the configured topic rather than repeating the literal; the
            # terminal step must create this same topic.
            Topics=[KafkaTopic]
        )
        print(create_event_source_mapping_response)
except botocore.exceptions.ClientError as error:
    print(error)
    raise error

In [None]:
# Ensure the MSK trigger is fully enabled
#
# Poll the event source mapping(s) between the consumer Lambda and the MSK cluster
# until one reaches a terminal state ('Enabled' or 'Disabled') or the deadline passes.

lambda_client = boto3.client('lambda')

TERMINAL_STATES = ('Enabled', 'Disabled')
POLL_INTERVAL_SECONDS = 30
MAX_WAIT_SECONDS = 3 * 60 * 60  # 3 hours

# Initialise so the state check below cannot hit an undefined name when the list
# call returns no mappings.
status = None
# monotonic() is unaffected by system clock changes, unlike time.time().
deadline = time.monotonic() + MAX_WAIT_SECONDS
while time.monotonic() < deadline:
    try:
        list_event_source_mappings_response = lambda_client.list_event_source_mappings(
            FunctionName = LambdaFunctionName,
            EventSourceArn=MSKClusterArn
        )
    except botocore.exceptions.ClientError as error:
        print(error)
        raise error

    mappings = list_event_source_mappings_response['EventSourceMappings']
    if not mappings:
        print('No event source mapping found yet; retrying.')
    for mapping in mappings:
        print('Event source mapping UUID:', mapping['UUID'])
        print('Event source enablment status:', mapping['State'])
        status = mapping['State']

    if status in TERMINAL_STATES:
        break
    time.sleep(POLL_INTERVAL_SECONDS)
else:
    # The loop ran out of time without reaching a terminal state.
    print(f'Timed out after {MAX_WAIT_SECONDS} seconds; last observed state: {status}')

In [None]:
# Persist the values gathered in this notebook with IPython's %store magic so they
# survive a kernel restart. They can be reloaded with `%store -r` — presumably by a
# follow-on notebook in this walkthrough (confirm).
%store StackName
%store KafkaTopic
%store LambdaFunctionName
%store KBId
%store DSId
%store BootstrapBrokerString
%store MSKClusterArn