From 126917cb6d863672ef2ff1bdaea6fefab13ebc45 Mon Sep 17 00:00:00 2001 From: Vaibhav Jain Date: Sun, 22 Jun 2025 19:12:07 -0400 Subject: [PATCH 01/10] Moved schema registry to SAM --- .../MSKAndKafkaClientEC2.yaml | 52 +--------------- msk-lambda-schema-avro-java-sam/README.md | 9 +-- msk-lambda-schema-avro-java-sam/template.yaml | 57 ++++++++++++++++- .../template_original.yaml | 61 +++++++++++++++++-- 4 files changed, 117 insertions(+), 62 deletions(-) diff --git a/msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml b/msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml index 53cb70b51..89b0b338f 100644 --- a/msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml +++ b/msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml @@ -31,12 +31,7 @@ Parameters: ServerlessLandGithubLocation: Type: String Default: https://github.com/aws-samples/serverless-patterns/ - ContactSchemaName: - Type: String - Default: ContactSchema - GlueSchemaRegistryForMSKName: - Type: String - Default: GlueSchemaRegistryForMSK + Conditions: CreateProvisionedCluster: !Equals @@ -276,39 +271,6 @@ Resources: ToPort: 22 SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId - MSKGlueRegistry: - Type: AWS::Glue::Registry - Properties: - Name: !Ref GlueSchemaRegistryForMSKName - Description: "Registry for storing schemas related to MSK" - - ContactSchema: - Type: AWS::Glue::Schema - Properties: - Name: !Ref ContactSchemaName - Compatibility: BACKWARD - DataFormat: AVRO - Registry: - Arn: !GetAtt MSKGlueRegistry.Arn - SchemaDefinition: | - { - "type": "record", - "name": "Contact", - "fields": [ - {"name": "firstname", "type": "string"}, - {"name": "lastname", "type": "string"}, - {"name": "company", "type": "string"}, - {"name": "street", "type": "string"}, - {"name": "city", "type": "string"}, - {"name": "county", "type": "string"}, - {"name": "state", "type": "string"}, - {"name": "zip", "type": "string"}, - {"name": "homePhone", "type": "string"}, - {"name": "cellPhone", "type": "string"}, - {"name": "email", "type": "string"}, - {"name": "website", "type": "string"} - ] - } KafkaClientEC2InstanceProvisioned: Condition: CreateProvisionedCluster @@ -556,8 +518,6 @@ Resources: cp template_original.yaml template.yaml sudo chown -R ec2-user . - GLUE_SCHEMA_REGISTRY_NAME=${glue_registry_name} - CONTACT_SCHEMA=${contact_schema_name} VPC_ID=${vpcid} LAMBDA_SECURITY_GROUP_ID=${securitygroup} PRIVATE_SUBNET_1=${privatesubnetone} @@ -570,8 +530,6 @@ Resources: sed -i "s/CLUSTER_ID/$CLUSTER_ID/g" template.yaml sed -i "s/KAFKA_TOPIC/$KAFKA_TOPIC/g" template.yaml sed -i "s/JAVA_VERSION/$JAVA_VERSION/g" template.yaml - sed -i "s/GLUE_SCHEMA_REGISTRY_NAME/$GLUE_SCHEMA_REGISTRY_NAME/g" template.yaml - sed -i "s/AVRO_SCHEMA/$CONTACT_SCHEMA/g" template.yaml sed -i "s/VPC_ID/$VPC_ID/g" template.yaml sed -i "s/LAMBDA_SECURITY_GROUP_ID/$LAMBDA_SECURITY_GROUP_ID/g" template.yaml sed -i "s/SUBNET_IDS/$SUBNET_IDS/g" template.yaml @@ -602,8 +560,6 @@ Resources: privatesubnettwo: !Ref PrivateSubnetMSKTwo privatesubnetthree: !Ref PrivateSubnetMSKThree securitygroup: !GetAtt MSKSecurityGroup.GroupId - glue_registry_name: !Ref GlueSchemaRegistryForMSKName - contact_schema_name: !Ref ContactSchemaName KafkaClientEC2InstanceServerless: @@ -852,8 +808,6 @@ Resources: cp template_original.yaml template.yaml sudo chown -R ec2-user . 
- GLUE_SCHEMA_REGISTRY_NAME=${glue_registry_name} - CONTACT_SCHEMA=${contact_schema_name} VPC_ID=${vpcid} LAMBDA_SECURITY_GROUP_ID=${securitygroup} PRIVATE_SUBNET_1=${privatesubnetone} @@ -867,8 +821,6 @@ Resources: sed -i "s/CLUSTER_ID/$CLUSTER_ID/g" template.yaml sed -i "s/KAFKA_TOPIC/$KAFKA_TOPIC/g" template.yaml sed -i "s/JAVA_VERSION/$JAVA_VERSION/g" template.yaml - sed -i "s/GLUE_SCHEMA_REGISTRY_NAME/$GLUE_SCHEMA_REGISTRY_NAME/g" template.yaml - sed -i "s/AVRO_SCHEMA/$CONTACT_SCHEMA/g" template.yaml sed -i "s/VPC_ID/$VPC_ID/g" template.yaml sed -i "s/LAMBDA_SECURITY_GROUP_ID/$LAMBDA_SECURITY_GROUP_ID/g" template.yaml sed -i "s/SUBNET_IDS/$SUBNET_IDS/g" template.yaml @@ -899,8 +851,6 @@ Resources: privatesubnettwo: !Ref PrivateSubnetMSKTwo privatesubnetthree: !Ref PrivateSubnetMSKThree securitygroup: !GetAtt MSKSecurityGroup.GroupId - glue_registry_name: !Ref GlueSchemaRegistryForMSKName - contact_schema_name: !Ref ContactSchemaName diff --git a/msk-lambda-schema-avro-java-sam/README.md b/msk-lambda-schema-avro-java-sam/README.md index 97c1a48b7..bb2125ee5 100644 --- a/msk-lambda-schema-avro-java-sam/README.md +++ b/msk-lambda-schema-avro-java-sam/README.md @@ -4,7 +4,7 @@ This pattern is an example of Lambda functions that: 1. Consume messages from an Amazon Managed Streaming for Kafka (Amazon MSK) topic 2. Produce Avro-formatted messages to an Amazon MSK topic using Schema Registry -Both functions use IAM authentication to connect to the MSK Cluster and use AWS Glue Schema Registry for Avro schema management. +Both functions use IAM authentication to connect to the MSK Cluster and use AWS Glue Schema Registry for Avro schema management. The Glue Schema Registry and Contact Schema are created as part of the SAM deployment. This project contains source code and supporting files for a serverless application that you can deploy with the SAM CLI. It includes the following files and folders. @@ -12,7 +12,7 @@ This project contains source code and supporting files for a serverless applicat - `kafka_event_producer_function/src/main/java` - Code for the Avro producer Lambda function. - `events` - Invocation events that you can use to invoke the functions. - `kafka_event_consumer_function/src/test/java` - Unit tests for the consumer code. -- `template.yaml` - A template that defines the application's Lambda functions. +- `template.yaml` - A template that defines the application's Lambda functions, Glue Schema Registry, and Contact Schema. - `template_original.yaml` - The original template with placeholders that get replaced during deployment. - `MSKAndKafkaClientEC2.yaml` - A CloudFormation template file that can be used to deploy an MSK cluster and also deploy an EC2 machine with all pre-requisites already installed, so you can directly build and deploy the lambda functions and test them out. @@ -96,14 +96,15 @@ sam deploy --capabilities CAPABILITY_IAM --no-confirm-changeset --no-disable-rol The `sam deploy` command packages and deploys your application to AWS, with a series of prompts. > [!NOTE] -> The script retrieves the required parameters from the CloudFormation outputs in the AWS Console after deploying the `MSKAndKafkaClientEC2.yaml` template. These outputs contain all the necessary information for deploying the Lambda functions. If you connect to a different Kafka cluster, enter the values manually. +> The script retrieves the required parameters from the CloudFormation outputs in the AWS Console after deploying the `MSKAndKafkaClientEC2.yaml` template. 
> These outputs contain the necessary information for deploying the Lambda functions. If you connect to a different Kafka cluster, enter the values manually.

* **Stack Name**: The name of the stack to deploy to CloudFormation. This should be unique to your account and region, and a good starting point would be something matching your project name.
* **AWS Region**: The AWS region you want to deploy your app to.
* **Parameter MSKClusterName**: The name of the MSK Cluster. This is the cluster name ending in `-cluster` from the CloudFormation template you deployed in the previous step.
* **Parameter MSKClusterId**: The unique ID of the MSK Cluster. This can be found in the MSK console or extracted from the MSK ARN in the CloudFormation outputs.
* **Parameter MSKTopic**: The Kafka topic on which the Lambda functions will produce and consume messages. You can find this in the CloudFormation outputs as `KafkaTopicForLambda`
-* **Parameter ContactSchemaName**: The name of the schema to be used for the Avro serialization (default: ContactSchema).
+* **Parameter GlueSchemaRegistryName**: The name of the Glue Schema Registry to be created (default: GlueSchemaRegistryForMSK).
+* **Parameter ContactSchemaName**: The name of the Contact Schema to be created (default: ContactSchema).
* **Parameter VpcId**: The ID of the VPC where the MSK cluster is deployed. You can find this in the CloudFormation outputs as `VPCId`.
* **Parameter SubnetIds**: Comma-separated list of subnet IDs where the MSK cluster is deployed. You can find these in the CloudFormation outputs as `PrivateSubnetMSKOne`, `PrivateSubnetMSKTwo`, and `PrivateSubnetMSKThree`.
* **Parameter SecurityGroupIds**: Comma-separated list of security group IDs that allow access to the MSK cluster. You can find this in the CloudFormation outputs as `SecurityGroupId`.
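Once deployed, the producer Lambda serializes `Contact` records against the schema stored in the registry. As a rough illustration of what a record conforming to the `ContactSchema` definition in `template.yaml` looks like, here is a minimal Apache Avro sketch; the class name, the reduced field list, and the sample values are illustrative only, not the pattern's actual producer code:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Minimal sketch: build an Avro record conforming to the Contact schema.
// Assumes org.apache.avro:avro is on the classpath; illustrative only.
public class ContactRecordSketch {
    // Subset of the ContactSchema definition from template.yaml (remaining fields elided)
    private static final String CONTACT_SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"Contact\",\"fields\":["
        + "{\"name\":\"firstName\",\"type\":\"string\"},"
        + "{\"name\":\"lastName\",\"type\":\"string\"},"
        + "{\"name\":\"zip\",\"type\":\"string\"}]}";

    public static GenericRecord buildSampleContact() {
        Schema schema = new Schema.Parser().parse(CONTACT_SCHEMA_JSON);
        GenericRecord contact = new GenericData.Record(schema);
        contact.put("firstName", "Jane");
        contact.put("lastName", "Doe");
        // Zip codes with the "2000" prefix match the consumer's event source mapping filter
        contact.put("zip", "20001");
        return contact;
    }
}
```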
diff --git a/msk-lambda-schema-avro-java-sam/template.yaml b/msk-lambda-schema-avro-java-sam/template.yaml index c416c991d..0075add40 100644 --- a/msk-lambda-schema-avro-java-sam/template.yaml +++ b/msk-lambda-schema-avro-java-sam/template.yaml @@ -11,6 +11,43 @@ Globals: Timeout: 15 Resources: + # Glue Schema Registry for storing Avro schemas + MSKGlueRegistry: + Type: AWS::Glue::Registry + Properties: + Name: !Ref GlueSchemaRegistryName + Description: "Registry for storing schemas related to MSK" + + # Contact Schema for Avro serialization/deserialization + ContactSchema: + Type: AWS::Glue::Schema + Properties: + Name: !Ref ContactSchemaName + Description: "Schema for Contact data in Avro format" + Compatibility: BACKWARD + DataFormat: AVRO + Registry: + Arn: !GetAtt MSKGlueRegistry.Arn + SchemaDefinition: | + { + "type": "record", + "name": "Contact", + "fields": [ + {"name": "firstName", "type": "string"}, + {"name": "lastName", "type": "string"}, + {"name": "company", "type": "string"}, + {"name": "address", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "county", "type": "string"}, + {"name": "state", "type": "string"}, + {"name": "zip", "type": "string"}, + {"name": "homePhone", "type": "string"}, + {"name": "cellPhone", "type": "string"}, + {"name": "email", "type": "string"}, + {"name": "website", "type": "string"} + ] + } + # SQS Queue to use as Dead Letter Queue for the MSK event source mapping ConsumerDLQ: Type: AWS::SQS::Queue @@ -105,7 +142,9 @@ Resources: - glue:PutSchemaVersionMetadata - glue:GetSchemaVersionsDiff - glue:QuerySchemaVersionMetadata - Resource: '*' + Resource: + - !GetAtt MSKGlueRegistry.Arn + - !GetAtt ContactSchema.Arn - Sid: SQSPermissionsPolicy Effect: Allow @@ -180,8 +219,8 @@ Resources: - glue:CreateSchema - glue:CreateRegistry Resource: - - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:registry/*" - - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:schema/*/*" + - !GetAtt MSKGlueRegistry.Arn + - !GetAtt ContactSchema.Arn - VPCAccessPolicy: {} Parameters: @@ -231,3 +270,15 @@ Outputs: ConsumerDLQArn: Description: "ARN of the Dead Letter Queue for the MSK Consumer" Value: !GetAtt ConsumerDLQ.Arn + GlueSchemaRegistryArn: + Description: "ARN of the Glue Schema Registry" + Value: !GetAtt MSKGlueRegistry.Arn + GlueSchemaRegistryName: + Description: "Name of the Glue Schema Registry" + Value: !Ref MSKGlueRegistry + ContactSchemaArn: + Description: "ARN of the Contact Schema" + Value: !GetAtt ContactSchema.Arn + ContactSchemaName: + Description: "Name of the Contact Schema" + Value: !Ref ContactSchema diff --git a/msk-lambda-schema-avro-java-sam/template_original.yaml b/msk-lambda-schema-avro-java-sam/template_original.yaml index e77964ce2..f15f54e53 100644 --- a/msk-lambda-schema-avro-java-sam/template_original.yaml +++ b/msk-lambda-schema-avro-java-sam/template_original.yaml @@ -11,6 +11,43 @@ Globals: Timeout: 15 Resources: + # Glue Schema Registry for storing Avro schemas + MSKGlueRegistry: + Type: AWS::Glue::Registry + Properties: + Name: !Ref GlueSchemaRegistryName + Description: "Registry for storing schemas related to MSK" + + # Contact Schema for Avro serialization/deserialization + ContactSchema: + Type: AWS::Glue::Schema + Properties: + Name: !Ref ContactSchemaName + Description: "Schema for Contact data in Avro format" + Compatibility: BACKWARD + DataFormat: AVRO + Registry: + Arn: !GetAtt MSKGlueRegistry.Arn + SchemaDefinition: | + { + "type": "record", + "name": "Contact", + "fields": [ + {"name": "firstName", 
"type": "string"}, + {"name": "lastName", "type": "string"}, + {"name": "company", "type": "string"}, + {"name": "address", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "county", "type": "string"}, + {"name": "state", "type": "string"}, + {"name": "zip", "type": "string"}, + {"name": "homePhone", "type": "string"}, + {"name": "cellPhone", "type": "string"}, + {"name": "email", "type": "string"}, + {"name": "website", "type": "string"} + ] + } + # SQS Queue to use as Dead Letter Queue for the MSK event source mapping ConsumerDLQ: Type: AWS::SQS::Queue @@ -25,7 +62,7 @@ Resources: Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction Properties: CodeUri: kafka_event_consumer_function - Handler: com.amazonaws.services.lambda.samples.events.msk.HandlerMSK::handleRequest + Handler: com.amazonaws.services.lambda.samples.events.msk.AvroKafkaHandler::handleRequest Runtime: java21 Architectures: - x86_64 @@ -37,6 +74,8 @@ Resources: Variables: PARAM1: VALUE JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1 # More info about tiered compilation https://aws.amazon.com/blogs/compute/optimizing-aws-lambda-function-performance-for-java/ + POWERTOOLS_LOG_LEVEL: TRACE + POWERTOOLS_SERVICE_NAME: kafka_consumer Events: MSKEvent: Type: MSK @@ -112,7 +151,9 @@ Resources: - glue:PutSchemaVersionMetadata - glue:GetSchemaVersionsDiff - glue:QuerySchemaVersionMetadata - Resource: '*' + Resource: + - !GetAtt MSKGlueRegistry.Arn + - !GetAtt ContactSchema.Arn - Sid: SQSPermissionsPolicy Effect: Allow @@ -187,8 +228,8 @@ Resources: - glue:CreateSchema - glue:CreateRegistry Resource: - - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:registry/*" - - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:schema/*/*" + - !GetAtt MSKGlueRegistry.Arn + - !GetAtt ContactSchema.Arn - VPCAccessPolicy: {} Parameters: @@ -238,3 +279,15 @@ Outputs: ConsumerDLQArn: Description: "ARN of the Dead Letter Queue for the MSK Consumer" Value: !GetAtt ConsumerDLQ.Arn + GlueSchemaRegistryArn: + Description: "ARN of the Glue Schema Registry" + Value: !GetAtt MSKGlueRegistry.Arn + GlueSchemaRegistryName: + Description: "Name of the Glue Schema Registry" + Value: !Ref MSKGlueRegistry + ContactSchemaArn: + Description: "ARN of the Contact Schema" + Value: !GetAtt ContactSchema.Arn + ContactSchemaName: + Description: "Name of the Contact Schema" + Value: !Ref ContactSchema From 5399dcdc2508b82b4bb33d71fdc803c87e891a83 Mon Sep 17 00:00:00 2001 From: Vaibhav Jain Date: Sun, 22 Jun 2025 19:21:29 -0400 Subject: [PATCH 02/10] deleted redundant files --- .../lambda/samples/events/msk/HandlerMSK.java | 275 ----------------- .../samples/events/msk/SimpleHandler.java | 18 -- .../samples/events/msk/HandlerMSKTest.java | 70 ----- msk-lambda-schema-avro-java-sam/template.yaml | 284 ------------------ 4 files changed, 647 deletions(-) delete mode 100644 msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java delete mode 100644 msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/SimpleHandler.java delete mode 100644 msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java delete mode 100644 msk-lambda-schema-avro-java-sam/template.yaml diff --git 
a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java b/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java
deleted file mode 100644
index c5ddbdc7e..000000000
--- a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java
+++ /dev/null
@@ -1,275 +0,0 @@
-//Lambda Runtime delivers a batch of messages to the lambda function
-//Each batch of messages has two fields EventSource and EventSourceARN
-//Each batch of messages also has a field called Records
-//The Records is a map with multiple keys and values
-//Each key is a combination of the Topic Name and the Partition Number
-//One batch of messages can contain messages from multiple partitions
-
-/*
-To simplify representing a batch of Kafka messages as a list of messages
-We have created a Java class called KafkaMessage under the models package
-Here we are mapping the structure of an incoming Kafka event to a list of
-objects of the KafkaMessage class
-*/
-
-package com.amazonaws.services.lambda.samples.events.msk;
-
-import com.amazonaws.services.lambda.runtime.Context;
-import com.amazonaws.services.lambda.runtime.LambdaLogger;
-import com.amazonaws.services.lambda.runtime.RequestHandler;
-import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
-import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord;
-
-import java.util.ArrayList;
-import java.util.Base64;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
-public class HandlerMSK implements RequestHandler<KafkaEvent, String> {
-    //We initialize an empty list of the KafkaMessage class
-    List<KafkaMessage> listOfMessages = new ArrayList<KafkaMessage>();
-    Gson gson = new GsonBuilder().setPrettyPrinting().create();
-    @Override
-    public String handleRequest(KafkaEvent event, Context context) {
-        LambdaLogger logger = context.getLogger();
-        logger.log("========== LAMBDA FUNCTION STARTED ==========");
-        logger.log("Event received: " + gson.toJson(event));
-
-        String response = new String("200 OK");
-        this.listOfMessages = new ArrayList<KafkaMessage>();
-
-        // Counters for zip code patterns
-        int zip1000Count = 0;
-        int zip2000Count = 0;
-        //Incoming KafkaEvent object has a property called records that is a map
-        //Each key in the map is a combination of a topic and a partition
-        Map<String, List<KafkaEventRecord>> record=event.getRecords();
-
-        if (record == null) {
-            logger.log("WARNING: Event records map is null");
-            return response;
-        }
-
-        logger.log("Records map size: " + record.size());
-
-        Set<String> keySet = record.keySet();
-        logger.log("Key set size: " + keySet.size());
-        logger.log("Keys: " + keySet);
-
-        Iterator<String> iterator = keySet.iterator();
-        //We iterate through each of the keys in the map
-        while (iterator.hasNext()) {
-            String thisKey=(String)iterator.next();
-            logger.log("Processing key: " + thisKey);
-
-            //Using the key we retrieve the value of the map which is a list of KafkaEventRecord
-            //One object of KafkaEventRecord represents an individual Kafka message
-            List<KafkaEventRecord> thisListOfRecords = record.get(thisKey);
-
-            if (thisListOfRecords == null) {
-                logger.log("WARNING: Record list for key " + thisKey + " is null");
-                continue;
-            }
-
-            logger.log("Record list size for key " + thisKey + ": " + thisListOfRecords.size());
-
-            //We now iterate through the list of KafkaEventRecords
-            for(KafkaEventRecord thisRecord : thisListOfRecords) {
-                logger.log("Processing record...");
-
-                /*
-                We initialize a new object of the KafkaMessage class which is a simplified representation in our models package
-                We then get the fields from each kafka message in the object of KafkaEventRecord class and set them to the fields
-                of the KafkaRecord class
-                */
-                KafkaMessage thisMessage = new KafkaMessage();
-                thisMessage.setTopic(thisRecord.getTopic());
-                thisMessage.setPartition(thisRecord.getPartition());
-                thisMessage.setOffset(thisRecord.getOffset());
-                thisMessage.setTimestamp(thisRecord.getTimestamp());
-                thisMessage.setTimestampType(thisRecord.getTimestampType());
-
-                logger.log("Record metadata - Topic: " + thisRecord.getTopic() +
-                           ", Partition: " + thisRecord.getPartition() +
-                           ", Offset: " + thisRecord.getOffset());
-
-                String key = thisRecord.getKey();
-                String value = thisRecord.getValue();
-
-                logger.log("Key (base64): " + key);
-                logger.log("Value (base64): " + value);
-
-                String decodedKey = "null";
-                String decodedValue = "null";
-                //the key and value inside a kafka message are base64 encoded and will need to be decoded
-                if (null != key) {
-                    logger.log("Decoding key...");
-                    try {
-                        byte[] decodedKeyBytes = Base64.getDecoder().decode(key);
-                        decodedKey = new String(decodedKeyBytes);
-                        logger.log("Decoded key: " + decodedKey);
-                    } catch (Exception e) {
-                        logger.log("ERROR decoding key: " + e.getMessage());
-                    }
-                } else {
-                    logger.log("Key is null");
-                }
-
-                if (null != value) {
-                    logger.log("Decoding value...");
-                    try {
-                        byte[] decodedValueBytes = Base64.getDecoder().decode(value);
-                        logger.log("Value decoded, length: " + decodedValueBytes.length + " bytes");
-
-                        // Print the complete message in hex format
-                        logger.log("Complete message in hex format:");
-                        logger.log(bytesToHexString(decodedValueBytes, 0));
-
-                        try {
-                            decodedValue = new String(decodedValueBytes);
-                            logger.log("Decoded value as string: " + (decodedValue.length() > 100 ? decodedValue.substring(0, 100) + "..." : decodedValue));
-
-                            // Add more detailed logging for AVRO messages
-                            logger.log("=== AVRO MESSAGE DETAILS ===");
-                            logger.log("Message appears to be AVRO-formatted. Attempting to extract fields:");
-
-                            // Try to extract some common fields from the AVRO binary data
-                            // This is a simple approach to show some readable content
-                            StringBuilder readableContent = new StringBuilder();
-                            for (int i = 0; i < decodedValueBytes.length; i++) {
-                                // Skip non-printable characters
-                                if (decodedValueBytes[i] >= 32 && decodedValueBytes[i] < 127) {
-                                    readableContent.append((char)decodedValueBytes[i]);
-                                }
-                            }
-
-                            String readableString = readableContent.toString();
-                            logger.log("Readable content extracted from AVRO: " + readableString);
-
-                            // Check for zip code patterns
-                            if (readableString.contains("1000")) {
-                                logger.log("FOUND ZIP CODE STARTING WITH 1000");
-                            }
-                            if (readableString.contains("2000")) {
-                                logger.log("FOUND ZIP CODE STARTING WITH 2000");
-                            }
-
-                            logger.log("=== END AVRO MESSAGE DETAILS ===");
-                        } catch (Exception e) {
-                            logger.log("ERROR converting bytes to string: " + e.getMessage());
-                            decodedValue = "Error decoding: " + e.getMessage();
-                        }
-                    } catch (Exception e) {
-                        logger.log("ERROR decoding value: " + e.getMessage());
-                        e.printStackTrace();
-                    }
-                } else {
-                    logger.log("Value is null");
-                }
-
-                thisMessage.setKey(key);
-                thisMessage.setValue(value);
-                thisMessage.setDecodedKey(decodedKey);
-                thisMessage.setDecodedValue(decodedValue);
-
-                //A kafka message can optionally have a list of headers
-                //the below code is to get the headers, iterate through each header and get its key and value
-                List<KafkaHeader> headersInThisMessage = new ArrayList<KafkaHeader>();
-                List<Map<String, byte[]>> headers = thisRecord.getHeaders();
-
-                if (headers != null) {
-                    logger.log("Headers count: " + headers.size());
-
-                    for (Map<String, byte[]> thisHeader : headers) {
-                        Set<String> thisHeaderKeys = thisHeader.keySet();
-                        Iterator<String> thisHeaderKeysIterator = thisHeaderKeys.iterator();
-                        while (thisHeaderKeysIterator.hasNext()) {
-                            String thisHeaderKey = thisHeaderKeysIterator.next();
-                            byte[] thisHeaderValue = (byte[])thisHeader.get(thisHeaderKey);
-                            String thisHeaderValueString = new String(thisHeaderValue);
-                            KafkaHeader thisMessageHeader = new KafkaHeader();
-                            thisMessageHeader.setKey(thisHeaderKey);
-                            thisMessageHeader.setValue(thisHeaderValueString);
-                            headersInThisMessage.add(thisMessageHeader);
-                            logger.log("Header - Key: " + thisHeaderKey + ", Value: " + thisHeaderValueString);
-                        }
-                    }
-                } else {
-                    logger.log("No headers in message");
-                }
-
-                thisMessage.setHeaders(headersInThisMessage);
-                listOfMessages.add(thisMessage);
-
-                // Below we are logging the particular kafka message in string format using the toString method
-                // as well as in Json format using gson.toJson function
-                logger.log("Received this message from Kafka - " + thisMessage.toString());
-                logger.log("Message in JSON format : " + gson.toJson(thisMessage));
-
-                // Add a more readable summary of the message
-                logger.log("=== MESSAGE SUMMARY ===");
-                logger.log("Topic: " + thisMessage.getTopic());
-                logger.log("Partition: " + thisMessage.getPartition());
-                logger.log("Offset: " + thisMessage.getOffset());
-                logger.log("Key: " + thisMessage.getDecodedKey());
-
-                // Check for zip code patterns in the decoded value
-                String decodedValueStr = thisMessage.getDecodedValue();
-                if (decodedValueStr != null) {
-                    if (decodedValueStr.contains("1000")) {
-                        logger.log("ZIP CODE: Found 1000 pattern in message");
-                        zip1000Count++;
-                    }
-                    if (decodedValueStr.contains("2000")) {
-                        logger.log("ZIP CODE: Found 2000 pattern in message");
-                        zip2000Count++;
-                    }
-                }
-
-                logger.log("=== END MESSAGE SUMMARY ===");
-            }
-        }
-        logger.log("All Messages in this batch = " + gson.toJson(listOfMessages));
-
-        // Log summary of zip code distribution
-        logger.log("========== ZIP CODE DISTRIBUTION SUMMARY ==========");
-        logger.log("Messages with zip code containing 1000: " + zip1000Count);
-        logger.log("Messages with zip code containing 2000: " + zip2000Count);
-        logger.log("Other messages: " + (listOfMessages.size() - zip1000Count - zip2000Count));
-        logger.log("====================================================");
-
-        logger.log("========== LAMBDA FUNCTION COMPLETED ==========");
-        return response;
-    }
-
-    /**
-     * Convert byte array to hexadecimal string representation
-     *
-     * @param bytes Byte array to convert
-     * @param maxLength Maximum number of bytes to convert (0 for all)
-     * @return Hexadecimal string representation
-     */
-    private String bytesToHexString(byte[] bytes, int maxLength) {
-        StringBuilder sb = new StringBuilder();
-        int length = maxLength > 0 && maxLength < bytes.length ? maxLength : bytes.length;
-
-        for (int i = 0; i < length; i++) {
-            sb.append(String.format("%02X", bytes[i]));
-            if (i % 16 == 15) {
-                sb.append("\n");
-            } else if (i % 4 == 3) {
-                sb.append(" ");
-            }
-        }
-
-        if (maxLength > 0 && length < bytes.length) {
-            sb.append("... (").append(bytes.length - length).append(" more bytes)");
-        }
-
-        return sb.toString();
-    }
-}
diff --git a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/SimpleHandler.java b/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/SimpleHandler.java
deleted file mode 100644
index 03c4e5092..000000000
--- a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/SimpleHandler.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.amazonaws.services.lambda.samples.events.msk;
-
-import com.amazonaws.services.lambda.runtime.Context;
-import com.amazonaws.services.lambda.runtime.RequestHandler;
-
-public class SimpleHandler implements RequestHandler<Object, String> {
-
-    @Override
-    public String handleRequest(Object event, Context context) {
-        System.out.println("=== SIMPLE HANDLER CALLED ===");
-        System.out.println("Event: " + event);
-        System.out.println("Event class: " + (event != null ?
event.getClass().getName() : "null")); - System.out.println("Context: " + context.getFunctionName()); - System.out.println("=== SIMPLE HANDLER END ==="); - - return "Simple handler executed successfully"; - } -} diff --git a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java b/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java deleted file mode 100644 index 65bd543d3..000000000 --- a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.amazonaws.services.lambda.samples.events.msk; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import org.junit.jupiter.api.Test; - -import com.amazonaws.services.lambda.runtime.Context; -import com.amazonaws.services.lambda.runtime.events.KafkaEvent; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - - -class HandlerMSKTest { - private static final String kafkaEventJson = "{\n" - + " \"records\":{\n" - + " \"myTopic-0\":[\n" - + " {\n" - + " \"topic\":\"myTopic\",\n" - + " \"partition\":0,\n" - + " \"offset\":250,\n" - + " \"timestamp\":1678072110111,\n" - + " \"timestampType\":\"CREATE_TIME\",\n" - + " \"value\":\"Zg==\",\n" - + " \"headers\":[\n" - + " \n" - + " ]\n" - + " },\n" - + " {\n" - + " \"topic\":\"myTopic\",\n" - + " \"partition\":0,\n" - + " \"offset\":251,\n" - + " \"timestamp\":1678072111086,\n" - + " \"timestampType\":\"CREATE_TIME\",\n" - + " \"value\":\"Zw==\",\n" - + " \"headers\":[\n" - + " \n" - + " ]\n" - + " }\n" - + " ]\n" - + " },\n" - + " \"eventSource\":\"aws:kafka\",\n" - + " \"eventSourceArn\":\"arn:aws:kafka:us-west-2:123456789012:cluster/MSKWorkshopCluster/a93759a9-c9d0-4952-984c-492c6bfa2be8-13\",\n" - + " \"bootstrapServers\":\"b-2.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-3.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-1.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098\"\n" - + "}"; - - @Test - void invokeTest() { - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - KafkaEvent event = gson.fromJson(kafkaEventJson, KafkaEvent.class); - Context context = new TestContext(); - HandlerMSK handler = new HandlerMSK(); - String result = handler.handleRequest(event, context); - assertEquals(result, "200 OK"); - assertEquals(handler.listOfMessages.size(), 2); - assertEquals(handler.listOfMessages.get(0).getTopic(), "myTopic"); - assertEquals(handler.listOfMessages.get(0).getPartition(), 0); - assertEquals(handler.listOfMessages.get(0).getOffset(), 250L); - assertEquals(handler.listOfMessages.get(0).getTimestamp(), 1678072110111L); - assertEquals(handler.listOfMessages.get(0).getTimestampType(), "CREATE_TIME"); - assertEquals(handler.listOfMessages.get(0).getDecodedKey(), "null"); - assertEquals(handler.listOfMessages.get(0).getDecodedValue(), "f"); - assertEquals(handler.listOfMessages.get(1).getTopic(), "myTopic"); - assertEquals(handler.listOfMessages.get(1).getPartition(), 0); - assertEquals(handler.listOfMessages.get(1).getOffset(), 251L); - assertEquals(handler.listOfMessages.get(1).getTimestamp(), 1678072111086L); - assertEquals(handler.listOfMessages.get(1).getTimestampType(), "CREATE_TIME"); - assertEquals(handler.listOfMessages.get(1).getDecodedKey(), "null"); - 
assertEquals(handler.listOfMessages.get(1).getDecodedValue(), "g"); - } -} diff --git a/msk-lambda-schema-avro-java-sam/template.yaml b/msk-lambda-schema-avro-java-sam/template.yaml deleted file mode 100644 index 0075add40..000000000 --- a/msk-lambda-schema-avro-java-sam/template.yaml +++ /dev/null @@ -1,284 +0,0 @@ -AWSTemplateFormatVersion: '2010-09-09' -Transform: AWS::Serverless-2016-10-31 -Description: > - kafka_event_consumer_and_producer_functions - - Sample SAM Template for MSK consumer and AVRO producer with IAM auth - -# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst -Globals: - Function: - Timeout: 15 - -Resources: - # Glue Schema Registry for storing Avro schemas - MSKGlueRegistry: - Type: AWS::Glue::Registry - Properties: - Name: !Ref GlueSchemaRegistryName - Description: "Registry for storing schemas related to MSK" - - # Contact Schema for Avro serialization/deserialization - ContactSchema: - Type: AWS::Glue::Schema - Properties: - Name: !Ref ContactSchemaName - Description: "Schema for Contact data in Avro format" - Compatibility: BACKWARD - DataFormat: AVRO - Registry: - Arn: !GetAtt MSKGlueRegistry.Arn - SchemaDefinition: | - { - "type": "record", - "name": "Contact", - "fields": [ - {"name": "firstName", "type": "string"}, - {"name": "lastName", "type": "string"}, - {"name": "company", "type": "string"}, - {"name": "address", "type": "string"}, - {"name": "city", "type": "string"}, - {"name": "county", "type": "string"}, - {"name": "state", "type": "string"}, - {"name": "zip", "type": "string"}, - {"name": "homePhone", "type": "string"}, - {"name": "cellPhone", "type": "string"}, - {"name": "email", "type": "string"}, - {"name": "website", "type": "string"} - ] - } - - # SQS Queue to use as Dead Letter Queue for the MSK event source mapping - ConsumerDLQ: - Type: AWS::SQS::Queue - Properties: - MessageRetentionPeriod: 1209600 # 14 days (maximum retention period) - VisibilityTimeout: 300 # 5 minutes - Tags: - - Key: Purpose - Value: MSKConsumerDLQ - - LambdaMSKConsumerJavaFunction: - Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction - Properties: - CodeUri: kafka_event_consumer_function - Handler: com.amazonaws.services.lambda.samples.events.msk.AvroKafkaHandler::handleRequest - Runtime: java21 - Architectures: - - x86_64 - MemorySize: 512 - VpcConfig: - SecurityGroupIds: !Ref SecurityGroupIds - SubnetIds: !Ref SubnetIds - Environment: # More info about Env Vars: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#environment-object - Variables: - PARAM1: VALUE - JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1 # More info about tiered compilation https://aws.amazon.com/blogs/compute/optimizing-aws-lambda-function-performance-for-java/ - POWERTOOLS_LOG_LEVEL: TRACE - POWERTOOLS_SERVICE_NAME: kafka_consumer - Events: - MSKEvent: - Type: MSK - Properties: - StartingPosition: LATEST - BatchSize: 1 - MaximumBatchingWindowInSeconds: 1 - Stream: !Join [ '', ["arn:", "aws:", "kafka:", !Ref "AWS::Region" , ":" ,!Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId] ] - Topics: - - !Ref MSKTopic - DestinationConfig: - OnFailure: - Destination: !GetAtt ConsumerDLQ.Arn - FilterCriteria: - Filters: - - Pattern: '{"value": {"zip": [ { "prefix": "2000" } ]}}' - Policies: - - Statement: - - Sid: 
KafkaClusterPermissionsPolicy - Effect: Allow - Action: - - kafka-cluster:Connect - - kafka-cluster:DescribeGroup - - kafka-cluster:DescribeCluster - - kafka-cluster:AlterCluster - - kafka-cluster:AlterClusterDynamicConfiguration - - kafka-cluster:WriteDataIdempotently - - kafka-cluster:AlterGroup - - kafka-cluster:DescribeTopic - - kafka-cluster:ReadData - - kafka-cluster:DescribeClusterDynamicConfiguration - Resource: - - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId]] - - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "topic/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]] - - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "group/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]] - - - Sid: KafkaPermissionsPolicy - Effect: Allow - Action: - - kafka:DescribeClusterV2 - - kafka:GetBootstrapBrokers - Resource: '*' - - - Sid: EC2PermissionsPolicy - Effect: Allow - Action: - - ec2:DescribeSecurityGroups - - ec2:DescribeSubnets - - ec2:DescribeVpcs - - ec2:CreateNetworkInterface - - ec2:DescribeNetworkInterfaces - - ec2:DeleteNetworkInterface - Resource: '*' - - - Sid: GlueSchemaRegistryPermissionsPolicy - Effect: Allow - Action: - - glue:GetSchemaByDefinition - - glue:GetSchemaVersion - - glue:GetRegistry - - glue:ListSchemas - - glue:ListSchemaVersions - - glue:RegisterSchemaVersion - - glue:PutSchemaVersionMetadata - - glue:GetSchemaVersionsDiff - - glue:QuerySchemaVersionMetadata - Resource: - - !GetAtt MSKGlueRegistry.Arn - - !GetAtt ContactSchema.Arn - - - Sid: SQSPermissionsPolicy - Effect: Allow - Action: - - sqs:SendMessage - Resource: !GetAtt ConsumerDLQ.Arn - - VPCAccessPolicy: {} - - LambdaMSKProducerJavaFunction: - Type: AWS::Serverless::Function - Properties: - CodeUri: kafka_event_producer_function - Handler: com.amazonaws.services.lambda.samples.events.msk.AvroProducerHandler::handleRequest - Runtime: java21 - Timeout: 300 - Architectures: - - x86_64 - MemorySize: 512 - VpcConfig: - SecurityGroupIds: !Ref SecurityGroupIds - SubnetIds: !Ref SubnetIds - Environment: - Variables: - MSK_CLUSTER_ARN: !Join [ '', ["arn:", "aws:", "kafka:", !Ref "AWS::Region" , ":" ,!Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId] ] - MSK_TOPIC: !Ref MSKTopic - REGISTRY_NAME: !Ref GlueSchemaRegistryName - CONTACT_SCHEMA_NAME: !Ref ContactSchemaName - JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1 - Policies: - - Statement: - - Sid: KafkaClusterPermissionsPolicy - Effect: Allow - Action: - - kafka-cluster:Connect - - kafka-cluster:DescribeCluster - - kafka-cluster:WriteData - - kafka-cluster:DescribeTopic - Resource: - - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId]] - - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "topic/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]] - - - Sid: KafkaPermissionsPolicy - Effect: Allow - Action: - - kafka:DescribeClusterV2 - - kafka:GetBootstrapBrokers - Resource: '*' - - - Sid: EC2PermissionsPolicy - Effect: Allow - Action: - - ec2:DescribeSecurityGroups - - ec2:DescribeSubnets - - ec2:DescribeVpcs - - ec2:CreateNetworkInterface - - ec2:DescribeNetworkInterfaces - - ec2:DeleteNetworkInterface - Resource: '*' - - - Sid: GlueSchemaRegistryPermissionsPolicy - 
Effect: Allow - Action: - - glue:GetSchemaByDefinition - - glue:GetSchemaVersion - - glue:GetRegistry - - glue:ListSchemas - - glue:ListSchemaVersions - - glue:GetSchemaVersionsDiff - - glue:QuerySchemaVersionMetadata - - glue:RegisterSchemaVersion - - glue:PutSchemaVersionMetadata - - glue:CreateSchema - - glue:CreateRegistry - Resource: - - !GetAtt MSKGlueRegistry.Arn - - !GetAtt ContactSchema.Arn - - VPCAccessPolicy: {} - -Parameters: - MSKClusterName: - Type: String - Description: Enter the name of the MSK Cluster - Default: CLUSTER_NAME - MSKClusterId: - Type: String - Description: Enter the ID of the MSK Cluster - Default: CLUSTER_ID - MSKTopic: - Type: String - Description: Enter the name of the MSK Topic - Default: KAFKA_TOPIC - GlueSchemaRegistryName: - Type: String - Description: Enter the name of the Glue Schema Registry - Default: GLUE_SCHEMA_REGISTRY_NAME - ContactSchemaName: - Type: String - Description: Enter the name of the Contact Schema - Default: AVRO_SCHEMA - VpcId: - Type: String - Description: Enter the VPC ID where the MSK cluster is deployed - Default: VPC_ID - SubnetIds: - Type: CommaDelimitedList - Description: Enter the subnet IDs where the MSK cluster is deployed (comma-separated) - Default: SUBNET_IDS - SecurityGroupIds: - Type: CommaDelimitedList - Description: Enter the security group IDs that allow access to the MSK cluster (comma-separated) - Default: LAMBDA_SECURITY_GROUP_ID - -Outputs: - MSKConsumerLambdaFunction: - Description: "Topic Consumer Lambda Function ARN" - Value: !GetAtt LambdaMSKConsumerJavaFunction.Arn - MSKProducerLambdaFunction: - Description: "AVRO Producer Lambda Function ARN" - Value: !GetAtt LambdaMSKProducerJavaFunction.Arn - ConsumerDLQUrl: - Description: "URL of the Dead Letter Queue for the MSK Consumer" - Value: !Ref ConsumerDLQ - ConsumerDLQArn: - Description: "ARN of the Dead Letter Queue for the MSK Consumer" - Value: !GetAtt ConsumerDLQ.Arn - GlueSchemaRegistryArn: - Description: "ARN of the Glue Schema Registry" - Value: !GetAtt MSKGlueRegistry.Arn - GlueSchemaRegistryName: - Description: "Name of the Glue Schema Registry" - Value: !Ref MSKGlueRegistry - ContactSchemaArn: - Description: "ARN of the Contact Schema" - Value: !GetAtt ContactSchema.Arn - ContactSchemaName: - Description: "Name of the Contact Schema" - Value: !Ref ContactSchema From 96f4446afcdcce0fc6591f2b6ba79035acf8159e Mon Sep 17 00:00:00 2001 From: Vaibhav Jain Date: Sun, 22 Jun 2025 19:26:41 -0400 Subject: [PATCH 03/10] Need permission to all resources --- msk-lambda-schema-avro-java-sam/template_original.yaml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/msk-lambda-schema-avro-java-sam/template_original.yaml b/msk-lambda-schema-avro-java-sam/template_original.yaml index f15f54e53..4d788907d 100644 --- a/msk-lambda-schema-avro-java-sam/template_original.yaml +++ b/msk-lambda-schema-avro-java-sam/template_original.yaml @@ -151,9 +151,7 @@ Resources: - glue:PutSchemaVersionMetadata - glue:GetSchemaVersionsDiff - glue:QuerySchemaVersionMetadata - Resource: - - !GetAtt MSKGlueRegistry.Arn - - !GetAtt ContactSchema.Arn + Resource: '*' - Sid: SQSPermissionsPolicy Effect: Allow From 9cc0f8826f2e58f001e1617892b59aad487f56aa Mon Sep 17 00:00:00 2001 From: Vaibhav Jain Date: Sun, 22 Jun 2025 22:04:33 -0400 Subject: [PATCH 04/10] removed contact schema ref --- msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml 
b/msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml index 89b0b338f..a1b43146b 100644 --- a/msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml +++ b/msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml @@ -1289,9 +1289,4 @@ Outputs: Description: The Topic to use for the Java Lambda Function Value: !Ref KafkaTopicForLambda Export: - Name: !Sub "${AWS::StackName}-KafkaTopicForLambda" - ContactSchemaArn: - Description: ARN of the Contact Schema Registry - Value: !Ref ContactSchema - Export: - Name: !Sub "${AWS::StackName}-ContactSchemaArn" \ No newline at end of file + Name: !Sub "${AWS::StackName}-KafkaTopicForLambda" \ No newline at end of file From 27f31126a113b5a27d659a673f3103adc0418939 Mon Sep 17 00:00:00 2001 From: Vaibhav Jain Date: Sun, 22 Jun 2025 22:07:22 -0400 Subject: [PATCH 05/10] Revert "removed contact schema ref" This reverts commit 9cc0f8826f2e58f001e1617892b59aad487f56aa. --- msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml b/msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml index a1b43146b..89b0b338f 100644 --- a/msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml +++ b/msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml @@ -1289,4 +1289,9 @@ Outputs: Description: The Topic to use for the Java Lambda Function Value: !Ref KafkaTopicForLambda Export: - Name: !Sub "${AWS::StackName}-KafkaTopicForLambda" \ No newline at end of file + Name: !Sub "${AWS::StackName}-KafkaTopicForLambda" + ContactSchemaArn: + Description: ARN of the Contact Schema Registry + Value: !Ref ContactSchema + Export: + Name: !Sub "${AWS::StackName}-ContactSchemaArn" \ No newline at end of file From 58cee28c57a7734ed0dcb715689399ca899b1d71 Mon Sep 17 00:00:00 2001 From: Vaibhav Jain Date: Sun, 22 Jun 2025 22:07:40 -0400 Subject: [PATCH 06/10] Revert "Need permission to all resources" This reverts commit 96f4446afcdcce0fc6591f2b6ba79035acf8159e. --- msk-lambda-schema-avro-java-sam/template_original.yaml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/msk-lambda-schema-avro-java-sam/template_original.yaml b/msk-lambda-schema-avro-java-sam/template_original.yaml index 4d788907d..f15f54e53 100644 --- a/msk-lambda-schema-avro-java-sam/template_original.yaml +++ b/msk-lambda-schema-avro-java-sam/template_original.yaml @@ -151,7 +151,9 @@ Resources: - glue:PutSchemaVersionMetadata - glue:GetSchemaVersionsDiff - glue:QuerySchemaVersionMetadata - Resource: '*' + Resource: + - !GetAtt MSKGlueRegistry.Arn + - !GetAtt ContactSchema.Arn - Sid: SQSPermissionsPolicy Effect: Allow From d6ad9c9506b040ad5e984c0e322fa17de3dd7e7a Mon Sep 17 00:00:00 2001 From: Vaibhav Jain Date: Sun, 22 Jun 2025 22:09:38 -0400 Subject: [PATCH 07/10] Revert "deleted redundant files" This reverts commit 5399dcdc2508b82b4bb33d71fdc803c87e891a83. 
---
 .../lambda/samples/events/msk/HandlerMSK.java | 275 +++++++++++++++++
 .../samples/events/msk/SimpleHandler.java     |  18 ++
 .../samples/events/msk/HandlerMSKTest.java    |  70 +++++
 msk-lambda-schema-avro-java-sam/template.yaml | 284 ++++++++++++++++++
 4 files changed, 647 insertions(+)
 create mode 100644 msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java
 create mode 100644 msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/SimpleHandler.java
 create mode 100644 msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java
 create mode 100644 msk-lambda-schema-avro-java-sam/template.yaml

diff --git a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java b/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java
new file mode 100644
index 000000000..c5ddbdc7e
--- /dev/null
+++ b/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java
@@ -0,0 +1,275 @@
+//Lambda Runtime delivers a batch of messages to the lambda function
+//Each batch of messages has two fields EventSource and EventSourceARN
+//Each batch of messages also has a field called Records
+//The Records is a map with multiple keys and values
+//Each key is a combination of the Topic Name and the Partition Number
+//One batch of messages can contain messages from multiple partitions
+
+/*
+To simplify representing a batch of Kafka messages as a list of messages
+We have created a Java class called KafkaMessage under the models package
+Here we are mapping the structure of an incoming Kafka event to a list of
+objects of the KafkaMessage class
+*/
+
+package com.amazonaws.services.lambda.samples.events.msk;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.LambdaLogger;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
+import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+public class HandlerMSK implements RequestHandler<KafkaEvent, String> {
+    //We initialize an empty list of the KafkaMessage class
+    List<KafkaMessage> listOfMessages = new ArrayList<KafkaMessage>();
+    Gson gson = new GsonBuilder().setPrettyPrinting().create();
+    @Override
+    public String handleRequest(KafkaEvent event, Context context) {
+        LambdaLogger logger = context.getLogger();
+        logger.log("========== LAMBDA FUNCTION STARTED ==========");
+        logger.log("Event received: " + gson.toJson(event));
+
+        String response = new String("200 OK");
+        this.listOfMessages = new ArrayList<KafkaMessage>();
+
+        // Counters for zip code patterns
+        int zip1000Count = 0;
+        int zip2000Count = 0;
+        //Incoming KafkaEvent object has a property called records that is a map
+        //Each key in the map is a combination of a topic and a partition
+        Map<String, List<KafkaEventRecord>> record=event.getRecords();
+
+        if (record == null) {
+            logger.log("WARNING: Event records map is null");
+            return response;
+        }
+
+        logger.log("Records map size: " + record.size());
+
+        Set<String> keySet = record.keySet();
+        logger.log("Key set size: " + keySet.size());
+        logger.log("Keys: " + keySet);
+
+        Iterator<String> iterator = keySet.iterator();
+        //We iterate through each of the keys in the map
+        while (iterator.hasNext()) {
+            String thisKey=(String)iterator.next();
+            logger.log("Processing key: " + thisKey);
+
+            //Using the key we retrieve the value of the map which is a list of KafkaEventRecord
+            //One object of KafkaEventRecord represents an individual Kafka message
+            List<KafkaEventRecord> thisListOfRecords = record.get(thisKey);
+
+            if (thisListOfRecords == null) {
+                logger.log("WARNING: Record list for key " + thisKey + " is null");
+                continue;
+            }
+
+            logger.log("Record list size for key " + thisKey + ": " + thisListOfRecords.size());
+
+            //We now iterate through the list of KafkaEventRecords
+            for(KafkaEventRecord thisRecord : thisListOfRecords) {
+                logger.log("Processing record...");
+
+                /*
+                We initialize a new object of the KafkaMessage class which is a simplified representation in our models package
+                We then get the fields from each kafka message in the object of KafkaEventRecord class and set them to the fields
+                of the KafkaRecord class
+                */
+                KafkaMessage thisMessage = new KafkaMessage();
+                thisMessage.setTopic(thisRecord.getTopic());
+                thisMessage.setPartition(thisRecord.getPartition());
+                thisMessage.setOffset(thisRecord.getOffset());
+                thisMessage.setTimestamp(thisRecord.getTimestamp());
+                thisMessage.setTimestampType(thisRecord.getTimestampType());
+
+                logger.log("Record metadata - Topic: " + thisRecord.getTopic() +
+                           ", Partition: " + thisRecord.getPartition() +
+                           ", Offset: " + thisRecord.getOffset());
+
+                String key = thisRecord.getKey();
+                String value = thisRecord.getValue();
+
+                logger.log("Key (base64): " + key);
+                logger.log("Value (base64): " + value);
+
+                String decodedKey = "null";
+                String decodedValue = "null";
+                //the key and value inside a kafka message are base64 encoded and will need to be decoded
+                if (null != key) {
+                    logger.log("Decoding key...");
+                    try {
+                        byte[] decodedKeyBytes = Base64.getDecoder().decode(key);
+                        decodedKey = new String(decodedKeyBytes);
+                        logger.log("Decoded key: " + decodedKey);
+                    } catch (Exception e) {
+                        logger.log("ERROR decoding key: " + e.getMessage());
+                    }
+                } else {
+                    logger.log("Key is null");
+                }
+
+                if (null != value) {
+                    logger.log("Decoding value...");
+                    try {
+                        byte[] decodedValueBytes = Base64.getDecoder().decode(value);
+                        logger.log("Value decoded, length: " + decodedValueBytes.length + " bytes");
+
+                        // Print the complete message in hex format
+                        logger.log("Complete message in hex format:");
+                        logger.log(bytesToHexString(decodedValueBytes, 0));
+
+                        try {
+                            decodedValue = new String(decodedValueBytes);
+                            logger.log("Decoded value as string: " + (decodedValue.length() > 100 ? decodedValue.substring(0, 100) + "..." : decodedValue));
+
+                            // Add more detailed logging for AVRO messages
+                            logger.log("=== AVRO MESSAGE DETAILS ===");
+                            logger.log("Message appears to be AVRO-formatted. Attempting to extract fields:");
+
+                            // Try to extract some common fields from the AVRO binary data
+                            // This is a simple approach to show some readable content
+                            StringBuilder readableContent = new StringBuilder();
+                            for (int i = 0; i < decodedValueBytes.length; i++) {
+                                // Skip non-printable characters
+                                if (decodedValueBytes[i] >= 32 && decodedValueBytes[i] < 127) {
+                                    readableContent.append((char)decodedValueBytes[i]);
+                                }
+                            }
+
+                            String readableString = readableContent.toString();
+                            logger.log("Readable content extracted from AVRO: " + readableString);
+
+                            // Check for zip code patterns
+                            if (readableString.contains("1000")) {
+                                logger.log("FOUND ZIP CODE STARTING WITH 1000");
+                            }
+                            if (readableString.contains("2000")) {
+                                logger.log("FOUND ZIP CODE STARTING WITH 2000");
+                            }
+
+                            logger.log("=== END AVRO MESSAGE DETAILS ===");
+                        } catch (Exception e) {
+                            logger.log("ERROR converting bytes to string: " + e.getMessage());
+                            decodedValue = "Error decoding: " + e.getMessage();
+                        }
+                    } catch (Exception e) {
+                        logger.log("ERROR decoding value: " + e.getMessage());
+                        e.printStackTrace();
+                    }
+                } else {
+                    logger.log("Value is null");
+                }
+
+                thisMessage.setKey(key);
+                thisMessage.setValue(value);
+                thisMessage.setDecodedKey(decodedKey);
+                thisMessage.setDecodedValue(decodedValue);
+
+                //A kafka message can optionally have a list of headers
+                //the below code is to get the headers, iterate through each header and get its key and value
+                List<KafkaHeader> headersInThisMessage = new ArrayList<KafkaHeader>();
+                List<Map<String, byte[]>> headers = thisRecord.getHeaders();
+
+                if (headers != null) {
+                    logger.log("Headers count: " + headers.size());
+
+                    for (Map<String, byte[]> thisHeader : headers) {
+                        Set<String> thisHeaderKeys = thisHeader.keySet();
+                        Iterator<String> thisHeaderKeysIterator = thisHeaderKeys.iterator();
+                        while (thisHeaderKeysIterator.hasNext()) {
+                            String thisHeaderKey = thisHeaderKeysIterator.next();
+                            byte[] thisHeaderValue = (byte[])thisHeader.get(thisHeaderKey);
+                            String thisHeaderValueString = new String(thisHeaderValue);
+                            KafkaHeader thisMessageHeader = new KafkaHeader();
+                            thisMessageHeader.setKey(thisHeaderKey);
+                            thisMessageHeader.setValue(thisHeaderValueString);
+                            headersInThisMessage.add(thisMessageHeader);
+                            logger.log("Header - Key: " + thisHeaderKey + ", Value: " + thisHeaderValueString);
+                        }
+                    }
+                } else {
+                    logger.log("No headers in message");
+                }
+
+                thisMessage.setHeaders(headersInThisMessage);
+                listOfMessages.add(thisMessage);
+
+                // Below we are logging the particular kafka message in string format using the toString method
+                // as well as in Json format using gson.toJson function
+                logger.log("Received this message from Kafka - " + thisMessage.toString());
+                logger.log("Message in JSON format : " + gson.toJson(thisMessage));
+
+                // Add a more readable summary of the message
+                logger.log("=== MESSAGE SUMMARY ===");
+                logger.log("Topic: " + thisMessage.getTopic());
+                logger.log("Partition: " + thisMessage.getPartition());
+                logger.log("Offset: " + thisMessage.getOffset());
+                logger.log("Key: " + thisMessage.getDecodedKey());
+
+                // Check for zip code patterns in the decoded value
+                String decodedValueStr = thisMessage.getDecodedValue();
+                if (decodedValueStr != null) {
+                    if (decodedValueStr.contains("1000")) {
+                        logger.log("ZIP CODE: Found 1000 pattern in message");
+                        zip1000Count++;
+                    }
+                    if (decodedValueStr.contains("2000")) {
+                        logger.log("ZIP CODE: Found 2000 pattern in message");
+                        zip2000Count++;
+                    }
+                }
+
+                logger.log("=== END MESSAGE SUMMARY ===");
+            }
+        }
+        logger.log("All Messages in this batch = " + gson.toJson(listOfMessages));
+
+        // Log summary of zip code distribution
+        logger.log("========== ZIP CODE DISTRIBUTION SUMMARY ==========");
+        logger.log("Messages with zip code containing 1000: " + zip1000Count);
+        logger.log("Messages with zip code containing 2000: " + zip2000Count);
+        logger.log("Other messages: " + (listOfMessages.size() - zip1000Count - zip2000Count));
+        logger.log("====================================================");
+
+        logger.log("========== LAMBDA FUNCTION COMPLETED ==========");
+        return response;
+    }
+
+    /**
+     * Convert byte array to hexadecimal string representation
+     *
+     * @param bytes Byte array to convert
+     * @param maxLength Maximum number of bytes to convert (0 for all)
+     * @return Hexadecimal string representation
+     */
+    private String bytesToHexString(byte[] bytes, int maxLength) {
+        StringBuilder sb = new StringBuilder();
+        int length = maxLength > 0 && maxLength < bytes.length ? maxLength : bytes.length;
+
+        for (int i = 0; i < length; i++) {
+            sb.append(String.format("%02X", bytes[i]));
+            if (i % 16 == 15) {
+                sb.append("\n");
+            } else if (i % 4 == 3) {
+                sb.append(" ");
+            }
+        }
+
+        if (maxLength > 0 && length < bytes.length) {
+            sb.append("... (").append(bytes.length - length).append(" more bytes)");
+        }
+
+        return sb.toString();
+    }
+}
diff --git a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/SimpleHandler.java b/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/SimpleHandler.java
new file mode 100644
index 000000000..03c4e5092
--- /dev/null
+++ b/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/SimpleHandler.java
@@ -0,0 +1,18 @@
+package com.amazonaws.services.lambda.samples.events.msk;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+
+public class SimpleHandler implements RequestHandler<Object, String> {
+
+    @Override
+    public String handleRequest(Object event, Context context) {
+        System.out.println("=== SIMPLE HANDLER CALLED ===");
+        System.out.println("Event: " + event);
+        System.out.println("Event class: " + (event != null ?
event.getClass().getName() : "null")); + System.out.println("Context: " + context.getFunctionName()); + System.out.println("=== SIMPLE HANDLER END ==="); + + return "Simple handler executed successfully"; + } +} diff --git a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java b/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java new file mode 100644 index 000000000..65bd543d3 --- /dev/null +++ b/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java @@ -0,0 +1,70 @@ +package com.amazonaws.services.lambda.samples.events.msk; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.events.KafkaEvent; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + + +class HandlerMSKTest { + private static final String kafkaEventJson = "{\n" + + " \"records\":{\n" + + " \"myTopic-0\":[\n" + + " {\n" + + " \"topic\":\"myTopic\",\n" + + " \"partition\":0,\n" + + " \"offset\":250,\n" + + " \"timestamp\":1678072110111,\n" + + " \"timestampType\":\"CREATE_TIME\",\n" + + " \"value\":\"Zg==\",\n" + + " \"headers\":[\n" + + " \n" + + " ]\n" + + " },\n" + + " {\n" + + " \"topic\":\"myTopic\",\n" + + " \"partition\":0,\n" + + " \"offset\":251,\n" + + " \"timestamp\":1678072111086,\n" + + " \"timestampType\":\"CREATE_TIME\",\n" + + " \"value\":\"Zw==\",\n" + + " \"headers\":[\n" + + " \n" + + " ]\n" + + " }\n" + + " ]\n" + + " },\n" + + " \"eventSource\":\"aws:kafka\",\n" + + " \"eventSourceArn\":\"arn:aws:kafka:us-west-2:123456789012:cluster/MSKWorkshopCluster/a93759a9-c9d0-4952-984c-492c6bfa2be8-13\",\n" + + " \"bootstrapServers\":\"b-2.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-3.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-1.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098\"\n" + + "}"; + + @Test + void invokeTest() { + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + KafkaEvent event = gson.fromJson(kafkaEventJson, KafkaEvent.class); + Context context = new TestContext(); + HandlerMSK handler = new HandlerMSK(); + String result = handler.handleRequest(event, context); + assertEquals(result, "200 OK"); + assertEquals(handler.listOfMessages.size(), 2); + assertEquals(handler.listOfMessages.get(0).getTopic(), "myTopic"); + assertEquals(handler.listOfMessages.get(0).getPartition(), 0); + assertEquals(handler.listOfMessages.get(0).getOffset(), 250L); + assertEquals(handler.listOfMessages.get(0).getTimestamp(), 1678072110111L); + assertEquals(handler.listOfMessages.get(0).getTimestampType(), "CREATE_TIME"); + assertEquals(handler.listOfMessages.get(0).getDecodedKey(), "null"); + assertEquals(handler.listOfMessages.get(0).getDecodedValue(), "f"); + assertEquals(handler.listOfMessages.get(1).getTopic(), "myTopic"); + assertEquals(handler.listOfMessages.get(1).getPartition(), 0); + assertEquals(handler.listOfMessages.get(1).getOffset(), 251L); + assertEquals(handler.listOfMessages.get(1).getTimestamp(), 1678072111086L); + assertEquals(handler.listOfMessages.get(1).getTimestampType(), "CREATE_TIME"); + assertEquals(handler.listOfMessages.get(1).getDecodedKey(), "null"); + 
assertEquals(handler.listOfMessages.get(1).getDecodedValue(), "g"); + } +} diff --git a/msk-lambda-schema-avro-java-sam/template.yaml b/msk-lambda-schema-avro-java-sam/template.yaml new file mode 100644 index 000000000..0075add40 --- /dev/null +++ b/msk-lambda-schema-avro-java-sam/template.yaml @@ -0,0 +1,284 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: > + kafka_event_consumer_and_producer_functions + + Sample SAM Template for MSK consumer and AVRO producer with IAM auth + +# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst +Globals: + Function: + Timeout: 15 + +Resources: + # Glue Schema Registry for storing Avro schemas + MSKGlueRegistry: + Type: AWS::Glue::Registry + Properties: + Name: !Ref GlueSchemaRegistryName + Description: "Registry for storing schemas related to MSK" + + # Contact Schema for Avro serialization/deserialization + ContactSchema: + Type: AWS::Glue::Schema + Properties: + Name: !Ref ContactSchemaName + Description: "Schema for Contact data in Avro format" + Compatibility: BACKWARD + DataFormat: AVRO + Registry: + Arn: !GetAtt MSKGlueRegistry.Arn + SchemaDefinition: | + { + "type": "record", + "name": "Contact", + "fields": [ + {"name": "firstName", "type": "string"}, + {"name": "lastName", "type": "string"}, + {"name": "company", "type": "string"}, + {"name": "address", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "county", "type": "string"}, + {"name": "state", "type": "string"}, + {"name": "zip", "type": "string"}, + {"name": "homePhone", "type": "string"}, + {"name": "cellPhone", "type": "string"}, + {"name": "email", "type": "string"}, + {"name": "website", "type": "string"} + ] + } + + # SQS Queue to use as Dead Letter Queue for the MSK event source mapping + ConsumerDLQ: + Type: AWS::SQS::Queue + Properties: + MessageRetentionPeriod: 1209600 # 14 days (maximum retention period) + VisibilityTimeout: 300 # 5 minutes + Tags: + - Key: Purpose + Value: MSKConsumerDLQ + + LambdaMSKConsumerJavaFunction: + Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction + Properties: + CodeUri: kafka_event_consumer_function + Handler: com.amazonaws.services.lambda.samples.events.msk.AvroKafkaHandler::handleRequest + Runtime: java21 + Architectures: + - x86_64 + MemorySize: 512 + VpcConfig: + SecurityGroupIds: !Ref SecurityGroupIds + SubnetIds: !Ref SubnetIds + Environment: # More info about Env Vars: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#environment-object + Variables: + PARAM1: VALUE + JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1 # More info about tiered compilation https://aws.amazon.com/blogs/compute/optimizing-aws-lambda-function-performance-for-java/ + POWERTOOLS_LOG_LEVEL: TRACE + POWERTOOLS_SERVICE_NAME: kafka_consumer + Events: + MSKEvent: + Type: MSK + Properties: + StartingPosition: LATEST + BatchSize: 1 + MaximumBatchingWindowInSeconds: 1 + Stream: !Join [ '', ["arn:", "aws:", "kafka:", !Ref "AWS::Region" , ":" ,!Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId] ] + Topics: + - !Ref MSKTopic + DestinationConfig: + OnFailure: + Destination: !GetAtt ConsumerDLQ.Arn + FilterCriteria: + Filters: + - Pattern: '{"value": {"zip": [ { "prefix": "2000" } ]}}' + Policies: + - Statement: + - Sid: 
KafkaClusterPermissionsPolicy + Effect: Allow + Action: + - kafka-cluster:Connect + - kafka-cluster:DescribeGroup + - kafka-cluster:DescribeCluster + - kafka-cluster:AlterCluster + - kafka-cluster:AlterClusterDynamicConfiguration + - kafka-cluster:WriteDataIdempotently + - kafka-cluster:AlterGroup + - kafka-cluster:DescribeTopic + - kafka-cluster:ReadData + - kafka-cluster:DescribeClusterDynamicConfiguration + Resource: + - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId]] + - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "topic/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]] + - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "group/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]] + + - Sid: KafkaPermissionsPolicy + Effect: Allow + Action: + - kafka:DescribeClusterV2 + - kafka:GetBootstrapBrokers + Resource: '*' + + - Sid: EC2PermissionsPolicy + Effect: Allow + Action: + - ec2:DescribeSecurityGroups + - ec2:DescribeSubnets + - ec2:DescribeVpcs + - ec2:CreateNetworkInterface + - ec2:DescribeNetworkInterfaces + - ec2:DeleteNetworkInterface + Resource: '*' + + - Sid: GlueSchemaRegistryPermissionsPolicy + Effect: Allow + Action: + - glue:GetSchemaByDefinition + - glue:GetSchemaVersion + - glue:GetRegistry + - glue:ListSchemas + - glue:ListSchemaVersions + - glue:RegisterSchemaVersion + - glue:PutSchemaVersionMetadata + - glue:GetSchemaVersionsDiff + - glue:QuerySchemaVersionMetadata + Resource: + - !GetAtt MSKGlueRegistry.Arn + - !GetAtt ContactSchema.Arn + + - Sid: SQSPermissionsPolicy + Effect: Allow + Action: + - sqs:SendMessage + Resource: !GetAtt ConsumerDLQ.Arn + - VPCAccessPolicy: {} + + LambdaMSKProducerJavaFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: kafka_event_producer_function + Handler: com.amazonaws.services.lambda.samples.events.msk.AvroProducerHandler::handleRequest + Runtime: java21 + Timeout: 300 + Architectures: + - x86_64 + MemorySize: 512 + VpcConfig: + SecurityGroupIds: !Ref SecurityGroupIds + SubnetIds: !Ref SubnetIds + Environment: + Variables: + MSK_CLUSTER_ARN: !Join [ '', ["arn:", "aws:", "kafka:", !Ref "AWS::Region" , ":" ,!Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId] ] + MSK_TOPIC: !Ref MSKTopic + REGISTRY_NAME: !Ref GlueSchemaRegistryName + CONTACT_SCHEMA_NAME: !Ref ContactSchemaName + JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1 + Policies: + - Statement: + - Sid: KafkaClusterPermissionsPolicy + Effect: Allow + Action: + - kafka-cluster:Connect + - kafka-cluster:DescribeCluster + - kafka-cluster:WriteData + - kafka-cluster:DescribeTopic + Resource: + - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId]] + - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "topic/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]] + + - Sid: KafkaPermissionsPolicy + Effect: Allow + Action: + - kafka:DescribeClusterV2 + - kafka:GetBootstrapBrokers + Resource: '*' + + - Sid: EC2PermissionsPolicy + Effect: Allow + Action: + - ec2:DescribeSecurityGroups + - ec2:DescribeSubnets + - ec2:DescribeVpcs + - ec2:CreateNetworkInterface + - ec2:DescribeNetworkInterfaces + - ec2:DeleteNetworkInterface + Resource: '*' + + - Sid: GlueSchemaRegistryPermissionsPolicy + 
Effect: Allow + Action: + - glue:GetSchemaByDefinition + - glue:GetSchemaVersion + - glue:GetRegistry + - glue:ListSchemas + - glue:ListSchemaVersions + - glue:GetSchemaVersionsDiff + - glue:QuerySchemaVersionMetadata + - glue:RegisterSchemaVersion + - glue:PutSchemaVersionMetadata + - glue:CreateSchema + - glue:CreateRegistry + Resource: + - !GetAtt MSKGlueRegistry.Arn + - !GetAtt ContactSchema.Arn + - VPCAccessPolicy: {} + +Parameters: + MSKClusterName: + Type: String + Description: Enter the name of the MSK Cluster + Default: CLUSTER_NAME + MSKClusterId: + Type: String + Description: Enter the ID of the MSK Cluster + Default: CLUSTER_ID + MSKTopic: + Type: String + Description: Enter the name of the MSK Topic + Default: KAFKA_TOPIC + GlueSchemaRegistryName: + Type: String + Description: Enter the name of the Glue Schema Registry + Default: GLUE_SCHEMA_REGISTRY_NAME + ContactSchemaName: + Type: String + Description: Enter the name of the Contact Schema + Default: AVRO_SCHEMA + VpcId: + Type: String + Description: Enter the VPC ID where the MSK cluster is deployed + Default: VPC_ID + SubnetIds: + Type: CommaDelimitedList + Description: Enter the subnet IDs where the MSK cluster is deployed (comma-separated) + Default: SUBNET_IDS + SecurityGroupIds: + Type: CommaDelimitedList + Description: Enter the security group IDs that allow access to the MSK cluster (comma-separated) + Default: LAMBDA_SECURITY_GROUP_ID + +Outputs: + MSKConsumerLambdaFunction: + Description: "Topic Consumer Lambda Function ARN" + Value: !GetAtt LambdaMSKConsumerJavaFunction.Arn + MSKProducerLambdaFunction: + Description: "AVRO Producer Lambda Function ARN" + Value: !GetAtt LambdaMSKProducerJavaFunction.Arn + ConsumerDLQUrl: + Description: "URL of the Dead Letter Queue for the MSK Consumer" + Value: !Ref ConsumerDLQ + ConsumerDLQArn: + Description: "ARN of the Dead Letter Queue for the MSK Consumer" + Value: !GetAtt ConsumerDLQ.Arn + GlueSchemaRegistryArn: + Description: "ARN of the Glue Schema Registry" + Value: !GetAtt MSKGlueRegistry.Arn + GlueSchemaRegistryName: + Description: "Name of the Glue Schema Registry" + Value: !Ref MSKGlueRegistry + ContactSchemaArn: + Description: "ARN of the Contact Schema" + Value: !GetAtt ContactSchema.Arn + ContactSchemaName: + Description: "Name of the Contact Schema" + Value: !Ref ContactSchema From 21852d9da9b266005fb76ce994bcb2b39a81b062 Mon Sep 17 00:00:00 2001 From: Vaibhav Jain Date: Sun, 22 Jun 2025 22:10:04 -0400 Subject: [PATCH 08/10] Revert "Moved schema registry to SAM" This reverts commit 126917cb6d863672ef2ff1bdaea6fefab13ebc45. 
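With this revert, the Glue Schema Registry and the Contact schema are created by the
`MSKAndKafkaClientEC2.yaml` stack again, so both must exist before the SAM application
is deployed. A quick sanity check with the AWS CLI, assuming the default names from
that template (adjust the names if you overrode the parameters):

```bash
# Verify the registry created by the MSK/EC2 stack (default parameter value: GlueSchemaRegistryForMSK)
aws glue get-registry --registry-id RegistryName=GlueSchemaRegistryForMSK

# Verify the Contact schema is registered in it (default parameter value: ContactSchema)
aws glue get-schema --schema-id RegistryName=GlueSchemaRegistryForMSK,SchemaName=ContactSchema
```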
--- .../MSKAndKafkaClientEC2.yaml | 52 +++++++++++++++- msk-lambda-schema-avro-java-sam/README.md | 9 ++- msk-lambda-schema-avro-java-sam/template.yaml | 57 +---------------- .../template_original.yaml | 61 ++----------------- 4 files changed, 62 insertions(+), 117 deletions(-) diff --git a/msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml b/msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml index 89b0b338f..53cb70b51 100644 --- a/msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml +++ b/msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml @@ -31,7 +31,12 @@ Parameters: ServerlessLandGithubLocation: Type: String Default: https://github.com/aws-samples/serverless-patterns/ - + ContactSchemaName: + Type: String + Default: ContactSchema + GlueSchemaRegistryForMSKName: + Type: String + Default: GlueSchemaRegistryForMSK Conditions: CreateProvisionedCluster: !Equals @@ -271,6 +276,39 @@ Resources: ToPort: 22 SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId + MSKGlueRegistry: + Type: AWS::Glue::Registry + Properties: + Name: !Ref GlueSchemaRegistryForMSKName + Description: "Registry for storing schemas related to MSK" + + ContactSchema: + Type: AWS::Glue::Schema + Properties: + Name: !Ref ContactSchemaName + Compatibility: BACKWARD + DataFormat: AVRO + Registry: + Arn: !GetAtt MSKGlueRegistry.Arn + SchemaDefinition: | + { + "type": "record", + "name": "Contact", + "fields": [ + {"name": "firstname", "type": "string"}, + {"name": "lastname", "type": "string"}, + {"name": "company", "type": "string"}, + {"name": "street", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "county", "type": "string"}, + {"name": "state", "type": "string"}, + {"name": "zip", "type": "string"}, + {"name": "homePhone", "type": "string"}, + {"name": "cellPhone", "type": "string"}, + {"name": "email", "type": "string"}, + {"name": "website", "type": "string"} + ] + } KafkaClientEC2InstanceProvisioned: Condition: CreateProvisionedCluster @@ -518,6 +556,8 @@ Resources: cp template_original.yaml template.yaml sudo chown -R ec2-user . + GLUE_SCHEMA_REGISTRY_NAME=${glue_registry_name} + CONTACT_SCHEMA=${contact_schema_name} VPC_ID=${vpcid} LAMBDA_SECURITY_GROUP_ID=${securitygroup} PRIVATE_SUBNET_1=${privatesubnetone} @@ -530,6 +570,8 @@ Resources: sed -i "s/CLUSTER_ID/$CLUSTER_ID/g" template.yaml sed -i "s/KAFKA_TOPIC/$KAFKA_TOPIC/g" template.yaml sed -i "s/JAVA_VERSION/$JAVA_VERSION/g" template.yaml + sed -i "s/GLUE_SCHEMA_REGISTRY_NAME/$GLUE_SCHEMA_REGISTRY_NAME/g" template.yaml + sed -i "s/AVRO_SCHEMA/$CONTACT_SCHEMA/g" template.yaml sed -i "s/VPC_ID/$VPC_ID/g" template.yaml sed -i "s/LAMBDA_SECURITY_GROUP_ID/$LAMBDA_SECURITY_GROUP_ID/g" template.yaml sed -i "s/SUBNET_IDS/$SUBNET_IDS/g" template.yaml @@ -560,6 +602,8 @@ Resources: privatesubnettwo: !Ref PrivateSubnetMSKTwo privatesubnetthree: !Ref PrivateSubnetMSKThree securitygroup: !GetAtt MSKSecurityGroup.GroupId + glue_registry_name: !Ref GlueSchemaRegistryForMSKName + contact_schema_name: !Ref ContactSchemaName KafkaClientEC2InstanceServerless: @@ -808,6 +852,8 @@ Resources: cp template_original.yaml template.yaml sudo chown -R ec2-user . 
+ GLUE_SCHEMA_REGISTRY_NAME=${glue_registry_name} + CONTACT_SCHEMA=${contact_schema_name} VPC_ID=${vpcid} LAMBDA_SECURITY_GROUP_ID=${securitygroup} PRIVATE_SUBNET_1=${privatesubnetone} @@ -821,6 +867,8 @@ Resources: sed -i "s/CLUSTER_ID/$CLUSTER_ID/g" template.yaml sed -i "s/KAFKA_TOPIC/$KAFKA_TOPIC/g" template.yaml sed -i "s/JAVA_VERSION/$JAVA_VERSION/g" template.yaml + sed -i "s/GLUE_SCHEMA_REGISTRY_NAME/$GLUE_SCHEMA_REGISTRY_NAME/g" template.yaml + sed -i "s/AVRO_SCHEMA/$CONTACT_SCHEMA/g" template.yaml sed -i "s/VPC_ID/$VPC_ID/g" template.yaml sed -i "s/LAMBDA_SECURITY_GROUP_ID/$LAMBDA_SECURITY_GROUP_ID/g" template.yaml sed -i "s/SUBNET_IDS/$SUBNET_IDS/g" template.yaml @@ -851,6 +899,8 @@ Resources: privatesubnettwo: !Ref PrivateSubnetMSKTwo privatesubnetthree: !Ref PrivateSubnetMSKThree securitygroup: !GetAtt MSKSecurityGroup.GroupId + glue_registry_name: !Ref GlueSchemaRegistryForMSKName + contact_schema_name: !Ref ContactSchemaName diff --git a/msk-lambda-schema-avro-java-sam/README.md b/msk-lambda-schema-avro-java-sam/README.md index bb2125ee5..97c1a48b7 100644 --- a/msk-lambda-schema-avro-java-sam/README.md +++ b/msk-lambda-schema-avro-java-sam/README.md @@ -4,7 +4,7 @@ This pattern is an example of Lambda functions that: 1. Consume messages from an Amazon Managed Streaming for Kafka (Amazon MSK) topic 2. Produce Avro-formatted messages to an Amazon MSK topic using Schema Registry -Both functions use IAM authentication to connect to the MSK Cluster and use AWS Glue Schema Registry for Avro schema management. The Glue Schema Registry and Contact Schema are created as part of the SAM deployment. +Both functions use IAM authentication to connect to the MSK Cluster and use AWS Glue Schema Registry for Avro schema management. This project contains source code and supporting files for a serverless application that you can deploy with the SAM CLI. It includes the following files and folders. @@ -12,7 +12,7 @@ This project contains source code and supporting files for a serverless applicat - `kafka_event_producer_function/src/main/java` - Code for the Avro producer Lambda function. - `events` - Invocation events that you can use to invoke the functions. - `kafka_event_consumer_function/src/test/java` - Unit tests for the consumer code. -- `template.yaml` - A template that defines the application's Lambda functions, Glue Schema Registry, and Contact Schema. +- `template.yaml` - A template that defines the application's Lambda functions. - `template_original.yaml` - The original template with placeholders that get replaced during deployment. - `MSKAndKafkaClientEC2.yaml` - A CloudFormation template file that can be used to deploy an MSK cluster and also deploy an EC2 machine with all pre-requisites already installed, so you can directly build and deploy the lambda functions and test them out. @@ -96,15 +96,14 @@ sam deploy --capabilities CAPABILITY_IAM --no-confirm-changeset --no-disable-rol The `sam deploy` command packages and deploys your application to AWS, with a series of prompts. > [!NOTE] -> The script retrieves the required parameters from the CloudFormation outputs in the AWS Console after deploying the `MSKAndKafkaClientEC2.yaml` template. These outputs contain the necessary information for deploying the Lambda functions. If you connect to a different Kafka cluster, enter the values manually. +> The script retrieves the required parameters from the CloudFormation outputs in the AWS Console after deploying the `MSKAndKafkaClientEC2.yaml` template. 
These outputs contain all the necessary information for deploying the Lambda functions. If you connect to a different Kafka cluster, enter the values manually.
 
 * **Stack Name**: The name of the stack to deploy to CloudFormation. This should be unique to your account and region, and a good starting point would be something matching your project name.
 * **AWS Region**: The AWS region you want to deploy your app to.
 * **Parameter MSKClusterName**: The name of the MSK Cluster. This will be `-cluster` from the CloudFormation template you deployed in the previous step.
 * **Parameter MSKClusterId**: The unique ID of the MSK Cluster. This can be found in the MSK console or extracted from the MSK ARN in the CloudFormation outputs.
 * **Parameter MSKTopic**: The Kafka topic on which the Lambda functions will produce and consume messages. You can find this in the CloudFormation outputs as `KafkaTopicForLambda`.
-* **Parameter GlueSchemaRegistryName**: The name of the Glue Schema Registry to be created (default: GlueSchemaRegistryForMSK).
-* **Parameter ContactSchemaName**: The name of the Contact Schema to be created (default: ContactSchema).
+* **Parameter ContactSchemaName**: The name of the schema used for Avro serialization (default: ContactSchema).
 * **Parameter VpcId**: The ID of the VPC where the MSK cluster is deployed. You can find this in the CloudFormation outputs as `VPCId`.
 * **Parameter SubnetIds**: Comma-separated list of subnet IDs where the MSK cluster is deployed. You can find these in the CloudFormation outputs as `PrivateSubnetMSKOne`, `PrivateSubnetMSKTwo`, and `PrivateSubnetMSKThree`.
 * **Parameter SecurityGroupIds**: Comma-separated list of security group IDs that allow access to the MSK cluster. You can find this in the CloudFormation outputs as `SecurityGroupId`.
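The parameters above can also be passed non-interactively instead of answering the guided prompts. A sketch of the override syntax; every value shown is a placeholder for your environment, not a value from this pattern:

```bash
# All parameter values below are illustrative placeholders - substitute your own.
sam deploy \
  --stack-name msk-lambda-schema-avro-java-sam \
  --capabilities CAPABILITY_IAM --no-confirm-changeset --no-disable-rollback \
  --parameter-overrides \
    MSKClusterName=my-msk-cluster \
    MSKClusterId=1a2b3c4d-5e6f-7a8b-9c0d-1e2f3a4b5c6d-1 \
    MSKTopic=my-kafka-topic \
    ContactSchemaName=ContactSchema \
    VpcId=vpc-0123456789abcdef0 \
    'SubnetIds="subnet-aaaa1111,subnet-bbbb2222,subnet-cccc3333"' \
    SecurityGroupIds=sg-0123456789abcdef0
```

Comma-delimited list values are quoted so the commas survive shell and CLI parsing.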
diff --git a/msk-lambda-schema-avro-java-sam/template.yaml b/msk-lambda-schema-avro-java-sam/template.yaml index 0075add40..c416c991d 100644 --- a/msk-lambda-schema-avro-java-sam/template.yaml +++ b/msk-lambda-schema-avro-java-sam/template.yaml @@ -11,43 +11,6 @@ Globals: Timeout: 15 Resources: - # Glue Schema Registry for storing Avro schemas - MSKGlueRegistry: - Type: AWS::Glue::Registry - Properties: - Name: !Ref GlueSchemaRegistryName - Description: "Registry for storing schemas related to MSK" - - # Contact Schema for Avro serialization/deserialization - ContactSchema: - Type: AWS::Glue::Schema - Properties: - Name: !Ref ContactSchemaName - Description: "Schema for Contact data in Avro format" - Compatibility: BACKWARD - DataFormat: AVRO - Registry: - Arn: !GetAtt MSKGlueRegistry.Arn - SchemaDefinition: | - { - "type": "record", - "name": "Contact", - "fields": [ - {"name": "firstName", "type": "string"}, - {"name": "lastName", "type": "string"}, - {"name": "company", "type": "string"}, - {"name": "address", "type": "string"}, - {"name": "city", "type": "string"}, - {"name": "county", "type": "string"}, - {"name": "state", "type": "string"}, - {"name": "zip", "type": "string"}, - {"name": "homePhone", "type": "string"}, - {"name": "cellPhone", "type": "string"}, - {"name": "email", "type": "string"}, - {"name": "website", "type": "string"} - ] - } - # SQS Queue to use as Dead Letter Queue for the MSK event source mapping ConsumerDLQ: Type: AWS::SQS::Queue @@ -142,9 +105,7 @@ Resources: - glue:PutSchemaVersionMetadata - glue:GetSchemaVersionsDiff - glue:QuerySchemaVersionMetadata - Resource: - - !GetAtt MSKGlueRegistry.Arn - - !GetAtt ContactSchema.Arn + Resource: '*' - Sid: SQSPermissionsPolicy Effect: Allow @@ -219,8 +180,8 @@ Resources: - glue:CreateSchema - glue:CreateRegistry Resource: - - !GetAtt MSKGlueRegistry.Arn - - !GetAtt ContactSchema.Arn + - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:registry/*" + - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:schema/*/*" - VPCAccessPolicy: {} Parameters: @@ -270,15 +231,3 @@ Outputs: ConsumerDLQArn: Description: "ARN of the Dead Letter Queue for the MSK Consumer" Value: !GetAtt ConsumerDLQ.Arn - GlueSchemaRegistryArn: - Description: "ARN of the Glue Schema Registry" - Value: !GetAtt MSKGlueRegistry.Arn - GlueSchemaRegistryName: - Description: "Name of the Glue Schema Registry" - Value: !Ref MSKGlueRegistry - ContactSchemaArn: - Description: "ARN of the Contact Schema" - Value: !GetAtt ContactSchema.Arn - ContactSchemaName: - Description: "Name of the Contact Schema" - Value: !Ref ContactSchema diff --git a/msk-lambda-schema-avro-java-sam/template_original.yaml b/msk-lambda-schema-avro-java-sam/template_original.yaml index f15f54e53..e77964ce2 100644 --- a/msk-lambda-schema-avro-java-sam/template_original.yaml +++ b/msk-lambda-schema-avro-java-sam/template_original.yaml @@ -11,43 +11,6 @@ Globals: Timeout: 15 Resources: - # Glue Schema Registry for storing Avro schemas - MSKGlueRegistry: - Type: AWS::Glue::Registry - Properties: - Name: !Ref GlueSchemaRegistryName - Description: "Registry for storing schemas related to MSK" - - # Contact Schema for Avro serialization/deserialization - ContactSchema: - Type: AWS::Glue::Schema - Properties: - Name: !Ref ContactSchemaName - Description: "Schema for Contact data in Avro format" - Compatibility: BACKWARD - DataFormat: AVRO - Registry: - Arn: !GetAtt MSKGlueRegistry.Arn - SchemaDefinition: | - { - "type": "record", - "name": "Contact", - "fields": [ - {"name": "firstName", 
"type": "string"}, - {"name": "lastName", "type": "string"}, - {"name": "company", "type": "string"}, - {"name": "address", "type": "string"}, - {"name": "city", "type": "string"}, - {"name": "county", "type": "string"}, - {"name": "state", "type": "string"}, - {"name": "zip", "type": "string"}, - {"name": "homePhone", "type": "string"}, - {"name": "cellPhone", "type": "string"}, - {"name": "email", "type": "string"}, - {"name": "website", "type": "string"} - ] - } - # SQS Queue to use as Dead Letter Queue for the MSK event source mapping ConsumerDLQ: Type: AWS::SQS::Queue @@ -62,7 +25,7 @@ Resources: Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction Properties: CodeUri: kafka_event_consumer_function - Handler: com.amazonaws.services.lambda.samples.events.msk.AvroKafkaHandler::handleRequest + Handler: com.amazonaws.services.lambda.samples.events.msk.HandlerMSK::handleRequest Runtime: java21 Architectures: - x86_64 @@ -74,8 +37,6 @@ Resources: Variables: PARAM1: VALUE JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1 # More info about tiered compilation https://aws.amazon.com/blogs/compute/optimizing-aws-lambda-function-performance-for-java/ - POWERTOOLS_LOG_LEVEL: TRACE - POWERTOOLS_SERVICE_NAME: kafka_consumer Events: MSKEvent: Type: MSK @@ -151,9 +112,7 @@ Resources: - glue:PutSchemaVersionMetadata - glue:GetSchemaVersionsDiff - glue:QuerySchemaVersionMetadata - Resource: - - !GetAtt MSKGlueRegistry.Arn - - !GetAtt ContactSchema.Arn + Resource: '*' - Sid: SQSPermissionsPolicy Effect: Allow @@ -228,8 +187,8 @@ Resources: - glue:CreateSchema - glue:CreateRegistry Resource: - - !GetAtt MSKGlueRegistry.Arn - - !GetAtt ContactSchema.Arn + - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:registry/*" + - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:schema/*/*" - VPCAccessPolicy: {} Parameters: @@ -279,15 +238,3 @@ Outputs: ConsumerDLQArn: Description: "ARN of the Dead Letter Queue for the MSK Consumer" Value: !GetAtt ConsumerDLQ.Arn - GlueSchemaRegistryArn: - Description: "ARN of the Glue Schema Registry" - Value: !GetAtt MSKGlueRegistry.Arn - GlueSchemaRegistryName: - Description: "Name of the Glue Schema Registry" - Value: !Ref MSKGlueRegistry - ContactSchemaArn: - Description: "ARN of the Contact Schema" - Value: !GetAtt ContactSchema.Arn - ContactSchemaName: - Description: "Name of the Contact Schema" - Value: !Ref ContactSchema From 45a05a2ee5755fe9684e7b833056268a5ec8643f Mon Sep 17 00:00:00 2001 From: Vaibhav Jain Date: Sun, 22 Jun 2025 22:12:41 -0400 Subject: [PATCH 09/10] Removed redundant files and updated handler --- .../lambda/samples/events/msk/HandlerMSK.java | 275 ------------------ .../samples/events/msk/SimpleHandler.java | 18 -- .../samples/events/msk/HandlerMSKTest.java | 70 ----- msk-lambda-schema-avro-java-sam/template.yaml | 233 --------------- .../template_original.yaml | 2 +- 5 files changed, 1 insertion(+), 597 deletions(-) delete mode 100644 msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java delete mode 100644 msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/SimpleHandler.java delete mode 100644 msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java delete mode 
100644 msk-lambda-schema-avro-java-sam/template.yaml

diff --git a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java b/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java
deleted file mode 100644
index c5ddbdc7e..000000000
--- a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java
+++ /dev/null
@@ -1,275 +0,0 @@
-//Lambda Runtime delivers a batch of messages to the lambda function
-//Each batch of messages has two fields EventSource and EventSourceARN
-//Each batch of messages also has a field called Records
-//The Records is a map with multiple keys and values
-//Each key is a combination of the Topic Name and the Partition Number
-//One batch of messages can contain messages from multiple partitions
-
-/*
-To simplify representing a batch of Kafka messages as a list of messages
-We have created a Java class called KafkaMessage under the models package
-Here we are mapping the structure of an incoming Kafka event to a list of
-objects of the KafkaMessage class
-*/
-
-package com.amazonaws.services.lambda.samples.events.msk;
-
-import com.amazonaws.services.lambda.runtime.Context;
-import com.amazonaws.services.lambda.runtime.LambdaLogger;
-import com.amazonaws.services.lambda.runtime.RequestHandler;
-import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
-import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord;
-
-import java.util.ArrayList;
-import java.util.Base64;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
-public class HandlerMSK implements RequestHandler<KafkaEvent, String> {
-    //We initialize an empty list of the KafkaMessage class
-    List<KafkaMessage> listOfMessages = new ArrayList<KafkaMessage>();
-    Gson gson = new GsonBuilder().setPrettyPrinting().create();
-
-    @Override
-    public String handleRequest(KafkaEvent event, Context context) {
-        LambdaLogger logger = context.getLogger();
-        logger.log("========== LAMBDA FUNCTION STARTED ==========");
-        logger.log("Event received: " + gson.toJson(event));
-
-        String response = new String("200 OK");
-        this.listOfMessages = new ArrayList<KafkaMessage>();
-
-        // Counters for zip code patterns
-        int zip1000Count = 0;
-        int zip2000Count = 0;
-        //Incoming KafkaEvent object has a property called records that is a map
-        //Each key in the map is a combination of a topic and a partition
-        Map<String, List<KafkaEventRecord>> record = event.getRecords();
-
-        if (record == null) {
-            logger.log("WARNING: Event records map is null");
-            return response;
-        }
-
-        logger.log("Records map size: " + record.size());
-
-        Set<String> keySet = record.keySet();
-        logger.log("Key set size: " + keySet.size());
-        logger.log("Keys: " + keySet);
-
-        Iterator<String> iterator = keySet.iterator();
-        //We iterate through each of the keys in the map
-        while (iterator.hasNext()) {
-            String thisKey = iterator.next();
-            logger.log("Processing key: " + thisKey);
-
-            //Using the key we retrieve the value of the map which is a list of KafkaEventRecord
-            //One object of KafkaEventRecord represents an individual Kafka message
-            List<KafkaEventRecord> thisListOfRecords = record.get(thisKey);
-
-            if (thisListOfRecords == null) {
-                logger.log("WARNING: Record list for key " + thisKey + " is null");
-                continue;
-            }
-
-            logger.log("Record list size for key " + thisKey + ": " +
thisListOfRecords.size()); - - //We now iterate through the list of KafkaEventRecords - for(KafkaEventRecord thisRecord : thisListOfRecords) { - logger.log("Processing record..."); - - /* - We initialize a new object of the KafkaMessage class which is a simplified representation in our models package - We then get the fields from each kafka message in the object of KafkaEventRecord class and set them to the fields - of the KafkaRecord class - */ - KafkaMessage thisMessage = new KafkaMessage(); - thisMessage.setTopic(thisRecord.getTopic()); - thisMessage.setPartition(thisRecord.getPartition()); - thisMessage.setOffset(thisRecord.getOffset()); - thisMessage.setTimestamp(thisRecord.getTimestamp()); - thisMessage.setTimestampType(thisRecord.getTimestampType()); - - logger.log("Record metadata - Topic: " + thisRecord.getTopic() + - ", Partition: " + thisRecord.getPartition() + - ", Offset: " + thisRecord.getOffset()); - - String key = thisRecord.getKey(); - String value = thisRecord.getValue(); - - logger.log("Key (base64): " + key); - logger.log("Value (base64): " + value); - - String decodedKey = "null"; - String decodedValue = "null"; - //the key and value inside a kafka message are base64 encrypted and will need to be decrypted - if (null != key) { - logger.log("Decoding key..."); - try { - byte[] decodedKeyBytes = Base64.getDecoder().decode(key); - decodedKey = new String(decodedKeyBytes); - logger.log("Decoded key: " + decodedKey); - } catch (Exception e) { - logger.log("ERROR decoding key: " + e.getMessage()); - } - } else { - logger.log("Key is null"); - } - - if (null != value) { - logger.log("Decoding value..."); - try { - byte[] decodedValueBytes = Base64.getDecoder().decode(value); - logger.log("Value decoded, length: " + decodedValueBytes.length + " bytes"); - - // Print the complete message in hex format - logger.log("Complete message in hex format:"); - logger.log(bytesToHexString(decodedValueBytes, 0)); - - try { - decodedValue = new String(decodedValueBytes); - logger.log("Decoded value as string: " + (decodedValue.length() > 100 ? decodedValue.substring(0, 100) + "..." : decodedValue)); - - // Add more detailed logging for AVRO messages - logger.log("=== AVRO MESSAGE DETAILS ==="); - logger.log("Message appears to be AVRO-formatted. 
Attempting to extract fields:");
-
-                            // Try to extract some common fields from the AVRO binary data
-                            // This is a simple approach to show some readable content
-                            StringBuilder readableContent = new StringBuilder();
-                            for (int i = 0; i < decodedValueBytes.length; i++) {
-                                // Skip non-printable characters
-                                if (decodedValueBytes[i] >= 32 && decodedValueBytes[i] < 127) {
-                                    readableContent.append((char) decodedValueBytes[i]);
-                                }
-                            }
-
-                            String readableString = readableContent.toString();
-                            logger.log("Readable content extracted from AVRO: " + readableString);
-
-                            // Check for zip code patterns
-                            if (readableString.contains("1000")) {
-                                logger.log("FOUND ZIP CODE STARTING WITH 1000");
-                            }
-                            if (readableString.contains("2000")) {
-                                logger.log("FOUND ZIP CODE STARTING WITH 2000");
-                            }
-
-                            logger.log("=== END AVRO MESSAGE DETAILS ===");
-                        } catch (Exception e) {
-                            logger.log("ERROR converting bytes to string: " + e.getMessage());
-                            decodedValue = "Error decoding: " + e.getMessage();
-                        }
-                    } catch (Exception e) {
-                        logger.log("ERROR decoding value: " + e.getMessage());
-                        e.printStackTrace();
-                    }
-                } else {
-                    logger.log("Value is null");
-                }
-
-                thisMessage.setKey(key);
-                thisMessage.setValue(value);
-                thisMessage.setDecodedKey(decodedKey);
-                thisMessage.setDecodedValue(decodedValue);
-
-                //A kafka message can optionally have a list of headers
-                //the below code is to get the headers, iterate through each header and get its key and value
-                List<KafkaHeader> headersInThisMessage = new ArrayList<KafkaHeader>();
-                List<Map<String, byte[]>> headers = thisRecord.getHeaders();
-
-                if (headers != null) {
-                    logger.log("Headers count: " + headers.size());
-
-                    for (Map<String, byte[]> thisHeader : headers) {
-                        Set<String> thisHeaderKeys = thisHeader.keySet();
-                        Iterator<String> thisHeaderKeysIterator = thisHeaderKeys.iterator();
-                        while (thisHeaderKeysIterator.hasNext()) {
-                            String thisHeaderKey = thisHeaderKeysIterator.next();
-                            byte[] thisHeaderValue = thisHeader.get(thisHeaderKey);
-                            String thisHeaderValueString = new String(thisHeaderValue);
-                            KafkaHeader thisMessageHeader = new KafkaHeader();
-                            thisMessageHeader.setKey(thisHeaderKey);
-                            thisMessageHeader.setValue(thisHeaderValueString);
-                            headersInThisMessage.add(thisMessageHeader);
-                            logger.log("Header - Key: " + thisHeaderKey + ", Value: " + thisHeaderValueString);
-                        }
-                    }
-                } else {
-                    logger.log("No headers in message");
-                }
-
-                thisMessage.setHeaders(headersInThisMessage);
-                listOfMessages.add(thisMessage);
-
-                // Below we are logging the particular kafka message in string format using the toString method
-                // as well as in Json format using gson.toJson function
-                logger.log("Received this message from Kafka - " + thisMessage.toString());
-                logger.log("Message in JSON format : " + gson.toJson(thisMessage));
-
-                // Add a more readable summary of the message
-                logger.log("=== MESSAGE SUMMARY ===");
-                logger.log("Topic: " + thisMessage.getTopic());
-                logger.log("Partition: " + thisMessage.getPartition());
-                logger.log("Offset: " + thisMessage.getOffset());
-                logger.log("Key: " + thisMessage.getDecodedKey());
-
-                // Check for zip code patterns in the decoded value
-                String decodedValueStr = thisMessage.getDecodedValue();
-                if (decodedValueStr != null) {
-                    if (decodedValueStr.contains("1000")) {
-                        logger.log("ZIP CODE: Found 1000 pattern in message");
-                        zip1000Count++;
-                    }
-                    if (decodedValueStr.contains("2000")) {
-                        logger.log("ZIP CODE: Found 2000 pattern in message");
-                        zip2000Count++;
-                    }
-                }
-
-                logger.log("=== END MESSAGE SUMMARY ===");
-            }
-        }
-        logger.log("All Messages in this batch = " + gson.toJson(listOfMessages));
-
-        // Log summary of zip code distribution
-        logger.log("========== ZIP CODE DISTRIBUTION SUMMARY ==========");
-        logger.log("Messages with zip code containing 1000: " + zip1000Count);
-        logger.log("Messages with zip code containing 2000: " + zip2000Count);
-        logger.log("Other messages: " + (listOfMessages.size() - zip1000Count - zip2000Count));
-        logger.log("====================================================");
-
-        logger.log("========== LAMBDA FUNCTION COMPLETED ==========");
-        return response;
-    }
-
-    /**
-     * Convert byte array to hexadecimal string representation
-     *
-     * @param bytes Byte array to convert
-     * @param maxLength Maximum number of bytes to convert (0 for all)
-     * @return Hexadecimal string representation
-     */
-    private String bytesToHexString(byte[] bytes, int maxLength) {
-        StringBuilder sb = new StringBuilder();
-        int length = maxLength > 0 && maxLength < bytes.length ? maxLength : bytes.length;
-
-        for (int i = 0; i < length; i++) {
-            sb.append(String.format("%02X", bytes[i]));
-            if (i % 16 == 15) {
-                sb.append("\n");
-            } else if (i % 4 == 3) {
-                sb.append(" ");
-            }
-        }
-
-        if (maxLength > 0 && length < bytes.length) {
-            sb.append("... (").append(bytes.length - length).append(" more bytes)");
-        }
-
-        return sb.toString();
-    }
-}
diff --git a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/SimpleHandler.java b/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/SimpleHandler.java
deleted file mode 100644
index 03c4e5092..000000000
--- a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/SimpleHandler.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.amazonaws.services.lambda.samples.events.msk;
-
-import com.amazonaws.services.lambda.runtime.Context;
-import com.amazonaws.services.lambda.runtime.RequestHandler;
-
-public class SimpleHandler implements RequestHandler<Object, String> {
-
-    @Override
-    public String handleRequest(Object event, Context context) {
-        System.out.println("=== SIMPLE HANDLER CALLED ===");
-        System.out.println("Event: " + event);
-        System.out.println("Event class: " + (event != null ?
event.getClass().getName() : "null")); - System.out.println("Context: " + context.getFunctionName()); - System.out.println("=== SIMPLE HANDLER END ==="); - - return "Simple handler executed successfully"; - } -} diff --git a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java b/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java deleted file mode 100644 index 65bd543d3..000000000 --- a/msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.amazonaws.services.lambda.samples.events.msk; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import org.junit.jupiter.api.Test; - -import com.amazonaws.services.lambda.runtime.Context; -import com.amazonaws.services.lambda.runtime.events.KafkaEvent; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - - -class HandlerMSKTest { - private static final String kafkaEventJson = "{\n" - + " \"records\":{\n" - + " \"myTopic-0\":[\n" - + " {\n" - + " \"topic\":\"myTopic\",\n" - + " \"partition\":0,\n" - + " \"offset\":250,\n" - + " \"timestamp\":1678072110111,\n" - + " \"timestampType\":\"CREATE_TIME\",\n" - + " \"value\":\"Zg==\",\n" - + " \"headers\":[\n" - + " \n" - + " ]\n" - + " },\n" - + " {\n" - + " \"topic\":\"myTopic\",\n" - + " \"partition\":0,\n" - + " \"offset\":251,\n" - + " \"timestamp\":1678072111086,\n" - + " \"timestampType\":\"CREATE_TIME\",\n" - + " \"value\":\"Zw==\",\n" - + " \"headers\":[\n" - + " \n" - + " ]\n" - + " }\n" - + " ]\n" - + " },\n" - + " \"eventSource\":\"aws:kafka\",\n" - + " \"eventSourceArn\":\"arn:aws:kafka:us-west-2:123456789012:cluster/MSKWorkshopCluster/a93759a9-c9d0-4952-984c-492c6bfa2be8-13\",\n" - + " \"bootstrapServers\":\"b-2.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-3.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-1.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098\"\n" - + "}"; - - @Test - void invokeTest() { - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - KafkaEvent event = gson.fromJson(kafkaEventJson, KafkaEvent.class); - Context context = new TestContext(); - HandlerMSK handler = new HandlerMSK(); - String result = handler.handleRequest(event, context); - assertEquals(result, "200 OK"); - assertEquals(handler.listOfMessages.size(), 2); - assertEquals(handler.listOfMessages.get(0).getTopic(), "myTopic"); - assertEquals(handler.listOfMessages.get(0).getPartition(), 0); - assertEquals(handler.listOfMessages.get(0).getOffset(), 250L); - assertEquals(handler.listOfMessages.get(0).getTimestamp(), 1678072110111L); - assertEquals(handler.listOfMessages.get(0).getTimestampType(), "CREATE_TIME"); - assertEquals(handler.listOfMessages.get(0).getDecodedKey(), "null"); - assertEquals(handler.listOfMessages.get(0).getDecodedValue(), "f"); - assertEquals(handler.listOfMessages.get(1).getTopic(), "myTopic"); - assertEquals(handler.listOfMessages.get(1).getPartition(), 0); - assertEquals(handler.listOfMessages.get(1).getOffset(), 251L); - assertEquals(handler.listOfMessages.get(1).getTimestamp(), 1678072111086L); - assertEquals(handler.listOfMessages.get(1).getTimestampType(), "CREATE_TIME"); - assertEquals(handler.listOfMessages.get(1).getDecodedKey(), "null"); - 
assertEquals(handler.listOfMessages.get(1).getDecodedValue(), "g"); - } -} diff --git a/msk-lambda-schema-avro-java-sam/template.yaml b/msk-lambda-schema-avro-java-sam/template.yaml deleted file mode 100644 index c416c991d..000000000 --- a/msk-lambda-schema-avro-java-sam/template.yaml +++ /dev/null @@ -1,233 +0,0 @@ -AWSTemplateFormatVersion: '2010-09-09' -Transform: AWS::Serverless-2016-10-31 -Description: > - kafka_event_consumer_and_producer_functions - - Sample SAM Template for MSK consumer and AVRO producer with IAM auth - -# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst -Globals: - Function: - Timeout: 15 - -Resources: - # SQS Queue to use as Dead Letter Queue for the MSK event source mapping - ConsumerDLQ: - Type: AWS::SQS::Queue - Properties: - MessageRetentionPeriod: 1209600 # 14 days (maximum retention period) - VisibilityTimeout: 300 # 5 minutes - Tags: - - Key: Purpose - Value: MSKConsumerDLQ - - LambdaMSKConsumerJavaFunction: - Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction - Properties: - CodeUri: kafka_event_consumer_function - Handler: com.amazonaws.services.lambda.samples.events.msk.AvroKafkaHandler::handleRequest - Runtime: java21 - Architectures: - - x86_64 - MemorySize: 512 - VpcConfig: - SecurityGroupIds: !Ref SecurityGroupIds - SubnetIds: !Ref SubnetIds - Environment: # More info about Env Vars: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#environment-object - Variables: - PARAM1: VALUE - JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1 # More info about tiered compilation https://aws.amazon.com/blogs/compute/optimizing-aws-lambda-function-performance-for-java/ - POWERTOOLS_LOG_LEVEL: TRACE - POWERTOOLS_SERVICE_NAME: kafka_consumer - Events: - MSKEvent: - Type: MSK - Properties: - StartingPosition: LATEST - BatchSize: 1 - MaximumBatchingWindowInSeconds: 1 - Stream: !Join [ '', ["arn:", "aws:", "kafka:", !Ref "AWS::Region" , ":" ,!Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId] ] - Topics: - - !Ref MSKTopic - DestinationConfig: - OnFailure: - Destination: !GetAtt ConsumerDLQ.Arn - FilterCriteria: - Filters: - - Pattern: '{"value": {"zip": [ { "prefix": "2000" } ]}}' - Policies: - - Statement: - - Sid: KafkaClusterPermissionsPolicy - Effect: Allow - Action: - - kafka-cluster:Connect - - kafka-cluster:DescribeGroup - - kafka-cluster:DescribeCluster - - kafka-cluster:AlterCluster - - kafka-cluster:AlterClusterDynamicConfiguration - - kafka-cluster:WriteDataIdempotently - - kafka-cluster:AlterGroup - - kafka-cluster:DescribeTopic - - kafka-cluster:ReadData - - kafka-cluster:DescribeClusterDynamicConfiguration - Resource: - - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId]] - - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "topic/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]] - - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "group/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]] - - - Sid: KafkaPermissionsPolicy - Effect: Allow - Action: - - kafka:DescribeClusterV2 - - kafka:GetBootstrapBrokers - Resource: '*' - - - Sid: EC2PermissionsPolicy - Effect: Allow - Action: - - 
ec2:DescribeSecurityGroups - - ec2:DescribeSubnets - - ec2:DescribeVpcs - - ec2:CreateNetworkInterface - - ec2:DescribeNetworkInterfaces - - ec2:DeleteNetworkInterface - Resource: '*' - - - Sid: GlueSchemaRegistryPermissionsPolicy - Effect: Allow - Action: - - glue:GetSchemaByDefinition - - glue:GetSchemaVersion - - glue:GetRegistry - - glue:ListSchemas - - glue:ListSchemaVersions - - glue:RegisterSchemaVersion - - glue:PutSchemaVersionMetadata - - glue:GetSchemaVersionsDiff - - glue:QuerySchemaVersionMetadata - Resource: '*' - - - Sid: SQSPermissionsPolicy - Effect: Allow - Action: - - sqs:SendMessage - Resource: !GetAtt ConsumerDLQ.Arn - - VPCAccessPolicy: {} - - LambdaMSKProducerJavaFunction: - Type: AWS::Serverless::Function - Properties: - CodeUri: kafka_event_producer_function - Handler: com.amazonaws.services.lambda.samples.events.msk.AvroProducerHandler::handleRequest - Runtime: java21 - Timeout: 300 - Architectures: - - x86_64 - MemorySize: 512 - VpcConfig: - SecurityGroupIds: !Ref SecurityGroupIds - SubnetIds: !Ref SubnetIds - Environment: - Variables: - MSK_CLUSTER_ARN: !Join [ '', ["arn:", "aws:", "kafka:", !Ref "AWS::Region" , ":" ,!Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId] ] - MSK_TOPIC: !Ref MSKTopic - REGISTRY_NAME: !Ref GlueSchemaRegistryName - CONTACT_SCHEMA_NAME: !Ref ContactSchemaName - JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1 - Policies: - - Statement: - - Sid: KafkaClusterPermissionsPolicy - Effect: Allow - Action: - - kafka-cluster:Connect - - kafka-cluster:DescribeCluster - - kafka-cluster:WriteData - - kafka-cluster:DescribeTopic - Resource: - - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId]] - - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "topic/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]] - - - Sid: KafkaPermissionsPolicy - Effect: Allow - Action: - - kafka:DescribeClusterV2 - - kafka:GetBootstrapBrokers - Resource: '*' - - - Sid: EC2PermissionsPolicy - Effect: Allow - Action: - - ec2:DescribeSecurityGroups - - ec2:DescribeSubnets - - ec2:DescribeVpcs - - ec2:CreateNetworkInterface - - ec2:DescribeNetworkInterfaces - - ec2:DeleteNetworkInterface - Resource: '*' - - - Sid: GlueSchemaRegistryPermissionsPolicy - Effect: Allow - Action: - - glue:GetSchemaByDefinition - - glue:GetSchemaVersion - - glue:GetRegistry - - glue:ListSchemas - - glue:ListSchemaVersions - - glue:GetSchemaVersionsDiff - - glue:QuerySchemaVersionMetadata - - glue:RegisterSchemaVersion - - glue:PutSchemaVersionMetadata - - glue:CreateSchema - - glue:CreateRegistry - Resource: - - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:registry/*" - - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:schema/*/*" - - VPCAccessPolicy: {} - -Parameters: - MSKClusterName: - Type: String - Description: Enter the name of the MSK Cluster - Default: CLUSTER_NAME - MSKClusterId: - Type: String - Description: Enter the ID of the MSK Cluster - Default: CLUSTER_ID - MSKTopic: - Type: String - Description: Enter the name of the MSK Topic - Default: KAFKA_TOPIC - GlueSchemaRegistryName: - Type: String - Description: Enter the name of the Glue Schema Registry - Default: GLUE_SCHEMA_REGISTRY_NAME - ContactSchemaName: - Type: String - Description: Enter the name of the Contact Schema - Default: AVRO_SCHEMA - VpcId: - Type: String - Description: Enter the VPC ID where the MSK cluster is 
deployed - Default: VPC_ID - SubnetIds: - Type: CommaDelimitedList - Description: Enter the subnet IDs where the MSK cluster is deployed (comma-separated) - Default: SUBNET_IDS - SecurityGroupIds: - Type: CommaDelimitedList - Description: Enter the security group IDs that allow access to the MSK cluster (comma-separated) - Default: LAMBDA_SECURITY_GROUP_ID - -Outputs: - MSKConsumerLambdaFunction: - Description: "Topic Consumer Lambda Function ARN" - Value: !GetAtt LambdaMSKConsumerJavaFunction.Arn - MSKProducerLambdaFunction: - Description: "AVRO Producer Lambda Function ARN" - Value: !GetAtt LambdaMSKProducerJavaFunction.Arn - ConsumerDLQUrl: - Description: "URL of the Dead Letter Queue for the MSK Consumer" - Value: !Ref ConsumerDLQ - ConsumerDLQArn: - Description: "ARN of the Dead Letter Queue for the MSK Consumer" - Value: !GetAtt ConsumerDLQ.Arn diff --git a/msk-lambda-schema-avro-java-sam/template_original.yaml b/msk-lambda-schema-avro-java-sam/template_original.yaml index e77964ce2..711ea0edf 100644 --- a/msk-lambda-schema-avro-java-sam/template_original.yaml +++ b/msk-lambda-schema-avro-java-sam/template_original.yaml @@ -25,7 +25,7 @@ Resources: Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction Properties: CodeUri: kafka_event_consumer_function - Handler: com.amazonaws.services.lambda.samples.events.msk.HandlerMSK::handleRequest + Handler: com.amazonaws.services.lambda.samples.events.msk.AvroKafkaHandler::handleRequest Runtime: java21 Architectures: - x86_64 From 3cc30b23b86e79d9547bac08f21ff5b891a58a74 Mon Sep 17 00:00:00 2001 From: Vaibhav Jain Date: Sun, 22 Jun 2025 23:22:43 -0400 Subject: [PATCH 10/10] Added env variables --- msk-lambda-schema-avro-java-sam/template_original.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/msk-lambda-schema-avro-java-sam/template_original.yaml b/msk-lambda-schema-avro-java-sam/template_original.yaml index 711ea0edf..c79adc47f 100644 --- a/msk-lambda-schema-avro-java-sam/template_original.yaml +++ b/msk-lambda-schema-avro-java-sam/template_original.yaml @@ -37,6 +37,8 @@ Resources: Variables: PARAM1: VALUE JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1 # More info about tiered compilation https://aws.amazon.com/blogs/compute/optimizing-aws-lambda-function-performance-for-java/ + POWERTOOLS_LOG_LEVEL: TRACE + POWERTOOLS_SERVICE_NAME: kafka_consumer Events: MSKEvent: Type: MSK
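
With the Powertools variables set, the consumer's log output can be followed from a terminal once the stack is deployed. A minimal sketch with the SAM CLI, where the stack name is a placeholder for whatever you chose at deploy time:

```bash
# Tail the consumer function's CloudWatch logs after a deploy (stack name is a placeholder)
sam logs --stack-name msk-lambda-schema-avro-java-sam \
  --name LambdaMSKConsumerJavaFunction --tail
```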