
ETLJob_StreamingJob_Workflow


The Step Functions state machine definition is provided in both JSON and YAML formats, in the files StateMachine.Json and StateMachine.YAML in this repository

The entire workflow has been built with AWS Step Functions, using the appropriate IAM roles and permissions

The workflow is divided into two parts
1.) ETL WorkFlow
2.) Streaming WorkFlow

The branch to execute is chosen at runtime using the States.MathRandom(StartValue, EndValue) intrinsic function, as shown in the sketch below
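
A minimal sketch of how this selection could be expressed in ASL; the state names, the 1-10 range, and the threshold are illustrative, not copied from StateMachine.Json, and the two target states are elided:

```json
{
  "StartAt": "PickWorkflow",
  "States": {
    "PickWorkflow": {
      "Type": "Pass",
      "Comment": "States.MathRandom(start, end) returns a random number in the given range",
      "Parameters": {
        "selector.$": "States.MathRandom(1, 10)"
      },
      "ResultPath": "$.selection",
      "Next": "WhichWorkflow?"
    },
    "WhichWorkflow?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.selection.selector",
          "NumericLessThanEquals": 5,
          "Next": "ETLWorkflowStart"
        }
      ],
      "Default": "StreamingWorkflowStart"
    }
  }
}
```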

AWS Services Used

Step Functions, Lambda, Kinesis, SNS, S3, Glue, IAM

Intrinsic Functions Used in the State Machine

States.Format, States.MathRandom, States.StringToJson, States.JsonToString, etc.
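
For illustration, a single Pass state can combine several of these intrinsics; the state name, field names, and paths here are hypothetical:

```json
{
  "BuildMessage": {
    "Type": "Pass",
    "Parameters": {
      "message.$": "States.Format('Subscription data fetched for {}', $.input.countryName)",
      "payloadAsString.$": "States.JsonToString($.fetchResult)",
      "parsedBody.$": "States.StringToJson($.rawBody)"
    },
    "ResultPath": "$.built",
    "Next": "NextState"
  }
}
```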

InputPath and Parameters are used to filter the state input and transform it from one form to another

ResultPath and ResultSelector are used to filter the task result and combine it with the state input before it is passed to the next state (see the sketch below).
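
A sketch of how these four fields fit together on one Task state, assuming a Lambda invocation like the ones used in the workflows below; the paths are illustrative:

```json
{
  "Invoke FetchSubscriptionDataFunction": {
    "Type": "Task",
    "Resource": "arn:aws:states:::lambda:invoke",
    "InputPath": "$.request",
    "Parameters": {
      "FunctionName": "FetchSubscriptionDataFunction",
      "Payload.$": "$"
    },
    "ResultSelector": {
      "body.$": "$.Payload"
    },
    "ResultPath": "$.fetchResult",
    "Next": "Kinesis ListStreams"
  }
}
```

InputPath trims the raw input to $.request, Parameters shapes the Lambda payload, ResultSelector keeps only the Payload field from the invocation result, and ResultPath merges that filtered result back into the original input under $.fetchResult.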

Input data given to the State Machine:
Sample input is provided in InputDataToStepFunction.txt in this repository

Role details are provided in GlueRoleToAccessS3.txt and StepFunctionRoleToAccessAllUsedServices.txt
1.) GlueRoleToAccessS3.txt defines the role that allows the AWS Glue job to use the S3 service
2.) StepFunctionRoleToAccessAllUsedServices.txt defines the role that grants the state machine the permissions it needs for every service it calls
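
As a rough idea of the shape of the Glue role's S3 permissions (the authoritative statements, actions, and resource names are in GlueRoleToAccessS3.txt; the bucket name below is a placeholder):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::example-stock-data-bucket",
        "arn:aws:s3:::example-stock-data-bucket/*"
      ]
    }
  ]
}
```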

Streaming WorkFlow
1.) The Streaming workflow execution starts with a Pass state and then moves to a state that invokes the Lambda function FetchSubscriptionDataFunction
2.) FetchSubscriptionDataFunction uses a Lambda layer that provides the Requests module. With the help of RapidAPI, it fetches OTT platform subscription details for every country from an open endpoint, although the code handles only a single country
3.) The output returned by the Lambda Invoke state is passed on to the next state
4.) The Kinesis ListStreams state uses the AWS SDK integration to fetch the list of streams in the account (see the first sketch after this list)
5.) If the stream name passed in the input already exists, the Lambda function's result is written directly to that stream
6.) If the stream does not exist, it is created with the input stream name and the Lambda function's result is then written to it
7.) Once the data record has been inserted into the data stream, the workflow moves to the next state
8.) The next state uses SNS ListTopics to get the list of topics in the account (see the second sketch after this list)
9.) If the topic name received from the input is already present, the subscriber is assumed to already exist
10.) If the topic is not present, a topic is created from the input and an email subscription is created using the subscriber email address provided in the input data
11.) Once the topic is created and the subscription is confirmed, the data is published to the topic
12.) The published message contains the shard ID and sequence number of the Kinesis data stream record where the data is stored
13.) Finally, the Streaming workflow is marked as successful with a Succeed state
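
First sketch: the Kinesis portion of this branch (steps 4 to 7), built on AWS SDK service integrations. State names, JSON paths, the States.ArrayContains existence check, and the fixed Wait are assumptions rather than excerpts from StateMachine.Json; depending on how the SDK integration handles the blob-typed Data parameter, the record payload may also need to be wrapped in States.Base64Encode:

```json
{
  "Kinesis ListStreams": {
    "Type": "Task",
    "Resource": "arn:aws:states:::aws-sdk:kinesis:listStreams",
    "ResultPath": "$.streams",
    "Next": "CheckStreamExists"
  },
  "CheckStreamExists": {
    "Type": "Pass",
    "Comment": "ListStreams returns StreamNames as a plain string array, so ArrayContains can test membership",
    "Parameters": {
      "exists.$": "States.ArrayContains($.streams.StreamNames, $.input.streamName)"
    },
    "ResultPath": "$.streamCheck",
    "Next": "StreamExists?"
  },
  "StreamExists?": {
    "Type": "Choice",
    "Choices": [
      { "Variable": "$.streamCheck.exists", "BooleanEquals": true, "Next": "PutRecord" }
    ],
    "Default": "CreateStream"
  },
  "CreateStream": {
    "Type": "Task",
    "Resource": "arn:aws:states:::aws-sdk:kinesis:createStream",
    "Parameters": {
      "StreamName.$": "$.input.streamName",
      "ShardCount": 1
    },
    "ResultPath": null,
    "Next": "WaitForStream"
  },
  "WaitForStream": {
    "Type": "Wait",
    "Comment": "A new stream takes a moment to become ACTIVE; a real definition might poll DescribeStream instead",
    "Seconds": 30,
    "Next": "PutRecord"
  },
  "PutRecord": {
    "Type": "Task",
    "Resource": "arn:aws:states:::aws-sdk:kinesis:putRecord",
    "Parameters": {
      "StreamName.$": "$.input.streamName",
      "PartitionKey.$": "$.input.countryName",
      "Data.$": "States.JsonToString($.fetchResult.body)"
    },
    "ResultPath": "$.putRecordResult",
    "Next": "SNS ListTopics"
  }
}
```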
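
Second sketch: the SNS portion (steps 8 to 12). The step that derives the $.topicCheck.exists flag from the ListTopics result is elided, and when the topic already exists the real definition would look up the existing TopicArn rather than reading it from $.topic; everything else (state names, paths, message format) is likewise illustrative:

```json
{
  "SNS ListTopics": {
    "Type": "Task",
    "Resource": "arn:aws:states:::aws-sdk:sns:listTopics",
    "ResultPath": "$.topics",
    "Next": "TopicExists?"
  },
  "TopicExists?": {
    "Type": "Choice",
    "Choices": [
      { "Variable": "$.topicCheck.exists", "BooleanEquals": true, "Next": "PublishRecordLocation" }
    ],
    "Default": "CreateTopic"
  },
  "CreateTopic": {
    "Type": "Task",
    "Resource": "arn:aws:states:::aws-sdk:sns:createTopic",
    "Parameters": { "Name.$": "$.input.topicName" },
    "ResultSelector": { "TopicArn.$": "$.TopicArn" },
    "ResultPath": "$.topic",
    "Next": "SubscribeEmail"
  },
  "SubscribeEmail": {
    "Type": "Task",
    "Resource": "arn:aws:states:::aws-sdk:sns:subscribe",
    "Parameters": {
      "TopicArn.$": "$.topic.TopicArn",
      "Protocol": "email",
      "Endpoint.$": "$.input.subscriberEmail"
    },
    "ResultPath": "$.subscription",
    "Next": "PublishRecordLocation"
  },
  "PublishRecordLocation": {
    "Type": "Task",
    "Resource": "arn:aws:states:::sns:publish",
    "Parameters": {
      "TopicArn.$": "$.topic.TopicArn",
      "Message.$": "States.Format('Record stored in shard {} with sequence number {}', $.putRecordResult.ShardId, $.putRecordResult.SequenceNumber)"
    },
    "ResultPath": "$.publishResult",
    "Next": "StreamingWorkflowSucceeded"
  },
  "StreamingWorkflowSucceeded": {
    "Type": "Succeed"
  }
}
```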

ETL Job WorkFlow
1.) The ETL workflow execution starts with a Pass state and then moves to a state that invokes the Lambda function FetchStockDataFunction
2.) FetchStockDataFunction uses a Lambda layer that provides the Requests module. With the help of RapidAPI, it fetches minute-by-minute stock information from an open endpoint
3.) The Lambda function returns its result, and the data is passed to the next state
4.) The S3 ListBuckets state lists all the buckets in the account (see the sketch after this list)
5.) If the bucket passed in the input already exists, the objects are put directly into it
6.) If the bucket does not exist, it is created and the objects are then put into it
7.) The next state starts a run of the Glue job (StockDataETLJob.py), written in PySpark, which already exists in the account
8.) Once the job completes, it writes its results to the S3 bucket
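
A sketch of the S3 and Glue portion of this branch (steps 4 to 8). As in the Streaming sketches, the $.bucketCheck.exists flag is assumed to be derived from the ListBuckets result in an elided step, the paths and object key are illustrative, and the blob-typed Body parameter may need base64 handling depending on the SDK integration. The .sync suffix makes the workflow wait for the Glue job to finish, which matches step 8 but is an assumption about the actual definition:

```json
{
  "S3 ListBuckets": {
    "Type": "Task",
    "Resource": "arn:aws:states:::aws-sdk:s3:listBuckets",
    "ResultPath": "$.buckets",
    "Next": "BucketExists?"
  },
  "BucketExists?": {
    "Type": "Choice",
    "Choices": [
      { "Variable": "$.bucketCheck.exists", "BooleanEquals": true, "Next": "PutStockObject" }
    ],
    "Default": "CreateBucket"
  },
  "CreateBucket": {
    "Type": "Task",
    "Resource": "arn:aws:states:::aws-sdk:s3:createBucket",
    "Parameters": { "Bucket.$": "$.input.bucketName" },
    "ResultPath": null,
    "Next": "PutStockObject"
  },
  "PutStockObject": {
    "Type": "Task",
    "Resource": "arn:aws:states:::aws-sdk:s3:putObject",
    "Parameters": {
      "Bucket.$": "$.input.bucketName",
      "Key.$": "States.Format('stock-data/{}.json', $.input.symbol)",
      "Body.$": "States.JsonToString($.fetchResult.body)"
    },
    "ResultPath": null,
    "Next": "StartStockETLJob"
  },
  "StartStockETLJob": {
    "Type": "Task",
    "Resource": "arn:aws:states:::glue:startJobRun.sync",
    "Parameters": { "JobName": "StockDataETLJob.py" },
    "End": true
  }
}
```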

The execution results for both workflows are attached below

Streaming WorkFlow Result

(Screenshot of the Streaming WorkFlow execution result)

ETL WorkFlow Result

(Screenshot of the ETL WorkFlow execution result)

Programming Languages & Technologies used in this project are

Python, PySpark, AWS