// orders-processor.ts
import { RemovalPolicy } from 'aws-cdk-lib';
import { TableV2 } from 'aws-cdk-lib/aws-dynamodb';
import { EventBus, Rule } from 'aws-cdk-lib/aws-events';
import { SfnStateMachine } from 'aws-cdk-lib/aws-events-targets';
import { LogGroup, RetentionDays } from 'aws-cdk-lib/aws-logs';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import {
DefinitionBody,
JsonPath,
LogLevel,
Parallel,
StateMachine,
StateMachineType,
TaskInput,
} from 'aws-cdk-lib/aws-stepfunctions';
import {
DynamoAttributeValue,
DynamoUpdateItem,
SqsSendMessage,
} from 'aws-cdk-lib/aws-stepfunctions-tasks';
import { Construct } from 'constructs';
import { CDC_EVENT, PROJECT_SOURCE, TABLE_PK, TABLE_SK } from '../constants';
/**
 * Resources that {@link OrdersProcessor} wires into its state machine.
 *
 * Fields are `readonly` because the construct only reads them.
 */
interface OrdersProcessorProps {
  /** Queue that receives the `NewImage` of each processed event. */
  readonly queue: Queue;
  /** Table holding the inventory item whose `quantity` is decremented. */
  readonly table: TableV2;
}
/**
 * Starts an Express state machine for matching events on the `default`
 * event bus.
 *
 * The construct creates:
 * - a state machine made of one Parallel state with two branches: a DynamoDB
 *   update that subtracts the event's quantity from a fixed inventory item,
 *   and an SQS send that forwards the event's `NewImage`;
 * - a log group that receives ALL-level execution logs with execution data;
 * - an EventBridge rule that targets the state machine.
 */
export class OrdersProcessor extends Construct {
  /**
   * @param scope - Parent construct.
   * @param id - Id of this construct within `scope`.
   * @param props - The queue and table used by the two parallel branches.
   */
  constructor(scope: Construct, id: string, props: OrdersProcessorProps) {
    super(scope, id);

    const paymentQueue = props.queue;
    const inventoryTable = props.table;

    const defaultBus = EventBus.fromEventBusName(this, 'DefaultBus', 'default');
    const fulfilment = new Parallel(this, 'Parallel Steps');

    // numberFromString takes a string, so the value read from the event is
    // passed through format('{}', ...) before being handed over.
    const orderedQuantity = DynamoAttributeValue.numberFromString(
      JsonPath.format('{}', JsonPath.stringAt('$.detail.data.NewImage.quantity'))
    );

    const inventoryUpdate = new DynamoUpdateItem(this, 'AdjustInventory', {
      table: inventoryTable,
      // The inventory item is hard-coded; every execution updates this key.
      key: {
        [TABLE_PK]: DynamoAttributeValue.fromString('INVENTORY#MACGUFFIN'),
        [TABLE_SK]: DynamoAttributeValue.fromString('MODEL#LX'),
      },
      updateExpression: 'set #quantity = #quantity - :quantity',
      // Only update when the stored quantity covers the requested amount.
      conditionExpression: '#quantity >= :quantity',
      expressionAttributeNames: { '#quantity': 'quantity' },
      expressionAttributeValues: { ':quantity': orderedQuantity },
      // The update result is not kept in the state output.
      resultPath: JsonPath.DISCARD,
    });

    const paymentMessage = new SqsSendMessage(this, 'EnqueuePayment', {
      queue: paymentQueue,
      messageBody: TaskInput.fromJsonPathAt('$.detail.data.NewImage'),
    });

    // Both tasks are branches of the same Parallel state.
    fulfilment.branch(inventoryUpdate, paymentMessage);

    const executionLogs = new LogGroup(this, 'SMLogs', {
      logGroupName: '/aws/vendedlogs/states/OrdersSMLogs',
      retention: RetentionDays.ONE_DAY,
      removalPolicy: RemovalPolicy.DESTROY,
    });

    const stateMachine = new StateMachine(this, 'OrdersStateMachine', {
      stateMachineName: 'orders-state-machine',
      stateMachineType: StateMachineType.EXPRESS,
      definitionBody: DefinitionBody.fromChainable(fulfilment),
      tracingEnabled: true,
      logs: {
        destination: executionLogs,
        level: LogLevel.ALL,
        includeExecutionData: true,
      },
    });

    // Match INSERT events from this project whose pk starts with 'CUSTOMER#'.
    // NOTE(review): presumably these are change-data-capture events for new
    // customer orders — confirm against the event producer.
    new Rule(this, 'OrderStateMachineRule', {
      ruleName: 'OrdersStateMachine',
      eventBus: defaultBus,
      eventPattern: {
        source: [PROJECT_SOURCE],
        detailType: [CDC_EVENT],
        detail: {
          data: {
            eventType: ['INSERT'],
            pk: [{ prefix: 'CUSTOMER#' }],
          },
        },
      },
      targets: [new SfnStateMachine(stateMachine)],
    });
  }
}