
Feature request: Parser integration for Batch Processing #4394

@dreamorosi

Description

Use case

As a customer, when working with Batch Processing, I want to have the utility parse and validate payloads before they're passed to my record handler¹, so that I can focus on my business logic.

Today, assuming I'm working with SQS - but this applies to all the event sources supported by the Batch Processing utility - I need to manually parse and validate payloads against my schemas in the body of the record handler, like this:

import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { parse } from '@aws-lambda-powertools/parser';
import { JSONStringified } from '@aws-lambda-powertools/parser/helpers';
import {
  SqsRecordSchema,
} from '@aws-lambda-powertools/parser/schemas/sqs';
import { z } from 'zod';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const customSchema = z.object({
  name: z.string(),
  age: z.number(),
});

// Extend the record-level schema so each record's JSON-stringified body
// is validated against the custom schema
const SqsRecordExtendedSchema = SqsRecordSchema.extend({
  body: JSONStringified(customSchema),
});

const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (record: SQSRecord) => {
  // the handler receives a single record, so we parse it against the
  // extended record schema rather than the whole-event SqsSchema
  const payload = parse(record, undefined, SqsRecordExtendedSchema);
  const { body: { name, age } } = payload; // this is safe to use because it's parsed
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { 
    context,
  });

We could simplify this experience quite a bit by making some assumptions (see below).

Solution/User Experience

Because we always know the EventType at the time of instantiation of the BatchProcessor, we can allow customers to simply pass us a schema for the record/item, like so:

import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { z } from 'zod';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const customSchema = z.object({
  name: z.string(),
  age: z.number(),
});

const processor = new BatchProcessor(EventType.SQS, customSchema);

const recordHandler = async (
  { body: { name, age } }: SQSRecord & { body: z.infer<typeof customSchema> }
) => {
  // this is safe to use because it's parsed
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });

Under the hood, we can then:

  • dynamically require the parse function, JSONStringified helper, and SqsSchema schema
  • extend the SqsSchema with the customer-provided schema
  • call parse(<data>, undefined, <customer-provided-schema>)
  • mark the item as failed if parsing fails, pass it to the record handler if parsing is successful
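The parse-or-fail flow in the bullets above could be sketched roughly as follows. This is a minimal sketch, not the actual implementation: a hand-rolled safeParse validator stands in for the real parse call and zod schema, and processRecords, SqsLikeRecord, and customerSchema are illustrative names.

```typescript
// Illustrative only: mimics how a record either parses successfully and
// reaches the record handler, or is marked as failed.
type Schema<T> = {
  safeParse: (data: unknown) => { success: true; data: T } | { success: false };
};

type SqsLikeRecord = { messageId: string; body: string };
type ParsedRecord<T> = Omit<SqsLikeRecord, 'body'> & { body: T };

async function processRecords<T>(
  records: SqsLikeRecord[],
  schema: Schema<T>,
  recordHandler: (record: ParsedRecord<T>) => Promise<void>
): Promise<{ failedIds: string[] }> {
  const failedIds: string[] = [];
  for (const record of records) {
    let parsedBody: unknown;
    try {
      parsedBody = JSON.parse(record.body); // the JSONStringified step
    } catch {
      failedIds.push(record.messageId); // mark as failed: body is not JSON
      continue;
    }
    const result = schema.safeParse(parsedBody);
    if (!result.success) {
      failedIds.push(record.messageId); // mark as failed: schema mismatch
      continue;
    }
    // parsing succeeded: hand the typed record to the record handler
    await recordHandler({ ...record, body: result.data });
  }
  return { failedIds };
}

// Stand-in for the customer-provided schema
const customerSchema: Schema<{ name: string; age: number }> = {
  safeParse: (data) => {
    const d = data as { name?: unknown; age?: unknown } | null;
    return d !== null && typeof d === 'object' &&
      typeof d.name === 'string' && typeof d.age === 'number'
      ? { success: true, data: data as { name: string; age: number } }
      : { success: false };
  },
};
```

The key property is the last bullet: a failed parse never reaches the record handler, it only adds the item to the batch's failure list.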

Open questions:

Note that this will require us to use dynamic imports. This should be fine since the record handler is meant to be async, so that code path is already async. Alternatively, we'd need to have the customer pass us the parse function and the already-extended schema, which basically saves only one line.
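The alternative mentioned above, where the customer hands us the parse function together with the already-extended schema at construction time, could look roughly like this. All names here (BatchProcessorSketch, tryParse, parsePayload) are illustrative and not the actual Powertools API:

```typescript
// Illustrative only: the customer supplies a parse function up front,
// so the processor never needs a dynamic import of the parser utility.
type ParseFn<T> = (data: unknown) => T; // throws on invalid input, like parse()

class BatchProcessorSketch<T> {
  constructor(private readonly parseFn: ParseFn<T>) {}

  // Returns the parsed record, or undefined to signal a failed item
  tryParse(record: unknown): T | undefined {
    try {
      return this.parseFn(record);
    } catch {
      return undefined;
    }
  }
}

// Example stand-in for a customer-provided parse function wrapping
// parse(data, undefined, <extended schema>) from the parser utility
const parsePayload: ParseFn<{ name: string }> = (data) => {
  const d = data as { name?: unknown } | null;
  if (d !== null && typeof d === 'object' && typeof d.name === 'string') {
    return data as { name: string };
  }
  throw new Error('invalid payload');
};
```

The trade-off is the one noted above: the customer still writes the schema-extension line themselves, so this mainly removes the in-handler parse call.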

Alternative solutions

Acknowledgment

Future readers

Please react with 👍 and your use case to help us understand customer demand.

Footnotes

  1. A record handler is the function designated to be called by the Batch Processing utility for each item.

Metadata

Labels

  • discussing: The issue needs to be discussed, elaborated, or refined
  • feature-request: This item refers to a feature request for an existing or new utility
  • parser: This item relates to the Parser Utility

Projects

Status

Working on it
