Skip to content

Commit

Permalink
Merge pull request #64 from kaleido-io/subs
Browse files Browse the repository at this point in the history
Remove auto-migration logic for subscriptions
  • Loading branch information
peterbroadhurst committed Apr 1, 2022
2 parents 80492e9 + 8a6577a commit 58697d4
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 44 deletions.
4 changes: 3 additions & 1 deletion src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { TokensService } from './tokens/tokens.service';
import { EventStreamProxyGateway } from './eventstream-proxy/eventstream-proxy.gateway';
import { EventStreamReply } from './event-stream/event-stream.interfaces';
import {
TokenApprovalEvent,
TokenBurnEvent,
TokenMintEvent,
TokenPoolEvent,
Expand Down Expand Up @@ -61,6 +62,7 @@ async function bootstrap() {
TokenMintEvent,
TokenBurnEvent,
TokenTransferEvent,
TokenApprovalEvent,
],
});
const config = app.get(ConfigService);
Expand All @@ -84,7 +86,7 @@ async function bootstrap() {
.configure(ethConnectUrl, instancePath, topic, shortPrefix, username, password);

try {
await app.get(TokensService).migrate();
await app.get(TokensService).migrationCheck();
} catch (err) {
// do nothing
}
Expand Down
14 changes: 11 additions & 3 deletions src/tokens/tokens.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,6 @@ class tokenEventBase {
@ApiProperty()
poolId: string;

@ApiProperty()
type: TokenType;

@ApiProperty()
signer: string;

Expand All @@ -264,6 +261,9 @@ class tokenEventBase {
}

export class TokenPoolEvent extends tokenEventBase {
@ApiProperty()
type: TokenType;

@ApiProperty()
standard: string;
}
Expand All @@ -290,3 +290,11 @@ export class TokenTransferEvent extends tokenEventBase {

export class TokenMintEvent extends OmitType(TokenTransferEvent, ['from']) {}
export class TokenBurnEvent extends OmitType(TokenTransferEvent, ['to']) {}

export class TokenApprovalEvent extends tokenEventBase {
@ApiProperty()
operator: string;

@ApiProperty()
approved: boolean;
}
52 changes: 51 additions & 1 deletion src/tokens/tokens.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,19 @@ import { TokensService } from './tokens.service';

describe('TokensService', () => {
let service: TokensService;
let eventStream: {
addListener: ReturnType<typeof jest.fn>;
getStreams: ReturnType<typeof jest.fn>;
getSubscriptions: ReturnType<typeof jest.fn>;
};

beforeEach(async () => {
eventStream = {
addListener: jest.fn(),
getStreams: jest.fn(),
getSubscriptions: jest.fn(),
};

const module: TestingModule = await Test.createTestingModule({
providers: [
TokensService,
Expand All @@ -33,7 +44,7 @@ describe('TokensService', () => {
},
{
provide: EventStreamService,
useValue: { addListener: jest.fn() },
useValue: eventStream,
},
{
provide: EventStreamProxyGateway,
Expand All @@ -48,4 +59,43 @@ describe('TokensService', () => {
it('should be defined', () => {
expect(service).toBeDefined();
});

describe('Subscription migration', () => {
it('should not migrate if no subscriptions exists', async () => {
service.topic = 'tokens';
service.instancePath = '0x123';
eventStream.getStreams.mockReturnValueOnce([{ name: 'tokens:0x123' }]);
eventStream.getSubscriptions.mockReturnValueOnce([]);
expect(await service.migrationCheck()).toBe(false);
});

it('should not migrate if correct base subscription exists', async () => {
service.topic = 'tokens';
service.instancePath = '0x123';
eventStream.getStreams.mockReturnValueOnce([{ name: 'tokens:0x123' }]);
eventStream.getSubscriptions.mockReturnValueOnce([{ name: 'tokens:0x123:base:TokenCreate' }]);
expect(await service.migrationCheck()).toBe(false);
});

it('should migrate if any event subscriptions are missing', async () => {
service.topic = 'tokens';
service.instancePath = '0x123';
eventStream.getStreams.mockReturnValueOnce([{ name: 'tokens:0x123' }]);
eventStream.getSubscriptions.mockReturnValueOnce([{ name: 'tokens:0x123:p1:TokenCreate' }]);
expect(await service.migrationCheck()).toBe(true);
});

it('should not migrate if all event subscriptions exist', async () => {
service.topic = 'tokens';
service.instancePath = '0x123';
eventStream.getStreams.mockReturnValueOnce([{ name: 'tokens:0x123' }]);
eventStream.getSubscriptions.mockReturnValueOnce([
{ name: 'tokens:0x123:p1:TokenCreate' },
{ name: 'tokens:0x123:p1:TransferSingle' },
{ name: 'tokens:0x123:p1:TransferBatch' },
{ name: 'tokens:0x123:p1:ApprovalForAll' },
]);
expect(await service.migrationCheck()).toBe(false);
});
});
});
108 changes: 69 additions & 39 deletions src/tokens/tokens.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
EthConnectAsyncResponse,
EthConnectReturn,
TokenApproval,
TokenApprovalEvent,
TokenBalance,
TokenBalanceQuery,
TokenBurn,
Expand Down Expand Up @@ -139,55 +140,78 @@ export class TokensService {
}

/**
* If there is an existing event stream whose subscriptions don't match the current
* events and naming format, delete the stream so we'll start over.
* This will cause redelivery of all token events, which will poke FireFly to
* (re)activate pools and (re)process all transfers.
* Check for existing event streams and subscriptions that don't match the current
* expected format (ie incorrect names, missing event subscriptions).
*
* TODO: eventually this migration logic can be pruned
* Log a warning if any potential issues are flagged. User may need to delete
* subscriptions manually and reactivate the pool directly.
*/
async migrate() {
async migrationCheck() {
const name = packStreamName(this.topic, this.instancePath);
const streams = await this.eventstream.getStreams();
const existingStream = streams.find(s => s.name === name);
let existingStream = streams.find(s => s.name === name);
if (existingStream === undefined) {
// Look for the old stream name (topic alone)
const oldStream = streams.find(s => s.name === this.topic);
if (oldStream !== undefined) {
this.logger.warn('Old event stream found - deleting and recreating');
await this.eventstream.deleteStream(oldStream.id);
await this.init();
existingStream = streams.find(s => s.name === this.topic);
if (existingStream === undefined) {
return false;
}
return;
this.logger.warn(
`Old event stream found with name ${existingStream.name}. ` +
`The connector will continue to use this stream, but it is recommended ` +
`to create a new stream with the name ${name}.`,
);
}
const subscriptions = await this.eventstream.getSubscriptions();
this.stream = existingStream;

const allSubscriptions = await this.eventstream.getSubscriptions();
const baseSubscription = packSubscriptionName(
this.topic,
this.instancePath,
BASE_SUBSCRIPTION_NAME,
tokenCreateEvent,
);
const streamId = existingStream.id;
const subscriptions = allSubscriptions.filter(
s => s.stream === streamId && s.name !== baseSubscription,
);
if (subscriptions.length === 0) {
return;
return false;
}

const foundEvents = new Set<string>();
for (const sub of subscriptions.filter(s => s.stream === existingStream.id)) {
const foundEvents = new Map<string, string[]>();
for (const sub of subscriptions) {
const parts = unpackSubscriptionName(this.topic, sub.name);
if (parts.event !== undefined && parts.event !== '') {
foundEvents.add(parts.event);
if (parts.poolId === undefined || parts.event === undefined) {
this.logger.warn(
`Non-parseable subscription names found in event stream ${existingStream.name}.` +
`It is recommended to delete all subscriptions and activate all pools again.`,
);
return true;
}
const existing = foundEvents.get(parts.poolId);
if (existing !== undefined) {
existing.push(parts.event);
} else {
foundEvents.set(parts.poolId, [parts.event]);
}
}

if (foundEvents.size === 1 && foundEvents.has(BASE_SUBSCRIPTION_NAME)) {
// Special case - only the base subscription exists (with the correct name),
// but no pools have been activated. This is ok.
return;
}

// Otherwise, expect to have found subscriptions for each of the events.
for (const event of ALL_SUBSCRIBED_EVENTS) {
if (!foundEvents.has(event)) {
this.logger.warn('Incorrect event stream subscriptions found - deleting and recreating');
await this.eventstream.deleteStream(existingStream.id);
await this.init();
return;
// Expect to have found subscriptions for each of the events.
for (const [poolId, events] of foundEvents) {
if (
ALL_SUBSCRIBED_EVENTS.length !== events.length ||
!ALL_SUBSCRIBED_EVENTS.every(event => events.includes(event))
) {
this.logger.warn(
`Event stream subscriptions for pool ${poolId} do not include all expected events ` +
`(${ALL_SUBSCRIBED_EVENTS}). Events may not be properly delivered to this pool. ` +
`It is recommended to delete its subscriptions and activate the pool again.`,
);
return true;
}
}
return false;
}

private postOptions(signer: string, requestId?: string) {
Expand Down Expand Up @@ -395,7 +419,7 @@ class TokenListener implements EventListener {
process(await this.transformTransferSingleEvent(subName, event));
break;
case approvalForAllEventSignature:
process(await this.transformApprovalForAllEvent(subName, event));
process(this.transformApprovalForAllEvent(subName, event));
break;
case transferBatchEventSignature:
for (const msg of await this.transformTransferBatchEvent(subName, event)) {
Expand All @@ -415,19 +439,25 @@ class TokenListener implements EventListener {
const { data } = event;
const unpackedSub = unpackSubscriptionName(this.topic, subName);
const decodedData = decodeHex(event.inputArgs?.data ?? '');

if (unpackedSub.poolId === undefined) {
// should not happen
return undefined;
}

return {
event: 'token-approval',
data: {
data: <TokenApprovalEvent>{
id: `${data.account}:${data.operator}`,
signer: data.account,
operator: data.operator,
location: 'address=' + event.address,
signature: event.signature,
poolId: unpackedSub.poolId,
operator: data.operator,
approved: data.approved,
rawOutput: data,
signer: data.account,
data: decodedData,
timestamp: event.timestamp,
location: 'address=' + event.address,
signature: event.signature,
rawOutput: data,
transaction: {
blockNumber: event.blockNumber,
transactionIndex: event.transactionIndex,
Expand Down

0 comments on commit 58697d4

Please sign in to comment.