Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions infrastructure/modules/eventpub/lambda/eventpub/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ function validateEvent(event) {
'source',
'specversion',
'type',
'plane',
'subject',
'time',
'datacontenttype',
Expand Down Expand Up @@ -49,10 +50,10 @@ function validateEvent(event) {
}

async function sendToEventBridge(events, eventBusArn) {
// console.info(`Sending ${events.length} events to EventBridge: ${eventBusArn}`);
const failedEvents = [];

const failedEvents = [];
for (let i = 0; i < events.length; i += EVENTBRIDGE_MAX_BATCH_SIZE) {
for (let i = 0; i < events.length; i += EVENTBRIDGE_MAX_BATCH_SIZE) {
console.debug(`Sending ${events.length} events to EventBridge: ${eventBusArn}`);
const batch = events.slice(i, i + EVENTBRIDGE_MAX_BATCH_SIZE);
const entries = batch.map(event => ({
Source: 'custom.event',
Expand All @@ -64,7 +65,7 @@ async function sendToEventBridge(events, eventBusArn) {
let attempts = 0;
while (attempts < MAX_RETRIES) {
try {
// console.info(`Attempt ${attempts + 1}: Sending batch of ${entries.length} events.`);
console.debug(`Attempt ${attempts + 1}: Sending batch of ${entries.length} events.`);

const response = await eventBridge.send(new PutEventsCommand({ Entries: entries }));
response.FailedEntryCount && response.Entries.forEach((entry, idx) => {
Expand Down Expand Up @@ -92,15 +93,14 @@ async function sendToEventBridge(events, eventBusArn) {
}

async function sendToDLQ(events) {
console.warn(`Sending ${events.length} failed events to DLQ`);

for (const event of events) {
for (const event of events) {
console.warn(`Sending ${events.length} failed events to DLQ`);
await sqs.send(new SendMessageCommand({ QueueUrl: DLQ_URL, MessageBody: JSON.stringify(event) }));
}
}

exports.handler = async (snsEvent) => {
// console.info(`Received SNS event with ${snsEvent.Records.length} records.`);
console.debug(`Received SNS event with ${snsEvent.Records.length} records.`);

if (THROTTLE_DELAY_MS > 0) {
console.info(`Throttling enabled. Delaying processing by ${THROTTLE_DELAY_MS}ms`);
Expand All @@ -111,14 +111,14 @@ exports.handler = async (snsEvent) => {
const validEvents = records.filter(validateEvent);
const invalidEvents = records.filter(event => !validateEvent(event));

// console.info(`Valid events: ${validEvents.length}, Invalid events: ${invalidEvents.length}`);
console.debug(`Valid events: ${validEvents.length}, Invalid events: ${invalidEvents.length}`);

if (invalidEvents.length) await sendToDLQ(invalidEvents);

const dataEvents = validEvents.filter(event => event.type === 'data');
const controlEvents = validEvents.filter(event => event.type === 'control');
const dataEvents = validEvents.filter(event => event.plane === 'data');
const controlEvents = validEvents.filter(event => event.plane === 'control');

// console.info(`Data events: ${dataEvents.length}, Control events: ${controlEvents.length}`);
console.debug(`Data events: ${dataEvents.length}, Control events: ${controlEvents.length}`);

const failedDataEvents = await sendToEventBridge(dataEvents, DATA_PLANE_EVENT_BUS_ARN);
const failedControlEvents = await sendToEventBridge(controlEvents, CONTROL_PLANE_EVENT_BUS_ARN);
Expand Down
Loading