diff --git a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java b/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java
deleted file mode 100644
index c5ddbdc7e..000000000
--- a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java
+++ /dev/null
@@ -1,275 +0,0 @@
-//The Lambda runtime delivers a batch of messages to the Lambda function.
-//Each batch of messages has two fields, EventSource and EventSourceARN.
-//Each batch of messages also has a field called Records.
-//Records is a map with multiple keys and values.
-//Each key is a combination of the topic name and the partition number.
-//One batch of messages can contain messages from multiple partitions.
-
-/*
-To simplify representing a batch of Kafka messages as a list of messages,
-we have created a Java class called KafkaMessage under the models package.
-Here we map the structure of an incoming Kafka event to a list of
-objects of the KafkaMessage class.
-*/
-
-package com.amazonaws.services.lambda.samples.events.msk;
-
-import com.amazonaws.services.lambda.runtime.Context;
-import com.amazonaws.services.lambda.runtime.LambdaLogger;
-import com.amazonaws.services.lambda.runtime.RequestHandler;
-import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
-import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord;
-
-import java.util.ArrayList;
-import java.util.Base64;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
-public class HandlerMSK implements RequestHandler<KafkaEvent, String> {
-    //We initialize an empty list of the KafkaMessage class
-    List<KafkaMessage> listOfMessages = new ArrayList<>();
-    Gson gson = new GsonBuilder().setPrettyPrinting().create();
-
-    @Override
-    public String handleRequest(KafkaEvent event, Context context) {
-        LambdaLogger logger = context.getLogger();
-        logger.log("========== LAMBDA FUNCTION STARTED ==========");
-        logger.log("Event received: " + gson.toJson(event));
-
-        String response = "200 OK";
-        this.listOfMessages = new ArrayList<>();
-
-        // Counters for zip code patterns
-        int zip1000Count = 0;
-        int zip2000Count = 0;
-
-        //The incoming KafkaEvent object has a property called records that is a map.
-        //Each key in the map is a combination of a topic and a partition.
-        Map<String, List<KafkaEventRecord>> record = event.getRecords();
-
-        if (record == null) {
-            logger.log("WARNING: Event records map is null");
-            return response;
-        }
-
-        logger.log("Records map size: " + record.size());
-
-        Set<String> keySet = record.keySet();
-        logger.log("Key set size: " + keySet.size());
-        logger.log("Keys: " + keySet);
-
-        Iterator<String> iterator = keySet.iterator();
-        //We iterate through each of the keys in the map
-        while (iterator.hasNext()) {
-            String thisKey = iterator.next();
-            logger.log("Processing key: " + thisKey);
-
-            //Using the key we retrieve the value of the map, which is a list of KafkaEventRecord.
-            //One object of KafkaEventRecord represents an individual Kafka message.
-            List<KafkaEventRecord> thisListOfRecords = record.get(thisKey);
-
-            if (thisListOfRecords == null) {
-                logger.log("WARNING: Record list for key " + thisKey + " is null");
-                continue;
-            }
-
-            logger.log("Record list size for key " + thisKey + ": " + thisListOfRecords.size());
-
-            //We now iterate through the list of KafkaEventRecords
-            for (KafkaEventRecord thisRecord : thisListOfRecords) {
-                logger.log("Processing record...");
-
-                /*
-                We initialize a new object of the KafkaMessage class, which is a simplified representation in our models package.
-                We then get the fields from each Kafka message in the object of the KafkaEventRecord class and set them on the fields
-                of the KafkaMessage class.
-                */
-                KafkaMessage thisMessage = new KafkaMessage();
-                thisMessage.setTopic(thisRecord.getTopic());
-                thisMessage.setPartition(thisRecord.getPartition());
-                thisMessage.setOffset(thisRecord.getOffset());
-                thisMessage.setTimestamp(thisRecord.getTimestamp());
-                thisMessage.setTimestampType(thisRecord.getTimestampType());
-
-                logger.log("Record metadata - Topic: " + thisRecord.getTopic() +
-                        ", Partition: " + thisRecord.getPartition() +
-                        ", Offset: " + thisRecord.getOffset());
-
-                String key = thisRecord.getKey();
-                String value = thisRecord.getValue();
-
-                logger.log("Key (base64): " + key);
-                logger.log("Value (base64): " + value);
-
-                String decodedKey = "null";
-                String decodedValue = "null";
-                //The key and value inside a Kafka message are base64 encoded and need to be decoded
-                if (null != key) {
-                    logger.log("Decoding key...");
-                    try {
-                        byte[] decodedKeyBytes = Base64.getDecoder().decode(key);
-                        decodedKey = new String(decodedKeyBytes);
-                        logger.log("Decoded key: " + decodedKey);
-                    } catch (Exception e) {
-                        logger.log("ERROR decoding key: " + e.getMessage());
-                    }
-                } else {
-                    logger.log("Key is null");
-                }
-
-                if (null != value) {
-                    logger.log("Decoding value...");
-                    try {
-                        byte[] decodedValueBytes = Base64.getDecoder().decode(value);
-                        logger.log("Value decoded, length: " + decodedValueBytes.length + " bytes");
-
-                        // Print the complete message in hex format
-                        logger.log("Complete message in hex format:");
-                        logger.log(bytesToHexString(decodedValueBytes, 0));
-
-                        try {
-                            decodedValue = new String(decodedValueBytes);
-                            logger.log("Decoded value as string: " + (decodedValue.length() > 100 ? decodedValue.substring(0, 100) + "..." : decodedValue));
-
-                            // Add more detailed logging for AVRO messages
-                            logger.log("=== AVRO MESSAGE DETAILS ===");
-                            logger.log("Message appears to be AVRO-formatted. Attempting to extract fields:");
-
-                            // Try to extract some common fields from the AVRO binary data.
-                            // This is a simple approach to show some readable content.
-                            StringBuilder readableContent = new StringBuilder();
-                            for (int i = 0; i < decodedValueBytes.length; i++) {
-                                // Skip non-printable characters
-                                if (decodedValueBytes[i] >= 32 && decodedValueBytes[i] < 127) {
-                                    readableContent.append((char) decodedValueBytes[i]);
-                                }
-                            }
-
-                            String readableString = readableContent.toString();
-                            logger.log("Readable content extracted from AVRO: " + readableString);
-
-                            // Check for zip code patterns
-                            if (readableString.contains("1000")) {
-                                logger.log("FOUND ZIP CODE STARTING WITH 1000");
-                            }
-                            if (readableString.contains("2000")) {
-                                logger.log("FOUND ZIP CODE STARTING WITH 2000");
-                            }
-
-                            logger.log("=== END AVRO MESSAGE DETAILS ===");
-                        } catch (Exception e) {
-                            logger.log("ERROR converting bytes to string: " + e.getMessage());
-                            decodedValue = "Error decoding: " + e.getMessage();
-                        }
-                    } catch (Exception e) {
-                        logger.log("ERROR decoding value: " + e.getMessage());
-                        e.printStackTrace();
-                    }
-                } else {
-                    logger.log("Value is null");
-                }
-
-                thisMessage.setKey(key);
-                thisMessage.setValue(value);
-                thisMessage.setDecodedKey(decodedKey);
-                thisMessage.setDecodedValue(decodedValue);
-
-                //A Kafka message can optionally have a list of headers.
-                //The code below gets the headers, iterates through each header, and gets its key and value.
-                List<KafkaHeader> headersInThisMessage = new ArrayList<>();
-                List<Map<String, byte[]>> headers = thisRecord.getHeaders();
-
-                if (headers != null) {
-                    logger.log("Headers count: " + headers.size());
-
-                    for (Map<String, byte[]> thisHeader : headers) {
-                        Set<String> thisHeaderKeys = thisHeader.keySet();
-                        Iterator<String> thisHeaderKeysIterator = thisHeaderKeys.iterator();
-                        while (thisHeaderKeysIterator.hasNext()) {
-                            String thisHeaderKey = thisHeaderKeysIterator.next();
-                            byte[] thisHeaderValue = thisHeader.get(thisHeaderKey);
-                            String thisHeaderValueString = new String(thisHeaderValue);
-                            KafkaHeader thisMessageHeader = new KafkaHeader();
-                            thisMessageHeader.setKey(thisHeaderKey);
-                            thisMessageHeader.setValue(thisHeaderValueString);
-                            headersInThisMessage.add(thisMessageHeader);
-                            logger.log("Header - Key: " + thisHeaderKey + ", Value: " + thisHeaderValueString);
-                        }
-                    }
-                } else {
-                    logger.log("No headers in message");
-                }
-
-                thisMessage.setHeaders(headersInThisMessage);
-                listOfMessages.add(thisMessage);
-
-                // Log this Kafka message in string format using the toString method,
-                // as well as in JSON format using the gson.toJson method.
-                logger.log("Received this message from Kafka - " + thisMessage.toString());
-                logger.log("Message in JSON format : " + gson.toJson(thisMessage));
-
-                // Add a more readable summary of the message
-                logger.log("=== MESSAGE SUMMARY ===");
-                logger.log("Topic: " + thisMessage.getTopic());
-                logger.log("Partition: " + thisMessage.getPartition());
-                logger.log("Offset: " + thisMessage.getOffset());
-                logger.log("Key: " + thisMessage.getDecodedKey());
-
-                // Check for zip code patterns in the decoded value
-                String decodedValueStr = thisMessage.getDecodedValue();
-                if (decodedValueStr != null) {
-                    if (decodedValueStr.contains("1000")) {
-                        logger.log("ZIP CODE: Found 1000 pattern in message");
-                        zip1000Count++;
-                    }
-                    if (decodedValueStr.contains("2000")) {
-                        logger.log("ZIP CODE: Found 2000 pattern in message");
-                        zip2000Count++;
-                    }
-                }
-
-                logger.log("=== END MESSAGE SUMMARY ===");
-            }
-        }
-        logger.log("All Messages in this batch = " + gson.toJson(listOfMessages));
-
-        // Log summary of zip code distribution
-        logger.log("========== ZIP CODE DISTRIBUTION SUMMARY ==========");
-        logger.log("Messages with zip code containing 1000: " + zip1000Count);
-        logger.log("Messages with zip code containing 2000: " + zip2000Count);
-        logger.log("Other messages: " + (listOfMessages.size() - zip1000Count - zip2000Count));
-        logger.log("====================================================");
-
-        logger.log("========== LAMBDA FUNCTION COMPLETED ==========");
-        return response;
-    }
-
-    /**
-     * Convert a byte array to its hexadecimal string representation.
-     *
-     * @param bytes Byte array to convert
-     * @param maxLength Maximum number of bytes to convert (0 for all)
-     * @return Hexadecimal string representation
-     */
-    private String bytesToHexString(byte[] bytes, int maxLength) {
-        StringBuilder sb = new StringBuilder();
-        int length = maxLength > 0 && maxLength < bytes.length ? maxLength : bytes.length;
-
-        for (int i = 0; i < length; i++) {
-            sb.append(String.format("%02X", bytes[i]));
-            if (i % 16 == 15) {
-                sb.append("\n");
-            } else if (i % 4 == 3) {
-                sb.append(" ");
-            }
-        }
-
-        if (maxLength > 0 && length < bytes.length) {
-            sb.append("... (").append(bytes.length - length).append(" more bytes)");
-        }
-
-        return sb.toString();
-    }
-}
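Note: the removed HandlerMSK relied on two building blocks worth seeing in isolation, Base64-decoding a record value and hex-dumping the raw bytes. A minimal, self-contained sketch follows (DecodeSketch is an illustrative name, not a class from the sample; the payload is the "Zg==" value used by the unit test below):

import java.util.Base64;

public class DecodeSketch {
    public static void main(String[] args) {
        // Kafka record values arrive base64-encoded in the Lambda event payload
        String valueFromEvent = "Zg=="; // base64 for "f"
        byte[] raw = Base64.getDecoder().decode(valueFromEvent);
        System.out.println("Decoded string: " + new String(raw));

        // Same hex-dump layout as the handler's bytesToHexString helper:
        // a space every 4 bytes, a newline every 16 bytes
        StringBuilder hex = new StringBuilder();
        for (int i = 0; i < raw.length; i++) {
            hex.append(String.format("%02X", raw[i]));
            if (i % 16 == 15) {
                hex.append('\n');
            } else if (i % 4 == 3) {
                hex.append(' ');
            }
        }
        System.out.println("Hex dump: " + hex);
    }
}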
diff --git a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/SimpleHandler.java b/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/SimpleHandler.java
deleted file mode 100644
index 03c4e5092..000000000
--- a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/SimpleHandler.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.amazonaws.services.lambda.samples.events.msk;
-
-import com.amazonaws.services.lambda.runtime.Context;
-import com.amazonaws.services.lambda.runtime.RequestHandler;
-
-public class SimpleHandler implements RequestHandler<Object, String> {
-
-    @Override
-    public String handleRequest(Object event, Context context) {
-        System.out.println("=== SIMPLE HANDLER CALLED ===");
-        System.out.println("Event: " + event);
-        System.out.println("Event class: " + (event != null ? event.getClass().getName() : "null"));
-        System.out.println("Context: " + context.getFunctionName());
-        System.out.println("=== SIMPLE HANDLER END ===");
-
-        return "Simple handler executed successfully";
-    }
-}
diff --git a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java b/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java
deleted file mode 100644
index 65bd543d3..000000000
--- a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package com.amazonaws.services.lambda.samples.events.msk;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import org.junit.jupiter.api.Test;
-
-import com.amazonaws.services.lambda.runtime.Context;
-import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
-
-class HandlerMSKTest {
-    private static final String kafkaEventJson = "{\n"
-            + "   \"records\":{\n"
-            + "      \"myTopic-0\":[\n"
-            + "         {\n"
-            + "            \"topic\":\"myTopic\",\n"
-            + "            \"partition\":0,\n"
-            + "            \"offset\":250,\n"
-            + "            \"timestamp\":1678072110111,\n"
-            + "            \"timestampType\":\"CREATE_TIME\",\n"
-            + "            \"value\":\"Zg==\",\n"
-            + "            \"headers\":[\n"
-            + "            \n"
-            + "            ]\n"
-            + "         },\n"
-            + "         {\n"
-            + "            \"topic\":\"myTopic\",\n"
-            + "            \"partition\":0,\n"
-            + "            \"offset\":251,\n"
-            + "            \"timestamp\":1678072111086,\n"
-            + "            \"timestampType\":\"CREATE_TIME\",\n"
-            + "            \"value\":\"Zw==\",\n"
-            + "            \"headers\":[\n"
-            + "            \n"
-            + "            ]\n"
-            + "         }\n"
-            + "      ]\n"
-            + "   },\n"
-            + "   \"eventSource\":\"aws:kafka\",\n"
-            + "   \"eventSourceArn\":\"arn:aws:kafka:us-west-2:123456789012:cluster/MSKWorkshopCluster/a93759a9-c9d0-4952-984c-492c6bfa2be8-13\",\n"
-            + "   \"bootstrapServers\":\"b-2.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-3.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-1.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098\"\n"
-            + "}";
-
-    @Test
-    void invokeTest() {
-        Gson gson = new GsonBuilder().setPrettyPrinting().create();
-        KafkaEvent event = gson.fromJson(kafkaEventJson, KafkaEvent.class);
-        Context context = new TestContext();
-        HandlerMSK handler = new HandlerMSK();
-        String result = handler.handleRequest(event, context);
-        assertEquals("200 OK", result);
-        assertEquals(2, handler.listOfMessages.size());
-        assertEquals("myTopic", handler.listOfMessages.get(0).getTopic());
-        assertEquals(0, handler.listOfMessages.get(0).getPartition());
-        assertEquals(250L, handler.listOfMessages.get(0).getOffset());
-        assertEquals(1678072110111L, handler.listOfMessages.get(0).getTimestamp());
-        assertEquals("CREATE_TIME", handler.listOfMessages.get(0).getTimestampType());
-        assertEquals("null", handler.listOfMessages.get(0).getDecodedKey());
-        assertEquals("f", handler.listOfMessages.get(0).getDecodedValue());
-        assertEquals("myTopic", handler.listOfMessages.get(1).getTopic());
-        assertEquals(0, handler.listOfMessages.get(1).getPartition());
-        assertEquals(251L, handler.listOfMessages.get(1).getOffset());
-        assertEquals(1678072111086L, handler.listOfMessages.get(1).getTimestamp());
-        assertEquals("CREATE_TIME", handler.listOfMessages.get(1).getTimestampType());
-        assertEquals("null", handler.listOfMessages.get(1).getDecodedKey());
-        assertEquals("g", handler.listOfMessages.get(1).getDecodedValue());
-    }
-}
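Note: the test above constructs a TestContext, which exists in the repository but is not part of this diff. For readers reproducing the test elsewhere, a minimal stand-in for the Lambda Context interface could look like the following sketch (all returned values are placeholders, not the sample's actual TestContext):

package com.amazonaws.services.lambda.samples.events.msk;

import com.amazonaws.services.lambda.runtime.ClientContext;
import com.amazonaws.services.lambda.runtime.CognitoIdentity;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;

public class TestContext implements Context {
    @Override public String getAwsRequestId() { return "test-request-id"; }
    @Override public String getLogGroupName() { return "/aws/lambda/test"; }
    @Override public String getLogStreamName() { return "test-stream"; }
    @Override public String getFunctionName() { return "test-function"; }
    @Override public String getFunctionVersion() { return "$LATEST"; }
    @Override public String getInvokedFunctionArn() { return "arn:aws:lambda:us-west-2:123456789012:function:test-function"; }
    @Override public CognitoIdentity getIdentity() { return null; }
    @Override public ClientContext getClientContext() { return null; }
    @Override public int getRemainingTimeInMillis() { return 300000; }
    @Override public int getMemoryLimitInMB() { return 512; }
    @Override public LambdaLogger getLogger() {
        // Route handler logging to stdout so assertions can run without a real runtime
        return new LambdaLogger() {
            @Override public void log(String message) { System.out.println(message); }
            @Override public void log(byte[] message) { System.out.println(new String(message)); }
        };
    }
}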
diff --git a/msk-lambda-schema-avro-java-sam/template.yaml b/msk-lambda-schema-avro-java-sam/template.yaml
deleted file mode 100644
index c416c991d..000000000
--- a/msk-lambda-schema-avro-java-sam/template.yaml
+++ /dev/null
@@ -1,233 +0,0 @@
-AWSTemplateFormatVersion: '2010-09-09'
-Transform: AWS::Serverless-2016-10-31
-Description: >
-  kafka_event_consumer_and_producer_functions
-
-  Sample SAM Template for MSK consumer and AVRO producer with IAM auth
-
-# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
-Globals:
-  Function:
-    Timeout: 15
-
-Resources:
-  # SQS Queue to use as Dead Letter Queue for the MSK event source mapping
-  ConsumerDLQ:
-    Type: AWS::SQS::Queue
-    Properties:
-      MessageRetentionPeriod: 1209600 # 14 days (maximum retention period)
-      VisibilityTimeout: 300 # 5 minutes
-      Tags:
-        - Key: Purpose
-          Value: MSKConsumerDLQ
-
-  LambdaMSKConsumerJavaFunction:
-    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
-    Properties:
-      CodeUri: kafka_event_consumer_function
-      Handler: com.amazonaws.services.lambda.samples.events.msk.AvroKafkaHandler::handleRequest
-      Runtime: java21
-      Architectures:
-        - x86_64
-      MemorySize: 512
-      VpcConfig:
-        SecurityGroupIds: !Ref SecurityGroupIds
-        SubnetIds: !Ref SubnetIds
-      Environment: # More info about Env Vars: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#environment-object
-        Variables:
-          PARAM1: VALUE
-          JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1 # More info about tiered compilation https://aws.amazon.com/blogs/compute/optimizing-aws-lambda-function-performance-for-java/
-          POWERTOOLS_LOG_LEVEL: TRACE
-          POWERTOOLS_SERVICE_NAME: kafka_consumer
-      Events:
-        MSKEvent:
-          Type: MSK
-          Properties:
-            StartingPosition: LATEST
-            BatchSize: 1
-            MaximumBatchingWindowInSeconds: 1
-            Stream: !Join [ '', ["arn:", "aws:", "kafka:", !Ref "AWS::Region" , ":" ,!Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId] ]
-            Topics:
-              - !Ref MSKTopic
-            DestinationConfig:
-              OnFailure:
-                Destination: !GetAtt ConsumerDLQ.Arn
-            FilterCriteria:
-              Filters:
-                - Pattern: '{"value": {"zip": [ { "prefix": "2000" } ]}}'
-      Policies:
-        - Statement:
-            - Sid: KafkaClusterPermissionsPolicy
-              Effect: Allow
-              Action:
-                - kafka-cluster:Connect
-                - kafka-cluster:DescribeGroup
-                - kafka-cluster:DescribeCluster
-                - kafka-cluster:AlterCluster
-                - kafka-cluster:AlterClusterDynamicConfiguration
-                - kafka-cluster:WriteDataIdempotently
-                - kafka-cluster:AlterGroup
-                - kafka-cluster:DescribeTopic
-                - kafka-cluster:ReadData
-                - kafka-cluster:DescribeClusterDynamicConfiguration
-              Resource:
-                - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId]]
-                - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "topic/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]]
-                - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "group/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]]
-
-            - Sid: KafkaPermissionsPolicy
-              Effect: Allow
-              Action:
-                - kafka:DescribeClusterV2
-                - kafka:GetBootstrapBrokers
-              Resource: '*'
-
-            - Sid: EC2PermissionsPolicy
-              Effect: Allow
-              Action:
-                - ec2:DescribeSecurityGroups
-                - ec2:DescribeSubnets
-                - ec2:DescribeVpcs
-                - ec2:CreateNetworkInterface
-                - ec2:DescribeNetworkInterfaces
-                - ec2:DeleteNetworkInterface
-              Resource: '*'
-
-            - Sid: GlueSchemaRegistryPermissionsPolicy
-              Effect: Allow
-              Action:
-                - glue:GetSchemaByDefinition
-                - glue:GetSchemaVersion
-                - glue:GetRegistry
-                - glue:ListSchemas
-                - glue:ListSchemaVersions
-                - glue:RegisterSchemaVersion
-                - glue:PutSchemaVersionMetadata
-                - glue:GetSchemaVersionsDiff
-                - glue:QuerySchemaVersionMetadata
-              Resource: '*'
-
-            - Sid: SQSPermissionsPolicy
-              Effect: Allow
-              Action:
-                - sqs:SendMessage
-              Resource: !GetAtt ConsumerDLQ.Arn
-        - VPCAccessPolicy: {}
-
-  LambdaMSKProducerJavaFunction:
-    Type: AWS::Serverless::Function
-    Properties:
-      CodeUri: kafka_event_producer_function
-      Handler: com.amazonaws.services.lambda.samples.events.msk.AvroProducerHandler::handleRequest
-      Runtime: java21
-      Timeout: 300
-      Architectures:
-        - x86_64
-      MemorySize: 512
-      VpcConfig:
-        SecurityGroupIds: !Ref SecurityGroupIds
-        SubnetIds: !Ref SubnetIds
-      Environment:
-        Variables:
-          MSK_CLUSTER_ARN: !Join [ '', ["arn:", "aws:", "kafka:", !Ref "AWS::Region" , ":" ,!Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId] ]
-          MSK_TOPIC: !Ref MSKTopic
-          REGISTRY_NAME: !Ref GlueSchemaRegistryName
-          CONTACT_SCHEMA_NAME: !Ref ContactSchemaName
-          JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1
-      Policies:
-        - Statement:
-            - Sid: KafkaClusterPermissionsPolicy
-              Effect: Allow
-              Action:
-                - kafka-cluster:Connect
-                - kafka-cluster:DescribeCluster
-                - kafka-cluster:WriteData
-                - kafka-cluster:DescribeTopic
-              Resource:
-                - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId]]
-                - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "topic/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]]
-
-            - Sid: KafkaPermissionsPolicy
-              Effect: Allow
-              Action:
-                - kafka:DescribeClusterV2
-                - kafka:GetBootstrapBrokers
-              Resource: '*'
-
-            - Sid: EC2PermissionsPolicy
-              Effect: Allow
-              Action:
-                - ec2:DescribeSecurityGroups
-                - ec2:DescribeSubnets
-                - ec2:DescribeVpcs
-                - ec2:CreateNetworkInterface
-                - ec2:DescribeNetworkInterfaces
-                - ec2:DeleteNetworkInterface
-              Resource: '*'
-
-            - Sid: GlueSchemaRegistryPermissionsPolicy
-              Effect: Allow
-              Action:
-                - glue:GetSchemaByDefinition
-                - glue:GetSchemaVersion
-                - glue:GetRegistry
-                - glue:ListSchemas
-                - glue:ListSchemaVersions
-                - glue:GetSchemaVersionsDiff
-                - glue:QuerySchemaVersionMetadata
-                - glue:RegisterSchemaVersion
-                - glue:PutSchemaVersionMetadata
-                - glue:CreateSchema
-                - glue:CreateRegistry
-              Resource:
-                - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:registry/*"
-                - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:schema/*/*"
-        - VPCAccessPolicy: {}
-
-Parameters:
-  MSKClusterName:
-    Type: String
-    Description: Enter the name of the MSK Cluster
-    Default: CLUSTER_NAME
-  MSKClusterId:
-    Type: String
-    Description: Enter the ID of the MSK Cluster
-    Default: CLUSTER_ID
-  MSKTopic:
-    Type: String
-    Description: Enter the name of the MSK Topic
-    Default: KAFKA_TOPIC
-  GlueSchemaRegistryName:
-    Type: String
-    Description: Enter the name of the Glue Schema Registry
-    Default: GLUE_SCHEMA_REGISTRY_NAME
-  ContactSchemaName:
-    Type: String
-    Description: Enter the name of the Contact Schema
-    Default: AVRO_SCHEMA
-  VpcId:
-    Type: String
-    Description: Enter the VPC ID where the MSK cluster is deployed
-    Default: VPC_ID
-  SubnetIds:
-    Type: CommaDelimitedList
-    Description: Enter the subnet IDs where the MSK cluster is deployed (comma-separated)
-    Default: SUBNET_IDS
-  SecurityGroupIds:
-    Type: CommaDelimitedList
-    Description: Enter the security group IDs that allow access to the MSK cluster (comma-separated)
-    Default: LAMBDA_SECURITY_GROUP_ID
-
-Outputs:
-  MSKConsumerLambdaFunction:
-    Description: "Topic Consumer Lambda Function ARN"
-    Value: !GetAtt LambdaMSKConsumerJavaFunction.Arn
-  MSKProducerLambdaFunction:
-    Description: "AVRO Producer Lambda Function ARN"
-    Value: !GetAtt LambdaMSKProducerJavaFunction.Arn
-  ConsumerDLQUrl:
-    Description: "URL of the Dead Letter Queue for the MSK Consumer"
-    Value: !Ref ConsumerDLQ
-  ConsumerDLQArn:
-    Description: "ARN of the Dead Letter Queue for the MSK Consumer"
-    Value: !GetAtt ConsumerDLQ.Arn
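Note: the deleted template wires the consumer to AvroKafkaHandler and filters on the record value's zip field, but that handler's source is not part of this diff. As a hypothetical sketch only, reading the zip field from a plain Avro-binary payload with Apache Avro might look like the following (the Contact schema here is illustrative; payloads framed by the Glue Schema Registry wire format carry extra header bytes that the Glue deserializer library handles):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class AvroValueSketch {
    // Illustrative schema with a single string "zip" field; the sample's real
    // Contact schema is referenced via the ContactSchemaName parameter instead.
    private static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"Contact\",\"fields\":[{\"name\":\"zip\",\"type\":\"string\"}]}";

    public static String readZip(byte[] avroBytes) throws java.io.IOException {
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroBytes, null);
        GenericRecord record = reader.read(null, decoder);
        // The event source mapping's FilterCriteria above matches on this same
        // field: only records whose zip starts with "2000" reach the consumer.
        return record.get("zip").toString();
    }
}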
diff --git a/msk-lambda-schema-avro-java-sam/template_original.yaml b/msk-lambda-schema-avro-java-sam/template_original.yaml
index e77964ce2..c79adc47f 100644
--- a/msk-lambda-schema-avro-java-sam/template_original.yaml
+++ b/msk-lambda-schema-avro-java-sam/template_original.yaml
@@ -25,7 +25,7 @@ Resources:
     Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
     Properties:
       CodeUri: kafka_event_consumer_function
-      Handler: com.amazonaws.services.lambda.samples.events.msk.HandlerMSK::handleRequest
+      Handler: com.amazonaws.services.lambda.samples.events.msk.AvroKafkaHandler::handleRequest
       Runtime: java21
       Architectures:
         - x86_64
@@ -37,6 +37,8 @@ Resources:
         Variables:
           PARAM1: VALUE
           JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1 # More info about tiered compilation https://aws.amazon.com/blogs/compute/optimizing-aws-lambda-function-performance-for-java/
+          POWERTOOLS_LOG_LEVEL: TRACE
+          POWERTOOLS_SERVICE_NAME: kafka_consumer
       Events:
         MSKEvent:
           Type: MSK
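Note: the two environment variables added in this hunk configure Powertools for AWS Lambda (Java) structured logging. A minimal sketch of a handler that would pick them up, assuming the powertools-logging module and its Log4j2 backing are on the classpath (PowertoolsEnabledHandler is an illustrative name, not the sample's AvroKafkaHandler):

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.lambda.powertools.logging.Logging;

public class PowertoolsEnabledHandler implements RequestHandler<Object, String> {
    private static final Logger LOGGER = LogManager.getLogger(PowertoolsEnabledHandler.class);

    @Override
    @Logging // emits structured JSON logs; level and service name come from
             // POWERTOOLS_LOG_LEVEL and POWERTOOLS_SERVICE_NAME set above
    public String handleRequest(Object event, Context context) {
        LOGGER.info("received event");
        return "200 OK";
    }
}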