Skip to content

Commit

Permalink
feat(pipes-targets): add lambda function
Browse files Browse the repository at this point in the history
  • Loading branch information
WtfJoke committed May 20, 2024
1 parent 465c8ad commit 23db285
Show file tree
Hide file tree
Showing 20 changed files with 33,964 additions and 1 deletion.
50 changes: 49 additions & 1 deletion packages/@aws-cdk/aws-pipes-targets-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ Pipe targets are the end point of a EventBridge Pipe.
The following targets are supported:

1. `targets.SqsTarget`: [Send event source to a Queue](#amazon-sqs)
2. `targets.SfnStateMachine`: [Invoke a State Machine from an event source](#aws-step-functions)
2. `targets.SfnStateMachine`: [Invoke a State Machine from an event source](#aws-step-functions-state-machine)
3. `targets.LambdaFunction`: [Send event source to a Lambda Function](#aws-lambda-function)

### Amazon SQS

Expand Down Expand Up @@ -122,3 +123,50 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
target: pipeTarget
});
```
### AWS Lambda Function

A Lambda Function can be used as a target for a pipe. The Lambda Function will be invoked with the (enriched/filtered) source payload.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetFunction: lambda.IFunction;

const pipeTarget = new targets.LambdaFunction(targetFunction,{});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
target: pipeTarget
});
```

Specifying the Invocation Type when the target Lambda Function is invoked:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetFunction: lambda.IFunction;

const pipeTarget = new targets.LambdaFunction(targetFunction, {
invocationType: targets.LambdaFunctionInvocationType.FIRE_AND_FORGET,
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
target: pipeTarget
});
```

The input to the target Lambda Function can be transformed:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetFunction: lambda.IFunction;

const pipeTarget = new targets.LambdaFunction(targetFunction, {
inputTransformation: pipes.InputTransformation.fromObject({ body: "👀" }),
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
target: pipeTarget
});
```
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './lambda';
export * from './sqs';
export * from './stepfunctions';
79 changes: 79 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/lambda.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
import { IRole } from 'aws-cdk-lib/aws-iam';
import * as lambda from 'aws-cdk-lib/aws-lambda';

/**
* Parameters for the LambdaFunction target
*/
export interface LambdaFunctionParameters {

/**
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default none
*/
readonly inputTransformation?: IInputTransformation;

/**
* Specify whether to invoke the Lambda Function synchronously (`REQUEST_RESPONSE`) or asynchronously (`FIRE_AND_FORGET`).
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetlambdafunctionparameters.html
* @default LambdaFunctionInvocationType.REQUEST_RESPONSE
*/
readonly invocationType?: LambdaFunctionInvocationType;
}

/**
* InvocationType for invoking the Lambda Function.
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetlambdafunctionparameters.html
*/
export enum LambdaFunctionInvocationType {
/**
* Invoke Lambda Function asynchronously (`Invoke`). `InvocationType` is set to `Event` on `Invoke`, see https://docs.aws.amazon.com/lambda/latest/api/API_Invoke.html for more details.
*/
FIRE_AND_FORGET = 'FIRE_AND_FORGET',

/**
* Invoke Lambda Function synchronously (`Invoke`) and wait for the response. `InvocationType` is set to `RequestResponse` on `Invoke`, see https://docs.aws.amazon.com/lambda/latest/api/API_Invoke.html for more details.
*/
REQUEST_RESPONSE = 'REQUEST_RESPONSE',
}

/**
* An EventBridge Pipes target that sends messages to an AWS Lambda Function.
*/
export class LambdaFunction implements ITarget {
public readonly targetArn: string;

private readonly lambdaFunction: lambda.IFunction;
private readonly invocationType: LambdaFunctionInvocationType;
private readonly inputTemplate?: IInputTransformation;

constructor(
lambdaFunction: lambda.IFunction,
parameters: LambdaFunctionParameters,
) {
this.lambdaFunction = lambdaFunction;
this.targetArn = lambdaFunction.functionArn;
this.invocationType =
parameters.invocationType ??
LambdaFunctionInvocationType.REQUEST_RESPONSE;
this.inputTemplate = parameters.inputTransformation;
}

grantPush(grantee: IRole): void {
this.lambdaFunction.grantInvoke(grantee);
}

bind(pipe: IPipe): TargetConfig {
return {
targetParameters: {
inputTemplate: this.inputTemplate?.bind(pipe).inputTemplate,
lambdaFunctionParameters: {
invocationType: this.invocationType,
},
},
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import * as cdk from 'aws-cdk-lib';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';
import * as pipes from '@aws-cdk/aws-pipes-alpha';
import * as targets from '@aws-cdk/aws-pipes-targets-alpha';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`lambda-function should grant lambda function invoke 1`] = `
{
"MyLambdaPipeRoleEF32F0E5": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
"MyLambdaServiceRole4539ECB6": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
"ManagedPolicyArns": [
{
"Fn::Join": [
"",
[
"arn:",
{
"Ref": "AWS::Partition",
},
":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
],
],
},
],
},
"Type": "AWS::IAM::Role",
},
}
`;
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { LambdaClient, TagResourceCommand } from '@aws-sdk/client-lambda';

exports.handler = async (event: any, context: any) => {
const client = new LambdaClient();
console.log('Received event:', JSON.stringify(event, null, 2));
console.log('Received context:', JSON.stringify(context, null, 2));

const input = {
Resource: context.invokedFunctionArn,
Tags: {
Weather: event[0].body, // event is received in batches, we just take the first message to update the tag. See https://docs.aws.amazon.com/eventbridge/latest/userguide/pipes-targets-specifics.html
},
};
const command = new TagResourceCommand(input);
await client.send(command);
};

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 23db285

Please sign in to comment.