Skip to content

Commit

Permalink
fix(client): asyncEventEmitter to not silence unhandled exceptions ra…
Browse files Browse the repository at this point in the history
…ised in event handlers (#1247)

Currently if there is an unhandled exception in an event handler of
asyncEventEmitter they are silenced and don't make it to a global error
handler or console. This changes it to re-throw them async using
`queueMicrotask` so that they happen outside of the promise awaited by
`allSettled`.

---------

Co-authored-by: msfstef <msfstef@gmail.com>
  • Loading branch information
samwillis and msfstef committed May 30, 2024
1 parent 6d4fb3d commit 17e793c
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 27 deletions.
5 changes: 5 additions & 0 deletions .changeset/five-rats-shake.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Fix asyncEventEmitter to not silence unhandled exceptions raised in event handlers.
5 changes: 5 additions & 0 deletions .changeset/selfish-years-suffer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Fix `ShapeManager` bug where manager state gets reset but the Satellite process is still assuming it is accessible.
7 changes: 4 additions & 3 deletions clients/typescript/src/satellite/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ export class SatelliteClient implements Client {
this.socketHandler = undefined

if (this.socket !== undefined) {
this.socket!.closeAndRemoveListeners()
this.socket.closeAndRemoveListeners()
this.socket = undefined
}
}
Expand All @@ -367,9 +367,10 @@ export class SatelliteClient implements Client {
return this.outbound.isReplicating
}

shutdown(): void {
this.disconnect()
async shutdown(): Promise<void> {
this.emitter.removeAllListeners()
await this.emitter.waitForProcessing()
this.disconnect()
this.isDown = true
}

Expand Down
2 changes: 1 addition & 1 deletion clients/typescript/src/satellite/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export interface Satellite extends IShapeManager {
export interface Client {
connect(): Promise<void>
disconnect(): void
shutdown(): void
shutdown(): Promise<void>
authenticate(authState: AuthState): Promise<AuthResponse>
isConnected(): boolean
getOutboundReplicationStatus(): ReplicationStatus
Expand Down
3 changes: 2 additions & 1 deletion clients/typescript/src/satellite/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ export class MockSatelliteClient
: ReplicationStatus.STOPPED
}

shutdown(): void {
async shutdown(): Promise<void> {
await this.waitForProcessing()
this.isDown = true
}

Expand Down
19 changes: 13 additions & 6 deletions clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ export class SatelliteProcess implements Satellite {
this.disconnect()

if (shutdown) {
this.client.shutdown()
await this.client.shutdown()
}
}

Expand Down Expand Up @@ -503,24 +503,29 @@ export class SatelliteProcess implements Satellite {
subsData.subscriptionId
)

await this._applySubscriptionData(
const applied = await this._applySubscriptionData(
subsData.data,
subsData.lsn,
[],
subsData.subscriptionId
)

const toBeUnsubbed = afterApply()
if (toBeUnsubbed.length > 0) await this.unsubscribeIds(toBeUnsubbed)
if (applied) {
const toBeUnsubbed = afterApply()
if (toBeUnsubbed.length > 0) await this.unsubscribeIds(toBeUnsubbed)
}
}

/** Insert incoming subscription data into the database. */
/**
* Insert incoming subscription data into the database.
* Returns flag indicating whether application was successful or not.
*/
private async _applySubscriptionData(
changes: InitialDataChange[],
lsn: LSN,
additionalStmts: Statement[] = [],
subscriptionId?: string
) {
): Promise<boolean> {
const namespace = this.builder.defaultNamespace
const stmts: Statement[] = []

Expand Down Expand Up @@ -662,6 +667,7 @@ export class SatelliteProcess implements Satellite {
})
})
this.notifier.actuallyChanged(this.dbName, notificationChanges, 'initial')
return true
} catch (e) {
this._handleSubscriptionError(
new SatelliteError(
Expand All @@ -670,6 +676,7 @@ export class SatelliteProcess implements Satellite {
),
subscriptionId
)
return false
}
}

Expand Down
41 changes: 29 additions & 12 deletions clients/typescript/src/util/asyncEventEmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class AsyncEventEmitter<Events extends EventMap> {
private eventQueue: Array<
EmittedEvent<keyof Events, Parameters<Events[keyof Events]>>
> = []
private processing = false // indicates whether the event queue is currently being processed
private processing: Promise<PromiseSettledResult<void>[]> | false = false // indicates whether the event queue is currently being processed

private getListeners<E extends keyof Events>(event: E): Array<Events[E]> {
return this.listeners[event] ?? []
Expand Down Expand Up @@ -127,8 +127,6 @@ export class AsyncEventEmitter<Events extends EventMap> {
* and an 'error' event is emitted, the error is thrown.
*/
private processQueue() {
this.processing = true

const emittedEvent = this.eventQueue.shift()
if (emittedEvent) {
// We call all listeners and process the next event when all listeners finished.
Expand All @@ -148,15 +146,27 @@ export class AsyncEventEmitter<Events extends EventMap> {
// deep copy because once listeners mutate the `this.listeners` array as they remove themselves
// which breaks the `map` which iterates over that same array while the contents may shift
const ls = [...listeners]
const listenerProms = ls.map(async (listener) => await listener(...args))

Promise
// wait for all listeners to finish,
// some may fail (i.e.return a rejected promise)
// but that should not stop the queue from being processed
// hence the use of `allSettled` rather than `all`
.allSettled(listenerProms)
.then(() => this.processQueue()) // only process the next event when all listeners have finished
const listenerProms = ls.map((listener) => listener(...args))

// wait for all listeners to finish,
// some may fail (i.e.return a rejected promise)
// but that should not stop the queue from being processed
// hence the use of `allSettled` rather than `all`
this.processing = Promise.allSettled(listenerProms)

// only process the next event when all listeners have finished
this.processing.then((settledPromises) => {
this.processQueue()

// re-throw any rejected promises such that the global error
// handler can catch them and log them, otherwise they would
// be suppressed and bugs may go unnoticed
settledPromises.forEach((rejectedProm) => {
if (rejectedProm.status === 'rejected') {
throw rejectedProm.reason
}
})
})
} else {
// signal that the queue is no longer being processed
this.processing = false
Expand Down Expand Up @@ -251,4 +261,11 @@ export class AsyncEventEmitter<Events extends EventMap> {
this.maxListeners = maxListeners
return this
}

/**
* Wait for event queue to finish processing.
*/
async waitForProcessing(): Promise<void> {
await this.processing
}
}
5 changes: 3 additions & 2 deletions clients/typescript/src/util/encoders/sqliteEncoders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export const sqliteTypeEncoder = {
json: (string: string) => textEncoder.encode(string),
timetz: (string: string) =>
sqliteTypeEncoder.text(stringToTimetzString(string)),
bytea: (bytes: Uint8Array) => bytes, // no-op
bytea: (bytes: Uint8Array) => bytes,
}

export const sqliteTypeDecoder = {
Expand All @@ -22,7 +22,8 @@ export const sqliteTypeDecoder = {
json: bytesToString,
timetz: bytesToTimetzString,
float: bytesToFloat,
bytea: (bytes: Uint8Array) => bytes, // no-op
// ensure it is in Uint8Array format and not Buffer etc
bytea: (bytes: Uint8Array) => new Uint8Array(bytes),
}

export function boolToBytes(b: number) {
Expand Down
4 changes: 2 additions & 2 deletions clients/typescript/test/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2498,7 +2498,7 @@ export const processTests = (test: TestFn<ContextType>) => {
t.plan(3)
const { client, satellite } = t.context

client.shutdown()
await client.shutdown()

const retry = (_e: any, a: number) => {
if (a > 0) {
Expand All @@ -2519,7 +2519,7 @@ export const processTests = (test: TestFn<ContextType>) => {

test.serial('connection cancelled on disconnect', async (t) => {
const { client, satellite, authState, token } = t.context
client.shutdown() // such that satellite can't connect to Electric and will keep retrying
await client.shutdown() // such that satellite can't connect to Electric and will keep retrying
const { connectionPromise } = await startSatellite(
satellite,
authState,
Expand Down

0 comments on commit 17e793c

Please sign in to comment.