diff --git a/README.md b/README.md index 35e6b8df..c4be9255 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ For development updates make sure to check out the official [ElectricSQL Discord --- -Non official Dart client implementation for [Electric](https://electric-sql.com/) based on commit `cd77e4a67f5367e1376cab752bd16177801d20b5` of the [electric git repository](https://github.com/electric-sql/electric) +Non official Dart client implementation for [Electric](https://electric-sql.com/) based on commit `6017643d64f52796e56e451a2275efc5ad088e4f` of the [electric git repository](https://github.com/electric-sql/electric) - Client based on the typescript client from the `clients/typescript` subfolder. diff --git a/packages/electricsql/lib/src/auth/util.dart b/packages/electricsql/lib/src/auth/util.dart index 6dcbc3db..5cd4d30c 100644 --- a/packages/electricsql/lib/src/auth/util.dart +++ b/packages/electricsql/lib/src/auth/util.dart @@ -5,7 +5,9 @@ Future authToken({String? iss, String? key}) async { final mockKey = key ?? 'integration-tests-signing-key-example'; final int nowInSecs = DateTime.now().millisecondsSinceEpoch ~/ 1000; - final iat = nowInSecs - 1; // Remove 1 second + + // Subtract 1 second to account for clock precision when validating the token + final iat = nowInSecs - 1; final jwt = JWT( { diff --git a/packages/electricsql/lib/src/migrators/bundle.dart b/packages/electricsql/lib/src/migrators/bundle.dart index cdc7197d..e2ac99f9 100644 --- a/packages/electricsql/lib/src/migrators/bundle.dart +++ b/packages/electricsql/lib/src/migrators/bundle.dart @@ -112,7 +112,9 @@ class BundleMigrator implements Migrator { if (migration.version != version) { throw Exception( - 'Migrations cannot be altered once applied: expecting $version at index $i.', + 'Local migrations $version does not match server version ${migration.version}. ' + 'This is an unrecoverable error. Please clear your local storage and try again. ' + 'Check documentation (https://electric-sql.com/docs/reference/limitations) to learn more.', ); } } @@ -141,7 +143,7 @@ class BundleMigrator implements Migrator { await adapter.runInTransaction([ ...statements, - Statement(applied, [version, DateTime.now().millisecondsSinceEpoch]) + Statement(applied, [version, DateTime.now().millisecondsSinceEpoch]), ]); } diff --git a/packages/electricsql/lib/src/migrators/schema.dart b/packages/electricsql/lib/src/migrators/schema.dart index 582c0a03..89bec0d3 100644 --- a/packages/electricsql/lib/src/migrators/schema.dart +++ b/packages/electricsql/lib/src/migrators/schema.dart @@ -20,7 +20,7 @@ final kBaseMigrations = [ //"-- Somewhere to track migrations\n", 'CREATE TABLE IF NOT EXISTS $migrationsTable (\n id INTEGER PRIMARY KEY AUTOINCREMENT,\n version TEXT NOT NULL UNIQUE,\n applied_at TEXT NOT NULL\n);', //"-- Initialisation of the metadata table\n", - "INSERT INTO $metaTable (key, value) VALUES ('compensations', 0), ('lastAckdRowId','0'), ('lastSentRowId', '0'), ('lsn', ''), ('clientId', ''), ('subscriptions', '');", + "INSERT INTO $metaTable (key, value) VALUES ('compensations', 1), ('lastAckdRowId','0'), ('lastSentRowId', '0'), ('lsn', ''), ('clientId', ''), ('subscriptions', '');", //"-- These are toggles for turning the triggers on and off\n", 'DROP TABLE IF EXISTS $triggersTable;', 'CREATE TABLE $triggersTable (tablename TEXT PRIMARY KEY, flag INTEGER);', @@ -28,5 +28,5 @@ final kBaseMigrations = [ 'CREATE TABLE $shadowTable (\n namespace TEXT NOT NULL,\n tablename TEXT NOT NULL,\n primaryKey TEXT NOT NULL,\n tags TEXT NOT NULL,\n PRIMARY KEY (namespace, tablename, primaryKey));', ], version: '0', - ) + ), ]; diff --git a/packages/electricsql/lib/src/migrators/triggers.dart b/packages/electricsql/lib/src/migrators/triggers.dart index ea5d6526..66e20a79 100644 --- a/packages/electricsql/lib/src/migrators/triggers.dart +++ b/packages/electricsql/lib/src/migrators/triggers.dart @@ -132,7 +132,6 @@ List generateOplogTriggers( List generateCompensationTriggers( TableFullName tableFullName, Table table, - Tables tables, ) { final tableName = table.tableName; final namespace = table.namespace; @@ -140,35 +139,43 @@ List generateCompensationTriggers( List makeTriggers(ForeignKey foreignKey) { final childKey = foreignKey.childKey; - final fkTable = tables[foreignKey.table]; - if (fkTable == null) { - throw Exception('Table ${foreignKey.table} for foreign key not found.'); - } - final joinedFkPKs = joinColsForJSON(fkTable.primary, null); - final joinedFkCols = joinColsForJSON(fkTable.columns, null); + + const fkTableNamespace = + 'main'; // currently, Electric always uses the 'main' namespace + final fkTableName = foreignKey.table; + final fkTablePK = + foreignKey.parentKey; // primary key of the table pointed at by the FK. + final joinedFkPKs = joinColsForJSON([fkTablePK], null); + return [ - 'DROP TRIGGER IF EXISTS compensation_insert_${namespace}_${tableName}_${childKey}_into_oplog;', + ''' + -- Triggers for foreign key compensations + DROP TRIGGER IF EXISTS compensation_insert_${namespace}_${tableName}_${childKey}_into_oplog;''', + // The compensation trigger inserts a row in `_electric_oplog` if the row pointed at by the FK exists + // The way how this works is that the values for the row are passed to the nested SELECT + // which will return those values for every record that matches the query + // which can be at most once since we filter on the foreign key which is also the primary key and thus is unique. ''' CREATE TRIGGER compensation_insert_${namespace}_${tableName}_${childKey}_into_oplog AFTER INSERT ON $tableFullName - WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == '${fkTable.namespace}.${fkTable.tableName}') AND + WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == '$fkTableNamespace.$fkTableName') AND 1 == (SELECT value from _electric_meta WHERE key == 'compensations') BEGIN INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp) - SELECT '${fkTable.namespace}', '${fkTable.tableName}', 'UPDATE', json_object($joinedFkPKs), json_object($joinedFkCols), NULL, NULL - FROM ${fkTable.namespace}.${fkTable.tableName} WHERE ${foreignKey.parentKey} = new.${foreignKey.childKey}; + SELECT '$fkTableNamespace', '$fkTableName', 'UPDATE', json_object($joinedFkPKs), json_object($joinedFkPKs), NULL, NULL + FROM $fkTableNamespace.$fkTableName WHERE ${foreignKey.parentKey} = new.${foreignKey.childKey}; END; ''', 'DROP TRIGGER IF EXISTS compensation_update_${namespace}_${tableName}_${foreignKey.childKey}_into_oplog;', ''' CREATE TRIGGER compensation_update_${namespace}_${tableName}_${foreignKey.childKey}_into_oplog AFTER UPDATE ON $namespace.$tableName - WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == '${fkTable.namespace}.${fkTable.tableName}') AND + WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == '$fkTableNamespace.$fkTableName') AND 1 == (SELECT value from _electric_meta WHERE key == 'compensations') BEGIN INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp) - SELECT '${fkTable.namespace}', '${fkTable.tableName}', 'UPDATE', json_object($joinedFkPKs), json_object($joinedFkCols), NULL, NULL - FROM ${fkTable.namespace}.${fkTable.tableName} WHERE ${foreignKey.parentKey} = new.${foreignKey.childKey}; + SELECT '$fkTableNamespace', '$fkTableName', 'UPDATE', json_object($joinedFkPKs), json_object($joinedFkPKs), NULL, NULL + FROM $fkTableNamespace.$fkTableName WHERE ${foreignKey.parentKey} = new.${foreignKey.childKey}; END; ''', ].map(Statement.new).toList(); @@ -185,16 +192,11 @@ List generateCompensationTriggers( /// @returns An array of SQLite statements that add the necessary oplog and compensation triggers. List generateTableTriggers( TableFullName tableFullName, - Tables tables, + Table table, ) { - final table = tables[tableFullName]; - if (table == null) { - throw Exception( - 'Could not generate triggers for $tableFullName. Table not found.', - ); - } final oplogTriggers = generateOplogTriggers(tableFullName, table); - final fkTriggers = generateCompensationTriggers(tableFullName, table, tables); + final fkTriggers = + generateCompensationTriggers(tableFullName, table); //, tables) return [...oplogTriggers, ...fkTriggers]; } @@ -203,8 +205,8 @@ List generateTableTriggers( /// @returns An array of SQLite statements that add the necessary oplog and compensation triggers for all tables. List generateTriggers(Tables tables) { final List tableTriggers = []; - tables.forEach((tableFullName, _table) { - final triggers = generateTableTriggers(tableFullName, tables); + tables.forEach((tableFullName, table) { + final triggers = generateTableTriggers(tableFullName, table); //, tables) tableTriggers.addAll(triggers); }); @@ -213,7 +215,7 @@ List generateTriggers(Tables tables) { Statement( 'CREATE TABLE _electric_trigger_settings(tablename TEXT PRIMARY KEY, flag INTEGER);', ), - ...tableTriggers + ...tableTriggers, ]; return stmts; diff --git a/packages/electricsql/lib/src/proto/satellite.pb.dart b/packages/electricsql/lib/src/proto/satellite.pb.dart index cef64169..965564d8 100644 --- a/packages/electricsql/lib/src/proto/satellite.pb.dart +++ b/packages/electricsql/lib/src/proto/satellite.pb.dart @@ -893,6 +893,11 @@ class SatRelationColumn extends $pb.GeneratedMessage { ? '' : 'primaryKey', protoName: 'primaryKey') + ..aOB( + 4, + const $core.bool.fromEnvironment('protobuf.omit_field_names') + ? '' + : 'isNullable') ..hasRequiredFields = false; SatRelationColumn._() : super(); @@ -900,6 +905,7 @@ class SatRelationColumn extends $pb.GeneratedMessage { $core.String? name, $core.String? type, $core.bool? primaryKey, + $core.bool? isNullable, }) { final _result = create(); if (name != null) { @@ -911,6 +917,9 @@ class SatRelationColumn extends $pb.GeneratedMessage { if (primaryKey != null) { _result.primaryKey = primaryKey; } + if (isNullable != null) { + _result.isNullable = isNullable; + } return _result; } factory SatRelationColumn.fromBuffer($core.List<$core.int> i, @@ -975,6 +984,18 @@ class SatRelationColumn extends $pb.GeneratedMessage { $core.bool hasPrimaryKey() => $_has(2); @$pb.TagNumber(3) void clearPrimaryKey() => clearField(3); + + @$pb.TagNumber(4) + $core.bool get isNullable => $_getBF(3); + @$pb.TagNumber(4) + set isNullable($core.bool v) { + $_setBool(3, v); + } + + @$pb.TagNumber(4) + $core.bool hasIsNullable() => $_has(3); + @$pb.TagNumber(4) + void clearIsNullable() => clearField(4); } class SatRelation extends $pb.GeneratedMessage { diff --git a/packages/electricsql/lib/src/proto/satellite.pbenum.dart b/packages/electricsql/lib/src/proto/satellite.pbenum.dart index a143e933..cd9467e2 100644 --- a/packages/electricsql/lib/src/proto/satellite.pbenum.dart +++ b/packages/electricsql/lib/src/proto/satellite.pbenum.dart @@ -171,7 +171,7 @@ class SatInStartReplicationResp_ReplicationError_Code extends $pb.ProtobufEnum { ]; static final $core - .Map<$core.int, SatInStartReplicationResp_ReplicationError_Code> + .Map<$core.int, SatInStartReplicationResp_ReplicationError_Code> _byValue = $pb.ProtobufEnum.initByValue(values); static SatInStartReplicationResp_ReplicationError_Code? valueOf( $core.int value) => diff --git a/packages/electricsql/lib/src/proto/satellite.pbjson.dart b/packages/electricsql/lib/src/proto/satellite.pbjson.dart index 86f2ce29..6966c598 100644 --- a/packages/electricsql/lib/src/proto/satellite.pbjson.dart +++ b/packages/electricsql/lib/src/proto/satellite.pbjson.dart @@ -283,12 +283,13 @@ const SatRelationColumn$json = const { const {'1': 'name', '3': 1, '4': 1, '5': 9, '10': 'name'}, const {'1': 'type', '3': 2, '4': 1, '5': 9, '10': 'type'}, const {'1': 'primaryKey', '3': 3, '4': 1, '5': 8, '10': 'primaryKey'}, + const {'1': 'is_nullable', '3': 4, '4': 1, '5': 8, '10': 'isNullable'}, ], }; /// Descriptor for `SatRelationColumn`. Decode as a `google.protobuf.DescriptorProto`. final $typed_data.Uint8List satRelationColumnDescriptor = $convert.base64Decode( - 'ChFTYXRSZWxhdGlvbkNvbHVtbhISCgRuYW1lGAEgASgJUgRuYW1lEhIKBHR5cGUYAiABKAlSBHR5cGUSHgoKcHJpbWFyeUtleRgDIAEoCFIKcHJpbWFyeUtleQ=='); + 'ChFTYXRSZWxhdGlvbkNvbHVtbhISCgRuYW1lGAEgASgJUgRuYW1lEhIKBHR5cGUYAiABKAlSBHR5cGUSHgoKcHJpbWFyeUtleRgDIAEoCFIKcHJpbWFyeUtleRIfCgtpc19udWxsYWJsZRgEIAEoCFIKaXNOdWxsYWJsZQ=='); @$core.Deprecated('Use satRelationDescriptor instead') const SatRelation$json = const { '1': 'SatRelation', diff --git a/packages/electricsql/lib/src/satellite/client.dart b/packages/electricsql/lib/src/satellite/client.dart index 114b9825..27fd0907 100644 --- a/packages/electricsql/lib/src/satellite/client.dart +++ b/packages/electricsql/lib/src/satellite/client.dart @@ -11,13 +11,14 @@ import 'package:electricsql/src/satellite/satellite.dart'; import 'package:electricsql/src/satellite/shapes/cache.dart'; import 'package:electricsql/src/satellite/shapes/types.dart'; import 'package:electricsql/src/sockets/sockets.dart'; +import 'package:electricsql/src/util/bitmask_helpers.dart'; import 'package:electricsql/src/util/common.dart'; import 'package:electricsql/src/util/debug/debug.dart'; import 'package:electricsql/src/util/extension.dart'; import 'package:electricsql/src/util/proto.dart'; import 'package:electricsql/src/util/types.dart'; import 'package:events_emitter/events_emitter.dart'; -import 'package:fpdart/fpdart.dart'; +import 'package:logging/logging.dart'; import 'package:retry/retry.dart' as retry_lib; class IncomingHandler { @@ -39,7 +40,7 @@ class SatelliteClient extends EventEmitter implements Client { final SocketFactory socketFactory; final SatelliteClientOpts opts; final Notifier notifier; - Completer? initializing; + Completer? initializing; Socket? socket; @@ -78,24 +79,6 @@ class SatelliteClient extends EventEmitter implements Client { ); } - Either _toMessage(Uint8List data) { - final code = data[0]; - final type = getMsgFromCode(code); - - if (type == null) { - return Left( - SatelliteException( - SatelliteErrorCode.unexpectedMessageType, - '$code', - ), - ); - } - - return Right( - DecodedMessage(decodeMessage(data.sublist(1), type), type), - ); - } - IncomingHandler getIncomingHandlerForMessage(SatMsgType msgType) { switch (msgType) { case SatMsgType.authResp: @@ -204,15 +187,19 @@ class SatelliteClient extends EventEmitter implements Client { } @override - Future> connect({ + Future connect({ bool Function(Object error, int attempt)? retryHandler, }) async { - Future _attemptBody() { - initializing = Completer(); - // This is so that errors can be sent to the completer even when no one is - // waiting for the future - initializing!.future.ignore(); + if (!isClosed()) { + close(); + } + + initializing = Completer(); + // This is so that errors can be sent to the completer even when no one is + // waiting for the future + initializing!.future.ignore(); + Future _attemptBody() { final Completer connectCompleter = Completer(); // TODO: ensure any previous socket is closed, or reject @@ -287,16 +274,14 @@ class SatelliteClient extends EventEmitter implements Client { ); } catch (e) { // We're very sure that no calls are going to modify `this.initializing` before this promise resolves - initializing!.completeError(e); + initializing?.completeError(e); initializing = null; rethrow; } - - return const Right(null); } @override - Future close() async { + void close() { logger.info('closing client'); outbound = resetReplication(outbound.enqueuedLsn, outbound.ackLsn, null); @@ -337,30 +322,34 @@ class SatelliteClient extends EventEmitter implements Client { } void handleIncoming(Uint8List data) { - final messageOrError = _toMessage(data); - logger.info( - 'Received message ${messageOrError.match((error) => error.toString(), (a) => a.msgType.name)}', - ); + bool handleIsRpc = false; + try { + final messageInfo = toMessage(data); - messageOrError.match( - (error) { - emit('error', error); - }, - (messageInfo) { - final handler = getIncomingHandlerForMessage(messageInfo.msgType); - try { - final response = handler.handle(messageInfo.msg); + if (logger.level <= Level.FINE) { + logger.fine('[proto] recv: ${msgToString(messageInfo.msg)}'); + } - if (handler.isRpc) { - emit('rpc_response', response); - } - } catch (error) { - logger.warning( - 'uncaught errors while processing incoming message: $error', - ); + final handler = getIncomingHandlerForMessage(messageInfo.msgType); + final response = handler.handle(messageInfo.msg); + handleIsRpc = handler.isRpc; + if (handleIsRpc) { + emit('rpc_response', response); + } + } catch (error) { + if (error is SatelliteException) { + if (handleIsRpc) { + emit('error', error); } - }, - ); + // no one is watching this error + logger.warning( + 'unhandled satellite errors while processing incoming message: $error', + ); + } else { + // This is an unexpected runtime error + rethrow; + } + } } @override @@ -404,7 +393,7 @@ class SatelliteClient extends EventEmitter implements Client { } @override - Either enqueueTransaction( + void enqueueTransaction( DataTransaction transaction, ) { if (outbound.isReplicating != ReplicationStatus.active) { @@ -420,16 +409,14 @@ class SatelliteClient extends EventEmitter implements Client { if (throttledPushTransaction != null) { throttledPushTransaction!.call(); } - - return const Right(null); } @override - Future startReplication( + Future startReplication( LSN? lsn, String? schemaVersion, List? subscriptionIds, - ) { + ) async { if (inbound.isReplicating != ReplicationStatus.stopped) { throw SatelliteException( SatelliteErrorCode.replicationAlreadyStarted, @@ -453,7 +440,9 @@ class SatelliteClient extends EventEmitter implements Client { } request = SatInStartReplicationReq(schemaVersion: schemaVersion); } else { - logger.info('starting replication with lsn: ${base64.encode(lsn)}'); + logger.info( + 'starting replication with lsn: ${base64.encode(lsn)} subscriptions: $subscriptionIds', + ); request = SatInStartReplicationReq( lsn: lsn, subscriptionIds: subscriptionIds, @@ -461,21 +450,32 @@ class SatelliteClient extends EventEmitter implements Client { } // Then set the replication state - final future = rpc(request).then((_) { - initializing?.complete(); + inbound = resetReplication(lsn, lsn, ReplicationStatus.starting); + + return rpc(request).then((resp) { + // FIXME: process handles BEHIND_WINDOW, we cant reject or resolve + // initializing. If process returns without triggering another + // RPC, initializing will never resolve. + // We shall improve this code to make no assumptions on how + // process handles errors + if (resp.error != null) { + if (resp.error!.code != SatelliteErrorCode.behindWindow) { + initializing?.completeError(resp.error!); + } + } else { + initializing?.complete(); + } initializing = null; + return resp; }).catchError((Object e) { initializing?.completeError(e); initializing = null; throw e; }); - inbound = resetReplication(lsn, lsn, ReplicationStatus.starting); - - return future; } @override - Future stopReplication() { + Future stopReplication() { if (inbound.isReplicating != ReplicationStatus.active) { return Future.error( SatelliteException( @@ -487,11 +487,13 @@ class SatelliteClient extends EventEmitter implements Client { inbound.isReplicating = ReplicationStatus.stopping; final request = SatInStopReplicationReq(); - return rpc(request); + return rpc(request); } void sendMessage(Object request) { - logger.fine('Sending message ${request.runtimeType}($request)'); + if (logger.level <= Level.FINE) { + logger.fine('[proto] send: ${msgToString(request)}'); + } final _socket = socket; if (_socket == null) { throw SatelliteException( @@ -543,6 +545,9 @@ class SatelliteClient extends EventEmitter implements Client { final respValue = distinguishOn(resp as Object); final reqValue = distinguishOn(request); if (respValue == reqValue) { + errorListener?.cancel(); + errorListener = null; + return completer.complete(resp); } else { // This WAS an RPC response, but not the one we were expecting, waiting more @@ -563,6 +568,10 @@ class SatelliteClient extends EventEmitter implements Client { rpcRespListener = on('rpc_response', (resp) { rpcRespListener?.cancel(); rpcRespListener = null; + + errorListener?.cancel(); + errorListener = null; + completer.complete(resp as T); }); } @@ -608,23 +617,25 @@ class SatelliteClient extends EventEmitter implements Client { return AuthResponse(serverId, error); } - void handleStartResp(SatInStartReplicationResp resp) { + StartReplicationResponse handleStartResp(SatInStartReplicationResp resp) { if (inbound.isReplicating == ReplicationStatus.starting) { if (resp.hasErr()) { inbound.isReplicating = ReplicationStatus.stopped; - emit('error', startReplicationErrorToSatelliteError(resp.err)); + return StartReplicationResponse( + error: startReplicationErrorToSatelliteError(resp.err), + ); } else { inbound.isReplicating = ReplicationStatus.active; } } else { - emit( - 'error', - SatelliteException( + return StartReplicationResponse( + error: SatelliteException( SatelliteErrorCode.unexpectedState, "unexpected state ${inbound.isReplicating} handling 'start' response", ), ); } + return StartReplicationResponse(); } void handleInStartReplicationReq(SatInStartReplicationReq message) { @@ -804,8 +815,13 @@ class SatelliteClient extends EventEmitter implements Client { schemaName: relation.schema, // TODO tableName: relation.table, tableType: relation.tableType, - columns: relation.columns - .map((c) => SatRelationColumn(name: c.name, type: c.type)), + columns: relation.columns.map( + (c) => SatRelationColumn( + name: c.name, + type: c.type, + isNullable: c.isNullable, + ), + ), ); sendMessage(satRelation); @@ -840,13 +856,13 @@ class SatelliteClient extends EventEmitter implements Client { } } - void handleStopResp(SatInStopReplicationResp value) { + StopReplicationResponse handleStopResp(SatInStopReplicationResp value) { if (inbound.isReplicating == ReplicationStatus.stopping) { inbound.isReplicating = ReplicationStatus.stopped; + return StopReplicationResponse(); } else { - emit( - 'error', - SatelliteException( + return StopReplicationResponse( + error: SatelliteException( SatelliteErrorCode.unexpectedState, "unexpected state ${inbound.isReplicating} handling 'stop' response", ), @@ -895,6 +911,7 @@ class SatelliteClient extends EventEmitter implements Client { (c) => RelationColumn( name: c.name, type: c.type, + isNullable: c.isNullable, primaryKey: c.primaryKey, ), ) @@ -918,9 +935,15 @@ class SatelliteClient extends EventEmitter implements Client { } void handleErrorResp(SatErrorResp error) { + // TODO: this causing intermittent issues in tests because + // no one might catch this error. We shall pass this information + // as part of connectivity state emit( 'error', - Exception('server replied with error code: ${error.errorType}'), + SatelliteException( + SatelliteErrorCode.serverError, + 'server replied with error code: ${error.errorType}', + ), ); } @@ -1193,22 +1216,6 @@ Record? deserializeRow( ); } -void setMaskBit(List array, int indexFromStart) { - final byteIndex = (indexFromStart / 8).floor(); - final bitIndex = 7 - (indexFromStart % 8); - - final mask = 0x01 << bitIndex; - array[byteIndex] = array[byteIndex] | mask; -} - -int getMaskBit(List array, int indexFromStart) { - if (array.isEmpty) return 0; - final byteIndex = (indexFromStart / 8).floor(); - final bitIndex = 7 - (indexFromStart % 8); - - return (array[byteIndex] >>> bitIndex) & 0x01; -} - int calculateNumBytes(int columnNum) { final rem = columnNum % 8; if (rem == 0) { @@ -1264,3 +1271,17 @@ Uint8List encodeSocketMessage(SatMsgType msgType, Object msg) { return buffer; } + +DecodedMessage toMessage(Uint8List data) { + final code = data[0]; + final type = getMsgFromCode(code); + + if (type == null) { + throw SatelliteException( + SatelliteErrorCode.unexpectedMessageType, + '$code', + ); + } + + return DecodedMessage(decodeMessage(data.sublist(1), type), type); +} diff --git a/packages/electricsql/lib/src/satellite/config.dart b/packages/electricsql/lib/src/satellite/config.dart index 46d549d6..586f77c6 100644 --- a/packages/electricsql/lib/src/satellite/config.dart +++ b/packages/electricsql/lib/src/satellite/config.dart @@ -8,6 +8,7 @@ const SatelliteOpts kSatelliteDefaults = SatelliteOpts( shadowTable: QualifiedTablename('main', '_electric_shadow'), pollingInterval: Duration(milliseconds: 2000), minSnapshotWindow: Duration(milliseconds: 40), + clearOnBehindWindow: true, ); const kDefaultSatelliteTimeout = 3000; @@ -52,6 +53,9 @@ class SatelliteOpts { /// Throttle snapshotting to once per `minSnapshotWindow` milliseconds. final Duration minSnapshotWindow; + /// On reconnect, clear client's state if cannot catch up with Electric buffered WAL + final bool clearOnBehindWindow; + const SatelliteOpts({ required this.metaTable, required this.migrationsTable, @@ -60,6 +64,7 @@ class SatelliteOpts { required this.shadowTable, required this.pollingInterval, required this.minSnapshotWindow, + required this.clearOnBehindWindow, }); SatelliteOpts copyWith({ @@ -70,6 +75,7 @@ class SatelliteOpts { QualifiedTablename? shadowTable, Duration? pollingInterval, Duration? minSnapshotWindow, + bool? clearOnBehindWindow, }) { return SatelliteOpts( metaTable: metaTable ?? this.metaTable, @@ -79,6 +85,7 @@ class SatelliteOpts { shadowTable: shadowTable ?? this.shadowTable, pollingInterval: pollingInterval ?? this.pollingInterval, minSnapshotWindow: minSnapshotWindow ?? this.minSnapshotWindow, + clearOnBehindWindow: clearOnBehindWindow ?? this.clearOnBehindWindow, ); } @@ -89,6 +96,7 @@ class SatelliteOpts { oplogTable: overrides.oplogTable, pollingInterval: overrides.pollingInterval, minSnapshotWindow: overrides.minSnapshotWindow, + clearOnBehindWindow: overrides.clearOnBehindWindow, ); } } @@ -99,6 +107,7 @@ class SatelliteOverrides { final QualifiedTablename oplogTable; final Duration? pollingInterval; final Duration? minSnapshotWindow; + final bool? clearOnBehindWindow; SatelliteOverrides({ this.metaTable, @@ -106,5 +115,6 @@ class SatelliteOverrides { required this.oplogTable, this.pollingInterval, this.minSnapshotWindow, + this.clearOnBehindWindow, }); } diff --git a/packages/electricsql/lib/src/satellite/merge.dart b/packages/electricsql/lib/src/satellite/merge.dart index 65bf7f02..8aefad61 100644 --- a/packages/electricsql/lib/src/satellite/merge.dart +++ b/packages/electricsql/lib/src/satellite/merge.dart @@ -4,6 +4,67 @@ import 'package:electricsql/src/satellite/oplog.dart'; import 'package:electricsql/src/util/sets.dart'; import 'package:electricsql/src/util/types.dart' show Row; +/// Merge server-sent operation with local pending oplog to arrive at the same row state the server is at. +/// @param localOrigin string specifying the local origin +/// @param local local oplog entries +/// @param incomingOrigin string specifying the upstream origin +/// @param incoming incoming oplog entries +/// @returns Changes to be made to the shadow tables +PendingChanges mergeEntries( + String localOrigin, + List local, + String incomingOrigin, + List incoming, +) { + final localTableChanges = + localOperationsToTableChanges(local, (DateTime timestamp) { + return generateTag(localOrigin, timestamp); + }); + final incomingTableChanges = remoteOperationsToTableChanges(incoming); + + for (final incomingTableChangeEntry in incomingTableChanges.entries) { + final tablename = incomingTableChangeEntry.key; + final incomingMapping = incomingTableChangeEntry.value; + final localMapping = localTableChanges[tablename]; + + if (localMapping == null) { + continue; + } + + for (final incomingMappingEntry in incomingMapping.entries) { + final primaryKey = incomingMappingEntry.key; + final incomingChanges = incomingMappingEntry.value; + final localInfo = localMapping[primaryKey]; + if (localInfo == null) { + continue; + } + final localChanges = localInfo.oplogEntryChanges; + + final changes = mergeChangesLastWriteWins( + localOrigin, + localChanges.changes, + incomingOrigin, + incomingChanges.changes, + incomingChanges.fullRow, + ); + late final ChangesOpType optype; + + final tags = mergeOpTags(localChanges, incomingChanges); + if (tags.isEmpty) { + optype = ChangesOpType.delete; + } else { + optype = ChangesOpType.upsert; + } + + incomingChanges.changes = changes; + incomingChanges.optype = optype; + incomingChanges.tags = tags; + } + } + + return incomingTableChanges; +} + /// Merge two sets of changes, using the timestamp to arbitrate conflicts /// so that the last write wins. /// diff --git a/packages/electricsql/lib/src/satellite/mock.dart b/packages/electricsql/lib/src/satellite/mock.dart index e80fe2ba..d2261e0c 100644 --- a/packages/electricsql/lib/src/satellite/mock.dart +++ b/packages/electricsql/lib/src/satellite/mock.dart @@ -60,10 +60,7 @@ class MockSatelliteProcess implements Satellite { } @override - Future start( - AuthConfig authConfig, { - SatelliteReplicationOptions? opts, - }) async { + Future start(AuthConfig authConfig) async { await Future.delayed(const Duration(milliseconds: 50)); return ConnectionWrapper( @@ -246,13 +243,13 @@ class MockSatelliteClient extends EventEmitter implements Client { } @override - Future close() { + void close() { closed = true; _removeAllListeners(); for (final t in timeouts) { t.cancel(); } - return Future.value(null); + return; } void _removeAllListeners() { @@ -273,7 +270,7 @@ class MockSatelliteClient extends EventEmitter implements Client { } @override - Future startReplication( + Future startReplication( LSN? lsn, String? schemaVersion, List? subscriptionIds, @@ -289,30 +286,34 @@ class MockSatelliteClient extends EventEmitter implements Client { timeouts.add(t); if (lsn != null && bytesToNumber(lsn) == kMockBehindWindowLsn) { - return Future.error( - SatelliteException( - SatelliteErrorCode.behindWindow, - 'MOCK BEHIND_WINDOW_LSN ERROR', + return Future.value( + StartReplicationResponse( + error: SatelliteException( + SatelliteErrorCode.behindWindow, + 'MOCK BEHIND_WINDOW_LSN ERROR', + ), ), ); } if (lsn != null && bytesToNumber(lsn) == kMockInvalidPositionLsn) { - return Future.error( - SatelliteException( - SatelliteErrorCode.invalidPosition, - 'MOCK INVALID_POSITION ERROR', + return Future.value( + StartReplicationResponse( + error: SatelliteException( + SatelliteErrorCode.invalidPosition, + 'MOCK INVALID_POSITION ERROR', + ), ), ); } - return Future.value(); + return Future.value(StartReplicationResponse()); } @override - Future stopReplication() { + Future stopReplication() { replicating = false; - return Future.value(); + return Future.value(StopReplicationResponse()); } @override diff --git a/packages/electricsql/lib/src/satellite/oplog.dart b/packages/electricsql/lib/src/satellite/oplog.dart index c167a389..bf207e63 100644 --- a/packages/electricsql/lib/src/satellite/oplog.dart +++ b/packages/electricsql/lib/src/satellite/oplog.dart @@ -268,10 +268,10 @@ OplogTableChanges localOperationsToTableChanges( }); } -ShadowTableChanges remoteOperationsToTableChanges(List operations) { - final ShadowTableChanges initialValue = {}; +PendingChanges remoteOperationsToTableChanges(List operations) { + final PendingChanges initialValue = {}; - return operations.fold(initialValue, (acc, entry) { + return operations.fold(initialValue, (acc, entry) { final entryChanges = remoteEntryToChanges(entry); // Sort for deterministic key generation. @@ -322,7 +322,7 @@ typedef OplogColumnChanges = Map; // First key qualifiedTablenameStr // Second key primaryKey -typedef ShadowTableChanges = Map>; +typedef PendingChanges = Map>; typedef OplogTableChanges = Map>; class ShadowEntry with EquatableMixin { diff --git a/packages/electricsql/lib/src/satellite/process.dart b/packages/electricsql/lib/src/satellite/process.dart index add6bec0..0698af41 100644 --- a/packages/electricsql/lib/src/satellite/process.dart +++ b/packages/electricsql/lib/src/satellite/process.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'dart:convert'; +import 'package:collection/collection.dart'; import 'package:electricsql/src/auth/auth.dart'; import 'package:electricsql/src/electric/adapter.dart' hide Transaction; import 'package:electricsql/src/migrators/migrators.dart'; @@ -39,6 +40,7 @@ typedef SubscriptionNotifier = ({ }); const throwErrors = [ + SatelliteErrorCode.connectionFailed, SatelliteErrorCode.invalidPosition, SatelliteErrorCode.behindWindow, ]; @@ -130,10 +132,7 @@ class SatelliteProcess implements Satellite { } @override - Future start( - AuthConfig authConfig, { - SatelliteReplicationOptions? opts, - }) async { + Future start(AuthConfig authConfig) async { // TODO(dart): Explicitly enable foreign keys, which is used by Electric await adapter.run(Statement('PRAGMA foreign_keys = ON')); @@ -185,7 +184,7 @@ This means there is a notifier subscription leak.`'''); // Start polling to request a snapshot every `pollingInterval` ms. _pollingInterval = Timer.periodic( - this.opts.pollingInterval, + opts.pollingInterval, (_) => throttledSnapshot(), ); @@ -218,7 +217,7 @@ This means there is a notifier subscription leak.`'''); subscriptions.setState(subscriptionsState); } - final connectionFuture = _connectAndStartReplication(opts); + final connectionFuture = _connectAndStartReplication(); return ConnectionWrapper( connectionFuture: connectionFuture, ); @@ -234,22 +233,22 @@ This means there is a notifier subscription leak.`'''); List shapeDefs, ) async { final stmts = []; + final tablenames = []; // reverts to off on commit/abort stmts.add(Statement('PRAGMA defer_foreign_keys = ON')); - shapeDefs - .expand((ShapeDefinition def) => def.definition.selects) - .map( - (ShapeSelect select) => - // We need "fully qualified" table names in the next calls - 'main.${select.tablename}', - ) - .fold(stmts, (List stmts, String tablename) { + shapeDefs.expand((ShapeDefinition def) => def.definition.selects).map( + (ShapeSelect select) { + tablenames.add(select.tablename); + // We need "fully qualified" table names in the next calls + return 'main.${select.tablename}'; + }, + ).fold(stmts, (List stmts, String tablename) { stmts.addAll([ ..._disableTriggers([tablename]), Statement( 'DELETE FROM $tablename', ), - ..._enableTriggers([tablename]) + ..._enableTriggers([tablename]), ]); return stmts; // does not delete shadow rows but we can do that @@ -304,7 +303,7 @@ This means there is a notifier subscription leak.`'''); _potentialDataChangeSubscription = null; } - await client.close(); + client.close(); } @override @@ -459,7 +458,7 @@ This means there is a notifier subscription leak.`'''); columns, records, maxSqlParameters, - ) + ), ]); } @@ -512,28 +511,40 @@ This means there is a notifier subscription leak.`'''); } } + Future _handleBehindWindow() async { + logger.warning( + 'client cannot resume replication from server, resetting replication state', + ); + final subscriptionIds = subscriptions.getFulfilledSubscriptions(); + final List shapeDefs = subscriptionIds + .map((subId) => subscriptions.shapesForActiveSubscription(subId)) + .whereNotNull() + .expand((List s) => s.map((i) => i.definition)) + .toList(); + + await _resetClientState(); + + await _connectAndStartReplication(); + + logger.warning('successfully reconnected with server. re-subscribing.'); + + if (shapeDefs.isNotEmpty) { + unawaited(subscribe(shapeDefs)); + } + } + Future _handleSubscriptionError( SubscriptionErrorData errorData, ) async { final subscriptionId = errorData.subscriptionId; final satelliteError = errorData.error; - // this is obviously too conservative and note - // that it does not update meta transactionally - final ids = await subscriptions.unsubscribeAll(); - logger - .severe('Encountered a subscription error: ${satelliteError.message}'); - - _lsn = null; - await adapter.runInTransaction([ - _setMetaStatement('lsn', null), - _setMetaStatement('subscriptions', subscriptions.serialize()) - ]); + .severe('encountered a subscription error: ${satelliteError.message}'); - await client.unsubscribe(ids); + await _resetClientState(); - // Call the `onSuccess` callback for this subscription + // Call the `onFailure` callback for this subscription if (subscriptionId != null) { final completer = subscriptionNotifiers[subscriptionId]!; @@ -543,18 +554,33 @@ This means there is a notifier subscription leak.`'''); } } + Future _resetClientState() async { + _lsn = null; + + // TODO: this is obviously too conservative + // we should also work on updating subscriptions + // atomically on unsubscribe() + await subscriptions.unsubscribeAll(); + + await adapter.runInTransaction([ + _setMetaStatement('lsn', null), + _setMetaStatement('subscriptions', subscriptions.serialize()), + ]); + } + @visibleForTesting Future connectivityStateChanged( ConnectivityState status, ) async { connectivityState = status; + logger.fine('connectivity state changed $status'); // TODO: no op if state is the same switch (status) { case ConnectivityState.available: { setClientListeners(); - return _connectAndStartReplication(null); + return _connectAndStartReplication(); } case ConnectivityState.error: case ConnectivityState.disconnected: @@ -572,9 +598,7 @@ This means there is a notifier subscription leak.`'''); } } - Future _connectAndStartReplication( - SatelliteReplicationOptions? opts, - ) async { + Future _connectAndStartReplication() async { logger.info('connecting and starting replication'); final _authState = authState; @@ -593,27 +617,29 @@ This means there is a notifier subscription leak.`'''); // about fulfilled subscriptions final subscriptionIds = subscriptions.getFulfilledSubscriptions(); - await client.startReplication(_lsn, schemaVersion, subscriptionIds); - } catch (error) { - if (error is SatelliteException) { + final StartReplicationResponse(:error) = await client.startReplication( + _lsn, + schemaVersion, + subscriptionIds.isNotEmpty ? subscriptionIds : null, + ); + if (error != null) { if (error.code == SatelliteErrorCode.behindWindow && - opts?.clearOnBehindWindow == true) { - return _handleSubscriptionError( - SubscriptionErrorData( - error: error, - subscriptionId: null, - ), - ).then( - (_) => _connectAndStartReplication(opts), - ); - } - - if (throwErrors.contains(error.code)) { - rethrow; + opts.clearOnBehindWindow) { + return await _handleBehindWindow(); } + throw error; } - logger.warning("couldn't start replication: $error"); - return; + } catch (error) { + if (error is! SatelliteException) { + rethrow; + } + if (throwErrors.contains(error.code)) { + rethrow; + } + + logger.warning( + "couldn't start replication with reason: ${error.message}", + ); } } @@ -931,64 +957,6 @@ This means there is a notifier subscription leak.`'''); ); } - // Merge changes, with last-write-wins and add-wins semantics. - // clearTags field is used by the calling code to determine new value of - // the shadowTags - ShadowTableChanges mergeEntries( - String localOrigin, - List local, - String incomingOrigin, - List incoming, - ) { - final localTableChanges = - localOperationsToTableChanges(local, (DateTime timestamp) { - return generateTag(localOrigin, timestamp); - }); - final incomingTableChanges = remoteOperationsToTableChanges(incoming); - - for (final incomingTableChangeEntry in incomingTableChanges.entries) { - final tablename = incomingTableChangeEntry.key; - final incomingMapping = incomingTableChangeEntry.value; - final localMapping = localTableChanges[tablename]; - - if (localMapping == null) { - continue; - } - - for (final incomingMappingEntry in incomingMapping.entries) { - final primaryKey = incomingMappingEntry.key; - final incomingChanges = incomingMappingEntry.value; - final localInfo = localMapping[primaryKey]; - if (localInfo == null) { - continue; - } - final localChanges = localInfo.oplogEntryChanges; - - final changes = mergeChangesLastWriteWins( - localOrigin, - localChanges.changes, - incomingOrigin, - incomingChanges.changes, - incomingChanges.fullRow, - ); - late final ChangesOpType optype; - - final tags = mergeOpTags(localChanges, incomingChanges); - if (tags.isEmpty) { - optype = ChangesOpType.delete; - } else { - optype = ChangesOpType.upsert; - } - - incomingChanges.changes = changes; - incomingChanges.optype = optype; - incomingChanges.tags = tags; - } - } - - return incomingTableChanges; - } - @visibleForTesting Future updateRelations(Relation rel) async { if (rel.tableType == SatRelation_RelationType.TABLE) { @@ -1182,7 +1150,7 @@ This means there is a notifier subscription leak.`'''); final allStatements = [ ..._disableTriggers(notNewTableNames), ...stmts, - ..._enableTriggers(tablenames) + ..._enableTriggers(tablenames), ]; if (transaction.migrationVersion != null) { @@ -1241,7 +1209,7 @@ This means there is a notifier subscription leak.`'''); Statement( 'UPDATE $triggers SET flag = ? WHERE $tablesOr', [if (flag) 1 else 0, ...tablenames], - ) + ), ]; } else { return []; @@ -1365,6 +1333,7 @@ This means there is a notifier subscription leak.`'''); RelationColumn( name: c['name']! as String, type: c['type']! as String, + isNullable: (c['notnull']! as int) == 0, primaryKey: (c['pk']! as int) > 0, ), ); @@ -1506,7 +1475,7 @@ List generateTriggersForTable(MigrationTable tbl) { }).toList(), ); final fullTableName = '${table.namespace}.${table.tableName}'; - return generateOplogTriggers(fullTableName, table); + return generateTableTriggers(fullTableName, table); } class _WhereAndValues { diff --git a/packages/electricsql/lib/src/satellite/satellite.dart b/packages/electricsql/lib/src/satellite/satellite.dart index 63030a48..fde65192 100644 --- a/packages/electricsql/lib/src/satellite/satellite.dart +++ b/packages/electricsql/lib/src/satellite/satellite.dart @@ -36,12 +36,6 @@ class ConnectionWrapper { }); } -class SatelliteReplicationOptions { - final bool clearOnBehindWindow; - - SatelliteReplicationOptions({required this.clearOnBehindWindow}); -} - abstract class Satellite { DbName get dbName; DatabaseAdapter get adapter; @@ -50,10 +44,7 @@ abstract class Satellite { ConnectivityState? connectivityState; - Future start( - AuthConfig authConfig, { - SatelliteReplicationOptions? opts, - }); + Future start(AuthConfig authConfig); Future stop(); Future subscribe( List shapeDefinitions, @@ -65,17 +56,17 @@ abstract class Client { Future connect({ bool Function(Object error, int attempt)? retryHandler, }); - Future close(); + void close(); Future authenticate( AuthState authState, ); bool isClosed(); - Future startReplication( + Future startReplication( LSN? lsn, String? schemaVersion, List? subscriptionIds, ); - Future stopReplication(); + Future stopReplication(); void subscribeToRelations(void Function(Relation relation) callback); void subscribeToTransactions( Future Function(Transaction transaction) callback, diff --git a/packages/electricsql/lib/src/sockets/sockets.dart b/packages/electricsql/lib/src/sockets/sockets.dart index 0d0580e8..e8bdbba9 100644 --- a/packages/electricsql/lib/src/sockets/sockets.dart +++ b/packages/electricsql/lib/src/sockets/sockets.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'dart:typed_data'; +import 'package:electricsql/src/util/types.dart'; import 'package:meta/meta.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; @@ -88,7 +89,14 @@ abstract class WebSocketBase implements Socket { @override void onError(void Function(Object error) cb) { - _errorCallbacks.add(cb); + _errorCallbacks.add( + (_) => cb( + SatelliteException( + SatelliteErrorCode.connectionFailed, + 'failed to establish connection', + ), + ), + ); } @override diff --git a/packages/electricsql/lib/src/util/bitmask_helpers.dart b/packages/electricsql/lib/src/util/bitmask_helpers.dart new file mode 100644 index 00000000..c0fc6d36 --- /dev/null +++ b/packages/electricsql/lib/src/util/bitmask_helpers.dart @@ -0,0 +1,47 @@ +/// Sets a bit in the mask. Modifies the mask in place. +/// +/// Mask is represented as a Uint8Array, which will be serialized element-by-element as a mask. +/// This means that `indexFromStart` enumerates all bits in the mask in the order they will be serialized: +/// +/// @example +/// setMaskBit(new Uint8Array([0b00000000, 0b00000000]), 0) +/// // => new Uint8Array([0b10000000, 0b00000000]) +/// +/// @example +/// setMaskBit(new Uint8Array([0b00000000, 0b00000000]), 8) +/// // => new Uint8Array([0b00000000, 0b10000000]) +/// +/// @param array Uint8Array mask +/// @param indexFromStart bit index in the mask +void setMaskBit(List array, int indexFromStart) { + final byteIndex = (indexFromStart / 8).floor(); + final bitIndex = 7 - (indexFromStart % 8); + + final mask = 0x01 << bitIndex; + array[byteIndex] = array[byteIndex] | mask; +} + +/// Reads a bit in the mask +/// +/// Mask is represented as a Uint8Array, which will be serialized element-by-element as a mask. +/// This means that `indexFromStart` enumerates all bits in the mask in the order they will be serialized: +/// +/// @example +/// getMaskBit(new Uint8Array([0b10000000, 0b00000000]), 0) +/// // => 1 +/// +/// @example +/// getMaskBit(new Uint8Array([0b10000000, 0b00000000]), 8) +/// // => 0 +/// +/// @param array Uint8Array mask +/// @param indexFromStart bit index in the mask +int getMaskBit(List array, int indexFromStart) { + if (array.isEmpty) return 0; + final byteIndex = (indexFromStart / 8).floor(); + final bitIndex = 7 - (indexFromStart % 8); + + final value = (array[byteIndex] >>> bitIndex) & 0x01; + assert(value == 0 || value == 1, 'Invalid bit value $value'); + return value; +} diff --git a/packages/electricsql/lib/src/util/proto.dart b/packages/electricsql/lib/src/util/proto.dart index a05cd696..a6726e32 100644 --- a/packages/electricsql/lib/src/util/proto.dart +++ b/packages/electricsql/lib/src/util/proto.dart @@ -1,7 +1,11 @@ +import 'dart:convert'; import 'dart:typed_data'; +import 'package:collection/collection.dart'; import 'package:electricsql/src/proto/satellite.pb.dart'; import 'package:electricsql/src/satellite/shapes/types.dart'; +import 'package:electricsql/src/util/bitmask_helpers.dart'; +import 'package:electricsql/src/util/common.dart'; import 'package:electricsql/src/util/types.dart'; const kProtobufPackage = 'Electric.Satellite.v1_4'; @@ -343,3 +347,98 @@ List shapeRequestToSatShapeReq(List shapeRequests) { } return shapeReqs; } + +String msgToString(Object message) { + if (message is SatAuthReq) { + return '#SatAuthReq{id: ${message.id}, token: ${message.token}}'; + } else if (message is SatAuthResp) { + return '#SatAuthResp{id: ${message.id}}'; + } else if (message is SatPingReq) { + return '#SatPingReq{}'; + } else if (message is SatPingResp) { + return '#SatPingResp{lsn: ${message.hasLsn() ? base64.encode(message.lsn) : 'NULL'}}'; + } else if (message is SatErrorResp) { + return '#SatErrorResp{type: ${message.errorType.name}}'; + } else if (message is SatInStartReplicationResp) { + return '#SatInStartReplicationResp{}'; + } else if (message is SatInStartReplicationReq) { + final schemaVersion = + message.hasSchemaVersion() ? ' schema: ${message.schemaVersion},' : ''; + return '#SatInStartReplicationReq{lsn: ${base64.encode(message.lsn)},$schemaVersion subscriptions: [${message.subscriptionIds}]}'; + } else if (message is SatInStopReplicationReq) { + return '#SatInStopReplicationReq{}'; + } else if (message is SatInStopReplicationResp) { + return '#SatInStopReplicationResp{}'; + } else if (message is SatOpLog) { + return '#SatOpLog{ops: [${message.ops.map(opToString).join(', ')}]}'; + } else if (message is SatRelation) { + final cols = message.columns + .map((x) => '${x.name}: ${x.type}${x.primaryKey ? ' PK' : ''}') + .join(', '); + return '#SatRelation{for: ${message.schemaName}.${message.tableName}, as: ${message.relationId}, cols: [$cols]}'; + } else if (message is SatMigrationNotification) { + return '#SatMigrationNotification{to: ${message.newSchemaVersion}, from: ${message.newSchemaVersion}}'; + } else if (message is SatSubsReq) { + return '#SatSubsReq{id: ${message.subscriptionId}, shapes: ${message.shapeRequests}}'; + } else if (message is SatSubsResp) { + if (message.hasErr()) { + final shapeErrors = message.err.shapeRequestError.map( + (x) => '${x.requestId}: ${x.code.name} (${x.message})', + ); + return '#SatSubsReq{id: ${message.subscriptionId}, err: ${message.err.code.name} (${message.err.message}), shapes: [$shapeErrors]}'; + } else { + return '#SatSubsReq{id: ${message.subscriptionId}}'; + } + } else if (message is SatSubsDataError) { + final shapeErrors = message.shapeRequestError.map( + (x) => '${x.requestId}: ${x.code.name} (${x.message})', + ); + final code = message.code.name; + return '#SatSubsDataError{id: ${message.subscriptionId}, code: $code, msg: "${message.message}", errors: [$shapeErrors]}'; + } else if (message is SatSubsDataBegin) { + return '#SatSubsDataBegin{id: ${message.subscriptionId}, lsn: ${base64.encode(message.lsn)}}'; + } else if (message is SatSubsDataEnd) { + return '#SatSubsDataEnd{}'; + } else if (message is SatShapeDataBegin) { + return '#SatShapeDataBegin{id: ${message.requestId}}'; + } else if (message is SatShapeDataEnd) { + return '#SatShapeDataEnd{}'; + } else if (message is SatUnsubsReq) { + return '#SatUnsubsReq{ids: ${message.subscriptionIds}}'; + } else if (message is SatUnsubsResp) { + return '#SatUnsubsResp{}'; + } + + assert(false, "Can't convert ${message.runtimeType} to string"); + return '#Unknown'; +} + +String opToString(SatTransOp op) { + if (op.hasBegin()) { + return '#Begin{lsn: ${base64.encode(op.begin.lsn)}, ts: ${op.begin.commitTimestamp}, isMigration: ${op.begin.isMigration}}'; + } + if (op.hasCommit()) return '#Commit{lsn: ${base64.encode(op.commit.lsn)}}'; + if (op.hasInsert()) { + return '#Insert{for: ${op.insert.relationId}, tags: [${op.insert.tags}], new: [${op.insert.hasRowData() ? rowToString(op.insert.rowData) : ''}]}'; + } + if (op.hasUpdate()) { + return '#Update{for: ${op.update.relationId}, tags: [${op.update.tags}], new: [${op.update.hasRowData() ? rowToString(op.update.rowData) : ''}], old: data: [${op.update.hasOldRowData() ? rowToString(op.update.oldRowData) : ''}]}'; + } + if (op.hasDelete()) { + return '#Delete{for: ${op.delete.relationId}, tags: [${op.delete.tags}], old: [${op.delete.hasOldRowData() ? rowToString(op.delete.oldRowData) : ''}]}'; + } + if (op.hasMigrate()) { + return '#Migrate{vsn: ${op.migrate.version}, for: ${op.migrate.table.name}, stmts: [${op.migrate.stmts.map((x) => x.sql.replaceAll('\n', '\\n')).join('; ')}]}'; + } + return ''; +} + +String rowToString(SatOpRow row) { + return row.values + .mapIndexed( + (i, x) => getMaskBit(row.nullsBitmask, i) == 0 + ? json.encode(TypeDecoder.text(x)) + : '∅', + ) + .join(', '); +} diff --git a/packages/electricsql/lib/src/util/types.dart b/packages/electricsql/lib/src/util/types.dart index 82b24b55..c05e07f9 100644 --- a/packages/electricsql/lib/src/util/types.dart +++ b/packages/electricsql/lib/src/util/types.dart @@ -43,6 +43,7 @@ class SatelliteException implements Exception { } enum SatelliteErrorCode { + connectionFailed, internal, timeout, replicationNotStarted, @@ -52,6 +53,7 @@ enum SatelliteErrorCode { protocolViolation, unknownDataType, authError, + serverError, subscriptionAlreadyExists, unexpectedSubscriptionState, @@ -263,16 +265,18 @@ class Relation with EquatableMixin { class RelationColumn with EquatableMixin { final String name; final String type; + final bool isNullable; final bool? primaryKey; RelationColumn({ required this.name, required this.type, + required this.isNullable, this.primaryKey, }); @override - List get props => [name, type, primaryKey]; + List get props => [name, type, isNullable, primaryKey]; } enum AckType { @@ -306,6 +310,18 @@ class AuthResponse { AuthResponse(this.serverId, this.error); } +class StartReplicationResponse { + final SatelliteException? error; + + StartReplicationResponse({this.error}); +} + +class StopReplicationResponse { + final SatelliteException? error; + + StopReplicationResponse({this.error}); +} + class TransactionEvent { final Transaction transaction; final void Function() ackCb; diff --git a/packages/electricsql/proto/satellite.proto b/packages/electricsql/proto/satellite.proto index a615740b..bce3cccc 100644 --- a/packages/electricsql/proto/satellite.proto +++ b/packages/electricsql/proto/satellite.proto @@ -176,6 +176,7 @@ message SatRelationColumn { string name = 1; string type = 2; bool primaryKey = 3; + bool is_nullable = 4; } message SatRelation { diff --git a/packages/electricsql/pubspec.yaml b/packages/electricsql/pubspec.yaml index 4d87c5df..bc7bbfdf 100644 --- a/packages/electricsql/pubspec.yaml +++ b/packages/electricsql/pubspec.yaml @@ -14,7 +14,6 @@ dependencies: equatable: ^2.0.0 events_emitter: ^0.5.2 fixnum: ^1.0.0 - fpdart: ^1.0.0 http: ^1.0.0 logging: ^1.0.0 meta: ^1.0.0 diff --git a/packages/electricsql/test/drivers/common.dart b/packages/electricsql/test/drivers/common.dart index 09815095..640176b2 100644 --- a/packages/electricsql/test/drivers/common.dart +++ b/packages/electricsql/test/drivers/common.dart @@ -9,7 +9,7 @@ void runTests(DatabaseAdapter Function() getAdapter) { await adapter.run(Statement("INSERT INTO items VALUES ('foo');")); final result = await adapter.query(Statement('SELECT * FROM items;')); expect(result, [ - {'value': 'foo'} + {'value': 'foo'}, ]); }); diff --git a/packages/electricsql/test/drivers/drift_test.dart b/packages/electricsql/test/drivers/drift_test.dart index da087d5a..010b5f26 100644 --- a/packages/electricsql/test/drivers/drift_test.dart +++ b/packages/electricsql/test/drivers/drift_test.dart @@ -79,6 +79,7 @@ class GenericDb extends GeneratedDatabase { } class NoVersionDelegatedDatabase extends DelegatedDatabase { + // ignore: unreachable_from_main final DelegatedDatabase db; NoVersionDelegatedDatabase( diff --git a/packages/electricsql/test/satellite/client_test.dart b/packages/electricsql/test/satellite/client_test.dart index bc56a9bd..dd46497e 100644 --- a/packages/electricsql/test/satellite/client_test.dart +++ b/packages/electricsql/test/satellite/client_test.dart @@ -50,7 +50,7 @@ void main() { }); tearDown(() async { - await client.close(); + client.close(); await server.close(); }); @@ -138,10 +138,10 @@ void main() { server.nextResponses([startResp]); try { - await client.startReplication(null, null, null); - fail('Should have failed'); + final resp = await client.startReplication(null, null, null); + expect(resp.error?.code, SatelliteErrorCode.behindWindow); } catch (e) { - expect((e as SatelliteException).code, SatelliteErrorCode.behindWindow); + fail('Should not throw. Error: $e'); } }); @@ -305,8 +305,8 @@ void main() { table: 'table', tableType: SatRelation_RelationType.TABLE, columns: [ - RelationColumn(name: 'name1', type: 'TEXT'), - RelationColumn(name: 'name2', type: 'TEXT'), + RelationColumn(name: 'name1', type: 'TEXT', isNullable: true), + RelationColumn(name: 'name2', type: 'TEXT', isNullable: true), ], ); @@ -316,8 +316,8 @@ void main() { tableName: 'table', tableType: SatRelation_RelationType.TABLE, columns: [ - SatRelationColumn(name: 'name1', type: 'TEXT'), - SatRelationColumn(name: 'name2', type: 'TEXT'), + SatRelationColumn(name: 'name1', type: 'TEXT', isNullable: true), + SatRelationColumn(name: 'name2', type: 'TEXT', isNullable: true), ], ); @@ -620,12 +620,24 @@ void main() { table: 'Items', tableType: SatRelation_RelationType.TABLE, columns: [ - RelationColumn(name: 'id', type: 'uuid'), - RelationColumn(name: 'content', type: 'text'), - RelationColumn(name: 'text_null', type: 'text'), - RelationColumn(name: 'text_null_default', type: 'text'), - RelationColumn(name: 'intvalue_null', type: 'integer'), - RelationColumn(name: 'intvalue_null_default', type: 'integer'), + RelationColumn(name: 'id', type: 'uuid', isNullable: false), + RelationColumn(name: 'content', type: 'text', isNullable: false), + RelationColumn(name: 'text_null', type: 'text', isNullable: true), + RelationColumn( + name: 'text_null_default', + type: 'text', + isNullable: true, + ), + RelationColumn( + name: 'intvalue_null', + type: 'integer', + isNullable: true, + ), + RelationColumn( + name: 'intvalue_null_default', + type: 'integer', + isNullable: true, + ), ], ); @@ -977,8 +989,8 @@ void main() { table: 'table', tableType: SatRelation_RelationType.TABLE, columns: [ - RelationColumn(name: 'name1', type: 'TEXT'), - RelationColumn(name: 'name2', type: 'TEXT'), + RelationColumn(name: 'name1', type: 'TEXT', isNullable: true), + RelationColumn(name: 'name2', type: 'TEXT', isNullable: true), ], ); diff --git a/packages/electricsql/test/satellite/common.dart b/packages/electricsql/test/satellite/common.dart index fb2f1599..798a16be 100644 --- a/packages/electricsql/test/satellite/common.dart +++ b/packages/electricsql/test/satellite/common.dart @@ -27,11 +27,13 @@ Map kTestRelations = { RelationColumn( name: 'id', type: 'INTEGER', + isNullable: false, primaryKey: true, ), RelationColumn( name: 'parent', type: 'INTEGER', + isNullable: true, primaryKey: false, ), ], @@ -45,16 +47,19 @@ Map kTestRelations = { RelationColumn( name: 'id', type: 'INTEGER', + isNullable: false, primaryKey: true, ), RelationColumn( name: 'value', type: 'TEXT', + isNullable: true, primaryKey: false, ), RelationColumn( name: 'other', type: 'INTEGER', + isNullable: true, primaryKey: false, ), ], @@ -68,6 +73,7 @@ Map kTestRelations = { RelationColumn( name: 'id', type: 'INTEGER', + isNullable: false, primaryKey: true, ), ], diff --git a/packages/electricsql/test/satellite/merge_test.dart b/packages/electricsql/test/satellite/merge_test.dart new file mode 100644 index 00000000..f251b367 --- /dev/null +++ b/packages/electricsql/test/satellite/merge_test.dart @@ -0,0 +1,47 @@ +import 'dart:convert'; + +import 'package:electricsql/src/satellite/merge.dart'; +import 'package:electricsql/src/satellite/oplog.dart'; +import 'package:test/test.dart'; + +void main() { + test('merging entries: local no-op updates should cancel incoming delete', + () { + final pk = primaryKeyToStr({'id': 1}); + + final local = [ + OplogEntry( + rowid: 0, + namespace: 'main', + tablename: 'public', + optype: OpType.update, + timestamp: '1970-01-02T03:46:41.000Z', // 100001000 as a unix timestamp + primaryKey: pk, + newRow: json.encode({'id': 1}), + oldRow: null, + clearTags: json.encode(['common@100000000']), + ), + ]; + + final remote = [ + OplogEntry( + rowid: 0, + namespace: 'main', + tablename: 'public', + optype: OpType.delete, + timestamp: '1970-01-02T03:46:42.000Z', // 100002000 as a unix timestamp + primaryKey: pk, + oldRow: json.encode({'id': 1, 'value': 'TEST'}), + clearTags: json.encode(['common@100000000']), + ), + ]; + + final merged = mergeEntries('local', local, 'remote', remote); + + // Merge should resolve into the UPSERT for this row, since the remote DELETE didn't observe this local update + expect(merged['main.public']![pk]!.optype, ChangesOpType.upsert); + + expect(merged['main.public']![pk]!.tags, ['local@100001000']); + expect(merged['main.public']![pk]!.fullRow, {'id': 1, 'value': 'TEST'}); + }); +} diff --git a/packages/electricsql/test/satellite/process_migration_test.dart b/packages/electricsql/test/satellite/process_migration_test.dart index b3384af1..a425952e 100644 --- a/packages/electricsql/test/satellite/process_migration_test.dart +++ b/packages/electricsql/test/satellite/process_migration_test.dart @@ -305,14 +305,14 @@ void main() { ), insertRow, updateRow, - insertExtendedWithoutValueRow + insertExtendedWithoutValueRow, ].map((Row row) { return { ...row, 'baz': null, }; }), - insertExtendedRow + insertExtendedRow, ]; expect(rowsAfterMigration.toSet(), expectedRowsAfterMigration.toSet()); @@ -349,7 +349,7 @@ void main() { Statement( 'UPDATE parent SET value = ?, other = ? WHERE id = ?;', ['still local', 5, 1], - ) + ), ], ); @@ -527,6 +527,68 @@ void main() { isTrue, ); }); + + final migrationWithFKs = [ + SchemaChange( + migrationType: SatOpMigrate_Type.CREATE_TABLE, + sql: ''' + CREATE TABLE "test_items" ( + "id" TEXT NOT NULL, + CONSTRAINT "test_items_pkey" PRIMARY KEY ("id") + ) WITHOUT ROWID; + ''', + table: SatOpMigrate_Table( + name: 'test_items', + columns: [SatOpMigrate_Column(name: 'id')], + fks: [], + pks: ['id'], + ), + ), + SchemaChange( + migrationType: SatOpMigrate_Type.CREATE_TABLE, + sql: ''' + CREATE TABLE "test_other_items" ( + "id" TEXT NOT NULL, + "item_id" TEXT, + -- CONSTRAINT "test_other_items_item_id_fkey" FOREIGN KEY ("item_id") REFERENCES "test_items" ("id"), + CONSTRAINT "test_other_items_pkey" PRIMARY KEY ("id") + ) WITHOUT ROWID; + ''', + table: SatOpMigrate_Table( + name: 'test_other_items', + columns: [ + SatOpMigrate_Column(name: 'id'), + SatOpMigrate_Column(name: 'item_id'), + ], + fks: [ + SatOpMigrate_ForeignKey( + fkCols: ['item_id'], + pkTable: 'test_items', + pkCols: ['id'], + ), + ], + pks: ['id'], + ), + ), + ]; + + test('apply another migration', () async { + final migrationTx = Transaction( + origin: 'remote', + commitTimestamp: Int64(DateTime.now().millisecondsSinceEpoch), + changes: migrationWithFKs, + lsn: [], + // starts at 3, because the app already defines 2 migrations + // (see test/support/migrations/migrations.js) + // which are loaded when Satellite is started + migrationVersion: '3', + ); + + // Apply the migration transaction + await satellite.applyTransaction(migrationTx); + + await assertDbHasTables(['test_items', 'test_other_items']); + }); } Future populateDB(SatelliteTestContext context) async { @@ -629,21 +691,25 @@ final addColumnRelation = Relation( RelationColumn( name: 'id', type: 'INTEGER', + isNullable: false, primaryKey: true, ), RelationColumn( name: 'value', type: 'TEXT', + isNullable: true, primaryKey: false, ), RelationColumn( name: 'other', type: 'INTEGER', + isNullable: true, primaryKey: false, ), RelationColumn( name: 'baz', type: 'TEXT', + isNullable: true, primaryKey: false, ), ], @@ -657,16 +723,19 @@ final newTableRelation = Relation( RelationColumn( name: 'id', type: 'TEXT', + isNullable: false, primaryKey: true, ), RelationColumn( name: 'foo', type: 'INTEGER', + isNullable: true, primaryKey: false, ), RelationColumn( name: 'bar', type: 'TEXT', + isNullable: true, primaryKey: false, ), ], diff --git a/packages/electricsql/test/satellite/process_tags_test.dart b/packages/electricsql/test/satellite/process_tags_test.dart index d23237e5..8d72dfc6 100644 --- a/packages/electricsql/test/satellite/process_tags_test.dart +++ b/packages/electricsql/test/satellite/process_tags_test.dart @@ -679,7 +679,7 @@ void main() { final userTable = await adapter.query(Statement('SELECT * FROM parent;')); final expectedUserTable = [ - {'id': 2, 'value': 'local', 'other': null} + {'id': 2, 'value': 'local', 'other': null}, ]; expect(expectedUserTable, userTable); }); diff --git a/packages/electricsql/test/satellite/process_test.dart b/packages/electricsql/test/satellite/process_test.dart index 6aa22a53..12f59d24 100644 --- a/packages/electricsql/test/satellite/process_test.dart +++ b/packages/electricsql/test/satellite/process_test.dart @@ -10,10 +10,10 @@ import 'package:electricsql/src/electric/adapter.dart' hide Transaction; import 'package:electricsql/src/migrators/migrators.dart'; import 'package:electricsql/src/notifiers/mock.dart'; import 'package:electricsql/src/notifiers/notifiers.dart'; +import 'package:electricsql/src/satellite/merge.dart'; import 'package:electricsql/src/satellite/mock.dart'; import 'package:electricsql/src/satellite/oplog.dart'; import 'package:electricsql/src/satellite/process.dart'; -import 'package:electricsql/src/satellite/satellite.dart'; import 'package:electricsql/src/satellite/shapes/types.dart'; import 'package:electricsql/src/util/common.dart'; import 'package:electricsql/src/util/tablename.dart'; @@ -80,7 +80,7 @@ void main() { final meta = await loadSatelliteMetaTable(adapter); expect(meta, { - 'compensations': 0, + 'compensations': 1, 'lastAckdRowId': '0', 'lastSentRowId': '0', 'lsn': '', @@ -281,7 +281,7 @@ void main() { OpType.insert, incomingTs, encodeTags([ - generateTag('remote', DateTime.fromMillisecondsSinceEpoch(incomingTs)) + generateTag('remote', DateTime.fromMillisecondsSinceEpoch(incomingTs)), ]), newValues: { 'id': 1, @@ -302,9 +302,7 @@ void main() { final local = await satellite.getEntries(); final localTimestamp = DateTime.parse(local[0].timestamp).millisecondsSinceEpoch; - final merged = satellite.mergeEntries(clientId, local, 'remote', [ - incomingEntry, - ]); + final merged = mergeEntries(clientId, local, 'remote', [incomingEntry]); final item = merged['main.parent']!['{"id":1}']; expect( @@ -369,9 +367,7 @@ void main() { oldValues: {}, ); - final merged = satellite.mergeEntries(clientId, local, 'remote', [ - incomingEntry, - ]); + final merged = mergeEntries(clientId, local, 'remote', [incomingEntry]); final item = merged['main.parent']!['{"id":1}']; expect( @@ -631,7 +627,7 @@ void main() { ), ]; - final merged = satellite.mergeEntries(clientId, local, 'remote', incoming); + final merged = mergeEntries(clientId, local, 'remote', incoming); final item = merged['main.parent']!['{"id":1}']; expect( @@ -715,7 +711,7 @@ void main() { ), ]; - final merged = satellite.mergeEntries(clientId, local, 'remote', incoming); + final merged = mergeEntries(clientId, local, 'remote', incoming); final item = merged['main.parent']!['{"id":1}']!; // The incoming entry modified the value of the `value` column to `'remote'` @@ -772,7 +768,7 @@ void main() { ]; final local = []; - final merged = satellite.mergeEntries(clientId, local, 'remote', incoming); + final merged = mergeEntries(clientId, local, 'remote', incoming); final item = merged['main.parent']!['{"id":1}']; expect( @@ -1280,10 +1276,7 @@ void main() { final base64lsn = base64.encode(numberToBytes(kMockBehindWindowLsn)); await satellite.setMeta('lsn', base64lsn); try { - final conn = await satellite.start( - authConfig, - opts: SatelliteReplicationOptions(clearOnBehindWindow: true), - ); + final conn = await satellite.start(authConfig); await conn.connectionFuture; final lsnAfter = await satellite.getMeta('lsn'); expect(lsnAfter, isNot(base64lsn)); diff --git a/packages/electricsql/test/satellite/serialization_test.dart b/packages/electricsql/test/satellite/serialization_test.dart index d2a9b33e..f9618021 100644 --- a/packages/electricsql/test/satellite/serialization_test.dart +++ b/packages/electricsql/test/satellite/serialization_test.dart @@ -11,13 +11,13 @@ void main() { table: 'table', tableType: SatRelation_RelationType.TABLE, columns: [ - RelationColumn(name: 'name1', type: 'TEXT'), - RelationColumn(name: 'name2', type: 'TEXT'), - RelationColumn(name: 'name3', type: 'TEXT'), - RelationColumn(name: 'int1', type: 'INTEGER'), - RelationColumn(name: 'int2', type: 'INTEGER'), - RelationColumn(name: 'float1', type: 'FLOAT4'), - RelationColumn(name: 'float2', type: 'FLOAT4'), + RelationColumn(name: 'name1', type: 'TEXT', isNullable: true), + RelationColumn(name: 'name2', type: 'TEXT', isNullable: true), + RelationColumn(name: 'name3', type: 'TEXT', isNullable: true), + RelationColumn(name: 'int1', type: 'INTEGER', isNullable: true), + RelationColumn(name: 'int2', type: 'INTEGER', isNullable: true), + RelationColumn(name: 'float1', type: 'FLOAT4', isNullable: true), + RelationColumn(name: 'float2', type: 'FLOAT4', isNullable: true), ], ); @@ -43,15 +43,15 @@ void main() { table: 'table', tableType: SatRelation_RelationType.TABLE, columns: [ - RelationColumn(name: 'bit0', type: 'TEXT'), - RelationColumn(name: 'bit1', type: 'TEXT'), - RelationColumn(name: 'bit2', type: 'TEXT'), - RelationColumn(name: 'bit3', type: 'TEXT'), - RelationColumn(name: 'bit4', type: 'TEXT'), - RelationColumn(name: 'bit5', type: 'TEXT'), - RelationColumn(name: 'bit6', type: 'TEXT'), - RelationColumn(name: 'bit7', type: 'TEXT'), - RelationColumn(name: 'bit8', type: 'TEXT'), + RelationColumn(name: 'bit0', type: 'TEXT', isNullable: true), + RelationColumn(name: 'bit1', type: 'TEXT', isNullable: true), + RelationColumn(name: 'bit2', type: 'TEXT', isNullable: true), + RelationColumn(name: 'bit3', type: 'TEXT', isNullable: true), + RelationColumn(name: 'bit4', type: 'TEXT', isNullable: true), + RelationColumn(name: 'bit5', type: 'TEXT', isNullable: true), + RelationColumn(name: 'bit6', type: 'TEXT', isNullable: true), + RelationColumn(name: 'bit7', type: 'TEXT', isNullable: true), + RelationColumn(name: 'bit8', type: 'TEXT', isNullable: true), ], ); diff --git a/packages/electricsql/test/support/satellite_helpers.dart b/packages/electricsql/test/support/satellite_helpers.dart index a387986e..fd11f62c 100644 --- a/packages/electricsql/test/support/satellite_helpers.dart +++ b/packages/electricsql/test/support/satellite_helpers.dart @@ -75,7 +75,7 @@ OplogEntry generateLocalOplogEntry( } if (optype != OpType.delete && clearTags == null) { tags = encodeTags([ - generateTag('remote', DateTime.fromMillisecondsSinceEpoch(timestamp)) + generateTag('remote', DateTime.fromMillisecondsSinceEpoch(timestamp)), ]); } @@ -188,7 +188,7 @@ primaryKey = ? final args = [ oplog.namespace, oplog.tablename, - getShadowPrimaryKey(oplog) + getShadowPrimaryKey(oplog), ]; query = Statement(selectTags, args); } else { diff --git a/packages/electricsql_cli/test/ensure_build_test.dart b/packages/electricsql_cli/test/ensure_build_test.dart index eb199f58..12cf24b4 100644 --- a/packages/electricsql_cli/test/ensure_build_test.dart +++ b/packages/electricsql_cli/test/ensure_build_test.dart @@ -7,7 +7,8 @@ import 'package:test/test.dart'; void main() { test( 'ensure_build', - () => expectBuildClean(packageRelativeDirectory: 'packages/electricsql_cli'), + () => + expectBuildClean(packageRelativeDirectory: 'packages/electricsql_cli'), timeout: const Timeout(Duration(minutes: 2)), ); } diff --git a/todos_flutter/analysis_options.yaml b/todos_flutter/analysis_options.yaml index 976112c1..ba669d6c 100644 --- a/todos_flutter/analysis_options.yaml +++ b/todos_flutter/analysis_options.yaml @@ -1,30 +1,6 @@ -# This file configures the analyzer, which statically analyzes Dart code to -# check for errors, warnings, and lints. -# -# The issues identified by the analyzer are surfaced in the UI of Dart-enabled -# IDEs (https://dart.dev/tools#ides-and-editors). The analyzer can also be -# invoked from the command line by running `flutter analyze`. - -# The following line activates a set of recommended lints for Flutter apps, -# packages, and plugins designed to encourage good coding practices. include: package:flutter_lints/flutter.yaml linter: - # The lint rules applied to this project can be customized in the - # section below to disable rules from the `package:flutter_lints/flutter.yaml` - # included above or to enable additional rules. A list of all available lints - # and their documentation is published at - # https://dart-lang.github.io/linter/lints/index.html. - # - # Instead of disabling a lint rule for the entire project in the - # section below, it can also be suppressed for a single line of code - # or a specific dart file by using the `// ignore: name_of_lint` and - # `// ignore_for_file: name_of_lint` syntax on the line or in the file - # producing the lint. rules: avoid_print: false library_private_types_in_public_api: false - # prefer_single_quotes: true # Uncomment to enable the `prefer_single_quotes` rule - -# Additional information about this file can be found at -# https://dart.dev/guides/language/analysis-options diff --git a/todos_flutter/lib/generated/electric_migrations.dart b/todos_flutter/lib/generated/electric_migrations.dart index f0624ac7..6e734a2d 100644 --- a/todos_flutter/lib/generated/electric_migrations.dart +++ b/todos_flutter/lib/generated/electric_migrations.dart @@ -3,29 +3,29 @@ import 'package:electricsql/electricsql.dart'; final kElectricMigrations = [ - Migration( - statements: [ - "CREATE TABLE \"todolist\" (\n \"id\" TEXT NOT NULL,\n \"filter\" TEXT,\n \"editing\" TEXT,\n CONSTRAINT \"todolist_pkey\" PRIMARY KEY (\"id\")\n) WITHOUT ROWID;\n", - "CREATE TABLE \"todo\" (\n \"id\" TEXT NOT NULL,\n \"listid\" TEXT,\n \"text\" TEXT,\n \"completed\" INTEGER DEFAULT 0 NOT NULL,\n CONSTRAINT \"todo_pkey\" PRIMARY KEY (\"id\")\n) WITHOUT ROWID;\n", - "\n -- Toggles for turning the triggers on and off\n INSERT OR IGNORE INTO _electric_trigger_settings(tablename,flag) VALUES ('main.todolist', 1);\n ", - "\n /* Triggers for table todolist */\n\n -- ensures primary key is immutable\n DROP TRIGGER IF EXISTS update_ensure_main_todolist_primarykey;\n ", - "\n CREATE TRIGGER update_ensure_main_todolist_primarykey\n BEFORE UPDATE ON main.todolist\n BEGIN\n SELECT\n CASE\n WHEN old.id != new.id THEN\n\t\tRAISE (ABORT, 'cannot change the value of column id as it belongs to the primary key')\n END;\n END;\n ", - "\n -- Triggers that add INSERT, UPDATE, DELETE operation to the _opslog table\n DROP TRIGGER IF EXISTS insert_main_todolist_into_oplog;\n ", - "\n CREATE TRIGGER insert_main_todolist_into_oplog\n AFTER INSERT ON main.todolist\n WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == 'main.todolist')\n BEGIN\n INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)\n VALUES ('main', 'todolist', 'INSERT', json_object('id', new.id), json_object('editing', new.editing, 'filter', new.filter, 'id', new.id), NULL, NULL);\n END;\n ", - "\n DROP TRIGGER IF EXISTS update_main_todolist_into_oplog;\n ", - "\n CREATE TRIGGER update_main_todolist_into_oplog\n AFTER UPDATE ON main.todolist\n WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == 'main.todolist')\n BEGIN\n INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)\n VALUES ('main', 'todolist', 'UPDATE', json_object('id', new.id), json_object('editing', new.editing, 'filter', new.filter, 'id', new.id), json_object('editing', old.editing, 'filter', old.filter, 'id', old.id), NULL);\n END;\n ", - "\n DROP TRIGGER IF EXISTS delete_main_todolist_into_oplog;\n ", - "\n CREATE TRIGGER delete_main_todolist_into_oplog\n AFTER DELETE ON main.todolist\n WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == 'main.todolist')\n BEGIN\n INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)\n VALUES ('main', 'todolist', 'DELETE', json_object('id', old.id), NULL, json_object('editing', old.editing, 'filter', old.filter, 'id', old.id), NULL);\n END;\n ", - "\n -- Toggles for turning the triggers on and off\n INSERT OR IGNORE INTO _electric_trigger_settings(tablename,flag) VALUES ('main.todo', 1);\n ", - "\n /* Triggers for table todo */\n\n -- ensures primary key is immutable\n DROP TRIGGER IF EXISTS update_ensure_main_todo_primarykey;\n ", - "\n CREATE TRIGGER update_ensure_main_todo_primarykey\n BEFORE UPDATE ON main.todo\n BEGIN\n SELECT\n CASE\n WHEN old.id != new.id THEN\n\t\tRAISE (ABORT, 'cannot change the value of column id as it belongs to the primary key')\n END;\n END;\n ", - "\n -- Triggers that add INSERT, UPDATE, DELETE operation to the _opslog table\n DROP TRIGGER IF EXISTS insert_main_todo_into_oplog;\n ", - "\n CREATE TRIGGER insert_main_todo_into_oplog\n AFTER INSERT ON main.todo\n WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == 'main.todo')\n BEGIN\n INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)\n VALUES ('main', 'todo', 'INSERT', json_object('id', new.id), json_object('completed', new.completed, 'id', new.id, 'listid', new.listid, 'text', new.text), NULL, NULL);\n END;\n ", - "\n DROP TRIGGER IF EXISTS update_main_todo_into_oplog;\n ", - "\n CREATE TRIGGER update_main_todo_into_oplog\n AFTER UPDATE ON main.todo\n WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == 'main.todo')\n BEGIN\n INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)\n VALUES ('main', 'todo', 'UPDATE', json_object('id', new.id), json_object('completed', new.completed, 'id', new.id, 'listid', new.listid, 'text', new.text), json_object('completed', old.completed, 'id', old.id, 'listid', old.listid, 'text', old.text), NULL);\n END;\n ", - "\n DROP TRIGGER IF EXISTS delete_main_todo_into_oplog;\n ", - "\n CREATE TRIGGER delete_main_todo_into_oplog\n AFTER DELETE ON main.todo\n WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == 'main.todo')\n BEGIN\n INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)\n VALUES ('main', 'todo', 'DELETE', json_object('id', old.id), NULL, json_object('completed', old.completed, 'id', old.id, 'listid', old.listid, 'text', old.text), NULL);\n END;\n ", - ], - version: "20230803150134_671", - ), + Migration( + statements: [ + "CREATE TABLE \"todolist\" (\n \"id\" TEXT NOT NULL,\n \"filter\" TEXT,\n \"editing\" TEXT,\n CONSTRAINT \"todolist_pkey\" PRIMARY KEY (\"id\")\n) WITHOUT ROWID;\n", + "CREATE TABLE \"todo\" (\n \"id\" TEXT NOT NULL,\n \"listid\" TEXT,\n \"text\" TEXT,\n \"completed\" INTEGER DEFAULT 0 NOT NULL,\n CONSTRAINT \"todo_pkey\" PRIMARY KEY (\"id\")\n) WITHOUT ROWID;\n", + "\n -- Toggles for turning the triggers on and off\n INSERT OR IGNORE INTO _electric_trigger_settings(tablename,flag) VALUES ('main.todolist', 1);\n ", + "\n /* Triggers for table todolist */\n\n -- ensures primary key is immutable\n DROP TRIGGER IF EXISTS update_ensure_main_todolist_primarykey;\n ", + "\n CREATE TRIGGER update_ensure_main_todolist_primarykey\n BEFORE UPDATE ON main.todolist\n BEGIN\n SELECT\n CASE\n WHEN old.id != new.id THEN\n\t\tRAISE (ABORT, 'cannot change the value of column id as it belongs to the primary key')\n END;\n END;\n ", + "\n -- Triggers that add INSERT, UPDATE, DELETE operation to the _opslog table\n DROP TRIGGER IF EXISTS insert_main_todolist_into_oplog;\n ", + "\n CREATE TRIGGER insert_main_todolist_into_oplog\n AFTER INSERT ON main.todolist\n WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == 'main.todolist')\n BEGIN\n INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)\n VALUES ('main', 'todolist', 'INSERT', json_object('id', new.id), json_object('editing', new.editing, 'filter', new.filter, 'id', new.id), NULL, NULL);\n END;\n ", + "\n DROP TRIGGER IF EXISTS update_main_todolist_into_oplog;\n ", + "\n CREATE TRIGGER update_main_todolist_into_oplog\n AFTER UPDATE ON main.todolist\n WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == 'main.todolist')\n BEGIN\n INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)\n VALUES ('main', 'todolist', 'UPDATE', json_object('id', new.id), json_object('editing', new.editing, 'filter', new.filter, 'id', new.id), json_object('editing', old.editing, 'filter', old.filter, 'id', old.id), NULL);\n END;\n ", + "\n DROP TRIGGER IF EXISTS delete_main_todolist_into_oplog;\n ", + "\n CREATE TRIGGER delete_main_todolist_into_oplog\n AFTER DELETE ON main.todolist\n WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == 'main.todolist')\n BEGIN\n INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)\n VALUES ('main', 'todolist', 'DELETE', json_object('id', old.id), NULL, json_object('editing', old.editing, 'filter', old.filter, 'id', old.id), NULL);\n END;\n ", + "\n -- Toggles for turning the triggers on and off\n INSERT OR IGNORE INTO _electric_trigger_settings(tablename,flag) VALUES ('main.todo', 1);\n ", + "\n /* Triggers for table todo */\n\n -- ensures primary key is immutable\n DROP TRIGGER IF EXISTS update_ensure_main_todo_primarykey;\n ", + "\n CREATE TRIGGER update_ensure_main_todo_primarykey\n BEFORE UPDATE ON main.todo\n BEGIN\n SELECT\n CASE\n WHEN old.id != new.id THEN\n\t\tRAISE (ABORT, 'cannot change the value of column id as it belongs to the primary key')\n END;\n END;\n ", + "\n -- Triggers that add INSERT, UPDATE, DELETE operation to the _opslog table\n DROP TRIGGER IF EXISTS insert_main_todo_into_oplog;\n ", + "\n CREATE TRIGGER insert_main_todo_into_oplog\n AFTER INSERT ON main.todo\n WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == 'main.todo')\n BEGIN\n INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)\n VALUES ('main', 'todo', 'INSERT', json_object('id', new.id), json_object('completed', new.completed, 'id', new.id, 'listid', new.listid, 'text', new.text), NULL, NULL);\n END;\n ", + "\n DROP TRIGGER IF EXISTS update_main_todo_into_oplog;\n ", + "\n CREATE TRIGGER update_main_todo_into_oplog\n AFTER UPDATE ON main.todo\n WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == 'main.todo')\n BEGIN\n INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)\n VALUES ('main', 'todo', 'UPDATE', json_object('id', new.id), json_object('completed', new.completed, 'id', new.id, 'listid', new.listid, 'text', new.text), json_object('completed', old.completed, 'id', old.id, 'listid', old.listid, 'text', old.text), NULL);\n END;\n ", + "\n DROP TRIGGER IF EXISTS delete_main_todo_into_oplog;\n ", + "\n CREATE TRIGGER delete_main_todo_into_oplog\n AFTER DELETE ON main.todo\n WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == 'main.todo')\n BEGIN\n INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)\n VALUES ('main', 'todo', 'DELETE', json_object('id', old.id), NULL, json_object('completed', old.completed, 'id', old.id, 'listid', old.listid, 'text', old.text), NULL);\n END;\n ", + ], + version: "20230824162513_622", + ), ]; diff --git a/todos_flutter/lib/main.dart b/todos_flutter/lib/main.dart index 43110ba5..f6def118 100644 --- a/todos_flutter/lib/main.dart +++ b/todos_flutter/lib/main.dart @@ -188,8 +188,6 @@ class _DeleteDbButton extends StatelessWidget { onPressed: () async { await impl.deleteTodosDbFile(); - // TODO: False positive remove in Dart 3.1.0 - // ignore: use_build_context_synchronously if (!context.mounted) return; ScaffoldMessenger.of(context).showSnackBar( diff --git a/todos_flutter/pubspec.lock b/todos_flutter/pubspec.lock index d3ffaf1c..7864089e 100644 --- a/todos_flutter/pubspec.lock +++ b/todos_flutter/pubspec.lock @@ -197,10 +197,10 @@ packages: dependency: transitive description: name: collection - sha256: "4a07be6cb69c84d677a6c3096fcf960cc3285a8330b4603e0d463d15d9bd934c" + sha256: f092b211a4319e98e5ff58223576de6c2803db36221657b46c82574721240687 url: "https://pub.dev" source: hosted - version: "1.17.1" + version: "1.17.2" convert: dependency: transitive description: @@ -278,7 +278,7 @@ packages: path: "../packages/electricsql_cli" relative: true source: path - version: "0.1.0+1" + version: "0.1.0+2" electricsql_flutter: dependency: "direct main" description: @@ -368,14 +368,6 @@ packages: description: flutter source: sdk version: "0.0.0" - fpdart: - dependency: transitive - description: - name: fpdart - sha256: b254aef83353a12e355df6f1231d9b646638bb9493b3f2a2c4800961f7e8bf0b - url: "https://pub.dev" - source: hosted - version: "1.0.0" frontend_server_client: dependency: transitive description: @@ -492,18 +484,18 @@ packages: dependency: transitive description: name: matcher - sha256: "6501fbd55da300384b768785b83e5ce66991266cec21af89ab9ae7f5ce1c4cbb" + sha256: "1803e76e6653768d64ed8ff2e1e67bea3ad4b923eb5c56a295c3e634bad5960e" url: "https://pub.dev" source: hosted - version: "0.12.15" + version: "0.12.16" material_color_utilities: dependency: transitive description: name: material_color_utilities - sha256: d92141dc6fe1dad30722f9aa826c7fbc896d021d792f80678280601aff8cf724 + sha256: "9528f2f296073ff54cb9fee677df673ace1218163c3bc7628093e7eed5203d41" url: "https://pub.dev" source: hosted - version: "0.2.0" + version: "0.5.0" material_design_icons_flutter: dependency: "direct main" description: @@ -721,10 +713,10 @@ packages: dependency: transitive description: name: source_span - sha256: dd904f795d4b4f3b870833847c461801f6750a9fa8e61ea5ac53f9422b31f250 + sha256: "53e943d4206a5e30df338fd4c6e7a077e02254531b138a15aec3bd143c1a8b3c" url: "https://pub.dev" source: hosted - version: "1.9.1" + version: "1.10.0" sqlite3: dependency: "direct main" description: @@ -809,10 +801,10 @@ packages: dependency: transitive description: name: test_api - sha256: eb6ac1540b26de412b3403a163d919ba86f6a973fe6cc50ae3541b80092fdcfb + sha256: "75760ffd7786fffdfb9597c35c5b27eaeec82be8edfb6d71d32651128ed7aab8" url: "https://pub.dev" source: hosted - version: "0.5.1" + version: "0.6.0" timing: dependency: transitive description: @@ -853,6 +845,14 @@ packages: url: "https://pub.dev" source: hosted version: "1.1.0" + web: + dependency: transitive + description: + name: web + sha256: dc8ccd225a2005c1be616fe02951e2e342092edf968cf0844220383757ef8f10 + url: "https://pub.dev" + source: hosted + version: "0.1.4-beta" web_socket_channel: dependency: transitive description: @@ -886,5 +886,5 @@ packages: source: hosted version: "3.1.2" sdks: - dart: ">=3.0.0 <4.0.0" + dart: ">=3.1.0-185.0.dev <4.0.0" flutter: ">=3.10.0"