Skip to content

Commit

Permalink
Protocol message to reset client state (#1213)
Browse files Browse the repository at this point in the history
  • Loading branch information
magnetised committed May 14, 2024
1 parent 0dbbc54 commit 2697839
Show file tree
Hide file tree
Showing 34 changed files with 1,186 additions and 117 deletions.
119 changes: 119 additions & 0 deletions clients/typescript/src/_generated/protocol/satellite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,21 @@ export interface SatShapeDataEnd {
$type: "Electric.Satellite.SatShapeDataEnd";
}

export interface SatClientCommand {
$type: "Electric.Satellite.SatClientCommand";
resetDatabase?: SatClientCommand_ResetDatabase | undefined;
}

export interface SatClientCommand_ResetDatabase {
$type: "Electric.Satellite.SatClientCommand.ResetDatabase";
reason: SatClientCommand_ResetDatabase_Reason;
}

export enum SatClientCommand_ResetDatabase_Reason {
PERMISSIONS_CHANGE = 0,
UNRECOGNIZED = -1,
}

/**
* represents the client permissions
* - `Rules`: the global permission rules, defined by the DDLX
Expand Down Expand Up @@ -4387,6 +4402,110 @@ export const SatShapeDataEnd = {

messageTypeRegistry.set(SatShapeDataEnd.$type, SatShapeDataEnd);

function createBaseSatClientCommand(): SatClientCommand {
return { $type: "Electric.Satellite.SatClientCommand", resetDatabase: undefined };
}

export const SatClientCommand = {
$type: "Electric.Satellite.SatClientCommand" as const,

encode(message: SatClientCommand, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.resetDatabase !== undefined) {
SatClientCommand_ResetDatabase.encode(message.resetDatabase, writer.uint32(10).fork()).ldelim();
}
return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): SatClientCommand {
const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseSatClientCommand();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
if (tag !== 10) {
break;
}

message.resetDatabase = SatClientCommand_ResetDatabase.decode(reader, reader.uint32());
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},

create<I extends Exact<DeepPartial<SatClientCommand>, I>>(base?: I): SatClientCommand {
return SatClientCommand.fromPartial(base ?? {});
},

fromPartial<I extends Exact<DeepPartial<SatClientCommand>, I>>(object: I): SatClientCommand {
const message = createBaseSatClientCommand();
message.resetDatabase = (object.resetDatabase !== undefined && object.resetDatabase !== null)
? SatClientCommand_ResetDatabase.fromPartial(object.resetDatabase)
: undefined;
return message;
},
};

messageTypeRegistry.set(SatClientCommand.$type, SatClientCommand);

function createBaseSatClientCommand_ResetDatabase(): SatClientCommand_ResetDatabase {
return { $type: "Electric.Satellite.SatClientCommand.ResetDatabase", reason: 0 };
}

export const SatClientCommand_ResetDatabase = {
$type: "Electric.Satellite.SatClientCommand.ResetDatabase" as const,

encode(message: SatClientCommand_ResetDatabase, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.reason !== 0) {
writer.uint32(8).int32(message.reason);
}
return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): SatClientCommand_ResetDatabase {
const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseSatClientCommand_ResetDatabase();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
if (tag !== 8) {
break;
}

message.reason = reader.int32() as any;
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},

create<I extends Exact<DeepPartial<SatClientCommand_ResetDatabase>, I>>(base?: I): SatClientCommand_ResetDatabase {
return SatClientCommand_ResetDatabase.fromPartial(base ?? {});
},

fromPartial<I extends Exact<DeepPartial<SatClientCommand_ResetDatabase>, I>>(
object: I,
): SatClientCommand_ResetDatabase {
const message = createBaseSatClientCommand_ResetDatabase();
message.reason = object.reason ?? 0;
return message;
},
};

messageTypeRegistry.set(SatClientCommand_ResetDatabase.$type, SatClientCommand_ResetDatabase);

function createBaseSatPerms(): SatPerms {
return { $type: "Electric.Satellite.SatPerms", id: Long.ZERO, userId: "", rules: undefined, roles: [] };
}
Expand Down
17 changes: 17 additions & 0 deletions clients/typescript/src/satellite/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
RootClientImpl,
SatRpcRequest,
SatInStartReplicationReq_Dialect,
SatClientCommand,
} from '../_generated/protocol/satellite'
import {
getObjFromString,
Expand Down Expand Up @@ -72,6 +73,7 @@ import {
DataChange,
isDataChange,
ReplicatedRowTransformer,
CommandCallback,
} from '../util/types'
import {
base64,
Expand Down Expand Up @@ -131,7 +133,9 @@ type Events = {
outbound_started: () => void
[SUBSCRIPTION_DELIVERED]: SubscriptionDeliveredCallback
[SUBSCRIPTION_ERROR]: SubscriptionErrorCallback
command: (cmd: SatClientCommand) => Promise<void>
}

type EventEmitter = AsyncEventEmitter<Events>

export class SatelliteClient implements Client {
Expand Down Expand Up @@ -182,6 +186,7 @@ export class SatelliteClient implements Client {
SatRpcRequest: (msg) => this.handleRpcRequest(msg),
SatOpLogAck: (msg) => void msg, // Server doesn't send that
SatPerms: (msg) => void msg,
SatClientCommand: (msg) => this.handleCommand(msg),
} satisfies HandlerMapping).map((e) => [getFullTypeName(e[0]), e[1]])
)

Expand Down Expand Up @@ -474,6 +479,14 @@ export class SatelliteClient implements Client {
this.emitter.removeListener('relation', callback)
}

subscribeToCommands(callback: CommandCallback): void {
this.emitter.on('command', callback)
}

unsubscribeToCommands(callback: CommandCallback): void {
this.emitter.removeListener('command', callback)
}

enqueueTransaction(transaction: DataTransaction): void {
if (this.outbound.isReplicating !== ReplicationStatus.ACTIVE) {
throw new SatelliteError(
Expand Down Expand Up @@ -958,6 +971,10 @@ export class SatelliteClient implements Client {
this.subscriptionsDataCache.shapeDataEnd()
}

private handleCommand(msg: SatClientCommand): void {
this.emitter.enqueueEmit('command', msg)
}

// For now, unsubscribe responses doesn't send any information back
// It might eventually confirm that the server processed it or was noop.
private handleUnsubscribeResponse(_msg: SatUnsubsResp): UnsubscribeResponse {
Expand Down
3 changes: 3 additions & 0 deletions clients/typescript/src/satellite/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
AdditionalDataCallback,
DbRecord,
ReplicatedRowTransformer,
CommandCallback,
} from '../util/types'
import {
Shape,
Expand Down Expand Up @@ -116,6 +117,8 @@ export interface Client {
unsubscribeToOutboundStarted(callback: OutboundStartedCallback): void
subscribeToError(callback: ErrorCallback): void
unsubscribeToError(callback: ErrorCallback): void
subscribeToCommands(callback: CommandCallback): void
unsubscribeToCommands(callback: CommandCallback): void

subscribe(subId: string, shapes: ShapeRequest[]): Promise<SubscribeResponse>
unsubscribe(subIds: string[]): Promise<UnsubscribeResponse>
Expand Down
10 changes: 10 additions & 0 deletions clients/typescript/src/satellite/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
AdditionalDataCallback,
ConnectivityState,
ReplicatedRowTransformer,
CommandCallback,
} from '../util/types'
import { ElectricConfig } from '../config/index'

Expand Down Expand Up @@ -207,6 +208,7 @@ export class MockSatelliteClient
relationsCb?: (relation: Relation) => void
transactionsCb?: TransactionCallback
additionalDataCb?: AdditionalDataCallback
commandCb?: CommandCallback

relationData: Record<string, DataRecord[]> = {}

Expand Down Expand Up @@ -330,6 +332,14 @@ export class MockSatelliteClient
this.on('error', cb)
}

subscribeToCommands(callback: CommandCallback): void {
this.commandCb = callback
}

unsubscribeToCommands(_callback: CommandCallback): void {
this.commandCb = undefined
}

emitSocketClosedError(ev: SocketCloseReason): void {
this.enqueueEmit('error', new SatelliteError(ev, 'socket closed'))
}
Expand Down
10 changes: 10 additions & 0 deletions clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import uniqWith from 'lodash.uniqwith'
import {
SatOpMigrate_Type,
SatRelation_RelationType,
SatClientCommand,
} from '../_generated/protocol/satellite'
import { AuthConfig, AuthState } from '../auth/index'
import { DatabaseAdapter } from '../electric/adapter'
Expand Down Expand Up @@ -320,6 +321,7 @@ export class SatelliteProcess implements Satellite {
this.client.subscribeToTransactions(this._applyTransaction.bind(this))
this.client.subscribeToAdditionalData(this._applyAdditionalData.bind(this))
this.client.subscribeToOutboundStarted(this._throttledSnapshot.bind(this))
this.client.subscribeToCommands(this._handleCommand.bind(this))

this.client.subscribeToSubscriptionEvents(
this._handleSubscriptionData.bind(this),
Expand Down Expand Up @@ -1418,6 +1420,14 @@ export class SatelliteProcess implements Satellite {
])
}

async _handleCommand(cmd: SatClientCommand) {
if (cmd.resetDatabase) {
await this._resetClientState({
keepSubscribedShapes: true,
})
}
}

private async maybeGarbageCollect(
origin: string,
commitTimestamp: Date
Expand Down
8 changes: 8 additions & 0 deletions clients/typescript/src/util/proto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ const msgtypetuples: MappingTuples = {
SatRpcResponse: [22, Pb.SatRpcResponse],
SatOpLogAck: [23, Pb.SatOpLogAck],
SatPerms: [24, Pb.SatPerms],
SatClientCommand: [25, Pb.SatClientCommand],
}

const msgtypemapping = Object.fromEntries(
Expand Down Expand Up @@ -180,6 +181,7 @@ export type SatPbMsg =
| Pb.SatShapeDataEnd
| Pb.SatOpLogAck
| Pb.SatPerms
| Pb.SatClientCommand

export type SatPbMsgObj<
Msg extends { $type: string },
Expand Down Expand Up @@ -418,6 +420,12 @@ export function msgToString(message: MessageOfInterest): string {
}}`
case 'Electric.Satellite.SatPerms':
return `#SatPerms{}`
case 'Electric.Satellite.SatClientCommand':
if (message.resetDatabase) {
return `#SatClientCommand{command: #ResetDatabase{reason: ${message.resetDatabase.reason}}}`
}

return `#SatClientCommand{command: <unknown>}`
}
}

Expand Down
2 changes: 2 additions & 0 deletions clients/typescript/src/util/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
SatOpMigrate_Table,
SatOpMigrate_Type,
SatRelation_RelationType,
SatClientCommand,
} from '../_generated/protocol/satellite'
import { Tag } from '../satellite/oplog'

Expand Down Expand Up @@ -250,6 +251,7 @@ export type IncomingTransactionCallback = (
AckCb: () => void
) => void
export type OutboundStartedCallback = () => void
export type CommandCallback = (cmd: SatClientCommand) => Promise<void>

export type ConnectivityStatus = 'connected' | 'disconnected'
export type ConnectivityState = {
Expand Down
4 changes: 2 additions & 2 deletions clients/typescript/test/satellite/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ test.serial('default and null test', async (t) => {
})
})

test.serial('subscription succesful', async (t) => {
test.serial('subscription successful', async (t) => {
await connectAndAuth(t.context)
const { client, server } = t.context
await startReplication(client, server)
Expand Down Expand Up @@ -1310,7 +1310,7 @@ test.serial('client correctly handles additional data messages', async (t) => {
})
})

test.serial('unsubscribe successfull', async (t) => {
test.serial('unsubscribe successful', async (t) => {
await connectAndAuth(t.context)
const { client, server } = t.context
await startReplication(client, server)
Expand Down
46 changes: 46 additions & 0 deletions clients/typescript/test/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ import { AuthState, insecureAuthToken } from '../../src/auth'
import { ConnectivityStateChangeNotification } from '../../src/notifiers'
import { QueryBuilder } from '../../src/migrators/query-builder'
import { SatelliteOpts } from '../../src/satellite/config'
import {
SatClientCommand,
SatClientCommand_ResetDatabase,
SatClientCommand_ResetDatabase_Reason,
} from '../../src/_generated/protocol/satellite'

export type ContextType = CommonContextType & {
builder: QueryBuilder
Expand Down Expand Up @@ -2535,4 +2540,45 @@ export const processTests = (test: TestFn<ContextType>) => {

t.pass()
})

test('SatClientCommand.ResetDatabase clears all data', async (t) => {
const { client, satellite, adapter } = t.context
const { runMigrations, authState, token } = t.context
await runMigrations()
const tablename = 'parent'

// relations must be present at subscription delivery
client.setRelations(relations)
client.setRelationData(tablename, parentRecord)

await startSatellite(satellite, authState, token)

const shapeDef: Shape = {
tablename,
}

satellite!.relations = relations
const { synced } = await satellite.subscribe([shapeDef])
await synced
await satellite._performSnapshot()
const subscriptionCount =
satellite.subscriptions.getFulfilledSubscriptions().length

await client.commandCb!(
SatClientCommand.fromPartial({
resetDatabase: SatClientCommand_ResetDatabase.fromPartial({
reason: SatClientCommand_ResetDatabase_Reason.PERMISSIONS_CHANGE,
}),
})
)

const results = await adapter.query({
sql: 'SELECT * FROM main.parent',
})

t.deepEqual(results, [])

// make sure our existing subscriptions have been saved
t.assert(satellite.previousShapeSubscriptions.length == subscriptionCount)
})
}
Loading

0 comments on commit 2697839

Please sign in to comment.