1- import { Policy , type Capability } from "@alchemy.run/effect" ;
1+ import { $ , Policy } from "@alchemy.run/effect" ;
22import type {
33 Context as LambdaContext ,
44 SQSBatchResponse ,
@@ -9,58 +9,48 @@ import * as S from "effect/Schema";
99import * as SQS from "../sqs/index.ts" ;
1010import * as Lambda from "./function.ts" ;
1111
12- export interface ConsumeProps < Q extends SQS . Queue , Req >
13- extends Lambda . FunctionProps {
14- queue : Q ;
15- bindings : Policy < Extract < Req , Capability > > ;
16- }
17-
18- export function consume < Q extends SQS . Queue , ID extends string , Req > (
19- id : ID ,
20- props : ConsumeProps < Q , Req > ,
21- handle : (
22- event : SQS . QueueEvent < Q [ "props" ] [ "schema" ] [ "Type" ] > ,
23- context : LambdaContext ,
24- ) => Effect . Effect < SQSBatchResponse | void , never , Req > ,
25- ) {
26- const { queue, bindings } = props ;
27- const schema = queue . props . schema ;
28- return Lambda . Function (
29- id ,
30- {
31- ...props ,
32- bindings : bindings . and ( SQS . Consume ( queue ) ) ,
12+ export const consume =
13+ < Q extends SQS . Queue , ID extends string , Req > (
14+ id : ID ,
15+ { queue, handle } : {
16+ queue : Q ;
17+ handle : (
18+ event : SQS . QueueEvent < Q [ "props" ] [ "schema" ] [ "Type" ] > ,
19+ context : LambdaContext ,
20+ ) => Effect . Effect < SQSBatchResponse | void , never , Req > ;
3321 } ,
34- Effect . fn ( function * ( event : SQSEvent , context : LambdaContext ) {
35- yield * Policy . declare < SQS . Consume < Q > > ( ) ;
36- const records = yield * Effect . all (
37- event . Records . map (
38- Effect . fn ( function * ( record ) {
39- return {
40- ...record ,
41- body : yield * S . validate ( schema ) ( record . body ) . pipe (
42- Effect . catchAll ( ( ) => Effect . void ) ,
43- ) ,
44- } ;
45- } ) ,
46- ) ,
47- ) ;
48- const response = yield * handle (
49- {
50- Records : records . filter ( ( record ) => record . body !== undefined ) ,
51- } ,
52- context ,
53- ) ;
54- return {
55- batchItemFailures : [
56- ...( response ?. batchItemFailures ?? [ ] ) ,
57- ...records
58- . filter ( ( record ) => record . body === undefined )
59- . map ( ( failed ) => ( {
60- itemIdentifier : failed . messageId ,
61- } ) ) ,
62- ] ,
63- } satisfies SQSBatchResponse ;
64- } ) ,
65- ) ;
66- }
22+ ) =>
23+ < const Props extends Lambda . FunctionProps < Req > > ( props : Props ) =>
24+ Lambda . Function ( id , {
25+ handle : Effect . fn ( function * ( event : SQSEvent , context : LambdaContext ) {
26+ yield * Policy . declare < SQS . Consume < Q > > ( ) ;
27+ const records = yield * Effect . all (
28+ event . Records . map (
29+ Effect . fn ( function * ( record ) {
30+ return {
31+ ...record ,
32+ body : yield * S . validate ( queue . props . schema ) ( record . body ) . pipe (
33+ Effect . catchAll ( ( ) => Effect . void ) ,
34+ ) ,
35+ } ;
36+ } ) ,
37+ ) ,
38+ ) ;
39+ const response = yield * handle (
40+ {
41+ Records : records . filter ( ( record ) => record . body !== undefined ) ,
42+ } ,
43+ context ,
44+ ) ;
45+ return {
46+ batchItemFailures : [
47+ ...( response ?. batchItemFailures ?? [ ] ) ,
48+ ...records
49+ . filter ( ( record ) => record . body === undefined )
50+ . map ( ( failed ) => ( {
51+ itemIdentifier : failed . messageId ,
52+ } ) ) ,
53+ ] ,
54+ } satisfies SQSBatchResponse ;
55+ } ) ,
56+ } ) ( { ...props , bindings : $ ( SQS . Consume ( queue ) ) } ) ;
0 commit comments