diff --git a/.changeset/red-insects-provide.md b/.changeset/red-insects-provide.md new file mode 100644 index 0000000000..b0d0f34ae8 --- /dev/null +++ b/.changeset/red-insects-provide.md @@ -0,0 +1,5 @@ +--- +"electric-sql": patch +--- + +Better throttle snapshot stop strategy diff --git a/clients/typescript/src/drivers/node-postgres/database.ts b/clients/typescript/src/drivers/node-postgres/database.ts index 5d6d21d956..f9cfb914f7 100644 --- a/clients/typescript/src/drivers/node-postgres/database.ts +++ b/clients/typescript/src/drivers/node-postgres/database.ts @@ -58,10 +58,12 @@ export async function createEmbeddedPostgres( const db = pg.getPgClient() await db.connect() + let stopPromise: Promise + // We use the database directory as the name // because it uniquely identifies the DB return { db, - stop: () => pg.stop(), + stop: () => (stopPromise ??= pg.stop()), } } diff --git a/clients/typescript/src/satellite/mock.ts b/clients/typescript/src/satellite/mock.ts index 9d5bf7f8be..b2637dd929 100644 --- a/clients/typescript/src/satellite/mock.ts +++ b/clients/typescript/src/satellite/mock.ts @@ -208,6 +208,8 @@ export class MockSatelliteClient transactionsCb?: TransactionCallback additionalDataCb?: AdditionalDataCallback + outboundStartedCallback?: OutboundStartedCallback + relationData: Record = {} deliverFirst = false @@ -423,7 +425,7 @@ export class MockSatelliteClient } unsubscribeToTransactions(): void { - throw new Error('Method not implemented.') + this.transactionsCb = undefined } subscribeToAdditionalData(callback: AdditionalDataCallback): void { @@ -431,7 +433,7 @@ export class MockSatelliteClient } unsubscribeToAdditionalData(_cb: AdditionalDataCallback): void { - throw new Error('Method not implemented.') + this.additionalDataCb = undefined } enqueueTransaction(transaction: DataTransaction): void { @@ -448,10 +450,13 @@ export class MockSatelliteClient subscribeToOutboundStarted(callback: OutboundStartedCallback): void { this.on('outbound_started', callback) + this.outboundStartedCallback = callback } unsubscribeToOutboundStarted(): void { - throw new Error('Method not implemented.') + if (!this.outboundStartedCallback) return + this.removeListener('outbound_started', this.outboundStartedCallback) + this.outboundStartedCallback = undefined } sendErrorAfterTimeout(subscriptionId: string, timeout: number): void { diff --git a/clients/typescript/src/satellite/process.ts b/clients/typescript/src/satellite/process.ts index 41c4e96edb..3dd732c831 100644 --- a/clients/typescript/src/satellite/process.ts +++ b/clients/typescript/src/satellite/process.ts @@ -39,6 +39,7 @@ import { Uuid, DbRecord as DataRecord, ReplicatedRowTransformer, + ServerTransaction, } from '../util/types' import { SatelliteOpts } from './config' import { Client, Satellite } from './index' @@ -155,6 +156,8 @@ export class SatelliteProcess implements Satellite { private _connectRetryHandler: ConnectRetryHandler private initializing?: Waiter + private _removeClientListeners?: () => void + constructor( dbName: DbName, adapter: DatabaseAdapter, @@ -191,8 +194,6 @@ export class SatelliteProcess implements Satellite { this.shapeRequestIdGenerator = this.subscriptionIdGenerator this._connectRetryHandler = connectRetryHandler - - this.setClientListeners() } /** @@ -212,6 +213,8 @@ export class SatelliteProcess implements Satellite { await this.logDatabaseVersion() } + this.setClientListeners() + await this.migrator.up() const isVerified = await this._verifyTableStructure() @@ -251,6 +254,7 @@ export class SatelliteProcess implements Satellite { this.notifier.subscribeToPotentialDataChanges(this._throttledSnapshot) // Start polling to request a snapshot every `pollingInterval` ms. + clearInterval(this._pollingInterval) this._pollingInterval = setInterval( this._throttledSnapshot, this.opts.pollingInterval @@ -314,17 +318,54 @@ export class SatelliteProcess implements Satellite { await this.adapter.runInTransaction(...stmtsWithTriggers) } + // Adds all the necessary listeners to the satellite client + // They can be cleared up by calling the function `_removeClientListeners` setClientListeners(): void { - this.client.subscribeToError(this._handleClientError.bind(this)) - this.client.subscribeToRelations(this._updateRelations.bind(this)) - this.client.subscribeToTransactions(this._applyTransaction.bind(this)) - this.client.subscribeToAdditionalData(this._applyAdditionalData.bind(this)) - this.client.subscribeToOutboundStarted(this._throttledSnapshot.bind(this)) + // Remove any existing listeners + if (this._removeClientListeners) { + this._removeClientListeners?.() + this._removeClientListeners = undefined + } - this.client.subscribeToSubscriptionEvents( - this._handleSubscriptionData.bind(this), + const clientErrorCallback = this._handleClientError.bind(this) + this.client.subscribeToError(clientErrorCallback) + + const clientRelationsCallback = this._handleClientRelations.bind(this) + this.client.subscribeToRelations(clientRelationsCallback) + + const clientTransactionsCallback = this._handleClientTransactions.bind(this) + this.client.subscribeToTransactions(clientTransactionsCallback) + + const clientAdditionalDataCallback = + this._handleClientAdditionalData.bind(this) + this.client.subscribeToAdditionalData(clientAdditionalDataCallback) + + const clientOutboundStartedCallback = + this._handleClientOutboundStarted.bind(this) + this.client.subscribeToOutboundStarted(clientOutboundStartedCallback) + + const clientSubscriptionDataCallback = + this._handleSubscriptionData.bind(this) + const clientSubscriptionErrorCallback = this._handleSubscriptionError.bind(this) + this.client.subscribeToSubscriptionEvents( + clientSubscriptionDataCallback, + clientSubscriptionErrorCallback ) + + // Keep a way to remove the client listeners + this._removeClientListeners = () => { + this.client.unsubscribeToError(clientErrorCallback) + this.client.unsubscribeToRelations(clientRelationsCallback) + this.client.unsubscribeToTransactions(clientTransactionsCallback) + this.client.unsubscribeToAdditionalData(clientAdditionalDataCallback) + this.client.unsubscribeToOutboundStarted(clientOutboundStartedCallback) + + this.client.unsubscribeToSubscriptionEvents( + clientSubscriptionDataCallback, + clientSubscriptionErrorCallback + ) + } } // Unsubscribe from data changes and stop polling @@ -333,19 +374,9 @@ export class SatelliteProcess implements Satellite { } private async _stop(shutdown?: boolean): Promise { - // Stop snapshotting and polling for changes. - this._throttledSnapshot.cancel() - - // Ensure that no snapshot is left running in the background - // by acquiring the mutex and releasing it immediately. - const releaseMutex = await this.snapshotMutex.acquire() - releaseMutex() - - if (this._pollingInterval !== undefined) { - clearInterval(this._pollingInterval) - - this._pollingInterval = undefined - } + // Stop snapshot polling + clearInterval(this._pollingInterval) + this._pollingInterval = undefined // Unsubscribe all listeners and remove them const unsubscribers = [ @@ -362,6 +393,16 @@ export class SatelliteProcess implements Satellite { } }) + this._removeClientListeners?.() + this._removeClientListeners = undefined + + // Cancel the snapshot throttle + this._throttledSnapshot.cancel() + + // Make sure no snapshot is running after we stop the process, otherwise we might be trying to + // interact with a closed database connection + await this._waitForActiveSnapshots() + this.disconnect() if (shutdown) { @@ -369,6 +410,13 @@ export class SatelliteProcess implements Satellite { } } + // Ensure that no snapshot is left running in the background + // by acquiring the mutex and releasing it immediately. + async _waitForActiveSnapshots(): Promise { + const releaseMutex = await this.snapshotMutex.acquire() + releaseMutex() + } + async subscribe(shapeDefinitions: Shape[]): Promise { // Await for client to be ready before doing anything else await this.initializing?.waitOn() @@ -674,6 +722,22 @@ export class SatelliteProcess implements Satellite { } } + _handleClientRelations(relation: Relation): void { + this._updateRelations(relation) + } + + async _handleClientTransactions(tx: ServerTransaction) { + await this._applyTransaction(tx) + } + + async _handleClientAdditionalData(data: AdditionalData) { + await this._applyAdditionalData(data) + } + + async _handleClientOutboundStarted() { + await this._throttledSnapshot() + } + // handles async client errors: can be a socket error or a server error message _handleClientError(satelliteError: SatelliteError) { if (this.initializing && !this.initializing.finished()) { diff --git a/clients/typescript/test/satellite/common.ts b/clients/typescript/test/satellite/common.ts index e6fb68f437..3526992a38 100644 --- a/clients/typescript/test/satellite/common.ts +++ b/clients/typescript/test/satellite/common.ts @@ -311,9 +311,14 @@ export const makeContext = async ( await mkdir('.tmp', { recursive: true }) const dbName = `.tmp/test-${randomValue()}.db` const db = new SqliteDatabase(dbName) + const stop = async () => { + if (!db.open) return + db.close() + } const adapter = new SqliteDatabaseAdapter(db) const migrator = new SqliteBundleMigrator(adapter, sqliteMigrations) makeContextInternal(t, dbName, adapter, migrator, namespace, options) + t.context.stop = stop } export const makePgContext = async ( @@ -337,7 +342,10 @@ export const makePgliteContext = async ( ) => { const dbName = `test-${randomValue()}` const db = new PGlite() - const stop = () => db.close() + const stop = async () => { + if (db.closed) return + await db.close() + } const adapter = new PgliteDatabaseAdapter(db) const migrator = new PgBundleMigrator(adapter, pgMigrations) makeContextInternal(t, dbName, adapter, migrator, namespace, options) @@ -380,8 +388,12 @@ export const mockElectricClient = async ( return electric } -export const clean = async (t: ExecutionContext<{ dbName: string }>) => { - const { dbName } = t.context +export const cleanAndStopDb = async ( + t: ExecutionContext<{ dbName: string; stop?: () => Promise }> +) => { + const { dbName, stop } = t.context + + await stop?.() await removeFile(dbName, { force: true }) await removeFile(`${dbName}-journal`, { force: true }) @@ -396,8 +408,7 @@ export const cleanAndStopSatellite = async ( ) => { const { satellite } = t.context await satellite.stop() - await clean(t) - await t.context.stop?.() + await cleanAndStopDb(t) } export async function migrateDb( diff --git a/clients/typescript/test/satellite/process.ts b/clients/typescript/test/satellite/process.ts index d7edcc0619..22179b1e5c 100644 --- a/clients/typescript/test/satellite/process.ts +++ b/clients/typescript/test/satellite/process.ts @@ -35,7 +35,11 @@ import { SatelliteError, SatelliteErrorCode, } from '../../src/util/types' -import { relations, ContextType as CommonContextType, clean } from './common' +import { + relations, + ContextType as CommonContextType, + cleanAndStopDb, +} from './common' import { numberToBytes, base64, blobToHexString } from '../../src/util/encoders' @@ -143,7 +147,8 @@ export const processTests = (test: TestFn) => { await satellite.stop() - await startSatellite(satellite, authState, token) + const conn = await startSatellite(satellite, authState, token) + await conn.connectionPromise const clientId2 = satellite._authState!.clientId t.truthy(clientId2) @@ -154,11 +159,12 @@ export const processTests = (test: TestFn) => { const { satellite, authState } = t.context await t.notThrowsAsync(async () => { - await startSatellite( + const conn = await startSatellite( satellite, authState, insecureAuthToken({ user_id: 'test-userA' }) ) + await conn.connectionPromise }) }) @@ -166,11 +172,12 @@ export const processTests = (test: TestFn) => { const { satellite, authState } = t.context await t.notThrowsAsync(async () => { - await startSatellite( + const conn = await startSatellite( satellite, authState, insecureAuthToken({ sub: 'test-userB' }) ) + await conn.connectionPromise }) }) @@ -190,7 +197,7 @@ export const processTests = (test: TestFn) => { test('cannot update user id', async (t) => { const { satellite, authState, token } = t.context - await startSatellite(satellite, authState, token) + const conn = await startSatellite(satellite, authState, token) const error = t.throws(() => { satellite.setToken(insecureAuthToken({ sub: 'test-user2' })) }) @@ -198,6 +205,7 @@ export const processTests = (test: TestFn) => { error?.message, "Can't change user ID when reconnecting. Previously connected with user ID 'test-user' but trying to reconnect with user ID 'test-user2'" ) + await conn.connectionPromise }) test('cannot UPDATE primary key', async (t) => { @@ -1547,7 +1555,8 @@ export const processTests = (test: TestFn) => { token, } = t.context await runMigrations() - await startSatellite(satellite, authState, token) + const conn = await startSatellite(satellite, authState, token) + await conn.connectionPromise // give some time for Satellite to start // (needed because connecting and starting replication are async) @@ -1556,16 +1565,18 @@ export const processTests = (test: TestFn) => { // we're expecting 2 assertions t.plan(4) - notifier.subscribeToConnectivityStateChanges( - (notification: ConnectivityStateChangeNotification) => { - t.is(notification.dbName, dbName) - t.is(notification.connectivityState.status, 'disconnected') - t.is( - notification.connectivityState.reason?.code, - SatelliteErrorCode.AUTH_EXPIRED - ) - } - ) + const unsubConnectivityChanges = + notifier.subscribeToConnectivityStateChanges( + (notification: ConnectivityStateChangeNotification) => { + t.is(notification.dbName, dbName) + t.is(notification.connectivityState.status, 'disconnected') + t.is( + notification.connectivityState.reason?.code, + SatelliteErrorCode.AUTH_EXPIRED + ) + } + ) + t.teardown(unsubConnectivityChanges) // mock JWT expiration client.emitSocketClosedError(SatelliteErrorCode.AUTH_EXPIRED) @@ -2504,7 +2515,7 @@ export const processTests = (test: TestFn) => { await satellite.stop() // Remove/close the database connection - await clean(t) + await cleanAndStopDb(t) // Wait for the snapshot to finish to consider the test successful await snapshotPromise @@ -2535,4 +2546,47 @@ export const processTests = (test: TestFn) => { t.pass() }) + + test("don't schedule snapshots from polling interval when closing satellite process", async (t) => { + const { + adapter, + runMigrations, + satellite, + authState, + token, + opts, + builder, + } = t.context + + await runMigrations() + + // Replace the snapshot function to simulate a slow snapshot + // that access the database after closing + satellite._performSnapshot = async () => { + try { + await sleepAsync(500) + await adapter.query(builder.getLocalTableNames()) + return new Date() + } catch (e) { + t.fail() + throw e + } + } + + const conn = await startSatellite(satellite, authState, token) + await conn.connectionPromise + + // Let the process schedule a snapshot + await sleepAsync(opts.pollingInterval * 2) + + await satellite.stop() + + // Remove/close the database connection + await cleanAndStopDb(t) + + // Wait for the snapshot to finish to consider the test successful + await sleepAsync(1000) + + t.pass() + }) } diff --git a/clients/typescript/test/satellite/sqlite/process.timing.test.ts b/clients/typescript/test/satellite/sqlite/process.timing.test.ts index 649fdebd9b..9bb861768f 100644 --- a/clients/typescript/test/satellite/sqlite/process.timing.test.ts +++ b/clients/typescript/test/satellite/sqlite/process.timing.test.ts @@ -1,9 +1,9 @@ import anyTest, { TestFn } from 'ava' import { processTimingTests } from '../process.timing' -import { makeContext, clean, ContextType } from '../common' +import { makeContext, cleanAndStopDb, ContextType } from '../common' const test = anyTest as TestFn test.beforeEach(async (t) => makeContext(t, 'main')) -test.afterEach.always(clean) +test.afterEach.always(cleanAndStopDb) processTimingTests(test)