Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove auto-migration logic for subscriptions #64

Merged
merged 4 commits into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { TokensService } from './tokens/tokens.service';
import { EventStreamProxyGateway } from './eventstream-proxy/eventstream-proxy.gateway';
import { EventStreamReply } from './event-stream/event-stream.interfaces';
import {
TokenApprovalEvent,
TokenBurnEvent,
TokenMintEvent,
TokenPoolEvent,
Expand Down Expand Up @@ -61,6 +62,7 @@ async function bootstrap() {
TokenMintEvent,
TokenBurnEvent,
TokenTransferEvent,
TokenApprovalEvent,
],
});
const config = app.get(ConfigService);
Expand All @@ -84,7 +86,7 @@ async function bootstrap() {
.configure(ethConnectUrl, instancePath, topic, shortPrefix, username, password);

try {
await app.get(TokensService).migrate();
await app.get(TokensService).migrationCheck();
} catch (err) {
// do nothing
}
Expand Down
14 changes: 11 additions & 3 deletions src/tokens/tokens.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,6 @@ class tokenEventBase {
@ApiProperty()
poolId: string;

@ApiProperty()
type: TokenType;

@ApiProperty()
signer: string;

Expand All @@ -264,6 +261,9 @@ class tokenEventBase {
}

export class TokenPoolEvent extends tokenEventBase {
@ApiProperty()
type: TokenType;

@ApiProperty()
standard: string;
}
Expand All @@ -290,3 +290,11 @@ export class TokenTransferEvent extends tokenEventBase {

export class TokenMintEvent extends OmitType(TokenTransferEvent, ['from']) {}
export class TokenBurnEvent extends OmitType(TokenTransferEvent, ['to']) {}

export class TokenApprovalEvent extends tokenEventBase {
@ApiProperty()
operator: string;

@ApiProperty()
approved: boolean;
}
52 changes: 51 additions & 1 deletion src/tokens/tokens.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,19 @@ import { TokensService } from './tokens.service';

describe('TokensService', () => {
let service: TokensService;
let eventStream: {
addListener: ReturnType<typeof jest.fn>;
getStreams: ReturnType<typeof jest.fn>;
getSubscriptions: ReturnType<typeof jest.fn>;
};

beforeEach(async () => {
eventStream = {
addListener: jest.fn(),
getStreams: jest.fn(),
getSubscriptions: jest.fn(),
};

const module: TestingModule = await Test.createTestingModule({
providers: [
TokensService,
Expand All @@ -33,7 +44,7 @@ describe('TokensService', () => {
},
{
provide: EventStreamService,
useValue: { addListener: jest.fn() },
useValue: eventStream,
},
{
provide: EventStreamProxyGateway,
Expand All @@ -48,4 +59,43 @@ describe('TokensService', () => {
it('should be defined', () => {
expect(service).toBeDefined();
});

describe('Subscription migration', () => {
it('should not migrate if no subscriptions exists', async () => {
service.topic = 'tokens';
service.instancePath = '0x123';
eventStream.getStreams.mockReturnValueOnce([{ name: 'tokens:0x123' }]);
eventStream.getSubscriptions.mockReturnValueOnce([]);
expect(await service.migrationCheck()).toBe(false);
});

it('should not migrate if correct base subscription exists', async () => {
service.topic = 'tokens';
service.instancePath = '0x123';
eventStream.getStreams.mockReturnValueOnce([{ name: 'tokens:0x123' }]);
eventStream.getSubscriptions.mockReturnValueOnce([{ name: 'tokens:0x123:base:TokenCreate' }]);
expect(await service.migrationCheck()).toBe(false);
});

it('should migrate if any event subscriptions are missing', async () => {
service.topic = 'tokens';
service.instancePath = '0x123';
eventStream.getStreams.mockReturnValueOnce([{ name: 'tokens:0x123' }]);
eventStream.getSubscriptions.mockReturnValueOnce([{ name: 'tokens:0x123:p1:TokenCreate' }]);
expect(await service.migrationCheck()).toBe(true);
});

it('should not migrate if all event subscriptions exist', async () => {
service.topic = 'tokens';
service.instancePath = '0x123';
eventStream.getStreams.mockReturnValueOnce([{ name: 'tokens:0x123' }]);
eventStream.getSubscriptions.mockReturnValueOnce([
{ name: 'tokens:0x123:p1:TokenCreate' },
{ name: 'tokens:0x123:p1:TransferSingle' },
{ name: 'tokens:0x123:p1:TransferBatch' },
{ name: 'tokens:0x123:p1:ApprovalForAll' },
]);
expect(await service.migrationCheck()).toBe(false);
});
});
});
108 changes: 69 additions & 39 deletions src/tokens/tokens.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
EthConnectAsyncResponse,
EthConnectReturn,
TokenApproval,
TokenApprovalEvent,
TokenBalance,
TokenBalanceQuery,
TokenBurn,
Expand Down Expand Up @@ -139,55 +140,78 @@ export class TokensService {
}

/**
* If there is an existing event stream whose subscriptions don't match the current
* events and naming format, delete the stream so we'll start over.
* This will cause redelivery of all token events, which will poke FireFly to
* (re)activate pools and (re)process all transfers.
* Check for existing event streams and subscriptions that don't match the current
* expected format (ie incorrect names, missing event subscriptions).
*
* TODO: eventually this migration logic can be pruned
* Log a warning if any potential issues are flagged. User may need to delete
* subscriptions manually and reactivate the pool directly.
*/
async migrate() {
async migrationCheck() {
const name = packStreamName(this.topic, this.instancePath);
const streams = await this.eventstream.getStreams();
const existingStream = streams.find(s => s.name === name);
let existingStream = streams.find(s => s.name === name);
if (existingStream === undefined) {
// Look for the old stream name (topic alone)
const oldStream = streams.find(s => s.name === this.topic);
if (oldStream !== undefined) {
this.logger.warn('Old event stream found - deleting and recreating');
await this.eventstream.deleteStream(oldStream.id);
await this.init();
existingStream = streams.find(s => s.name === this.topic);
if (existingStream === undefined) {
return false;
}
return;
this.logger.warn(
`Old event stream found with name ${existingStream.name}. ` +
`The connector will continue to use this stream, but it is recommended ` +
`to create a new stream with the name ${name}.`,
);
}
const subscriptions = await this.eventstream.getSubscriptions();
this.stream = existingStream;

const allSubscriptions = await this.eventstream.getSubscriptions();
const baseSubscription = packSubscriptionName(
this.topic,
this.instancePath,
BASE_SUBSCRIPTION_NAME,
tokenCreateEvent,
);
const streamId = existingStream.id;
const subscriptions = allSubscriptions.filter(
s => s.stream === streamId && s.name !== baseSubscription,
);
if (subscriptions.length === 0) {
return;
return false;
}

const foundEvents = new Set<string>();
for (const sub of subscriptions.filter(s => s.stream === existingStream.id)) {
const foundEvents = new Map<string, string[]>();
for (const sub of subscriptions) {
const parts = unpackSubscriptionName(this.topic, sub.name);
if (parts.event !== undefined && parts.event !== '') {
foundEvents.add(parts.event);
if (parts.poolId === undefined || parts.event === undefined) {
this.logger.warn(
`Non-parseable subscription names found in event stream ${existingStream.name}.` +
`It is recommended to delete all subscriptions and activate all pools again.`,
);
return true;
}
const existing = foundEvents.get(parts.poolId);
if (existing !== undefined) {
existing.push(parts.event);
} else {
foundEvents.set(parts.poolId, [parts.event]);
}
}

if (foundEvents.size === 1 && foundEvents.has(BASE_SUBSCRIPTION_NAME)) {
// Special case - only the base subscription exists (with the correct name),
// but no pools have been activated. This is ok.
return;
}

// Otherwise, expect to have found subscriptions for each of the events.
for (const event of ALL_SUBSCRIBED_EVENTS) {
if (!foundEvents.has(event)) {
this.logger.warn('Incorrect event stream subscriptions found - deleting and recreating');
await this.eventstream.deleteStream(existingStream.id);
await this.init();
return;
// Expect to have found subscriptions for each of the events.
for (const [poolId, events] of foundEvents) {
if (
ALL_SUBSCRIBED_EVENTS.length !== events.length ||
!ALL_SUBSCRIBED_EVENTS.every(event => events.includes(event))
) {
this.logger.warn(
`Event stream subscriptions for pool ${poolId} do not include all expected events ` +
`(${ALL_SUBSCRIBED_EVENTS}). Events may not be properly delivered to this pool. ` +
`It is recommended to delete its subscriptions and activate the pool again.`,
);
return true;
}
}
return false;
}

private postOptions(signer: string, requestId?: string) {
Expand Down Expand Up @@ -395,7 +419,7 @@ class TokenListener implements EventListener {
process(await this.transformTransferSingleEvent(subName, event));
break;
case approvalForAllEventSignature:
process(await this.transformApprovalForAllEvent(subName, event));
process(this.transformApprovalForAllEvent(subName, event));
break;
case transferBatchEventSignature:
for (const msg of await this.transformTransferBatchEvent(subName, event)) {
Expand All @@ -415,19 +439,25 @@ class TokenListener implements EventListener {
const { data } = event;
const unpackedSub = unpackSubscriptionName(this.topic, subName);
const decodedData = decodeHex(event.inputArgs?.data ?? '');

if (unpackedSub.poolId === undefined) {
// should not happen
return undefined;
}

return {
event: 'token-approval',
data: {
data: <TokenApprovalEvent>{
id: `${data.account}:${data.operator}`,
signer: data.account,
operator: data.operator,
location: 'address=' + event.address,
signature: event.signature,
poolId: unpackedSub.poolId,
operator: data.operator,
approved: data.approved,
rawOutput: data,
signer: data.account,
data: decodedData,
timestamp: event.timestamp,
location: 'address=' + event.address,
signature: event.signature,
rawOutput: data,
transaction: {
blockNumber: event.blockNumber,
transactionIndex: event.transactionIndex,
Expand Down