Skip to content

Commit

Permalink
Use separate eventstream per namespace
Browse files Browse the repository at this point in the history
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
  • Loading branch information
nguyer committed Nov 6, 2023
1 parent 6f759a2 commit cff5c71
Show file tree
Hide file tree
Showing 12 changed files with 304 additions and 145 deletions.
33 changes: 33 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Run Tests",
"runtimeExecutable": "npm",
"args": ["run", "test"],
"request": "launch",
"type": "node",
"outputCapture": "std"
},
{
"name": "Run E2E Tests",
"runtimeExecutable": "npm",
"args": ["run", "test:e2e"],
"request": "launch",
"type": "node",
"outputCapture": "std"
},
{
"type": "node",
"request": "launch",
"name": "Launch Program",
"skipFiles": ["<node_internals>/**"],
"program": "${file}",
"preLaunchTask": "tsc: build - tsconfig.json",
"outFiles": ["${workspaceFolder}/dist/**/*.js"]
}
]
}
9 changes: 9 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"solidity.compileUsingRemoteVersion": "v0.6.12+commit.27d51765",
"editor.codeActionsOnSave": {
"source.fixAll.eslint": true
},
"eslint.validate": ["javascript"],
"solidity.defaultCompiler": "remote",
"cSpell.words": ["eventstream", "fftm"]
}
14 changes: 8 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM node:16-alpine3.15 as solidity-builder
FROM node:16-alpine3.15 as solidity-build
RUN apk add python3 alpine-sdk
USER node
WORKDIR /home/node
Expand All @@ -7,7 +7,7 @@ RUN npm install
ADD --chown=node:node ./samples/solidity .
RUN npx hardhat compile

FROM node:16-alpine3.15 as builder
FROM node:16-alpine3.15 as build
WORKDIR /root
ADD package*.json ./
RUN npm install
Expand All @@ -16,13 +16,15 @@ RUN npm run build

FROM node:16-alpine3.15
RUN apk add curl
# We also need to keep copying it to the old location to maintain compatibility with the FireFly CLI
COPY --from=solidity-build --chown=1001:0 /home/node/artifacts/contracts/ERC1155MixedFungible.sol/ERC1155MixedFungible.json /root/contracts/
WORKDIR /app
ADD package*.json ./
RUN npm install --production
COPY --from=solidity-builder /home/node/contracts contracts/source
COPY --from=solidity-builder /home/node/artifacts/contracts/ERC1155MixedFungible.sol contracts
COPY --from=builder /root/dist dist
COPY --from=builder /root/.env /app/.env
COPY --from=solidity-build /home/node/contracts contracts/source
COPY --from=solidity-build /home/node/artifacts/contracts/ERC1155MixedFungible.sol contracts
COPY --from=build /root/dist dist
COPY --from=build /root/.env /app/.env
RUN chgrp -R 0 /app/ \
&& chmod -R g+rwX /app/
USER 1001
Expand Down
60 changes: 32 additions & 28 deletions src/event-stream/event-stream.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,11 +18,11 @@ import { HttpService } from '@nestjs/axios';
import { Injectable, Logger } from '@nestjs/common';
import { AxiosRequestConfig } from 'axios';
import { lastValueFrom } from 'rxjs';
import * as WebSocket from 'ws';
import WebSocket from 'ws';
import { FFRequestIDHeader } from '../request-context/constants';
import { Context, newContext } from '../request-context/request-context.decorator';
import { IAbiMethod } from '../tokens/tokens.interfaces';
import { getHttpRequestOptions, getWebsocketOptions } from '../utils';
import { Context } from '../request-context/request-context.decorator';
import { FFRequestIDHeader } from '../request-context/constants';
import {
Event,
EventBatch,
Expand All @@ -46,6 +46,7 @@ export class EventStreamSocket {
constructor(
private url: string,
private topic: string,
private namespace: string,
private username: string,
private password: string,
private handleEvents: (events: EventBatch) => void,
Expand All @@ -67,7 +68,7 @@ export class EventStreamSocket {
} else {
this.logger.log('Event stream websocket connected');
}
this.produce({ type: 'listen', topic: this.topic });
this.produce({ type: 'listen', topic: `${this.topic}/${this.namespace}` });
this.produce({ type: 'listenreplies' });
this.ping();
})
Expand Down Expand Up @@ -109,7 +110,11 @@ export class EventStreamSocket {
}

ack(batchNumber: number | undefined) {
this.produce({ type: 'ack', topic: this.topic, batchNumber });
this.produce({ type: 'ack', topic: `${this.topic}/${this.namespace}`, batchNumber });
}

nack(batchNumber: number | undefined) {
this.produce({ type: 'nack', topic: `${this.topic}/${this.namespace}`, batchNumber });
}

close() {
Expand Down Expand Up @@ -176,11 +181,12 @@ export class EventStreamService {
return config;
}

async getStreams(): Promise<EventStream[]> {
async getStreams(ctx: Context): Promise<EventStream[]> {
const response = await lastValueFrom(
this.http.get<EventStream[]>(new URL('/eventstreams', this.baseUrl).href, {
...getHttpRequestOptions(this.username, this.password),
}),
this.http.get<EventStream[]>(
new URL('/eventstreams', this.baseUrl).href,
this.requestOptions(ctx),
),
);
return response.data;
}
Expand All @@ -198,7 +204,7 @@ export class EventStreamService {
timestamps: true,
};

const existingStreams = await this.getStreams();
const existingStreams = await this.getStreams(ctx);
const stream = existingStreams.find(s => s.name === streamDetails.name);
if (stream) {
const patchedStreamRes = await lastValueFrom(
Expand All @@ -207,9 +213,7 @@ export class EventStreamService {
{
...streamDetails,
},
{
...this.requestOptions(ctx),
},
this.requestOptions(ctx),
),
);
this.logger.log(`Event stream for ${topic}: ${stream.id}`);
Expand All @@ -221,9 +225,7 @@ export class EventStreamService {
{
...streamDetails,
},
{
...this.requestOptions(ctx),
},
this.requestOptions(ctx),
),
);
this.logger.log(`Event stream for ${topic}: ${newStreamRes.data.id}`);
Expand All @@ -232,17 +234,16 @@ export class EventStreamService {

async deleteStream(ctx: Context, id: string) {
await lastValueFrom(
this.http.delete(new URL(`/eventstreams/${id}`, this.baseUrl).href, {
...this.requestOptions(ctx),
}),
this.http.delete(new URL(`/eventstreams/${id}`, this.baseUrl).href, this.requestOptions(ctx)),
);
}

async getSubscriptions(ctx: Context): Promise<EventStreamSubscription[]> {
const response = await lastValueFrom(
this.http.get<EventStreamSubscription[]>(new URL('/subscriptions', this.baseUrl).href, {
...this.requestOptions(ctx),
}),
this.http.get<EventStreamSubscription[]>(
new URL('/subscriptions', this.baseUrl).href,
this.requestOptions(ctx),
),
);
return response.data;
}
Expand Down Expand Up @@ -275,7 +276,7 @@ export class EventStreamService {
): Promise<EventStreamSubscription> {
const response = await lastValueFrom(
this.http.post<EventStreamSubscription>(
new URL(`/subscriptions`, instancePath).href,
`${instancePath}/subscriptions`,
{
name,
stream: streamId,
Expand All @@ -284,9 +285,7 @@ export class EventStreamService {
address,
methods,
},
{
...this.requestOptions(ctx),
},
this.requestOptions(ctx),
),
);
this.logger.log(`Created subscription ${name}: ${response.data.id}`);
Expand Down Expand Up @@ -337,15 +336,20 @@ export class EventStreamService {
return true;
}

connect(
async connect(
url: string,
topic: string,
namespace: string,
handleEvents: (events: EventBatch) => void,
handleReceipt: (receipt: EventStreamReply) => void,
) {
const name = `${topic}/${namespace}`;
await this.createOrUpdateStream(newContext(), name, topic);

return new EventStreamSocket(
url,
topic,
namespace,
this.username,
this.password,
handleEvents,
Expand Down
Loading

0 comments on commit cff5c71

Please sign in to comment.