Skip to content

Commit

Permalink
feat(client): Shape subscription status notifier (#1276)
Browse files Browse the repository at this point in the history
The usual notifier pattern but for shape subscription status

Added callback to shape manager to internalize notifying logic entirely
to the shape manager rather than having arbitrary calls within the
satellite process

We could add additionall utility to subscribe to particular subscription
with a given key but I think for now just having this general notifier
method will cover use cases that need it, we don't need to
expose/document it.
  • Loading branch information
msfstef committed May 20, 2024
1 parent f4af7f6 commit 276149d
Show file tree
Hide file tree
Showing 10 changed files with 402 additions and 131 deletions.
5 changes: 5 additions & 0 deletions .changeset/young-plants-try.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Add notifier method `subscribeToShapeSubscriptionSyncStatusChanges` for listening to shape subscription status updates
51 changes: 51 additions & 0 deletions clients/typescript/src/notifiers/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ import {
Notifier,
PotentialChangeCallback,
PotentialChangeNotification,
ShapeSubscriptionSyncStatusChangeCallback,
ShapeSubscriptionSyncStatusChangeNotification,
UnsubscribeFunction,
} from './index'
import { SyncStatus } from '../client/model/shapes'

export const EVENT_NAMES = {
authChange: 'auth:changed',
actualDataChange: 'data:actually:changed',
potentialDataChange: 'data:potentially:changed',
connectivityStateChange: 'network:connectivity:changed',
shapeSubscriptionStatusChange: 'shape:status:changed',
}

// Initialise global emitter to be shared between all
Expand Down Expand Up @@ -201,6 +205,41 @@ export class EventNotifier implements Notifier {
}
}

shapeSubscriptionSyncStatusChanged(
dbName: string,
key: string,
status: SyncStatus
): void {
if (!this._hasDbName(dbName)) {
return
}

this._emitShapeSubscriptionSyncStatusChange(dbName, key, status)
}

subscribeToShapeSubscriptionSyncStatusChanges(
callback: ShapeSubscriptionSyncStatusChangeCallback
): UnsubscribeFunction {
const thisHasDbName = this._hasDbName.bind(this)

const wrappedCallback = (
notification: ShapeSubscriptionSyncStatusChangeNotification
) => {
if (thisHasDbName(notification.dbName)) {
callback(notification)
}
}

this._subscribe(EVENT_NAMES.shapeSubscriptionStatusChange, wrappedCallback)

return () => {
this._unsubscribe(
EVENT_NAMES.shapeSubscriptionStatusChange,
wrappedCallback
)
}
}

_getDbNames(): DbName[] {
const idx = this.attachedDbIndex

Expand Down Expand Up @@ -261,6 +300,18 @@ export class EventNotifier implements Notifier {
return notification
}

_emitShapeSubscriptionSyncStatusChange(
dbName: DbName,
key: string,
status: SyncStatus
): ShapeSubscriptionSyncStatusChangeNotification {
const notification = { dbName, key, status }

this._emit(EVENT_NAMES.shapeSubscriptionStatusChange, notification)

return notification
}

_emit(eventName: string, notification: Notification) {
this.events.emit(eventName, notification)
}
Expand Down
26 changes: 26 additions & 0 deletions clients/typescript/src/notifiers/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { AuthState } from '../auth/index'
import { SyncStatus } from '../client/model/shapes'
import { QualifiedTablename } from '../util/tablename'
import {
ConnectivityState,
Expand Down Expand Up @@ -39,11 +40,18 @@ export interface ConnectivityStateChangeNotification {
connectivityState: ConnectivityState
}

export interface ShapeSubscriptionSyncStatusChangeNotification {
dbName: DbName
key: string
status: SyncStatus
}

export type Notification =
| AuthStateNotification
| ChangeNotification
| PotentialChangeNotification
| ConnectivityStateChangeNotification
| ShapeSubscriptionSyncStatusChangeNotification

export type AuthStateCallback = (notification: AuthStateNotification) => void
export type ChangeCallback = (notification: ChangeNotification) => void
Expand All @@ -54,11 +62,16 @@ export type ConnectivityStateChangeCallback = (
notification: ConnectivityStateChangeNotification
) => void

export type ShapeSubscriptionSyncStatusChangeCallback = (
notification: ShapeSubscriptionSyncStatusChangeNotification
) => void

export type NotificationCallback =
| AuthStateCallback
| ChangeCallback
| PotentialChangeCallback
| ConnectivityStateChangeCallback
| ShapeSubscriptionSyncStatusChangeCallback

export type UnsubscribeFunction = () => void

Expand Down Expand Up @@ -129,4 +142,17 @@ export interface Notifier {
subscribeToConnectivityStateChanges(
callback: ConnectivityStateChangeCallback
): UnsubscribeFunction

// Notification for shape subscription sync status changes.
// Every notification will include a key that uniquely identifies the
// shape for which the sync status changed, as well as the new sync status.
shapeSubscriptionSyncStatusChanged(
dbName: DbName,
key: string,
status: SyncStatus
): void

subscribeToShapeSubscriptionSyncStatusChanges(
callback: ShapeSubscriptionSyncStatusChangeCallback
): UnsubscribeFunction
}
4 changes: 2 additions & 2 deletions clients/typescript/src/satellite/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ import {
} from '../_generated/protocol/satellite'
import { ShapeSubscription } from './process'
import { DbSchema } from '../client/model/schema'
import { getAllTablesForShape } from './shapes'
import { SyncStatus } from '../client/model/shapes'
import { getTableNamesForShapes } from './shapes/shapeManager'

export const MOCK_BEHIND_WINDOW_LSN = 42
export const MOCK_INTERNAL_ERROR = 27
Expand Down Expand Up @@ -276,7 +276,7 @@ export class MockSatelliteClient
const shapeReqToUuid: Record<string, string> = {}

for (const shape of shapes) {
const tables = getAllTablesForShape(shape.definition, 'main')
const tables = getTableNamesForShapes([shape.definition], 'main')
for (const { tablename } of tables) {
if (tablename === 'failure' || tablename === 'Items') {
return Promise.resolve({
Expand Down
14 changes: 12 additions & 2 deletions clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,12 @@ export class SatelliteProcess implements Satellite {
this.relations = {}

this.previousShapeSubscriptions = []
this.subscriptionManager = new ShapeManager()
this.subscriptionManager = new ShapeManager(
this.notifier.shapeSubscriptionSyncStatusChanged.bind(
this.notifier,
this.dbName
)
)

this._throttledSnapshot = throttle(
this._mutexSnapshot.bind(this),
Expand Down Expand Up @@ -446,6 +451,7 @@ export class SatelliteProcess implements Satellite {

if (error) throw error

// persist subscription metadata
await this._setMeta('subscriptions', this.subscriptionManager.serialize())

return {
Expand All @@ -470,7 +476,7 @@ export class SatelliteProcess implements Satellite {
)
} else {
return this.unsubscribeIds(
this.subscriptionManager.getServerID(target.shapes)
this.subscriptionManager.getServerIDsForShapes(target.shapes)
)
}
}
Expand All @@ -482,6 +488,8 @@ export class SatelliteProcess implements Satellite {

// If the server didn't send an error, we persist the fact the subscription was deleted.
this.subscriptionManager.unsubscribeMade(subscriptionIds)

// persist subscription metadata
await this.adapter.run(
this._setMetaStatement(
'subscriptions',
Expand All @@ -501,6 +509,7 @@ export class SatelliteProcess implements Satellite {
[],
subsData.subscriptionId
)

const toBeUnsubbed = afterApply()
if (toBeUnsubbed.length > 0) await this.unsubscribeIds(toBeUnsubbed)
}
Expand Down Expand Up @@ -1549,6 +1558,7 @@ export class SatelliteProcess implements Satellite {
...stmts,
...this._enableTriggers(affectedTables)
)

this.subscriptionManager.goneBatchDelivered(subscriptionIds)

this._notifyChanges(fakeOplogEntries, 'remote')
Expand Down
108 changes: 0 additions & 108 deletions clients/typescript/src/satellite/shapes/index.ts

This file was deleted.

Loading

0 comments on commit 276149d

Please sign in to comment.