diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..43d7415 --- /dev/null +++ b/.gitignore @@ -0,0 +1,21 @@ +.DS_Store +.idea +.mvn +.terraform +.vscode +terraform.tfstate.backup +.terraform.lock.hcl +target +out +source/clicklogger/src/main/.DS_Store +source/clicklogger/src/main/java/.DS_Store +source/clicklogger/src/main/java/com/.DS_Store +source/clicklogger/src/main/java/com/clicklogs/.DS_Store +terraform/workspaces/us-east-1/terraform.tfstate +terraform/workspaces/us-east-1/.terraform/* +terraform/workspaces/us-east-1/.terraform* +source/clicklogger/src/test/.DS_Store +source/clicklogger/src/test/java/.DS_Store +source/clicklogger/src/test/java/com/.DS_Store +assets/.$emr-serverless-click-logs-from-web-application.drawio.bkp +assets/.$emr-serverless-click-logs-from-web-application.drawio.dtmp diff --git a/HELP.md b/HELP.md new file mode 100644 index 0000000..af11780 --- /dev/null +++ b/HELP.md @@ -0,0 +1,17 @@ +# Getting Started + +### Reference Documentation +For further reference, please consider the following sections: + +* [Official Apache Maven documentation](https://maven.apache.org/guides/index.html) +* [Spring Boot Maven Plugin Reference Guide](https://docs.spring.io/spring-boot/docs/2.4.2/maven-plugin/reference/html/) +* [Create an OCI image](https://docs.spring.io/spring-boot/docs/2.4.2/maven-plugin/reference/html/#build-image) +* [Spring Web](https://docs.spring.io/spring-boot/docs/2.4.2/reference/htmlsingle/#boot-features-developing-web-applications) + +### Guides +The following guides illustrate how to use some features concretely: + +* [Building a RESTful Web Service](https://spring.io/guides/gs/rest-service/) +* [Serving Web Content with Spring MVC](https://spring.io/guides/gs/serving-web-content/) +* [Building REST services with Spring](https://spring.io/guides/tutorials/bookmarks/) + diff --git a/LICENSES/MIT-0.txt b/LICENSES/MIT-0.txt new file mode 100644 index 0000000..ea279d0 --- /dev/null +++ b/LICENSES/MIT-0.txt @@ 
-0,0 +1,15 @@ +SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + diff --git a/README.md b/README.md index 7f92204..569aab7 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,216 @@ -## My Project +# Running a Data Processing Job on EMR Serverless with AWS Step Functions and AWS Lambda using Terraform (By HashiCorp) -TODO: Fill this README out! -Be sure to: +In this blog we showcase how to build and orchestrate a [Scala](https://www.scala-lang.org/) Spark Application using [Amazon EMR Serverless](https://aws.amazon.com/emr/serverless/) , AWS Step Functions and [Terraform By HashiCorp](https://www.terraform.io/). In this end to end solution we execute a Spark job on EMR Serverless which processes sample click-stream data in Amazon S3 bucket and stores the aggregation results in Amazon S3. + +With EMR Serverless, customers don’t have to configure, optimize, secure, or operate clusters to run applications. 
You will continue to get the benefits of [Amazon EMR](https://aws.amazon.com/emr/), such as open source compatibility, concurrency, and optimized runtime performance for popular data frameworks. EMR Serverless is suitable for customers who want ease in operating applications using open-source frameworks. It offers quick job startup, automatic capacity management, and straightforward cost controls. + +There are several ‘infrastructure as code’ frameworks available today, to help customers define their infrastructure, such as the AWS CDK or Terraform. Terraform, an AWS Partner Network (APN) Advanced Technology Partner and member of the AWS DevOps Competency, is an infrastructure as code tool similar to AWS CloudFormation that allows you to create, update, and version your AWS infrastructure. Terraform provides friendly syntax (similar to [AWS CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/Welcome.html)) along with other features like planning (visibility to see the changes before they actually happen), graphing, ability to create templates to break infra configurations into smaller chunks which allows better maintenance and reusability. We will leverage the capabilities and features of Terraform to build an API based ingestion process into AWS. Let’s get started! + +We will provide the Terraform infrastructure definition and the source code for an AWS Lambda using which sample customer user clicks for online website inputs are ingested into an [Amazon Kinesis Data Firehose](https://aws.amazon.com/kinesis/data-firehose/). The solution leverages Firehose’s capability to convert the incoming data into a Parquet file (an open-source file format for Hadoop) before pushing it to [Amazon S3](https://aws.amazon.com/s3/) using [AWS Glue](https://aws.amazon.com/glue/) catalog. The generated output S3 Parquet file logs are then processed by an EMR Serverless process and outputs a report detailing aggregate click stream statistics in S3 bucket. 
The EMR serverless operation is triggered using [AWS Step Functions](https://aws.amazon.com/step-functions). The sample architecture and code will be spun up as below. + +Provided samples have the source code for building the infrastructure using Terraform for running the Amazon EMR Application. Setup scripts are provided to create the sample ingestion using AWS Lambda for incoming application logs. Similar ingestion pattern sample was terraformed in an earlier [blog](https://aws.amazon.com/blogs/developer/provision-aws-infrastructure-using-terraform-by-hashicorp-an-example-of-web-application-logging-customer-data/). + +Overview of the steps and the AWS Services used in this solution: -* Change the title in this README -* Edit your repository description on GitHub +* Java source build – Provided application code is packaged & built using Apache Maven +* Terraform commands are used to deploy the infrastructure in AWS. +* [Amazon EMR Serverless](https://aws.amazon.com/emr/serverless/) Application - provides the option to submit a Spark job. +* [AWS Lambda](https://aws.amazon.com/lambda/): + * Ingestion Lambda – This lambda processes the incoming request and pushes the data into Firehose stream. + * EMR Start Job Lambda - This lambda starts the EMR Serverless application, the EMR job process converts the ingested user click logs into output in another S3 bucket. +* [AWS Step Functions](https://aws.amazon.com/step-functions) triggers the EMR Start Job Lambda which submits the application to EMR Serverless for processing of the ingested log files. 
+* [Amazon Simple Storage Service](https://aws.amazon.com/s3/) (Amazon S3) + * Firehose Delivery Bucket - Stores the ingested application logs in parquet file format + * Loggregator Source Bucket - Stores the scala code/jar for EMR job execution + * Loggregator Output Bucket - EMR processed output is stored in this bucket + * EMR Serverless logs Bucket - Stores EMR process application logs +* Sample AWS Invoke commands (run as part of initial set up process) inserts the data using the Ingestion Lambda and Firehose stream converts the incoming stream into a Parquet file and stored in an S3 bucket -## Security + +![Alt text](assets/emr-serverless-click-logs-from-web-application.drawio.png?raw=true "Title") +### Prerequisites -See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more information. +* [AWS Cli](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) - At the time of writing this article version 2.7.18 was used. This will be required to query aws emr-serverless cli commands from your local machine. Optionally all the AWS Services used in this blog can be viewed/operated from AWS Console also. +* Make sure to have [Java](https://www.java.com/en/download/) installed, JDK/JRE 8 is set in the environment path of your machine. For instructions, see [Java Development Kit](https://www.java.com/en/download/) +* [Apache Maven](https://maven.apache.org/download.cgi) – Java Lambdas are built using mvn packages and are deployed using Terraform into AWS +* [Scala Build Tool](https://www.scala-sbt.org/download.html) (sbt) - Version 1.4.7 is used at the time of this article. Make sure to download and install based on your operating system needs. +* Set up [Terraform](https://www.terraform.io/downloads). For steps, see Terraform downloads. Version 1.2.5 is used at the time of this article. +* An [AWS Account](https://aws.amazon.com/free/) -## License +### Design Decisions -This library is licensed under the MIT-0 License. 
See the LICENSE file. +* We use AWS Step Functions and AWS Lambda in this use case to trigger the EMR Serverless Application. In the real world, the data processing application could be long running and may exceed AWS Lambda’s execution timeout. Tools like [Amazon Managed Workflows for Apache Airflow (MWAA)](https://aws.amazon.com/managed-workflows-for-apache-airflow/) can be used. Amazon Managed Workflows for Apache Airflow is a managed orchestration service that makes it easier to set up and operate end-to-end data pipelines in the cloud at scale +* AWS Lambda Code & EMR Serverless Log Aggregation code are developed using Java & Scala respectively. These can be developed using any of the supported languages for these use cases. +* AWS CLI V2 is required for querying Amazon EMR Serverless applications from the command line. These can be viewed from the AWS Console also. A sample CLI command is provided in the “Testing” section below. +### Steps + + + Clone [this repository](https://github.com/aws-samples/aws-emr-serverless-using-terraform) and execute the below command to spin up the infrastructure and the application +The provided “exec.sh” shell script builds the Java application jar (For the Lambda Ingestion), the Scala application Jar (For the EMR Processing) and deploys the AWS Infrastructure that is needed for this use case. + +Execute the below commands + + +``` +$ chmod +x exec.sh +$ ./exec.sh +``` + + +To run the commands individually + +Set the application deployment region and account number. An example is shown below. Modify as needed. 
+ +``` +$ APP_DIR=$PWD + $ APP_PREFIX=clicklogger + $ STAGE_NAME=dev + $ REGION=us-east-1 + $ ACCOUNT_ID=$(aws sts get-caller-identity | jq -r '.Account') +``` + +Maven build AWS Lambda Application Jar & Scala Application package + +``` +$ cd $APP_DIR/source/clicklogger + $ mvn clean package +$ sbt reload + $ sbt compile + $ sbt package +``` + + +Deploy the AWS Infrastructure using Terraform + +``` +$ terraform init + $ terraform plan + $ terraform apply --auto-approve +``` + +### Testing + + + + Once the application is built and deployed, you can also insert sample data for the EMR processing. An example is shown below. Note exec.sh has multiple sample insertions for AWS Lambda. The ingested logs will be used by the EMR Serverless Application job + +The below sample AWS CLI Invoke command inserts sample data for the application logs + +``` +aws lambda invoke --function-name clicklogger-dev-ingestion-lambda --cli-binary-format raw-in-base64-out --payload '{"requestid":"OAP-guid-001","contextid":"OAP-ctxt-001","callerid":"OrderingApplication","component":"login","action":"load","type":"webpage"}' out +``` + +Validate the Deployments + +* Output – Once the Lambda is successfully executed, you should see the output in S3 buckets as shown below +* Validate the saved ingested data as below + * Navigate to the bucket created as part of the stack. + * Select the file and view the file from the “Select From” sub tab. + * You should see that the ingested stream got converted into a Parquet file. + * Select the file and view the data. A sample is shown below + +![Alt text](assets/s3_source_parquet_files.png?raw=true "Title") + +* Run AWS Step Function to validate the Serverless application + * Open AWS Console > AWS Step Function > Open "clicklogger-dev-state-machine". 
+ * The step function will show the steps that ran to trigger the AWS Lambda and EMR Serverless Application + * Start a new execution to trigger the AWS Lambda and EMR Serverless Application/Job + * Once the AWS Step Function is successful, navigate to Amazon S3 > clicklogger-dev-outputs-bucket- to see the output files. + * These will be partitioned by year/month/date/response.md. A sample is shown below + +![Alt text](assets/s3_output_response_file.png?raw=true "Title") + + +AWS CLI can be used to check the deployed AWS Serverless Application + +``` +$ aws emr-serverless list-applications \ + | jq -r '.applications[] | select(.name=="clicklogger-dev-loggregrator-emr-").id' + + +``` + +![Alt text](assets/step_function_success.png?raw=true "Title") + +EMR Studio + +* Open AWS Console, Navigate to “EMR” > “Serverless” tab on the left pane. +* Select “clicklogger-dev-studio” and click “Manage Applications” + + + +![Alt text](assets/EMRStudioApplications.png?raw=true "Title") + +![Alt text](assets/EMRServerlessApplication.png?raw=true "Title") + +Reviewing the Serverless Application Output: + + +* Open AWS Console, Navigate to Amazon S3 +* Open the outputs S3 bucket. This will be like - us-east-1-clicklogger-dev-loggregator-output- +* The EMR Serverless application writes the output based on the date partition as below + * 2022/07/28/response.md + * Output of the file will be like below + +``` + + |*createdTime*|*callerid*|*component*|*count* + |------------|-----------|-----------|------- + *07-28-2022*|OrderingApplication|checkout|2 + *07-28-2022*|OrderingApplication|login|2 + *07-28-2022*|OrderingApplication|products|2 +``` + +## Cleanup + + +Provided "./cleanup.sh" has the required steps to delete all the files from Amazon S3 buckets that were created as part of this blog. 
The terraform destroy command will clean up the AWS infrastructure that was spun up as mentioned above + + +``` +$ chmod +x cleanup.sh +$ ./cleanup.sh +``` + +* To do the steps manually, + +S3 and created services can be deleted using the CLI also. Execute the below commands (an example below, modify as needed): + +``` + + +# CLI Commands to delete the S3 + +aws s3 rb s3://clicklogger-dev-emr-serverless-logs-bucket- --force +aws s3 rb s3://clicklogger-dev-firehose-delivery-bucket- --force +aws s3 rb s3://clicklogger-dev-loggregator-output-bucket- --force +aws s3 rb s3://clicklogger-dev-loggregator-source-bucket- --force + +# Destroy the AWS Infrastructure +terraform destroy --auto-approve + + +``` + + + +## Conclusion + + +To recap, in this post we built, deployed & ran a data processing Spark job in Amazon EMR Serverless that interacts with various AWS Services. The post walked through deploying a Lambda packaged with Java using Maven, and a Scala application for an EMR Serverless Application triggered with AWS Step Functions, with infrastructure as code. You may use any combination of applicable programming languages to build your Lambda functions and EMR Job application. EMR Serverless can be triggered manually, automated, or orchestrated using AWS Services like AWS Step Functions, Amazon Managed Workflows for Apache Airflow, etc. + +We encourage you to test this example and see for yourself how this overall application design works within AWS. Then, it will just be a matter of replacing your individual code base, packaging it, and letting Amazon EMR Serverless handle the process efficiently. + +If you implement this example and run into any issues, or have any questions or feedback about this blog, please provide your comments below! 
+ +## References + +* [Terraform: Beyond the basics with AWS](https://aws.amazon.com/blogs/apn/terraform-beyond-the-basics-with-aws/) +* [Amazon EMR Serverless General Availability](https://aws.amazon.com/about-aws/whats-new/2022/06/amazon-emr-serverless-generally-available/) +* [Amazon EMR Serverless Now Generally Available – Run Big Data Applications without Managing Servers](https://aws.amazon.com/blogs/aws/amazon-emr-serverless-now-generally-available-run-big-data-applications-without-managing-servers/) +* [Provision AWS infrastructure using Terraform (By HashiCorp): an example of web application logging customer data](https://aws.amazon.com/blogs/developer/provision-aws-infrastructure-using-terraform-by-hashicorp-an-example-of-web-application-logging-customer-data/) + + diff --git a/assets/AWSStepFunction.png b/assets/AWSStepFunction.png new file mode 100644 index 0000000..f4feea4 Binary files /dev/null and b/assets/AWSStepFunction.png differ diff --git a/assets/AWSStepFunction.png.license b/assets/AWSStepFunction.png.license new file mode 100644 index 0000000..182df80 --- /dev/null +++ b/assets/AWSStepFunction.png.license @@ -0,0 +1,3 @@ +SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved + +SPDX-License-Identifier: MIT-0 \ No newline at end of file diff --git a/assets/EMRServerlessApplication.png b/assets/EMRServerlessApplication.png new file mode 100644 index 0000000..53c9c5b Binary files /dev/null and b/assets/EMRServerlessApplication.png differ diff --git a/assets/EMRServerlessApplication.png.license b/assets/EMRServerlessApplication.png.license new file mode 100644 index 0000000..182df80 --- /dev/null +++ b/assets/EMRServerlessApplication.png.license @@ -0,0 +1,3 @@ +SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. 
All Rights Reserved + +SPDX-License-Identifier: MIT-0 \ No newline at end of file diff --git a/assets/EMRStudioApplications.png b/assets/EMRStudioApplications.png new file mode 100644 index 0000000..20f6ffb Binary files /dev/null and b/assets/EMRStudioApplications.png differ diff --git a/assets/EMRStudioApplications.png.license b/assets/EMRStudioApplications.png.license new file mode 100644 index 0000000..182df80 --- /dev/null +++ b/assets/EMRStudioApplications.png.license @@ -0,0 +1,3 @@ +SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved + +SPDX-License-Identifier: MIT-0 \ No newline at end of file diff --git a/assets/emr-serverless-click-logs-from-web-application.drawio.license b/assets/emr-serverless-click-logs-from-web-application.drawio.license new file mode 100644 index 0000000..182df80 --- /dev/null +++ b/assets/emr-serverless-click-logs-from-web-application.drawio.license @@ -0,0 +1,3 @@ +SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved + +SPDX-License-Identifier: MIT-0 \ No newline at end of file diff --git a/assets/emr-serverless-click-logs-from-web-application.drawio.png b/assets/emr-serverless-click-logs-from-web-application.drawio.png new file mode 100644 index 0000000..14500f3 Binary files /dev/null and b/assets/emr-serverless-click-logs-from-web-application.drawio.png differ diff --git a/assets/emr-serverless-click-logs-from-web-application.drawio.png.license b/assets/emr-serverless-click-logs-from-web-application.drawio.png.license new file mode 100644 index 0000000..182df80 --- /dev/null +++ b/assets/emr-serverless-click-logs-from-web-application.drawio.png.license @@ -0,0 +1,3 @@ +SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. 
All Rights Reserved + +SPDX-License-Identifier: MIT-0 \ No newline at end of file diff --git a/assets/s3_output_response_file.png b/assets/s3_output_response_file.png new file mode 100644 index 0000000..ebd7fd6 Binary files /dev/null and b/assets/s3_output_response_file.png differ diff --git a/assets/s3_output_response_file.png.license b/assets/s3_output_response_file.png.license new file mode 100644 index 0000000..182df80 --- /dev/null +++ b/assets/s3_output_response_file.png.license @@ -0,0 +1,3 @@ +SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved + +SPDX-License-Identifier: MIT-0 \ No newline at end of file diff --git a/assets/s3_source_parquet_files.png b/assets/s3_source_parquet_files.png new file mode 100644 index 0000000..b4e7ad1 Binary files /dev/null and b/assets/s3_source_parquet_files.png differ diff --git a/assets/s3_source_parquet_files.png.license b/assets/s3_source_parquet_files.png.license new file mode 100644 index 0000000..182df80 --- /dev/null +++ b/assets/s3_source_parquet_files.png.license @@ -0,0 +1,3 @@ +SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved + +SPDX-License-Identifier: MIT-0 \ No newline at end of file diff --git a/assets/step_function_caught.png b/assets/step_function_caught.png new file mode 100644 index 0000000..5030a92 Binary files /dev/null and b/assets/step_function_caught.png differ diff --git a/assets/step_function_caught.png.license b/assets/step_function_caught.png.license new file mode 100644 index 0000000..182df80 --- /dev/null +++ b/assets/step_function_caught.png.license @@ -0,0 +1,3 @@ +SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. 
All Rights Reserved + +SPDX-License-Identifier: MIT-0 \ No newline at end of file diff --git a/assets/step_function_success.png b/assets/step_function_success.png new file mode 100644 index 0000000..9175129 Binary files /dev/null and b/assets/step_function_success.png differ diff --git a/assets/step_function_success.png.license b/assets/step_function_success.png.license new file mode 100644 index 0000000..182df80 --- /dev/null +++ b/assets/step_function_success.png.license @@ -0,0 +1,3 @@ +SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved + +SPDX-License-Identifier: MIT-0 \ No newline at end of file diff --git a/assets/step_function_uncaught.png b/assets/step_function_uncaught.png new file mode 100644 index 0000000..3ea4e4e Binary files /dev/null and b/assets/step_function_uncaught.png differ diff --git a/assets/step_function_uncaught.png.license b/assets/step_function_uncaught.png.license new file mode 100644 index 0000000..182df80 --- /dev/null +++ b/assets/step_function_uncaught.png.license @@ -0,0 +1,3 @@ +SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved + +SPDX-License-Identifier: MIT-0 \ No newline at end of file diff --git a/cleanup.sh b/cleanup.sh new file mode 100755 index 0000000..06ea193 --- /dev/null +++ b/cleanup.sh @@ -0,0 +1,26 @@ +#! /bin/bash + +echo 'Cleaning up Deployed Infrastructure..' 
+echo $PWD +APP_DIR=$PWD +APP_PREFIX=clicklogger +STAGE_NAME=dev +REGION=us-east-1 + +ACCOUNT_ID=$(aws sts get-caller-identity | jq -r '.Account') +echo $ACCOUNT_ID + +aws s3 rb s3://$REGION-$APP_PREFIX-$STAGE_NAME-emr-logs-$ACCOUNT_ID --force +aws s3 rb s3://$REGION-$APP_PREFIX-$STAGE_NAME-firehose-delivery-$ACCOUNT_ID --force +aws s3 rb s3://$REGION-$APP_PREFIX-$STAGE_NAME-loggregator-output-$ACCOUNT_ID --force +aws s3 rb s3://$REGION-$APP_PREFIX-$STAGE_NAME-loggregator-source-$ACCOUNT_ID --force +aws s3 rb s3://$REGION-$APP_PREFIX-$STAGE_NAME-emr-studio-$ACCOUNT_ID --force +echo 'Deleted S3 contents' + +echo 'Terraform Destroy Resources' +cd $APP_DIR/terraform/workspaces/$REGION +terraform destroy --auto-approve + +cd $APP_DIR + +echo 'Completed Successfully!' diff --git a/exec.sh b/exec.sh new file mode 100755 index 0000000..df48644 --- /dev/null +++ b/exec.sh @@ -0,0 +1,42 @@ +#! /bin/bash + +echo $PWD +APP_DIR=$PWD +APP_PREFIX=clicklogger +STAGE_NAME=dev +REGION=us-east-1 +ACCOUNT_ID=$(aws sts get-caller-identity | jq -r '.Account') + +echo 'Building Source Lambda Jar' +cd $APP_DIR/source/clicklogger +mvn clean package +echo 'Building Source EMR Jar' +cd $APP_DIR/source/loggregator +# Make sure to have JAVA8 in your PATH +sbt reload +sbt compile +sbt package + +echo 'Deploying Terraform Resources' +cd $APP_DIR/terraform/workspaces/$REGION + +terraform init +terraform plan +terraform apply --auto-approve +# shellcheck disable=SC2103 + +cd $APP_DIR +echo 'Deployed Successfully!' 
+ +echo 'Inserting Sample Data' +aws lambda invoke --function-name $APP_PREFIX-$STAGE_NAME-ingestion-lambda --cli-binary-format raw-in-base64-out --payload '{"requestid":"OAP-guid-001","contextid":"OAP-ctxt-001","callerid":"OrderingApplication","component":"login","action":"load","type":"webpage"}' out +aws lambda invoke --function-name $APP_PREFIX-$STAGE_NAME-ingestion-lambda --cli-binary-format raw-in-base64-out --payload '{"requestid":"OAP-guid-002","contextid":"OAP-ctxt-002","callerid":"OrderingApplication","component":"login","action":"load","type":"webpage"}' out +aws lambda invoke --function-name $APP_PREFIX-$STAGE_NAME-ingestion-lambda --cli-binary-format raw-in-base64-out --payload '{"requestid":"OAP-guid-003","contextid":"OAP-ctxt-003","callerid":"OrderingApplication","component":"products","action":"show","type":"webpage"}' out +aws lambda invoke --function-name $APP_PREFIX-$STAGE_NAME-ingestion-lambda --cli-binary-format raw-in-base64-out --payload '{"requestid":"OAP-guid-004","contextid":"OAP-ctxt-004","callerid":"OrderingApplication","component":"products","action":"show","type":"webpage"}' out +aws lambda invoke --function-name $APP_PREFIX-$STAGE_NAME-ingestion-lambda --cli-binary-format raw-in-base64-out --payload '{"requestid":"OAP-guid-005","contextid":"OAP-ctxt-005","callerid":"OrderingApplication","component":"checkout","action":"show","type":"webpage"}' out +aws lambda invoke --function-name $APP_PREFIX-$STAGE_NAME-ingestion-lambda --cli-binary-format raw-in-base64-out --payload '{"requestid":"OAP-guid-006","contextid":"OAP-ctxt-006","callerid":"OrderingApplication","component":"checkout","action":"show","type":"webpage"}' out +aws lambda invoke --function-name $APP_PREFIX-$STAGE_NAME-ingestion-lambda --cli-binary-format raw-in-base64-out --payload '{"requestid":"OAP-guid-007","contextid":"OAP-ctxt-007","callerid":"OrderingApplication","component":"submitorder","action":"backend","type":"process"}' out +aws lambda invoke --function-name 
$APP_PREFIX-$STAGE_NAME-ingestion-lambda --cli-binary-format raw-in-base64-out --payload '{"requestid":"OAP-guid-008","contextid":"OAP-ctxt-008","callerid":"OrderingApplication","component":"submitorder","action":"backend","type":"process"}' out + + +echo 'All process completed successfully!!' diff --git a/source/clicklogger/.gitignore b/source/clicklogger/.gitignore new file mode 100644 index 0000000..a2a3040 --- /dev/null +++ b/source/clicklogger/.gitignore @@ -0,0 +1,31 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/** +!**/src/test/** + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ + +### VS Code ### +.vscode/ diff --git a/source/clicklogger/mvnw b/source/clicklogger/mvnw new file mode 100644 index 0000000..a16b543 --- /dev/null +++ b/source/clicklogger/mvnw @@ -0,0 +1,310 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + export JAVA_HOME="`/usr/libexec/java_home`" + else + export JAVA_HOME="/Library/Java/Home" + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. 
+ + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Mingw, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." 
+fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=`cd "$wdir/.."; pwd` + fi + # end of workaround + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +BASE_DIR=`find_maven_basedir "$(pwd)"` +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +########################################################################################## +# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +# This allows using the maven wrapper in projects that prohibit checking in binary data. +########################################################################################## +if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found .mvn/wrapper/maven-wrapper.jar" + fi +else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..." 
+ fi + if [ -n "$MVNW_REPOURL" ]; then + jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + else + jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + fi + while IFS="=" read key value; do + case "$key" in (wrapperUrl) jarUrl="$value"; break ;; + esac + done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties" + if [ "$MVNW_VERBOSE" = true ]; then + echo "Downloading from: $jarUrl" + fi + wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" + if $cygwin; then + wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"` + fi + + if command -v wget > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found wget ... using wget" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + wget "$jarUrl" -O "$wrapperJarPath" + else + wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath" + fi + elif command -v curl > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found curl ... using curl" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + curl -o "$wrapperJarPath" "$jarUrl" -f + else + curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f + fi + + else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Falling back to using Java to download" + fi + javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java" + # For Cygwin, switch paths to Windows format before running javac + if $cygwin; then + javaClass=`cygpath --path --windows "$javaClass"` + fi + if [ -e "$javaClass" ]; then + if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Compiling MavenWrapperDownloader.java ..." 
+ fi + # Compiling the Java class + ("$JAVA_HOME/bin/javac" "$javaClass") + fi + if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + # Running the downloader + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Running MavenWrapperDownloader.java ..." + fi + ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR") + fi + fi + fi +fi +########################################################################################## +# End of extension +########################################################################################## + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} +if [ "$MVNW_VERBOSE" = true ]; then + echo $MAVEN_PROJECTBASEDIR +fi +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` +fi + +# Provide a "standardized" way to retrieve the CLI args that will +# work with both Windows and non-Windows executions. 
+MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" +export MAVEN_CMD_LINE_ARGS + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" diff --git a/source/clicklogger/mvnw.cmd b/source/clicklogger/mvnw.cmd new file mode 100644 index 0000000..c8d4337 --- /dev/null +++ b/source/clicklogger/mvnw.cmd @@ -0,0 +1,182 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM https://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. 
+@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Maven Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM set title of command window +title %0 +@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" +if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. 
+goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" +set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + +FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( + IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B +) + +@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +@REM This allows using the maven wrapper in projects that prohibit checking in binary data. 
+if exist %WRAPPER_JAR% ( + if "%MVNW_VERBOSE%" == "true" ( + echo Found %WRAPPER_JAR% + ) +) else ( + if not "%MVNW_REPOURL%" == "" ( + SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + ) + if "%MVNW_VERBOSE%" == "true" ( + echo Couldn't find %WRAPPER_JAR%, downloading it ... + echo Downloading from: %DOWNLOAD_URL% + ) + + powershell -Command "&{"^ + "$webclient = new-object System.Net.WebClient;"^ + "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^ + "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^ + "}"^ + "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^ + "}" + if "%MVNW_VERBOSE%" == "true" ( + echo Finished downloading %WRAPPER_JAR% + ) +) +@REM End of extension + +@REM Provide a "standardized" way to retrieve the CLI args that will +@REM work with both Windows and non-Windows executions. 
+set MAVEN_CMD_LINE_ARGS=%* + +%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %* +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" +if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%" == "on" pause + +if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE% + +exit /B %ERROR_CODE% diff --git a/source/clicklogger/pom.xml b/source/clicklogger/pom.xml new file mode 100644 index 0000000..b3d95bc --- /dev/null +++ b/source/clicklogger/pom.xml @@ -0,0 +1,153 @@ + + 4.0.0 + com.clicklogs + clicklogger + jar + 1.0-SNAPSHOT + clicklogger + + UTF-8 + 1.8 + 1.8 + + + + + software.amazon.awssdk + bom + 2.17.202 + pom + import + + + + + + com.amazonaws + aws-lambda-java-core + 1.2.1 + + + com.amazonaws + aws-lambda-java-events + 2.2.8 + + + com.amazonaws + aws-java-sdk-kinesis + 1.11.774 + + + software.amazon.awssdk + regions + + + software.amazon.awssdk + aws-core + + + software.amazon.awssdk + aws-json-protocol + + + software.amazon.awssdk + url-connection-client + + + software.amazon.awssdk + emrserverless + + + org.apache.commons + commons-lang3 + 3.10 + + + com.google.code.gson + gson + 2.8.9 + + + org.apache.logging.log4j + log4j-api + 2.17.1 + test + + + org.apache.logging.log4j + log4j-core + 2.17.1 + test + + + org.apache.logging.log4j + log4j-slf4j18-impl + 2.13.0 + test + + + org.junit.jupiter + junit-jupiter-api + 5.6.0 + test + + + org.junit.jupiter + junit-jupiter-engine + 5.6.0 + test + + + + org.mockito + mockito-core + 2.22.0 + test + + + junit + 
junit + 4.13.1 + + + + + + + maven-surefire-plugin + 2.22.2 + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.2 + + false + + + + package + + shade + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + + + + + diff --git a/source/clicklogger/src/main/java/com/clicklogs/Handlers/ClickLoggerEMRJobHandler.java b/source/clicklogger/src/main/java/com/clicklogs/Handlers/ClickLoggerEMRJobHandler.java new file mode 100644 index 0000000..63639dc --- /dev/null +++ b/source/clicklogger/src/main/java/com/clicklogs/Handlers/ClickLoggerEMRJobHandler.java @@ -0,0 +1,200 @@ +package com.clicklogs.Handlers; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.LambdaLogger; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.clicklogs.model.StartJobRequest; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.services.emrserverless.EmrServerlessClient; +import software.amazon.awssdk.services.emrserverless.model.*; +import com.clicklogs.model.ClickLogResponse; +import com.clicklogs.model.ResponseBuilder; +import com.clicklogs.model.ClickLoggerException; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.apache.commons.lang3.StringUtils; + +import java.lang.Thread; +import java.text.Format; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; + +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+ * + * SPDX-License-Identifier: MIT-0 + */ + +public class ClickLoggerEMRJobHandler implements RequestHandler{ + + private String applicationName = "clicklogger-dev-emr-serverless-application"; + private String applicationId = ""; + private String executionRoleArn = ""; + private String entrypoint = ""; + private String mainClass = ""; + private String outputBucket = ""; + private String sourceBucket = ""; + private String logsOutputPath = ""; + private Integer emrJobTimout = 5000; + + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + + LambdaLogger logger = null; + @Override + public ClickLogResponse handleRequest(final StartJobRequest startJobRequest, final Context context) { + logger = context.getLogger(); + final String success_response = new String("200 OK"); + final String fail_response = new String("400 ERROR"); + + ResponseBuilder responseBuilder = new ResponseBuilder(); + ClickLogResponse response = responseBuilder.badRequest(fail_response).build(); + + + logger.log("Incoming request - " + gson.toJson(startJobRequest)); + + String envAppName = System.getenv("APPLICATION_NAME"); + if(!StringUtils.isBlank(envAppName)) + { + applicationName = envAppName; + } + + String envAppId = System.getenv("APPLICATION_ID"); + if(!StringUtils.isBlank(envAppId)) + { + applicationId = envAppId; + } + + String envExecRole = System.getenv("EXECUTION_ROLE_ARN"); + if(!StringUtils.isBlank(envExecRole)) + { + executionRoleArn = envExecRole; + } + + String envEntryPoint = System.getenv("ENTRY_POINT"); + if(!StringUtils.isBlank(envEntryPoint)) + { + entrypoint = envEntryPoint; + } + + String envMainClass = System.getenv("MAIN_CLASS"); + if(!StringUtils.isBlank(envMainClass)) + { + mainClass = envMainClass; + } + + String envOutputBucket = System.getenv("OUTPUT_BUCKET"); + if(!StringUtils.isBlank(envOutputBucket)) + { + outputBucket = envOutputBucket; + } + + String envSourceBucket = System.getenv("SOURCE_BUCKET"); + if(!StringUtils.isBlank(envSourceBucket)) + { + 
sourceBucket = envSourceBucket;
+        }
+
+        String envLogsOutputBucket = System.getenv("LOGS_OUTPUT_PATH");
+        if(!StringUtils.isBlank(envLogsOutputBucket))
+        {
+            logsOutputPath = envLogsOutputBucket;
+        }
+
+        emrJobTimout = Integer.parseInt(StringUtils.defaultIfBlank(System.getenv("EMR_GET_SLEEP_TIME"), emrJobTimout.toString())); // keep default poll interval when EMR_GET_SLEEP_TIME is unset/blank instead of throwing NPE, matching the null-guards on every other env var above
+
+        String datetime = "";
+        if (startJobRequest == null ||
+                StringUtils.isBlank(startJobRequest.getDate())) { // redundant non-null re-check removed (short-circuit already guarantees it)
+            logger.log("setting default createdtime");
+            Format f = new SimpleDateFormat("yyyy-MM-dd");
+            datetime = f.format(new Date());
+        }
+        else{
+            datetime = startJobRequest.getDate();
+        }
+
+        logger.log("Starting EMR Serverless job for the date - "+ datetime);
+
+        try {
+            startEMRJobRun(datetime, applicationId, applicationName,
+                    executionRoleArn, mainClass,
+                    entrypoint, sourceBucket, outputBucket, emrJobTimout);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); logger.log("Error occurred starting EMR Job"); // restore the interrupt flag before surfacing the failure
+            throw new ClickLoggerException("Error occurred starting EMR Job");
+        }
+        logger.log("Stopping application");
+        stopApplication(applicationId);
+        logger.log(success_response);
+        responseBuilder = new ResponseBuilder();
+        responseBuilder.ok();
+        response = responseBuilder.originHeader("*").build();
+        logger.log("Returning response " + gson.toJson(response));
+        return response;
+    }
+
+    private void startEMRJobRun(String inputDate,
+                                String applicationId, String applicationName, String executionRoleArn,
+                                String mainClass, String entrypoint, String sourceBucket,
+                                String outputBucket, Integer emrJobTimout) throws InterruptedException {
+        EmrServerlessClient client = getClient();
+
+        S3MonitoringConfiguration s3MonitoringConfiguration = S3MonitoringConfiguration.builder().logUri(logsOutputPath).build();
+        StartJobRunRequest jobRunRequest = StartJobRunRequest.builder()
+                .name(applicationName)
+                .applicationId(applicationId)
+                .executionRoleArn(executionRoleArn)
+                .jobDriver(
+                        JobDriver.fromSparkSubmit(SparkSubmit.builder()
+                                .entryPoint(entrypoint)
+
.entryPointArguments(Arrays.asList(inputDate, sourceBucket, outputBucket))
+                                .sparkSubmitParameters(mainClass)
+                                .build())
+                )
+                .configurationOverrides(ConfigurationOverrides.builder()
+                        .monitoringConfiguration(MonitoringConfiguration.builder()
+                                .s3MonitoringConfiguration(s3MonitoringConfiguration)
+                                .build()).build()
+                        .toBuilder().build()).build();
+
+        logger.log("Starting job run");
+        StartJobRunResponse response = client.startJobRun(jobRunRequest);
+
+        String jobRunId = response.jobRunId();
+        GetJobRunRequest getJobRunRequest = GetJobRunRequest.builder()
+                .applicationId(applicationId)
+                .jobRunId(jobRunId)
+                .build();
+        GetJobRunResponse jobRunResponse = client.getJobRun(getJobRunRequest);
+
+        while(true){
+            Thread.sleep(emrJobTimout);
+            jobRunResponse = client.getJobRun(getJobRunRequest);
+
+            if(jobRunResponse != null){
+                JobRunState jobState = jobRunResponse.jobRun().state();
+                if(jobState.name().equals("SUCCESS") || jobState.name().equals("FAILED") ||
+                        jobState.name().equals("CANCELLING") || jobState.name().equals("CANCELLED")){
+                    logger.log("EMR job run reached terminal state - " + jobState.name()); // report the actual state; FAILED/CANCELLED must not be logged as success
+                    break;
+                }
+            }
+        }
+    }
+
+    private StopApplicationResponse stopApplication(String applicationId) {
+        EmrServerlessClient client = getClient();
+        StopApplicationRequest stopApp = StopApplicationRequest.builder().applicationId(applicationId).build();
+        return client.stopApplication(stopApp);
+    }
+
+    private EmrServerlessClient getClient() {
+        return EmrServerlessClient.builder()
+                .credentialsProvider(DefaultCredentialsProvider.create())
+                .httpClient(UrlConnectionHttpClient.builder().build())
+                .build();
+    }
+}
\ No newline at end of file
diff --git a/source/clicklogger/src/main/java/com/clicklogs/Handlers/ClickLoggerHandler.java b/source/clicklogger/src/main/java/com/clicklogs/Handlers/ClickLoggerHandler.java
new file mode 100644
index 0000000..8066b49
--- /dev/null
+++ b/source/clicklogger/src/main/java/com/clicklogs/Handlers/ClickLoggerHandler.java
@@ -0,0 +1,151 @@
+package 
com.clicklogs.Handlers; + +import com.amazonaws.services.kinesisfirehose.model.Record; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClientBuilder; +import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest; +import com.amazonaws.services.kinesisfirehose.model.PutRecordResult; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.LambdaLogger; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.apache.commons.lang3.StringUtils; + +import java.nio.ByteBuffer; +import java.text.Format; +import java.text.SimpleDateFormat; +import java.util.Date; + +import com.clicklogs.model.ClickLogRequest; +import com.clicklogs.model.ClickLogResponse; +import com.clicklogs.model.ResponseBuilder; + +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+ * + * SPDX-License-Identifier: MIT-0 + */ + +public class ClickLoggerHandler implements RequestHandler { + + private String stream_name = "click-logger-firehose-delivery-stream"; + private String region = "us-east-1"; + + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + + @Override + public ClickLogResponse handleRequest(final ClickLogRequest clickLogRequest, final Context context) { + final LambdaLogger logger = context.getLogger(); + final String success_response = new String("200 OK"); + final String fail_response = new String("400 ERROR"); + + ResponseBuilder responseBuilder = new ResponseBuilder(); + ClickLogResponse response = responseBuilder.badRequest(fail_response).build(); + + + String env_stream_name = System.getenv("STREAM_NAME"); + if (!StringUtils.isBlank(env_stream_name)) { + stream_name = env_stream_name; + } + + String env_region = System.getenv("REGION"); + logger.log("Environment region name - " + env_region); + if (!StringUtils.isBlank(env_region)) { + region = env_region; + } + if (clickLogRequest != null) { + String req = clickLogRequest.getRequestid() + " - " + clickLogRequest.getCallerid() + " - " + + clickLogRequest.getComponent() + " - " + clickLogRequest.getType() + " - " + clickLogRequest.getAction() + + " - " + clickLogRequest.getUser() + " - " + clickLogRequest.getClientip() + " - " + + clickLogRequest.getCreatedtime(); + logger.log("Incoming request variables - " + req); + + if (validateRequest(clickLogRequest, logger, response)) return response; + } + + System.out.println("Calling updateclicklogs method for the received clicklogrequest"); + + updateClickLogRequestToStream(clickLogRequest); + logger.log(success_response); + responseBuilder = new ResponseBuilder(); + responseBuilder.ok(); + response = responseBuilder.originHeader("*").build(); + return response; + } + + private boolean validateRequest(ClickLogRequest clickLogRequest, LambdaLogger logger, ClickLogResponse response) { + logger.log("Validating inputs"); + 
if (StringUtils.isBlank(clickLogRequest.getRequestid())) { + logger.log("error occurred - requestid missing"); + return true; + } + if (StringUtils.isBlank(clickLogRequest.getContextid())) { + logger.log("error occurred - contextid missing"); + return true; + } + if (StringUtils.isBlank(clickLogRequest.getCallerid())) { + logger.log("error occurred - caller missing"); + return true; + } + if (StringUtils.isBlank(clickLogRequest.getType())) { + logger.log("error occurred - type missing"); + return true; + } + if (StringUtils.isBlank(clickLogRequest.getAction())) { + logger.log("error occurred - action missing"); + return true; + } + if (StringUtils.isBlank(clickLogRequest.getComponent())) { + logger.log("error occurred - component missing"); + return true; + } + + String user = "GUEST"; + if (StringUtils.isBlank(clickLogRequest.getUser())) { + logger.log("setting default user"); + clickLogRequest.setUser(user); + } + + String clientip = "APIGWY"; + if (StringUtils.isBlank(clickLogRequest.getClientip())) { + logger.log("setting default clientip"); + clickLogRequest.setClientip(clientip); + } + + String datetime = ""; + if (StringUtils.isBlank(clickLogRequest.getCreatedtime())) { + logger.log("setting default createdtime"); + Format f = new SimpleDateFormat("MM-dd-yyyy hh:mm:ss"); + datetime = f.format(new Date()); + clickLogRequest.setCreatedtime(datetime); + } + logger.log("Validated inputs"); + return false; + } + + private Boolean updateClickLogRequestToStream(ClickLogRequest clickLogRequest) { + System.out.println("Inside updateClickLogRequestToStream method for the input"); + try { + + AmazonKinesisFirehose amazonKinesisFirehoseClient = AmazonKinesisFirehoseClientBuilder.standard().withRegion(region).build(); + + PutRecordRequest putRecordRequest = new PutRecordRequest(); + putRecordRequest.setDeliveryStreamName(stream_name); + Gson gson = new Gson(); + String messageJson = gson.toJson(clickLogRequest); + System.out.println("gson - " + messageJson); + Record 
record = new Record().withData(ByteBuffer.wrap(messageJson.toString().getBytes())); + putRecordRequest.setRecord(record); + PutRecordResult putRecordResult = amazonKinesisFirehoseClient.putRecord(putRecordRequest); + System.out.println("updated the stream for recordid - " + putRecordResult.getRecordId()); + return true; + } catch (Exception e) { + System.out.println("Error occurred - " + e.getMessage()); + } + return false; + } + +} \ No newline at end of file diff --git a/source/clicklogger/src/main/java/com/clicklogs/model/ClickLogRequest.java b/source/clicklogger/src/main/java/com/clicklogs/model/ClickLogRequest.java new file mode 100644 index 0000000..dfc97f4 --- /dev/null +++ b/source/clicklogger/src/main/java/com/clicklogs/model/ClickLogRequest.java @@ -0,0 +1,94 @@ +package com.clicklogs.model; + +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * SPDX-License-Identifier: MIT-0 + */ + +public class ClickLogRequest { + + public String requestid; + public String contextid; + public String callerid; + public String type; + public String component; + public String action; + public String user; + public String clientip; + public String createdtime; + + public String getRequestid() { + return requestid; + } + + public String getContextid() { + return contextid; + } + + public String getCallerid() { + return callerid; + } + + public String getType() { + return type; + } + + public String getComponent() { + return component; + } + + public String getAction() { + return action; + } + + public String getCreatedtime() { + return createdtime; + } + + public String getUser() { + return user; + } + + public String getClientip() { + return clientip; + } + + public void setRequestid(String requestid) { + this.requestid = requestid; + } + + public void setContextid(String contextid) { + this.contextid = contextid; + } + + public void setCallerid(String callerid) { + this.callerid = callerid; + } + + public void setType(String type) { + 
this.type = type; + } + + + public void setComponent(String component) { + this.component = component; + } + + public void setAction(String action) { + this.action = action; + } + + public void setCreatedtime(String createdtime) { + this.createdtime = createdtime; + } + + public void setUser(String user) { + this.user = user; + } + + public void setClientip(String clientip) { + this.clientip = clientip; + } + +} \ No newline at end of file diff --git a/source/clicklogger/src/main/java/com/clicklogs/model/ClickLogResponse.java b/source/clicklogger/src/main/java/com/clicklogs/model/ClickLogResponse.java new file mode 100644 index 0000000..a010ca9 --- /dev/null +++ b/source/clicklogger/src/main/java/com/clicklogs/model/ClickLogResponse.java @@ -0,0 +1,47 @@ +package com.clicklogs.model; +import java.util.Map; + +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * SPDX-License-Identifier: MIT-0 + */ + +public class ClickLogResponse { + + private int statusCode; + + private Map headers; + + private String body; + + public ClickLogResponse(int statusCode, Map headers, String body) { + this.statusCode = statusCode; + this.headers = headers; + this.body = body; + } + + public int getStatusCode() { + return statusCode; + } + + public Map getHeaders() { + return headers; + } + + public String getBody() { + return body; + } + + public void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } + + public void setHeaders(Map headers) { + this.headers = headers; + } + + public void setBody(String body) { + this.body = body; + } +} \ No newline at end of file diff --git a/source/clicklogger/src/main/java/com/clicklogs/model/ClickLoggerException.java b/source/clicklogger/src/main/java/com/clicklogs/model/ClickLoggerException.java new file mode 100644 index 0000000..23944d4 --- /dev/null +++ b/source/clicklogger/src/main/java/com/clicklogs/model/ClickLoggerException.java @@ -0,0 +1,9 @@ +package com.clicklogs.model; + +import 
java.lang.RuntimeException; + +public class ClickLoggerException extends RuntimeException { + public ClickLoggerException(String errorMessage) { + super(errorMessage); + } +} \ No newline at end of file diff --git a/source/clicklogger/src/main/java/com/clicklogs/model/ResponseBuilder.java b/source/clicklogger/src/main/java/com/clicklogs/model/ResponseBuilder.java new file mode 100644 index 0000000..21b3d71 --- /dev/null +++ b/source/clicklogger/src/main/java/com/clicklogs/model/ResponseBuilder.java @@ -0,0 +1,61 @@ +package com.clicklogs.model; + +import java.util.HashMap; +import java.util.Map; + +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * SPDX-License-Identifier: MIT-0 + */ + +public class ResponseBuilder { + private static final String ACCESS_CONTROL_ALLOW_HEADERS = "Access-Control-Allow-Headers"; + + private static final String ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin"; + + private int statusCode; + + private Map headers = new HashMap<>(); + + private String body; + + public ResponseBuilder headers(Map headers) { + this.headers = headers; + return this; + } + + public ResponseBuilder body(String body) { + this.body = body; + return this; + } + + public ResponseBuilder ok() { + this.statusCode = 200; + return this; + } + + public ResponseBuilder badRequest(String body) { + this.body = buildErrorMsg(body); + this.statusCode = 400; + return this; + } + + private String buildErrorMsg(String body) { + return "{\"message\": \"" + body + "\"}"; + } + + public ResponseBuilder originHeader(String domain) { + headers.put(ACCESS_CONTROL_ALLOW_ORIGIN, domain); + return this; + } + + private void initDefaultHeaders() { + headers.put(ACCESS_CONTROL_ALLOW_HEADERS, "Origin, Access-Control-Allow-Headers, X-Requested-With, Content-Type, Accept"); + } + + public ClickLogResponse build() { + this.initDefaultHeaders(); + return new ClickLogResponse(statusCode, headers, body); + } +} \ No newline at end of file diff --git 
a/source/clicklogger/src/main/java/com/clicklogs/model/StartJobRequest.java b/source/clicklogger/src/main/java/com/clicklogs/model/StartJobRequest.java new file mode 100644 index 0000000..6eec0d0 --- /dev/null +++ b/source/clicklogger/src/main/java/com/clicklogs/model/StartJobRequest.java @@ -0,0 +1,20 @@ +package com.clicklogs.model; + +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * SPDX-License-Identifier: MIT-0 + */ + +public class StartJobRequest { + + public String date; + + public String getDate() { + return date; + } + + public void setDate(String date) { + this.date = date; + } +} \ No newline at end of file diff --git a/source/clicklogger/src/main/resources/application.properties b/source/clicklogger/src/main/resources/application.properties new file mode 100644 index 0000000..e69de29 diff --git a/source/clicklogger/src/test/java/com/clicklogs/ClickLoggerHandlerTest.java b/source/clicklogger/src/test/java/com/clicklogs/ClickLoggerHandlerTest.java new file mode 100644 index 0000000..962c44b --- /dev/null +++ b/source/clicklogger/src/test/java/com/clicklogs/ClickLoggerHandlerTest.java @@ -0,0 +1,85 @@ +package com.clicklogs; + +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose; +import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest; +import com.amazonaws.services.kinesisfirehose.model.PutRecordResult; +import com.amazonaws.services.kinesisfirehose.model.Record; +import com.amazonaws.services.lambda.runtime.Context; +import com.clicklogs.Handlers.ClickLoggerHandler; +import com.clicklogs.model.ClickLogRequest; + +import org.junit.Assert; +import java.nio.ByteBuffer; +import com.google.gson.Gson; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +/* + * Copyright Amazon.com, Inc. 
or its affiliates. All Rights Reserved. + * + * SPDX-License-Identifier: MIT-0 + */ + +@RunWith(MockitoJUnitRunner.class) +public class ClickLoggerHandlerTest { + + private static String deliveryStreamName = "delivery-stream"; + Gson gson = new Gson(); + ClickLogRequest clickLogRequest = new ClickLogRequest(); + PutRecordRequest putRecordRequest = new PutRecordRequest(); + Record record = new Record(); + PutRecordResult putRecordResult = new PutRecordResult(); + + Context context = Mockito.mock(Context.class); + + protected AmazonKinesisFirehose amazonKinesisFirehoseClient = Mockito.mock(AmazonKinesisFirehose.class); + ClickLoggerHandler clickLoggerHandler = Mockito.mock(ClickLoggerHandler.class); + + @Before + public void setup() { + clickLogRequest = new ClickLogRequest(); + clickLogRequest.setAction("ACTION"); + clickLogRequest.setCallerid("CALLERID"); + clickLogRequest.setClientip("CLIENTIP"); + clickLogRequest.setComponent("COMPONENT"); + clickLogRequest.setContextid("CONTEXTID"); + clickLogRequest.setCreatedtime("CREATEDTIME"); + clickLogRequest.setRequestid("REQUESTID"); + clickLogRequest.setType("TYPE"); + clickLogRequest.setUser("USER"); + + when(clickLogRequest).thenReturn(this.clickLogRequest); + + amazonKinesisFirehoseClient = Mockito.mock(AmazonKinesisFirehose.class); + when(amazonKinesisFirehoseClient).thenReturn(amazonKinesisFirehoseClient); + + putRecordResult = new PutRecordResult(); + putRecordResult.setRecordId("SUCCESS_RECORD_ID"); + when(any(PutRecordResult.class)).thenReturn(putRecordResult); + } + + @Test + void invokeTest() { + clickLoggerHandler.handleRequest(clickLogRequest, context); + + putRecordRequest.setDeliveryStreamName(deliveryStreamName); + + Gson gson = new Gson(); + String messageJson = gson.toJson(clickLogRequest); + System.out.println("gson - " + messageJson); + record = new Record().withData(ByteBuffer.wrap(messageJson.toString().getBytes())); + putRecordRequest.setRecord(record); + + amazonKinesisFirehoseClient = 
Mockito.mock(AmazonKinesisFirehose.class); + PutRecordResult result = amazonKinesisFirehoseClient.putRecord( putRecordRequest ); + Assert.assertEquals(result.getRecordId(), "SUCCESS_RECORD_ID"); + + } + +} \ No newline at end of file diff --git a/source/loggregator/README.md b/source/loggregator/README.md new file mode 100644 index 0000000..690c346 --- /dev/null +++ b/source/loggregator/README.md @@ -0,0 +1,12 @@ +$ sbt reload + +$ sbt compile + +$ sbt package + +$ java -jar target/scala-2.13/loggregator-assembly-0.1.jar com.examples.clicklogger "2020-06-15" "clicklogger-dev-firehose-delivery-bucket-" "clicklogger-dev-loggregator-output-bucket-" + +$ emr console + +- command-runner.jar +- spark-submit --deploy-mode client --class com.examples.clicklogger.Loggregator s3://clicklogger-emr-source/loggregator-assembly-0.1.jar 2022-07-18 clicklogger-dev-firehose-delivery-bucket- clicklogger-dev-loggregator-output-bucket- diff --git a/source/loggregator/build.sbt b/source/loggregator/build.sbt new file mode 100644 index 0000000..5431a75 --- /dev/null +++ b/source/loggregator/build.sbt @@ -0,0 +1,43 @@ +name := "loggregator" + +version := "0.1" + + +scalaVersion := "2.12.1" + +lazy val root = (project in file(".")). 
name := "loggregator"

version := "0.1"

// Spark 3.2.0 artifacts are built against Scala 2.12.15; pin the same
// patch level instead of the very old 2.12.1 to avoid binary/compiler
// incompatibilities with the Spark dependencies below.
scalaVersion := "2.12.15"

lazy val root = (project in file(".")).
  settings(
    name := "loggregator",
    version := "0.1",
    maintainer := "shiva.ramani@live.com",
    // Entry point baked into the assembly manifest; must match the
    // --class argument used when submitting to EMR Serverless.
    mainClass in Compile := Some("com.examples.clicklogger.Loggregator")
  )

// Spark/Hadoop versions must match the EMR Serverless release
// (emr-6.6.0 ships Spark 3.2.0).
val sparkVersion = "3.2.0"
val hadoopVersion = "3.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.hadoop" % "hadoop-client" % hadoopVersion,
  "org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
  "org.apache.commons" % "commons-lang3" % "3.10",
  "com.amazonaws" % "aws-java-sdk-s3" % "1.12.262",
  "io.netty" % "netty-buffer" % "4.1.17.Final"
)

// De-duplicate conflicting files when building the fat jar.
val meta = """META.INF(.)*""".r
assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
  case n if n.contains("services") => MergeStrategy.concat
  case n if n.startsWith("reference.conf") => MergeStrategy.concat
  case n if n.endsWith(".conf") => MergeStrategy.concat
  case meta(_) => MergeStrategy.discard
  case x => MergeStrategy.first
}

enablePlugins(JavaAppPackaging)
% "1.7.3") \ No newline at end of file diff --git a/source/loggregator/src/main/resources/application.properties b/source/loggregator/src/main/resources/application.properties new file mode 100644 index 0000000..e69de29 diff --git a/source/loggregator/src/main/scala/com/examples/clicklogger/Loggregator.scala b/source/loggregator/src/main/scala/com/examples/clicklogger/Loggregator.scala new file mode 100644 index 0000000..819c25b --- /dev/null +++ b/source/loggregator/src/main/scala/com/examples/clicklogger/Loggregator.scala @@ -0,0 +1,149 @@ +package com.examples.clicklogger + +import org.apache.spark.{SparkConf, SparkContext, sql} +import org.apache.spark.sql.{DataFrameReader, SQLContext, SparkSession} +import java.lang.Boolean +import java.util +import java.io.IOException +import java.text.Format +import java.text.SimpleDateFormat +import java.util.Date + +import com.amazonaws.AmazonServiceException +import com.amazonaws.SdkClientException +import com.amazonaws.services.s3.AmazonS3 +import com.amazonaws.services.s3.AmazonS3ClientBuilder +import org.apache.commons.lang3.StringUtils + + +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * SPDX-License-Identifier: MIT-0
 */

/**
 * Spark job that aggregates click-log parquet files from the Firehose
 * delivery bucket (grouped by createdTime/callerid/component) and writes a
 * markdown summary table to the output bucket under year/month/day/.
 *
 * Args: <date: yyyy-MM-dd or ALL> <source-bucket> <output-bucket>
 */
object Loggregator {

  private var sourceBucketName = ""
  private var outputBucketName = ""
  private var region = ""

  def main(args: Array[String]): Unit = {

    if (args.length != 3) {
      System.out.println("Invalid no of Arguments!!!")
      System.exit(-1)
    }

    val date = args(0)
    sourceBucketName = args(1)
    outputBucketName = args(2)

    System.out.println("Input Date " + date)
    // Region for the S3A endpoint; defaults to us-east-1 when unset.
    region = scala.util.Properties.envOrElse("REGION", "us-east-1")

    System.out.format("values receive bucket_name %s, output_bucket_name %s \n", sourceBucketName, outputBucketName)

    val dataFrame = processFiles(date, sourceBucketName)
    System.out.println("")
    writeOutputToS3(date, dataFrame, outputBucketName)

    System.out.println("Completed successfully!")
  }

  /**
   * Renders the aggregated rows as a markdown table and uploads it to
   * s3://<outputBucketName>/<year>/<month>/<day>/response.md
   * (prefixed with "ALL/" when date == "ALL"). Returns the rendered table.
   */
  private def writeOutputToS3(date: String, dataFrame: sql.DataFrame, outputBucketName: String): String = {
    val sb = new StringBuilder("|*createdTime*|*callerid*|*component*|*count*\n")

    if (dataFrame != null) {
      sb.append("|------------|-----------------------|-----------|-------\n")
      // Read the 4 columns positionally instead of the previous
      // mkString(",").split(",") round-trip, which recomputed the split
      // four times per row and breaks when a value contains a comma.
      for (row <- dataFrame.rdd.collect) {
        sb.append("*").append(row.get(0)).append("*|")
          .append(row.get(1)).append("|")
          .append(row.get(2)).append("|")
          .append(row.get(3)).append("\n")
      }
    }
    val outputResponse = sb.toString
    System.out.println("printing output schema from data frame")
    System.out.println(outputResponse)

    // Default partition path is today's date; a well-formed
    // "yyyy-MM-dd" argument (length 10, e.g. 2020-07-18) overrides it.
    val now = new Date()
    var year = new SimpleDateFormat("yyyy").format(now)
    var month = new SimpleDateFormat("MM").format(now)
    var onlydate = new SimpleDateFormat("dd").format(now)
    if (date.length == 10) {
      val dateArr = date.split("-")
      year = dateArr(0)
      month = dateArr(1)
      onlydate = dateArr(2)
    }

    var fileObjKeyName = year + "/" + month + "/" + onlydate + "/"
    if (date.equalsIgnoreCase("ALL")) {
      fileObjKeyName = "ALL/" + fileObjKeyName
    }
    val fileName = "response.md"

    System.out.println("fileObjKeyName " + fileObjKeyName + " fileName " + fileName)
    try {
      val s3Client = AmazonS3ClientBuilder.standard.build
      s3Client.putObject(outputBucketName, fileObjKeyName + fileName, outputResponse)
    } catch {
      // Best-effort upload: log and fall through so the rendered table is
      // still returned (matches the original behavior).
      case e: AmazonServiceException =>
        System.out.println(e.getMessage)
      case e: SdkClientException =>
        System.out.println(e.getMessage)
    }

    outputResponse
  }

  /** Loads the parquet partition for `date` and returns the aggregation. */
  def processFiles(date: String, bucket: String): sql.DataFrame = {
    System.out.println("processing a date - " + date + " from bucket - " + bucket)
    val s3Path = String.format("s3a://%s/clicklog/data=%s/", bucket, date)
    val spark = getSparkSession()
    spark.read.parquet(s3Path).createOrReplaceTempView("ClickLoggerTable")
    getClickLoggerDataFrame(spark)
  }

  /**
   * Groups click events by (createdTime day, callerid, component) with a
   * count per group, ordered by the same keys.
   */
  def getClickLoggerDataFrame(spark: SparkSession): sql.DataFrame = {
    val query = "select substring(createdTime,0, 10) as createdTime, callerid, component from ClickLoggerTable"
    val clickLoggerDF = spark.sql(query)

    // Build the aggregation once and reuse it; the original computed the
    // identical groupBy/orderBy twice (once for show(), once to return).
    val aggregated = clickLoggerDF.groupBy("createdTime", "callerid", "component").count()
      .orderBy("createdTime", "callerid", "component")
    aggregated.show()

    System.out.println("DataFrame for date completed successfully ---------")
    aggregated
  }

  /** Builds (or reuses) the SparkSession and configures S3A access. */
  def getSparkSession(): SparkSession = {
    System.out.println("starting spark session -------------")

    val sparkConfig = new SparkConf()
    val spark: SparkSession = SparkSession.builder()
      .config(conf = sparkConfig)
      .getOrCreate()
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoopConf.set("fs.s3a.path.style.access", "true")
    hadoopConf.set("fs.s3a.endpoint", "s3." + region + ".amazonaws.com")
    spark
  }
}
## Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved
##
### SPDX-License-Identifier: MIT-0

# Log group for the ingestion lambda. Referencing the function's name below
# already creates the dependency, so the previous explicit depends_on on the
# lambda resource was redundant and has been removed.
resource "aws_cloudwatch_log_group" "lambda_click_logger_log_group" {
  name              = "/aws/lambda/${var.app_prefix}/${aws_lambda_function.lambda_clicklogger_ingest.function_name}"
  retention_in_days = 3
}

# Log group the Firehose delivery stream logs into
# (referenced from firehose.tf's cloudwatch_logging_options).
resource "aws_cloudwatch_log_group" "click_logger_firehose_delivery_stream_log_group" {
  name              = "/aws/kinesis_firehose_delivery_stream/${var.app_prefix}/${var.stage_name}/click_logger_firehose_delivery_stream"
  retention_in_days = 3
}

resource "aws_cloudwatch_log_stream" "click_logger_firehose_delivery_stream" {
  name           = "${var.app_prefix}-${var.stage_name}-firehose-delivery-stream"
  log_group_name = aws_cloudwatch_log_group.click_logger_firehose_delivery_stream_log_group.name
}

# Log group for the Step Functions state machine that starts the EMR job.
resource "aws_cloudwatch_log_group" "lambda_clicklogger_emr_sfn_start_job_log_group" {
  name              = "/aws/state-machines/${var.app_prefix}/${var.stage_name}"
  retention_in_days = 3
}
## Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved
##
### SPDX-License-Identifier: MIT-0

variable "app_prefix" {
  description = "Application prefix for the AWS services that are built"
  type        = string
  default     = "clicklogger"
}

variable "stage_name" {
  description = "Deployment stage (e.g. dev, prod); used in resource names"
  type        = string
  default     = "dev"
}

variable "lambda_source_zip_path" {
  description = "Path to the packaged Java lambda zip"
  type        = string
}

variable "emr_source_zip_path" {
  description = "Path to the packaged EMR source zip"
  type        = string
}

variable "loggregator_jar" {
  description = "File name of the Spark assembly jar uploaded to the EMR source bucket"
  type        = string
  # Must match the artifact produced by `sbt assembly` for the loggregator
  # project (name "loggregator", version "0.1" => loggregator-assembly-0.1.jar,
  # the name also used in source/loggregator/README.md). The previous default
  # "loggregator-0-0.1.jar" pointed at a jar that is never built.
  default     = "loggregator-assembly-0.1.jar"
}
## Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved
##
### SPDX-License-Identifier: MIT-0

# Delivery stream that lands click events in S3 as SNAPPY parquet,
# partitioned by ingest date (clicklog/data=YYYY-MM-dd/).
resource "aws_kinesis_firehose_delivery_stream" "click_logger_firehose_delivery_stream" {
  name = "${var.app_prefix}-${var.stage_name}-firehose-delivery-stream"
  # No explicit depends_on: the bucket/role/log-group references below
  # already order resource creation correctly.

  destination = "extended_s3"

  extended_s3_configuration {
    role_arn        = aws_iam_role.click_logger_stream_consumer_firehose_role.arn
    bucket_arn      = aws_s3_bucket.click_logger_firehose_delivery_s3_bucket.arn
    buffer_size     = 64
    buffer_interval = 60

    cloudwatch_logging_options {
      enabled = true
      # Reference the log group/stream declared in cloudwatch.tf. The
      # previous hard-coded names ("/aws/kinesis_firehose_delivery_stream/
      # click_logger_firehose_delivery_stream" and an underscore-separated
      # stream name) did not match those resources, so Firehose would have
      # pointed at a log group that was never created.
      log_group_name  = aws_cloudwatch_log_group.click_logger_firehose_delivery_stream_log_group.name
      log_stream_name = aws_cloudwatch_log_stream.click_logger_firehose_delivery_stream.name
    }

    compression_format  = "UNCOMPRESSED"
    prefix              = "clicklog/data=!{timestamp:yyyy}-!{timestamp:MM}-!{timestamp:dd}/"
    error_output_prefix = "clicklog_error/error=!{firehose:error-output-type}data=!{timestamp:yyyy}-!{timestamp:MM}-!{timestamp:dd}/"

    # Convert the incoming JSON click events to parquet, using the Glue
    # table (glue.tf) as the schema source.
    data_format_conversion_configuration {
      enabled = true

      input_format_configuration {
        deserializer {
          open_x_json_ser_de {
            case_insensitive = true
          }
        }
      }

      output_format_configuration {
        serializer {
          parquet_ser_de {
            compression = "SNAPPY"
          }
        }
      }

      schema_configuration {
        database_name = aws_glue_catalog_database.aws_glue_click_logger_database.name
        role_arn      = aws_iam_role.click_logger_stream_consumer_firehose_role.arn
        table_name    = aws_glue_catalog_table.aws_glue_click_logger_catalog_table.name
        region        = data.aws_region.current.name
      }
    }
  }
}
## Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved
##
### SPDX-License-Identifier: MIT-0

resource "aws_glue_catalog_database" "aws_glue_click_logger_database" {
  name        = "${var.app_prefix}${var.stage_name}database"
  description = "Click logger Glue database"
}

# Schema table used by the Firehose JSON->parquet conversion
# (firehose.tf schema_configuration).
resource "aws_glue_catalog_table" "aws_glue_click_logger_catalog_table" {
  name = "${var.app_prefix}${var.stage_name}-table"
  # Reference the database resource directly instead of rebuilding the
  # name string; this also makes the previous explicit depends_on redundant.
  database_name = aws_glue_catalog_database.aws_glue_click_logger_database.name

  table_type = "EXTERNAL_TABLE"

  parameters = {
    EXTERNAL              = "TRUE"
    "parquet.compression" = "SNAPPY"
  }

  retention = 0

  storage_descriptor {
    # Glue requires an s3:// URI here; the previous value was the bucket
    # ARN, which is not a valid table location. Firehose writes under the
    # clicklog/ prefix (see firehose.tf prefix).
    location      = "s3://${aws_s3_bucket.click_logger_firehose_delivery_s3_bucket.bucket}/clicklog/"
    input_format  = "org.apache.hadoop.mapred.TextInputFormat"
    output_format = "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
    compressed    = false
    # NOTE(review): classification "json" + JsonSerDe describe the source
    # events Firehose deserializes; the data at rest is parquet — confirm
    # whether a parquet SerDe is wanted if this table is also queried directly.
    parameters = {
      "crawler_schema_serializer_version"   = "1.0"
      "crawler_schema_deserializer_version" = "1.0"
      "compression_type"                    = "none"
      "classification"                      = "json"
      "type_of_data"                        = "file"
    }
    ser_de_info {
      name                  = "${var.app_prefix}table"
      serialization_library = "org.openx.data.jsonserde.JsonSerDe"

      parameters = {
        "serialization.format" = 1
      }
    }

    columns {
      name = "requestid"
      type = "string"
    }

    columns {
      name = "contextid"
      type = "string"
    }

    columns {
      name = "callerid"
      type = "string"
    }

    columns {
      name = "component"
      type = "string"
    }

    columns {
      name = "action"
      type = "string"
    }

    columns {
      name = "type"
      type = "string"
    }

    columns {
      name = "clientip"
      type = "string"
    }

    columns {
      name = "createdtime"
      type = "string"
    }
  }
}

# Ingestion lambda: accepts a click event and forwards it to Firehose.
resource "aws_lambda_function" "lambda_clicklogger_ingest" {
  filename      = var.lambda_source_zip_path
  function_name = "${var.app_prefix}-${var.stage_name}-ingestion-lambda"
  role          = aws_iam_role.click_logger_lambda_role.arn
  handler       = "com.clicklogs.Handlers.ClickLoggerHandler::handleRequest"
  runtime       = "java8"
  memory_size   = 2048
  timeout       = 300

  source_code_hash = filebase64sha256(var.lambda_source_zip_path)
  # No explicit depends_on: the role and delivery-stream references
  # (role / STREAM_NAME below) already imply both dependencies.

  environment {
    variables = {
      STREAM_NAME = aws_kinesis_firehose_delivery_stream.click_logger_firehose_delivery_stream.name
      REGION      = data.aws_region.current.name
    }
  }

  vpc_config {
    subnet_ids         = [aws_subnet.click_logger_emr_private_subnet1.id]
    security_group_ids = [aws_security_group.click_logger_emr_security_group.id]
  }
}
# Lambda invoked by Step Functions to submit the Spark aggregation job
# to the EMR Serverless application.
resource "aws_lambda_function" "lambda_clicklogger_emr_start_job" {
  description   = "Lambda to accept request to submit a job to an EMR Serverless cluster."
  filename      = var.lambda_source_zip_path
  function_name = "${var.app_prefix}-${var.stage_name}-emr-start-job-lambda"
  role          = aws_iam_role.click_logger_emr_lambda_role.arn
  handler       = "com.clicklogs.Handlers.ClickLoggerEMRJobHandler::handleRequest"
  runtime       = "java8"
  memory_size   = 2048
  timeout       = 600

  source_code_hash = filebase64sha256(var.lambda_source_zip_path)
  # No explicit depends_on: the role is already referenced above, which
  # implies the dependency.

  environment {
    variables = {
      APPLICATION_NAME   = "${var.app_prefix}-${var.stage_name}-emr-serverless-application"
      APPLICATION_ID     = aws_emrserverless_application.click_log_loggregator_emr_serverless.id
      EXECUTION_ROLE_ARN = aws_iam_role.click_logger_emr_serverless_role.arn
      ENTRY_POINT        = "s3://${aws_s3_bucket.click_log_loggregator_source_s3_bucket.id}/${var.loggregator_jar}"
      MAIN_CLASS         = "--class com.examples.clicklogger.Loggregator"
      OUTPUT_BUCKET      = aws_s3_bucket.click_log_loggregator_output_s3_bucket.id
      SOURCE_BUCKET      = aws_s3_bucket.click_logger_firehose_delivery_s3_bucket.id
      LOGS_OUTPUT_PATH   = "s3://${aws_s3_bucket.click_log_loggregator_emr_serverless_logs_s3_bucket.id}"
      REGION             = data.aws_region.current.name
      # Milliseconds between EMR job-status polls. Lambda env vars are
      # strings, so declare it as one instead of relying on coercion.
      EMR_GET_SLEEP_TIME = "5000"
    }
  }

  vpc_config {
    subnet_ids         = [aws_subnet.click_logger_emr_private_subnet1.id]
    security_group_ids = [aws_security_group.click_logger_emr_security_group.id]
  }
}

output "lambda-clicklogger-ingest" {
  value = aws_lambda_function.lambda_clicklogger_ingest
}

output "lambda-clicklogger-emr-job" {
  value = aws_lambda_function.lambda_clicklogger_emr_start_job
}
All Rights Reserved +## +### SPDX-License-Identifier: MIT-0 + +resource "aws_iam_policy" "click_loggerlambda_logging_policy" { + name = "${var.app_prefix}-${var.stage_name}-lambda-logging-policy" + path = "/" + description = "IAM policy for logging from a lambda" + + policy = <