Skip to content

Commit

Permalink
Merge pull request #94 from kaleido-io/stream
Browse files Browse the repository at this point in the history
Ensure stream name and topic always match
  • Loading branch information
awrichar committed Aug 23, 2022
2 parents d6eb54d + 0b09d56 commit 5b10833
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 44 deletions.
3 changes: 2 additions & 1 deletion src/event-stream/event-stream.interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -39,6 +39,7 @@ export interface Event {
data: any;
inputMethod?: string;
inputArgs?: Record<string, any>;
inputSigner?: string;
}

export class EventStreamReplyHeaders {
Expand Down
2 changes: 1 addition & 1 deletion src/event-stream/event-stream.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/event-stream/event-stream.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/event-stream/event-stream.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
26 changes: 19 additions & 7 deletions src/eventstream-proxy/eventstream-proxy.base.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -26,6 +26,7 @@ import {
} from '../websocket-events/websocket-events.base';
import {
AckMessageData,
ConnectionListener,
EventListener,
WebSocketMessageBatchData,
WebSocketMessageWithId,
Expand All @@ -42,7 +43,8 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
url?: string;
topic?: string;

private listeners: EventListener[] = [];
private connectListeners: ConnectionListener[] = [];
private eventListeners: EventListener[] = [];
private awaitingAck: WebSocketMessageWithId[] = [];
private currentClient: WebSocketEx | undefined;
private subscriptionNames = new Map<string, string>();
Expand All @@ -65,8 +67,14 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
super.handleConnection(client);
if (this.server.clients.size === 1) {
this.logger.log(`Initializing event stream proxy`);
this.setCurrentClient(client);
this.startListening();
Promise.all(this.connectListeners.map(l => l.onConnect()))
.then(() => {
this.setCurrentClient(client);
this.startListening();
})
.catch(err => {
this.logger.error(`Error initializing event stream proxy: ${err}`);
});
}
}

Expand Down Expand Up @@ -108,8 +116,12 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
this.currentClient = undefined;
}

addListener(listener: EventListener) {
this.listeners.push(listener);
addConnectionListener(listener: ConnectionListener) {
this.connectListeners.push(listener);
}

addEventListener(listener: EventListener) {
this.eventListeners.push(listener);
}

private async processEvents(events: Event[]) {
Expand All @@ -122,7 +134,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
return;
}

for (const listener of this.listeners) {
for (const listener of this.eventListeners) {
try {
await listener.onEvent(subName, event, (msg: WebSocketMessage | undefined) => {
if (msg !== undefined) {
Expand Down
2 changes: 1 addition & 1 deletion src/eventstream-proxy/eventstream-proxy.gateway.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/eventstream-proxy/eventstream-proxy.gateway.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
7 changes: 5 additions & 2 deletions src/eventstream-proxy/eventstream-proxy.interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -14,14 +14,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import { ApiProperty } from '@nestjs/swagger';
import { WebSocketMessage } from '../websocket-events/websocket-events.base';
import { Event } from '../event-stream/event-stream.interfaces';

export interface EventProcessor {
(msg: WebSocketMessage | undefined): void;
}

export interface ConnectionListener {
onConnect: () => void | Promise<void>;
}

export interface EventListener {
onEvent: (subName: string, event: Event, process: EventProcessor) => void | Promise<void>;
}
Expand Down
2 changes: 1 addition & 1 deletion src/eventstream-proxy/eventstream-proxy.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
10 changes: 0 additions & 10 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import { version as API_VERSION } from '../package.json';
import { AppModule } from './app.module';
import { RequestLoggingInterceptor } from './request-logging.interceptor';
import { TokensService } from './tokens/tokens.service';
import { EventStreamProxyGateway } from './eventstream-proxy/eventstream-proxy.gateway';
import { EventStreamReply } from './event-stream/event-stream.interfaces';
import {
TokenApprovalEvent,
Expand Down Expand Up @@ -78,10 +77,7 @@ async function bootstrap() {
const password = config.get<string>('ETHCONNECT_PASSWORD', '');
const contractAddress = config.get<string>('CONTRACT_ADDRESS', '');

const wsUrl = ethConnectUrl.replace('http', 'ws') + '/ws';

app.get(EventStreamService).configure(ethConnectUrl, username, password);
app.get(EventStreamProxyGateway).configure(wsUrl, topic);
app
.get(TokensService)
.configure(
Expand All @@ -94,12 +90,6 @@ async function bootstrap() {
contractAddress,
);

try {
await app.get(TokensService).migrationCheck();
} catch (err) {
this.logger.debug('Subscription checks skipped (ethconnect may not be up)');
}

if (autoInit.toLowerCase() !== 'false') {
await app.get(TokensService).init();
}
Expand Down
54 changes: 36 additions & 18 deletions src/tokens/tokens.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,9 @@ const tokenCreateFunctionName = 'create';
const tokenCreateEvent = 'TokenPoolCreation';
const tokenCreateEventSignatureOld = 'TokenCreate(address,uint256,bytes)';
const tokenCreateEventSignature = 'TokenPoolCreation(address,uint256,bytes)';
const safeTransferFromFunctionName = 'safeTransferFrom';
const transferSingleEvent = 'TransferSingle';
const transferSingleEventSignature = 'TransferSingle(address,address,address,uint256,uint256)';
const transferBatchEvent = 'TransferBatch';
const safeBatchTransferFromFunctionName = 'safeBatchTransferFrom';
const transferBatchEventSignature = 'TransferBatch(address,address,address,uint256[],uint256[])';
const approvalForAllEvent = 'ApprovalForAll';
const approvalForAllEventSignature = 'ApprovalForAll(address,address,bool)';
Expand Down Expand Up @@ -136,26 +134,34 @@ export class TokensService {
this.username = username;
this.password = password;
this.contractAddress = contractAddress.toLowerCase();
this.proxy.addListener(new TokenListener(this));
this.proxy.addConnectionListener(this);
this.proxy.addEventListener(new TokenListener(this));
}

async onConnect() {
const wsUrl = this.baseUrl.replace('http', 'ws') + '/ws';
const stream = await this.getStream();
this.proxy.configure(wsUrl, stream.name);
}

/**
* One-time initialization of event stream and base subscription.
*/
async init() {
this.stream = await this.getStream();
const stream = await this.getStream();

const eventABI = ERC1155MixedFungibleAbi.find(m => m.name === tokenCreateEvent);
const methodABI = ERC1155MixedFungibleAbi.find(m => m.name === tokenCreateFunctionName);

if (eventABI !== undefined && methodABI !== undefined) {
const contractAddress = await this.getContractAddress();
await this.eventstream.getOrCreateSubscription(
this.baseUrl,
eventABI,
this.stream.id,
stream.id,
tokenCreateEvent,
packSubscriptionName(this.instancePath, BASE_SUBSCRIPTION_NAME, tokenCreateEvent),
this.contractAddress,
contractAddress,
[methodABI],
'0',
);
Expand All @@ -174,8 +180,8 @@ export class TokensService {
}),
),
);
this.contractAddress = response.data.address.toLowerCase();
this.logger.debug(`s: ${this.contractAddress}`);
this.contractAddress = '0x' + response.data.address.toLowerCase();
this.logger.debug(`Contract address: ${this.contractAddress}`);
}

return this.contractAddress;
Expand Down Expand Up @@ -216,10 +222,14 @@ export class TokensService {
}

private async getStream() {
if (this.stream === undefined) {
const name = packStreamName(this.topic, this.instancePath);
this.stream = await this.eventstream.createOrUpdateStream(name, this.topic);
const stream = this.stream;
if (stream !== undefined) {
return stream;
}
await this.migrationCheck();
const name = this.stream?.name ?? packStreamName(this.topic, this.instancePath);
this.logger.log('Creating stream with name ' + name);
this.stream = await this.eventstream.createOrUpdateStream(name, name);
return this.stream;
}

Expand All @@ -246,6 +256,7 @@ export class TokensService {
`to create a new stream with the name ${name}.`,
);
}
this.stream = existingStream;
const streamId = existingStream.id;

const allSubscriptions = await this.eventstream.getSubscriptions();
Expand Down Expand Up @@ -405,12 +416,18 @@ export class TokensService {
);
const transferBatchEventABI = ERC1155MixedFungibleAbi.find(m => m.name === transferBatchEvent);
const transferFunctionABIs = ERC1155MixedFungibleAbi.filter(
m => m.name?.toLowerCase().includes('mint') || m.name?.toLowerCase().includes('transfer') || m.name?.toLowerCase().includes('burn'),
m =>
m.name !== undefined &&
(m.name.toLowerCase().includes('mint') ||
m.name.toLowerCase().includes('transfer') ||
m.name.toLowerCase().includes('burn')),
);
const approvalForAllEventABI = ERC1155MixedFungibleAbi.find(
m => m.name === approvalForAllEvent,
);
const approvalFunctionABIs = ERC1155MixedFungibleAbi.filter(m => m.name?.toLowerCase().includes('approval'));
const approvalFunctionABIs = ERC1155MixedFungibleAbi.filter(m =>
m.name?.toLowerCase().includes('approval'),
);

if (
tokenCreateEventABI !== undefined &&
Expand All @@ -419,14 +436,15 @@ export class TokensService {
transferBatchEventABI !== undefined &&
approvalForAllEventABI !== undefined
) {
const contractAddress = await this.getContractAddress();
await Promise.all([
this.eventstream.getOrCreateSubscription(
this.baseUrl,
tokenCreateEventABI,
stream.id,
tokenCreateEvent,
packSubscriptionName(this.instancePath, dto.poolLocator, tokenCreateEvent, dto.poolData),
this.contractAddress,
contractAddress,
[tokenCreateFunctionABI],
poolLocator.blockNumber ?? '0',
),
Expand All @@ -441,7 +459,7 @@ export class TokensService {
transferSingleEvent,
dto.poolData,
),
this.contractAddress,
contractAddress,
transferFunctionABIs,
poolLocator.blockNumber ?? '0',
),
Expand All @@ -456,7 +474,7 @@ export class TokensService {
transferBatchEvent,
dto.poolData,
),
this.contractAddress,
contractAddress,
transferFunctionABIs,
poolLocator.blockNumber ?? '0',
),
Expand All @@ -471,7 +489,7 @@ export class TokensService {
approvalForAllEvent,
dto.poolData,
),
this.contractAddress,
contractAddress,
approvalFunctionABIs,
// Block number is 0 because it is important to receive all approval events,
// so existing approvals will be reflected in the newly created pool
Expand Down Expand Up @@ -624,7 +642,7 @@ class TokenListener implements EventListener {
}

private trimEventSignature(signature: string) {
let firstColon = signature.indexOf(':');
const firstColon = signature.indexOf(':');
if (firstColon > 0) {
return signature.substring(firstColon + 1);
}
Expand Down
5 changes: 5 additions & 0 deletions test/app.e2e-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ export class TestContext {
this.receiptHandler = handleReceipt;
},

getStreams: jest.fn(),
createOrUpdateStream: jest.fn(),
getSubscription: jest.fn(),
};

Expand All @@ -47,6 +49,9 @@ export class TestContext {
get: jest.fn(),
post: jest.fn(),
};
this.eventstream.getStreams.mockReset().mockReturnValue([]);
this.eventstream.createOrUpdateStream.mockReset().mockReturnValue({ name: TOPIC });

this.eventstream.getSubscription.mockReset();

const moduleFixture: TestingModule = await Test.createTestingModule({
Expand Down

0 comments on commit 5b10833

Please sign in to comment.