Skip to content

Commit

Permalink
fix(client): client reset should work with nested shapes (#1151)
Browse files Browse the repository at this point in the history
  • Loading branch information
icehaunter committed Apr 12, 2024
1 parent debba1b commit b7faf72
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 58 deletions.
5 changes: 5 additions & 0 deletions .changeset/strong-flies-sell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Fixed GC on the client after a server-requested reset if nested shapes are used
4 changes: 3 additions & 1 deletion clients/typescript/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@
"lodash.partition": "^4.6.0",
"lodash.pick": "^4.4.0",
"lodash.throttle": "^4.1.1",
"lodash.uniqwith": "^4.5.0",
"loglevel": "^1.8.1",
"long": "^5.2.0",
"object.hasown": "^1.1.2",
Expand Down Expand Up @@ -216,6 +217,7 @@
"@types/lodash.partition": "^4.6.7",
"@types/lodash.pick": "^4.4.7",
"@types/lodash.throttle": "^4.1.7",
"@types/lodash.uniqwith": "^4.5.9",
"@types/node": "^18.8.4",
"@types/prompts": "^2.4.9",
"@types/react": "^18.0.18",
Expand Down Expand Up @@ -252,8 +254,8 @@
"web-worker": "^1.2.0"
},
"peerDependencies": {
"@op-engineering/op-sqlite": ">= 2.0.16",
"@capacitor-community/sqlite": ">= 5.6.2",
"@op-engineering/op-sqlite": ">= 2.0.16",
"@tauri-apps/plugin-sql": "2.0.0-alpha.5",
"expo-sqlite": ">= 13.0.0",
"react": ">= 16.8.0",
Expand Down
47 changes: 25 additions & 22 deletions clients/typescript/src/satellite/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import {
} from '../_generated/protocol/satellite'
import { ShapeSubscription } from './process'
import { DbSchema } from '../client/model/schema'
import { getAllTablesForShape } from './shapes'

export const MOCK_BEHIND_WINDOW_LSN = 42
export const MOCK_INTERNAL_ERROR = 27
Expand Down Expand Up @@ -238,31 +239,33 @@ export class MockSatelliteClient
const shapeReqToUuid: Record<string, string> = {}

for (const shape of shapes) {
const { tablename } = shape.definition
if (tablename === 'failure' || tablename === 'Items') {
return Promise.resolve({
subscriptionId,
error: new SatelliteError(SatelliteErrorCode.TABLE_NOT_FOUND),
})
}
if (tablename === 'another' || tablename === 'User') {
return new Promise((resolve) => {
this.sendErrorAfterTimeout(subscriptionId, 1)
resolve({
const tables = getAllTablesForShape(shape.definition, 'main')
for (const { tablename } of tables) {
if (tablename === 'failure' || tablename === 'Items') {
return Promise.resolve({
subscriptionId,
error: new SatelliteError(SatelliteErrorCode.TABLE_NOT_FOUND),
})
})
} else {
shapeReqToUuid[shape.requestId] = genUUID()
const records: DataRecord[] = this.relationData[tablename] ?? []

for (const record of records) {
const dataChange: InitialDataChange = {
relation: this.relations[tablename],
record,
tags: [generateTag('remote', new Date())],
}
if (tablename === 'another' || tablename === 'User') {
return new Promise((resolve) => {
this.sendErrorAfterTimeout(subscriptionId, 1)
resolve({
subscriptionId,
})
})
} else {
shapeReqToUuid[shape.requestId] = genUUID()
const records: DataRecord[] = this.relationData[tablename] ?? []

for (const record of records) {
const dataChange: InitialDataChange = {
relation: this.relations[tablename],
record,
tags: [generateTag('remote', new Date())],
}
data.push(dataChange)
}
data.push(dataChange)
}
}
}
Expand Down
50 changes: 28 additions & 22 deletions clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import throttle from 'lodash.throttle'
import uniqWith from 'lodash.uniqwith'

import {
SatOpMigrate_Type,
Expand Down Expand Up @@ -64,7 +65,7 @@ import Log from 'loglevel'
import { generateTableTriggers } from '../migrators/triggers'
import { prepareInsertBatchedStatements } from '../util/statements'
import { mergeEntries } from './merge'
import { SubscriptionsManager } from './shapes'
import { SubscriptionsManager, getAllTablesForShape } from './shapes'
import { InMemorySubscriptionsManager } from './shapes/manager'
import {
Shape,
Expand Down Expand Up @@ -293,27 +294,23 @@ export class SatelliteProcess implements Satellite {
async _garbageCollectShapeHandler(
shapeDefs: ShapeDefinition[]
): Promise<void> {
const stmts: Statement[] = []
const tablenames: string[] = []
// reverts to off on commit/abort
stmts.push({ sql: 'PRAGMA defer_foreign_keys = ON' })
shapeDefs
.flatMap((def: ShapeDefinition) => def.definition)
.map((select: Shape) => {
tablenames.push('main.' + select.tablename)
return 'main.' + select.tablename
}) // We need "fully qualified" table names in the next calls
.reduce((stmts: Statement[], tablename: string) => {
stmts.push({
sql: `DELETE FROM ${tablename}`,
})
return stmts
// does not delete shadow rows but we can do that
}, stmts)
const allTables = shapeDefs
.map((def: ShapeDefinition) => def.definition)
.flatMap((x) => getAllTablesForShape(x))
const tables = uniqWith(allTables, (a, b) => a.isEqual(b))

// TODO: table and schema warrant escaping here too, but they aren't in the triggers table.
const tablenames = tables.map((x) => x.toString())

const deleteStmts = tables.map((x) => ({
sql: `DELETE FROM ${x.toString()}`,
}))

const stmtsWithTriggers = [
// reverts to off on commit/abort
{ sql: 'PRAGMA defer_foreign_keys = ON' },
...this._disableTriggers(tablenames),
...stmts,
...deleteStmts,
...this._enableTriggers(tablenames),
]

Expand Down Expand Up @@ -641,14 +638,23 @@ export class SatelliteProcess implements Satellite {
subscriptionId?: string
): Promise<void> {
Log.error('encountered a subscription error: ' + satelliteError.message)
let resettingError: any

await this._resetClientState()

try {
await this._resetClientState()
} catch (error) {
// If we encounter an error here, we want to float it to the client so that the bug is visible
// instead of just a broken state.
resettingError = error
resettingError.stack +=
'\n Encountered when handling a subscription error: \n ' +
satelliteError.stack
}
// Call the `onFailure` callback for this subscription
if (subscriptionId) {
const { reject: onFailure } = this.subscriptionNotifiers[subscriptionId]
delete this.subscriptionNotifiers[subscriptionId] // GC the notifiers for this subscription ID
onFailure(satelliteError)
onFailure(resettingError ?? satelliteError)
}
}

Expand Down
22 changes: 22 additions & 0 deletions clients/typescript/src/satellite/shapes/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import uniqWith from 'lodash.uniqwith'

import {
Shape,
ShapeDefinition,
ShapeRequest,
SubscriptionData,
SubscriptionId,
} from './types'
import { QualifiedTablename } from '../../util'

/**
* Manages the state of satellite shape subscriptions
Expand Down Expand Up @@ -79,3 +82,22 @@ export interface SubscriptionsManager {
*/
setState(serialized: string): void
}

/** List all tables covered by a given shape */
export function getAllTablesForShape(
shape: Shape,
schema = 'main'
): QualifiedTablename[] {
return uniqWith(doGetAllTablesForShape(shape, schema), (a, b) => a.isEqual(b))
}

function doGetAllTablesForShape(
shape: Shape,
schema: string
): QualifiedTablename[] {
const includes =
shape.include?.flatMap((x) => doGetAllTablesForShape(x.select, schema)) ??
[]
includes.push(new QualifiedTablename(schema, shape.tablename))
return includes
}
20 changes: 12 additions & 8 deletions clients/typescript/test/satellite/process.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1871,19 +1871,18 @@ test.serial('a shape delivery that triggers garbage collection', async (t) => {
t.context
await runMigrations()

const tablename = 'parent'
const qualified = new QualifiedTablename('main', tablename).toString()

// relations must be present at subscription delivery
client.setRelations(relations)
client.setRelationData(tablename, parentRecord)
client.setRelationData('parent', parentRecord)
client.setRelationData('child', childRecord)
client.setRelationData('another', {})

const conn = await startSatellite(satellite, authState, token)
await conn.connectionPromise

const shapeDef1: Shape = {
tablename: 'parent',
include: [{ foreignKey: ['parent'], select: { tablename: 'child' } }],
}
const shapeDef2: Shape = {
tablename: 'another',
Expand All @@ -1892,22 +1891,27 @@ test.serial('a shape delivery that triggers garbage collection', async (t) => {
satellite!.relations = relations
const { synced: synced1 } = await satellite.subscribe([shapeDef1])
await synced1
const row = await adapter.query({ sql: `SELECT id FROM main.parent` })
t.is(row.length, 1)
const row1 = await adapter.query({ sql: `SELECT id FROM main.child` })
t.is(row1.length, 1)
const { synced } = await satellite.subscribe([shapeDef2])

try {
await synced
t.fail()
} catch (expected: any) {
t.true(expected instanceof SatelliteError)
try {
const row = await adapter.query({
sql: `SELECT id FROM ${qualified}`,
})
const row = await adapter.query({ sql: `SELECT id FROM main.parent` })
t.is(row.length, 0)
const row1 = await adapter.query({ sql: `SELECT id FROM main.child` })
t.is(row1.length, 0)

const shadowRows = await adapter.query({
sql: `SELECT tags FROM _electric_shadow`,
})
t.is(shadowRows.length, 1)
t.is(shadowRows.length, 2)

const subsMeta = await satellite._getMeta('subscriptions')
const subsObj = JSON.parse(subsMeta)
Expand Down
26 changes: 21 additions & 5 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b7faf72

Please sign in to comment.