Skip to content

Commit

Permalink
Async event enrichment
Browse files Browse the repository at this point in the history
Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>
  • Loading branch information
matthew1001 committed Apr 27, 2023
1 parent 1cb1d6c commit e8cf9b4
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 19 deletions.
27 changes: 18 additions & 9 deletions src/eventstream-proxy/eventstream-proxy.base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {

private async processEvents(batch: EventBatch) {
const messages: WebSocketMessage[] = [];
const eventHandlers: Promise<WebSocketMessage | undefined>[] = [];
for (const event of batch.events) {
this.logger.log(`Proxying event: ${JSON.stringify(event)}`);
const subName = await this.getSubscriptionName(newContext(), event.subId);
Expand All @@ -136,17 +137,25 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
}

for (const listener of this.eventListeners) {
try {
await listener.onEvent(subName, event, (msg: WebSocketMessage | undefined) => {
if (msg !== undefined) {
messages.push(msg);
}
});
} catch (err) {
this.logger.error(`Error processing event: ${err}`);
}
// Some events require enrichment that could involve a call to the blockchain,
// so we don't want to do those synchronously. Create a promise for each onEvent()
// handler and when they're all complete we'll create the batch message
eventHandlers.push(Promise.resolve(listener.onEvent(subName, event)));
}
}

// Now we need to await the promises in order so the messages stay in order
for (const nextProm of eventHandlers) {
await nextProm
.then((msg: WebSocketMessage | undefined) => {
if (msg !== undefined) {
messages.push(msg);
}
})
.catch(err => {
this.logger.error(`Error processing event: ${err}`);
});
}
const message: WebSocketMessageWithId = {
id: uuidv4(),
event: 'batch',
Expand Down
2 changes: 1 addition & 1 deletion src/eventstream-proxy/eventstream-proxy.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export interface ConnectionListener {
}

export interface EventListener {
onEvent: (subName: string, event: Event, process: EventProcessor) => void | Promise<void>;
onEvent: (subName: string, event: Event) => undefined | Promise<WebSocketMessage | undefined>;
}

export interface WebSocketMessageWithId extends WebSocketMessage {
Expand Down
14 changes: 5 additions & 9 deletions src/tokens/tokens.listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,17 @@ export class TokenListener implements EventListener {
private blockchain: BlockchainConnectorService,
) {}

async onEvent(subName: string, event: Event, process: EventProcessor) {
async onEvent(subName: string, event: Event) {
const signature = this.trimEventSignature(event.signature);
switch (signature) {
case tokenCreateEventSignature:
process(await this.transformTokenPoolCreationEvent(event));
break;
return await this.transformTokenPoolCreationEvent(event);
case transferEventSignature:
process(await this.transformTransferEvent(subName, event));
break;
return await this.transformTransferEvent(subName, event);
case approvalEventSignature:
process(this.transformApprovalEvent(subName, event));
break;
return this.transformApprovalEvent(subName, event);
case approvalForAllEventSignature:
process(this.transformApprovalForAllEvent(subName, event));
break;
return this.transformApprovalForAllEvent(subName, event);
default:
this.logger.error(`Unknown event signature: ${event.signature}`);
}
Expand Down

0 comments on commit e8cf9b4

Please sign in to comment.