diff --git a/README.md b/README.md
index d402480..b8e9425 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,6 @@
-# Run a data processing job on Amazon EMR Serverless with AWS Step Functions
+# Running a Data Processing Job on EMR Serverless with AWS Step Functions and AWS Lambda using Terraform (By HashiCorp)
+*Update Feb 2023* – AWS Step Functions adds direct integration for 35 services, including Amazon EMR Serverless. In the current version of this blog, we submit the EMR Serverless job by invoking the API directly from the Step Functions workflow, and we use AWS Lambda only to poll the status of the job in EMR Serverless. Read more about this feature enhancement [here](https://aws.amazon.com/about-aws/whats-new/2023/02/aws-step-functions-integration-35-services-emr-serverless/).
 
 In this blog we showcase how to build and orchestrate a [Scala](https://www.scala-lang.org/) Spark Application using [Amazon EMR Serverless](https://aws.amazon.com/emr/serverless/) , AWS Step Functions and [Terraform By HashiCorp](https://www.terraform.io/). In this end to end solution we execute a Spark job on EMR Serverless which processes sample click-stream data in Amazon S3 bucket and stores the aggregation results in Amazon S3.
@@ -18,8 +19,8 @@ Overview of the steps and the AWS Services used in this solution:
 * [Amazon EMR Serverless](https://aws.amazon.com/emr/serverless/) Application - provides the option to submit a Spark job.
 * [AWS Lambda](https://aws.amazon.com/lambda/):
   * Ingestion Lambda – This lambda processes the incoming request and pushes the data into Firehose stream.
-  * EMR Start Job Lambda - This lambda starts the EMR Serverless application, the EMR job process converts the ingested user click logs into output in another S3 bucket.
-* [AWS Step Functions](https://aws.amazon.com/step-functions) triggers the EMR Start Job Lambda which submits the application to EMR Serverless for processing of the ingested log files.
+  * EMR Job Status Check Lambda - This lambda polls EMR Serverless to check the status of the job that was submitted.
+* [AWS Step Functions](https://aws.amazon.com/step-functions) submits the data processing job to an EMR Serverless application and triggers a Lambda that polls for the status of the submitted job.
 * [Amazon Simple Storage Service](https://aws.amazon.com/s3/) (Amazon S3)
   * Firehose Delivery Bucket - Stores the ingested application logs in parquet file format
   * Loggregator Source Bucket - Stores the scala code/jar for EMR job execution
@@ -40,8 +41,8 @@ Overview of the steps and the AWS Services used in this solution:
 
 ### Design Decisions
 
-* We use AWS Step Functions and AWS Lambda in this use case to trigger the EMR Serverless Application. In real world, the data processing application could be long running and may exceed AWS Lambda’s execution timeout. Tools like [Amazon Managed Workflows for Apache Airflow (MWAA)](https://aws.amazon.com/managed-workflows-for-apache-airflow/) can be used. Amazon Managed Apache airflow is a managed orchestration service makes it easier to set up and operate end-to-end data pipelines in the cloud at scale
-* AWS Lambda Code & EMR Serverless Log Aggregation code are developed using Java & Scala respectively. These can any done using any supported languages in these use cases.
+* We use AWS Step Functions and its support for SDK integrations with EMR Serverless to submit the data processing job to the EMR Serverless application.
+* The AWS Lambda code and the EMR Serverless log aggregation code are developed in Java and Scala respectively.
 * AWS CLI V2 is required for querying Amazon EMR Serverless applications from command line. These can be viewed from AWS Console also. A sample CLI command provided in the “Testing” section below.
 
 ### Steps
@@ -64,7 +65,7 @@ To run the commands individually
 
 Set the application deployment region and account number. An example below. Modify as needed.
 ```
-$ APP_DIR=$PWD
+ $ APP_DIR=$PWD
 $ APP_PREFIX=clicklogger
 $ STAGE_NAME=dev
 $ REGION=us-east-1
@@ -74,9 +75,10 @@ $ APP_DIR=$PWD
 
 Maven build AWS Lambda Application Jar & Scala Application package
 
 ```
-$ cd $APP_DIR/source/clicklogger
+ $ cd $APP_DIR/source/clicklogger
 $ mvn clean package
-$ sbt reload
+ $ cd $APP_DIR/source/loggregator
+ $ sbt reload
 $ sbt compile
 $ sbt package
 ```
@@ -85,14 +87,13 @@ $ sbt reload
 
 Deploy the AWS Infrastructure using Terraform
 
 ```
-$ terraform init
+ $ terraform init
 $ terraform plan
 $ terraform apply --auto-approve
 ```
 
 ### Testing
-
 Once the application is built and deployed, you can also insert sample data for the EMR processing. An example as below. Note exec.sh has multiple sample insertions for AWS Lambda. The ingested logs will be used by the EMR Serverless Application job
@@ -114,11 +115,16 @@ Validate the Deployments
 
 ![Alt text](assets/s3_source_parquet_files.png?raw=true "Title")
 
 * Run AWS Step Function to validate the Serverless application
-  * Open AWS Console > AWS Step Function > Open "clicklogger-dev-state-machine".
-  * The step function will show the steps that ran to trigger the AWS Lambda and EMR Serverless Application
-  * Start a new execution to trigger the AWS Lambda and EMR Serverless Application/Job
-  * Once the AWS Step Function is successful, navigate to Amazon S3 > clicklogger-dev-outputs-bucket- to see the output files.
-  * These will be partitioned by year/month/date/response.md. A sample is shown below
+  * Open AWS Console > AWS Step Function > Open "clicklogger-dev-state-machine".
+  * The step function will show the steps that ran to submit the job to the EMR Serverless Application and trigger the AWS Lambda
+  * Start a new Step Functions execution to trigger the workflow with the sample input below. Set the date value to the date on which the sample data was ingested to S3 with the ingestion lambda.
+  ```
+  {
+    "InputDate": "2023-02-08"
+  }
+  ```
+  * Once the AWS Step Function is successful, navigate to Amazon S3 > -clicklogger-dev-loggregator-output- to see the output files.
+  * These will be partitioned by year/month/date/response.md. A sample is shown below
 
 ![Alt text](assets/s3_output_response_file.png?raw=true "Title")
@@ -129,7 +135,6 @@ AWS CLI can be used to check the deployed AWS Serverless Application
 $ aws emr-serverless list-applications \
 | jq -r '.applications[] | select(.name=="clicklogger-dev-loggregrator-emr-").id'
-
 ```
 
 ![Alt text](assets/step_function_success.png?raw=true "Title")
@@ -138,7 +143,6 @@ EMR Studio
 
 * Open AWS Console, Navigate to “EMR” > “Serverless” tab on the left pane.
 * Select “clicklogger-dev-studio” and click “Manage Applications”
-* The Application created by the stack will be as shown below
 
 clicklogger-dev-loggregator-emr-
@@ -184,11 +188,19 @@ S3 and created services can be deleted using CLI also.
 
 Execute the below command
 
 # CLI Commands to delete the S3
-aws s3 rb s3://clicklogger-dev-emr-serverless-logs-bucket- --force
-aws s3 rb s3://clicklogger-dev-firehose-delivery-bucket- --force
-aws s3 rb s3://clicklogger-dev-loggregator-output-bucket- --force
-aws s3 rb s3://clicklogger-dev-loggregator-source-bucket- --force
-aws s3 rb s3://clicklogger-dev-loggregator-source-bucket- --force
+APP_DIR=$PWD
+APP_PREFIX=clicklogger
+STAGE_NAME=dev
+REGION=us-east-1
+
+ACCOUNT_ID=$(aws sts get-caller-identity | jq -r '.Account')
+echo $ACCOUNT_ID
+
+aws s3 rb s3://$REGION-$APP_PREFIX-$STAGE_NAME-emr-logs-$ACCOUNT_ID --force
+aws s3 rb s3://$REGION-$APP_PREFIX-$STAGE_NAME-firehose-delivery-$ACCOUNT_ID --force
+aws s3 rb s3://$REGION-$APP_PREFIX-$STAGE_NAME-loggregator-output-$ACCOUNT_ID --force
+aws s3 rb s3://$REGION-$APP_PREFIX-$STAGE_NAME-loggregator-source-$ACCOUNT_ID --force
+aws s3 rb s3://$REGION-$APP_PREFIX-$STAGE_NAME-emr-studio-$ACCOUNT_ID --force
 
 # Destroy the AWS Infrastructure
 terraform destroy --auto-approve
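Editor's note: the updated README above relies on the Step Functions AWS SDK service integration to call EMR Serverless directly and on a Lambda function only to poll the job status. A minimal sketch of what such a state-machine definition could look like is shown below, assuming an SDK-integration task followed by a Lambda-invoke task; the application ID, role ARN, bucket names, function name and result field names are illustrative placeholders, not the exact definition shipped in this repository's Terraform templates.

```
{
  "Comment": "Illustrative sketch only: submit the Spark job via the EMR Serverless SDK integration, then invoke the status-check Lambda. Angle-bracket values are placeholders.",
  "StartAt": "SubmitEMRServerlessJob",
  "States": {
    "SubmitEMRServerlessJob": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:emrserverless:startJobRun",
      "Parameters": {
        "ApplicationId": "<emr-serverless-application-id>",
        "ExecutionRoleArn": "<emr-serverless-execution-role-arn>",
        "JobDriver": {
          "SparkSubmit": {
            "EntryPoint": "s3://<loggregator-source-bucket>/<loggregator-jar>",
            "EntryPointArguments.$": "States.Array($.InputDate, '<source-bucket>', '<output-bucket>')",
            "SparkSubmitParameters": "--class com.examples.clicklogger.Loggregator"
          }
        }
      },
      "ResultPath": "$.jobRun",
      "Next": "CheckEMRJobStatus"
    },
    "CheckEMRJobStatus": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "<clicklogger-dev-emr-job-status-lambda-arn>",
        "Payload": {
          "jobRunId.$": "$.jobRun.JobRunId"
        }
      },
      "End": true
    }
  }
}
```

The `$.InputDate` value corresponds to the sample execution input shown in the README (for example `{"InputDate": "2023-02-08"}`).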
diff --git a/assets/AWSStepFunction.png b/assets/AWSStepFunction.png
deleted file mode 100644
index f4feea4..0000000
Binary files a/assets/AWSStepFunction.png and /dev/null differ
diff --git a/assets/AWSStepFunction.png.license b/assets/AWSStepFunction.png.license
deleted file mode 100644
index 182df80..0000000
--- a/assets/AWSStepFunction.png.license
+++ /dev/null
@@ -1,3 +0,0 @@
-SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved
-
-SPDX-License-Identifier: MIT-0
\ No newline at end of file
diff --git a/assets/emr-serverless-click-logs-from-web-application.drawio.png b/assets/emr-serverless-click-logs-from-web-application.drawio.png
index 14500f3..82b3933 100644
Binary files a/assets/emr-serverless-click-logs-from-web-application.drawio.png and b/assets/emr-serverless-click-logs-from-web-application.drawio.png differ
diff --git a/assets/step_function_caught.png b/assets/step_function_caught.png
deleted file mode 100644
index 5030a92..0000000
Binary files a/assets/step_function_caught.png and /dev/null differ
diff --git a/assets/step_function_caught.png.license b/assets/step_function_caught.png.license
deleted file mode 100644
index 182df80..0000000
--- a/assets/step_function_caught.png.license
+++ /dev/null
@@ -1,3 +0,0 @@
-SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved
-
-SPDX-License-Identifier: MIT-0
\ No newline at end of file
diff --git a/assets/step_function_success.png b/assets/step_function_success.png
index 9175129..550e08f 100644
Binary files a/assets/step_function_success.png and b/assets/step_function_success.png differ
diff --git a/assets/step_function_uncaught.png b/assets/step_function_uncaught.png
deleted file mode 100644
index 3ea4e4e..0000000
Binary files a/assets/step_function_uncaught.png and /dev/null differ
diff --git a/assets/step_function_uncaught.png.license b/assets/step_function_uncaught.png.license
deleted file mode 100644
index 182df80..0000000
--- a/assets/step_function_uncaught.png.license
+++ /dev/null
@@ -1,3 +0,0 @@
-SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved
-
-SPDX-License-Identifier: MIT-0
\ No newline at end of file
diff --git a/source/clicklogger/src/main/java/com/clicklogs/Handlers/ClickLoggerEMRJobHandler.java b/source/clicklogger/src/main/java/com/clicklogs/Handlers/ClickLoggerEMRJobHandler.java
index 63639dc..554f731 100644
--- a/source/clicklogger/src/main/java/com/clicklogs/Handlers/ClickLoggerEMRJobHandler.java
+++ b/source/clicklogger/src/main/java/com/clicklogs/Handlers/ClickLoggerEMRJobHandler.java
@@ -3,8 +3,7 @@
 import com.amazonaws.services.lambda.runtime.Context;
 import com.amazonaws.services.lambda.runtime.LambdaLogger;
 import com.amazonaws.services.lambda.runtime.RequestHandler;
-import com.clicklogs.model.StartJobRequest;
-import software.amazon.awssdk.regions.Region;
+import com.clicklogs.model.JobStatusRequest;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
 import software.amazon.awssdk.services.emrserverless.EmrServerlessClient;
@@ -17,184 +16,100 @@ import org.apache.commons.lang3.StringUtils;
 import java.lang.Thread;
-import java.text.Format;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Date;
 
 /*
  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  *
  * SPDX-License-Identifier: MIT-0
  */
-
-public class ClickLoggerEMRJobHandler implements RequestHandler{
-
-    private String applicationName = "clicklogger-dev-emr-serverless-application";
-    private String applicationId = "";
-    private String executionRoleArn = "";
-    private String entrypoint = "";
-    private String mainClass = "";
-    private String outputBucket = "";
-    private String sourceBucket = "";
-    private String logsOutputPath = "";
-    private Integer emrJobTimout = 5000;
-
-    Gson gson = new GsonBuilder().setPrettyPrinting().create();
-
-    LambdaLogger logger = null;
-    @Override
-    public ClickLogResponse handleRequest(final StartJobRequest startJobRequest, final Context context) {
-        logger = context.getLogger();
-        final String success_response = new String("200 OK");
-        final String fail_response = new String("400 ERROR");
-
-        ResponseBuilder responseBuilder = new ResponseBuilder();
-        ClickLogResponse response = responseBuilder.badRequest(fail_response).build();
-
-
-        logger.log("Incoming request - " + gson.toJson(startJobRequest));
-
-        String envAppName = System.getenv("APPLICATION_NAME");
-        if(!StringUtils.isBlank(envAppName))
-        {
-            applicationName = envAppName;
-        }
-        String envAppId = System.getenv("APPLICATION_ID");
-        if(!StringUtils.isBlank(envAppId))
-        {
-            applicationId = envAppId;
-        }
+public class ClickLoggerEMRJobHandler implements RequestHandler {
 
-        String envExecRole = System.getenv("EXECUTION_ROLE_ARN");
-        if(!StringUtils.isBlank(envExecRole))
-        {
-            executionRoleArn = envExecRole;
-        }
+    Gson gson = new GsonBuilder().setPrettyPrinting().create();
+    LambdaLogger logger = null;
+    private String applicationId = "";
+    private String logsOutputPath = "";
+    private Integer emrJobTimout = 5000;
 
-        String envEntryPoint = System.getenv("ENTRY_POINT");
-        if(!StringUtils.isBlank(envEntryPoint))
-        {
-            entrypoint = envEntryPoint;
-        }
+    @Override
+    public ClickLogResponse handleRequest(final JobStatusRequest jobStatusRequest, final Context context) {
+        logger = context.getLogger();
+        final String success_response = new String("200 OK");
+        final String fail_response = new String("400 ERROR");
 
-        String envMainClass = System.getenv("MAIN_CLASS");
-        if(!StringUtils.isBlank(envMainClass))
-        {
-            mainClass = envMainClass;
-        }
+        ResponseBuilder responseBuilder = new ResponseBuilder();
+        ClickLogResponse response = responseBuilder.badRequest(fail_response).build();
 
-        String envOutputBucket = System.getenv("OUTPUT_BUCKET");
-        if(!StringUtils.isBlank(envOutputBucket))
-        {
-            outputBucket = envOutputBucket;
-        }
+        logger.log("Incoming request - " + gson.toJson(jobStatusRequest));
 
-        String envSourceBucket = System.getenv("SOURCE_BUCKET");
-        if(!StringUtils.isBlank(envSourceBucket))
-        {
-            sourceBucket = envSourceBucket;
-        }
+        String envAppId = System.getenv("APPLICATION_ID");
+        if (!StringUtils.isBlank(envAppId)) {
+            applicationId = envAppId;
+        }
 
-        String envLogsOutputBucket = System.getenv("LOGS_OUTPUT_PATH");
-        if(!StringUtils.isBlank(envLogsOutputBucket))
-        {
-            logsOutputPath = envLogsOutputBucket;
-        }
+        String envLogsOutputBucket = System.getenv("LOGS_OUTPUT_PATH");
+        if (!StringUtils.isBlank(envLogsOutputBucket)) {
+            logsOutputPath = envLogsOutputBucket;
+        }
 
-        emrJobTimout = Integer.parseInt(System.getenv("EMR_GET_SLEEP_TIME"));
-
-        String datetime = "";
-        if (startJobRequest == null ||
-                (startJobRequest != null && StringUtils.isBlank(startJobRequest.getDate()))) {
-            logger.log("setting default createdtime");
-            Format f = new SimpleDateFormat("yyyy-MM-dd");
-            datetime = f.format(new Date());
-        }
-        else{
-            datetime = startJobRequest.getDate();
-        }
+        emrJobTimout = Integer.parseInt(System.getenv("EMR_GET_SLEEP_TIME"));
 
-        logger.log("Starting EMR Serverless job for the date - "+ datetime);
+        String jobRunId = "";
+        if (!StringUtils.isBlank(jobStatusRequest.getJobRunId())) {
+            jobRunId = jobStatusRequest.getJobRunId();
+        }
 
-        try {
-            startEMRJobRun(datetime, applicationId, applicationName,
-                    executionRoleArn, mainClass,
-                    entrypoint, sourceBucket, outputBucket, emrJobTimout);
-        } catch (InterruptedException e) {
-            logger.log("Error occurred starting EMR Job");
-            throw new ClickLoggerException("Error occurred starting EMR Job");
+        logger.log("Checking EMR Serverless job status for Job Run ID: " + jobRunId);
+
+        try {
+            CheckEMRJobStatus(applicationId, emrJobTimout, jobRunId);
+        } catch (InterruptedException e) {
+            logger.log("Error occurred checking EMR Job status.");
+            throw new ClickLoggerException("Error occurred checking EMR Job status.");
+        }
+        logger.log("Stopping application");
+        stopApplication(applicationId);
+        logger.log(success_response);
+        responseBuilder = new ResponseBuilder();
+        responseBuilder.ok();
+        response = responseBuilder.originHeader("*").build();
+        logger.log("Returning response " + gson.toJson(response));
+        return response;
     }
-        logger.log("Stopping application");
-        stopApplication(applicationId);
-        logger.log(success_response);
-        responseBuilder = new ResponseBuilder();
-        responseBuilder.ok();
-        response = responseBuilder.originHeader("*").build();
-        logger.log("Returning response " + gson.toJson(response));
-        return response;
-    }
-
-    private void startEMRJobRun(String inputDate,
-            String applicationId, String applicationName, String executionRoleArn,
-            String mainClass, String entrypoint, String sourceBucket,
-            String outputBucket, Integer emrJobTimout) throws InterruptedException {
-        EmrServerlessClient client = getClient();
-
-        S3MonitoringConfiguration s3MonitoringConfiguration = S3MonitoringConfiguration.builder().logUri(logsOutputPath).build();
-        StartJobRunRequest jobRunRequest = StartJobRunRequest.builder()
-                .name(applicationName)
-                .applicationId(applicationId)
-                .executionRoleArn(executionRoleArn)
-                .jobDriver(
-                        JobDriver.fromSparkSubmit(SparkSubmit.builder()
-                                .entryPoint(entrypoint)
-                                .entryPointArguments(Arrays.asList(inputDate, sourceBucket, outputBucket))
-                                .sparkSubmitParameters(mainClass)
-                                .build())
-                )
-                .configurationOverrides(ConfigurationOverrides.builder()
-                        .monitoringConfiguration(MonitoringConfiguration.builder()
-                                .s3MonitoringConfiguration(s3MonitoringConfiguration)
-                                .build()).build()
-                        .toBuilder().build()).build();
-
-        logger.log("Starting job run");
-        StartJobRunResponse response = client.startJobRun(jobRunRequest);
-
-        String jobRunId = response.jobRunId();
-        GetJobRunRequest getJobRunRequest = GetJobRunRequest.builder()
-                .applicationId(applicationId)
-                .jobRunId(jobRunId)
-                .build();
-        GetJobRunResponse jobRunResponse = client.getJobRun(getJobRunRequest);
-
-        while(true){
-            Thread.sleep(emrJobTimout);
-            jobRunResponse = client.getJobRun(getJobRunRequest);
-
-            if(jobRunResponse != null){
-                JobRunState jobState = jobRunResponse.jobRun().state();
-                if(jobState.name().equals("SUCCESS") || jobState.name().equals("FAILED") ||
-                        jobState.name().equals("CANCELLING") || jobState.name().equals("CANCELLED")){
-                    logger.log("Job Completed Successfully!");
-                    break;
+
+    private void CheckEMRJobStatus(String applicationId, Integer emrJobTimout, String jobRunId) throws InterruptedException {
+        EmrServerlessClient client = getClient();
+        GetJobRunRequest getJobRunRequest = GetJobRunRequest.builder()
+                .applicationId(applicationId)
+                .jobRunId(jobRunId)
+                .build();
+        GetJobRunResponse jobRunResponse = client.getJobRun(getJobRunRequest);
+
+        while (true) {
+            Thread.sleep(emrJobTimout);
+            jobRunResponse = client.getJobRun(getJobRunRequest);
+
+            if (jobRunResponse != null) {
+                JobRunState jobState = jobRunResponse.jobRun().state();
+                if (jobState.name().equals("SUCCESS") || jobState.name().equals("FAILED") ||
+                        jobState.name().equals("CANCELLING") || jobState.name().equals("CANCELLED")) {
+                    logger.log("Job run reached a terminal state: " + jobState.name());
+                    break;
+                }
+            }
         }
     }
-            }
-        }
-    }
-
-    private StopApplicationResponse stopApplication(String applicationId) {
-        EmrServerlessClient client = getClient();
-        StopApplicationRequest stopApp = StopApplicationRequest.builder().applicationId(applicationId).build();
-        return client.stopApplication(stopApp);
-    }
-
-    private EmrServerlessClient getClient() {
-        return EmrServerlessClient.builder()
-                .credentialsProvider(DefaultCredentialsProvider.create())
-                .httpClient(UrlConnectionHttpClient.builder().build())
-                .build();
-    }
+
+    private StopApplicationResponse stopApplication(String applicationId) {
+        EmrServerlessClient client = getClient();
+        StopApplicationRequest stopApp = StopApplicationRequest.builder().applicationId(applicationId).build();
+        return client.stopApplication(stopApp);
+    }
+
+    private EmrServerlessClient getClient() {
+        return EmrServerlessClient.builder()
+                .credentialsProvider(DefaultCredentialsProvider.create())
+                .httpClient(UrlConnectionHttpClient.builder().build())
+                .build();
    }
 }
\ No newline at end of file
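Editor's note: with the change above, the handler accepts a `JobStatusRequest` instead of a `StartJobRequest`, so the Lambda now expects the EMR Serverless job run ID in its payload and reads the APPLICATION_ID, LOGS_OUTPUT_PATH and EMR_GET_SLEEP_TIME environment variables set in lambda.tf. A sample test event for the status-check Lambda could look like the following; the job run ID value is a placeholder.

```
{
  "jobRunId": "<emr-serverless-job-run-id>"
}
```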
diff --git a/source/clicklogger/src/main/java/com/clicklogs/model/JobStatusRequest.java b/source/clicklogger/src/main/java/com/clicklogs/model/JobStatusRequest.java
new file mode 100644
index 0000000..f9b6740
--- /dev/null
+++ b/source/clicklogger/src/main/java/com/clicklogs/model/JobStatusRequest.java
@@ -0,0 +1,20 @@
+package com.clicklogs.model;
+
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * SPDX-License-Identifier: MIT-0
+ */
+
+public class JobStatusRequest {
+
+    public String jobRunId;
+
+    public String getJobRunId() {
+        return jobRunId;
+    }
+
+    public void setJobRunId(String jobRunId) {
+        this.jobRunId = jobRunId;
+    }
+}
\ No newline at end of file
diff --git a/source/clicklogger/src/main/java/com/clicklogs/model/StartJobRequest.java b/source/clicklogger/src/main/java/com/clicklogs/model/StartJobRequest.java
deleted file mode 100644
index 6eec0d0..0000000
--- a/source/clicklogger/src/main/java/com/clicklogs/model/StartJobRequest.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.clicklogs.model;
-
-/*
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * SPDX-License-Identifier: MIT-0
- */
-
-public class StartJobRequest {
-
-    public String date;
-
-    public String getDate() {
-        return date;
-    }
-
-    public void setDate(String date) {
-        this.date = date;
-    }
-}
\ No newline at end of file
diff --git a/terraform/templates/firehose.tf b/terraform/templates/firehose.tf
index 083958b..a562feb 100644
--- a/terraform/templates/firehose.tf
+++ b/terraform/templates/firehose.tf
@@ -15,8 +15,8 @@ resource "aws_kinesis_firehose_delivery_stream" "click_logger_firehose_delivery_
     buffer_interval = 60
     cloudwatch_logging_options {
       enabled         = true
-      log_group_name  = "/aws/kinesis_firehose_delivery_stream/click_logger_firehose_delivery_stream"
-      log_stream_name = "${var.app_prefix}-${var.stage_name}_firehose_delivery_stream"
+      log_group_name  = aws_cloudwatch_log_group.click_logger_firehose_delivery_stream_log_group.name
+      log_stream_name = aws_cloudwatch_log_stream.click_logger_firehose_delivery_stream.name
     }
     compression_format = "UNCOMPRESSED"
     prefix             = "clicklog/data=!{timestamp:yyyy}-!{timestamp:MM}-!{timestamp:dd}/"
diff --git a/terraform/templates/lambda.tf b/terraform/templates/lambda.tf
index ece6830..fcc89cc 100644
--- a/terraform/templates/lambda.tf
+++ b/terraform/templates/lambda.tf
@@ -3,6 +3,7 @@
 ### SPDX-License-Identifier: MIT-0
 
 resource "aws_lambda_function" "lambda_clicklogger_ingest" {
+  description   = "Lambda to ingest data."
   filename      = var.lambda_source_zip_path
   function_name = "${var.app_prefix}-${var.stage_name}-ingestion-lambda"
   role          = aws_iam_role.click_logger_lambda_role.arn
@@ -29,10 +30,10 @@ resource "aws_lambda_function" "lambda_clicklogger_ingest" {
   }
 }
 
-resource "aws_lambda_function" "lambda_clicklogger_emr_start_job" {
-  description   = "Lambda to accept request to submit a job to an EMR Serverless cluster."
+resource "aws_lambda_function" "lambda_clicklogger_emr_job_status" {
+  description   = "Lambda to check status of job on EMR Serverless cluster."
   filename      = var.lambda_source_zip_path
-  function_name = "${var.app_prefix}-${var.stage_name}-emr-start-job-lambda"
+  function_name = "${var.app_prefix}-${var.stage_name}-emr-job-status-lambda"
   role          = aws_iam_role.click_logger_emr_lambda_role.arn
   handler       = "com.clicklogs.Handlers.ClickLoggerEMRJobHandler::handleRequest"
   runtime       = "java8"
@@ -44,13 +45,7 @@ resource "aws_lambda_function" "lambda_clicklogger_emr_start_job" {
 
   environment {
     variables = {
-      APPLICATION_NAME   = "${var.app_prefix}-${var.stage_name}-emr-serverless-application"
       APPLICATION_ID     = aws_emrserverless_application.click_log_loggregator_emr_serverless.id
-      EXECUTION_ROLE_ARN = aws_iam_role.click_logger_emr_serverless_role.arn
-      ENTRY_POINT        = "s3://${aws_s3_bucket.click_log_loggregator_source_s3_bucket.id}/${var.loggregator_jar}"
-      MAIN_CLASS         = "--class com.examples.clicklogger.Loggregator"
-      OUTPUT_BUCKET      = aws_s3_bucket.click_log_loggregator_output_s3_bucket.id
-      SOURCE_BUCKET      = aws_s3_bucket.click_logger_firehose_delivery_s3_bucket.id
       LOGS_OUTPUT_PATH   = "s3://${aws_s3_bucket.click_log_loggregator_emr_serverless_logs_s3_bucket.id}"
       REGION             = data.aws_region.current.name
       EMR_GET_SLEEP_TIME = 5000
@@ -68,5 +63,5 @@ output "lambda-clicklogger-ingest" {
 }
 
 output "lambda-clicklogger-emr-job" {
-  value = aws_lambda_function.lambda_clicklogger_emr_start_job
+  value = aws_lambda_function.lambda_clicklogger_emr_job_status
 }
\ No newline at end of file
diff --git a/terraform/templates/roles.tf b/terraform/templates/roles.tf
index 39e0a35..26d6721 100644
--- a/terraform/templates/roles.tf
+++ b/terraform/templates/roles.tf
@@ -4,8 +4,8 @@
 
 data "aws_iam_policy_document" "AWSLambdaTrustPolicy" {
   statement {
-    actions = ["sts:AssumeRole"]
-    effect = "Allow"
+    actions = ["sts:AssumeRole"]
+    effect  = "Allow"
     principals {
       type        = "Service"
       identifiers = ["lambda.amazonaws.com"]
@@ -14,7 +14,7 @@ data "aws_iam_policy_document" "AWSLambdaTrustPolicy" {
 }
 
 resource "aws_iam_role" "click_logger_emr_lambda_role" {
-  name = "${var.app_prefix}-${var.stage_name}-lambda-emr-role"
+  name               = "${var.app_prefix}-${var.stage_name}-lambda-emr-role"
   assume_role_policy = data.aws_iam_policy_document.AWSLambdaTrustPolicy.json
 }
@@ -65,15 +65,10 @@ EOF
 }
 
 resource "aws_iam_role" "click_logger_lambda_role" {
-  name = "${var.app_prefix}-${var.stage_name}-lambda-role"
+  name               = "${var.app_prefix}-${var.stage_name}-lambda-role"
   assume_role_policy = data.aws_iam_policy_document.AWSLambdaTrustPolicy.json
 }
 
-# resource "aws_iam_role_policy_attachment" "click_loggerlambda_policy" {
-#   role       = aws_iam_role.click_logger_lambda_role.name
-#   policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
-# }
-
 resource "aws_iam_role_policy_attachment" "click_logger_lambda_iam_role_policy_attachment_vpc_access_execution" {
   role       = aws_iam_role.click_logger_lambda_role.name
   policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
@@ -147,6 +142,7 @@ resource "aws_iam_role_policy" "click_logger_stream_consumer_inline_policy" {
         "glue:GetDataBases",
         "glue:CreateTable",
         "glue:GetTable",
+        "glue:GetTableVersions",
         "glue:UpdateTable",
         "glue:DeleteTable",
         "glue:GetTables",
@@ -165,8 +161,8 @@ EOF
 }
 
 data "aws_iam_policy_document" "click_logger_emr_s3_and_glue_inline_policy" {
   statement {
-    actions = ["sts:AssumeRole"]
-    effect = "Allow"
+    actions = ["sts:AssumeRole"]
+    effect  = "Allow"
     principals {
       type        = "Service"
       identifiers = ["emr-serverless.amazonaws.com"]
@@ -175,7 +171,7 @@ data "aws_iam_policy_document" "click_logger_emr_s3_and_glue_inline_policy" {
 }
 
 resource "aws_iam_role" "click_logger_emr_serverless_role" {
-  name = "${var.app_prefix}-${var.stage_name}-emr-serverless-role"
+  name               = "${var.app_prefix}-${var.stage_name}-emr-serverless-role"
   assume_role_policy = data.aws_iam_policy_document.click_logger_emr_s3_and_glue_inline_policy.json
 }
@@ -246,18 +242,18 @@ EOF
 
 data "aws_iam_policy_document" "lambda_clicklogger_emr_sfn_start_job_policy" {
   statement {
-    actions = ["sts:AssumeRole"]
-    effect = "Allow"
+    actions = ["sts:AssumeRole"]
+    effect  = "Allow"
     principals {
       type        = "Service"
-      identifiers = ["states.amazonaws.com"]
+      identifiers = ["states.amazonaws.com", "preprod.states.aws.internal"]
     }
   }
 }
 
 resource "aws_iam_role" "lambda_clicklogger_emr_sfn_start_job_role" {
-  name = "${var.app_prefix}-${var.stage_name}-sfn-lambda-role"
+  name               = "${var.app_prefix}-${var.stage_name}-sfn-lambda-role"
   assume_role_policy = data.aws_iam_policy_document.lambda_clicklogger_emr_sfn_start_job_policy.json
 }
@@ -287,7 +283,7 @@ resource "aws_iam_role_policy" "lambda_clicklogger_emr_sfn_start_job_inline_poli
         "lambda:InvokeFunction"
       ],
       "Resource": [
-        "${aws_lambda_function.lambda_clicklogger_emr_start_job.arn}"
+        "${aws_lambda_function.lambda_clicklogger_emr_job_status.arn}"
       ]
     }
   ]
@@ -295,11 +291,68 @@ EOF
 }
 
+resource "aws_iam_role_policy" "clicklogger_emr_sfn_start_job_inline_policy" {
+  name   = "${var.app_prefix}-${var.stage_name}-emr-job-inline_policy"
+  role   = aws_iam_role.lambda_clicklogger_emr_sfn_start_job_role.id
+  policy = <