Skip to content

Commit

Permalink
Merge pull request #95 from kaleido-io/ack
Browse files Browse the repository at this point in the history
iInclude the batchNumber if specified in acks, for FFTM/EVMConnect
  • Loading branch information
peterbroadhurst committed Sep 9, 2022
2 parents 5b10833 + da7c475 commit 5b2c2a3
Show file tree
Hide file tree
Showing 6 changed files with 281 additions and 237 deletions.
5 changes: 5 additions & 0 deletions src/event-stream/event-stream.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ export interface Event {
inputSigner?: string;
}

export interface EventBatch {
batchNumber?: number;
events: Event[];
}

export class EventStreamReplyHeaders {
@ApiProperty()
type: string;
Expand Down
50 changes: 32 additions & 18 deletions src/event-stream/event-stream.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { IAbiMethod } from '../tokens/tokens.interfaces';
import { basicAuth } from '../utils';
import {
Event,
EventBatch,
EventStream,
EventStreamReply,
EventStreamSubscription,
Expand All @@ -44,7 +45,7 @@ export class EventStreamSocket {
private topic: string,
private username: string,
private password: string,
private handleEvents: (events: Event[]) => void,
private handleEvents: (events: EventBatch) => void,
private handleReceipt: (receipt: EventStreamReply) => void,
) {
this.init();
Expand Down Expand Up @@ -106,28 +107,38 @@ export class EventStreamSocket {
this.ws.send(JSON.stringify(message));
}

ack() {
this.produce({ type: 'ack', topic: this.topic });
ack(batchNumber: number | undefined) {
this.produce({ type: 'ack', topic: this.topic, batchNumber });
}

close() {
this.closeRequested = true;
this.ws.terminate();
}

private handleMessage(message: EventStreamReply | Event[]) {
private handleMessage(message: EventStreamReply | Event[] | EventBatch) {
if (Array.isArray(message)) {
for (const event of message) {
this.logger.log(`Ethconnect '${event.signature}' message: ${JSON.stringify(event.data)}`);
}
this.handleEvents({ events: message });
} else if ('batchNumber' in message && Array.isArray(message.events)) {
for (const event of message.events) {
this.logger.log(
`Ethconnect '${event.signature}' message (batch=${message.batchNumber}): ${JSON.stringify(
event.data,
)}`,
);
}
this.handleEvents(message);
} else {
const replyType = message.headers.type;
const errorMessage = message.errorMessage ?? '';
const reply = message as EventStreamReply;
const replyType = reply.headers.type;
const errorMessage = reply.errorMessage ?? '';
this.logger.log(
`Ethconnect '${replyType}' reply request=${message.headers.requestId} tx=${message.transactionHash} ${errorMessage}`,
`Ethconnect '${replyType}' reply request=${reply.headers.requestId} tx=${reply.transactionHash} ${errorMessage}`,
);
this.handleReceipt(message);
this.handleReceipt(reply);
}
}
}
Expand All @@ -150,7 +161,7 @@ export class EventStreamService {

async getStreams(): Promise<EventStream[]> {
const response = await lastValueFrom(
this.http.get<EventStream[]>(`${this.baseUrl}/eventstreams`, {
this.http.get<EventStream[]>(new URL('/eventstreams', this.baseUrl).href, {
...basicAuth(this.username, this.password),
}),
);
Expand All @@ -175,7 +186,7 @@ export class EventStreamService {
if (stream) {
const patchedStreamRes = await lastValueFrom(
this.http.patch<EventStream>(
`${this.baseUrl}/eventstreams/${stream.id}`,
new URL(`/eventstreams/${stream.id}`, this.baseUrl).href,
{
...streamDetails,
},
Expand All @@ -189,7 +200,7 @@ export class EventStreamService {
}
const newStreamRes = await lastValueFrom(
this.http.post<EventStream>(
`${this.baseUrl}/eventstreams`,
new URL('/eventstreams', this.baseUrl).href,
{
...streamDetails,
},
Expand All @@ -204,15 +215,15 @@ export class EventStreamService {

async deleteStream(id: string) {
await lastValueFrom(
this.http.delete(`${this.baseUrl}/eventstreams/${id}`, {
this.http.delete(new URL(`/eventstreams/${id}`, this.baseUrl).href, {
...basicAuth(this.username, this.password),
}),
);
}

async getSubscriptions(): Promise<EventStreamSubscription[]> {
const response = await lastValueFrom(
this.http.get<EventStreamSubscription[]>(`${this.baseUrl}/subscriptions`, {
this.http.get<EventStreamSubscription[]>(new URL('/subscriptions', this.baseUrl).href, {
...basicAuth(this.username, this.password),
}),
);
Expand All @@ -221,10 +232,13 @@ export class EventStreamService {

async getSubscription(subId: string): Promise<EventStreamSubscription | undefined> {
const response = await lastValueFrom(
this.http.get<EventStreamSubscription>(`${this.baseUrl}/subscriptions/${subId}`, {
validateStatus: status => status < 300 || status === 404,
...basicAuth(this.username, this.password),
}),
this.http.get<EventStreamSubscription>(
new URL(`/subscriptions/${subId}`, this.baseUrl).href,
{
validateStatus: status => status < 300 || status === 404,
...basicAuth(this.username, this.password),
},
),
);
if (response.status === 404) {
return undefined;
Expand Down Expand Up @@ -293,7 +307,7 @@ export class EventStreamService {
connect(
url: string,
topic: string,
handleEvents: (events: Event[]) => void,
handleEvents: (events: EventBatch) => void,
handleReceipt: (receipt: EventStreamReply) => void,
) {
return new EventStreamSocket(
Expand Down
12 changes: 8 additions & 4 deletions src/eventstream-proxy/eventstream-proxy.base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import { Logger } from '@nestjs/common';
import { MessageBody, SubscribeMessage } from '@nestjs/websockets';
import { v4 as uuidv4 } from 'uuid';
import { Event, EventStreamReply } from '../event-stream/event-stream.interfaces';
import { EventBatch, EventStreamReply } from '../event-stream/event-stream.interfaces';
import { EventStreamService, EventStreamSocket } from '../event-stream/event-stream.service';
import {
WebSocketEventsBase,
Expand Down Expand Up @@ -124,9 +124,9 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
this.eventListeners.push(listener);
}

private async processEvents(events: Event[]) {
private async processEvents(batch: EventBatch) {
const messages: WebSocketMessage[] = [];
for (const event of events) {
for (const event of batch.events) {
this.logger.log(`Proxying event: ${JSON.stringify(event)}`);
const subName = await this.getSubscriptionName(event.subId);
if (subName === undefined) {
Expand All @@ -152,6 +152,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
data: <WebSocketMessageBatchData>{
events: messages,
},
batchNumber: batch.batchNumber,
};
this.awaitingAck.push(message);
this.currentClient?.send(JSON.stringify(message));
Expand Down Expand Up @@ -191,8 +192,11 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {

this.logger.log(`Received ack ${data.id}`);
if (this.socket !== undefined && this.awaitingAck.find(msg => msg.id === data.id)) {
const firstAck = this.awaitingAck.find(msg => msg.id === data.id);
this.awaitingAck = this.awaitingAck.filter(msg => msg.id !== data.id);
this.socket.ack();
if (firstAck) {
this.socket.ack(firstAck.batchNumber);
}
}
}
}
1 change: 1 addition & 0 deletions src/eventstream-proxy/eventstream-proxy.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export interface EventListener {

export interface WebSocketMessageWithId extends WebSocketMessage {
id: string;
batchNumber: number | undefined;
}

export interface AckMessageData {
Expand Down
6 changes: 3 additions & 3 deletions test/app.e2e-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { INestApplication, ValidationPipe } from '@nestjs/common';
import { WsAdapter } from '@nestjs/platform-ws';
import { Test, TestingModule } from '@nestjs/testing';
import { AppModule } from '../src/app.module';
import { EventStreamReply, Event } from '../src/event-stream/event-stream.interfaces';
import { EventStreamReply, EventBatch } from '../src/event-stream/event-stream.interfaces';
import { EventStreamService } from '../src/event-stream/event-stream.service';
import { EventStreamProxyGateway } from '../src/eventstream-proxy/eventstream-proxy.gateway';
import { TokensService } from '../src/tokens/tokens.service';
Expand All @@ -25,14 +25,14 @@ export class TestContext {
get: ReturnType<typeof jest.fn>;
post: ReturnType<typeof jest.fn>;
};
eventHandler: (events: Event[]) => void;
eventHandler: (events: EventBatch) => void;
receiptHandler: (receipt: EventStreamReply) => void;

eventstream = {
connect: (
url: string,
topic: string,
handleEvents: (events: Event[]) => void,
handleEvents: (events: EventBatch) => void,
handleReceipt: (receipt: EventStreamReply) => void,
) => {
this.eventHandler = handleEvents;
Expand Down

0 comments on commit 5b2c2a3

Please sign in to comment.