Skip to content

Commit

Permalink
fix(client): Better throttle snapshot stop strategy (#1251)
Browse files Browse the repository at this point in the history
We've encountered a race condition that can occurr if, after cancelling
the throttle when stopping the process, some other piece of code calls
throttlesnapshot. That's because cancelling the throttle doesn't prevent
from scheduling new ones.

The proposed solution is to assign undefined to the function, so places
that would normally trigger a snapshot, like the poll interval, would
just do nothing until properly closed.

On top of this, we found that `setClientListeners` is only called when
instantiating the process, but it's cleaned up when calling `stop` with
the `disconnect` function. This makes it so that stopping and then
starting the process won't have some of the client listeners properly
set up. The proposed solution is to just move the client listeners
intialization to the `start` function.

I'm not sure how to test this. It would probably need some way to mock
client events after a stop and a start.
  • Loading branch information
davidmartos96 committed May 15, 2024
1 parent 379fad5 commit 237e323
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 50 deletions.
5 changes: 5 additions & 0 deletions .changeset/red-insects-provide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Better throttle snapshot stop strategy
4 changes: 3 additions & 1 deletion clients/typescript/src/drivers/node-postgres/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ export async function createEmbeddedPostgres(
const db = pg.getPgClient()
await db.connect()

let stopPromise: Promise<void>

// We use the database directory as the name
// because it uniquely identifies the DB
return {
db,
stop: () => pg.stop(),
stop: () => (stopPromise ??= pg.stop()),
}
}
11 changes: 8 additions & 3 deletions clients/typescript/src/satellite/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ export class MockSatelliteClient
transactionsCb?: TransactionCallback
additionalDataCb?: AdditionalDataCallback

outboundStartedCallback?: OutboundStartedCallback

relationData: Record<string, DataRecord[]> = {}

deliverFirst = false
Expand Down Expand Up @@ -423,15 +425,15 @@ export class MockSatelliteClient
}

unsubscribeToTransactions(): void {
throw new Error('Method not implemented.')
this.transactionsCb = undefined
}

subscribeToAdditionalData(callback: AdditionalDataCallback): void {
this.additionalDataCb = callback
}

unsubscribeToAdditionalData(_cb: AdditionalDataCallback): void {
throw new Error('Method not implemented.')
this.additionalDataCb = undefined
}

enqueueTransaction(transaction: DataTransaction): void {
Expand All @@ -448,10 +450,13 @@ export class MockSatelliteClient

subscribeToOutboundStarted(callback: OutboundStartedCallback): void {
this.on('outbound_started', callback)
this.outboundStartedCallback = callback
}

unsubscribeToOutboundStarted(): void {
throw new Error('Method not implemented.')
if (!this.outboundStartedCallback) return
this.removeListener('outbound_started', this.outboundStartedCallback)
this.outboundStartedCallback = undefined
}

sendErrorAfterTimeout(subscriptionId: string, timeout: number): void {
Expand Down
108 changes: 86 additions & 22 deletions clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
Uuid,
DbRecord as DataRecord,
ReplicatedRowTransformer,
ServerTransaction,
} from '../util/types'
import { SatelliteOpts } from './config'
import { Client, Satellite } from './index'
Expand Down Expand Up @@ -155,6 +156,8 @@ export class SatelliteProcess implements Satellite {
private _connectRetryHandler: ConnectRetryHandler
private initializing?: Waiter

private _removeClientListeners?: () => void

constructor(
dbName: DbName,
adapter: DatabaseAdapter,
Expand Down Expand Up @@ -191,8 +194,6 @@ export class SatelliteProcess implements Satellite {
this.shapeRequestIdGenerator = this.subscriptionIdGenerator

this._connectRetryHandler = connectRetryHandler

this.setClientListeners()
}

/**
Expand All @@ -212,6 +213,8 @@ export class SatelliteProcess implements Satellite {
await this.logDatabaseVersion()
}

this.setClientListeners()

await this.migrator.up()

const isVerified = await this._verifyTableStructure()
Expand Down Expand Up @@ -251,6 +254,7 @@ export class SatelliteProcess implements Satellite {
this.notifier.subscribeToPotentialDataChanges(this._throttledSnapshot)

// Start polling to request a snapshot every `pollingInterval` ms.
clearInterval(this._pollingInterval)
this._pollingInterval = setInterval(
this._throttledSnapshot,
this.opts.pollingInterval
Expand Down Expand Up @@ -314,17 +318,54 @@ export class SatelliteProcess implements Satellite {
await this.adapter.runInTransaction(...stmtsWithTriggers)
}

// Adds all the necessary listeners to the satellite client
// They can be cleared up by calling the function `_removeClientListeners`
setClientListeners(): void {
this.client.subscribeToError(this._handleClientError.bind(this))
this.client.subscribeToRelations(this._updateRelations.bind(this))
this.client.subscribeToTransactions(this._applyTransaction.bind(this))
this.client.subscribeToAdditionalData(this._applyAdditionalData.bind(this))
this.client.subscribeToOutboundStarted(this._throttledSnapshot.bind(this))
// Remove any existing listeners
if (this._removeClientListeners) {
this._removeClientListeners?.()
this._removeClientListeners = undefined
}

this.client.subscribeToSubscriptionEvents(
this._handleSubscriptionData.bind(this),
const clientErrorCallback = this._handleClientError.bind(this)
this.client.subscribeToError(clientErrorCallback)

const clientRelationsCallback = this._handleClientRelations.bind(this)
this.client.subscribeToRelations(clientRelationsCallback)

const clientTransactionsCallback = this._handleClientTransactions.bind(this)
this.client.subscribeToTransactions(clientTransactionsCallback)

const clientAdditionalDataCallback =
this._handleClientAdditionalData.bind(this)
this.client.subscribeToAdditionalData(clientAdditionalDataCallback)

const clientOutboundStartedCallback =
this._handleClientOutboundStarted.bind(this)
this.client.subscribeToOutboundStarted(clientOutboundStartedCallback)

const clientSubscriptionDataCallback =
this._handleSubscriptionData.bind(this)
const clientSubscriptionErrorCallback =
this._handleSubscriptionError.bind(this)
this.client.subscribeToSubscriptionEvents(
clientSubscriptionDataCallback,
clientSubscriptionErrorCallback
)

// Keep a way to remove the client listeners
this._removeClientListeners = () => {
this.client.unsubscribeToError(clientErrorCallback)
this.client.unsubscribeToRelations(clientRelationsCallback)
this.client.unsubscribeToTransactions(clientTransactionsCallback)
this.client.unsubscribeToAdditionalData(clientAdditionalDataCallback)
this.client.unsubscribeToOutboundStarted(clientOutboundStartedCallback)

this.client.unsubscribeToSubscriptionEvents(
clientSubscriptionDataCallback,
clientSubscriptionErrorCallback
)
}
}

// Unsubscribe from data changes and stop polling
Expand All @@ -333,19 +374,9 @@ export class SatelliteProcess implements Satellite {
}

private async _stop(shutdown?: boolean): Promise<void> {
// Stop snapshotting and polling for changes.
this._throttledSnapshot.cancel()

// Ensure that no snapshot is left running in the background
// by acquiring the mutex and releasing it immediately.
const releaseMutex = await this.snapshotMutex.acquire()
releaseMutex()

if (this._pollingInterval !== undefined) {
clearInterval(this._pollingInterval)

this._pollingInterval = undefined
}
// Stop snapshot polling
clearInterval(this._pollingInterval)
this._pollingInterval = undefined

// Unsubscribe all listeners and remove them
const unsubscribers = [
Expand All @@ -362,13 +393,30 @@ export class SatelliteProcess implements Satellite {
}
})

this._removeClientListeners?.()
this._removeClientListeners = undefined

// Cancel the snapshot throttle
this._throttledSnapshot.cancel()

// Make sure no snapshot is running after we stop the process, otherwise we might be trying to
// interact with a closed database connection
await this._waitForActiveSnapshots()

this.disconnect()

if (shutdown) {
this.client.shutdown()
}
}

// Ensure that no snapshot is left running in the background
// by acquiring the mutex and releasing it immediately.
async _waitForActiveSnapshots(): Promise<void> {
const releaseMutex = await this.snapshotMutex.acquire()
releaseMutex()
}

async subscribe(shapeDefinitions: Shape[]): Promise<ShapeSubscription> {
// Await for client to be ready before doing anything else
await this.initializing?.waitOn()
Expand Down Expand Up @@ -674,6 +722,22 @@ export class SatelliteProcess implements Satellite {
}
}

_handleClientRelations(relation: Relation): void {
this._updateRelations(relation)
}

async _handleClientTransactions(tx: ServerTransaction) {
await this._applyTransaction(tx)
}

async _handleClientAdditionalData(data: AdditionalData) {
await this._applyAdditionalData(data)
}

async _handleClientOutboundStarted() {
await this._throttledSnapshot()
}

// handles async client errors: can be a socket error or a server error message
_handleClientError(satelliteError: SatelliteError) {
if (this.initializing && !this.initializing.finished()) {
Expand Down
21 changes: 16 additions & 5 deletions clients/typescript/test/satellite/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,14 @@ export const makeContext = async (
await mkdir('.tmp', { recursive: true })
const dbName = `.tmp/test-${randomValue()}.db`
const db = new SqliteDatabase(dbName)
const stop = async () => {
if (!db.open) return
db.close()
}
const adapter = new SqliteDatabaseAdapter(db)
const migrator = new SqliteBundleMigrator(adapter, sqliteMigrations)
makeContextInternal(t, dbName, adapter, migrator, namespace, options)
t.context.stop = stop
}

export const makePgContext = async (
Expand All @@ -337,7 +342,10 @@ export const makePgliteContext = async (
) => {
const dbName = `test-${randomValue()}`
const db = new PGlite()
const stop = () => db.close()
const stop = async () => {
if (db.closed) return
await db.close()
}
const adapter = new PgliteDatabaseAdapter(db)
const migrator = new PgBundleMigrator(adapter, pgMigrations)
makeContextInternal(t, dbName, adapter, migrator, namespace, options)
Expand Down Expand Up @@ -380,8 +388,12 @@ export const mockElectricClient = async (
return electric
}

export const clean = async (t: ExecutionContext<{ dbName: string }>) => {
const { dbName } = t.context
export const cleanAndStopDb = async (
t: ExecutionContext<{ dbName: string; stop?: () => Promise<void> }>
) => {
const { dbName, stop } = t.context

await stop?.()

await removeFile(dbName, { force: true })
await removeFile(`${dbName}-journal`, { force: true })
Expand All @@ -396,8 +408,7 @@ export const cleanAndStopSatellite = async (
) => {
const { satellite } = t.context
await satellite.stop()
await clean(t)
await t.context.stop?.()
await cleanAndStopDb(t)
}

export async function migrateDb(
Expand Down
Loading

0 comments on commit 237e323

Please sign in to comment.