Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove auto-migration logic for subscriptions #64

Merged
merged 4 commits into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ async function bootstrap() {
.configure(ethConnectUrl, instancePath, topic, shortPrefix, username, password);

try {
await app.get(TokensService).migrate();
await app.get(TokensService).migrationCheck();
} catch (err) {
// do nothing
}
Expand Down
39 changes: 38 additions & 1 deletion src/tokens/tokens.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,19 @@ import { TokensService } from './tokens.service';

describe('TokensService', () => {
let service: TokensService;
let eventStream: {
addListener: ReturnType<typeof jest.fn>;
getStreams: ReturnType<typeof jest.fn>;
getSubscriptions: ReturnType<typeof jest.fn>;
};

beforeEach(async () => {
eventStream = {
addListener: jest.fn(),
getStreams: jest.fn(),
getSubscriptions: jest.fn(),
};

const module: TestingModule = await Test.createTestingModule({
providers: [
TokensService,
Expand All @@ -33,7 +44,7 @@ describe('TokensService', () => {
},
{
provide: EventStreamService,
useValue: { addListener: jest.fn() },
useValue: eventStream,
},
{
provide: EventStreamProxyGateway,
Expand All @@ -48,4 +59,30 @@ describe('TokensService', () => {
it('should be defined', () => {
expect(service).toBeDefined();
});

describe('Subscription migration', () => {
it('should not migrate if no subscriptions exists', async () => {
service.topic = 'tokens';
service.instancePath = '0x123';
eventStream.getStreams.mockReturnValueOnce([{ name: 'tokens:0x123' }]);
eventStream.getSubscriptions.mockReturnValueOnce([]);
expect(await service.migrationCheck()).toBe(false);
});

it('should not migrate if correct base subscription exists', async () => {
service.topic = 'tokens';
service.instancePath = '0x123';
eventStream.getStreams.mockReturnValueOnce([{ name: 'tokens:0x123' }]);
eventStream.getSubscriptions.mockReturnValueOnce([{ name: 'tokens:0x123:base:TokenCreate' }]);
expect(await service.migrationCheck()).toBe(false);
});

it('should migrate if event subscriptions are missing', async () => {
service.topic = 'tokens';
service.instancePath = '0x123';
eventStream.getStreams.mockReturnValueOnce([{ name: 'tokens:0x123' }]);
eventStream.getSubscriptions.mockReturnValueOnce([{ name: 'tokens:0x123:p1:TokenCreate' }]);
expect(await service.migrationCheck()).toBe(true);
});
});
});
56 changes: 29 additions & 27 deletions src/tokens/tokens.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,55 +139,57 @@ export class TokensService {
}

/**
* If there is an existing event stream whose subscriptions don't match the current
* events and naming format, delete the stream so we'll start over.
* This will cause redelivery of all token events, which will poke FireFly to
* (re)activate pools and (re)process all transfers.
* Check for existing event streams and subscriptions that don't match the current
* expected format (ie incorrect names, missing event subscriptions).
*
* TODO: eventually this migration logic can be pruned
* Log a warning if any potential issues are flagged. User may need to delete
* subscriptions manually and reactivate the pool directly.
*/
async migrate() {
async migrationCheck() {
const name = packStreamName(this.topic, this.instancePath);
const streams = await this.eventstream.getStreams();
const existingStream = streams.find(s => s.name === name);
if (existingStream === undefined) {
// Look for the old stream name (topic alone)
const oldStream = streams.find(s => s.name === this.topic);
if (oldStream !== undefined) {
this.logger.warn('Old event stream found - deleting and recreating');
await this.eventstream.deleteStream(oldStream.id);
await this.init();
this.logger.warn('Old event stream found - please delete and recreate!');
awrichar marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
return;
return false;
}
const subscriptions = await this.eventstream.getSubscriptions();
const allSubscriptions = await this.eventstream.getSubscriptions();
const subscriptions = allSubscriptions.filter(s => s.stream === existingStream.id);
if (subscriptions.length === 0) {
return;
return false;
}

const baseSubscription = packSubscriptionName(
this.topic,
this.instancePath,
BASE_SUBSCRIPTION_NAME,
tokenCreateEvent,
);
if (subscriptions.length === 1 && subscriptions[0].name === baseSubscription) {
// Special case - only the base subscription exists (with the correct name),
// but no pools have been activated. This is ok.
return false;
}

const foundEvents = new Set<string>();
for (const sub of subscriptions.filter(s => s.stream === existingStream.id)) {
for (const sub of subscriptions) {
const parts = unpackSubscriptionName(this.topic, sub.name);
if (parts.event !== undefined && parts.event !== '') {
foundEvents.add(parts.event);
}
}

if (foundEvents.size === 1 && foundEvents.has(BASE_SUBSCRIPTION_NAME)) {
// Special case - only the base subscription exists (with the correct name),
// but no pools have been activated. This is ok.
return;
}

// Otherwise, expect to have found subscriptions for each of the events.
for (const event of ALL_SUBSCRIBED_EVENTS) {
if (!foundEvents.has(event)) {
this.logger.warn('Incorrect event stream subscriptions found - deleting and recreating');
await this.eventstream.deleteStream(existingStream.id);
await this.init();
return;
}
// Expect to have found subscriptions for each of the events.
if (!ALL_SUBSCRIBED_EVENTS.every(event => foundEvents.has(event))) {
this.logger.warn('Incorrect event stream subscriptions found - please delete and recreate!');
awrichar marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
return false;
}

private postOptions(signer: string, requestId?: string) {
Expand Down