Skip to content

Commit

Permalink
Wait for async event queue to finish processing
Browse files Browse the repository at this point in the history
  • Loading branch information
msfstef committed May 21, 2024
1 parent 0e2ac33 commit 8d40d05
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 23 deletions.
7 changes: 4 additions & 3 deletions clients/typescript/src/satellite/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ export class SatelliteClient implements Client {
this.socketHandler = undefined

if (this.socket !== undefined) {
this.socket!.closeAndRemoveListeners()
this.socket.closeAndRemoveListeners()
this.socket = undefined
}
}
Expand All @@ -367,9 +367,10 @@ export class SatelliteClient implements Client {
return this.outbound.isReplicating
}

shutdown(): void {
this.disconnect()
async shutdown(): Promise<void> {
this.emitter.removeAllListeners()
await this.emitter.waitForProcessing()
this.disconnect()
this.isDown = true
}

Expand Down
2 changes: 1 addition & 1 deletion clients/typescript/src/satellite/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export interface Satellite extends IShapeManager {
export interface Client {
connect(): Promise<void>
disconnect(): void
shutdown(): void
shutdown(): Promise<void>
authenticate(authState: AuthState): Promise<AuthResponse>
isConnected(): boolean
getOutboundReplicationStatus(): ReplicationStatus
Expand Down
3 changes: 2 additions & 1 deletion clients/typescript/src/satellite/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ export class MockSatelliteClient
: ReplicationStatus.STOPPED
}

shutdown(): void {
async shutdown(): Promise<void> {
await this.waitForProcessing()
this.isDown = true
}

Expand Down
19 changes: 13 additions & 6 deletions clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ export class SatelliteProcess implements Satellite {
this.disconnect()

if (shutdown) {
this.client.shutdown()
await this.client.shutdown()
}
}

Expand Down Expand Up @@ -503,24 +503,29 @@ export class SatelliteProcess implements Satellite {
subsData.subscriptionId
)

await this._applySubscriptionData(
const applied = await this._applySubscriptionData(
subsData.data,
subsData.lsn,
[],
subsData.subscriptionId
)

const toBeUnsubbed = afterApply()
if (toBeUnsubbed.length > 0) await this.unsubscribeIds(toBeUnsubbed)
if (applied) {
const toBeUnsubbed = afterApply()
if (toBeUnsubbed.length > 0) await this.unsubscribeIds(toBeUnsubbed)
}
}

/** Insert incoming subscription data into the database. */
/**
* Insert incoming subscription data into the database.
* Returns flag indicating whether application was successful or not.
*/
private async _applySubscriptionData(
changes: InitialDataChange[],
lsn: LSN,
additionalStmts: Statement[] = [],
subscriptionId?: string
) {
): Promise<boolean> {
const namespace = this.builder.defaultNamespace
const stmts: Statement[] = []

Expand Down Expand Up @@ -662,6 +667,7 @@ export class SatelliteProcess implements Satellite {
})
})
this.notifier.actuallyChanged(this.dbName, notificationChanges, 'initial')
return true
} catch (e) {
this._handleSubscriptionError(
new SatelliteError(
Expand All @@ -670,6 +676,7 @@ export class SatelliteProcess implements Satellite {
),
subscriptionId
)
return false
}
}

Expand Down
28 changes: 18 additions & 10 deletions clients/typescript/src/util/asyncEventEmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class AsyncEventEmitter<Events extends EventMap> {
private eventQueue: Array<
EmittedEvent<keyof Events, Parameters<Events[keyof Events]>>
> = []
private processing = false // indicates whether the event queue is currently being processed
private processing: Promise<PromiseSettledResult<void>[]> | false = false // indicates whether the event queue is currently being processed

private getListeners<E extends keyof Events>(event: E): Array<Events[E]> {
return this.listeners[event] ?? []
Expand Down Expand Up @@ -127,8 +127,6 @@ export class AsyncEventEmitter<Events extends EventMap> {
* and an 'error' event is emitted, the error is thrown.
*/
private processQueue() {
this.processing = true

const emittedEvent = this.eventQueue.shift()
if (emittedEvent) {
// We call all listeners and process the next event when all listeners finished.
Expand Down Expand Up @@ -161,13 +159,16 @@ export class AsyncEventEmitter<Events extends EventMap> {
}
})

Promise
// wait for all listeners to finish,
// some may fail (i.e.return a rejected promise)
// but that should not stop the queue from being processed
// hence the use of `allSettled` rather than `all`
.allSettled(listenerProms)
.then(() => this.processQueue()) // only process the next event when all listeners have finished
// wait for all listeners to finish,
// some may fail (i.e.return a rejected promise)
// but that should not stop the queue from being processed
// hence the use of `allSettled` rather than `all`
const processingProm = Promise.allSettled(listenerProms)

// only process the next event when all listeners have finished
processingProm.then(() => this.processQueue())

this.processing = processingProm
} else {
// signal that the queue is no longer being processed
this.processing = false
Expand Down Expand Up @@ -262,4 +263,11 @@ export class AsyncEventEmitter<Events extends EventMap> {
this.maxListeners = maxListeners
return this
}

/**
* Wait for event queue to finish processing.
*/
async waitForProcessing(): Promise<void> {
await this.processing
}
}
4 changes: 2 additions & 2 deletions clients/typescript/test/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2498,7 +2498,7 @@ export const processTests = (test: TestFn<ContextType>) => {
t.plan(3)
const { client, satellite } = t.context

client.shutdown()
await client.shutdown()

const retry = (_e: any, a: number) => {
if (a > 0) {
Expand All @@ -2519,7 +2519,7 @@ export const processTests = (test: TestFn<ContextType>) => {

test.serial('connection cancelled on disconnect', async (t) => {
const { client, satellite, authState, token } = t.context
client.shutdown() // such that satellite can't connect to Electric and will keep retrying
await client.shutdown() // such that satellite can't connect to Electric and will keep retrying
const { connectionPromise } = await startSatellite(
satellite,
authState,
Expand Down

0 comments on commit 8d40d05

Please sign in to comment.