Skip to content

Commit

Permalink
fix: drain broker in serverless command
Browse files Browse the repository at this point in the history
  • Loading branch information
Roger Torres committed Sep 4, 2023
1 parent 255570a commit 8e669a4
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 34 deletions.
7 changes: 7 additions & 0 deletions libs/eventually-aws/src/lambda/command.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
CommandHandlerFactory,
app,
broker,
camelize,
client
} from "@rotorsoft/eventually";
Expand Down Expand Up @@ -54,6 +55,12 @@ export const command = async ({
actor
}
);

// TODO: make this optional
// Since we are in a serverless world that won't wait for external async operations to complete,
// we can force a broker drain here, allowing policies and projectors to consume the new events
if (snap?.event) await broker().drain();

return Ok(
snap,
snap?.event?.version ? { ETag: snap?.event?.version } : undefined
Expand Down
71 changes: 38 additions & 33 deletions libs/eventually/src/adapters/InMemoryBroker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ const event_handler_types: Array<ArtifactType> = [
* @param timeout lease expiration time (in ms) when polling the store
* @param limit max number of events to drain in each try
* @param delay debounce delay (in ms) to drain
* @param subscribed to subscribe the broker to commit events - set to false when serverless
*/
export const InMemoryBroker = ({
timeout,
limit,
delay
}: {
timeout: number;
limit: number;
delay: number;
export const InMemoryBroker = (options?: {
timeout?: number;
limit?: number;
delay?: number;
subscribed?: boolean;
}): Broker => {
const name = "InMemoryBroker";
const {
timeout = 5000,
limit = 10,
delay = 500,
subscribed = true
} = options ?? {};

// connect private event handlers only
// NOTE: public consumers should be connected by an external broker service
Expand Down Expand Up @@ -58,34 +62,35 @@ export const InMemoryBroker = ({
const __drain = throttle(drainAll, delay);

// subscribe broker to commit events
app().on("commit", async ({ factory, snapshot }) => {
// commits STATE_EVENT - artifact must be configured in app builder
if (snapshot) {
const commit = app().commits.get(factory.name);
if (commit && commit(snapshot)) {
try {
const { id, stream, name, metadata, version } = snapshot.event!;
return await store().commit(
stream,
[
subscribed &&
app().on("commit", async ({ factory, snapshot }) => {
// commits STATE_EVENT - artifact must be configured in app builder
if (snapshot) {
const commit = app().commits.get(factory.name);
if (commit && commit(snapshot)) {
try {
const { id, stream, name, metadata, version } = snapshot.event!;
return await store().commit(
stream,
[
{
name: STATE_EVENT,
data: snapshot.state
}
],
{
name: STATE_EVENT,
data: snapshot.state
}
],
{
correlation: metadata.correlation,
causation: { event: { id, name, stream } }
},
version // IMPORTANT! - state events should be committed right after the snapshot's event
);
} catch (error) {
log().error(error);
correlation: metadata.correlation,
causation: { event: { id, name, stream } }
},
version // IMPORTANT! - state events should be committed right after the snapshot's event
);
} catch (error) {
log().error(error);
}
}
}
}
__drain();
});
__drain();
});

return {
name,
Expand Down
2 changes: 1 addition & 1 deletion libs/eventually/src/ports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ export const client = port(function client(client?: Client & Disposable) {
* @remarks Global port to internal broker
*/
export const broker = port(function broker(broker?: Broker) {
return broker || InMemoryBroker({ timeout: 5000, limit: 10, delay: 500 });
return broker || InMemoryBroker();
});

/**
Expand Down

0 comments on commit 8e669a4

Please sign in to comment.