diff --git a/static/files/dynamodb-opensearch-zetl/OpenSearchPipeline/bedrock_query.py b/static/files/dynamodb-opensearch-zetl/OpenSearchPipeline/bedrock_query.py index 6cefca6..d32e2f8 100644 --- a/static/files/dynamodb-opensearch-zetl/OpenSearchPipeline/bedrock_query.py +++ b/static/files/dynamodb-opensearch-zetl/OpenSearchPipeline/bedrock_query.py @@ -67,21 +67,23 @@ def product_recommend(input_text, language, region, opensearch_host, model_id): llm_prompt = 'Human: 你现在是一个导购客服,需要帮助客户推荐商品,根据商品的描述信息,给客户推荐具体的商品名称和编号. 客户的问题如下: ' + input_text + ',你必须基于以下商品信息进行推荐.适当的时候如果客户问题不清晰,可以反问一些关键信息.' + str(es_res) + ' Assistant:' llm_request_body = json.dumps({ - "prompt": llm_prompt, - "max_tokens_to_sample": 4000, + "anthropic_version": "bedrock-2023-05-31", + "messages": [ + {"role": "user", "content": llm_prompt} + ], + "max_tokens": 4000, "temperature": 0.1, "top_p": 0.9, }) - modelId = 'anthropic.claude-v2:1' + modelId = 'anthropic.claude-haiku-4-5-20251001-v1:0' accept = 'application/json' contentType = 'application/json' response = brt.invoke_model(body=llm_request_body, modelId=modelId, accept=accept, contentType=contentType) - response_body = json.loads(response.get('body').read()) - - llm_result = response_body.get('completion') + #llm_result = response_body.get('completion') + llm_result = response_body.get('content', [{}])[0].get('text', '') return llm_result,es_response def reviews_analytis(input_text, language, region, opensearch_host, model_id): @@ -135,21 +137,23 @@ def reviews_analytis(input_text, language, region, opensearch_host, model_id): llm_prompt = 'Human: 你现在是一个导购客服,需要帮助客户分析商品的评价,根据商品过去的评论信息,给客户做评论总结,主要关注商品的评分,评论内容的情绪表达. 客户的问题如下: ' + input_text + ',你必须基于以下商品评价信息进行总结.适当的时候如果客户问题不清晰,可以反问一些关键信息.' + str(es_res) + ' Assistant:' llm_request_body = json.dumps({ - "prompt": llm_prompt, - "max_tokens_to_sample": 4000, + "anthropic_version": "bedrock-2023-05-31", + "messages": [ + {"role": "user", "content": llm_prompt} + ], + "max_tokens": 4000, "temperature": 0.1, "top_p": 0.9, }) - modelId = 'anthropic.claude-v2:1' + modelId = 'anthropic.claude-haiku-4-5-20251001-v1:0' accept = 'application/json' contentType = 'application/json' response = brt.invoke_model(body=llm_request_body, modelId=modelId, accept=accept, contentType=contentType) - response_body = json.loads(response.get('body').read()) - llm_result = response_body.get('completion') + llm_result = response_body.get('content', [{}])[0].get('text', '') return llm_result,es_response diff --git a/static/files/dynamodb-opensearch-zetl/OpenSearchPipeline/credentials.sh b/static/files/dynamodb-opensearch-zetl/OpenSearchPipeline/credentials.sh index 2e009c5..13de575 100644 --- a/static/files/dynamodb-opensearch-zetl/OpenSearchPipeline/credentials.sh +++ b/static/files/dynamodb-opensearch-zetl/OpenSearchPipeline/credentials.sh @@ -1,3 +1,5 @@ +aws cloudformation describe-stacks --stack-name dynamodb-opensearch-setup --query "Stacks[0].Outputs" --output table > CloudFormation-Outputs.txt +export OPENSEARCH_ENDPOINT=`aws cloudformation describe-stacks --stack-name dynamodb-opensearch-setup --query "Stacks[0].Outputs[?OutputKey=='OSDomainEndpoint'].OutputValue" --output text` TOKEN=$(curl -s -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600") INSTANCE_ROLE=$(curl -s -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/iam/security-credentials/) RESULTS=$(curl -s -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/iam/security-credentials/${INSTANCE_ROLE}) @@ -18,4 +20,5 @@ echo "METADATA_AWS_ACCESS_KEY_ID: $AccessKeyId" echo "METADATA_AWS_SECRET_ACCESS_KEY: $SecretAccessKey" echo "METADATA_AWS_SESSION_TOKEN: $Token" echo "METADATA_AWS_REGION: $Region" -echo "METADATA_AWS_ROLE: $Role" \ No newline at end of file +echo "METADATA_AWS_ROLE: $Role" +echo "OPENSEARCH_ENDPOINT: $OPENSEARCH_ENDPOINT" \ No newline at end of file diff --git a/static/files/dynamodb-opensearch-zetl/dynamodb-opensearch-setup.yaml b/static/files/dynamodb-opensearch-zetl/dynamodb-opensearch-setup.yaml index cdf1e64..0eee81e 100644 --- a/static/files/dynamodb-opensearch-zetl/dynamodb-opensearch-setup.yaml +++ b/static/files/dynamodb-opensearch-zetl/dynamodb-opensearch-setup.yaml @@ -21,6 +21,106 @@ Resources: PasswordLength: 16 ExcludeCharacters: "\"'@/\\" + SecretPlaintextLambdaRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + Service: !Sub lambda.${AWS::URLSuffix} + Action: sts:AssumeRole + ManagedPolicyArns: + - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + Policies: + - PolicyName: AwsSecretsManager + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - secretsmanager:GetSecretValue + Resource: + - !Ref OpenSearchSecret + + SecretPlaintextLambda: + Type: AWS::Lambda::Function + Metadata: + cfn_nag: + rules_to_suppress: + - id: W58 + reason: Warning incorrectly reported. The role associated with the Lambda function has the AWSLambdaBasicExecutionRole managed policy attached, which includes permission to write CloudWatch Logs. See https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaBasicExecutionRole.html + - id: W89 + reason: CloudFormation custom function does not need the scaffolding of a VPC, to do so would add unnecessary complexity + - id: W92 + reason: CloudFormation custom function does not need reserved concurrent executions, to do so would add unnecessary complexity + Properties: + Description: Return the value of the secret + Handler: index.lambda_handler + Runtime: python3.13 + MemorySize: 128 + Timeout: 10 + Architectures: + - arm64 + Role: !GetAtt SecretPlaintextLambdaRole.Arn + Code: + ZipFile: | + import boto3 + import json + import cfnresponse + import logging + + logger = logging.getLogger() + logger.setLevel(logging.INFO) + + def is_valid_json(json_string): + logger.debug(f'Calling is_valid_jason:{json_string}') + try: + json.loads(json_string) + logger.info('Secret is in json format') + return True + except json.JSONDecodeError: + logger.info('Secret is in string format') + return False + + def lambda_handler(event, context): + logger.debug(f'event: {event}') + logger.debug(f'context: {context}') + try: + if event['RequestType'] == 'Delete': + cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData={}, reason='No action to take') + else: + resource_properties = event['ResourceProperties'] + secret_name = resource_properties['SecretArn'] + secrets_mgr = boto3.client('secretsmanager') + + logger.info('Getting secret from %s', secret_name) + + secret = secrets_mgr.get_secret_value(SecretId = secret_name) + logger.debug(f'secret: {secret}') + secret_value = secret['SecretString'] + + responseData = {} + if is_valid_json(secret_value): + responseData = secret_value + else: + responseData = {'secret': secret_value} + logger.debug(f'responseData: {responseData}') + logger.debug(f'type(responseData): {type(responseData)}') + cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData=json.loads(responseData), reason='OK', noEcho=True) + except Exception as e: + logger.error(e) + cfnresponse.send(event, context, cfnresponse.FAILED, responseData={}, reason=str(e)) + + SecretPlaintext: + Type: Custom::SecretPlaintextLambda + Properties: + ServiceToken: !GetAtt SecretPlaintextLambda.Arn + ServiceTimeout: 15 + SecretArn: !Ref OpenSearchSecret + + ProductDetailsTable: Type: AWS::DynamoDB::Table Properties: @@ -63,7 +163,7 @@ Resources: Statement: - Effect: Allow Principal: - AWS: !ImportValue Cloud9RoleArn + AWS: !ImportValue CodeInstanceRoleArn Action: 'es:*' Resource: !Join @@ -88,17 +188,40 @@ Resources: MasterUserPassword: Fn::Sub: "{{resolve:secretsmanager:${OpenSearchSecret}::password}}" + PipelineBucket: + Type: AWS::S3::Bucket + Metadata: + cfn_nag: + rules_to_suppress: + - id: W35 + reason: Access logs aren't needed for this bucket + DeletionPolicy: Delete + Properties: + AccessControl: Private + BucketEncryption: + ServerSideEncryptionConfiguration: + - ServerSideEncryptionByDefault: + SSEAlgorithm: AES256 + PublicAccessBlockConfiguration: + BlockPublicAcls: true + BlockPublicPolicy: true + IgnorePublicAcls: true + RestrictPublicBuckets: true + Outputs: - SecretConsoleLink: - Description: URL to the secret in AWS Secrets Manager console - Value: !Sub "https://${AWS::Region}.console.aws.amazon.com/secretsmanager/secret?name=${OpenSearchSecret}®ion=${AWS::Region}" - Cloud9IdeUrl: - Description: URL to launch the Cloud9 IDE - Value: !ImportValue Cloud9IdeUrl + VSCodeUrl: + Description: URL to launch the VSCode IDE + Value: !ImportValue VSCodeUrl + VSCodePassword: + Description: VSCode Server Password (stored in AWS Secrets Manager) + Value: !ImportValue VSCodePassword OSDashboardsURL: Description: URL to the OpenSearch Dashboards Value: !Sub "https://${OpenSearchServiceDomain.DomainEndpoint}/_dashboards/" + OpenSearchPassword: + Description: OpenSearch Password (stored in AWS Secrets Manager) + Value: !GetAtt SecretPlaintext.password OSDomainEndpoint: Description: The endpoint of the OpenSearch domain. Value: !Sub "https://${OpenSearchServiceDomain.DomainEndpoint}" @@ -112,7 +235,7 @@ Outputs: Value: !GetAtt ProductDetailsTable.Arn Role: Description: "ARN of the Role used to provide access" - Value: !ImportValue Cloud9RoleArn + Value: !ImportValue CodeInstanceRoleArn S3Bucket: Description: "Name of the S3 Bucket" - Value: !ImportValue Cloud9LogBucket \ No newline at end of file + Value: !Ref PipelineBucket \ No newline at end of file