diff --git a/libs/eventually-aws/src/lambda/command.ts b/libs/eventually-aws/src/lambda/command.ts index 338338f1..f7adefcc 100644 --- a/libs/eventually-aws/src/lambda/command.ts +++ b/libs/eventually-aws/src/lambda/command.ts @@ -1,6 +1,7 @@ import { CommandHandlerFactory, app, + broker, camelize, client } from "@rotorsoft/eventually"; @@ -54,6 +55,12 @@ export const command = async ({ actor } ); + + // TODO: make this optional + // Since we are in a serverless world that won't wait for external async operations to complete, + // we can force a broker drain here, allowing policies and projectors to consume the new events + if (snap?.event) await broker().drain(); + return Ok( snap, snap?.event?.version ? { ETag: snap?.event?.version } : undefined diff --git a/libs/eventually/src/adapters/InMemoryBroker.ts b/libs/eventually/src/adapters/InMemoryBroker.ts index 7a69dc87..dbe66f57 100644 --- a/libs/eventually/src/adapters/InMemoryBroker.ts +++ b/libs/eventually/src/adapters/InMemoryBroker.ts @@ -16,17 +16,21 @@ const event_handler_types: Array = [ * @param timeout lease expiration time (in ms) when polling the store * @param limit max number of events to drain in each try * @param delay debounce delay (in ms) to drain + * @param subscribed to subscribe the broker to commit events - set to false when serverless */ -export const InMemoryBroker = ({ - timeout, - limit, - delay -}: { - timeout: number; - limit: number; - delay: number; +export const InMemoryBroker = (options?: { + timeout?: number; + limit?: number; + delay?: number; + subscribed?: boolean; }): Broker => { const name = "InMemoryBroker"; + const { + timeout = 5000, + limit = 10, + delay = 500, + subscribed = true + } = options ?? {}; // connect private event handlers only // NOTE: public consumers should be connected by an external broker service @@ -58,34 +62,35 @@ export const InMemoryBroker = ({ const __drain = throttle(drainAll, delay); // subscribe broker to commit events - app().on("commit", async ({ factory, snapshot }) => { - // commits STATE_EVENT - artifact must be configured in app builder - if (snapshot) { - const commit = app().commits.get(factory.name); - if (commit && commit(snapshot)) { - try { - const { id, stream, name, metadata, version } = snapshot.event!; - return await store().commit( - stream, - [ + subscribed && + app().on("commit", async ({ factory, snapshot }) => { + // commits STATE_EVENT - artifact must be configured in app builder + if (snapshot) { + const commit = app().commits.get(factory.name); + if (commit && commit(snapshot)) { + try { + const { id, stream, name, metadata, version } = snapshot.event!; + return await store().commit( + stream, + [ + { + name: STATE_EVENT, + data: snapshot.state + } + ], { - name: STATE_EVENT, - data: snapshot.state - } - ], - { - correlation: metadata.correlation, - causation: { event: { id, name, stream } } - }, - version // IMPORTANT! - state events should be committed right after the snapshot's event - ); - } catch (error) { - log().error(error); + correlation: metadata.correlation, + causation: { event: { id, name, stream } } + }, + version // IMPORTANT! - state events should be committed right after the snapshot's event + ); + } catch (error) { + log().error(error); + } } } - } - __drain(); - }); + __drain(); + }); return { name, diff --git a/libs/eventually/src/ports.ts b/libs/eventually/src/ports.ts index 321e86a4..232df153 100644 --- a/libs/eventually/src/ports.ts +++ b/libs/eventually/src/ports.ts @@ -111,7 +111,7 @@ export const client = port(function client(client?: Client & Disposable) { * @remarks Global port to internal broker */ export const broker = port(function broker(broker?: Broker) { - return broker || InMemoryBroker({ timeout: 5000, limit: 10, delay: 500 }); + return broker || InMemoryBroker(); }); /**