From 237e32341adac36a44b061a61803972467d81b18 Mon Sep 17 00:00:00 2001 From: David Martos Date: Wed, 15 May 2024 10:29:17 +0200 Subject: [PATCH] fix(client): Better throttle snapshot stop strategy (#1251) We've encountered a race condition that can occur if, after cancelling the throttle when stopping the process, some other piece of code calls `_throttledSnapshot`. That's because cancelling the throttle doesn't prevent new ones from being scheduled. The proposed solution is to assign undefined to the function, so places that would normally trigger a snapshot, like the poll interval, would just do nothing until properly closed. On top of this, we found that `setClientListeners` is only called when instantiating the process, but it's cleaned up when calling `stop` with the `disconnect` function. This makes it so that stopping and then starting the process won't have some of the client listeners properly set up. The proposed solution is to just move the client listeners initialization to the `start` function. I'm not sure how to test this. It would probably need some way to mock client events after a stop and a start. 
--- .changeset/red-insects-provide.md | 5 + .../src/drivers/node-postgres/database.ts | 4 +- clients/typescript/src/satellite/mock.ts | 11 +- clients/typescript/src/satellite/process.ts | 108 ++++++++++++++---- clients/typescript/test/satellite/common.ts | 21 +++- clients/typescript/test/satellite/process.ts | 88 +++++++++++--- .../satellite/sqlite/process.timing.test.ts | 4 +- 7 files changed, 191 insertions(+), 50 deletions(-) create mode 100644 .changeset/red-insects-provide.md diff --git a/.changeset/red-insects-provide.md b/.changeset/red-insects-provide.md new file mode 100644 index 0000000000..b0d0f34ae8 --- /dev/null +++ b/.changeset/red-insects-provide.md @@ -0,0 +1,5 @@ +--- +"electric-sql": patch +--- + +Better throttle snapshot stop strategy diff --git a/clients/typescript/src/drivers/node-postgres/database.ts b/clients/typescript/src/drivers/node-postgres/database.ts index 5d6d21d956..f9cfb914f7 100644 --- a/clients/typescript/src/drivers/node-postgres/database.ts +++ b/clients/typescript/src/drivers/node-postgres/database.ts @@ -58,10 +58,12 @@ export async function createEmbeddedPostgres( const db = pg.getPgClient() await db.connect() + let stopPromise: Promise + // We use the database directory as the name // because it uniquely identifies the DB return { db, - stop: () => pg.stop(), + stop: () => (stopPromise ??= pg.stop()), } } diff --git a/clients/typescript/src/satellite/mock.ts b/clients/typescript/src/satellite/mock.ts index 9d5bf7f8be..b2637dd929 100644 --- a/clients/typescript/src/satellite/mock.ts +++ b/clients/typescript/src/satellite/mock.ts @@ -208,6 +208,8 @@ export class MockSatelliteClient transactionsCb?: TransactionCallback additionalDataCb?: AdditionalDataCallback + outboundStartedCallback?: OutboundStartedCallback + relationData: Record = {} deliverFirst = false @@ -423,7 +425,7 @@ export class MockSatelliteClient } unsubscribeToTransactions(): void { - throw new Error('Method not implemented.') + this.transactionsCb = undefined } 
subscribeToAdditionalData(callback: AdditionalDataCallback): void { @@ -431,7 +433,7 @@ export class MockSatelliteClient } unsubscribeToAdditionalData(_cb: AdditionalDataCallback): void { - throw new Error('Method not implemented.') + this.additionalDataCb = undefined } enqueueTransaction(transaction: DataTransaction): void { @@ -448,10 +450,13 @@ export class MockSatelliteClient subscribeToOutboundStarted(callback: OutboundStartedCallback): void { this.on('outbound_started', callback) + this.outboundStartedCallback = callback } unsubscribeToOutboundStarted(): void { - throw new Error('Method not implemented.') + if (!this.outboundStartedCallback) return + this.removeListener('outbound_started', this.outboundStartedCallback) + this.outboundStartedCallback = undefined } sendErrorAfterTimeout(subscriptionId: string, timeout: number): void { diff --git a/clients/typescript/src/satellite/process.ts b/clients/typescript/src/satellite/process.ts index 41c4e96edb..3dd732c831 100644 --- a/clients/typescript/src/satellite/process.ts +++ b/clients/typescript/src/satellite/process.ts @@ -39,6 +39,7 @@ import { Uuid, DbRecord as DataRecord, ReplicatedRowTransformer, + ServerTransaction, } from '../util/types' import { SatelliteOpts } from './config' import { Client, Satellite } from './index' @@ -155,6 +156,8 @@ export class SatelliteProcess implements Satellite { private _connectRetryHandler: ConnectRetryHandler private initializing?: Waiter + private _removeClientListeners?: () => void + constructor( dbName: DbName, adapter: DatabaseAdapter, @@ -191,8 +194,6 @@ export class SatelliteProcess implements Satellite { this.shapeRequestIdGenerator = this.subscriptionIdGenerator this._connectRetryHandler = connectRetryHandler - - this.setClientListeners() } /** @@ -212,6 +213,8 @@ export class SatelliteProcess implements Satellite { await this.logDatabaseVersion() } + this.setClientListeners() + await this.migrator.up() const isVerified = await this._verifyTableStructure() @@ 
-251,6 +254,7 @@ export class SatelliteProcess implements Satellite { this.notifier.subscribeToPotentialDataChanges(this._throttledSnapshot) // Start polling to request a snapshot every `pollingInterval` ms. + clearInterval(this._pollingInterval) this._pollingInterval = setInterval( this._throttledSnapshot, this.opts.pollingInterval @@ -314,17 +318,54 @@ export class SatelliteProcess implements Satellite { await this.adapter.runInTransaction(...stmtsWithTriggers) } + // Adds all the necessary listeners to the satellite client + // They can be cleared up by calling the function `_removeClientListeners` setClientListeners(): void { - this.client.subscribeToError(this._handleClientError.bind(this)) - this.client.subscribeToRelations(this._updateRelations.bind(this)) - this.client.subscribeToTransactions(this._applyTransaction.bind(this)) - this.client.subscribeToAdditionalData(this._applyAdditionalData.bind(this)) - this.client.subscribeToOutboundStarted(this._throttledSnapshot.bind(this)) + // Remove any existing listeners + if (this._removeClientListeners) { + this._removeClientListeners?.() + this._removeClientListeners = undefined + } - this.client.subscribeToSubscriptionEvents( - this._handleSubscriptionData.bind(this), + const clientErrorCallback = this._handleClientError.bind(this) + this.client.subscribeToError(clientErrorCallback) + + const clientRelationsCallback = this._handleClientRelations.bind(this) + this.client.subscribeToRelations(clientRelationsCallback) + + const clientTransactionsCallback = this._handleClientTransactions.bind(this) + this.client.subscribeToTransactions(clientTransactionsCallback) + + const clientAdditionalDataCallback = + this._handleClientAdditionalData.bind(this) + this.client.subscribeToAdditionalData(clientAdditionalDataCallback) + + const clientOutboundStartedCallback = + this._handleClientOutboundStarted.bind(this) + this.client.subscribeToOutboundStarted(clientOutboundStartedCallback) + + const clientSubscriptionDataCallback 
= + this._handleSubscriptionData.bind(this) + const clientSubscriptionErrorCallback = this._handleSubscriptionError.bind(this) + this.client.subscribeToSubscriptionEvents( + clientSubscriptionDataCallback, + clientSubscriptionErrorCallback ) + + // Keep a way to remove the client listeners + this._removeClientListeners = () => { + this.client.unsubscribeToError(clientErrorCallback) + this.client.unsubscribeToRelations(clientRelationsCallback) + this.client.unsubscribeToTransactions(clientTransactionsCallback) + this.client.unsubscribeToAdditionalData(clientAdditionalDataCallback) + this.client.unsubscribeToOutboundStarted(clientOutboundStartedCallback) + + this.client.unsubscribeToSubscriptionEvents( + clientSubscriptionDataCallback, + clientSubscriptionErrorCallback + ) + } } // Unsubscribe from data changes and stop polling @@ -333,19 +374,9 @@ export class SatelliteProcess implements Satellite { } private async _stop(shutdown?: boolean): Promise { - // Stop snapshotting and polling for changes. - this._throttledSnapshot.cancel() - - // Ensure that no snapshot is left running in the background - // by acquiring the mutex and releasing it immediately. 
- const releaseMutex = await this.snapshotMutex.acquire() - releaseMutex() - - if (this._pollingInterval !== undefined) { - clearInterval(this._pollingInterval) - - this._pollingInterval = undefined - } + // Stop snapshot polling + clearInterval(this._pollingInterval) + this._pollingInterval = undefined // Unsubscribe all listeners and remove them const unsubscribers = [ @@ -362,6 +393,16 @@ export class SatelliteProcess implements Satellite { } }) + this._removeClientListeners?.() + this._removeClientListeners = undefined + + // Cancel the snapshot throttle + this._throttledSnapshot.cancel() + + // Make sure no snapshot is running after we stop the process, otherwise we might be trying to + // interact with a closed database connection + await this._waitForActiveSnapshots() + this.disconnect() if (shutdown) { @@ -369,6 +410,13 @@ export class SatelliteProcess implements Satellite { } } + // Ensure that no snapshot is left running in the background + // by acquiring the mutex and releasing it immediately. 
+ async _waitForActiveSnapshots(): Promise { + const releaseMutex = await this.snapshotMutex.acquire() + releaseMutex() + } + async subscribe(shapeDefinitions: Shape[]): Promise { // Await for client to be ready before doing anything else await this.initializing?.waitOn() @@ -674,6 +722,22 @@ export class SatelliteProcess implements Satellite { } } + _handleClientRelations(relation: Relation): void { + this._updateRelations(relation) + } + + async _handleClientTransactions(tx: ServerTransaction) { + await this._applyTransaction(tx) + } + + async _handleClientAdditionalData(data: AdditionalData) { + await this._applyAdditionalData(data) + } + + async _handleClientOutboundStarted() { + await this._throttledSnapshot() + } + // handles async client errors: can be a socket error or a server error message _handleClientError(satelliteError: SatelliteError) { if (this.initializing && !this.initializing.finished()) { diff --git a/clients/typescript/test/satellite/common.ts b/clients/typescript/test/satellite/common.ts index e6fb68f437..3526992a38 100644 --- a/clients/typescript/test/satellite/common.ts +++ b/clients/typescript/test/satellite/common.ts @@ -311,9 +311,14 @@ export const makeContext = async ( await mkdir('.tmp', { recursive: true }) const dbName = `.tmp/test-${randomValue()}.db` const db = new SqliteDatabase(dbName) + const stop = async () => { + if (!db.open) return + db.close() + } const adapter = new SqliteDatabaseAdapter(db) const migrator = new SqliteBundleMigrator(adapter, sqliteMigrations) makeContextInternal(t, dbName, adapter, migrator, namespace, options) + t.context.stop = stop } export const makePgContext = async ( @@ -337,7 +342,10 @@ export const makePgliteContext = async ( ) => { const dbName = `test-${randomValue()}` const db = new PGlite() - const stop = () => db.close() + const stop = async () => { + if (db.closed) return + await db.close() + } const adapter = new PgliteDatabaseAdapter(db) const migrator = new PgBundleMigrator(adapter, 
pgMigrations) makeContextInternal(t, dbName, adapter, migrator, namespace, options) @@ -380,8 +388,12 @@ export const mockElectricClient = async ( return electric } -export const clean = async (t: ExecutionContext<{ dbName: string }>) => { - const { dbName } = t.context +export const cleanAndStopDb = async ( + t: ExecutionContext<{ dbName: string; stop?: () => Promise }> +) => { + const { dbName, stop } = t.context + + await stop?.() await removeFile(dbName, { force: true }) await removeFile(`${dbName}-journal`, { force: true }) @@ -396,8 +408,7 @@ export const cleanAndStopSatellite = async ( ) => { const { satellite } = t.context await satellite.stop() - await clean(t) - await t.context.stop?.() + await cleanAndStopDb(t) } export async function migrateDb( diff --git a/clients/typescript/test/satellite/process.ts b/clients/typescript/test/satellite/process.ts index d7edcc0619..22179b1e5c 100644 --- a/clients/typescript/test/satellite/process.ts +++ b/clients/typescript/test/satellite/process.ts @@ -35,7 +35,11 @@ import { SatelliteError, SatelliteErrorCode, } from '../../src/util/types' -import { relations, ContextType as CommonContextType, clean } from './common' +import { + relations, + ContextType as CommonContextType, + cleanAndStopDb, +} from './common' import { numberToBytes, base64, blobToHexString } from '../../src/util/encoders' @@ -143,7 +147,8 @@ export const processTests = (test: TestFn) => { await satellite.stop() - await startSatellite(satellite, authState, token) + const conn = await startSatellite(satellite, authState, token) + await conn.connectionPromise const clientId2 = satellite._authState!.clientId t.truthy(clientId2) @@ -154,11 +159,12 @@ export const processTests = (test: TestFn) => { const { satellite, authState } = t.context await t.notThrowsAsync(async () => { - await startSatellite( + const conn = await startSatellite( satellite, authState, insecureAuthToken({ user_id: 'test-userA' }) ) + await conn.connectionPromise }) }) @@ -166,11 
+172,12 @@ export const processTests = (test: TestFn) => { const { satellite, authState } = t.context await t.notThrowsAsync(async () => { - await startSatellite( + const conn = await startSatellite( satellite, authState, insecureAuthToken({ sub: 'test-userB' }) ) + await conn.connectionPromise }) }) @@ -190,7 +197,7 @@ export const processTests = (test: TestFn) => { test('cannot update user id', async (t) => { const { satellite, authState, token } = t.context - await startSatellite(satellite, authState, token) + const conn = await startSatellite(satellite, authState, token) const error = t.throws(() => { satellite.setToken(insecureAuthToken({ sub: 'test-user2' })) }) @@ -198,6 +205,7 @@ export const processTests = (test: TestFn) => { error?.message, "Can't change user ID when reconnecting. Previously connected with user ID 'test-user' but trying to reconnect with user ID 'test-user2'" ) + await conn.connectionPromise }) test('cannot UPDATE primary key', async (t) => { @@ -1547,7 +1555,8 @@ export const processTests = (test: TestFn) => { token, } = t.context await runMigrations() - await startSatellite(satellite, authState, token) + const conn = await startSatellite(satellite, authState, token) + await conn.connectionPromise // give some time for Satellite to start // (needed because connecting and starting replication are async) @@ -1556,16 +1565,18 @@ export const processTests = (test: TestFn) => { // we're expecting 2 assertions t.plan(4) - notifier.subscribeToConnectivityStateChanges( - (notification: ConnectivityStateChangeNotification) => { - t.is(notification.dbName, dbName) - t.is(notification.connectivityState.status, 'disconnected') - t.is( - notification.connectivityState.reason?.code, - SatelliteErrorCode.AUTH_EXPIRED - ) - } - ) + const unsubConnectivityChanges = + notifier.subscribeToConnectivityStateChanges( + (notification: ConnectivityStateChangeNotification) => { + t.is(notification.dbName, dbName) + t.is(notification.connectivityState.status, 
'disconnected') + t.is( + notification.connectivityState.reason?.code, + SatelliteErrorCode.AUTH_EXPIRED + ) + } + ) + t.teardown(unsubConnectivityChanges) // mock JWT expiration client.emitSocketClosedError(SatelliteErrorCode.AUTH_EXPIRED) @@ -2504,7 +2515,7 @@ export const processTests = (test: TestFn) => { await satellite.stop() // Remove/close the database connection - await clean(t) + await cleanAndStopDb(t) // Wait for the snapshot to finish to consider the test successful await snapshotPromise @@ -2535,4 +2546,47 @@ export const processTests = (test: TestFn) => { t.pass() }) + + test("don't schedule snapshots from polling interval when closing satellite process", async (t) => { + const { + adapter, + runMigrations, + satellite, + authState, + token, + opts, + builder, + } = t.context + + await runMigrations() + + // Replace the snapshot function to simulate a slow snapshot + // that access the database after closing + satellite._performSnapshot = async () => { + try { + await sleepAsync(500) + await adapter.query(builder.getLocalTableNames()) + return new Date() + } catch (e) { + t.fail() + throw e + } + } + + const conn = await startSatellite(satellite, authState, token) + await conn.connectionPromise + + // Let the process schedule a snapshot + await sleepAsync(opts.pollingInterval * 2) + + await satellite.stop() + + // Remove/close the database connection + await cleanAndStopDb(t) + + // Wait for the snapshot to finish to consider the test successful + await sleepAsync(1000) + + t.pass() + }) } diff --git a/clients/typescript/test/satellite/sqlite/process.timing.test.ts b/clients/typescript/test/satellite/sqlite/process.timing.test.ts index 649fdebd9b..9bb861768f 100644 --- a/clients/typescript/test/satellite/sqlite/process.timing.test.ts +++ b/clients/typescript/test/satellite/sqlite/process.timing.test.ts @@ -1,9 +1,9 @@ import anyTest, { TestFn } from 'ava' import { processTimingTests } from '../process.timing' -import { makeContext, clean, ContextType 
} from '../common' +import { makeContext, cleanAndStopDb, ContextType } from '../common' const test = anyTest as TestFn test.beforeEach(async (t) => makeContext(t, 'main')) -test.afterEach.always(clean) +test.afterEach.always(cleanAndStopDb) processTimingTests(test)