Skip to content

Commit

Permalink
Merge pull request #134 from kaleido-io/async-event-enrichment
Browse files Browse the repository at this point in the history
Async event enrichment
  • Loading branch information
matthew1001 committed Apr 28, 2023
2 parents a989283 + 6491552 commit 5d9269a
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 28 deletions.
24 changes: 16 additions & 8 deletions src/eventstream-proxy/eventstream-proxy.base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {

private async processEvents(batch: EventBatch) {
const messages: WebSocketMessage[] = [];
const eventHandlers: Promise<WebSocketMessage | undefined>[] = [];
for (const event of batch.events) {
this.logger.log(`Proxying event: ${JSON.stringify(event)}`);
const subName = await this.getSubscriptionName(newContext(), event.subId);
Expand All @@ -136,15 +137,22 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
}

for (const listener of this.eventListeners) {
try {
await listener.onEvent(subName, event, (msg: WebSocketMessage | undefined) => {
if (msg !== undefined) {
messages.push(msg);
}
});
} catch (err) {
this.logger.error(`Error processing event: ${err}`);
// Some events require enrichment that could involve a call to the blockchain,
// so we don't want to do those synchronously. Create a promise for each onEvent()
// handler and when they're all complete we'll create the batch message
eventHandlers.push(Promise.resolve(listener.onEvent(subName, event)));
}
}

// Now we need to await the promises in order so the messages stay in order
for (const nextProm of eventHandlers) {
try {
const msg = await nextProm;
if (msg !== undefined) {
messages.push(msg);
}
} catch (err) {
this.logger.error(`Error processing event: ${err}`);
}
}
const message: WebSocketMessageWithId = {
Expand Down
2 changes: 1 addition & 1 deletion src/eventstream-proxy/eventstream-proxy.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export interface ConnectionListener {
}

export interface EventListener {
onEvent: (subName: string, event: Event, process: EventProcessor) => void | Promise<void>;
onEvent: (subName: string, event: Event) => undefined | Promise<WebSocketMessage | undefined>;
}

export interface WebSocketMessageWithId extends WebSocketMessage {
Expand Down
8 changes: 4 additions & 4 deletions src/tokens/blockchain.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -103,7 +103,7 @@ export class BlockchainConnectorService {
// Check if retry condition matches the err that's been hit
private matchesRetryCondition(err: any): boolean {
return (
this.retryConfiguration.retryCondition != '' &&
this.retryConfiguration.retryCondition !== '' &&
`${err}`.match(this.retryConfiguration.retryCondition) !== null
);
}
Expand All @@ -127,8 +127,8 @@ export class BlockchainConnectorService {
let retries = 0;
for (
;
this.retryConfiguration.retriesMax == -1 || retries <= this.retryConfiguration.retriesMax;
this.retryConfiguration.retriesMax == -1 || retries++ // Don't inc 'retries' if 'retriesMax' if set to -1 (infinite retries)
this.retryConfiguration.retriesMax === -1 || retries <= this.retryConfiguration.retriesMax;
this.retryConfiguration.retriesMax === -1 || retries++ // Don't inc 'retries' if 'retriesMax' if set to -1 (infinite retries)
) {
try {
return await blockchainFunction();
Expand Down
19 changes: 7 additions & 12 deletions src/tokens/tokens.listener.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -16,7 +16,7 @@

import { Logger } from '@nestjs/common';
import { Event } from '../event-stream/event-stream.interfaces';
import { EventListener, EventProcessor } from '../eventstream-proxy/eventstream-proxy.interfaces';
import { EventListener } from '../eventstream-proxy/eventstream-proxy.interfaces';
import { WebSocketMessage } from '../websocket-events/websocket-events.base';
import { Context, newContext } from '../request-context/request-context.decorator';
import {
Expand All @@ -41,7 +41,6 @@ import {
unpackSubscriptionName,
validatePoolLocator,
} from './tokens.util';
import { TokensService } from './tokens.service';
import { AbiMapperService } from './abimapper.service';
import { BlockchainConnectorService } from './blockchain.service';
import { TokenURI as ERC721URI } from './erc721';
Expand All @@ -58,21 +57,17 @@ export class TokenListener implements EventListener {

constructor(private mapper: AbiMapperService, private blockchain: BlockchainConnectorService) {}

async onEvent(subName: string, event: Event, process: EventProcessor) {
async onEvent(subName: string, event: Event) {
const signature = this.trimEventSignature(event.signature);
switch (signature) {
case tokenCreateEventSignature:
process(await this.transformTokenPoolCreationEvent(subName, event));
break;
return this.transformTokenPoolCreationEvent(subName, event);
case transferEventSignature:
process(await this.transformTransferEvent(subName, event));
break;
return this.transformTransferEvent(subName, event);
case approvalEventSignature:
process(this.transformApprovalEvent(subName, event));
break;
return this.transformApprovalEvent(subName, event);
case approvalForAllEventSignature:
process(this.transformApprovalForAllEvent(subName, event));
break;
return this.transformApprovalForAllEvent(subName, event);
default:
this.logger.error(`Unknown event signature: ${event.signature}`);
}
Expand Down
4 changes: 2 additions & 2 deletions src/tokens/tokens.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -240,7 +240,7 @@ describe('TokensService', () => {
.useValue(eventstream)
.compile();

let blockchainRetryCfg: RetryConfiguration = {
const blockchainRetryCfg: RetryConfiguration = {
retryBackOffFactor: 2,
retryBackOffLimit: 500,
retryBackOffInitial: 50,
Expand Down
2 changes: 1 addition & 1 deletion test/app.e2e-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export class TestContext {
this.app.use(requestIDMiddleware);
await this.app.init();

let blockchainRetryCfg: RetryConfiguration = {
const blockchainRetryCfg: RetryConfiguration = {
retryBackOffFactor: 2,
retryBackOffLimit: 500,
retryBackOffInitial: 50,
Expand Down

0 comments on commit 5d9269a

Please sign in to comment.