Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(client): Better throttle snapshot stop strategy #1251

Merged
merged 13 commits into from
May 15, 2024
5 changes: 5 additions & 0 deletions .changeset/red-insects-provide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Better throttle snapshot stop strategy
4 changes: 3 additions & 1 deletion clients/typescript/src/drivers/node-postgres/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ export async function createEmbeddedPostgres(
const db = pg.getPgClient()
await db.connect()

let stopPromise: Promise<void>

// We use the database directory as the name
// because it uniquely identifies the DB
return {
db,
stop: () => pg.stop(),
stop: () => (stopPromise ??= pg.stop()),
}
}
11 changes: 8 additions & 3 deletions clients/typescript/src/satellite/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ export class MockSatelliteClient
transactionsCb?: TransactionCallback
additionalDataCb?: AdditionalDataCallback

outboundStartedCallback?: OutboundStartedCallback

relationData: Record<string, DataRecord[]> = {}

deliverFirst = false
Expand Down Expand Up @@ -423,15 +425,15 @@ export class MockSatelliteClient
}

unsubscribeToTransactions(): void {
throw new Error('Method not implemented.')
this.transactionsCb = undefined
}

subscribeToAdditionalData(callback: AdditionalDataCallback): void {
this.additionalDataCb = callback
}

unsubscribeToAdditionalData(_cb: AdditionalDataCallback): void {
throw new Error('Method not implemented.')
this.additionalDataCb = undefined
}

enqueueTransaction(transaction: DataTransaction): void {
Expand All @@ -448,10 +450,13 @@ export class MockSatelliteClient

subscribeToOutboundStarted(callback: OutboundStartedCallback): void {
this.on('outbound_started', callback)
this.outboundStartedCallback = callback
}

unsubscribeToOutboundStarted(): void {
throw new Error('Method not implemented.')
if (!this.outboundStartedCallback) return
this.removeListener('outbound_started', this.outboundStartedCallback)
this.outboundStartedCallback = undefined
}

sendErrorAfterTimeout(subscriptionId: string, timeout: number): void {
Expand Down
108 changes: 86 additions & 22 deletions clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
Uuid,
DbRecord as DataRecord,
ReplicatedRowTransformer,
ServerTransaction,
} from '../util/types'
import { SatelliteOpts } from './config'
import { Client, Satellite } from './index'
Expand Down Expand Up @@ -155,6 +156,8 @@ export class SatelliteProcess implements Satellite {
private _connectRetryHandler: ConnectRetryHandler
private initializing?: Waiter

private _removeClientListeners?: () => void

constructor(
dbName: DbName,
adapter: DatabaseAdapter,
Expand Down Expand Up @@ -191,8 +194,6 @@ export class SatelliteProcess implements Satellite {
this.shapeRequestIdGenerator = this.subscriptionIdGenerator

this._connectRetryHandler = connectRetryHandler

this.setClientListeners()
}

/**
Expand All @@ -212,6 +213,8 @@ export class SatelliteProcess implements Satellite {
await this.logDatabaseVersion()
}

this.setClientListeners()

await this.migrator.up()

const isVerified = await this._verifyTableStructure()
Expand Down Expand Up @@ -251,6 +254,7 @@ export class SatelliteProcess implements Satellite {
this.notifier.subscribeToPotentialDataChanges(this._throttledSnapshot)

// Start polling to request a snapshot every `pollingInterval` ms.
clearInterval(this._pollingInterval)
this._pollingInterval = setInterval(
this._throttledSnapshot,
this.opts.pollingInterval
Expand Down Expand Up @@ -314,17 +318,54 @@ export class SatelliteProcess implements Satellite {
await this.adapter.runInTransaction(...stmtsWithTriggers)
}

// Adds all the necessary listeners to the satellite client
// They can be cleared up by calling the function `_removeClientListeners`
setClientListeners(): void {
this.client.subscribeToError(this._handleClientError.bind(this))
this.client.subscribeToRelations(this._updateRelations.bind(this))
this.client.subscribeToTransactions(this._applyTransaction.bind(this))
this.client.subscribeToAdditionalData(this._applyAdditionalData.bind(this))
this.client.subscribeToOutboundStarted(this._throttledSnapshot.bind(this))
// Remove any existing listeners
if (this._removeClientListeners) {
davidmartos96 marked this conversation as resolved.
Show resolved Hide resolved
this._removeClientListeners?.()
this._removeClientListeners = undefined
}

this.client.subscribeToSubscriptionEvents(
this._handleSubscriptionData.bind(this),
const clientErrorCallback = this._handleClientError.bind(this)
this.client.subscribeToError(clientErrorCallback)

const clientRelationsCallback = this._handleClientRelations.bind(this)
this.client.subscribeToRelations(clientRelationsCallback)

const clientTransactionsCallback = this._handleClientTransactions.bind(this)
this.client.subscribeToTransactions(clientTransactionsCallback)

const clientAdditionalDataCallback =
this._handleClientAdditionalData.bind(this)
this.client.subscribeToAdditionalData(clientAdditionalDataCallback)

const clientOutboundStartedCallback =
this._handleClientOutboundStarted.bind(this)
this.client.subscribeToOutboundStarted(clientOutboundStartedCallback)

const clientSubscriptionDataCallback =
this._handleSubscriptionData.bind(this)
const clientSubscriptionErrorCallback =
this._handleSubscriptionError.bind(this)
this.client.subscribeToSubscriptionEvents(
clientSubscriptionDataCallback,
clientSubscriptionErrorCallback
)

// Keep a way to remove the client listeners
this._removeClientListeners = () => {
this.client.unsubscribeToError(clientErrorCallback)
this.client.unsubscribeToRelations(clientRelationsCallback)
this.client.unsubscribeToTransactions(clientTransactionsCallback)
this.client.unsubscribeToAdditionalData(clientAdditionalDataCallback)
this.client.unsubscribeToOutboundStarted(clientOutboundStartedCallback)

this.client.unsubscribeToSubscriptionEvents(
clientSubscriptionDataCallback,
clientSubscriptionErrorCallback
)
}
}

// Unsubscribe from data changes and stop polling
Expand All @@ -333,19 +374,9 @@ export class SatelliteProcess implements Satellite {
}

private async _stop(shutdown?: boolean): Promise<void> {
// Stop snapshotting and polling for changes.
this._throttledSnapshot.cancel()

// Ensure that no snapshot is left running in the background
// by acquiring the mutex and releasing it immediately.
const releaseMutex = await this.snapshotMutex.acquire()
releaseMutex()

if (this._pollingInterval !== undefined) {
clearInterval(this._pollingInterval)

this._pollingInterval = undefined
}
// Stop snapshot polling
clearInterval(this._pollingInterval)
this._pollingInterval = undefined

// Unsubscribe all listeners and remove them
const unsubscribers = [
Expand All @@ -362,13 +393,30 @@ export class SatelliteProcess implements Satellite {
}
})

this._removeClientListeners?.()
this._removeClientListeners = undefined

// Cancel the snapshot throttle
this._throttledSnapshot.cancel()

// Make sure no snapshot is running after we stop the process, otherwise we might be trying to
// interact with a closed database connection
await this._waitForActiveSnapshots()

this.disconnect()

if (shutdown) {
this.client.shutdown()
}
}

// Ensure that no snapshot is left running in the background
// by acquiring the mutex and releasing it immediately.
async _waitForActiveSnapshots(): Promise<void> {
const releaseMutex = await this.snapshotMutex.acquire()
releaseMutex()
}

async subscribe(shapeDefinitions: Shape[]): Promise<ShapeSubscription> {
// Await for client to be ready before doing anything else
await this.initializing?.waitOn()
Expand Down Expand Up @@ -674,6 +722,22 @@ export class SatelliteProcess implements Satellite {
}
}

_handleClientRelations(relation: Relation): void {
this._updateRelations(relation)
}

async _handleClientTransactions(tx: ServerTransaction) {
await this._applyTransaction(tx)
}

async _handleClientAdditionalData(data: AdditionalData) {
await this._applyAdditionalData(data)
}

async _handleClientOutboundStarted() {
await this._throttledSnapshot()
}

// handles async client errors: can be a socket error or a server error message
_handleClientError(satelliteError: SatelliteError) {
if (this.initializing && !this.initializing.finished()) {
Expand Down
21 changes: 16 additions & 5 deletions clients/typescript/test/satellite/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,14 @@ export const makeContext = async (
await mkdir('.tmp', { recursive: true })
const dbName = `.tmp/test-${randomValue()}.db`
const db = new SqliteDatabase(dbName)
const stop = async () => {
if (!db.open) return
db.close()
}
const adapter = new SqliteDatabaseAdapter(db)
const migrator = new SqliteBundleMigrator(adapter, sqliteMigrations)
makeContextInternal(t, dbName, adapter, migrator, namespace, options)
t.context.stop = stop
}

export const makePgContext = async (
Expand All @@ -337,7 +342,10 @@ export const makePgliteContext = async (
) => {
const dbName = `test-${randomValue()}`
const db = new PGlite()
const stop = () => db.close()
const stop = async () => {
if (db.closed) return
await db.close()
}
const adapter = new PgliteDatabaseAdapter(db)
const migrator = new PgBundleMigrator(adapter, pgMigrations)
makeContextInternal(t, dbName, adapter, migrator, namespace, options)
Expand Down Expand Up @@ -380,8 +388,12 @@ export const mockElectricClient = async (
return electric
}

export const clean = async (t: ExecutionContext<{ dbName: string }>) => {
const { dbName } = t.context
export const cleanAndStopDb = async (
t: ExecutionContext<{ dbName: string; stop?: () => Promise<void> }>
) => {
const { dbName, stop } = t.context

await stop?.()

await removeFile(dbName, { force: true })
await removeFile(`${dbName}-journal`, { force: true })
Expand All @@ -396,8 +408,7 @@ export const cleanAndStopSatellite = async (
) => {
const { satellite } = t.context
await satellite.stop()
await clean(t)
await t.context.stop?.()
await cleanAndStopDb(t)
}

export async function migrateDb(
Expand Down
Loading
Loading