From 26978398630412d59b73630c2cade275286ee87d Mon Sep 17 00:00:00 2001 From: Garry Hill Date: Thu, 2 May 2024 17:04:48 +0100 Subject: [PATCH] Protocol message to reset client state (#1213) --- .../src/_generated/protocol/satellite.ts | 119 ++++ clients/typescript/src/satellite/client.ts | 17 + clients/typescript/src/satellite/index.ts | 3 + clients/typescript/src/satellite/mock.ts | 10 + clients/typescript/src/satellite/process.ts | 10 + clients/typescript/src/util/proto.ts | 8 + clients/typescript/src/util/types.ts | 2 + .../typescript/test/satellite/client.test.ts | 4 +- clients/typescript/test/satellite/process.ts | 46 ++ .../electric/satellite/permissions/state.ex | 7 + .../lib/electric/satellite/protobuf.ex | 10 +- .../electric/satellite/protobuf_messages.ex | 580 ++++++++++++++++++ .../lib/electric/satellite/protocol.ex | 22 +- .../lib/electric/satellite/ws_server.ex | 35 +- .../permissions_read_filter_test.exs | 31 +- .../electric/satellite/subscriptions_test.exs | 4 +- .../test/support/postgres_test_connection.ex | 84 ++- ...2_migrations_get_streamed_to_satellite.lux | 1 - ...node_satellite_sends_and_recieves_data.lux | 8 +- ...correctly_updates_serialization_caches.lux | 5 +- ..._node_satellite_does_sync_on_subscribe.lux | 4 +- ...tellite_can_delete_freshly_synced_rows.lux | 4 +- ..._can_resume_subscriptions_on_reconnect.lux | 4 +- ...3.11_node_satellite_compensations_work.lux | 4 +- ...er_correctly_continues_the_replication.lux | 4 +- ...ite_correctly_handles_move_in_move_out.lux | 4 +- ...satellite_can_disconnect_and_reconnect.lux | 6 +- ..._can_transform_at_replication_boundary.lux | 6 +- ...es_dropped_columns_are_not_electrified.lux | 6 + .../06.02_permissions_change_propagation.lux | 13 +- ...06.03_basic_permissions_read_filtering.lux | 30 - .../06.04_permissions_changes_resubscribe.lux | 192 ++++++ e2e/tests/_shared.luxinc | 8 + protocol/satellite.proto | 12 + 34 files changed, 1186 insertions(+), 117 deletions(-) create mode 100644 e2e/tests/06.04_permissions_changes_resubscribe.lux diff --git a/clients/typescript/src/_generated/protocol/satellite.ts b/clients/typescript/src/_generated/protocol/satellite.ts index 07403c3a65..2e5c697931 100644 --- a/clients/typescript/src/_generated/protocol/satellite.ts +++ b/clients/typescript/src/_generated/protocol/satellite.ts @@ -711,6 +711,21 @@ export interface SatShapeDataEnd { $type: "Electric.Satellite.SatShapeDataEnd"; } +export interface SatClientCommand { + $type: "Electric.Satellite.SatClientCommand"; + resetDatabase?: SatClientCommand_ResetDatabase | undefined; +} + +export interface SatClientCommand_ResetDatabase { + $type: "Electric.Satellite.SatClientCommand.ResetDatabase"; + reason: SatClientCommand_ResetDatabase_Reason; +} + +export enum SatClientCommand_ResetDatabase_Reason { + PERMISSIONS_CHANGE = 0, + UNRECOGNIZED = -1, +} + /** * represents the client permissions * - `Rules`: the global permission rules, defined by the DDLX @@ -4387,6 +4402,110 @@ export const SatShapeDataEnd = { messageTypeRegistry.set(SatShapeDataEnd.$type, SatShapeDataEnd); +function createBaseSatClientCommand(): SatClientCommand { + return { $type: "Electric.Satellite.SatClientCommand", resetDatabase: undefined }; +} + +export const SatClientCommand = { + $type: "Electric.Satellite.SatClientCommand" as const, + + encode(message: SatClientCommand, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.resetDatabase !== undefined) { + SatClientCommand_ResetDatabase.encode(message.resetDatabase, writer.uint32(10).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): SatClientCommand { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseSatClientCommand(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.resetDatabase = SatClientCommand_ResetDatabase.decode(reader, reader.uint32()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + create, I>>(base?: I): SatClientCommand { + return SatClientCommand.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): SatClientCommand { + const message = createBaseSatClientCommand(); + message.resetDatabase = (object.resetDatabase !== undefined && object.resetDatabase !== null) + ? SatClientCommand_ResetDatabase.fromPartial(object.resetDatabase) + : undefined; + return message; + }, +}; + +messageTypeRegistry.set(SatClientCommand.$type, SatClientCommand); + +function createBaseSatClientCommand_ResetDatabase(): SatClientCommand_ResetDatabase { + return { $type: "Electric.Satellite.SatClientCommand.ResetDatabase", reason: 0 }; +} + +export const SatClientCommand_ResetDatabase = { + $type: "Electric.Satellite.SatClientCommand.ResetDatabase" as const, + + encode(message: SatClientCommand_ResetDatabase, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.reason !== 0) { + writer.uint32(8).int32(message.reason); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): SatClientCommand_ResetDatabase { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseSatClientCommand_ResetDatabase(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 8) { + break; + } + + message.reason = reader.int32() as any; + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + create, I>>(base?: I): SatClientCommand_ResetDatabase { + return SatClientCommand_ResetDatabase.fromPartial(base ?? {}); + }, + + fromPartial, I>>( + object: I, + ): SatClientCommand_ResetDatabase { + const message = createBaseSatClientCommand_ResetDatabase(); + message.reason = object.reason ?? 0; + return message; + }, +}; + +messageTypeRegistry.set(SatClientCommand_ResetDatabase.$type, SatClientCommand_ResetDatabase); + function createBaseSatPerms(): SatPerms { return { $type: "Electric.Satellite.SatPerms", id: Long.ZERO, userId: "", rules: undefined, roles: [] }; } diff --git a/clients/typescript/src/satellite/client.ts b/clients/typescript/src/satellite/client.ts index 9f57a075ec..42d7a7fc52 100644 --- a/clients/typescript/src/satellite/client.ts +++ b/clients/typescript/src/satellite/client.ts @@ -27,6 +27,7 @@ import { RootClientImpl, SatRpcRequest, SatInStartReplicationReq_Dialect, + SatClientCommand, } from '../_generated/protocol/satellite' import { getObjFromString, @@ -72,6 +73,7 @@ import { DataChange, isDataChange, ReplicatedRowTransformer, + CommandCallback, } from '../util/types' import { base64, @@ -131,7 +133,9 @@ type Events = { outbound_started: () => void [SUBSCRIPTION_DELIVERED]: SubscriptionDeliveredCallback [SUBSCRIPTION_ERROR]: SubscriptionErrorCallback + command: (cmd: SatClientCommand) => Promise } + type EventEmitter = AsyncEventEmitter export class SatelliteClient implements Client { @@ -182,6 +186,7 @@ export class SatelliteClient implements Client { SatRpcRequest: (msg) => this.handleRpcRequest(msg), SatOpLogAck: (msg) => void msg, // Server doesn't send that SatPerms: (msg) => void msg, + SatClientCommand: (msg) => this.handleCommand(msg), } satisfies HandlerMapping).map((e) => [getFullTypeName(e[0]), e[1]]) ) @@ -474,6 +479,14 @@ export class SatelliteClient implements Client { this.emitter.removeListener('relation', callback) } + subscribeToCommands(callback: CommandCallback): void { + this.emitter.on('command', callback) + } + + unsubscribeToCommands(callback: CommandCallback): void { + this.emitter.removeListener('command', callback) + } + enqueueTransaction(transaction: DataTransaction): void { if (this.outbound.isReplicating !== ReplicationStatus.ACTIVE) { throw new SatelliteError( @@ -958,6 +971,10 @@ export class SatelliteClient implements Client { this.subscriptionsDataCache.shapeDataEnd() } + private handleCommand(msg: SatClientCommand): void { + this.emitter.enqueueEmit('command', msg) + } + // For now, unsubscribe responses doesn't send any information back // It might eventually confirm that the server processed it or was noop. private handleUnsubscribeResponse(_msg: SatUnsubsResp): UnsubscribeResponse { diff --git a/clients/typescript/src/satellite/index.ts b/clients/typescript/src/satellite/index.ts index c2bba05969..d04f45a57e 100644 --- a/clients/typescript/src/satellite/index.ts +++ b/clients/typescript/src/satellite/index.ts @@ -22,6 +22,7 @@ import { AdditionalDataCallback, DbRecord, ReplicatedRowTransformer, + CommandCallback, } from '../util/types' import { Shape, @@ -116,6 +117,8 @@ export interface Client { unsubscribeToOutboundStarted(callback: OutboundStartedCallback): void subscribeToError(callback: ErrorCallback): void unsubscribeToError(callback: ErrorCallback): void + subscribeToCommands(callback: CommandCallback): void + unsubscribeToCommands(callback: CommandCallback): void subscribe(subId: string, shapes: ShapeRequest[]): Promise unsubscribe(subIds: string[]): Promise diff --git a/clients/typescript/src/satellite/mock.ts b/clients/typescript/src/satellite/mock.ts index 9d5bf7f8be..f7f2c00984 100644 --- a/clients/typescript/src/satellite/mock.ts +++ b/clients/typescript/src/satellite/mock.ts @@ -22,6 +22,7 @@ import { AdditionalDataCallback, ConnectivityState, ReplicatedRowTransformer, + CommandCallback, } from '../util/types' import { ElectricConfig } from '../config/index' @@ -207,6 +208,7 @@ export class MockSatelliteClient relationsCb?: (relation: Relation) => void transactionsCb?: TransactionCallback additionalDataCb?: AdditionalDataCallback + commandCb?: CommandCallback relationData: Record = {} @@ -330,6 +332,14 @@ export class MockSatelliteClient this.on('error', cb) } + subscribeToCommands(callback: CommandCallback): void { + this.commandCb = callback + } + + unsubscribeToCommands(_callback: CommandCallback): void { + this.commandCb = undefined + } + emitSocketClosedError(ev: SocketCloseReason): void { this.enqueueEmit('error', new SatelliteError(ev, 'socket closed')) } diff --git a/clients/typescript/src/satellite/process.ts b/clients/typescript/src/satellite/process.ts index 41c4e96edb..20026f6037 100644 --- a/clients/typescript/src/satellite/process.ts +++ b/clients/typescript/src/satellite/process.ts @@ -4,6 +4,7 @@ import uniqWith from 'lodash.uniqwith' import { SatOpMigrate_Type, SatRelation_RelationType, + SatClientCommand, } from '../_generated/protocol/satellite' import { AuthConfig, AuthState } from '../auth/index' import { DatabaseAdapter } from '../electric/adapter' @@ -320,6 +321,7 @@ export class SatelliteProcess implements Satellite { this.client.subscribeToTransactions(this._applyTransaction.bind(this)) this.client.subscribeToAdditionalData(this._applyAdditionalData.bind(this)) this.client.subscribeToOutboundStarted(this._throttledSnapshot.bind(this)) + this.client.subscribeToCommands(this._handleCommand.bind(this)) this.client.subscribeToSubscriptionEvents( this._handleSubscriptionData.bind(this), @@ -1418,6 +1420,14 @@ export class SatelliteProcess implements Satellite { ]) } + async _handleCommand(cmd: SatClientCommand) { + if (cmd.resetDatabase) { + await this._resetClientState({ + keepSubscribedShapes: true, + }) + } + } + private async maybeGarbageCollect( origin: string, commitTimestamp: Date diff --git a/clients/typescript/src/util/proto.ts b/clients/typescript/src/util/proto.ts index 5069631abf..4652301323 100644 --- a/clients/typescript/src/util/proto.ts +++ b/clients/typescript/src/util/proto.ts @@ -128,6 +128,7 @@ const msgtypetuples: MappingTuples = { SatRpcResponse: [22, Pb.SatRpcResponse], SatOpLogAck: [23, Pb.SatOpLogAck], SatPerms: [24, Pb.SatPerms], + SatClientCommand: [25, Pb.SatClientCommand], } const msgtypemapping = Object.fromEntries( @@ -180,6 +181,7 @@ export type SatPbMsg = | Pb.SatShapeDataEnd | Pb.SatOpLogAck | Pb.SatPerms + | Pb.SatClientCommand export type SatPbMsgObj< Msg extends { $type: string }, @@ -418,6 +420,12 @@ export function msgToString(message: MessageOfInterest): string { }}` case 'Electric.Satellite.SatPerms': return `#SatPerms{}` + case 'Electric.Satellite.SatClientCommand': + if (message.resetDatabase) { + return `#SatClientCommand{command: #ResetDatabase{reason: ${message.resetDatabase.reason}}}` + } + + return `#SatClientCommand{command: }` } } diff --git a/clients/typescript/src/util/types.ts b/clients/typescript/src/util/types.ts index ad0c3b5da8..7f40cb39db 100644 --- a/clients/typescript/src/util/types.ts +++ b/clients/typescript/src/util/types.ts @@ -5,6 +5,7 @@ import { SatOpMigrate_Table, SatOpMigrate_Type, SatRelation_RelationType, + SatClientCommand, } from '../_generated/protocol/satellite' import { Tag } from '../satellite/oplog' @@ -250,6 +251,7 @@ export type IncomingTransactionCallback = ( AckCb: () => void ) => void export type OutboundStartedCallback = () => void +export type CommandCallback = (cmd: SatClientCommand) => Promise export type ConnectivityStatus = 'connected' | 'disconnected' export type ConnectivityState = { diff --git a/clients/typescript/test/satellite/client.test.ts b/clients/typescript/test/satellite/client.test.ts index b28f48614d..32641612e3 100644 --- a/clients/typescript/test/satellite/client.test.ts +++ b/clients/typescript/test/satellite/client.test.ts @@ -809,7 +809,7 @@ test.serial('default and null test', async (t) => { }) }) -test.serial('subscription succesful', async (t) => { +test.serial('subscription successful', async (t) => { await connectAndAuth(t.context) const { client, server } = t.context await startReplication(client, server) @@ -1310,7 +1310,7 @@ test.serial('client correctly handles additional data messages', async (t) => { }) }) -test.serial('unsubscribe successfull', async (t) => { +test.serial('unsubscribe successful', async (t) => { await connectAndAuth(t.context) const { client, server } = t.context await startReplication(client, server) diff --git a/clients/typescript/test/satellite/process.ts b/clients/typescript/test/satellite/process.ts index d7edcc0619..bf63f3ed95 100644 --- a/clients/typescript/test/satellite/process.ts +++ b/clients/typescript/test/satellite/process.ts @@ -48,6 +48,11 @@ import { AuthState, insecureAuthToken } from '../../src/auth' import { ConnectivityStateChangeNotification } from '../../src/notifiers' import { QueryBuilder } from '../../src/migrators/query-builder' import { SatelliteOpts } from '../../src/satellite/config' +import { + SatClientCommand, + SatClientCommand_ResetDatabase, + SatClientCommand_ResetDatabase_Reason, +} from '../../src/_generated/protocol/satellite' export type ContextType = CommonContextType & { builder: QueryBuilder @@ -2535,4 +2540,45 @@ export const processTests = (test: TestFn) => { t.pass() }) + + test('SatClientCommand.ResetDatabase clears all data', async (t) => { + const { client, satellite, adapter } = t.context + const { runMigrations, authState, token } = t.context + await runMigrations() + const tablename = 'parent' + + // relations must be present at subscription delivery + client.setRelations(relations) + client.setRelationData(tablename, parentRecord) + + await startSatellite(satellite, authState, token) + + const shapeDef: Shape = { + tablename, + } + + satellite!.relations = relations + const { synced } = await satellite.subscribe([shapeDef]) + await synced + await satellite._performSnapshot() + const subscriptionCount = + satellite.subscriptions.getFulfilledSubscriptions().length + + await client.commandCb!( + SatClientCommand.fromPartial({ + resetDatabase: SatClientCommand_ResetDatabase.fromPartial({ + reason: SatClientCommand_ResetDatabase_Reason.PERMISSIONS_CHANGE, + }), + }) + ) + + const results = await adapter.query({ + sql: 'SELECT * FROM main.parent', + }) + + t.deepEqual(results, []) + + // make sure our existing subscriptions have been saved + t.assert(satellite.previousShapeSubscriptions.length == subscriptionCount) + }) } diff --git a/components/electric/lib/electric/satellite/permissions/state.ex b/components/electric/lib/electric/satellite/permissions/state.ex index 5913af0693..a76ec63a92 100644 --- a/components/electric/lib/electric/satellite/permissions/state.ex +++ b/components/electric/lib/electric/satellite/permissions/state.ex @@ -11,6 +11,8 @@ defmodule Electric.Satellite.Permissions.State do alias Electric.Postgres.Extension alias Electric.Satellite.Permissions.Trigger + require Logger + @electric_ddlx Extension.ddlx_relation() @enforce_keys [:rules, :schema] @@ -98,6 +100,7 @@ defmodule Electric.Satellite.Permissions.State do {[], {state, loader}} {rules, _count} -> + Logger.debug(fn -> "Updated global permissions id: #{rules.id}" end) {:ok, loader} = SchemaLoader.save_global_permissions(loader, rules) { @@ -181,6 +184,10 @@ defmodule Electric.Satellite.Permissions.State do if modified? do with {:ok, loader, perms} <- SchemaLoader.save_user_permissions(loader, role.user_id, roles) do + Logger.debug(fn -> "Updated user permissions id: #{perms.id}" end, + user_id: role.user_id + ) + {:ok, loader, [updated_user_permissions(role.user_id, perms)]} end else diff --git a/components/electric/lib/electric/satellite/protobuf.ex b/components/electric/lib/electric/satellite/protobuf.ex index d1e8ca1277..3f50c668f6 100644 --- a/components/electric/lib/electric/satellite/protobuf.ex +++ b/components/electric/lib/electric/satellite/protobuf.ex @@ -13,7 +13,8 @@ defmodule Electric.Satellite.Protobuf do SatRpcRequest, SatRpcResponse, SatOpLogAck, - SatPerms + SatPerms, + SatClientCommand } require Logger @@ -34,7 +35,8 @@ defmodule Electric.Satellite.Protobuf do SatRpcRequest => 21, SatRpcResponse => 22, SatOpLogAck => 23, - SatPerms => 24 + SatPerms => 24, + SatClientCommand => 25 } if Enum.any?(Map.values(@mapping), &(&1 in @reserved)) do @@ -74,6 +76,7 @@ defmodule Electric.Satellite.Protobuf do | %SatRpcRequest{} | %SatRpcResponse{} | %SatPerms{} + | %SatClientCommand{} @type rpc_req() :: %Satellite.SatAuthReq{} @@ -132,7 +135,8 @@ defmodule Electric.Satellite.Protobuf do SatOpAdditionalBegin, SatOpAdditionalCommit, SatOpLogAck, - SatPerms + SatPerms, + SatClientCommand } end end diff --git a/components/electric/lib/electric/satellite/protobuf_messages.ex b/components/electric/lib/electric/satellite/protobuf_messages.ex index a0945d69f7..1aecfd65b8 100644 --- a/components/electric/lib/electric/satellite/protobuf_messages.ex +++ b/components/electric/lib/electric/satellite/protobuf_messages.ex @@ -59,6 +59,65 @@ ) ) end, + defmodule Electric.Satellite.SatClientCommand.ResetDatabase.Reason do + @moduledoc false + ( + defstruct [] + + ( + @spec default() :: :PERMISSIONS_CHANGE + def default() do + :PERMISSIONS_CHANGE + end + ) + + @spec encode(atom() | String.t()) :: integer() | atom() + [ + ( + def encode(:PERMISSIONS_CHANGE) do + 0 + end + + def encode("PERMISSIONS_CHANGE") do + 0 + end + ) + ] + + def encode(x) do + x + end + + @spec decode(integer()) :: atom() | integer() + [ + def decode(0) do + :PERMISSIONS_CHANGE + end + ] + + def decode(x) do + x + end + + @spec constants() :: [{integer(), atom()}] + def constants() do + [{0, :PERMISSIONS_CHANGE}] + end + + @spec has_constant?(any()) :: boolean() + ( + [ + def has_constant?(:PERMISSIONS_CHANGE) do + true + end + ] + + def has_constant?(_) do + false + end + ) + ) + end, defmodule Electric.Satellite.SatErrorResp.ErrorCode do @moduledoc false ( @@ -14835,6 +14894,277 @@ end ) end, + defmodule Electric.Satellite.SatClientCommand do + @moduledoc false + defstruct command: nil + + ( + ( + @spec encode(struct) :: {:ok, iodata} | {:error, any} + def encode(msg) do + try do + {:ok, encode!(msg)} + rescue + e in [Protox.EncodingError, Protox.RequiredFieldsError] -> {:error, e} + end + end + + @spec encode!(struct) :: iodata | no_return + def encode!(msg) do + [] |> encode_command(msg) + end + ) + + [ + defp encode_command(acc, msg) do + case msg.command do + nil -> acc + {:reset_database, _field_value} -> encode_reset_database(acc, msg) + end + end + ] + + [ + defp encode_reset_database(acc, msg) do + try do + {_, child_field_value} = msg.command + [acc, "\n", Protox.Encode.encode_message(child_field_value)] + rescue + ArgumentError -> + reraise Protox.EncodingError.new(:reset_database, "invalid field value"), + __STACKTRACE__ + end + end + ] + + [] + ) + + ( + ( + @spec decode(binary) :: {:ok, struct} | {:error, any} + def decode(bytes) do + try do + {:ok, decode!(bytes)} + rescue + e in [Protox.DecodingError, Protox.IllegalTagError, Protox.RequiredFieldsError] -> + {:error, e} + end + end + + ( + @spec decode!(binary) :: struct | no_return + def decode!(bytes) do + parse_key_value(bytes, struct(Electric.Satellite.SatClientCommand)) + end + ) + ) + + ( + @spec parse_key_value(binary, struct) :: struct + defp parse_key_value(<<>>, msg) do + msg + end + + defp parse_key_value(bytes, msg) do + {field, rest} = + case Protox.Decode.parse_key(bytes) do + {0, _, _} -> + raise %Protox.IllegalTagError{} + + {1, _, bytes} -> + {len, bytes} = Protox.Varint.decode(bytes) + {delimited, rest} = Protox.Decode.parse_delimited(bytes, len) + + {[ + case msg.command do + {:reset_database, previous_value} -> + {:command, + {:reset_database, + Protox.MergeMessage.merge( + previous_value, + Electric.Satellite.SatClientCommand.ResetDatabase.decode!(delimited) + )}} + + _ -> + {:command, + {:reset_database, + Electric.Satellite.SatClientCommand.ResetDatabase.decode!(delimited)}} + end + ], rest} + + {tag, wire_type, rest} -> + {_, rest} = Protox.Decode.parse_unknown(tag, wire_type, rest) + {[], rest} + end + + msg_updated = struct(msg, field) + parse_key_value(rest, msg_updated) + end + ) + + [] + ) + + ( + @spec json_decode(iodata(), keyword()) :: {:ok, struct()} | {:error, any()} + def json_decode(input, opts \\ []) do + try do + {:ok, json_decode!(input, opts)} + rescue + e in Protox.JsonDecodingError -> {:error, e} + end + end + + @spec json_decode!(iodata(), keyword()) :: struct() | no_return() + def json_decode!(input, opts \\ []) do + {json_library_wrapper, json_library} = Protox.JsonLibrary.get_library(opts, :decode) + + Protox.JsonDecode.decode!( + input, + Electric.Satellite.SatClientCommand, + &json_library_wrapper.decode!(json_library, &1) + ) + end + + @spec json_encode(struct(), keyword()) :: {:ok, iodata()} | {:error, any()} + def json_encode(msg, opts \\ []) do + try do + {:ok, json_encode!(msg, opts)} + rescue + e in Protox.JsonEncodingError -> {:error, e} + end + end + + @spec json_encode!(struct(), keyword()) :: iodata() | no_return() + def json_encode!(msg, opts \\ []) do + {json_library_wrapper, json_library} = Protox.JsonLibrary.get_library(opts, :encode) + Protox.JsonEncode.encode!(msg, &json_library_wrapper.encode!(json_library, &1)) + end + ) + + ( + @deprecated "Use fields_defs()/0 instead" + @spec defs() :: %{ + required(non_neg_integer) => {atom, Protox.Types.kind(), Protox.Types.type()} + } + def defs() do + %{ + 1 => + {:reset_database, {:oneof, :command}, + {:message, Electric.Satellite.SatClientCommand.ResetDatabase}} + } + end + + @deprecated "Use fields_defs()/0 instead" + @spec defs_by_name() :: %{ + required(atom) => {non_neg_integer, Protox.Types.kind(), Protox.Types.type()} + } + def defs_by_name() do + %{ + reset_database: + {1, {:oneof, :command}, {:message, Electric.Satellite.SatClientCommand.ResetDatabase}} + } + end + ) + + ( + @spec fields_defs() :: list(Protox.Field.t()) + def fields_defs() do + [ + %{ + __struct__: Protox.Field, + json_name: "resetDatabase", + kind: {:oneof, :command}, + label: :optional, + name: :reset_database, + tag: 1, + type: {:message, Electric.Satellite.SatClientCommand.ResetDatabase} + } + ] + end + + [ + @spec(field_def(atom) :: {:ok, Protox.Field.t()} | {:error, :no_such_field}), + ( + def field_def(:reset_database) do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "resetDatabase", + kind: {:oneof, :command}, + label: :optional, + name: :reset_database, + tag: 1, + type: {:message, Electric.Satellite.SatClientCommand.ResetDatabase} + }} + end + + def field_def("resetDatabase") do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "resetDatabase", + kind: {:oneof, :command}, + label: :optional, + name: :reset_database, + tag: 1, + type: {:message, Electric.Satellite.SatClientCommand.ResetDatabase} + }} + end + + def field_def("reset_database") do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "resetDatabase", + kind: {:oneof, :command}, + label: :optional, + name: :reset_database, + tag: 1, + type: {:message, Electric.Satellite.SatClientCommand.ResetDatabase} + }} + end + ), + def field_def(_) do + {:error, :no_such_field} + end + ] + ) + + [] + + ( + @spec required_fields() :: [] + def required_fields() do + [] + end + ) + + ( + @spec syntax() :: atom() + def syntax() do + :proto3 + end + ) + + [ + @spec(default(atom) :: {:ok, boolean | integer | String.t() | float} | {:error, atom}), + def default(:reset_database) do + {:error, :no_default_value} + end, + def default(_) do + {:error, :no_such_field} + end + ] + + ( + @spec file_options() :: nil + def file_options() do + nil + end + ) + end, defmodule Electric.Satellite.SatRpcResponse do @moduledoc false defstruct method: "", request_id: 0, result: nil @@ -21503,6 +21833,256 @@ end ) end, + defmodule Electric.Satellite.SatClientCommand.ResetDatabase do + @moduledoc false + defstruct reason: :PERMISSIONS_CHANGE + + ( + ( + @spec encode(struct) :: {:ok, iodata} | {:error, any} + def encode(msg) do + try do + {:ok, encode!(msg)} + rescue + e in [Protox.EncodingError, Protox.RequiredFieldsError] -> {:error, e} + end + end + + @spec encode!(struct) :: iodata | no_return + def encode!(msg) do + [] |> encode_reason(msg) + end + ) + + [] + + [ + defp encode_reason(acc, msg) do + try do + if msg.reason == :PERMISSIONS_CHANGE do + acc + else + [ + acc, + "\b", + msg.reason + |> Electric.Satellite.SatClientCommand.ResetDatabase.Reason.encode() + |> Protox.Encode.encode_enum() + ] + end + rescue + ArgumentError -> + reraise Protox.EncodingError.new(:reason, "invalid field value"), __STACKTRACE__ + end + end + ] + + [] + ) + + ( + ( + @spec decode(binary) :: {:ok, struct} | {:error, any} + def decode(bytes) do + try do + {:ok, decode!(bytes)} + rescue + e in [Protox.DecodingError, Protox.IllegalTagError, Protox.RequiredFieldsError] -> + {:error, e} + end + end + + ( + @spec decode!(binary) :: struct | no_return + def decode!(bytes) do + parse_key_value(bytes, struct(Electric.Satellite.SatClientCommand.ResetDatabase)) + end + ) + ) + + ( + @spec parse_key_value(binary, struct) :: struct + defp parse_key_value(<<>>, msg) do + msg + end + + defp parse_key_value(bytes, msg) do + {field, rest} = + case Protox.Decode.parse_key(bytes) do + {0, _, _} -> + raise %Protox.IllegalTagError{} + + {1, _, bytes} -> + {value, rest} = + Protox.Decode.parse_enum( + bytes, + Electric.Satellite.SatClientCommand.ResetDatabase.Reason + ) + + {[reason: value], rest} + + {tag, wire_type, rest} -> + {_, rest} = Protox.Decode.parse_unknown(tag, wire_type, rest) + {[], rest} + end + + msg_updated = struct(msg, field) + parse_key_value(rest, msg_updated) + end + ) + + [] + ) + + ( + @spec json_decode(iodata(), keyword()) :: {:ok, struct()} | {:error, any()} + def json_decode(input, opts \\ []) do + try do + {:ok, json_decode!(input, opts)} + rescue + e in Protox.JsonDecodingError -> {:error, e} + end + end + + @spec json_decode!(iodata(), keyword()) :: struct() | no_return() + def json_decode!(input, opts \\ []) do + {json_library_wrapper, json_library} = Protox.JsonLibrary.get_library(opts, :decode) + + Protox.JsonDecode.decode!( + input, + Electric.Satellite.SatClientCommand.ResetDatabase, + &json_library_wrapper.decode!(json_library, &1) + ) + end + + @spec json_encode(struct(), keyword()) :: {:ok, iodata()} | {:error, any()} + def json_encode(msg, opts \\ []) do + try do + {:ok, json_encode!(msg, opts)} + rescue + e in Protox.JsonEncodingError -> {:error, e} + end + end + + @spec json_encode!(struct(), keyword()) :: iodata() | no_return() + def json_encode!(msg, opts \\ []) do + {json_library_wrapper, json_library} = Protox.JsonLibrary.get_library(opts, :encode) + Protox.JsonEncode.encode!(msg, &json_library_wrapper.encode!(json_library, &1)) + end + ) + + ( + @deprecated "Use fields_defs()/0 instead" + @spec defs() :: %{ + required(non_neg_integer) => {atom, Protox.Types.kind(), Protox.Types.type()} + } + def defs() do + %{ + 1 => + {:reason, {:scalar, :PERMISSIONS_CHANGE}, + {:enum, Electric.Satellite.SatClientCommand.ResetDatabase.Reason}} + } + end + + @deprecated "Use fields_defs()/0 instead" + @spec defs_by_name() :: %{ + required(atom) => {non_neg_integer, Protox.Types.kind(), Protox.Types.type()} + } + def defs_by_name() do + %{ + reason: + {1, {:scalar, :PERMISSIONS_CHANGE}, + {:enum, Electric.Satellite.SatClientCommand.ResetDatabase.Reason}} + } + end + ) + + ( + @spec fields_defs() :: list(Protox.Field.t()) + def fields_defs() do + [ + %{ + __struct__: Protox.Field, + json_name: "reason", + kind: {:scalar, :PERMISSIONS_CHANGE}, + label: :optional, + name: :reason, + tag: 1, + type: {:enum, Electric.Satellite.SatClientCommand.ResetDatabase.Reason} + } + ] + end + + [ + @spec(field_def(atom) :: {:ok, Protox.Field.t()} | {:error, :no_such_field}), + ( + def field_def(:reason) do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "reason", + kind: {:scalar, :PERMISSIONS_CHANGE}, + label: :optional, + name: :reason, + tag: 1, + type: {:enum, Electric.Satellite.SatClientCommand.ResetDatabase.Reason} + }} + end + + def field_def("reason") do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "reason", + kind: {:scalar, :PERMISSIONS_CHANGE}, + label: :optional, + name: :reason, + tag: 1, + type: {:enum, Electric.Satellite.SatClientCommand.ResetDatabase.Reason} + }} + end + + [] + ), + def field_def(_) do + {:error, :no_such_field} + end + ] + ) + + [] + + ( + @spec required_fields() :: [] + def required_fields() do + [] + end + ) + + ( + @spec syntax() :: atom() + def syntax() do + :proto3 + end + ) + + [ + @spec(default(atom) :: {:ok, boolean | integer | String.t() | float} | {:error, atom}), + def default(:reason) do + {:ok, :PERMISSIONS_CHANGE} + end, + def default(_) do + {:error, :no_such_field} + end + ] + + ( + @spec file_options() :: nil + def file_options() do + nil + end + ) + end, defmodule Electric.Satellite.SatOpInsert do @moduledoc false defstruct relation_id: 0, row_data: nil, tags: [] diff --git a/components/electric/lib/electric/satellite/protocol.ex b/components/electric/lib/electric/satellite/protocol.ex index 6c5e9fd946..930160f8a6 100644 --- a/components/electric/lib/electric/satellite/protocol.ex +++ b/components/electric/lib/electric/satellite/protocol.ex @@ -33,7 +33,10 @@ defmodule Electric.Satellite.Protocol do @type lsn() :: non_neg_integer @type deep_msg_list() :: PB.sq_pb_msg() | [deep_msg_list()] @type actions() :: {Shapes.subquery_actions(), [non_neg_integer()]} - @type outgoing() :: {deep_msg_list(), State.t()} | {:error, deep_msg_list(), State.t()} + @type outgoing() :: + {deep_msg_list(), State.t()} + | {:error, deep_msg_list(), State.t()} + | {:close, deep_msg_list(), State.t()} @type txn_processing() :: {deep_msg_list(), actions(), State.t()} @producer_demand 5 @@ -659,10 +662,11 @@ defmodule Electric.Satellite.Protocol do defp apply_migrations({changes, state}) do Enum.flat_map_reduce(changes, state, fn %Changes.Migration{} = migration, state -> + # we're updating the permissions with the new schema, but our permissions haven't actually + # changed, so don't trigger shape reprocessing, which is expensive state = %{state | schema_version: migration.version} |> update_permissions(schema: migration.schema) - |> after_permissions_change() {[migration], state} @@ -718,8 +722,16 @@ defmodule Electric.Satellite.Protocol do end defp after_permissions_change(state) do - # TODO(magnetised): updated permissions must be applied to the shapes - state + if Permissions.filter_reads_enabled?() do + command = + %SatClientCommand{ + command: {:reset_database, %SatClientCommand.ResetDatabase{reason: :PERMISSIONS_CHANGE}} + } + + throw({:close, [command], state}) + else + state + end end # If the client received at least one migration during the initial sync, the value of @@ -1415,6 +1427,8 @@ defmodule Electric.Satellite.Protocol do {:ok, schema_loader, sat_perms} = SchemaLoader.user_permissions(state.schema_loader, State.user_id(state)) + Logger.debug(fn -> "Loaded user permissions id: #{sat_perms.id}" end) + perms = state.auth |> Permissions.new() diff --git a/components/electric/lib/electric/satellite/ws_server.ex b/components/electric/lib/electric/satellite/ws_server.ex index 50962688dc..65725d8ec8 100644 --- a/components/electric/lib/electric/satellite/ws_server.ex +++ b/components/electric/lib/electric/satellite/ws_server.ex @@ -120,7 +120,8 @@ defmodule Electric.Satellite.WebsocketServer do push({reply, state}) {:force_unpause, reply, state} -> - Protocol.unpause_and_send_pending_events([reply], state) + [reply] + |> Protocol.unpause_and_send_pending_events(state) |> Protocol.perform_pending_actions() |> push() end @@ -308,14 +309,18 @@ defmodule Electric.Satellite.WebsocketServer do {:ok, state} end - defp handle_producer_msg(from, events, %State{} = state) - when is_out_rep_active(state) do + defp handle_producer_msg(from, events, %State{} = state) when is_out_rep_active(state) do GenStage.ask(from, 1) - events - |> Protocol.send_events_and_maybe_pause(state) - |> Protocol.perform_pending_actions() - |> push() + try do + events + |> Protocol.send_events_and_maybe_pause(state) + |> Protocol.perform_pending_actions() + |> push() + catch + {:close, msgs, state} -> + push({:close, msgs, state}) + end end defp handle_producer_msg(from, events, %State{} = state) @@ -411,10 +416,24 @@ defmodule Electric.Satellite.WebsocketServer do @typep deep_msg_list() :: PB.sq_pb_msg() | [deep_msg_list()] + # https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1 + # 1007 indicates that an endpoint is terminating the connection + # because it has received data within a message that was not + # consistent with the type of the message (e.g., non-UTF-8 [RFC3629] + # data within a text message). + @error_code 1007 + + # Status codes in the range 3000-3999 are reserved for use by + # libraries, frameworks, and applications. + @normal_termination 3000 + @spec push(Protocol.outgoing()) :: WebSock.handle_result() defp push({[], %State{} = state}), do: {:ok, state} defp push({pb_msg, %State{} = state}), do: {:push, binary_frames(pb_msg), state} - defp push({:error, msgs, state}), do: {:stop, :normal, 1007, binary_frames(msgs), state} + defp push({:error, msgs, state}), do: {:stop, :normal, @error_code, binary_frames(msgs), state} + + defp push({:close, msgs, state}), + do: {:stop, :normal, @normal_termination, binary_frames(msgs), state} @spec binary_frames(deep_msg_list()) :: [{:binary, iolist()}] defp binary_frames(pb_msg) when not is_list(pb_msg), do: [binary_frame(pb_msg)] diff --git a/components/electric/test/electric/satellite/subscriptions/permissions_read_filter_test.exs b/components/electric/test/electric/satellite/subscriptions/permissions_read_filter_test.exs index 1385e14c09..b45654e34a 100644 --- a/components/electric/test/electric/satellite/subscriptions/permissions_read_filter_test.exs +++ b/components/electric/test/electric/satellite/subscriptions/permissions_read_filter_test.exs @@ -114,6 +114,7 @@ defmodule Electric.Satellite.Subscriptions.PermissionsReadFilterTest do :setup_electrified_tables, :setup_with_ddlx, :setup_with_sql_execute, + :wait_for_permission_state, :setup_ws ] @@ -372,7 +373,8 @@ defmodule Electric.Satellite.Subscriptions.PermissionsReadFilterTest do "ASSIGN (public.accounts, public.team_memberships.role) TO public.team_memberships.user_id", "ASSIGN (public.projects, public.project_memberships.role) TO public.project_memberships.user_id", "ASSIGN (public.users, 'self') TO public.users.id" - ] + ], + wait_for: [perms: 8] } end @@ -381,6 +383,7 @@ defmodule Electric.Satellite.Subscriptions.PermissionsReadFilterTest do :setup_electrified_tables, :setup_with_ddlx, :setup_with_sql_execute, + :wait_for_permission_state, :setup_ws ] @@ -938,23 +941,15 @@ defmodule Electric.Satellite.Subscriptions.PermissionsReadFilterTest do ] ) - # FIXME(magnetised): when shapes and permissions are properly integrated, we should - # receive the new comment plus all the associated elements of the new - # project tree - assert [ - %NewRecord{ - relation: {"public", "comments"}, - record: %{"id" => ^new_comment_id} - }, - %NewRecord{ - relation: {"public", "issues"}, - record: %{"id" => @issue2_id} - }, - %NewRecord{ - relation: {"public", "projects"}, - record: %{"id" => @project2_id} - } - ] = receive_txn_changes(conn, rel_map) + reset_db_command = %SatClientCommand{ + command: + {:reset_database, + %SatClientCommand.ResetDatabase{ + reason: :PERMISSIONS_CHANGE + }} + } + + assert_receive {^conn, ^reset_db_command}, 1_000 end) end end diff --git a/components/electric/test/electric/satellite/subscriptions_test.exs b/components/electric/test/electric/satellite/subscriptions_test.exs index d1f7c61cdc..eaa4aaf44f 100644 --- a/components/electric/test/electric/satellite/subscriptions_test.exs +++ b/components/electric/test/electric/satellite/subscriptions_test.exs @@ -20,8 +20,8 @@ defmodule Electric.Satellite.SubscriptionsTest do :setup_replicated_db, :setup_electrified_tables, :setup_open_permissions, - :setup_with_sql_execute, - :setup_with_ddlx + :setup_with_ddlx, + :setup_with_sql_execute ] setup ctx do diff --git a/components/electric/test/support/postgres_test_connection.ex b/components/electric/test/support/postgres_test_connection.ex index bf7096ca0d..215ca3218d 100644 --- a/components/electric/test/support/postgres_test_connection.ex +++ b/components/electric/test/support/postgres_test_connection.ex @@ -177,7 +177,11 @@ defmodule Electric.Postgres.TestConnection do [electrified_count: electrified_table_count] end - defp wait_for_message(origin, msg_type) do + defp wait_for_message(_origin, []) do + :ok + end + + defp wait_for_message(origin, msg_type_list) when is_list(msg_type_list) do Stream.resource( fn -> 0 end, fn pos -> @@ -193,26 +197,42 @@ defmodule Electric.Postgres.TestConnection do & &1 ) |> Stream.reject(&(&1.changes == [])) - |> Enum.reduce_while(10, fn - _tx, 0 -> - {:halt, :error} - - %{changes: changes}, n -> - if Enum.any?(changes, &is_struct(&1, msg_type)) do - {:halt, :ok} - else - {:cont, n - 1} + |> Enum.reduce_while(msg_type_list, fn + %{changes: changes}, wait -> + changes + |> Enum.reduce(wait, fn + %type{}, [type | rest] -> + rest + + _, wait -> + wait + end) + |> case do + [] -> + {:halt, :ok} + + rest -> + {:cont, rest} end end) |> case do :ok -> :ok - :error -> - flunk("#{msg_type} didn't show up in the cached WAL") + _ -> + flunk("#{inspect(msg_type_list)} didn't show up in the cached WAL") end end + defp wait_for_message(origin, type, count \\ 1) when is_atom(type) do + wait_list = + type + |> Stream.duplicate(count) + |> Enum.to_list() + + wait_for_message(origin, wait_list) + end + def run_scenario_migrations(conn, scenario) do {tables, [{version, ddl}]} = migration(scenario) for sql <- ddl, do: {:ok, [], []} = :epgsql.squery(conn, sql) @@ -363,19 +383,24 @@ defmodule Electric.Postgres.TestConnection do def setup_with_sql_execute(_), do: :ok - def setup_with_ddlx(%{conn: conn, ddlx: ddlx}) do - ddlx - |> List.wrap() - |> Enum.map(&String.trim/1) - |> Enum.map(fn - "ELECTRIC " <> _ = ddlx -> ddlx - ddl -> "ELECTRIC " <> ddl - end) - |> Enum.map(&Electric.DDLX.parse!/1) - |> Electric.DDLX.Command.pg_sql() - |> Enum.each(fn sql -> - {:ok, 1} = :epgsql.squery(conn, sql) - end) + def setup_with_ddlx(%{conn: conn, ddlx: ddlx, origin: origin}) do + sql = + ddlx + |> List.wrap() + |> Enum.map(&String.trim/1) + |> Enum.map(fn + "ELECTRIC " <> _ = ddlx -> ddlx + ddl -> "ELECTRIC " <> ddl + end) + |> Enum.map(&Electric.DDLX.parse!/1) + |> Electric.DDLX.Command.pg_sql() + |> Enum.join("\n") + + conn + |> :epgsql.squery(["BEGIN;\n", sql, "\nCOMMIT;"]) + |> Enum.each(&(:ok = elem(&1, 0))) + + :ok = wait_for_message(origin, Electric.Replication.Changes.UpdatedPermissions) :ok end @@ -384,6 +409,15 @@ defmodule Electric.Postgres.TestConnection do :ok end + def wait_for_permission_state(%{origin: origin} = cxt) do + msg_count = + cxt + |> Map.get(:wait_for, []) + |> Keyword.get(:perms, 0) + + :ok = wait_for_message(origin, Electric.Replication.Changes.UpdatedPermissions, msg_count) + end + def load_schema(%{conn: _, origin: origin}) do {:ok, schema} = Electric.Postgres.Extension.SchemaCache.load(origin) diff --git a/e2e/tests/02.02_migrations_get_streamed_to_satellite.lux b/e2e/tests/02.02_migrations_get_streamed_to_satellite.lux index f7a431c677..9df6c254f7 100644 --- a/e2e/tests/02.02_migrations_get_streamed_to_satellite.lux +++ b/e2e/tests/02.02_migrations_get_streamed_to_satellite.lux @@ -18,7 +18,6 @@ CALL electric.migration_version('$migration_version'); CREATE TABLE mtable1 (id uuid PRIMARY KEY); ALTER TABLE mtable1 ENABLE ELECTRIC; - ELECTRIC GRANT ALL ON mtable1 TO AUTHENTICATED; COMMIT; """ ?$psql diff --git a/e2e/tests/03.03_node_satellite_sends_and_recieves_data.lux b/e2e/tests/03.03_node_satellite_sends_and_recieves_data.lux index fef1b9b917..a18bd5d2a9 100644 --- a/e2e/tests/03.03_node_satellite_sends_and_recieves_data.lux +++ b/e2e/tests/03.03_node_satellite_sends_and_recieves_data.lux @@ -4,15 +4,17 @@ [invoke setup] -[invoke setup_client 1 "electric_1" 5133] -[invoke setup_client 2 "electric_1" 5133] - [shell proxy_1] [invoke migrate_items_table 20230504114018] +[invoke setup_client 1 "electric_1" 5133] +[invoke setup_client 2 "electric_1" 5133] + [shell satellite_1] ??[rpc] recv: #SatInStartReplicationResp + ??Connectivity state changed: connected [invoke node_sync_items ""] + [shell satellite_2] ??[rpc] recv: #SatInStartReplicationResp [invoke node_sync_items ""] diff --git a/e2e/tests/03.04_node_satellite_correctly_updates_serialization_caches.lux b/e2e/tests/03.04_node_satellite_correctly_updates_serialization_caches.lux index 5d05e96e2d..073806bbcf 100644 --- a/e2e/tests/03.04_node_satellite_correctly_updates_serialization_caches.lux +++ b/e2e/tests/03.04_node_satellite_correctly_updates_serialization_caches.lux @@ -4,8 +4,6 @@ [invoke setup] -[invoke setup_client 1 "electric_1" 5133] -[invoke setup_client 2 "electric_1" 5133] [global migration_version_1=20230504114018] [global migration_version_2=20230505100008] @@ -16,6 +14,9 @@ !\d electric.shadow__public__items !\d items +[invoke setup_client 1 "electric_1" 5133] +[invoke setup_client 2 "electric_1" 5133] + [shell satellite_1] ??[rpc] recv: #SatInStartReplicationResp [invoke node_sync_items ""] diff --git a/e2e/tests/03.06_node_satellite_does_sync_on_subscribe.lux b/e2e/tests/03.06_node_satellite_does_sync_on_subscribe.lux index 56bd46082d..5e579d0e61 100644 --- a/e2e/tests/03.06_node_satellite_does_sync_on_subscribe.lux +++ b/e2e/tests/03.06_node_satellite_does_sync_on_subscribe.lux @@ -4,11 +4,11 @@ [invoke setup] -[invoke setup_client 1 "electric_1" 5133] - [shell proxy_1] [invoke migrate_items_table 20230504114018] +[invoke setup_client 1 "electric_1" 5133] + [shell satellite_1] ??[rpc] recv: #SatInStartReplicationResp [invoke node_await_table "items"] diff --git a/e2e/tests/03.07_node_satellite_can_delete_freshly_synced_rows.lux b/e2e/tests/03.07_node_satellite_can_delete_freshly_synced_rows.lux index af001a9801..b0098095a1 100644 --- a/e2e/tests/03.07_node_satellite_can_delete_freshly_synced_rows.lux +++ b/e2e/tests/03.07_node_satellite_can_delete_freshly_synced_rows.lux @@ -4,11 +4,11 @@ [invoke setup] -[invoke setup_client 1 "electric_1" 5133] - [shell proxy_1] [invoke migrate_items_table 20230504114018] +[invoke setup_client 1 "electric_1" 5133] + [shell satellite_1] ??[rpc] recv: #SatInStartReplicationResp [invoke node_await_table "items"] diff --git a/e2e/tests/03.08_node_satellite_can_resume_subscriptions_on_reconnect.lux b/e2e/tests/03.08_node_satellite_can_resume_subscriptions_on_reconnect.lux index dae3bc2230..359fea5fed 100644 --- a/e2e/tests/03.08_node_satellite_can_resume_subscriptions_on_reconnect.lux +++ b/e2e/tests/03.08_node_satellite_can_resume_subscriptions_on_reconnect.lux @@ -4,11 +4,11 @@ [invoke setup] -[invoke setup_client 1 "electric_1" 5133] - [shell proxy_1] [invoke migrate_items_table 20230504114018] +[invoke setup_client 1 "electric_1" 5133] + [shell satellite_1] ??[rpc] recv: #SatInStartReplicationResp [invoke node_await_table "items"] diff --git a/e2e/tests/03.11_node_satellite_compensations_work.lux b/e2e/tests/03.11_node_satellite_compensations_work.lux index 6b629b737c..16a5ca6ffc 100644 --- a/e2e/tests/03.11_node_satellite_compensations_work.lux +++ b/e2e/tests/03.11_node_satellite_compensations_work.lux @@ -4,14 +4,14 @@ [invoke setup] -[invoke setup_client 1 "electric_1" 5133] - # PREPARATION: Set up dependent tables and add a row that will be referenced [shell proxy_1] [invoke migrate_items_table 001] [invoke migrate_other_items_table 002] +[invoke setup_client 1 "electric_1" 5133] + [shell satellite_1] ??[rpc] recv: #SatInStartReplicationResp [invoke node_sync_other_items ""] diff --git a/e2e/tests/03.12_server_correctly_continues_the_replication.lux b/e2e/tests/03.12_server_correctly_continues_the_replication.lux index be0751629d..82ed5edec5 100644 --- a/e2e/tests/03.12_server_correctly_continues_the_replication.lux +++ b/e2e/tests/03.12_server_correctly_continues_the_replication.lux @@ -4,11 +4,11 @@ [invoke setup] -[invoke setup_client 1 "electric_1" 5133] - [shell proxy_1] [invoke migrate_items_table 20230504114018] +[invoke setup_client 1 "electric_1" 5133] + [shell satellite_1] ??[rpc] recv: #SatInStartReplicationResp [invoke node_sync_items ""] diff --git a/e2e/tests/03.21_node_satellite_correctly_handles_move_in_move_out.lux b/e2e/tests/03.21_node_satellite_correctly_handles_move_in_move_out.lux index e510c290aa..037f632d00 100644 --- a/e2e/tests/03.21_node_satellite_correctly_handles_move_in_move_out.lux +++ b/e2e/tests/03.21_node_satellite_correctly_handles_move_in_move_out.lux @@ -4,8 +4,6 @@ [invoke setup] -[invoke setup_client 1 "electric_1" 5133] - [shell proxy_1] [local sql= """ @@ -36,6 +34,8 @@ """] [invoke migrate_pg 20240130000000 $sql] +[invoke setup_client 1 "electric_1" 5133] + [shell satellite_1] ??[rpc] recv: #SatInStartReplicationResp [invoke node_await_table "comments"] diff --git a/e2e/tests/03.22_node_satellite_can_disconnect_and_reconnect.lux b/e2e/tests/03.22_node_satellite_can_disconnect_and_reconnect.lux index a2d7fe6c7e..5a3681dc71 100644 --- a/e2e/tests/03.22_node_satellite_can_disconnect_and_reconnect.lux +++ b/e2e/tests/03.22_node_satellite_can_disconnect_and_reconnect.lux @@ -4,12 +4,12 @@ [invoke setup] -[invoke setup_client 1 "electric_1" 5133] -[invoke setup_client 2 "electric_1" 5133] - [shell proxy_1] [invoke migrate_items_table 20230504114018] +[invoke setup_client 1 "electric_1" 5133] +[invoke setup_client 2 "electric_1" 5133] + [shell satellite_1] ??[rpc] recv: #SatInStartReplicationResp [invoke node_sync_items ""] diff --git a/e2e/tests/03.24_node_satellite_can_transform_at_replication_boundary.lux b/e2e/tests/03.24_node_satellite_can_transform_at_replication_boundary.lux index 60c7963ebe..04c311187d 100644 --- a/e2e/tests/03.24_node_satellite_can_transform_at_replication_boundary.lux +++ b/e2e/tests/03.24_node_satellite_can_transform_at_replication_boundary.lux @@ -4,12 +4,12 @@ [invoke setup] -[invoke setup_client 1 electric_1 5133] -[invoke setup_client 2 electric_1 5133] - [shell proxy_1] [invoke migrate_items_table 20230504114018] +[invoke setup_client 1 electric_1 5133] +[invoke setup_client 2 electric_1 5133] + [shell satellite_1] ??[rpc] recv: #SatInStartReplicationResp [invoke node_set_item_replication_transform] diff --git a/e2e/tests/03.27_postgres_dropped_columns_are_not_electrified.lux b/e2e/tests/03.27_postgres_dropped_columns_are_not_electrified.lux index c2ae931c9e..f395cb54c5 100644 --- a/e2e/tests/03.27_postgres_dropped_columns_are_not_electrified.lux +++ b/e2e/tests/03.27_postgres_dropped_columns_are_not_electrified.lux @@ -41,8 +41,13 @@ [shell proxy_1] -$fail_pattern|content_b|dropped + !BEGIN; + ?electric=\*# !ALTER TABLE entries ENABLE ELECTRIC; ??ELECTRIC ENABLE + !ELECTRIC GRANT ALL on entries TO AUTHENTICATED; + ?electric=\*# + !COMMIT; ??electric=# [shell pg_1] @@ -60,6 +65,7 @@ # Start two clients and make conflicting writes on both to verify correct conflict resolution # that's not affected by the dropped column content_b. [invoke setup_client 1 "electric_1" 5133] + [shell satellite_1] ?send: #SatAuthReq\{id: ([a-f0-9-]{36}) [global client_1_id=$1] diff --git a/e2e/tests/06.02_permissions_change_propagation.lux b/e2e/tests/06.02_permissions_change_propagation.lux index 1391d3ceb5..aa49aaeff8 100644 --- a/e2e/tests/06.02_permissions_change_propagation.lux +++ b/e2e/tests/06.02_permissions_change_propagation.lux @@ -10,7 +10,6 @@ [global session_id=001] [global project_id=99adf0a5-b3c6-45d7-9986-582e76db4556] - [shell proxy_1] [invoke log "run migration $migration_version_1 on postgres"] """! @@ -77,6 +76,18 @@ [shell electric] ?user_id=$user_id1 .+ Global permissions updated for connection +# TODO: permissions-change: perms change should not drop connection +# currently a perms change drops the connection to force +# a re-subscribe with updated perms + +[shell user_1_ws1] + ?SatClientCommand\{command: .+ResetDatabase\{reason: :PERMISSIONS_CHANGE + ?Server closed the websocket connection + + -$fail_pattern + + [invoke client_session $user_id1 $session_id] + [shell pg_1] !INSERT INTO project_memberships (id, project_id, user_id, role) VALUES ('c197a4ef-0f22-4af1-acb1-bf7200e64900', '$project_id', '$user_id1', 'member'); ?$psql diff --git a/e2e/tests/06.03_basic_permissions_read_filtering.lux b/e2e/tests/06.03_basic_permissions_read_filtering.lux index c624841739..bdc40eadac 100644 --- a/e2e/tests/06.03_basic_permissions_read_filtering.lux +++ b/e2e/tests/06.03_basic_permissions_read_filtering.lux @@ -148,35 +148,5 @@ [shell user_2_ws1] ?SatOpInsert\{.+values: \["c2b114a3-9527-4f3c-8863-b0dc7c85ec76", "$project_id2", "Project 2/Issue 4"\] -# add user 1 to project 2 -[shell pg_1] - """! - BEGIN; - INSERT INTO project_memberships (id, project_id, user_id, role) VALUES - ('943fcfde-cd15-4eb8-9cae-89fa07db55c8', '$project_id2', '$user_id1', 'member'); - COMMIT; - """ - ?$psql - -# TODO: we should automatically receive the project 2 data -# meanwhile, let's prod things by inserting a new issue - -[shell pg_1] - """! - BEGIN; - INSERT INTO issues (id, project_id, name) VALUES - ('12c078a1-bb7f-4b13-a415-a50f9f4c5cdc', '$project_id2', 'Project 2/Issue 5'); - COMMIT; - """ - ?$psql - -[shell user_1_ws1] - -$fail_pattern - ?+SatOpInsert\{.+values: \["$project_id2", "Project 2"\] - ?SatOpInsert\{.+values: \["12c078a1-bb7f-4b13-a415-a50f9f4c5cdc", "$project_id2", "Project 2/Issue 5"\] - -[shell user_2_ws1] - ?SatOpInsert\{.+values: \["12c078a1-bb7f-4b13-a415-a50f9f4c5cdc", "$project_id2", "Project 2/Issue 5"\] - [cleanup] [invoke teardown] diff --git a/e2e/tests/06.04_permissions_changes_resubscribe.lux b/e2e/tests/06.04_permissions_changes_resubscribe.lux new file mode 100644 index 0000000000..f4c7ae1b1c --- /dev/null +++ b/e2e/tests/06.04_permissions_changes_resubscribe.lux @@ -0,0 +1,192 @@ +[doc Permissions changes are propagated to client connection] +[include _shared.luxinc] + +[invoke setup] + +[global migration_version_1=20231109154018] +[global migration_version_2=20240226114300] +[global user_id1=95f21e62-4b90-49c3-874a-174eb17e58cf] +[global user_id2=31377df9-c659-493e-b26f-1ce5fbb0b2df] +[global session_id=001] +[global project_id1=11111111-b3c6-45d7-9986-582e76db4556] +[global project_id2=22222222-88c2-4d44-9b50-734abafa76ae] +[global issue_id1=0f18e8c3-dd50-49dd-812a-312fec8ecc01] +[global issue_id2=9df262ca-fd9d-48c2-80e2-0f86816e7a39] +[global issue_id3=1011b1e6-f506-4c88-bfb0-d061ab189818] + + +# this macro differs from `elixir_client_subscribe` because it doesn't match on the SatSubsDataEnd +# message -- we're matching on the contents of the sub data +[macro subscribe_tables id tables] + """! + {:ok, %{err: nil}} = TestWsClient.make_rpc_call(conn, "subscribe", ProtocolHelpers.subscription_request("$id", request_1: ~w|$tables|)) + """ + ?%Electric.Satellite.SatSubsDataBegin{subscription_id: "$id" +[endmacro] + +[shell proxy_1] + [invoke log "run migration $migration_version_1 on postgres"] + """! + BEGIN; + CALL electric.migration_version('$migration_version_1'); + + CREATE TABLE "users" ( + id uuid NOT NULL PRIMARY KEY, + name text NOT NULL + ); + CREATE TABLE "projects" ( + id uuid NOT NULL PRIMARY KEY, + name text NOT NULL + ); + CREATE TABLE "issues" ( + id uuid NOT NULL PRIMARY KEY, + project_id uuid NOT NULL REFERENCES projects (id), + name text NOT NULL + ); + CREATE TABLE "comments" ( + id uuid NOT NULL PRIMARY KEY, + issues_id uuid NOT NULL REFERENCES issues (id), + author_id uuid NOT NULL REFERENCES users (id), + comment text NOT NULL + ); + CREATE TABLE "project_memberships" ( + id uuid NOT NULL PRIMARY KEY, + project_id uuid NOT NULL REFERENCES projects (id), + user_id uuid NOT NULL REFERENCES users (id), + role text NOT NULL + ); + + ALTER TABLE "users" ENABLE ELECTRIC; + ALTER TABLE "projects" ENABLE ELECTRIC; + ALTER TABLE "issues" ENABLE ELECTRIC; + ALTER TABLE "comments" ENABLE ELECTRIC; + ALTER TABLE "project_memberships" ENABLE ELECTRIC; + + + ELECTRIC GRANT ALL ON public.users TO (public.users, 'self'); + ELECTRIC GRANT READ ON public.users TO AUTHENTICATED; + + ELECTRIC GRANT READ ON public.projects TO (public.projects, 'member'); + ELECTRIC GRANT ALL ON public.issues TO (public.projects, 'member'); + ELECTRIC GRANT READ ON public.comments TO (public.projects, 'member'); + ELECTRIC GRANT WRITE ON public.comments TO (public.projects, 'member') WHERE (row.author_id = auth.user_id); + ELECTRIC GRANT READ ON public.project_memberships TO (public.projects, 'member'); + + ELECTRIC ASSIGN (public.projects, public.project_memberships.role) TO public.project_memberships.user_id; + ELECTRIC ASSIGN (public.users, 'self') TO public.users.id; + + COMMIT; + """ + ?$psql + + +[shell electric] + ?? [info] Applying migration $migration_version_1 + +[shell pg_1] + """! + BEGIN; + + INSERT INTO users (id, name) VALUES + ('$user_id1', 'User 1'), + ('$user_id2', 'User 2'); + + INSERT INTO projects (id, name) VALUES + ('$project_id1', 'Project 1'), + ('$project_id2', 'Project 2'); + + INSERT INTO issues (id, project_id, name) VALUES + ('$issue_id1', '$project_id1', 'Project 1/Issue 1'), + ('$issue_id2', '$project_id1', 'Project 1/Issue 2'), + ('$issue_id3', '$project_id2', 'Project 2/Issue 3'); + + INSERT INTO project_memberships (id, project_id, user_id, role) VALUES + ('80bd296e-d506-4b4c-9024-2b376d06539f', '$project_id1', '$user_id1', 'member'), + ('c720de9c-7078-45f5-810e-71ed410169c5', '$project_id1', '$user_id2', 'member'), + ('e83721cb-10df-4e7d-9729-3249c27eba50', '$project_id2', '$user_id2', 'member'); + + COMMIT; + """ + ?$psql + +[newshell user_1_ws1] + -$fail_pattern + [invoke start_elixir_test 1] + [invoke client_session $user_id1 $session_id] + [invoke subscribe_tables "f624382d-b14c-424a-8836-b042bb12f65a" "issues"] + +[newshell user_2_ws1] + -$fail_pattern + [invoke start_elixir_test 2] + [invoke client_session $user_id2 $session_id] + [invoke subscribe_tables "8afafeef-a20e-4a88-bf7d-7b628a5fd7bb" "issues"] + +[shell electric] + ?user_id=$user_id1 .+SatOpInsert\{.+values: \["11111111-b3c6-45d7-9986-582e76db4556", "Project 1"\] + +[shell user_1_ws1] + ?+SatOpInsert\{.+values: \["$issue_id1", "$project_id1", "Project 1/Issue 1"\] + ?SatOpInsert\{.+values: \["$issue_id2", "$project_id1", "Project 1/Issue 2"\] + +[shell user_2_ws1] + ?+SatOpInsert\{.+values: \["$issue_id1", "$project_id1", "Project 1/Issue 1"\] + ?+SatOpInsert\{.+values: \["$issue_id2", "$project_id1", "Project 1/Issue 2"\] + ?SatOpInsert\{.+values: \["$issue_id3", "$project_id2", "Project 2/Issue 3"\] + +[shell user_1_ws1] + -"Project 2/Issue 4" + +[shell pg_1] + """! + BEGIN; + INSERT INTO issues (id, project_id, name) VALUES + ('c2b114a3-9527-4f3c-8863-b0dc7c85ec76', '$project_id2', 'Project 2/Issue 4'); + COMMIT; + """ + ?$psql + +[shell user_2_ws1] + ?SatOpInsert\{.+values: \["c2b114a3-9527-4f3c-8863-b0dc7c85ec76", "$project_id2", "Project 2/Issue 4"\] + +# add user 1 to project 2 +[shell pg_1] + """! + BEGIN; + INSERT INTO project_memberships (id, project_id, user_id, role) VALUES + ('943fcfde-cd15-4eb8-9cae-89fa07db55c8', '$project_id2', '$user_id1', 'member'); + COMMIT; + """ + ?$psql + +# TODO: permissions-change: perms change should not drop connection + +[shell user_1_ws1] + ?SatClientCommand\{command: .+ResetDatabase\{reason: :PERMISSIONS_CHANGE + ?Server closed the websocket connection + + -$fail_pattern + + [invoke client_session $user_id1 $session_id] + [invoke subscribe_tables "f624382d-b14c-424a-8836-b042bb12f65a" "issues"] + + ?+SatOpInsert\{.+values: \["$issue_id1", "$project_id1", "Project 1/Issue 1"\] + ?+SatOpInsert\{.+values: \["$issue_id2", "$project_id1", "Project 1/Issue 2"\] + ?SatOpInsert\{.+values: \["$issue_id3", "$project_id2", "Project 2/Issue 3"\] + +[shell pg_1] + """! + BEGIN; + INSERT INTO issues (id, project_id, name) VALUES + ('12c078a1-bb7f-4b13-a415-a50f9f4c5cdc', '$project_id2', 'Project 2/Issue 5'); + COMMIT; + """ + ?$psql + +[shell user_1_ws1] + ?SatOpInsert\{.+values: \["12c078a1-bb7f-4b13-a415-a50f9f4c5cdc", "$project_id2", "Project 2/Issue 5"\] + +[shell user_2_ws1] + ?SatOpInsert\{.+values: \["12c078a1-bb7f-4b13-a415-a50f9f4c5cdc", "$project_id2", "Project 2/Issue 5"\] + +[cleanup] + [invoke teardown] diff --git a/e2e/tests/_shared.luxinc b/e2e/tests/_shared.luxinc index 2791cd2256..d8ff4700cd 100644 --- a/e2e/tests/_shared.luxinc +++ b/e2e/tests/_shared.luxinc @@ -144,6 +144,10 @@ ELECTRIC GRANT ALL ON public.items TO AUTHENTICATED; """] [invoke migrate_pg $version $sql] + [my old=$LUX_SHELLNAME] + [shell electric] + ??Updated global permissions + [shell $old] [endmacro] [macro migrate_other_items_table version] @@ -158,4 +162,8 @@ ELECTRIC GRANT ALL ON public.other_items TO AUTHENTICATED; """] [invoke migrate_pg $version $sql] + [my old=$LUX_SHELLNAME] + [shell electric] + ??Updated global permissions + [shell $old] [endmacro] diff --git a/protocol/satellite.proto b/protocol/satellite.proto index 5bc3c51fad..d9c05833fe 100644 --- a/protocol/satellite.proto +++ b/protocol/satellite.proto @@ -660,6 +660,18 @@ message SatShapeDataBegin { message SatShapeDataEnd { } +message SatClientCommand { + message ResetDatabase { + enum Reason { + PERMISSIONS_CHANGE = 0; + } + Reason reason = 1; + } + oneof command { + ResetDatabase reset_database = 1; + } +} + // represents the client permissions // - `Rules`: the global permission rules, defined by the DDLX // - `Roles`: a users actual assigned roles