From 0fa90a29367f4c7c10ac68d223c4492629cb8c03 Mon Sep 17 00:00:00 2001 From: Indranil Banerjee Date: Mon, 21 Oct 2024 15:16:24 -0700 Subject: [PATCH 01/12] msk-lambda-iam-java-sam - Java AWS Lambda Kafka consumer with IAM auth, using AWS SAM --- msk-lambda-iam-java-sam/.gitignore | 23 ++ msk-lambda-iam-java-sam/README.md | 134 ++++++++++++ msk-lambda-iam-java-sam/events/event.json | 31 +++ msk-lambda-iam-java-sam/example-pattern.json | 90 ++++++++ .../kafka_event_consumer_function/.gitignore | 1 + .../.settings/org.eclipse.jdt.core.prefs | 8 + .../kafka_event_consumer_function/pom.xml | 67 ++++++ .../lambda/samples/events/msk/HandlerMSK.java | 110 ++++++++++ .../samples/events/msk/KafkaHeader.java | 67 ++++++ .../samples/events/msk/KafkaMessage.java | 198 ++++++++++++++++++ .../samples/events/msk/HandlerMSKTest.java | 70 +++++++ .../samples/events/msk/TestContext.java | 45 ++++ .../lambda/samples/events/msk/TestLogger.java | 14 ++ msk-lambda-iam-java-sam/template.yaml | 87 ++++++++ 14 files changed, 945 insertions(+) create mode 100644 msk-lambda-iam-java-sam/.gitignore create mode 100644 msk-lambda-iam-java-sam/README.md create mode 100644 msk-lambda-iam-java-sam/events/event.json create mode 100644 msk-lambda-iam-java-sam/example-pattern.json create mode 100644 msk-lambda-iam-java-sam/kafka_event_consumer_function/.gitignore create mode 100644 msk-lambda-iam-java-sam/kafka_event_consumer_function/.settings/org.eclipse.jdt.core.prefs create mode 100644 msk-lambda-iam-java-sam/kafka_event_consumer_function/pom.xml create mode 100644 msk-lambda-iam-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java create mode 100644 msk-lambda-iam-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/KafkaHeader.java create mode 100644 msk-lambda-iam-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/KafkaMessage.java create mode 100644 msk-lambda-iam-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java create mode 100644 msk-lambda-iam-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/TestContext.java create mode 100644 msk-lambda-iam-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/TestLogger.java create mode 100644 msk-lambda-iam-java-sam/template.yaml diff --git a/msk-lambda-iam-java-sam/.gitignore b/msk-lambda-iam-java-sam/.gitignore new file mode 100644 index 000000000..cf4f80259 --- /dev/null +++ b/msk-lambda-iam-java-sam/.gitignore @@ -0,0 +1,23 @@ +node_modules +npm-debug.log +package-lock.json +package +*out.yml +out.json +bucket-name.txt +target +build +.gradle +*.zip +bin +obj +Gemfile.lock +lib +__pycache__ +*.pyc +.classpath +.factorypath +.project +.settings/* +.aws +.sam diff --git a/msk-lambda-iam-java-sam/README.md b/msk-lambda-iam-java-sam/README.md new file mode 100644 index 000000000..f58d74424 --- /dev/null +++ b/msk-lambda-iam-java-sam/README.md @@ -0,0 +1,134 @@ +# msk-lambda-iam-java-sam +# Java AWS Lambda Kafka consumer with IAM auth, using AWS SAM + +This pattern is an example of a Lambda function that consumes messages from an Amazon Managed Streaming for Kafka (Amazon MSK) topic, where the MSK Cluster has been configured to use IAM authentication. 
This pattern assumes you already have an MSK cluster with a topic configured. If you need a sample pattern to deploy an MSK cluster in either Provisioned or Serverless mode, please see the [msk-cfn-sasl-lambda pattern](https://serverlessland.com/patterns/msk-cfn-sasl-lambda). + +This project contains source code and supporting files for a serverless application that you can deploy with the SAM CLI. It includes the following files and folders. + +- kafka_event_consumer_function/src/main/java - Code for the application's Lambda function. +- events - Invocation events that you can use to invoke the function. +- kafka_event_consumer_function/src/test/java - Unit tests for the application code. +- template.yaml - A template that defines the application's AWS resources. + +The application uses several AWS resources, including Lambda functions and an MSK event source. These resources are defined in the `template.yaml` file in this project. You can update the template to add AWS resources through the same deployment process that updates your application code. + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed +* Create an MSK cluster and topic that will be used for testing. It is important to create the topic before deploying the Lambda function, otherwise the event source mapping will stay disabled. + + +Before proceeding with the next step, please make sure you have a Java JDK and Maven installed on your machine. + +For the latest version of the Amazon Corretto JDK (at the time of publishing), please go to the following link: + +https://docs.aws.amazon.com/corretto/latest/corretto-21-ug/downloads-list.html + +Please follow the instructions to download and install the JDK for your operating system. + +Note that you don't have to use the Amazon Corretto JDK; a JDK from another source works as well. + +For the latest version of Maven (at the time of publishing), please go to the following link: + +https://maven.apache.org/download.cgi + +Please follow the instructions to download and install Maven, then add the location of Maven's bin folder to your system PATH. + +To ensure Java and Maven are correctly installed, run the commands: + +java --version + +mvn --version + + +## Download the sample application + +The AWS SAM CLI is a serverless tool for building and testing Lambda applications. It uses Docker to locally test your functions in an Amazon Linux environment that resembles the Lambda execution environment. It can also emulate your application's build environment and API. + +To use the AWS SAM CLI, you need the following tools.
+ +* AWS SAM CLI - [Install the AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) +* Docker - [Install Docker community edition](https://hub.docker.com/search/?type=edition&offering=community) + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns.git + ``` +1. Change directory to the pattern directory: + ``` + cd serverless-patterns/msk-lambda-iam-java-sam + ``` + +## Use the SAM CLI to build and test locally + +Build your application with the `sam build` command. + +```bash +sam build +``` + +The SAM CLI installs dependencies defined in `kafka_event_consumer_function/pom.xml`, creates a deployment package, and saves it in the `.aws-sam/build` folder. + +Test a single function by invoking it directly with a test event. An event is a JSON document that represents the input that the function receives from the event source. Test events are included in the `events` folder in this project. + +Run functions locally and invoke them with the `sam local invoke` command. + +```bash +sam local invoke --event events/event.json +``` + +## Deploy the sample application + +To build and deploy your application for the first time, run the following in your shell: + +```bash +sam build +sam deploy --guided +``` + +The first command builds the source of your application. The second command packages and deploys your application to AWS, with a series of prompts: + +* **Stack Name**: The name of the stack to deploy to CloudFormation. This should be unique to your account and region, and a good starting point would be something matching your project name. +* **AWS Region**: The AWS region you want to deploy your app to. +* **Parameter MSKClusterName**: The name of the MSK cluster +* **Parameter MSKClusterId**: The unique ID of the MSK cluster +* **Parameter MSKTopic**: The Kafka topic that the Lambda function will listen on +* **Confirm changes before deploy**: If set to yes, any change sets will be shown to you before execution for manual review. If set to no, the AWS SAM CLI will automatically deploy application changes. +* **Allow SAM CLI IAM role creation**: Many AWS SAM templates, including this example, create AWS IAM roles required for the AWS Lambda function(s) included to access AWS services. By default, these are scoped down to minimum required permissions. To deploy an AWS CloudFormation stack which creates or modifies IAM roles, the `CAPABILITY_IAM` value for `capabilities` must be provided. If permission isn't provided through this prompt, to deploy this example you must explicitly pass `--capabilities CAPABILITY_IAM` to the `sam deploy` command.
+* **Disable rollback**: Defaults to No. If enabled, it preserves the state of previously provisioned resources when an operation fails. +* **Save arguments to configuration file**: If set to yes, your choices will be saved to a configuration file inside the project, so that in the future you can just re-run `sam deploy` without parameters to deploy changes to your application. +* **SAM configuration file [samconfig.toml]**: Name of the configuration file to store configuration information locally +* **SAM configuration environment [default]**: Environment for storing deployment information locally + +You should get a message "Successfully created/updated stack - in " if all goes well. + + +## Test the sample application + +Once the Lambda function is deployed, send some Kafka messages to the topic on the MSK cluster that the Lambda function is listening on. + +Either send at least 10 messages or wait for 300 seconds (check the values of BatchSize: 10 and MaximumBatchingWindowInSeconds: 300 in the template.yaml file). + +Then check CloudWatch Logs: you should see messages in the CloudWatch Log Group named after the deployed Lambda function. + +The Lambda code parses the Kafka messages and outputs the fields of each message to CloudWatch Logs. + +A single Lambda invocation receives a batch of messages. The messages are received as a map, with each key being a combination of the topic and the partition, as a single batch can receive messages from multiple partitions. + +Each key maps to a list of messages. Each Kafka message has the following properties: Topic, Partition, Offset, Timestamp, TimestampType, Key and Value. + +The Key and Value are base64 encoded and have to be decoded. A message can also have a list of headers, each header having a key and a value. + +The code in this example prints out the fields in the Kafka message, decodes the key and the value, and logs them in CloudWatch Logs. A minimal decoding sketch follows below.
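To illustrate the decoding step described above, here is a minimal, self-contained Java sketch (not part of the deployed function; the class name `Base64DecodeExample` is just a placeholder for this README) that decodes the base64 `value` fields from the sample event in `events/event.json` the same way the handler does:

```java
import java.util.Base64;

public class Base64DecodeExample {
    public static void main(String[] args) {
        // The two "value" fields from events/event.json
        String[] encodedValues = {"Zg==", "Zw=="};
        for (String encoded : encodedValues) {
            // Same java.util.Base64 decoding the handler performs before logging each message
            String decoded = new String(Base64.getDecoder().decode(encoded));
            System.out.println(encoded + " decodes to: " + decoded);
        }
    }
}
```

Running this prints `f` and `g`, which matches the decoded values asserted in the included unit test `HandlerMSKTest`.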
diff --git a/msk-lambda-iam-java-sam/events/event.json b/msk-lambda-iam-java-sam/events/event.json new file mode 100644 index 000000000..b527986ab --- /dev/null +++ b/msk-lambda-iam-java-sam/events/event.json @@ -0,0 +1,31 @@ +{ + "records":{ + "myTopic-0":[ + { + "topic":"myTopic", + "partition":0, + "offset":250, + "timestamp":1678072110111, + "timestampType":"CREATE_TIME", + "value":"Zg==", + "headers":[ + + ] + }, + { + "topic":"myTopic", + "partition":0, + "offset":251, + "timestamp":1678072111086, + "timestampType":"CREATE_TIME", + "value":"Zw==", + "headers":[ + + ] + } + ] + }, + "eventSource":"aws:kafka", + "eventSourceArn":"arn:aws:kafka:us-west-2:123456789012:cluster/MSKWorkshopCluster/a93759a9-c9d0-4952-984c-492c6bfa2be8-13", + "bootstrapServers":"b-2.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-3.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-1.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098" +} \ No newline at end of file diff --git a/msk-lambda-iam-java-sam/example-pattern.json b/msk-lambda-iam-java-sam/example-pattern.json new file mode 100644 index 000000000..571060cb0 --- /dev/null +++ b/msk-lambda-iam-java-sam/example-pattern.json @@ -0,0 +1,90 @@ +{ + "title": "AWS Lambda function subscribed to an Amazon MSK topic using IAM auth", + "description": "Creates a Lambda function that uses an Amazon MSK topic as an event source with IAM authentication.", + "language": "Java", + "level": "200", + "framework": "SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern provides a Lambda function along with an Event Source Mapping to a Kafka topic.", + "It requires that you already have an Amazon Managed Streaming for Kafka (Amazon MSK) cluster setup with a topic created. ", + "If you don't already have an MSK cluster, you can use the example in this pattern https://serverlessland.com/patterns/msk-cfn-sasl-lambda (linked in the resources) to deploy a cluster.", + "This pattern works with either a Provisioned or Serverless MSK cluster as long as the cluster is configured to use IAM authentication. ", + "For detailed deployment instructions instructions see the README." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/msk-lambda-iam-java-sam", + "templateURL": "serverless-patterns/msk-lambda-iam-java-sam", + "projectFolder": "msk-lambda-iam-java-sam", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Amazon MSK cluster pattern", + "link": "https://serverlessland.com/patterns/msk-cfn-sasl-lambda" + }, + { + "text": "Using AWS Lambda with Amazon MSK", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html" + }, + { + "text": "AWS CloudFormation Provisioned MSK cluster reference", + "link": "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-msk-cluster.html" + }, + { + "text": "AWS CloudFormation Serverless MSK cluster reference", + "link": "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-msk-serverlesscluster.html" + } + ] + }, + "deploy": { + "text": [ + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the template: sam delete." 
+ ] + }, + "authors": [ + { + "name": "Indranil Banerjee", + "bio": "AWS - Senior Solutions Architect", + "linkedin": "https://www.linkedin.com/in/indranil-banerjee-b00a261/" + }, + { + "name": "Vaibhav Jain", + "bio": "AWS - Sr. Application Architect", + "image": "https://media.licdn.com/dms/image/C4E03AQEqzZWHGT4dBQ/profile-displayphoto-shrink_800_800/0/1580165399872?e=1687392000&v=beta&t=zdxENLnqCpqCz9i1Uf5Yx4YXlR9EYvgxP8N5UTsy6J8", + "linkedin": "https://www.linkedin.com/in/vaibhavjainv/" + }, + { + "name": "Paveen Allam", + "bio": "Senior Solutions Architect", + "image": "https://www.fintail.me/images/pa.jpg", + "linkedin": "https://www.linkedin.com/in/pallam/" + }, + { + "name": "Suraj Tripathi", + "bio": "AWS - AppDev Cloud Consultant", + "linkedin": "https://www.linkedin.com/in/suraj-tripathi-01b49a140/" + }, + { + "name": "Adam Wagner", + "bio": "AWS - Principal Serverless Solutions Architect", + "linkedin": "https://www.linkedin.com/in/adam-wagner-4bb412/" + } + ] + } + \ No newline at end of file diff --git a/msk-lambda-iam-java-sam/kafka_event_consumer_function/.gitignore b/msk-lambda-iam-java-sam/kafka_event_consumer_function/.gitignore new file mode 100644 index 000000000..b83d22266 --- /dev/null +++ b/msk-lambda-iam-java-sam/kafka_event_consumer_function/.gitignore @@ -0,0 +1 @@ +/target/ diff --git a/msk-lambda-iam-java-sam/kafka_event_consumer_function/.settings/org.eclipse.jdt.core.prefs b/msk-lambda-iam-java-sam/kafka_event_consumer_function/.settings/org.eclipse.jdt.core.prefs new file mode 100644 index 000000000..2af1e7b99 --- /dev/null +++ b/msk-lambda-iam-java-sam/kafka_event_consumer_function/.settings/org.eclipse.jdt.core.prefs @@ -0,0 +1,8 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.codegen.targetPlatform=11 +org.eclipse.jdt.core.compiler.compliance=11 +org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled +org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning +org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore +org.eclipse.jdt.core.compiler.release=disabled +org.eclipse.jdt.core.compiler.source=11 diff --git a/msk-lambda-iam-java-sam/kafka_event_consumer_function/pom.xml b/msk-lambda-iam-java-sam/kafka_event_consumer_function/pom.xml new file mode 100644 index 000000000..ffd678ec0 --- /dev/null +++ b/msk-lambda-iam-java-sam/kafka_event_consumer_function/pom.xml @@ -0,0 +1,67 @@ + + 4.0.0 + com.amazonaws.services.lambda.samples.events.msk + MSKConsumer + 1.0 + jar + A sample Lambda MSK consumer + + 11 + 11 + + + + + com.amazonaws + aws-lambda-java-core + 1.2.1 + + + com.amazonaws + aws-lambda-java-events + 3.11.0 + + + com.google.code.gson + gson + 2.10.1 + + + + org.apache.kafka + kafka-clients + 3.4.0 + + + org.junit.jupiter + junit-jupiter-api + 5.6.0 + test + + + org.junit.jupiter + junit-jupiter-engine + 5.6.0 + test + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + + package + + shade + + + + + + + diff --git a/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java b/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java new file mode 100644 index 000000000..7e140a603 --- /dev/null +++ b/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java @@ -0,0 +1,110 @@ +//Lambda Runtime delivers a batch of messages to the lambda function +//Each batch 
of messages has two fields EventSource and EventSourceARN +//Each batch of messages also has a field called Records +//The Records is a map with multiple keys and values +//Each key is a combination of the Topic Name and the Partition Number +//One batch of messages can contain messages from multiple partitions + +/* +To simplify representing a batch of Kafka messages as a list of messages +We have created a Java class called KafkaMessage under the models package +Here we are mapping the structure of an incoming Kafka event to a list of +objects of the KafkaMessage class +*/ + +package com.amazonaws.services.lambda.samples.events.msk; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.LambdaLogger; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.KafkaEvent; +import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +public class HandlerMSK implements RequestHandler{ + //We initialize an empty list of the KafkaMessage class + List listOfMessages = new ArrayList(); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + @Override + public String handleRequest(KafkaEvent event, Context context) { + LambdaLogger logger = context.getLogger(); + String response = new String("200 OK"); + this.listOfMessages = new ArrayList(); + //Incoming KafkaEvent object has a property called records that is a map + //Each key in the map is a combination of a topic and a partition + Map> record=event.getRecords(); + Set keySet = record.keySet(); + Iterator iterator = keySet.iterator(); + //We iterate through each of the keys in the map + while (iterator.hasNext()) { + String thisKey=(String)iterator.next(); + //Using the key we retrieve the value of the map which is a list of KafkaEventRecord + //One object of KafkaEventRecord represents an individual Kafka message + List thisListOfRecords = record.get(thisKey); + //We now iterate through the list of KafkaEventRecords + for(KafkaEventRecord thisRecord : thisListOfRecords) { + /* + We initialize a new object of the KafkaMessage class which is a simplified representation in our models package + We then get the fields from each kafka message in the object of KafkaEventRecord class and set them to the fields + of the KafkaRecord class + */ + KafkaMessage thisMessage = new KafkaMessage(); + thisMessage.setTopic(thisRecord.getTopic()); + thisMessage.setPartition(thisRecord.getPartition()); + thisMessage.setOffset(thisRecord.getOffset()); + thisMessage.setTimestamp(thisRecord.getTimestamp()); + thisMessage.setTimestampType(thisRecord.getTimestampType()); + String key = thisRecord.getKey(); + String value = thisRecord.getValue(); + String decodedKey = "null"; + String decodedValue = "null"; + //the key and value inside a kafka message are base64 encrypted and will need to be decrypted + if (null != key) { + byte[] decodedKeyBytes = Base64.getDecoder().decode(key); + decodedKey = new String(decodedKeyBytes); + } + if (null != value) { + byte[] decodedValueBytes = Base64.getDecoder().decode(value); + decodedValue = new String(decodedValueBytes); + } + thisMessage.setKey(key); + thisMessage.setValue(value); + thisMessage.setDecodedKey(decodedKey); + thisMessage.setDecodedValue(decodedValue); + //A kafka 
message can optionally have a list of headers + //the below code is to get the headers, iterate through each header and get its key and value + List headersInThisMessage = new ArrayList(); + List> headers = thisRecord.getHeaders(); + for (Map thisHeader : headers) { + Set thisHeaderKeys = thisHeader.keySet(); + Iterator thisHeaderKeysIterator = thisHeaderKeys.iterator(); + while (thisHeaderKeysIterator.hasNext()) { + String thisHeaderKey = thisHeaderKeysIterator.next(); + byte[] thisHeaderValue = (byte[])thisHeader.get(thisHeaderKey); + String thisHeaderValueString = new String(thisHeaderValue); + KafkaHeader thisMessageHeader = new KafkaHeader(); + thisMessageHeader.setKey(thisHeaderKey); + thisMessageHeader.setValue(thisHeaderValueString); + headersInThisMessage.add(thisMessageHeader); + } + } + thisMessage.setHeaders(headersInThisMessage); + listOfMessages.add(thisMessage); + // Below we are logging the particular kafka message in string format using the toString method + // as well as in Json format using gson.toJson function + logger.log("Received this message from Kafka - " + thisMessage.toString()); + logger.log("Message in JSON format : " + gson.toJson(thisMessage)); + } + } + logger.log("All Messages in this batch = " + gson.toJson(listOfMessages)); + return response; + } +} diff --git a/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/KafkaHeader.java b/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/KafkaHeader.java new file mode 100644 index 000000000..d093d05ee --- /dev/null +++ b/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/KafkaHeader.java @@ -0,0 +1,67 @@ +package com.amazonaws.services.lambda.samples.events.msk; + +import java.util.Objects; + +public class KafkaHeader { + String key; + String value; + /** + * + */ + public KafkaHeader() { + super(); + } + /** + * @param key + * @param value + */ + public KafkaHeader(String key, String value) { + super(); + this.key = key; + this.value = value; + } + /** + * @return the key + */ + public String getKey() { + return key; + } + /** + * @param key the key to set + */ + public void setKey(String key) { + this.key = key; + } + /** + * @return the value + */ + public String getValue() { + return value; + } + /** + * @param value the value to set + */ + public void setValue(String value) { + this.value = value; + } + @Override + public int hashCode() { + return Objects.hash(key, value); + } + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + KafkaHeader other = (KafkaHeader) obj; + return Objects.equals(key, other.key) && Objects.equals(value, other.value); + } + @Override + public String toString() { + return "KafkaHeader [key=" + key + ", value=" + value + "]"; + } + +} diff --git a/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/KafkaMessage.java b/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/KafkaMessage.java new file mode 100644 index 000000000..98422a29f --- /dev/null +++ b/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/KafkaMessage.java @@ -0,0 +1,198 @@ +package 
com.amazonaws.services.lambda.samples.events.msk; + +import java.util.List; +import java.util.Objects; + +public class KafkaMessage { + String topic; + int partition; + long offset; + long timestamp; + String timestampType; + String key; + String value; + String decodedKey; + String decodedValue; + List headers; + /** + * + */ + public KafkaMessage() { + super(); + } + /** + * @param topic + * @param partition + * @param offset + * @param timestamp + * @param timestampType + * @param key + * @param value + * @param decodedKey + * @param decodedValue + * @param headers + */ + public KafkaMessage(String topic, int partition, long offset, long timestamp, String timestampType, String key, String value, + String decodedKey, String decodedValue, List headers) { + super(); + this.topic = topic; + this.partition = partition; + this.offset = offset; + this.timestamp = timestamp; + this.timestampType = timestampType; + this.key = key; + this.value = value; + this.decodedKey = decodedKey; + this.decodedValue = decodedValue; + this.headers = headers; + } + /** + * @return the topic + */ + public String getTopic() { + return topic; + } + /** + * @param topic the topic to set + */ + public void setTopic(String topic) { + this.topic = topic; + } + /** + * @return the partition + */ + public int getPartition() { + return partition; + } + /** + * @param partition the partition to set + */ + public void setPartition(int partition) { + this.partition = partition; + } + /** + * @return the offset + */ + public long getOffset() { + return offset; + } + /** + * @param offset the offset to set + */ + public void setOffset(long offset) { + this.offset = offset; + } + /** + * @return the timestamp + */ + public long getTimestamp() { + return timestamp; + } + /** + * @param timestamp the timestamp to set + */ + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + /** + * @return the timestampType + */ + public String getTimestampType() { + return timestampType; + } + /** + * @param timestampType the timestampType to set + */ + public void setTimestampType(String timestampType) { + this.timestampType = timestampType; + } + /** + * @return the key + */ + public String getKey() { + return key; + } + /** + * @param key the key to set + */ + public void setKey(String key) { + this.key = key; + } + /** + * @return the value + */ + public String getValue() { + return value; + } + /** + * @param value the value to set + */ + public void setValue(String value) { + this.value = value; + } + /** + * @return the decodedKey + */ + public String getDecodedKey() { + return decodedKey; + } + /** + * @param decodedKey the decodedKey to set + */ + public void setDecodedKey(String decodedKey) { + this.decodedKey = decodedKey; + } + /** + * @return the decodedValue + */ + public String getDecodedValue() { + return decodedValue; + } + /** + * @param decodedValue the decodedValue to set + */ + public void setDecodedValue(String decodedValue) { + this.decodedValue = decodedValue; + } + /** + * @return the headers + */ + public List getHeaders() { + return headers; + } + /** + * @param headers the headers to set + */ + public void setHeaders(List headers) { + this.headers = headers; + } + + @Override + public int hashCode() { + return Objects.hash(decodedKey, decodedValue, headers, key, offset, partition, timestamp, timestampType, topic, + value); + } + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return 
false; + KafkaMessage other = (KafkaMessage) obj; + return Objects.equals(decodedKey, other.decodedKey) && Objects.equals(decodedValue, other.decodedValue) + && Objects.equals(headers, other.headers) && Objects.equals(key, other.key) && offset == other.offset + && partition == other.partition && timestamp == other.timestamp + && Objects.equals(timestampType, other.timestampType) && Objects.equals(topic, other.topic) + && Objects.equals(value, other.value); + } + @Override + public String toString() { + return "KafkaMessage [topic=" + topic + ", partition=" + partition + ", timestamp=" + timestamp + + ", timestampType=" + timestampType + ", key=" + key + ", value=" + value + ", decodedKey=" + + decodedKey + ", decodedValue=" + decodedValue + ", headers=" + headers.toString() + "]"; + } + + +} diff --git a/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java b/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java new file mode 100644 index 000000000..65bd543d3 --- /dev/null +++ b/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSKTest.java @@ -0,0 +1,70 @@ +package com.amazonaws.services.lambda.samples.events.msk; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.events.KafkaEvent; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + + +class HandlerMSKTest { + private static final String kafkaEventJson = "{\n" + + " \"records\":{\n" + + " \"myTopic-0\":[\n" + + " {\n" + + " \"topic\":\"myTopic\",\n" + + " \"partition\":0,\n" + + " \"offset\":250,\n" + + " \"timestamp\":1678072110111,\n" + + " \"timestampType\":\"CREATE_TIME\",\n" + + " \"value\":\"Zg==\",\n" + + " \"headers\":[\n" + + " \n" + + " ]\n" + + " },\n" + + " {\n" + + " \"topic\":\"myTopic\",\n" + + " \"partition\":0,\n" + + " \"offset\":251,\n" + + " \"timestamp\":1678072111086,\n" + + " \"timestampType\":\"CREATE_TIME\",\n" + + " \"value\":\"Zw==\",\n" + + " \"headers\":[\n" + + " \n" + + " ]\n" + + " }\n" + + " ]\n" + + " },\n" + + " \"eventSource\":\"aws:kafka\",\n" + + " \"eventSourceArn\":\"arn:aws:kafka:us-west-2:123456789012:cluster/MSKWorkshopCluster/a93759a9-c9d0-4952-984c-492c6bfa2be8-13\",\n" + + " \"bootstrapServers\":\"b-2.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-3.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-1.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098\"\n" + + "}"; + + @Test + void invokeTest() { + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + KafkaEvent event = gson.fromJson(kafkaEventJson, KafkaEvent.class); + Context context = new TestContext(); + HandlerMSK handler = new HandlerMSK(); + String result = handler.handleRequest(event, context); + assertEquals(result, "200 OK"); + assertEquals(handler.listOfMessages.size(), 2); + assertEquals(handler.listOfMessages.get(0).getTopic(), "myTopic"); + assertEquals(handler.listOfMessages.get(0).getPartition(), 0); + assertEquals(handler.listOfMessages.get(0).getOffset(), 250L); + assertEquals(handler.listOfMessages.get(0).getTimestamp(), 1678072110111L); + assertEquals(handler.listOfMessages.get(0).getTimestampType(), "CREATE_TIME"); + 
assertEquals(handler.listOfMessages.get(0).getDecodedKey(), "null"); + assertEquals(handler.listOfMessages.get(0).getDecodedValue(), "f"); + assertEquals(handler.listOfMessages.get(1).getTopic(), "myTopic"); + assertEquals(handler.listOfMessages.get(1).getPartition(), 0); + assertEquals(handler.listOfMessages.get(1).getOffset(), 251L); + assertEquals(handler.listOfMessages.get(1).getTimestamp(), 1678072111086L); + assertEquals(handler.listOfMessages.get(1).getTimestampType(), "CREATE_TIME"); + assertEquals(handler.listOfMessages.get(1).getDecodedKey(), "null"); + assertEquals(handler.listOfMessages.get(1).getDecodedValue(), "g"); + } +} diff --git a/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/TestContext.java b/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/TestContext.java new file mode 100644 index 000000000..479a3b98a --- /dev/null +++ b/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/TestContext.java @@ -0,0 +1,45 @@ +package com.amazonaws.services.lambda.samples.events.msk; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.CognitoIdentity; +import com.amazonaws.services.lambda.runtime.ClientContext; +import com.amazonaws.services.lambda.runtime.LambdaLogger; + +public class TestContext implements Context{ + + public TestContext() {} + public String getAwsRequestId(){ + return new String("495b12a8-xmpl-4eca-8168-160484189f99"); + } + public String getLogGroupName(){ + return new String("/aws/lambda/my-function"); + } + public String getLogStreamName(){ + return new String("2020/02/26/[$LATEST]704f8dxmpla04097b9134246b8438f1a"); + } + public String getFunctionName(){ + return new String("my-function"); + } + public String getFunctionVersion(){ + return new String("$LATEST"); + } + public String getInvokedFunctionArn(){ + return new String("arn:aws:lambda:us-east-2:123456789012:function:my-function"); + } + public CognitoIdentity getIdentity(){ + return null; + } + public ClientContext getClientContext(){ + return null; + } + public int getRemainingTimeInMillis(){ + return 300000; + } + public int getMemoryLimitInMB(){ + return 512; + } + public LambdaLogger getLogger(){ + return new TestLogger(); + } + +} \ No newline at end of file diff --git a/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/TestLogger.java b/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/TestLogger.java new file mode 100644 index 000000000..0fb16cdc6 --- /dev/null +++ b/msk-lambda-iam-java-sam/kafka_event_consumer_function/src/test/java/com/amazonaws/services/lambda/samples/events/msk/TestLogger.java @@ -0,0 +1,14 @@ +package com.amazonaws.services.lambda.samples.events.msk; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.amazonaws.services.lambda.runtime.LambdaLogger; + +public class TestLogger implements LambdaLogger { + private static final Logger logger = LoggerFactory.getLogger(TestLogger.class); + public void log(String message){ + logger.info(message); + } + public void log(byte[] message){ + logger.info(new String(message)); + } +} diff --git a/msk-lambda-iam-java-sam/template.yaml b/msk-lambda-iam-java-sam/template.yaml new file mode 100644 index 000000000..e0da4f901 --- /dev/null +++ 
b/msk-lambda-iam-java-sam/template.yaml @@ -0,0 +1,87 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: > + kafka_event_consumer_function + + Sample SAM Template for simple-kafka-consumer-with-sam + +# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst +Globals: + Function: + Timeout: 15 + +Resources: + LambdaMSKConsumerJavaFunction: + Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction + Properties: + CodeUri: kafka_event_consumer_function + Handler: com.amazonaws.services.lambda.samples.events.msk.HandlerMSK::handleRequest + Runtime: java11 + Architectures: + - x86_64 + MemorySize: 512 + Environment: # More info about Env Vars: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#environment-object + Variables: + PARAM1: VALUE + JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1 # More info about tiered compilation https://aws.amazon.com/blogs/compute/optimizing-aws-lambda-function-performance-for-java/ + Events: + MSKEvent: + Type: MSK + Properties: + StartingPosition: LATEST + BatchSize: 10 + MaximumBatchingWindowInSeconds: 300 + Stream: !Join [ '', ["arn:", "aws:", "kafka:", !Ref "AWS::Region" , ":" ,!Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId] ] + Topics: + - !Ref MSKTopic + Policies: + - Statement: + - Sid: KafkaClusterPermissionsPolicy + Effect: Allow + Action: + - kafka-cluster:Connect + - kafka-cluster:DescribeGroup + - kafka-cluster:DescribeCluster + - kafka-cluster:AlterCluster + - kafka-cluster:AlterClusterDynamicConfiguration + - kafka-cluster:WriteDataIdempotently + - kafka-cluster:AlterGroup + - kafka-cluster:DescribeTopic + - kafka-cluster:ReadData + - kafka-cluster:DescribeClusterDynamicConfiguration + Resource: + - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId]] + - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "topic/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]] + - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "group/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]] + + - Sid: KafkaPermissionsPolicy + Effect: Allow + Action: + - kafka:DescribeClusterV2 + - kafka:GetBootstrapBrokers + Resource: '*' + + - Sid: EC2PermissionsPolicy + Effect: Allow + Action: + - ec2:DescribeSecurityGroups + - ec2:DescribeSubnets + - ec2:DescribeVpcs + - ec2:CreateNetworkInterface + - ec2:DescribeNetworkInterfaces + - ec2:DeleteNetworkInterface + Resource: '*' +Parameters: + MSKClusterName: + Type: String + Description: Enter the name of the MSK Cluster + MSKClusterId: + Type: String + Description: Enter the ID of the MSK Cluster + MSKTopic: + Type: String + Description: Enter the name of the MSK Topic +Outputs: + HelloWorldKafkaJavaFunction: + Description: "Topic Consumer Lambda Function ARN" + Value: !GetAtt LambdaMSKConsumerJavaFunction.Arn From 10343797aea5677de6b18324db0b9d41bc8ba93a Mon Sep 17 00:00:00 2001 From: Indranil Banerjee Date: Fri, 29 Nov 2024 12:20:23 -0800 Subject: [PATCH 02/12] Added Cloudformation Template that creates MSK Cluster and a Client EC2 machine with everything installed --- .../MSKAndKafkaClientEC2.yaml | 938 ++++++++++++++++++ 1 
file changed, 938 insertions(+) create mode 100644 msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml diff --git a/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml b/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml new file mode 100644 index 000000000..1ecb969e0 --- /dev/null +++ b/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml @@ -0,0 +1,938 @@ +AWSTemplateFormatVersion: '2010-09-09' +Parameters: + EnvType: + Description: MSK Cluster Type. + Default: Provisioned + Type: String + AllowedValues: + - Serverless + - Provisioned + ConstraintDescription: Must specify Serverless or Provisioned. + LatestAmiId: + Type: 'AWS::SSM::Parameter::Value' + Default: '/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2' + MSKKafkaVersion: + Type: String + Default: 3.5.1 + ApacheKafkaInstallerLocation: + Type: String + Default: https://archive.apache.org/dist/kafka/3.5.1/kafka_2.13-3.5.1.tgz + KafkaTopicForLambda: + Type: String + Default: MskIamJavaLambdaTopic +Conditions: + CreateProvisionedCluster: !Equals + - !Ref EnvType + - Provisioned + CreateServerlessCluster: !Equals + - !Ref EnvType + - Serverless +Mappings: + SubnetConfig: + VPC: + CIDR: '10.0.0.0/16' + PublicOne: + CIDR: '10.0.0.0/24' + PrivateSubnetMSKOne: + CIDR: '10.0.1.0/24' + PrivateSubnetMSKTwo: + CIDR: '10.0.2.0/24' + PrivateSubnetMSKThree: + CIDR: '10.0.3.0/24' +Resources: + VPC: + Type: AWS::EC2::VPC + Properties: + EnableDnsSupport: true + EnableDnsHostnames: true + CidrBlock: !FindInMap ['SubnetConfig', 'VPC', 'CIDR'] + Tags: + - Key: 'Name' + Value: 'MSKVPC' + + PublicSubnetOne: + Type: AWS::EC2::Subnet + Properties: + AvailabilityZone: + Fn::Select: + - 0 + - Fn::GetAZs: {Ref: 'AWS::Region'} + VpcId: !Ref 'VPC' + CidrBlock: !FindInMap ['SubnetConfig', 'PublicOne', 'CIDR'] + MapPublicIpOnLaunch: true + Tags: + - Key: 'Name' + Value: 'PublicSubnet' + PrivateSubnetMSKOne: + Type: AWS::EC2::Subnet + Properties: + AvailabilityZone: + Fn::Select: + - 0 + - Fn::GetAZs: {Ref: 'AWS::Region'} + VpcId: !Ref 'VPC' + CidrBlock: !FindInMap ['SubnetConfig', 'PrivateSubnetMSKOne', 'CIDR'] + MapPublicIpOnLaunch: false + Tags: + - Key: 'Name' + Value: 'PrivateSubnetMSKOne' + PrivateSubnetMSKTwo: + Type: AWS::EC2::Subnet + Properties: + AvailabilityZone: + Fn::Select: + - 1 + - Fn::GetAZs: {Ref: 'AWS::Region'} + VpcId: !Ref 'VPC' + CidrBlock: !FindInMap ['SubnetConfig', 'PrivateSubnetMSKTwo', 'CIDR'] + MapPublicIpOnLaunch: false + Tags: + - Key: 'Name' + Value: 'PrivateSubnetMSKTwo' + PrivateSubnetMSKThree: + Type: AWS::EC2::Subnet + Properties: + AvailabilityZone: + Fn::Select: + - 2 + - Fn::GetAZs: {Ref: 'AWS::Region'} + VpcId: !Ref 'VPC' + CidrBlock: !FindInMap ['SubnetConfig', 'PrivateSubnetMSKThree', 'CIDR'] + MapPublicIpOnLaunch: false + Tags: + - Key: 'Name' + Value: 'PrivateSubnetMSKThree' + + InternetGateway: + Type: AWS::EC2::InternetGateway + GatewayAttachement: + Type: AWS::EC2::VPCGatewayAttachment + Properties: + VpcId: !Ref 'VPC' + InternetGatewayId: !Ref 'InternetGateway' + + NATEIP: + Type: AWS::EC2::EIP + DependsOn: GatewayAttachement + Properties: + Domain: vpc + + NATGateway: + Type: AWS::EC2::NatGateway + Properties: + AllocationId: !GetAtt NATEIP.AllocationId + SubnetId: !Ref 'PublicSubnetOne' + Tags: + - Key: 'Name' + Value: 'ConfluentKafkaNATGateway' + + PublicRouteTable: + Type: AWS::EC2::RouteTable + Properties: + VpcId: !Ref 'VPC' + PublicRoute: + Type: AWS::EC2::Route + DependsOn: GatewayAttachement + Properties: + RouteTableId: !Ref 'PublicRouteTable' + DestinationCidrBlock: '0.0.0.0/0' + GatewayId: !Ref 
'InternetGateway' + PublicSubnetOneRouteTableAssociation: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + SubnetId: !Ref PublicSubnetOne + RouteTableId: !Ref PublicRouteTable + + PrivateRouteTable: + Type: AWS::EC2::RouteTable + Properties: + VpcId: !Ref 'VPC' + + PrivateRoute: + Type: AWS::EC2::Route + DependsOn: NATGateway + Properties: + RouteTableId: !Ref 'PrivateRouteTable' + DestinationCidrBlock: '0.0.0.0/0' + NatGatewayId: !Ref 'NATGateway' + + PrivateSubnetMSKOneRouteTableAssociation: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + RouteTableId: !Ref PrivateRouteTable + SubnetId: !Ref PrivateSubnetMSKOne + PrivateSubnetMSKTwoRouteTableAssociation: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + RouteTableId: !Ref PrivateRouteTable + SubnetId: !Ref PrivateSubnetMSKTwo + PrivateSubnetMSKThreeRouteTableAssociation: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + RouteTableId: !Ref PrivateRouteTable + SubnetId: !Ref PrivateSubnetMSKThree + + KafkaClientInstanceSecurityGroup: + Type: AWS::EC2::SecurityGroup + Properties: + GroupDescription: Enable SSH access via port 22 from BastionHostSecurityGroup + GroupName: !Sub "${AWS::StackName} Security group attached to the kakfa client producer" + VpcId: !Ref VPC + SecurityGroupIngress: + - IpProtocol: tcp + FromPort: 22 + ToPort: 22 + CidrIp: 10.0.0.0/24 + - IpProtocol: tcp + FromPort: 3500 + ToPort: 3500 + CidrIp: 10.0.0.0/24 + - IpProtocol: tcp + FromPort: 3600 + ToPort: 3600 + CidrIp: 10.0.0.0/24 + - IpProtocol: tcp + FromPort: 3800 + ToPort: 3800 + CidrIp: 10.0.0.0/24 + - IpProtocol: tcp + FromPort: 3900 + ToPort: 3900 + CidrIp: 10.0.0.0/24 + + MSKSecurityGroup: + Type: AWS::EC2::SecurityGroup + DependsOn: [VPC,KafkaClientInstanceSecurityGroup] + Properties: + GroupDescription: MSK Security Group + GroupName: !Sub "${AWS::StackName} Security group for the MSK cluster" + VpcId: !Ref 'VPC' + SecurityGroupIngress: + - IpProtocol: tcp + FromPort: 2181 + ToPort: 2181 + SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId + - IpProtocol: tcp + FromPort: 9094 + ToPort: 9094 + SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId + - IpProtocol: tcp + FromPort: 9096 + ToPort: 9096 + SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId + - IpProtocol: tcp + FromPort: 9092 + ToPort: 9092 + SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId + - IpProtocol: tcp + FromPort: 9098 + ToPort: 9098 + SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId + - IpProtocol: tcp + FromPort: 8083 + ToPort: 8083 + SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId + - IpProtocol: tcp + FromPort: 8081 + ToPort: 8081 + SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId + + MSKSelfIngressAllowRule: + Type: AWS::EC2::SecurityGroupIngress + DependsOn: MSKSecurityGroup + Properties: + GroupId: !GetAtt MSKSecurityGroup.GroupId + Description: Enable Self referencing Bootstrap servers + IpProtocol: tcp + FromPort: 9092 + ToPort: 9098 + SourceSecurityGroupId: !GetAtt MSKSecurityGroup.GroupId + + KafkaClientSelfIngressAllowRule: + Type: AWS::EC2::SecurityGroupIngress + DependsOn: KafkaClientInstanceSecurityGroup + Properties: + GroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId + IpProtocol: tcp + FromPort: 22 + ToPort: 22 + SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId + + KafkaClientEC2InstanceProvisioned: + Condition: CreateProvisionedCluster + 
DependsOn: MSKCluster + Type: AWS::EC2::Instance + Properties: + InstanceType: m5.large + IamInstanceProfile: !Ref EC2InstanceProfile + AvailabilityZone: + Fn::Select: + - 0 + - Fn::GetAZs: {Ref: 'AWS::Region'} + SubnetId: !Ref PublicSubnetOne + SecurityGroupIds: [!GetAtt KafkaClientInstanceSecurityGroup.GroupId] + ImageId: !Ref LatestAmiId + Tags: + - Key: 'Name' + Value: 'KafkaClientInstance' + BlockDeviceMappings: + - DeviceName: /dev/xvda + Ebs: + VolumeSize: 50 + VolumeType: gp2 + DeleteOnTermination: true + UserData: + Fn::Base64: + !Sub + - | + #!/bin/bash + yum update -y + yum install java-1.8.0-openjdk-devel -y + yum install nmap-ncat -y + yum install git -y + yum erase awscli -y + yum install jq -y + amazon-linux-extras install docker -y + amazon-linux-extras install python3.8 -y + service docker start + usermod -a -G docker ec2-user + unlink /usr/bin/python3 + ln -s /usr/bin/python3.8 /usr/bin/python3 + sudo wget https://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo + sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo + sudo yum install -y apache-maven + + cd /home/ec2-user + su -c "ln -s /usr/bin/python3.8 /usr/bin/python3" -s /bin/sh ec2-user + su -c "pip3 install boto3 --user" -s /bin/sh ec2-user + su -c "pip3 install kafka-python --user" -s /bin/sh ec2-user + + # install AWS CLI 2 - access with aws2 + cd /home/ec2-user + mkdir -p awscli + cd awscli + curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" + unzip awscliv2.zip + sudo ./aws/install + + # Create dirs, get Apache Kafka and unpack it + cd /home/ec2-user + KAFKA_VERSION=${msk_kafka_version} + KAFKA_FOLDER_VERSION=$(echo "$KAFKA_VERSION" | tr -d '.') + KAFKA_FOLDER='Kafka'$KAFKA_FOLDER_VERSION + mkdir -p $KAFKA_FOLDER + mkdir -p /tmp/kafka + ln -s /home/ec2-user/$KAFKA_FOLDER /home/ec2-user/kafka + cd $KAFKA_FOLDER + APACHE_KAFKA_INSTALLER_LOCATION=${apache_kafka_installer_location} + wget $APACHE_KAFKA_INSTALLER_LOCATION + APACHE_KAFKA_INSTALLER_FILE=$(echo "$APACHE_KAFKA_INSTALLER_LOCATION" | awk -F "/" '{print $NF}') + tar -xzf $APACHE_KAFKA_INSTALLER_FILE --strip 1 + cd libs + wget https://github.com/aws/aws-msk-iam-auth/releases/download/v2.2.0/aws-msk-iam-auth-2.2.0-all.jar + cd ../bin + echo "security.protocol=SASL_SSL" > client.properties + echo "sasl.mechanism=AWS_MSK_IAM" >> client.properties + echo "sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;" >> client.properties + echo "sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler" >> client.properties + + # Install AWS SAM CLI + cd /home/ec2-user + mkdir -p awssam + cd awssam + wget https://github.com/aws/aws-sam-cli/releases/latest/download/aws-sam-cli-linux-x86_64.zip + unzip aws-sam-cli-linux-x86_64.zip -d sam-installation + sudo ./sam-installation/install + + # Create command files for creating Kafka Topic and Kafka Producer + cd /home/ec2-user + MSK_CLUSTER_ARN=${msk_cluster_arn} + KAFKA_TOPIC=${kafka_topic_for_lambda} + echo "#!/bin/bash" > kafka_topic_creator.sh + sudo chmod +x kafka_topic_creator.sh + echo "MSK_CLUSTER_ARN=$MSK_CLUSTER_ARN" >> kafka_topic_creator.sh + AWS_REGION=$(curl -fsq http://169.254.169.254/latest/meta-data/placement/availability-zone | sed 's/[a-z]$//') + echo "AWS_REGION=$AWS_REGION" >> kafka_topic_creator.sh + echo "BOOTSTRAP_BROKERS_IAM=\$(aws kafka get-bootstrap-brokers --region \$AWS_REGION --cluster-arn \$MSK_CLUSTER_ARN --query 
'BootstrapBrokerStringSaslIam' --output text)" >> kafka_topic_creator.sh + echo "sleep 5" >> kafka_topic_creator.sh + echo "KAFKA_TOPIC=$KAFKA_TOPIC" >> kafka_topic_creator.sh + echo "/home/ec2-user/kafka/bin/kafka-topics.sh --create --bootstrap-server \$BOOTSTRAP_BROKERS_IAM --command-config /home/ec2-user/kafka/bin/client.properties --replication-factor 3 --partitions 3 --topic \$KAFKA_TOPIC" >> kafka_topic_creator.sh + echo "echo \"export MSK_CLUSTER_ARN=\$MSK_CLUSTER_ARN\" >> .bash_profile" >> kafka_topic_creator.sh + echo "echo \"export AWS_REGION=\$AWS_REGION\" >> .bash_profile" >> kafka_topic_creator.sh + echo "echo \"export BOOTSTRAP_BROKERS_IAM=\$BOOTSTRAP_BROKERS_IAM\" >> .bash_profile" >> kafka_topic_creator.sh + echo "echo \"export KAFKA_TOPIC=\$KAFKA_TOPIC\" >> .bash_profile" >> kafka_topic_creator.sh + echo "#!/bin/bash" > kafka_message_sender.sh + echo "source /home/ec2-user/.bash_profile" >> kafka_message_sender.sh + echo "/home/ec2-user/kafka/bin/kafka-console-producer.sh --bootstrap-server \$BOOTSTRAP_BROKERS_IAM --producer.config /home/ec2-user/kafka/bin/client.properties --topic $KAFKA_TOPIC" >> kafka_message_sender.sh + sudo chmod +x kafka_message_sender.sh + + #Checkout Serverless Patterns from Github + cd /home/ec2-user + git clone https://github.com/aws-samples/serverless-patterns/ + + # Get IP CIDR range for EC2 Instance Connect + cd /home/ec2-user + mkdir -p ip_prefix + cd ip_prefix + git clone https://github.com/joetek/aws-ip-ranges-json.git + cd aws-ip-ranges-json + AWS_REGION=$(curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone | sed 's/\(.*\)[a-z]/\1/') + EC2_CONNECT_IP=$(cat ip-ranges-ec2-instance-connect.json | jq -r --arg AWS_REGION "$AWS_REGION" '.prefixes[] | select(.region==$AWS_REGION).ip_prefix') + echo "export EC2_CONNECT_IP=$EC2_CONNECT_IP" >> /home/ec2-user/.bash_profile + SECURITY_GROUP=${security_group_id} + echo "export SECURITY_GROUP=$SECURITY_GROUP" >> /home/ec2-user/.bash_profile + aws ec2 authorize-security-group-ingress --region $AWS_REGION --group-id $SECURITY_GROUP --protocol tcp --port 22 --cidr $EC2_CONNECT_IP + + - security_group_id : !GetAtt KafkaClientInstanceSecurityGroup.GroupId + msk_cluster_arn : !GetAtt MSKCluster.Arn + kafka_topic_for_lambda : !Ref KafkaTopicForLambda + msk_kafka_version: !Ref MSKKafkaVersion + apache_kafka_installer_location: !Ref ApacheKafkaInstallerLocation + + KafkaClientEC2InstanceServerless: + Condition: CreateServerlessCluster + DependsOn: ServerlessMSKCluster + Type: AWS::EC2::Instance + Properties: + InstanceType: m5.large + IamInstanceProfile: !Ref EC2InstanceProfile + AvailabilityZone: + Fn::Select: + - 0 + - Fn::GetAZs: {Ref: 'AWS::Region'} + SubnetId: !Ref PublicSubnetOne + SecurityGroupIds: [!GetAtt KafkaClientInstanceSecurityGroup.GroupId] + ImageId: !Ref LatestAmiId + Tags: + - Key: 'Name' + Value: 'KafkaClientInstance' + BlockDeviceMappings: + - DeviceName: /dev/xvda + Ebs: + VolumeSize: 50 + VolumeType: gp2 + DeleteOnTermination: true + UserData: + Fn::Base64: + !Sub + - | + #!/bin/bash + yum update -y + yum install java-1.8.0-openjdk-devel -y + yum install nmap-ncat -y + yum install git -y + yum erase awscli -y + yum install jq -y + amazon-linux-extras install docker -y + amazon-linux-extras install python3.8 -y + service docker start + usermod -a -G docker ec2-user + unlink /usr/bin/python3 + ln -s /usr/bin/python3.8 /usr/bin/python3 + sudo wget https://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo + 
sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo + sudo yum install -y apache-maven + + cd /home/ec2-user + su -c "ln -s /usr/bin/python3.8 /usr/bin/python3" -s /bin/sh ec2-user + su -c "pip3 install boto3 --user" -s /bin/sh ec2-user + su -c "pip3 install kafka-python --user" -s /bin/sh ec2-user + + # install AWS CLI 2 - access with aws2 + cd /home/ec2-user + mkdir -p awscli + cd awscli + curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" + unzip awscliv2.zip + sudo ./aws/install + + # Create dirs, get Apache Kafka and unpack it + cd /home/ec2-user + KAFKA_VERSION=${msk_kafka_version} + KAFKA_FOLDER_VERSION=$(echo "$KAFKA_VERSION" | tr -d '.') + KAFKA_FOLDER='Kafka'$KAFKA_FOLDER_VERSION + mkdir -p $KAFKA_FOLDER + mkdir -p /tmp/kafka + ln -s /home/ec2-user/$KAFKA_FOLDER /home/ec2-user/kafka + cd $KAFKA_FOLDER + APACHE_KAFKA_INSTALLER_LOCATION=${apache_kafka_installer_location} + wget $APACHE_KAFKA_INSTALLER_LOCATION + APACHE_KAFKA_INSTALLER_FILE=$(echo "$APACHE_KAFKA_INSTALLER_LOCATION" | awk -F "/" '{print $NF}') + tar -xzf $APACHE_KAFKA_INSTALLER_FILE --strip 1 + cd libs + wget https://github.com/aws/aws-msk-iam-auth/releases/download/v2.2.0/aws-msk-iam-auth-2.2.0-all.jar + cd ../bin + echo "security.protocol=SASL_SSL" > client.properties + echo "sasl.mechanism=AWS_MSK_IAM" >> client.properties + echo "sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;" >> client.properties + echo "sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler" >> client.properties + + # Install AWS SAM CLI + cd /home/ec2-user + mkdir -p awssam + cd awssam + wget https://github.com/aws/aws-sam-cli/releases/latest/download/aws-sam-cli-linux-x86_64.zip + unzip aws-sam-cli-linux-x86_64.zip -d sam-installation + sudo ./sam-installation/install + + # Create command files for creating Kafka Topic and Kafka Producer + cd /home/ec2-user + MSK_CLUSTER_ARN=${msk_cluster_arn} + KAFKA_TOPIC=${kafka_topic_for_lambda} + echo "#!/bin/bash" > kafka_topic_creator.sh + sudo chmod +x kafka_topic_creator.sh + echo "MSK_CLUSTER_ARN=$MSK_CLUSTER_ARN" >> kafka_topic_creator.sh + AWS_REGION=$(curl -fsq http://169.254.169.254/latest/meta-data/placement/availability-zone | sed 's/[a-z]$//') + echo "AWS_REGION=$AWS_REGION" >> kafka_topic_creator.sh + echo "BOOTSTRAP_BROKERS_IAM=\$(aws kafka get-bootstrap-brokers --region \$AWS_REGION --cluster-arn \$MSK_CLUSTER_ARN --query 'BootstrapBrokerStringSaslIam' --output text)" >> kafka_topic_creator.sh + echo "sleep 5" >> kafka_topic_creator.sh + echo "KAFKA_TOPIC=$KAFKA_TOPIC" >> kafka_topic_creator.sh + echo "/home/ec2-user/kafka/bin/kafka-topics.sh --create --bootstrap-server \$BOOTSTRAP_BROKERS_IAM --command-config /home/ec2-user/kafka/bin/client.properties --replication-factor 3 --partitions 3 --topic \$KAFKA_TOPIC" >> kafka_topic_creator.sh + echo "echo \"export MSK_CLUSTER_ARN=\$MSK_CLUSTER_ARN\" >> .bash_profile" >> kafka_topic_creator.sh + echo "echo \"export AWS_REGION=\$AWS_REGION\" >> .bash_profile" >> kafka_topic_creator.sh + echo "echo \"export BOOTSTRAP_BROKERS_IAM=\$BOOTSTRAP_BROKERS_IAM\" >> .bash_profile" >> kafka_topic_creator.sh + echo "echo \"export KAFKA_TOPIC=\$KAFKA_TOPIC\" >> .bash_profile" >> kafka_topic_creator.sh + echo "#!/bin/bash" > kafka_message_sender.sh + echo "source /home/ec2-user/.bash_profile" >> kafka_message_sender.sh + echo "/home/ec2-user/kafka/bin/kafka-console-producer.sh --bootstrap-server \$BOOTSTRAP_BROKERS_IAM --producer.config 
/home/ec2-user/kafka/bin/client.properties --topic $KAFKA_TOPIC" >> kafka_message_sender.sh + sudo chmod +x kafka_message_sender.sh + + #Checkout Serverless Patterns from Github + cd /home/ec2-user + git clone https://github.com/aws-samples/serverless-patterns/ + + # Get IP CIDR range for EC2 Instance Connect + cd /home/ec2-user + mkdir -p ip_prefix + cd ip_prefix + git clone https://github.com/joetek/aws-ip-ranges-json.git + cd aws-ip-ranges-json + AWS_REGION=$(curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone | sed 's/\(.*\)[a-z]/\1/') + EC2_CONNECT_IP=$(cat ip-ranges-ec2-instance-connect.json | jq -r --arg AWS_REGION "$AWS_REGION" '.prefixes[] | select(.region==$AWS_REGION).ip_prefix') + echo "export EC2_CONNECT_IP=$EC2_CONNECT_IP" >> /home/ec2-user/.bash_profile + SECURITY_GROUP=${security_group_id} + echo "export SECURITY_GROUP=$SECURITY_GROUP" >> /home/ec2-user/.bash_profile + aws ec2 authorize-security-group-ingress --region $AWS_REGION --group-id $SECURITY_GROUP --protocol tcp --port 22 --cidr $EC2_CONNECT_IP + + - security_group_id : !GetAtt KafkaClientInstanceSecurityGroup.GroupId + msk_cluster_arn : !GetAtt MSKCluster.Arn + kafka_topic_for_lambda : !Ref KafkaTopicForLambda + msk_kafka_version: !Ref MSKKafkaVersion + apache_kafka_installer_location: !Ref ApacheKafkaInstallerLocation + + EC2InstanceEndpoint: + Type: AWS::EC2::InstanceConnectEndpoint + Properties: + PreserveClientIp: true + SecurityGroupIds: + - !GetAtt KafkaClientInstanceSecurityGroup.GroupId + SubnetId: !Ref PublicSubnetOne + + EC2Role: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Sid: '' + Effect: Allow + Principal: + Service: ec2.amazonaws.com + Action: 'sts:AssumeRole' + Path: "/" + ManagedPolicyArns: + - arn:aws:iam::aws:policy/AmazonMSKFullAccess + - arn:aws:iam::aws:policy/AWSCloudFormationFullAccess + - arn:aws:iam::aws:policy/CloudWatchFullAccess + - arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore + - arn:aws:iam::aws:policy/AmazonS3FullAccess + - arn:aws:iam::aws:policy/AWSCertificateManagerPrivateCAFullAccess + - arn:aws:iam::aws:policy/IAMFullAccess + Policies: + - PolicyName: MSKConfigurationAccess + PolicyDocument: !Sub '{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "VisualEditor0", + "Effect": "Allow", + "Action": "kafka:CreateConfiguration", + "Resource": "*" + } + ] + }' + - PolicyName: CloudformationDeploy + PolicyDocument: !Sub '{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "iam:*" + ], + "Resource": "*" + } + ] + }' + - PolicyName: MSKProducerPermissions + PolicyDocument: + Version: 2012-10-17 + Statement: + - Sid: SecretsAccess + Effect: Allow + Action: + - 'secretsmanager:*' + - 'kms:*' + - 'glue:*Schema*' + - 'iam:CreatePolicy' + - 'iam:Tag*' + - 'iam:AttachRolePolicy' + Resource: '*' + - PolicyName: MSKConnectAuthentication + PolicyDocument: !Sub '{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "kafka-cluster:*Topic*", + "kafka-cluster:Connect", + "kafka-cluster:AlterCluster", + "kafka-cluster:DescribeCluster", + "kafka-cluster:DescribeClusterDynamicConfiguration" + ], + "Resource": [ + "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:cluster/${AWS::StackName}-cluster/*" + ] + }, + { + "Effect": "Allow", + "Action": [ + "kafka-cluster:*Topic*", + "kafka-cluster:WriteData", + "kafka-cluster:ReadData" + ], + "Resource": [ + "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:topic/${AWS::StackName}-cluster/*" + ] + }, 
+ { + "Effect": "Allow", + "Action": [ + "kafka-cluster:AlterGroup", + "kafka-cluster:DescribeGroup" + ], + "Resource": [ + "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:group/${AWS::StackName}-cluster/*" + ] + } + ] + }' + - PolicyName: SecurityGroupsPolicy + PolicyDocument: !Sub '{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "ec2:DescribeSecurityGroups", + "ec2:DescribeSecurityGroupRules", + "ec2:DescribeTags" + ], + "Resource": "*" + }, + { + "Effect": "Allow", + "Action": [ + "ec2:AuthorizeSecurityGroupIngress", + "ec2:RevokeSecurityGroupIngress", + "ec2:AuthorizeSecurityGroupEgress", + "ec2:RevokeSecurityGroupEgress", + "ec2:ModifySecurityGroupRules", + "ec2:UpdateSecurityGroupRuleDescriptionsIngress", + "ec2:UpdateSecurityGroupRuleDescriptionsEgress" + ], + "Resource": [ + "arn:aws:ec2:${AWS::Region}:${AWS::AccountId}:security-group/*" + ] + }, + { + "Effect": "Allow", + "Action": [ + "ec2:ModifySecurityGroupRules" + ], + "Resource": [ + "arn:aws:ec2:${AWS::Region}:${AWS::AccountId}:security-group-rule/*" + ] + } + ] + }' + + EC2InstanceProfile: + Type: AWS::IAM::InstanceProfile + Properties: + InstanceProfileName: !Join + - '-' + - - 'EC2MMMSKCFProfile' + - !Ref 'AWS::StackName' + Roles: + - !Ref EC2Role + + + MSKCertAuthority: + Type: AWS::ACMPCA::CertificateAuthority + Condition: CreateProvisionedCluster + Properties: + KeyAlgorithm: "RSA_4096" + SigningAlgorithm: "SHA256WITHRSA" + Subject: + Country: "US" + Type: "ROOT" + + MSKCert: + Type: AWS::ACMPCA::Certificate + Condition: CreateProvisionedCluster + Properties: + CertificateAuthorityArn: !Ref MSKCertAuthority + CertificateSigningRequest: !GetAtt + - MSKCertAuthority + - CertificateSigningRequest + SigningAlgorithm: "SHA256WITHRSA" + TemplateArn: arn:aws:acm-pca:::template/RootCACertificate/V1 + Validity: + Type: YEARS + Value: 10 + + RootCAActivation: + Type: AWS::ACMPCA::CertificateAuthorityActivation + Condition: CreateProvisionedCluster + Properties: + CertificateAuthorityArn: + Ref: MSKCertAuthority + Certificate: + Fn::GetAtt: + - MSKCert + - Certificate + Status: ACTIVE + + RootCAPermission: + Type: AWS::ACMPCA::Permission + Condition: CreateProvisionedCluster + Properties: + Actions: + - IssueCertificate + - GetCertificate + - ListPermissions + CertificateAuthorityArn: !Ref MSKCertAuthority + Principal: acm.amazonaws.com + + CredentialsKMSKey: + Type: AWS::KMS::Key + Condition: CreateProvisionedCluster + Properties: + Description: "KMS key to use with credentials secret with KMS" + EnableKeyRotation: True + KeyPolicy: + Version: "2012-10-17" + Id: key-default-1 + Statement: + - Sid: Enable IAM User Permissions + Effect: Allow + Principal: + AWS: !Join + - '' + - - 'arn:aws:iam::' + - !Ref 'AWS::AccountId' + - ':root' + Action: 'kms:*' + Resource: '*' + - Sid: Enable Secret Manager Permissions + Effect: Allow + Principal: + AWS: "*" + Action: + - "kms:Decrypt" + - "kms:ReEncrypt*" + - "kms:GenerateDataKey*" + - "kms:CreateGrant" + - "kms:DescribeKey" + Resource: '*' + Condition: + StringEquals: + kms:CallerAccount: !Ref 'AWS::AccountId' + kms:ViaService: !Join + - '' + - - 'secretsmanager.' 
+ - !Ref 'AWS::Region' + - '.amazonaws.com' + PendingWindowInDays: 7 + + CredentialsKMSKeyAlias: + Type: AWS::KMS::Alias + Condition: CreateProvisionedCluster + Properties: + AliasName: alias/mskstack_secret_manager_key + TargetKeyId: !Ref 'CredentialsKMSKey' + + CredentialsSecret: + Type: AWS::SecretsManager::Secret + Condition: CreateProvisionedCluster + Properties: + Description: "Secret to use for SCRAM Auth" + Name: "AmazonMSK_Credentials" + GenerateSecretString: + SecretStringTemplate: '{"username": "test-user"}' + GenerateStringKey: "password" + PasswordLength: 30 + ExcludeCharacters: '"@/\' + KmsKeyId: !Ref 'CredentialsKMSKey' + + MSKConfiguration: + Type: AWS::MSK::Configuration + Condition: CreateProvisionedCluster + Properties: + Description: "MSKConfiguration" + Name: "MSKConfiguration" + ServerProperties: | + auto.create.topics.enable=true + default.replication.factor=3 + min.insync.replicas=2 + num.io.threads=8 + num.network.threads=5 + num.partitions=1 + num.replica.fetchers=2 + replica.lag.time.max.ms=30000 + socket.receive.buffer.bytes=102400 + socket.request.max.bytes=104857600 + socket.send.buffer.bytes=102400 + unclean.leader.election.enable=true + zookeeper.session.timeout.ms=18000 + delete.topic.enable=true + log.retention.hours=8 + + MSKCluster: + Type: AWS::MSK::Cluster + Condition: CreateProvisionedCluster + Properties: + BrokerNodeGroupInfo: + ClientSubnets: + - !Ref PrivateSubnetMSKOne + - !Ref PrivateSubnetMSKTwo + - !Ref PrivateSubnetMSKThree + SecurityGroups: + - !GetAtt MSKSecurityGroup.GroupId + InstanceType: "kafka.m5.large" + StorageInfo: + EBSStorageInfo: + VolumeSize: 100 + ClientAuthentication: + Unauthenticated: + Enabled: False + Sasl: + Iam: + Enabled: True + Scram: + Enabled: True + Tls: + CertificateAuthorityArnList: + - !Ref MSKCertAuthority + Enabled: True + ClusterName: !Sub "${AWS::StackName}-cluster" + ConfigurationInfo: + Arn: !Ref MSKConfiguration + Revision: 1 + EncryptionInfo: + EncryptionInTransit: + ClientBroker: TLS + InCluster: True + KafkaVersion: !Ref MSKKafkaVersion + NumberOfBrokerNodes: 3 + + SecretMSKAssociation: + Type: AWS::MSK::BatchScramSecret + Condition: CreateProvisionedCluster + Properties: + ClusterArn: !Ref MSKCluster + SecretArnList: + - !Ref CredentialsSecret + + ServerlessMSKCluster: + Type: AWS::MSK::ServerlessCluster + Condition: CreateServerlessCluster + Properties: + ClientAuthentication: + Sasl: + Iam: + Enabled: True + ClusterName: !Sub "${AWS::StackName}-cluster" + VpcConfigs: + - SubnetIds: + - !Ref PrivateSubnetMSKOne + - !Ref PrivateSubnetMSKTwo + - !Ref PrivateSubnetMSKThree + SecurityGroups: + - !GetAtt MSKSecurityGroup.GroupId + +Outputs: + VPCId: + Description: The ID of the VPC created + Value: !Ref 'VPC' + Export: + Name: !Sub "${AWS::StackName}-VPCID" + PublicSubnetOne: + Description: The name of the public subnet created + Value: !Ref 'PublicSubnetOne' + Export: + Name: !Sub "${AWS::StackName}-PublicSubnetOne" + PrivateSubnetMSKOne: + Description: The ID of private subnet one created + Value: !Ref 'PrivateSubnetMSKOne' + Export: + Name: !Sub "${AWS::StackName}-PrivateSubnetMSKOne" + PrivateSubnetMSKTwo: + Description: The ID of private subnet two created + Value: !Ref 'PrivateSubnetMSKTwo' + Export: + Name: !Sub "${AWS::StackName}-PrivateSubnetMSKTwo" + PrivateSubnetMSKThree: + Description: The ID of private subnet three created + Value: !Ref 'PrivateSubnetMSKThree' + Export: + Name: !Sub "${AWS::StackName}-PrivateSubnetMSKThree" + VPCStackName: + Description: The name of the VPC Stack + Value: !Ref 
'AWS::StackName' + Export: + Name: !Sub "${AWS::StackName}-VPCStackName" + ProvisionedMSKArn: + Description: Provisioned MSK Cluster ARN. + Value: !Ref MSKCluster + Export: + Name: !Sub "${AWS::StackName}-ProvisionedMSKArn" + Condition: "CreateProvisionedCluster" + CredentialsSecretArn: + Description: ARN for secret manager secret with credentials. + Value: !Ref CredentialsSecret + Export: + Name: !Sub "${AWS::StackName}-CredentialsSecret" + Condition: "CreateProvisionedCluster" + ServerlessMSKArn: + Description: Serverless MSK Cluster ARN. + Value: !Ref ServerlessMSKCluster + Export: + Name: !Sub "${AWS::StackName}-Serverless" + Condition: "CreateServerlessCluster" + SecurityGroupId: + Description: ID of scurity group for MSK clients. + Value: !GetAtt MSKSecurityGroup.GroupId + Export: + Name: !Sub "${AWS::StackName}-SecurityGroupId" + EC2InstanceEndpointID: + Description: The ID of the EC2 Instance Endpoint + Value: !Ref EC2InstanceEndpoint + KafkaTopicForLambda: + Description: The Topic to use for the Java Lambda Function + Value: !Ref KafkaTopicForLambda + Export: + Name: !Sub "${AWS::StackName}-KafkaTopicForLambda" + \ No newline at end of file From 3b6dcf0e3b108fe53625c30071feae6da5675763 Mon Sep 17 00:00:00 2001 From: Indranil Banerjee Date: Fri, 29 Nov 2024 14:27:24 -0800 Subject: [PATCH 03/12] Changed template.yaml to template_original.yaml --- .../{template.yaml => template_original.yaml} | 3 +++ 1 file changed, 3 insertions(+) rename msk-lambda-iam-java-sam/{template.yaml => template_original.yaml} (97%) diff --git a/msk-lambda-iam-java-sam/template.yaml b/msk-lambda-iam-java-sam/template_original.yaml similarity index 97% rename from msk-lambda-iam-java-sam/template.yaml rename to msk-lambda-iam-java-sam/template_original.yaml index e0da4f901..ded6e8618 100644 --- a/msk-lambda-iam-java-sam/template.yaml +++ b/msk-lambda-iam-java-sam/template_original.yaml @@ -75,12 +75,15 @@ Parameters: MSKClusterName: Type: String Description: Enter the name of the MSK Cluster + Default: CLUSTER_NAME MSKClusterId: Type: String Description: Enter the ID of the MSK Cluster + Default: CLUSTER_ID MSKTopic: Type: String Description: Enter the name of the MSK Topic + Default: KAFKA_TOPIC Outputs: HelloWorldKafkaJavaFunction: Description: "Topic Consumer Lambda Function ARN" From bcf3a06eea0c8d3f5a86e03bbf68c661f48215a2 Mon Sep 17 00:00:00 2001 From: Indranil Banerjee Date: Sat, 30 Nov 2024 23:39:52 -0800 Subject: [PATCH 04/12] Updated the README.md file for the msk-lambda-iam-java-sam --- .../MSKAndKafkaClientEC2.yaml | 44 ++++- msk-lambda-iam-java-sam/README.md | 159 +++++++++++++----- msk-lambda-iam-java-sam/README_Backup.md | 134 +++++++++++++++ 3 files changed, 288 insertions(+), 49 deletions(-) create mode 100644 msk-lambda-iam-java-sam/README_Backup.md diff --git a/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml b/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml index 1ecb969e0..6a5e7ccdb 100644 --- a/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml +++ b/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml @@ -20,6 +20,9 @@ Parameters: KafkaTopicForLambda: Type: String Default: MskIamJavaLambdaTopic + ServerlessLandGithubLocation: + Type: String + Default: https://github.com/aws-samples/serverless-patterns/ Conditions: CreateProvisionedCluster: !Equals - !Ref EnvType @@ -287,7 +290,7 @@ Resources: - | #!/bin/bash yum update -y - yum install java-1.8.0-openjdk-devel -y + yum install java-openjdk11-devel -y yum install nmap-ncat -y yum install git -y yum erase awscli -y @@ -365,10 +368,23 @@ 
Resources: echo "source /home/ec2-user/.bash_profile" >> kafka_message_sender.sh echo "/home/ec2-user/kafka/bin/kafka-console-producer.sh --bootstrap-server \$BOOTSTRAP_BROKERS_IAM --producer.config /home/ec2-user/kafka/bin/client.properties --topic $KAFKA_TOPIC" >> kafka_message_sender.sh sudo chmod +x kafka_message_sender.sh + CLUSTER_NAME="$(echo $MSK_CLUSTER_ARN | cut -d '/' -f2)" + CLUSTER_ID="$(echo $MSK_CLUSTER_ARN | cut -d '/' -f3)" + echo "export CLUSTER_NAME=$CLUSTER_NAME" >> /home/ec2-user/.bash_profile + echo "export CLUSTER_ID=$CLUSTER_ID" >> /home/ec2-user/.bash_profile + ./kafka_topic_creator.sh > kafka_topic_creator_output.txt #Checkout Serverless Patterns from Github cd /home/ec2-user - git clone https://github.com/aws-samples/serverless-patterns/ + SERVERLESS_LAND_GITHUB_LOCATION=${serverless_land_github_location} + git clone $SERVERLESS_LAND_GITHUB_LOCATION + cd ./serverless-patterns/msk-lambda-iam-java-sam + cp template_original.yaml template.yaml + sudo chown -R ec2-user . + source /home/ec2-user/.bash_profile + sed -i "s/CLUSTER_NAME/$CLUSTER_NAME/g" template.yaml + sed -i "s/CLUSTER_ID/$CLUSTER_ID/g" template.yaml + sed -i "s/KAFKA_TOPIC/$KAFKA_TOPIC/g" template.yaml # Get IP CIDR range for EC2 Instance Connect cd /home/ec2-user @@ -388,6 +404,7 @@ Resources: kafka_topic_for_lambda : !Ref KafkaTopicForLambda msk_kafka_version: !Ref MSKKafkaVersion apache_kafka_installer_location: !Ref ApacheKafkaInstallerLocation + serverless_land_github_location: !Ref ServerlessLandGithubLocation KafkaClientEC2InstanceServerless: Condition: CreateServerlessCluster @@ -418,7 +435,7 @@ Resources: - | #!/bin/bash yum update -y - yum install java-1.8.0-openjdk-devel -y + yum install java-openjdk11-devel -y yum install nmap-ncat -y yum install git -y yum erase awscli -y @@ -496,10 +513,23 @@ Resources: echo "source /home/ec2-user/.bash_profile" >> kafka_message_sender.sh echo "/home/ec2-user/kafka/bin/kafka-console-producer.sh --bootstrap-server \$BOOTSTRAP_BROKERS_IAM --producer.config /home/ec2-user/kafka/bin/client.properties --topic $KAFKA_TOPIC" >> kafka_message_sender.sh sudo chmod +x kafka_message_sender.sh + CLUSTER_NAME="$(echo $MSK_CLUSTER_ARN | cut -d '/' -f2)" + CLUSTER_ID="$(echo $MSK_CLUSTER_ARN | cut -d '/' -f3)" + echo "export CLUSTER_NAME=$CLUSTER_NAME" >> /home/ec2-user/.bash_profile + echo "export CLUSTER_ID=$CLUSTER_ID" >> /home/ec2-user/.bash_profile + ./kafka_topic_creator.sh > kafka_topic_creator_output.txt #Checkout Serverless Patterns from Github cd /home/ec2-user - git clone https://github.com/aws-samples/serverless-patterns/ + SERVERLESS_LAND_GITHUB_LOCATION=${serverless_land_github_location} + git clone $SERVERLESS_LAND_GITHUB_LOCATION + cd ./serverless-patterns/msk-lambda-iam-java-sam + cp template_original.yaml template.yaml + sudo chown -R ec2-user . 
+ source /home/ec2-user/.bash_profile + sed -i "s/CLUSTER_NAME/$CLUSTER_NAME/g" template.yaml + sed -i "s/CLUSTER_ID/$CLUSTER_ID/g" template.yaml + sed -i "s/KAFKA_TOPIC/$KAFKA_TOPIC/g" template.yaml # Get IP CIDR range for EC2 Instance Connect cd /home/ec2-user @@ -519,6 +549,7 @@ Resources: kafka_topic_for_lambda : !Ref KafkaTopicForLambda msk_kafka_version: !Ref MSKKafkaVersion apache_kafka_installer_location: !Ref ApacheKafkaInstallerLocation + serverless_land_github_location: !Ref ServerlessLandGithubLocation EC2InstanceEndpoint: Type: AWS::EC2::InstanceConnectEndpoint @@ -548,6 +579,7 @@ Resources: - arn:aws:iam::aws:policy/AmazonS3FullAccess - arn:aws:iam::aws:policy/AWSCertificateManagerPrivateCAFullAccess - arn:aws:iam::aws:policy/IAMFullAccess + - arn:aws:iam::aws:policy/AWSLambda_FullAccess Policies: - PolicyName: MSKConfigurationAccess PolicyDocument: !Sub '{ @@ -904,11 +936,11 @@ Outputs: Value: !Ref 'AWS::StackName' Export: Name: !Sub "${AWS::StackName}-VPCStackName" - ProvisionedMSKArn: + MSKArn: Description: Provisioned MSK Cluster ARN. Value: !Ref MSKCluster Export: - Name: !Sub "${AWS::StackName}-ProvisionedMSKArn" + Name: !Sub "${AWS::StackName}-MSKArn" Condition: "CreateProvisionedCluster" CredentialsSecretArn: Description: ARN for secret manager secret with credentials. diff --git a/msk-lambda-iam-java-sam/README.md b/msk-lambda-iam-java-sam/README.md index f58d74424..a29f03673 100644 --- a/msk-lambda-iam-java-sam/README.md +++ b/msk-lambda-iam-java-sam/README.md @@ -1,67 +1,50 @@ # msk-lambda-iam-java-sam # Java AWS Lambda Kafka consumer with IAM auth, using AWS SAM -This pattern is an example of a Lambda function that consumes messages from an Amazon Managed Streaming for Kafka (Amazon MSK) topic, where the MSK Cluster has been configured to use IAM authentication. This pattern assumes you already have an MSK cluster with a topic configured, if you need a sample pattern to deploy an MSK cluster either in Provisioned or Serverless modes please see the [msk-cfn-sasl-lambda pattern](https://serverlessland.com/patterns/msk-cfn-sasl-lambda). +This pattern is an example of a Lambda function that consumes messages from an Amazon Managed Streaming for Kafka (Amazon MSK) topic, where the MSK Cluster has been configured to use IAM authentication. This project contains source code and supporting files for a serverless application that you can deploy with the SAM CLI. It includes the following files and folders. - kafka_event_consumer_function/src/main/java - Code for the application's Lambda function. - events - Invocation events that you can use to invoke the function. - kafka_event_consumer_function/src/test/java - Unit tests for the application code. -- template.yaml - A template that defines the application's AWS resources. - -The application uses several AWS resources, including Lambda functions and an MSK event source. These resources are defined in the `template.yaml` file in this project. You can update the template to add AWS resources through the same deployment process that updates your application code. +- template_original.yaml - A template that defines the application's Lambda function. +- MSKAndKafkaClientEC2.yaml - A Cloudformation template file that can be used to deploy an MSK cluster and also deploy an EC2 machine with all pre-requisities already installed, so you can directly build and deploy the lambda function and test it out. 
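+
+As an alternative to the console-based deployment described below, the MSKAndKafkaClientEC2.yaml stack can also be created with the AWS CLI. A minimal sketch (the stack name is illustrative; the IAM capability flags are required because the template creates IAM roles and an instance profile):
+
+```bash
+aws cloudformation create-stack \
+  --stack-name msk-lambda-iam-java-cluster \
+  --template-body file://MSKAndKafkaClientEC2.yaml \
+  --capabilities CAPABILITY_IAM CAPABILITY_NAMED_IAM
+
+aws cloudformation wait stack-create-complete \
+  --stack-name msk-lambda-iam-java-cluster
+```
+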
Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. ## Requirements * [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. -* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured -* [Git installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) -* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed -* Create MSK cluster and topic that will be used for testing. It is important to create the topic before deploying the Lambda function, otherwise the event source mapping will stay disabled. - - -Before proceeding with the next step, please make sure you have Java JDK and Maven installed on your machine - -For the latest version of Amazon Corretto JDK (at the time of publishing), please go the following link: - -https://docs.aws.amazon.com/corretto/latest/corretto-21-ug/downloads-list.html - -Please follow the instructions to download and install the JDK for your Operating System - -Note that you don't have to use Amazon Corretto JDK but can use JDK from another source as well -For the latest version of Maven (at the time of publishing) please go the following link: +## Run the Cloudformation template to create the MSK Cluster and Client EC2 machine -https://maven.apache.org/download.cgi#:~:text=Apache%20Maven%203.9.9%20is,recommended%20version%20for%20all%20users. +* [Run the Cloudformation template using the file MSKAndKafkaClientEC2.yaml] - You can go to the AWS Cloudformation console, create a new stack by specifying the template file. You can keep the defaults for input parameters or modify them as necessary. Wait for the Cloudformation stack to be created. This Cloudformation template will create an MSK cluster (Provisioned or Serverless based on your selection). It will also create an EC2 machine that you can use as a client. -Please follow the instructions to download and install Maven and then add the location to the bin folder of Maven in your System PATH +* [Connect to the EC2 machine] - Once the Cloudformation stack is created, you can go to the EC2 console and log into the machine using either "Connect using EC2 Instance Connect" or "Connect using EC2 Instance Connect Endpoint" option under the "EC2 Instance Connect" tab. -To ensure Java and Maven are correctly installed, run the commands: +* [Check if Kafka Topic has been created] - Once you are inside the EC2 machine, you should be in the /home/ec2-user folder. Check to see the contents of the file kafka_topic_creator_output.txt by running the command cat kafka_topic_creator_output.txt. You should see an output such as "Created topic MskIamJavaLambdaTopic." -java --version +If you are not able to find the file kafka_topic_creator_output.txt or if it is blank or you see an error message, then you need to run the file ./kafka_topic_creator.sh. This file runs a script that goes and creates the Kafka topic that the Lambda function will subscribe to. 
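+
+For reference, kafka_topic_creator.sh is generated by the EC2 instance UserData and boils down to the sketch below: it looks up the cluster's IAM bootstrap brokers and then creates the topic with the Kafka CLI, using the client.properties file that configures SASL/IAM authentication. MSK_CLUSTER_ARN, AWS_REGION and KAFKA_TOPIC are exported to .bash_profile by the same UserData.
+
+```bash
+# Resolve the IAM bootstrap broker string for the cluster
+BOOTSTRAP_BROKERS_IAM=$(aws kafka get-bootstrap-brokers --region $AWS_REGION \
+  --cluster-arn $MSK_CLUSTER_ARN --query 'BootstrapBrokerStringSaslIam' --output text)
+
+# Create the topic over SASL/IAM
+/home/ec2-user/kafka/bin/kafka-topics.sh --create \
+  --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
+  --command-config /home/ec2-user/kafka/bin/client.properties \
+  --replication-factor 3 --partitions 3 --topic $KAFKA_TOPIC
+```
+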
-mvn --version +## Pre-requisites to Deploy the sample Lambda function - -## Deploy the sample application +The EC2 machine that was created by running the Cloudformation template has all the software that will be needed to deploy the Lambda function. The AWS SAM CLI is a serverless tool for building and testing Lambda applications. It uses Docker to locally test your functions in an Amazon Linux environment that resembles the Lambda execution environment. It can also emulate your application's build environment and API. -To use the AWS SAM CLI, you need the following tools. +* Java - On the EC2 machine, we have installed java-openjdk11-devel but you can also install Amazon Corretto by modifying the Cloudformation UserData script if you wish to (https://docs.aws.amazon.com/linux/al2/ug/java.html) +* Maven - On the EC2 machine, we have installed Maven (https://maven.apache.org/install.html) +* AWS SAM CLI - We have installed the AWS SAM CLI (https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) +* Docker - We have installed the Docker Community Edition on the EC2 machine (https://hub.docker.com/search/?type=edition&offering=community) -* AWS SAM CLI - [Install the AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) -* Docker - [Install Docker community edition](https://hub.docker.com/search/?type=edition&offering=community) - -1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: +We have also cloned the Github repository for serverless-patterns on the EC2 machine already by running the below command ``` git clone https://github.com/aws-samples/serverless-patterns.git ``` -1. Change directory to the pattern directory: +Change directory to the pattern directory: ``` - cd msk-lambda-iam-java-sam + cd serverless-patterns/msk-lambda-iam-java-sam ``` ## Use the SAM CLI to build and test locally @@ -82,23 +65,81 @@ Run functions locally and invoke them with the `sam local invoke` command. sam local invoke --event events/event.json ``` -## Deploy the sample application +You should see a response such as the below: -The Serverless Application Model Command Line Interface (SAM CLI) is an extension of the AWS CLI that adds functionality for building and testing Lambda applications. It uses Docker to run your functions in an Amazon Linux environment that matches Lambda. It can also emulate your application's build environment and API. +``` +***** Begin sam local invoke response ***** + +Invoking com.amazonaws.services.lambda.samples.events.msk.HandlerMSK::handleRequest (java11) +Local image is up-to-date +Using local image: public.ecr.aws/lambda/java:11-rapid-x86_64. 
+ +Mounting /home/ec2-user/serverless-patterns/msk-lambda-iam-java-sam/.aws-sam/build/LambdaMSKConsumerJavaFunction as /var/task:ro,delegated, inside +runtime container +START RequestId: 4484bb15-6749-4307-92d1-8ba2221e2218 Version: $LATEST +START RequestId: 4484bb15-6749-4307-92d1-8ba2221e2218 Version: $LATEST +Picked up JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1 +Received this message from Kafka - KafkaMessage [topic=myTopic, partition=0, timestamp=1678072110111, timestampType=CREATE_TIME, key=null, value=Zg==, decodedKey=null, decodedValue=f, headers=[]]Message in JSON format : { + "topic": "myTopic", + "partition": 0, + "offset": 250, + "timestamp": 1678072110111, + "timestampType": "CREATE_TIME", + "value": "Zg\u003d\u003d", + "decodedKey": "null", + "decodedValue": "f", + "headers": [] +}Received this message from Kafka - KafkaMessage [topic=myTopic, partition=0, timestamp=1678072111086, timestampType=CREATE_TIME, key=null, value=Zw==, decodedKey=null, decodedValue=g, headers=[]]Message in JSON format : { + "topic": "myTopic", + "partition": 0, + "offset": 251, + "timestamp": 1678072111086, + "timestampType": "CREATE_TIME", + "value": "Zw\u003d\u003d", + "decodedKey": "null", + "decodedValue": "g", + "headers": [] +}All Messages in this batch = [ + { + "topic": "myTopic", + "partition": 0, + "offset": 250, + "timestamp": 1678072110111, + "timestampType": "CREATE_TIME", + "value": "Zg\u003d\u003d", + "decodedKey": "null", + "decodedValue": "f", + "headers": [] + }, + { + "topic": "myTopic", + "partition": 0, + "offset": 251, + "timestamp": 1678072111086, + "timestampType": "CREATE_TIME", + "value": "Zw\u003d\u003d", + "decodedKey": "null", + "decodedValue": "g", + "headers": [] + } +]END RequestId: fc96203d-e0c0-4c30-b332-d16708b25d3e +REPORT RequestId: fc96203d-e0c0-4c30-b332-d16708b25d3e Init Duration: 0.06 ms Duration: 474.31 ms Billed Duration: 475 ms Memory Size: 512 MB Max Memory Used: 512 MB +"200 OK" + +***** End sam local invoke response ***** +``` -To use the SAM CLI, you need the following tools. -* SAM CLI - [Install the SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) -* Docker - [Install Docker community edition](https://hub.docker.com/search/?type=edition&offering=community) +## Deploy the sample application -To build and deploy your application for the first time, run the following in your shell: + +To deploy your application for the first time, run the following in your shell: ```bash -sam build -sam deploy --guided +sam deploy --capabilities CAPABILITY_IAM --no-confirm-changeset --no-disable-rollback --region $AWS_REGION --stack-name msk-lambda-iam-java-sam --guided ``` -The first command will build the source of your application. The second command will package and deploy your application to AWS, with a series of prompts: +The sam deploy command will package and deploy your application to AWS, with a series of prompts. You can accept all the defaults by hitting Enter: * **Stack Name**: The name of the stack to deploy to CloudFormation. This should be unique to your account and region, and a good starting point would be something matching your project name. * **AWS Region**: The AWS region you want to deploy your app to. 
@@ -119,6 +160,24 @@ You should get a message "Successfully created/updated stack - in My first message +>My second message +>My third message +>My fourth message +>My fifth message +>My sixth message +>My seventh message +>My eigth message +>My ninth message +>My tenth message + Either send at least 10 messages or wait for 300 seconds (check the values of BatchSize: 10 and MaximumBatchingWindowInSeconds: 300 in the template.yaml file) Then check Cloudwatch logs and you should see messages for the Cloudwatch Log Group with the name of the deployed Lambda function. @@ -132,3 +191,17 @@ Each key has a list of messages. Each Kafka message has the following properties The Key and Value are base64 encoded and have to be decoded. A message can also have a list of headers, each header having a key and a value. The code in this example prints out the fields in the Kafka message and also decrypts the key and the value and logs them in Cloudwatch logs. + +## Cleanup + +You can first clean-up the Lambda function by running the sam delete command + +``` +cd /home/ec2-user/serverless-patterns/msk-lambda-iam-java-sam +sam delete + +``` +confirm by pressing y for both the questions +You should see the lambda function getting deleted and a final confirmation "Deleted successfully" on the command-line + +Next you need to delete the Cloudformation template that created the MSK Server and the EC2 machine by going to the Cloudformation console and selecting the stack and then hitting the "Delete" button. It will run for sometime but eventually you should see the stack getting cleaned up. \ No newline at end of file diff --git a/msk-lambda-iam-java-sam/README_Backup.md b/msk-lambda-iam-java-sam/README_Backup.md new file mode 100644 index 000000000..f58d74424 --- /dev/null +++ b/msk-lambda-iam-java-sam/README_Backup.md @@ -0,0 +1,134 @@ +# msk-lambda-iam-java-sam +# Java AWS Lambda Kafka consumer with IAM auth, using AWS SAM + +This pattern is an example of a Lambda function that consumes messages from an Amazon Managed Streaming for Kafka (Amazon MSK) topic, where the MSK Cluster has been configured to use IAM authentication. This pattern assumes you already have an MSK cluster with a topic configured, if you need a sample pattern to deploy an MSK cluster either in Provisioned or Serverless modes please see the [msk-cfn-sasl-lambda pattern](https://serverlessland.com/patterns/msk-cfn-sasl-lambda). + +This project contains source code and supporting files for a serverless application that you can deploy with the SAM CLI. It includes the following files and folders. + +- kafka_event_consumer_function/src/main/java - Code for the application's Lambda function. +- events - Invocation events that you can use to invoke the function. +- kafka_event_consumer_function/src/test/java - Unit tests for the application code. +- template.yaml - A template that defines the application's AWS resources. + +The application uses several AWS resources, including Lambda functions and an MSK event source. These resources are defined in the `template.yaml` file in this project. You can update the template to add AWS resources through the same deployment process that updates your application code. + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. 
+ +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed +* Create MSK cluster and topic that will be used for testing. It is important to create the topic before deploying the Lambda function, otherwise the event source mapping will stay disabled. + + +Before proceeding with the next step, please make sure you have Java JDK and Maven installed on your machine + +For the latest version of Amazon Corretto JDK (at the time of publishing), please go the following link: + +https://docs.aws.amazon.com/corretto/latest/corretto-21-ug/downloads-list.html + +Please follow the instructions to download and install the JDK for your Operating System + +Note that you don't have to use Amazon Corretto JDK but can use JDK from another source as well + +For the latest version of Maven (at the time of publishing) please go the following link: + +https://maven.apache.org/download.cgi#:~:text=Apache%20Maven%203.9.9%20is,recommended%20version%20for%20all%20users. + +Please follow the instructions to download and install Maven and then add the location to the bin folder of Maven in your System PATH + +To ensure Java and Maven are correctly installed, run the commands: + +java --version + +mvn --version + + +## Deploy the sample application + +The AWS SAM CLI is a serverless tool for building and testing Lambda applications. It uses Docker to locally test your functions in an Amazon Linux environment that resembles the Lambda execution environment. It can also emulate your application's build environment and API. + +To use the AWS SAM CLI, you need the following tools. + +* AWS SAM CLI - [Install the AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) +* Docker - [Install Docker community edition](https://hub.docker.com/search/?type=edition&offering=community) + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns.git + ``` +1. Change directory to the pattern directory: + ``` + cd msk-lambda-iam-java-sam + ``` + +## Use the SAM CLI to build and test locally + +Build your application with the `sam build` command. + +```bash +sam build +``` + +The SAM CLI installs dependencies defined in `kafka_event_consumer_function/pom.xml`, creates a deployment package, and saves it in the `.aws-sam/build` folder. + +Test a single function by invoking it directly with a test event. An event is a JSON document that represents the input that the function receives from the event source. Test events are included in the `events` folder in this project. + +Run functions locally and invoke them with the `sam local invoke` command. 
+ +```bash +sam local invoke --event events/event.json +``` + +## Deploy the sample application + +The Serverless Application Model Command Line Interface (SAM CLI) is an extension of the AWS CLI that adds functionality for building and testing Lambda applications. It uses Docker to run your functions in an Amazon Linux environment that matches Lambda. It can also emulate your application's build environment and API. + +To use the SAM CLI, you need the following tools. + +* SAM CLI - [Install the SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) +* Docker - [Install Docker community edition](https://hub.docker.com/search/?type=edition&offering=community) + +To build and deploy your application for the first time, run the following in your shell: + +```bash +sam build +sam deploy --guided +``` + +The first command will build the source of your application. The second command will package and deploy your application to AWS, with a series of prompts: + +* **Stack Name**: The name of the stack to deploy to CloudFormation. This should be unique to your account and region, and a good starting point would be something matching your project name. +* **AWS Region**: The AWS region you want to deploy your app to. +* **Parameter MSKClusterName**: The name of the MSKCluster +* **Parameter MSKClusterId**: The unique ID of the MSKCluster +* **Parameter MSKTopic**: The Kafka topic on which the lambda function will listen on +* **Confirm changes before deploy**: If set to yes, any change sets will be shown to you before execution for manual review. If set to no, the AWS SAM CLI will automatically deploy application changes. +* **Allow SAM CLI IAM role creation**: Many AWS SAM templates, including this example, create AWS IAM roles required for the AWS Lambda function(s) included to access AWS services. By default, these are scoped down to minimum required permissions. To deploy an AWS CloudFormation stack which creates or modifies IAM roles, the `CAPABILITY_IAM` value for `capabilities` must be provided. If permission isn't provided through this prompt, to deploy this example you must explicitly pass `--capabilities CAPABILITY_IAM` to the `sam deploy` command. +* **Disable rollback**: Defaults to No and it preserves the state of previously provisioned resources when an operation fails +* **Save arguments to configuration file**: If set to yes, your choices will be saved to a configuration file inside the project, so that in the future you can just re-run `sam deploy` without parameters to deploy changes to your application. +* **SAM configuration file [samconfig.toml]**: Name of the configuration file to store configuration information locally +* **SAM configuration environment [default]**: Environment for storing deployment information locally + +You should get a message "Successfully created/updated stack - in " if all goes well + + +## Test the sample application + +Once the lambda function is deployed, send some Kafka messages on the topic that the lambda function is listening on, on the MSK server. + +Either send at least 10 messages or wait for 300 seconds (check the values of BatchSize: 10 and MaximumBatchingWindowInSeconds: 300 in the template.yaml file) + +Then check Cloudwatch logs and you should see messages for the Cloudwatch Log Group with the name of the deployed Lambda function. 
+ +The lambda code parses the Kafka messages and outputs the fields in the Kafka messages to Cloudwatch logs + +A single lambda function receives a batch of messages. The messages are received as a map with each key being a combination of the topic and the partition, as a single batch can receive messages from multiple partitions. + +Each key has a list of messages. Each Kafka message has the following properties - Topic, Partition, Offset, TimeStamp, TimeStampType, Key and Value + +The Key and Value are base64 encoded and have to be decoded. A message can also have a list of headers, each header having a key and a value. + +The code in this example prints out the fields in the Kafka message and also decrypts the key and the value and logs them in Cloudwatch logs. From 37ea4e2fdce959e30add7931216af7aa4a678478 Mon Sep 17 00:00:00 2001 From: Indranil Banerjee Date: Sat, 30 Nov 2024 23:40:44 -0800 Subject: [PATCH 05/12] Deleted the msk-lambda-iam-java-sam/README_Backup.md file --- msk-lambda-iam-java-sam/README_Backup.md | 134 ----------------------- 1 file changed, 134 deletions(-) delete mode 100644 msk-lambda-iam-java-sam/README_Backup.md diff --git a/msk-lambda-iam-java-sam/README_Backup.md b/msk-lambda-iam-java-sam/README_Backup.md deleted file mode 100644 index f58d74424..000000000 --- a/msk-lambda-iam-java-sam/README_Backup.md +++ /dev/null @@ -1,134 +0,0 @@ -# msk-lambda-iam-java-sam -# Java AWS Lambda Kafka consumer with IAM auth, using AWS SAM - -This pattern is an example of a Lambda function that consumes messages from an Amazon Managed Streaming for Kafka (Amazon MSK) topic, where the MSK Cluster has been configured to use IAM authentication. This pattern assumes you already have an MSK cluster with a topic configured, if you need a sample pattern to deploy an MSK cluster either in Provisioned or Serverless modes please see the [msk-cfn-sasl-lambda pattern](https://serverlessland.com/patterns/msk-cfn-sasl-lambda). - -This project contains source code and supporting files for a serverless application that you can deploy with the SAM CLI. It includes the following files and folders. - -- kafka_event_consumer_function/src/main/java - Code for the application's Lambda function. -- events - Invocation events that you can use to invoke the function. -- kafka_event_consumer_function/src/test/java - Unit tests for the application code. -- template.yaml - A template that defines the application's AWS resources. - -The application uses several AWS resources, including Lambda functions and an MSK event source. These resources are defined in the `template.yaml` file in this project. You can update the template to add AWS resources through the same deployment process that updates your application code. - -Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. - -## Requirements - -* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. 
-* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured -* [Git installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) -* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed -* Create MSK cluster and topic that will be used for testing. It is important to create the topic before deploying the Lambda function, otherwise the event source mapping will stay disabled. - - -Before proceeding with the next step, please make sure you have Java JDK and Maven installed on your machine - -For the latest version of Amazon Corretto JDK (at the time of publishing), please go the following link: - -https://docs.aws.amazon.com/corretto/latest/corretto-21-ug/downloads-list.html - -Please follow the instructions to download and install the JDK for your Operating System - -Note that you don't have to use Amazon Corretto JDK but can use JDK from another source as well - -For the latest version of Maven (at the time of publishing) please go the following link: - -https://maven.apache.org/download.cgi#:~:text=Apache%20Maven%203.9.9%20is,recommended%20version%20for%20all%20users. - -Please follow the instructions to download and install Maven and then add the location to the bin folder of Maven in your System PATH - -To ensure Java and Maven are correctly installed, run the commands: - -java --version - -mvn --version - - -## Deploy the sample application - -The AWS SAM CLI is a serverless tool for building and testing Lambda applications. It uses Docker to locally test your functions in an Amazon Linux environment that resembles the Lambda execution environment. It can also emulate your application's build environment and API. - -To use the AWS SAM CLI, you need the following tools. - -* AWS SAM CLI - [Install the AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) -* Docker - [Install Docker community edition](https://hub.docker.com/search/?type=edition&offering=community) - -1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: - ``` - git clone https://github.com/aws-samples/serverless-patterns.git - ``` -1. Change directory to the pattern directory: - ``` - cd msk-lambda-iam-java-sam - ``` - -## Use the SAM CLI to build and test locally - -Build your application with the `sam build` command. - -```bash -sam build -``` - -The SAM CLI installs dependencies defined in `kafka_event_consumer_function/pom.xml`, creates a deployment package, and saves it in the `.aws-sam/build` folder. - -Test a single function by invoking it directly with a test event. An event is a JSON document that represents the input that the function receives from the event source. Test events are included in the `events` folder in this project. - -Run functions locally and invoke them with the `sam local invoke` command. - -```bash -sam local invoke --event events/event.json -``` - -## Deploy the sample application - -The Serverless Application Model Command Line Interface (SAM CLI) is an extension of the AWS CLI that adds functionality for building and testing Lambda applications. It uses Docker to run your functions in an Amazon Linux environment that matches Lambda. It can also emulate your application's build environment and API. - -To use the SAM CLI, you need the following tools. 
- -* SAM CLI - [Install the SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) -* Docker - [Install Docker community edition](https://hub.docker.com/search/?type=edition&offering=community) - -To build and deploy your application for the first time, run the following in your shell: - -```bash -sam build -sam deploy --guided -``` - -The first command will build the source of your application. The second command will package and deploy your application to AWS, with a series of prompts: - -* **Stack Name**: The name of the stack to deploy to CloudFormation. This should be unique to your account and region, and a good starting point would be something matching your project name. -* **AWS Region**: The AWS region you want to deploy your app to. -* **Parameter MSKClusterName**: The name of the MSKCluster -* **Parameter MSKClusterId**: The unique ID of the MSKCluster -* **Parameter MSKTopic**: The Kafka topic on which the lambda function will listen on -* **Confirm changes before deploy**: If set to yes, any change sets will be shown to you before execution for manual review. If set to no, the AWS SAM CLI will automatically deploy application changes. -* **Allow SAM CLI IAM role creation**: Many AWS SAM templates, including this example, create AWS IAM roles required for the AWS Lambda function(s) included to access AWS services. By default, these are scoped down to minimum required permissions. To deploy an AWS CloudFormation stack which creates or modifies IAM roles, the `CAPABILITY_IAM` value for `capabilities` must be provided. If permission isn't provided through this prompt, to deploy this example you must explicitly pass `--capabilities CAPABILITY_IAM` to the `sam deploy` command. -* **Disable rollback**: Defaults to No and it preserves the state of previously provisioned resources when an operation fails -* **Save arguments to configuration file**: If set to yes, your choices will be saved to a configuration file inside the project, so that in the future you can just re-run `sam deploy` without parameters to deploy changes to your application. -* **SAM configuration file [samconfig.toml]**: Name of the configuration file to store configuration information locally -* **SAM configuration environment [default]**: Environment for storing deployment information locally - -You should get a message "Successfully created/updated stack - in " if all goes well - - -## Test the sample application - -Once the lambda function is deployed, send some Kafka messages on the topic that the lambda function is listening on, on the MSK server. - -Either send at least 10 messages or wait for 300 seconds (check the values of BatchSize: 10 and MaximumBatchingWindowInSeconds: 300 in the template.yaml file) - -Then check Cloudwatch logs and you should see messages for the Cloudwatch Log Group with the name of the deployed Lambda function. - -The lambda code parses the Kafka messages and outputs the fields in the Kafka messages to Cloudwatch logs - -A single lambda function receives a batch of messages. The messages are received as a map with each key being a combination of the topic and the partition, as a single batch can receive messages from multiple partitions. - -Each key has a list of messages. Each Kafka message has the following properties - Topic, Partition, Offset, TimeStamp, TimeStampType, Key and Value - -The Key and Value are base64 encoded and have to be decoded. 
A message can also have a list of headers, each header having a key and a value. - -The code in this example prints out the fields in the Kafka message and also decrypts the key and the value and logs them in Cloudwatch logs. From 358a2b1ccb2f5ab09db5e6bb40d0064a1e9bade7 Mon Sep 17 00:00:00 2001 From: Indranil Banerjee Date: Sun, 1 Dec 2024 17:53:54 -0800 Subject: [PATCH 06/12] Fixed an issue in Cloudformation template to handle serverless MSK --- msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml b/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml index 6a5e7ccdb..7b7b34173 100644 --- a/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml +++ b/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml @@ -545,7 +545,7 @@ Resources: aws ec2 authorize-security-group-ingress --region $AWS_REGION --group-id $SECURITY_GROUP --protocol tcp --port 22 --cidr $EC2_CONNECT_IP - security_group_id : !GetAtt KafkaClientInstanceSecurityGroup.GroupId - msk_cluster_arn : !GetAtt MSKCluster.Arn + msk_cluster_arn : !GetAtt ServerlessMSKCluster.Arn kafka_topic_for_lambda : !Ref KafkaTopicForLambda msk_kafka_version: !Ref MSKKafkaVersion apache_kafka_installer_location: !Ref ApacheKafkaInstallerLocation From 31b7cb4e08f28be95c6d32deb86a0719c629f530 Mon Sep 17 00:00:00 2001 From: Indranil Banerjee Date: Mon, 2 Dec 2024 11:46:12 -0800 Subject: [PATCH 07/12] Fixed the Readme --- msk-lambda-iam-java-sam/README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/msk-lambda-iam-java-sam/README.md b/msk-lambda-iam-java-sam/README.md index a29f03673..c448e891d 100644 --- a/msk-lambda-iam-java-sam/README.md +++ b/msk-lambda-iam-java-sam/README.md @@ -22,6 +22,7 @@ Important: this application uses various AWS services and there are costs associ * [Run the Cloudformation template using the file MSKAndKafkaClientEC2.yaml] - You can go to the AWS Cloudformation console, create a new stack by specifying the template file. You can keep the defaults for input parameters or modify them as necessary. Wait for the Cloudformation stack to be created. This Cloudformation template will create an MSK cluster (Provisioned or Serverless based on your selection). It will also create an EC2 machine that you can use as a client. * [Connect to the EC2 machine] - Once the Cloudformation stack is created, you can go to the EC2 console and log into the machine using either "Connect using EC2 Instance Connect" or "Connect using EC2 Instance Connect Endpoint" option under the "EC2 Instance Connect" tab. +Note: You may need to wait for some time after the Cloudformation stack is created, as some UserData scripts continue running after the Cloudformation stack shows Created. * [Check if Kafka Topic has been created] - Once you are inside the EC2 machine, you should be in the /home/ec2-user folder. Check to see the contents of the file kafka_topic_creator_output.txt by running the command cat kafka_topic_creator_output.txt. You should see an output such as "Created topic MskIamJavaLambdaTopic." @@ -166,6 +167,7 @@ cd /home/ec2-user You should see a script called kafka_message_sender.sh. Run that script and you should be able to send a new Kafka message in every line as shown below +``` [ec2-user@ip-10-0-0-126 ~]$ sh kafka_message_sender.sh >My first message >My second message @@ -177,6 +179,8 @@ You should see a script called kafka_message_sender.sh. 
Run that script and you >My eigth message >My ninth message >My tenth message +>Ctrl-C +``` Either send at least 10 messages or wait for 300 seconds (check the values of BatchSize: 10 and MaximumBatchingWindowInSeconds: 300 in the template.yaml file) From 221b3d3c2bdf316513e180af219d4ee371dce7ba Mon Sep 17 00:00:00 2001 From: indranil-banerjee-aws Date: Tue, 10 Dec 2024 00:21:37 -0800 Subject: [PATCH 08/12] Fixed msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml to remove dependency on EC2 Metadata Service to find AWS region --- msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml b/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml index 7b7b34173..aa4c1621c 100644 --- a/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml +++ b/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml @@ -354,7 +354,7 @@ Resources: echo "#!/bin/bash" > kafka_topic_creator.sh sudo chmod +x kafka_topic_creator.sh echo "MSK_CLUSTER_ARN=$MSK_CLUSTER_ARN" >> kafka_topic_creator.sh - AWS_REGION=$(curl -fsq http://169.254.169.254/latest/meta-data/placement/availability-zone | sed 's/[a-z]$//') + AWS_REGION=${aws_region} echo "AWS_REGION=$AWS_REGION" >> kafka_topic_creator.sh echo "BOOTSTRAP_BROKERS_IAM=\$(aws kafka get-bootstrap-brokers --region \$AWS_REGION --cluster-arn \$MSK_CLUSTER_ARN --query 'BootstrapBrokerStringSaslIam' --output text)" >> kafka_topic_creator.sh echo "sleep 5" >> kafka_topic_creator.sh @@ -392,7 +392,7 @@ Resources: cd ip_prefix git clone https://github.com/joetek/aws-ip-ranges-json.git cd aws-ip-ranges-json - AWS_REGION=$(curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone | sed 's/\(.*\)[a-z]/\1/') + AWS_REGION=${aws_region} EC2_CONNECT_IP=$(cat ip-ranges-ec2-instance-connect.json | jq -r --arg AWS_REGION "$AWS_REGION" '.prefixes[] | select(.region==$AWS_REGION).ip_prefix') echo "export EC2_CONNECT_IP=$EC2_CONNECT_IP" >> /home/ec2-user/.bash_profile SECURITY_GROUP=${security_group_id} @@ -405,6 +405,7 @@ Resources: msk_kafka_version: !Ref MSKKafkaVersion apache_kafka_installer_location: !Ref ApacheKafkaInstallerLocation serverless_land_github_location: !Ref ServerlessLandGithubLocation + aws_region: !Ref 'AWS::Region' KafkaClientEC2InstanceServerless: Condition: CreateServerlessCluster @@ -499,7 +500,7 @@ Resources: echo "#!/bin/bash" > kafka_topic_creator.sh sudo chmod +x kafka_topic_creator.sh echo "MSK_CLUSTER_ARN=$MSK_CLUSTER_ARN" >> kafka_topic_creator.sh - AWS_REGION=$(curl -fsq http://169.254.169.254/latest/meta-data/placement/availability-zone | sed 's/[a-z]$//') + AWS_REGION=${aws_region} echo "AWS_REGION=$AWS_REGION" >> kafka_topic_creator.sh echo "BOOTSTRAP_BROKERS_IAM=\$(aws kafka get-bootstrap-brokers --region \$AWS_REGION --cluster-arn \$MSK_CLUSTER_ARN --query 'BootstrapBrokerStringSaslIam' --output text)" >> kafka_topic_creator.sh echo "sleep 5" >> kafka_topic_creator.sh @@ -537,7 +538,7 @@ Resources: cd ip_prefix git clone https://github.com/joetek/aws-ip-ranges-json.git cd aws-ip-ranges-json - AWS_REGION=$(curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone | sed 's/\(.*\)[a-z]/\1/') + AWS_REGION=${aws_region} EC2_CONNECT_IP=$(cat ip-ranges-ec2-instance-connect.json | jq -r --arg AWS_REGION "$AWS_REGION" '.prefixes[] | select(.region==$AWS_REGION).ip_prefix') echo "export EC2_CONNECT_IP=$EC2_CONNECT_IP" >> /home/ec2-user/.bash_profile SECURITY_GROUP=${security_group_id} @@ -550,6 +551,7 @@ Resources: 
msk_kafka_version: !Ref MSKKafkaVersion apache_kafka_installer_location: !Ref ApacheKafkaInstallerLocation serverless_land_github_location: !Ref ServerlessLandGithubLocation + aws_region: !Ref 'AWS::Region' EC2InstanceEndpoint: Type: AWS::EC2::InstanceConnectEndpoint From 6ff8c909de5f9ae4a6a3911772a725357d2228b6 Mon Sep 17 00:00:00 2001 From: indranil-banerjee-aws Date: Fri, 13 Dec 2024 09:41:49 -0800 Subject: [PATCH 09/12] Made Java Version selectable as input parameter in Cloudformation template --- .../MSKAndKafkaClientEC2.yaml | 55 +++++++++++++++---- .../template_original.yaml | 2 +- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml b/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml index aa4c1621c..24919067e 100644 --- a/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml +++ b/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml @@ -10,10 +10,18 @@ Parameters: ConstraintDescription: Must specify Serverless or Provisioned. LatestAmiId: Type: 'AWS::SSM::Parameter::Value' - Default: '/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2' + Default: '/aws/service/ami-amazon-linux-latest/al2023-ami-kernel-6.1-x86_64' MSKKafkaVersion: Type: String Default: 3.5.1 + JavaVersion: + Type: String + Description: Choose the version of Java. Lambda currently supports Java 11, 17 and 21 + AllowedValues: + - java11 + - java17 + - java21 + Default: java21 ApacheKafkaInstallerLocation: Type: String Default: https://archive.apache.org/dist/kafka/3.5.1/kafka_2.13-3.5.1.tgz @@ -290,17 +298,28 @@ Resources: - | #!/bin/bash yum update -y - yum install java-openjdk11-devel -y + # yum install java-openjdk11-devel -y + + # install Java + JAVA_VERSION=${java_version} + echo "JAVA_VERSION=$JAVA_VERSION" >> /home/ec2-user/.bash_profile + if [ "$JAVA_VERSION" == "java11" ]; then + sudo yum install java-11-amazon-corretto-devel -y + elif [ "$JAVA_VERSION" == "java17" ]; then + sudo yum install java-17-amazon-corretto-devel -y + elif [ "$JAVA_VERSION" == "java21" ]; then + sudo yum install java-21-amazon-corretto-devel -y + else + sudo yum install java-21-amazon-corretto-devel -y + fi + yum install nmap-ncat -y yum install git -y yum erase awscli -y yum install jq -y - amazon-linux-extras install docker -y - amazon-linux-extras install python3.8 -y + sudo yum install -y docker service docker start usermod -a -G docker ec2-user - unlink /usr/bin/python3 - ln -s /usr/bin/python3.8 /usr/bin/python3 sudo wget https://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo sudo yum install -y apache-maven @@ -385,6 +404,7 @@ Resources: sed -i "s/CLUSTER_NAME/$CLUSTER_NAME/g" template.yaml sed -i "s/CLUSTER_ID/$CLUSTER_ID/g" template.yaml sed -i "s/KAFKA_TOPIC/$KAFKA_TOPIC/g" template.yaml + sed -i "s/JAVA_VERSION/$JAVA_VERSION/g" template.yaml # Get IP CIDR range for EC2 Instance Connect cd /home/ec2-user @@ -406,6 +426,7 @@ Resources: apache_kafka_installer_location: !Ref ApacheKafkaInstallerLocation serverless_land_github_location: !Ref ServerlessLandGithubLocation aws_region: !Ref 'AWS::Region' + java_version: !Ref JavaVersion KafkaClientEC2InstanceServerless: Condition: CreateServerlessCluster @@ -436,17 +457,27 @@ Resources: - | #!/bin/bash yum update -y - yum install java-openjdk11-devel -y + + # install Java + JAVA_VERSION=${java_version} + echo "JAVA_VERSION=$JAVA_VERSION" >> /home/ec2-user/.bash_profile + if [ 
"$JAVA_VERSION" == "java11" ]; then + sudo yum install java-11-amazon-corretto-devel -y + elif [ "$JAVA_VERSION" == "java17" ]; then + sudo yum install java-17-amazon-corretto-devel -y + elif [ "$JAVA_VERSION" == "java21" ]; then + sudo yum install java-21-amazon-corretto-devel -y + else + sudo yum install java-21-amazon-corretto-devel -y + fi + yum install nmap-ncat -y yum install git -y yum erase awscli -y yum install jq -y - amazon-linux-extras install docker -y - amazon-linux-extras install python3.8 -y + sudo yum install -y docker service docker start usermod -a -G docker ec2-user - unlink /usr/bin/python3 - ln -s /usr/bin/python3.8 /usr/bin/python3 sudo wget https://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo sudo yum install -y apache-maven @@ -531,6 +562,7 @@ Resources: sed -i "s/CLUSTER_NAME/$CLUSTER_NAME/g" template.yaml sed -i "s/CLUSTER_ID/$CLUSTER_ID/g" template.yaml sed -i "s/KAFKA_TOPIC/$KAFKA_TOPIC/g" template.yaml + sed -i "s/JAVA_VERSION/$JAVA_VERSION/g" template.yaml # Get IP CIDR range for EC2 Instance Connect cd /home/ec2-user @@ -552,6 +584,7 @@ Resources: apache_kafka_installer_location: !Ref ApacheKafkaInstallerLocation serverless_land_github_location: !Ref ServerlessLandGithubLocation aws_region: !Ref 'AWS::Region' + java_version: !Ref JavaVersion EC2InstanceEndpoint: Type: AWS::EC2::InstanceConnectEndpoint diff --git a/msk-lambda-iam-java-sam/template_original.yaml b/msk-lambda-iam-java-sam/template_original.yaml index ded6e8618..5b8140b2a 100644 --- a/msk-lambda-iam-java-sam/template_original.yaml +++ b/msk-lambda-iam-java-sam/template_original.yaml @@ -16,7 +16,7 @@ Resources: Properties: CodeUri: kafka_event_consumer_function Handler: com.amazonaws.services.lambda.samples.events.msk.HandlerMSK::handleRequest - Runtime: java11 + Runtime: JAVA_VERSION Architectures: - x86_64 MemorySize: 512 From 88f9343e90734ce9c9e92cea28c805092042999b Mon Sep 17 00:00:00 2001 From: indranil-banerjee-aws Date: Fri, 13 Dec 2024 11:06:50 -0800 Subject: [PATCH 10/12] Updated the Readme.md file to mention that the Java version is now selectable --- msk-lambda-iam-java-sam/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/msk-lambda-iam-java-sam/README.md b/msk-lambda-iam-java-sam/README.md index c448e891d..c7a93085d 100644 --- a/msk-lambda-iam-java-sam/README.md +++ b/msk-lambda-iam-java-sam/README.md @@ -34,7 +34,7 @@ The EC2 machine that was created by running the Cloudformation template has all The AWS SAM CLI is a serverless tool for building and testing Lambda applications. It uses Docker to locally test your functions in an Amazon Linux environment that resembles the Lambda execution environment. It can also emulate your application's build environment and API. -* Java - On the EC2 machine, we have installed java-openjdk11-devel but you can also install Amazon Corretto by modifying the Cloudformation UserData script if you wish to (https://docs.aws.amazon.com/linux/al2/ug/java.html) +* Java - On the EC2 machine, we have installed the version of Java that you selected. We have installed Amazon Corrretto JDK of the version that you had selected at the time of specifying the input parameters in the Cloudformation template. 
At the time of publishing this pattern, only Java versions 11, 17 and 21 are supported by AWS SAM * Maven - On the EC2 machine, we have installed Maven (https://maven.apache.org/install.html) * AWS SAM CLI - We have installed the AWS SAM CLI (https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) * Docker - We have installed the Docker Community Edition on the EC2 machine (https://hub.docker.com/search/?type=edition&offering=community) @@ -208,4 +208,4 @@ sam delete confirm by pressing y for both the questions You should see the lambda function getting deleted and a final confirmation "Deleted successfully" on the command-line -Next you need to delete the Cloudformation template that created the MSK Server and the EC2 machine by going to the Cloudformation console and selecting the stack and then hitting the "Delete" button. It will run for sometime but eventually you should see the stack getting cleaned up. \ No newline at end of file +Next you need to delete the Cloudformation template that created the MSK Server and the EC2 machine by going to the Cloudformation console and selecting the stack and then hitting the "Delete" button. It will run for sometime but eventually you should see the stack getting cleaned up. If you get an error message that says the stack could not be deleted, please retry again and do a Force Delete. The reason this may happen is because ENIs created by the deplayed Lambda function in your VPC may prevent the VPC from being deleted even after deleting the lambda function. \ No newline at end of file From e7ea98611b695a7a2d76be9546684bb46ee1b9bb Mon Sep 17 00:00:00 2001 From: indranil-banerjee-aws Date: Thu, 19 Dec 2024 23:21:34 -0800 Subject: [PATCH 11/12] Updated CFT to put yum install commands inside retry blocks to cater to corner case of rpm lock file preventing yum installs on Amazon Linux 2023 --- .../MSKAndKafkaClientEC2.yaml | 307 +++++++++++++++--- 1 file changed, 268 insertions(+), 39 deletions(-) diff --git a/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml b/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml index 24919067e..c41df9f28 100644 --- a/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml +++ b/msk-lambda-iam-java-sam/MSKAndKafkaClientEC2.yaml @@ -297,32 +297,146 @@ Resources: !Sub - | #!/bin/bash - yum update -y - # yum install java-openjdk11-devel -y + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying yum install" + yum update -y + # yum install java-openjdk11-devel -y + + # install Java + JAVA_VERSION=${java_version} + echo "JAVA_VERSION=$JAVA_VERSION" >> /home/ec2-user/.bash_profile + if [ "$JAVA_VERSION" == "java11" ]; then + sudo yum install java-11-amazon-corretto-devel -y + elif [ "$JAVA_VERSION" == "java17" ]; then + sudo yum install java-17-amazon-corretto-devel -y + elif [ "$JAVA_VERSION" == "java21" ]; then + sudo yum install java-21-amazon-corretto-devel -y + else + sudo yum install java-21-amazon-corretto-devel -y + fi + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of Java succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying yum install" + yum install nmap-ncat -y + # Check the exit code of the command + if [ $? 
-eq 0 ]; then + echo "Yum install of nmap succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying yum install" + yum install git -y + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of git succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done - # install Java - JAVA_VERSION=${java_version} - echo "JAVA_VERSION=$JAVA_VERSION" >> /home/ec2-user/.bash_profile - if [ "$JAVA_VERSION" == "java11" ]; then - sudo yum install java-11-amazon-corretto-devel -y - elif [ "$JAVA_VERSION" == "java17" ]; then - sudo yum install java-17-amazon-corretto-devel -y - elif [ "$JAVA_VERSION" == "java21" ]; then - sudo yum install java-21-amazon-corretto-devel -y - else - sudo yum install java-21-amazon-corretto-devel -y - fi - yum install nmap-ncat -y - yum install git -y - yum erase awscli -y - yum install jq -y - sudo yum install -y docker + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying yum install" + yum erase awscli -y + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum erase of awscli succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying yum install" + yum install jq -y + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of jq succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying yum install" + sudo yum install -y docker + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of docker succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + service docker start usermod -a -G docker ec2-user sudo wget https://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo - sudo yum install -y apache-maven + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying yum install" + sudo yum install -y apache-maven + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of maven succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." 
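+                  # Back off briefly before retrying: the pause gives a transient rpm/yum lock
+                  # (the Amazon Linux 2023 corner case this retry loop guards against) time to clear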
+ sleep 3 + ((attempt_num++)) + fi + done cd /home/ec2-user su -c "ln -s /usr/bin/python3.8 /usr/bin/python3" -s /bin/sh ec2-user @@ -456,31 +570,146 @@ Resources: !Sub - | #!/bin/bash - yum update -y + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying yum install" + yum update -y + # yum install java-openjdk11-devel -y + + # install Java + JAVA_VERSION=${java_version} + echo "JAVA_VERSION=$JAVA_VERSION" >> /home/ec2-user/.bash_profile + if [ "$JAVA_VERSION" == "java11" ]; then + sudo yum install java-11-amazon-corretto-devel -y + elif [ "$JAVA_VERSION" == "java17" ]; then + sudo yum install java-17-amazon-corretto-devel -y + elif [ "$JAVA_VERSION" == "java21" ]; then + sudo yum install java-21-amazon-corretto-devel -y + else + sudo yum install java-21-amazon-corretto-devel -y + fi + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of Java succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying yum install" + yum install nmap-ncat -y + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of nmap succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying yum install" + yum install git -y + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of git succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done - # install Java - JAVA_VERSION=${java_version} - echo "JAVA_VERSION=$JAVA_VERSION" >> /home/ec2-user/.bash_profile - if [ "$JAVA_VERSION" == "java11" ]; then - sudo yum install java-11-amazon-corretto-devel -y - elif [ "$JAVA_VERSION" == "java17" ]; then - sudo yum install java-17-amazon-corretto-devel -y - elif [ "$JAVA_VERSION" == "java21" ]; then - sudo yum install java-21-amazon-corretto-devel -y - else - sudo yum install java-21-amazon-corretto-devel -y - fi - yum install nmap-ncat -y - yum install git -y - yum erase awscli -y - yum install jq -y - sudo yum install -y docker + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying yum install" + yum erase awscli -y + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum erase of awscli succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying yum install" + yum install jq -y + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of jq succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." 
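+              # Wait a few seconds for the package manager lock to be released before the next attempt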
+ sleep 3 + ((attempt_num++)) + fi + done + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying yum install" + sudo yum install -y docker + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of docker succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + service docker start usermod -a -G docker ec2-user sudo wget https://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo - sudo yum install -y apache-maven + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying yum install" + sudo yum install -y apache-maven + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of maven succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done cd /home/ec2-user su -c "ln -s /usr/bin/python3.8 /usr/bin/python3" -s /bin/sh ec2-user From 019396f4233f2b46f45fb8aa0c377c16a9b30672 Mon Sep 17 00:00:00 2001 From: Manju Arakere <23362982+marakere@users.noreply.github.com> Date: Fri, 10 Jan 2025 09:55:11 -0500 Subject: [PATCH 12/12] Create msk-lambda-iam-java-sam.json --- .../msk-lambda-iam-java-sam.json | 111 ++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 msk-lambda-iam-java-sam/msk-lambda-iam-java-sam.json diff --git a/msk-lambda-iam-java-sam/msk-lambda-iam-java-sam.json b/msk-lambda-iam-java-sam/msk-lambda-iam-java-sam.json new file mode 100644 index 000000000..b9394bbb1 --- /dev/null +++ b/msk-lambda-iam-java-sam/msk-lambda-iam-java-sam.json @@ -0,0 +1,111 @@ +{ + "title": "AWS Lambda function subscribed to an Amazon MSK topic using IAM auth", + "description": "Creates a Lambda function that uses an Amazon MSK topic as an event source with IAM authentication.", + "language": "Java", + "level": "200", + "framework": "SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern provides a Lambda function along with an Event Source Mapping to a Kafka topic.", + "It requires that you already have an Amazon Managed Streaming for Kafka (Amazon MSK) cluster setup with a topic created. ", + "If you don't already have an MSK cluster, you can use the example in this pattern https://serverlessland.com/patterns/msk-cfn-sasl-lambda (linked in the resources) to deploy a cluster.", + "This pattern works with either a Provisioned or Serverless MSK cluster as long as the cluster is configured to use IAM authentication. ", + "For detailed deployment instructions instructions see the README." 
+ ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/msk-lambda-iam-java-sam", + "templateURL": "serverless-patterns/msk-lambda-iam-java-sam", + "projectFolder": "msk-lambda-iam-java-sam", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Amazon MSK cluster pattern", + "link": "https://serverlessland.com/patterns/msk-cfn-sasl-lambda" + }, + { + "text": "Using AWS Lambda with Amazon MSK", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html" + }, + { + "text": "AWS CloudFormation Provisioned MSK cluster reference", + "link": "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-msk-cluster.html" + }, + { + "text": "AWS CloudFormation Serverless MSK cluster reference", + "link": "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-msk-serverlesscluster.html" + } + ] + }, + "deploy": { + "text": [ + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the template: sam delete." + ] + }, + "authors": [ + { + "name": "Indranil Banerjee", + "bio": "AWS - Senior Solutions Architect", + "image": "https://beta.serverlessland.com/assets/images/resources/contributors/indranil-banerjee.jpg", + "linkedin": "indranil-banerjee-b00a261/" + }, + { + "name": "Vaibhav Jain", + "bio": "AWS - Sr. Application Architect", + "image": "https://beta.serverlessland.com/assets/images/resources/contributors/vaibhav-jain.jpg", + "linkedin": "vaibhavjainv/" + }, + { + "name": "Paveen Allam", + "bio": "Senior Solutions Architect", + "image": "https://www.fintail.me/images/pa.jpg", + "linkedin": "pallam/" + }, + { + "name": "Suraj Tripathi", + "bio": "AWS - AppDev Cloud Consultant", + "image": "https://beta.serverlessland.com/assets/images/resources/contributors/suraj-tripathi.jpg", + "linkedin": "suraj-tripathi-01b49a140/" + }, + { + "name": "Adam Wagner", + "bio": "AWS - Principal Serverless Solutions Architect", + "image": "https://beta.serverlessland.com/assets/images/resources/contributors/adam-wagner.jpg", + "linkedin": "adam-wagner-4bb412/" + } + ], + "patternArch": { + "icon1": { + "x": 20, + "y": 50, + "service": "msk", + "label": "Amazon MSK" + }, + "icon2": { + "x": 80, + "y": 50, + "service": "lambda", + "label": "AWS Lambda" + }, + "line1": { + "from": "icon1", + "to": "icon2", + "label": "IAM Authentication" + } + } +}