Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(client): asyncEventEmitter to not silence unhandled exceptions raised in event handlers #1247

Merged
merged 6 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/five-rats-shake.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Fix asyncEventEmitter to not silence unhandled exceptions raised in event handlers.
7 changes: 4 additions & 3 deletions clients/typescript/src/satellite/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ export class SatelliteClient implements Client {
this.socketHandler = undefined

if (this.socket !== undefined) {
this.socket!.closeAndRemoveListeners()
this.socket.closeAndRemoveListeners()
this.socket = undefined
}
}
Expand All @@ -367,9 +367,10 @@ export class SatelliteClient implements Client {
return this.outbound.isReplicating
}

shutdown(): void {
this.disconnect()
async shutdown(): Promise<void> {
this.emitter.removeAllListeners()
await this.emitter.waitForProcessing()
this.disconnect()
this.isDown = true
}

Expand Down
2 changes: 1 addition & 1 deletion clients/typescript/src/satellite/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export interface Satellite extends IShapeManager {
export interface Client {
connect(): Promise<void>
disconnect(): void
shutdown(): void
shutdown(): Promise<void>
authenticate(authState: AuthState): Promise<AuthResponse>
isConnected(): boolean
getOutboundReplicationStatus(): ReplicationStatus
Expand Down
3 changes: 2 additions & 1 deletion clients/typescript/src/satellite/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ export class MockSatelliteClient
: ReplicationStatus.STOPPED
}

shutdown(): void {
async shutdown(): Promise<void> {
await this.waitForProcessing()
this.isDown = true
}

Expand Down
19 changes: 13 additions & 6 deletions clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ export class SatelliteProcess implements Satellite {
this.disconnect()

if (shutdown) {
this.client.shutdown()
await this.client.shutdown()
}
}

Expand Down Expand Up @@ -503,24 +503,29 @@ export class SatelliteProcess implements Satellite {
subsData.subscriptionId
)

await this._applySubscriptionData(
const applied = await this._applySubscriptionData(
subsData.data,
subsData.lsn,
[],
subsData.subscriptionId
)

const toBeUnsubbed = afterApply()
if (toBeUnsubbed.length > 0) await this.unsubscribeIds(toBeUnsubbed)
if (applied) {
const toBeUnsubbed = afterApply()
if (toBeUnsubbed.length > 0) await this.unsubscribeIds(toBeUnsubbed)
}
}

/** Insert incoming subscription data into the database. */
/**
* Insert incoming subscription data into the database.
* Returns flag indicating whether application was successful or not.
*/
private async _applySubscriptionData(
changes: InitialDataChange[],
lsn: LSN,
additionalStmts: Statement[] = [],
subscriptionId?: string
) {
): Promise<boolean> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

really ugly fix but this is a symptom of how we've mixed and matched the architecture here - if _applySubscriptionData fails, it still resolves but fires _handleSubscriptionError. However the error handler resets the state of the subscriptionManager, so the afterApply call in lines 513-516 still fires because _applySubscriptionData resolves without an issue but because the state of the subscription manager has been reset it attempts to resolve promises that are no longer present.

I think satellite probably needs a bit of rearchitecting as we're seeing various issues w.r.t. timing and async.

const namespace = this.builder.defaultNamespace
const stmts: Statement[] = []

Expand Down Expand Up @@ -662,6 +667,7 @@ export class SatelliteProcess implements Satellite {
})
})
this.notifier.actuallyChanged(this.dbName, notificationChanges, 'initial')
return true
} catch (e) {
this._handleSubscriptionError(
new SatelliteError(
Expand All @@ -670,6 +676,7 @@ export class SatelliteProcess implements Satellite {
),
subscriptionId
)
return false
}
}

Expand Down
43 changes: 31 additions & 12 deletions clients/typescript/src/util/asyncEventEmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class AsyncEventEmitter<Events extends EventMap> {
private eventQueue: Array<
EmittedEvent<keyof Events, Parameters<Events[keyof Events]>>
> = []
private processing = false // indicates whether the event queue is currently being processed
private processing: Promise<PromiseSettledResult<void>[]> | false = false // indicates whether the event queue is currently being processed

private getListeners<E extends keyof Events>(event: E): Array<Events[E]> {
return this.listeners[event] ?? []
Expand Down Expand Up @@ -127,8 +127,6 @@ export class AsyncEventEmitter<Events extends EventMap> {
* and an 'error' event is emitted, the error is thrown.
*/
private processQueue() {
this.processing = true

const emittedEvent = this.eventQueue.shift()
if (emittedEvent) {
// We call all listeners and process the next event when all listeners finished.
Expand All @@ -148,15 +146,29 @@ export class AsyncEventEmitter<Events extends EventMap> {
// deep copy because once listeners mutate the `this.listeners` array as they remove themselves
// which breaks the `map` which iterates over that same array while the contents may shift
const ls = [...listeners]
const listenerProms = ls.map(async (listener) => await listener(...args))

Promise
// wait for all listeners to finish,
// some may fail (i.e.return a rejected promise)
// but that should not stop the queue from being processed
// hence the use of `allSettled` rather than `all`
.allSettled(listenerProms)
.then(() => this.processQueue()) // only process the next event when all listeners have finished
const listenerProms = ls.map(async (listener) => {
try {
await listener(...args)
} catch (e) {
// If a listener throws an error, we re-throw it asynchronously so that the queue can continue
// to be processed, this ensures that the exception isn't swallowed by allSettled below.
// It will likely be caught by a global error handler, or be logged to the console.
queueMicrotask(() => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is queueMicrotask available in all environments?
The JS docs say:

The queueMicrotask() method, which is exposed on the Window or Worker interface

So they are talking about browser environments.
Looked it up for node and node seems to support it as well.
Not sure about native environments such as react-native?

Perhaps we don't need this queueMicrotask trick since allSettled returns an array of promise statuses (fulfilled or rejected). So we could loop over the processingProm array, aggregate all errors into one, and then throw the aggregated error. Would that work?

Copy link
Contributor

@msfstef msfstef May 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure queueMicrotask is widely supported, but I agree with your suggestion and I think looking at the settled promises and rethrowing is more idiomatic/clear.

It does change the behaviour slightly though as it will only throw once everything has settled, whereas with queueMicrotask the error would be bubbled up as soon as it happens. @samwillis what's your opinion?

https://github.com/electric-sql/electric/pull/1247/files#diff-503f0d4f6fe4b83c4f32b152d83374bb5a0f8f100a5604cfddc3ffa3cfee40f6R149-R169

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've merged the PR to avoid having it linger for too long as I think it's an important one to have in main, but if there's any disagreements on this last change let me know.

throw e
})
}
})

// wait for all listeners to finish,
// some may fail (i.e.return a rejected promise)
// but that should not stop the queue from being processed
// hence the use of `allSettled` rather than `all`
const processingProm = Promise.allSettled(listenerProms)

// only process the next event when all listeners have finished
processingProm.then(() => this.processQueue())

this.processing = processingProm
} else {
// signal that the queue is no longer being processed
this.processing = false
Expand Down Expand Up @@ -251,4 +263,11 @@ export class AsyncEventEmitter<Events extends EventMap> {
this.maxListeners = maxListeners
return this
}

/**
* Wait for event queue to finish processing.
*/
async waitForProcessing(): Promise<void> {
await this.processing
}
}
4 changes: 2 additions & 2 deletions clients/typescript/test/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2498,7 +2498,7 @@ export const processTests = (test: TestFn<ContextType>) => {
t.plan(3)
const { client, satellite } = t.context

client.shutdown()
await client.shutdown()

const retry = (_e: any, a: number) => {
if (a > 0) {
Expand All @@ -2519,7 +2519,7 @@ export const processTests = (test: TestFn<ContextType>) => {

test.serial('connection cancelled on disconnect', async (t) => {
const { client, satellite, authState, token } = t.context
client.shutdown() // such that satellite can't connect to Electric and will keep retrying
await client.shutdown() // such that satellite can't connect to Electric and will keep retrying
const { connectionPromise } = await startSatellite(
satellite,
authState,
Expand Down
Loading