From 8d40d054bdcb7a3f1724881f71fd249485a3ad1b Mon Sep 17 00:00:00 2001 From: msfstef Date: Tue, 21 May 2024 17:49:40 +0300 Subject: [PATCH] Wait for async event queue to finish processing --- clients/typescript/src/satellite/client.ts | 7 +++-- clients/typescript/src/satellite/index.ts | 2 +- clients/typescript/src/satellite/mock.ts | 3 +- clients/typescript/src/satellite/process.ts | 19 +++++++++---- .../typescript/src/util/asyncEventEmitter.ts | 28 ++++++++++++------- clients/typescript/test/satellite/process.ts | 4 +-- 6 files changed, 40 insertions(+), 23 deletions(-) diff --git a/clients/typescript/src/satellite/client.ts b/clients/typescript/src/satellite/client.ts index 60e8937130..2c43d80e05 100644 --- a/clients/typescript/src/satellite/client.ts +++ b/clients/typescript/src/satellite/client.ts @@ -354,7 +354,7 @@ export class SatelliteClient implements Client { this.socketHandler = undefined if (this.socket !== undefined) { - this.socket!.closeAndRemoveListeners() + this.socket.closeAndRemoveListeners() this.socket = undefined } } @@ -367,9 +367,10 @@ export class SatelliteClient implements Client { return this.outbound.isReplicating } - shutdown(): void { - this.disconnect() + async shutdown(): Promise { this.emitter.removeAllListeners() + await this.emitter.waitForProcessing() + this.disconnect() this.isDown = true } diff --git a/clients/typescript/src/satellite/index.ts b/clients/typescript/src/satellite/index.ts index 117f1a00da..c42061d12f 100644 --- a/clients/typescript/src/satellite/index.ts +++ b/clients/typescript/src/satellite/index.ts @@ -91,7 +91,7 @@ export interface Satellite extends IShapeManager { export interface Client { connect(): Promise disconnect(): void - shutdown(): void + shutdown(): Promise authenticate(authState: AuthState): Promise isConnected(): boolean getOutboundReplicationStatus(): ReplicationStatus diff --git a/clients/typescript/src/satellite/mock.ts b/clients/typescript/src/satellite/mock.ts index a3503bf1d5..873ac810d9 100644 --- a/clients/typescript/src/satellite/mock.ts +++ b/clients/typescript/src/satellite/mock.ts @@ -403,7 +403,8 @@ export class MockSatelliteClient : ReplicationStatus.STOPPED } - shutdown(): void { + async shutdown(): Promise { + await this.waitForProcessing() this.isDown = true } diff --git a/clients/typescript/src/satellite/process.ts b/clients/typescript/src/satellite/process.ts index aa5475f6ee..ac33814bca 100644 --- a/clients/typescript/src/satellite/process.ts +++ b/clients/typescript/src/satellite/process.ts @@ -386,7 +386,7 @@ export class SatelliteProcess implements Satellite { this.disconnect() if (shutdown) { - this.client.shutdown() + await this.client.shutdown() } } @@ -503,24 +503,29 @@ export class SatelliteProcess implements Satellite { subsData.subscriptionId ) - await this._applySubscriptionData( + const applied = await this._applySubscriptionData( subsData.data, subsData.lsn, [], subsData.subscriptionId ) - const toBeUnsubbed = afterApply() - if (toBeUnsubbed.length > 0) await this.unsubscribeIds(toBeUnsubbed) + if (applied) { + const toBeUnsubbed = afterApply() + if (toBeUnsubbed.length > 0) await this.unsubscribeIds(toBeUnsubbed) + } } - /** Insert incoming subscription data into the database. */ + /** + * Insert incoming subscription data into the database. + * Returns flag indicating whether application was successful or not. + */ private async _applySubscriptionData( changes: InitialDataChange[], lsn: LSN, additionalStmts: Statement[] = [], subscriptionId?: string - ) { + ): Promise { const namespace = this.builder.defaultNamespace const stmts: Statement[] = [] @@ -662,6 +667,7 @@ export class SatelliteProcess implements Satellite { }) }) this.notifier.actuallyChanged(this.dbName, notificationChanges, 'initial') + return true } catch (e) { this._handleSubscriptionError( new SatelliteError( @@ -670,6 +676,7 @@ export class SatelliteProcess implements Satellite { ), subscriptionId ) + return false } } diff --git a/clients/typescript/src/util/asyncEventEmitter.ts b/clients/typescript/src/util/asyncEventEmitter.ts index d83e733078..bb8249447a 100644 --- a/clients/typescript/src/util/asyncEventEmitter.ts +++ b/clients/typescript/src/util/asyncEventEmitter.ts @@ -24,7 +24,7 @@ export class AsyncEventEmitter { private eventQueue: Array< EmittedEvent> > = [] - private processing = false // indicates whether the event queue is currently being processed + private processing: Promise[]> | false = false // indicates whether the event queue is currently being processed private getListeners(event: E): Array { return this.listeners[event] ?? [] @@ -127,8 +127,6 @@ export class AsyncEventEmitter { * and an 'error' event is emitted, the error is thrown. */ private processQueue() { - this.processing = true - const emittedEvent = this.eventQueue.shift() if (emittedEvent) { // We call all listeners and process the next event when all listeners finished. @@ -161,13 +159,16 @@ export class AsyncEventEmitter { } }) - Promise - // wait for all listeners to finish, - // some may fail (i.e.return a rejected promise) - // but that should not stop the queue from being processed - // hence the use of `allSettled` rather than `all` - .allSettled(listenerProms) - .then(() => this.processQueue()) // only process the next event when all listeners have finished + // wait for all listeners to finish, + // some may fail (i.e.return a rejected promise) + // but that should not stop the queue from being processed + // hence the use of `allSettled` rather than `all` + const processingProm = Promise.allSettled(listenerProms) + + // only process the next event when all listeners have finished + processingProm.then(() => this.processQueue()) + + this.processing = processingProm } else { // signal that the queue is no longer being processed this.processing = false @@ -262,4 +263,11 @@ export class AsyncEventEmitter { this.maxListeners = maxListeners return this } + + /** + * Wait for event queue to finish processing. + */ + async waitForProcessing(): Promise { + await this.processing + } } diff --git a/clients/typescript/test/satellite/process.ts b/clients/typescript/test/satellite/process.ts index 226a2304ae..42c5fdbd23 100644 --- a/clients/typescript/test/satellite/process.ts +++ b/clients/typescript/test/satellite/process.ts @@ -2498,7 +2498,7 @@ export const processTests = (test: TestFn) => { t.plan(3) const { client, satellite } = t.context - client.shutdown() + await client.shutdown() const retry = (_e: any, a: number) => { if (a > 0) { @@ -2519,7 +2519,7 @@ export const processTests = (test: TestFn) => { test.serial('connection cancelled on disconnect', async (t) => { const { client, satellite, authState, token } = t.context - client.shutdown() // such that satellite can't connect to Electric and will keep retrying + await client.shutdown() // such that satellite can't connect to Electric and will keep retrying const { connectionPromise } = await startSatellite( satellite, authState,