Updates for more flexible streaming options #642

Merged: 2 commits, Mar 25, 2024
175 changes: 94 additions & 81 deletions README-streaming-responses.md
@@ -31,74 +31,78 @@ Once the stack creation has completed, the resources you'll need to enable strea

Here is an example fulfillment Lambda that could be used to integrate with Amazon Bedrock, in this case using the Claude Instant model to answer questions. In the example below, note a few important steps your fulfillment Lambda needs in order to stream to the Web UI.

1. The Lambda will need two variables from the CloudFormation template - the name of the Dynamo table that was created and the endpoint URL of the API Gateway that was created (note that this should be an https URL not a wss URL).
1. The Lambda will need two variables coming from the session attributes of the Web UI - the name of the Dynamo table that was created and the endpoint URL of the API Gateway that was created. If these attributes are not present then streaming is not enabled on the Web UI.

2. The fulfillment Lambda must get items off the DynamoDB table, using the session ID as the key. When a user initiates a session, the session ID is sent to the Websocket API and stored along with a connection ID in DynamoDB. The fulfillment Lambda needs to get this value so it knows where to push its streaming updates.
2. The fulfillment Lambda must get items from the DynamoDB table, using the session ID as the key. When a user initiates a session, the session ID is sent to the Websocket API and stored along with a connection ID in DynamoDB. The fulfillment Lambda needs to get this value so it knows where to push its streaming updates.

3. As the response is processing (in this case our LLM is returning chunks of the response), we push these updates to our Websocket using ```apigatewaymanagementapi.post_to_connection``` which will send those updates back to the client via the Websocket connection.
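
Step 1 can be sketched as a small helper, assuming Lex delivers the session attributes as a plain dict. The helper name `get_streaming_config` is hypothetical; the keys `streamingEndpoint` and `streamingDynamoDbTable` are the ones the Web UI sets. Dict keys are looked up with `.get()` or `in`; `hasattr` inspects object attributes and would not see them.

```python
from typing import Optional, Tuple


def get_streaming_config(session_attributes: dict) -> Optional[Tuple[str, str]]:
    """Return (endpoint_url, table_name) when streaming is enabled, else None.

    Hypothetical helper: streaming is treated as enabled only when both
    session attributes are present and non-empty.
    """
    endpoint = session_attributes.get("streamingEndpoint")
    table = session_attributes.get("streamingDynamoDbTable")
    if endpoint and table:
        return endpoint, table
    return None
```

A fulfillment Lambda could call this once per request and fall back to a non-streaming reply when it returns `None`.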


```
import boto3
import os
import json
from asyncio import get_event_loop, gather, sleep

AWS_REGION = os.environ["AWS_REGION"]
ENDPOINT_URL = os.environ.get("ENDPOINT_URL", f'https://bedrock-runtime.{AWS_REGION}.amazonaws.com')
modelId = "anthropic.claude-instant-v1"
accept = "application/json"
contentType = "application/json"

dynamodb_client = boto3.resource('dynamodb')
bedrock_runtime = boto3.client(service_name='bedrock-runtime', region_name=AWS_REGION, endpoint_url=ENDPOINT_URL)
wstable = dynamodb_client.Table('streaming-chat-table') ## Your DDB table name
s3 = boto3.client('s3')


### Initialize your apigateway client
apigatewaymanagementapi = boto3.client(
'apigatewaymanagementapi',
endpoint_url = 'https://XXXXX.execute-api.us-east-1.amazonaws.com/Prod'
## Your APIGateway endpoint. Format: https://{api-id}.execute-api.{region}.amazonaws.com/{stage}
)

def lambda_handler(event, context):
print('event: ', event)
response = router(event, context)
return response

def router(event, context):
intent_name = event['sessionState']['intent']['name']

# Dispatch to your bot's intent handlers
sess_id = event['sessionId']

#if intent_name == 'FallbackIntent':
result = get_event_loop().run_until_complete(openai_async_api_handler(event, context))
print(result)
return result

raise Exception('Intent with name ' + intent_name + ' not supported')

## Note that the prefix async
async def openai_async_api_handler(event, context):
import boto3
import os
import json
from asyncio import get_event_loop, gather, sleep

AWS_REGION = os.environ["AWS_REGION"]
ENDPOINT_URL = os.environ.get("ENDPOINT_URL", f'https://bedrock-runtime.{AWS_REGION}.amazonaws.com')
modelId = "anthropic.claude-instant-v1"
accept = "application/json"
contentType = "application/json"

dynamodb_client = boto3.resource('dynamodb')
bedrock_runtime = boto3.client(service_name='bedrock-runtime', region_name=AWS_REGION, endpoint_url=ENDPOINT_URL)
s3 = boto3.client('s3')

def lambda_handler(event, context):
print('event: ', event)
response = router(event, context)
return response


def router(event, context):
intent_name = event['sessionState']['intent']['name']

# Dispatch to your bot's intent handlers
sess_id = event['sessionId']

#if intent_name == 'FallbackIntent':
result = get_event_loop().run_until_complete(openai_async_api_handler(event, context))
print(result)
return result

raise Exception('Intent with name ' + intent_name + ' not supported')


## Note that the prefix async
async def openai_async_api_handler(event, context):

sessionId = event['sessionId']
inputTranscript = event['inputTranscript']
body = json.dumps({"prompt": "\n\nHuman: " + inputTranscript + "\n\nAssistant:", "max_tokens_to_sample": 500})
session_attributes = get_session_attributes(event)
print('sessionId ', sessionId)
print('inputTranscript ', inputTranscript)
fullreply = '';

#If streaming, call Bedrock with streaming and push chunks to Websocket
if 'streamingDynamoDbTable' in session_attributes and 'streamingEndpoint' in session_attributes:
apigatewaymanagementapi = boto3.client(
'apigatewaymanagementapi',
endpoint_url = session_attributes['streamingEndpoint']
)

sessionId = event['sessionId']
inputTranscript = event['inputTranscript']
print('sessionId ', sessionId)
print('inputTranscript ', inputTranscript)
wstable = dynamodb_client.Table(session_attributes['streamingDynamoDbTable'])
db_response = wstable.get_item(Key={'sessionId': sessionId})
print (db_response)
connectionId = db_response['Item']['connectionId']
print('Get ConnectionID ', connectionId)

body = json.dumps({"prompt": "Human: " + inputTranscript + "Assistant:", "max_tokens_to_sample": 500})

response = bedrock_runtime.invoke_model_with_response_stream(
body=body, modelId=modelId, accept=accept, contentType=contentType
)
stream = response.get('body')
fullreply = ''


if stream:
for streamEvent in stream:
chunk = streamEvent.get('chunk')
@@ -111,41 +115,50 @@ Here is an example fullfilment lambda that could be used for integrating to Amaz
Data=text,
ConnectionId=connectionId
)
#display_markdown(Markdown(''.join(fullreply)))
#If not streaming, generate a response and return
else:
response = bedrock_runtime.invoke_model(
body=body, modelId=modelId, accept=accept, contentType=contentType
)
response_body = json.loads(response["body"].read())
fullreply = response_body["completion"]

message = {
'contentType': 'CustomPayload',
'content': fullreply
}
fulfillment_state = "Fulfilled"

return close(event, session_attributes, fulfillment_state, message)


message = {
'contentType': 'CustomPayload',
'content': fullreply
}
fulfillment_state = "Fulfilled"
session_attributes = get_session_attributes(event)
return close(event, session_attributes, fulfillment_state, message)
#------------------------------------------------------------

#------------------------------------------------------------

def get_session_attributes(intent_request):
print(intent_request)
sessionState = intent_request['sessionState']
if 'sessionAttributes' in sessionState:
return sessionState['sessionAttributes']
def get_session_attributes(intent_request):
print(intent_request)
sessionState = intent_request['sessionState']
if 'sessionAttributes' in sessionState:
print('Session Attributes', sessionState['sessionAttributes'])
return sessionState['sessionAttributes']

return {}
return {}


def close(intent_request, session_attributes, fulfillment_state, message):
intent_request['sessionState']['intent']['state'] = fulfillment_state
def close(intent_request, session_attributes, fulfillment_state, message):
intent_request['sessionState']['intent']['state'] = fulfillment_state

return {
'sessionState': {
'dialogAction': {
'type': 'Close'
},
'intent': intent_request['sessionState']['intent']
return {
'sessionState': {
'dialogAction': {
'type': 'Close'
},
'messages': [message],
'sessionId': intent_request['sessionId'],
'requestAttributes': intent_request['requestAttributes'] if 'requestAttributes' in intent_request else None
}
'intent': intent_request['sessionState']['intent']
},
'messages': [message],
'sessionId': intent_request['sessionId'],
'requestAttributes': intent_request['requestAttributes'] if 'requestAttributes' in intent_request else None
}
```
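
The chunk-forwarding loop in the example can be sketched in isolation as follows. This is a minimal sketch under stated assumptions, not the example's exact code: the function name `forward_stream` is hypothetical, each streamed chunk is assumed to carry JSON bytes with a `completion` field (the shape used by Claude text-completion models on Bedrock), and `post_to_connection` is passed in as a callable taking the same `Data` and `ConnectionId` keyword arguments as `apigatewaymanagementapi.post_to_connection`.

```python
import json


def forward_stream(stream, post_to_connection, connection_id):
    """Push each completion fragment to the WebSocket client; return the full reply."""
    full_reply = ""
    for stream_event in stream:
        chunk = stream_event.get("chunk")
        if not chunk:
            continue  # ignore events that carry no chunk payload
        # Assumed payload shape: {"completion": "<text fragment>", ...}
        text = json.loads(chunk["bytes"]).get("completion", "")
        if text:
            post_to_connection(Data=text, ConnectionId=connection_id)
            full_reply += text
    return full_reply
```

Passing the sender in as a callable keeps the loop easy to exercise without AWS credentials; in a Lambda it would typically be called as `forward_stream(response.get('body'), apigatewaymanagementapi.post_to_connection, connectionId)`.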

## Example Demo
1 change: 1 addition & 0 deletions config/base.env.js
@@ -40,6 +40,7 @@ module.exports = {
retryCountPostTextTimeout: process.env.BOT_RETRY_COUNT_POST_TEXT_TIMEOUT,
allowStreamingResponses: (process.env.ALLOW_STREAMING_RESPONSES === undefined) ? undefined : (process.env.ALLOW_STREAMING_RESPONSES === 'true') ? true : false,
streamingWebSocketEndpoint: process.env.STREAMING_WEB_SOCKET_ENDPOINT,
streamingDynamoDbTable: process.env.STREAMING_DYNAMO_TABLE,
},
ui: {
parentOrigin: process.env.PARENT_ORIGIN,
4 changes: 1 addition & 3 deletions lex-web-ui/.browserslistrc
@@ -1,3 +1 @@
> 1%
last 2 versions
not dead
defaults and fully supports es6-module
4 changes: 2 additions & 2 deletions lex-web-ui/package.json
@@ -53,8 +53,8 @@
"worker-loader": "^3.0.8"
},
"engines": {
"node": ">=16.0.0",
"npm": ">=7.8.0"
"node": ">=18.0.0",
"npm": ">=10.0.0"
},
"overrides": {
"postcss": "8.4.32"
10 changes: 10 additions & 0 deletions lex-web-ui/src/components/InputContainer.vue
@@ -264,6 +264,16 @@ export default {
}).toString();
}

// If streaming, send session attributes for streaming
if(this.$store.state.config.lex.allowStreamingResponses){
// Replace with an HTTP endpoint for the fulfillment Lambda
const streamingEndpoint = this.$store.state.config.lex.streamingWebSocketEndpoint.replace('wss://', 'https://');
this.$store.dispatch('setSessionAttribute',
{ key: 'streamingEndpoint', value: streamingEndpoint });
this.$store.dispatch('setSessionAttribute',
{ key: 'streamingDynamoDbTable', value: this.$store.state.config.lex.streamingDynamoDbTable });
}

return this.$store.dispatch('postTextMessage', message)
.then(() => {
this.textInput = '';
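
The scheme swap above matters because the API Gateway Management API client takes an https endpoint, not a wss one. A Lambda that receives the raw WebSocket URL from some other source could apply the same conversion itself; the sketch below assumes that situation, and the function name `to_management_endpoint` is hypothetical.

```python
def to_management_endpoint(websocket_url: str) -> str:
    """Convert a wss:// WebSocket URL to the https:// form; leave other URLs unchanged."""
    prefix = "wss://"
    if websocket_url.startswith(prefix):
        return "https://" + websocket_url[len(prefix):]
    return websocket_url
```

Only a leading `wss://` is rewritten, so a URL that is already https passes through untouched.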
28 changes: 20 additions & 8 deletions lex-web-ui/src/components/Message.vue
@@ -28,14 +28,7 @@
<message-text
:message="message"
v-if="'text' in message && message.text !== null && message.text.length && !shouldDisplayInteractiveMessage"
></message-text>
<v-icon
v-if="message.type === 'bot' && message.id !== $store.state.messages[0].id"
class="copy-icon"
@click="copyMessageToClipboard(message.text)"
>
content_copy
</v-icon>
></message-text>
<div
v-if="shouldDisplayInteractiveMessage && message.interactiveMessage.templateType == 'ListPicker'">
<v-card-title primary-title>
@@ -97,6 +90,13 @@
</v-datetime-picker>
<v-btn v-on:click="sendDateTime(datetime)" variant="flat">Confirm</v-btn>
</div>
<v-icon
v-if="message.type === 'bot' && message.id !== $store.state.messages[0].id && showCopyIcon"
class="copy-icon"
@click="copyMessageToClipboard(message.text)"
>
content_copy
</v-icon>
<div
v-if="message.id === this.$store.state.messages.length - 1 && isLastMessageFeedback && message.type === 'bot' && botDialogState && showDialogFeedback"
class="feedback-state"
@@ -295,6 +295,9 @@ export default {
showDialogStateIcon() {
return this.$store.state.config.ui.showDialogStateIcon;
},
showCopyIcon() {
return this.$store.state.config.ui.showCopyIcon;
},
showMessageMenu() {
return this.$store.state.config.ui.messageMenu;
},
@@ -630,6 +633,15 @@ export default {
color: red;
}

.copy-icon {
display: inline-flex;
align-self: center;
}

.copy-icon:hover{
color: grey;
}

.response-card {
justify-content: center;
width: 85vw;
8 changes: 7 additions & 1 deletion lex-web-ui/src/config/index.js
@@ -140,8 +140,11 @@ const configDefault = {
// allows the Lex bot to use streaming responses for integration with LLMs or other streaming protocols
allowStreamingResponses: false,

// web socket endpoint for streaming
// web socket endpoint for streaming
streamingWebSocketEndpoint: '',

// dynamo DB table for streaming
streamingDynamoDbTable: '',
},

polly: {
@@ -227,6 +230,9 @@ const configDefault = {
// Show the dialog state icon, check or alert, in the text bubble
showDialogStateIcon: true,

// Give the ability for users to copy the text from the bot
showCopyIcon: false,

// Hide the message bubble on a response card button press
hideButtonMessageBubble: false,

1 change: 1 addition & 0 deletions lex-web-ui/src/lex-web-ui.js
@@ -31,6 +31,7 @@ import VuexStore from '@/store';

import { config as defaultConfig, mergeConfig } from '@/config';
import { createApp, defineAsyncComponent } from 'vue';
import { createAppDev } from 'vue/dist/vue.esm-bundler.js';
import { aliases, md } from 'vuetify/iconsets/md';
import { createStore } from 'vuex';

8 changes: 3 additions & 5 deletions package.json
@@ -67,12 +67,10 @@
"webpack-dev-server": "^4.15.1"
},
"engines": {
"node": ">=16.0.0",
"npm": ">=7.8.0"
"node": ">=18.0.0",
"npm": ">=10.0.0"
},
"browserslist": [
"> 1%",
"last 2 versions",
"ie >= 11"
"defaults and fully supports es6-module"
]
}
3 changes: 2 additions & 1 deletion src/config/default-lex-web-ui-loader-config.json
@@ -25,7 +25,8 @@
"v2BotAliasId": "",
"v2BotLocaleId": "",
"allowStreamingResponses": false,
"streamingWebSocketEndpoint": ""
"streamingWebSocketEndpoint": "",
"streamingDynamoDbTable": ""
},
"ui": {
"parentOrigin": "http://localhost:8000",
2 changes: 2 additions & 0 deletions templates/codebuild-deploy.yaml
@@ -936,6 +936,8 @@ Resources:
Value: !Ref AllowStreamingResponses
- Name: STREAMING_WEB_SOCKET_ENDPOINT
Value: !If [EnableStreaming, !Sub "wss://${StreamingSupport.Outputs.WebSocketId}.execute-api.${AWS::Region}.amazonaws.com/Prod", ""]
- Name: STREAMING_DYNAMO_TABLE
Value: !If [EnableStreaming, !Sub "${StreamingSupport.Outputs.DynamoTableName}", ""]
- Name: ENABLE_UPLOAD
Value: !Ref ShouldEnableUpload
- Name: UPLOAD_BUCKET_NAME
2 changes: 1 addition & 1 deletion templates/streaming-support.yaml
@@ -174,7 +174,7 @@ Resources:
ApiKey:
Type: 'AWS::ApiGateway::ApiKey'
Properties:
Name: StreamingApiKey
Name: !Join ["-", [!Ref ParentStackName, "StreamingApiKey"]]
Enabled: 'true'

UsagePlanKey:
Expand Down