Skip to content

Commit

Permalink
fix: prevent deadlock when starting replicator during transaction (#470)
Browse files Browse the repository at this point in the history
  • Loading branch information
blaugold committed Mar 21, 2023
1 parent d381eb4 commit f142752
Show file tree
Hide file tree
Showing 11 changed files with 177 additions and 77 deletions.
64 changes: 36 additions & 28 deletions packages/cbl/lib/src/database/database_base.dart
Expand Up @@ -47,7 +47,8 @@ mixin DatabaseBase<T extends DocumentDelegate> implements Database {
/// The same note as for [dictKeys] applies here.
SharedKeysTable get sharedKeysTable;

final _asyncTransactionLock = Lock();
/// Lock under which asynchronous transactions are executed.
final asyncTransactionLock = Lock();

/// Creates a [DocumentDelegate] from [oldDelegate] for a new document which
/// is being used with this database for the first time.
Expand Down Expand Up @@ -192,6 +193,9 @@ mixin DatabaseBase<T extends DocumentDelegate> implements Database {
});
}

/// Whether the current transaction belongs to this database, if one exists.
bool get ownsCurrentTransaction => _Transaction.current?.database == this;

/// Method to implement by by [Database] implementations to begin a new
/// transaction.
FutureOr<void> beginTransaction();
Expand Down Expand Up @@ -233,7 +237,7 @@ mixin DatabaseBase<T extends DocumentDelegate> implements Database {
bool requiresNewTransaction = false,
required bool async,
}) {
final currentTransaction = Zone.current[#_transaction] as _Transaction?;
final currentTransaction = _Transaction.current;
if (currentTransaction != null) {
if (requiresNewTransaction) {
throw DatabaseException(
Expand All @@ -251,7 +255,7 @@ mixin DatabaseBase<T extends DocumentDelegate> implements Database {
return fn();
}

if (!async && _asyncTransactionLock.locked) {
if (!async && asyncTransactionLock.locked) {
throw DatabaseException(
'Cannot start a new synchronous transaction while an asynchronous '
'transaction is still active.',
Expand All @@ -261,51 +265,55 @@ mixin DatabaseBase<T extends DocumentDelegate> implements Database {

final transaction = _Transaction(this);

FutureOr<R> invokeFn() =>
runZoned(fn, zoneValues: {#_transaction: transaction});

return beginTransaction().then((_) {
if (async) {
return _asyncTransactionLock.synchronized(() async {
try {
final result = await invokeFn();
transaction.end();
await endTransaction(commit: true);
return result;
// ignore: avoid_catches_without_on_clauses
} catch (e) {
transaction.end();
await endTransaction(commit: false);
rethrow;
}
});
} else {
if (async) {
return asyncTransactionLock.synchronized(() async {
await beginTransaction();
try {
final result = invokeFn();
final result = await transaction.runWith(fn);
transaction.end();
final endTransactionResult = endTransaction(commit: true);
assert(endTransactionResult is! Future);
await endTransaction(commit: true);
return result;
// ignore: avoid_catches_without_on_clauses
} catch (e) {
transaction.end();
final endTransactionResult = endTransaction(commit: false);
assert(endTransactionResult is! Future);
await endTransaction(commit: false);
rethrow;
}
});
} else {
final startTransactionResult = beginTransaction();
assert(startTransactionResult is! Future);
try {
final result = transaction.runWith(fn);
transaction.end();
final endTransactionResult = endTransaction(commit: true);
assert(endTransactionResult is! Future);
return result;
// ignore: avoid_catches_without_on_clauses
} catch (e) {
transaction.end();
final endTransactionResult = endTransaction(commit: false);
assert(endTransactionResult is! Future);
rethrow;
}
});
}
}
}

class _Transaction {
_Transaction(this.database);

static _Transaction? get current =>
Zone.current[#_transaction] as _Transaction?;

final Database database;

bool get isActive => _isActive;
var _isActive = true;

T runWith<T>(T Function() fn) =>
runZoned(fn, zoneValues: {#_transaction: this});

void end() {
_isActive = false;
}
Expand Down
9 changes: 9 additions & 0 deletions packages/cbl/lib/src/replication/ffi_replicator.dart
Expand Up @@ -8,6 +8,7 @@ import '../database.dart';
import '../database/ffi_database.dart';
import '../document/document.dart';
import '../document/ffi_document.dart';
import '../errors.dart';
import '../fleece/containers.dart' as fl;
import '../support/async_callback.dart';
import '../support/edition.dart';
Expand Down Expand Up @@ -163,6 +164,14 @@ class FfiReplicator

@override
void start({bool reset = false}) => useSync(() {
if (_database.ownsCurrentTransaction) {
throw DatabaseException(
'A replicator cannot be started from within a database '
'transaction.',
DatabaseErrorCode.transactionNotClosed,
);
}

if (_isStarted) {
return;
}
Expand Down
44 changes: 30 additions & 14 deletions packages/cbl/lib/src/replication/proxy_replicator.dart
Expand Up @@ -6,6 +6,7 @@ import '../database.dart';
import '../database/proxy_database.dart';
import '../document/document.dart';
import '../document/proxy_document.dart';
import '../errors.dart';
import '../service/cbl_service.dart';
import '../service/cbl_service_api.dart';
import '../service/proxy_object.dart';
Expand All @@ -27,14 +28,15 @@ class ProxyReplicator extends ProxyObject
with ClosableResourceMixin
implements AsyncReplicator {
ProxyReplicator({
required this.database,
required ProxyDatabase database,
required int objectId,
required ReplicatorConfiguration config,
required void Function() unregisterCallbacks,
}) : assert(database == config.database),
}) : _database = database,
assert(database == config.database),
_config = ReplicatorConfiguration.from(config),
super(database.channel, objectId, proxyFinalizer: unregisterCallbacks) {
attachTo(database);
attachTo(_database);
}

static Future<ProxyReplicator> create(
Expand Down Expand Up @@ -110,7 +112,7 @@ class ProxyReplicator extends ProxyObject
}
}

final ProxyDatabase database;
final ProxyDatabase _database;

late final _listenerTokens = ListenerTokenRegistry(this);

Expand All @@ -125,11 +127,25 @@ class ProxyReplicator extends ProxyObject
)));

@override
Future<void> start({bool reset = false}) =>
use(() => channel.call(StartReplicator(
replicatorId: objectId,
reset: reset,
)));
// ignore: prefer_expression_function_bodies
Future<void> start({bool reset = false}) => use(() {
if (_database.ownsCurrentTransaction) {
throw DatabaseException(
'A replicator cannot be started from within a database '
'transaction.',
DatabaseErrorCode.transactionNotClosed,
);
}

// Starting a replicator while the database has an active transaction
// causes a deadlock. To avoid this, we synchronize the start call with
// the database's transaction lock.
return _database.asyncTransactionLock
.synchronized(() => channel.call(StartReplicator(
replicatorId: objectId,
reset: reset,
)));
});

@override
Future<void> stop() => use(_stop);
Expand All @@ -147,7 +163,7 @@ class ProxyReplicator extends ProxyObject
ReplicatorChangeListener listener) async {
late final ProxyListenerToken<ReplicatorChange> token;
final listenerId =
database.client.registerReplicatorChangeListener((status) {
_database.client.registerReplicatorChangeListener((status) {
token.callListener(ReplicatorChangeImpl(this, status));
});

Expand All @@ -157,7 +173,7 @@ class ProxyReplicator extends ProxyObject
));

return token =
ProxyListenerToken(database.client, this, listenerId, listener);
ProxyListenerToken(_database.client, this, listenerId, listener);
}

@override
Expand All @@ -174,7 +190,7 @@ class ProxyReplicator extends ProxyObject
) async {
late final ProxyListenerToken<DocumentReplication> token;
final listenerId =
database.client.registerDocumentReplicationListener((event) {
_database.client.registerDocumentReplicationListener((event) {
token.callListener(DocumentReplicationImpl(
this,
event.isPush,
Expand All @@ -188,7 +204,7 @@ class ProxyReplicator extends ProxyObject
));

return token =
ProxyListenerToken(database.client, this, listenerId, listener);
ProxyListenerToken(_database.client, this, listenerId, listener);
}

@override
Expand Down Expand Up @@ -227,7 +243,7 @@ class ProxyReplicator extends ProxyObject
String toString() => [
'ProxyReplicator(',
[
'database: $database',
'database: $_database',
'type: ${describeEnum(config.replicatorType)}',
if (config.continuous) 'CONTINUOUS'
].join(', '),
Expand Down
Expand Up @@ -42,7 +42,7 @@ void main() {
);

// Open it again without key.
expect(openSyncTestDatabase, throwsNotADatabaseFileError);
expect(openSyncTestDatabase, throwsNotADatabaseFile);
});
});

Expand All @@ -68,7 +68,7 @@ void main() {
);

// Open it again without key.
expect(openSyncTestDatabase, throwsNotADatabaseFileError);
expect(openSyncTestDatabase, throwsNotADatabaseFile);
});
});

Expand Down
6 changes: 3 additions & 3 deletions packages/cbl_e2e_tests/lib/src/database/database_test.dart
Expand Up @@ -162,7 +162,7 @@ void main() {

await db.changeEncryptionKey(key);

expect(Future(openTestDatabase), throwsNotADatabaseFileError);
expect(Future(openTestDatabase), throwsNotADatabaseFile);
});

apiTest('changeEncryptionKey: decrypt database', () async {
Expand All @@ -176,7 +176,7 @@ void main() {

await expectLater(
Future(openTestDatabase),
throwsNotADatabaseFileError,
throwsNotADatabaseFile,
);

await db.changeEncryptionKey(null);
Expand All @@ -203,7 +203,7 @@ void main() {
encryptionKey: keyA,
),
)),
throwsNotADatabaseFileError,
throwsNotADatabaseFile,
);
});

Expand Down
57 changes: 57 additions & 0 deletions packages/cbl_e2e_tests/lib/src/replication/replicator_test.dart
Expand Up @@ -8,6 +8,7 @@ import '../../test_binding_impl.dart';
import '../test_binding.dart';
import '../utils/api_variant.dart';
import '../utils/database_utils.dart';
import '../utils/matchers.dart';
import '../utils/replicator_utils.dart';
import '../utils/test_document.dart';

Expand Down Expand Up @@ -703,6 +704,62 @@ void main() {
);
},
);

test(
'supports starting replicator while async document save',
() async {
Database.log.console.level = LogLevel.debug;
final db = await openAsyncTestDatabase();
final repl = await db.createTestReplicator();

final documentSave = db.saveDocument(MutableDocument());
final replStart = repl.start();

await documentSave;
await replStart;
},
);

test(
'supports starting replicator while async transaction is open',
() async {
final db = await openAsyncTestDatabase();
final repl = await db.createTestReplicator();

final transactionWork = Completer<void>();
final transaction = db.inBatch(() => transactionWork.future);
final replStart = repl.start();

transactionWork.complete();
await transaction;
await replStart;
},
);

apiTest(
'throws when starting a replicator from within a transaction',
() async {
final db = await openTestDatabase();
final repl = await db.createTestReplicator();

final exceptionMatcher = throwsA(isDatabaseException
.havingCode(DatabaseErrorCode.transactionNotClosed)
.havingMessage(
'A replicator cannot be started from within a database '
'transaction.',
));

if (db is SyncDatabase) {
db.inBatchSync(() {
expect(repl.start, exceptionMatcher);
});
} else {
await db.inBatch(() async {
expect(repl.start, exceptionMatcher);
});
}
},
);
});
}

Expand Down
18 changes: 12 additions & 6 deletions packages/cbl_e2e_tests/lib/src/utils/matchers.dart
Expand Up @@ -176,12 +176,18 @@ class _Equality extends Matcher {

/// === Errors =================================================================
final throwsNotADatabaseFileError = throwsA(
isA<DatabaseException>().having(
(it) => it.code,
'code',
DatabaseErrorCode.notADatabaseFile,
),
final isDatabaseException = isA<DatabaseException>();

extension DatabaseExceptionMatcherExt on test.TypeMatcher<DatabaseException> {
test.TypeMatcher<DatabaseException> havingMessage(String message) =>
having((it) => it.message, 'message', message);

test.TypeMatcher<DatabaseException> havingCode(DatabaseErrorCode code) =>
having((it) => it.code, 'code', code);
}

final throwsNotADatabaseFile = throwsA(
isDatabaseException.havingCode(DatabaseErrorCode.notADatabaseFile),
);

final isTypedDataException = isA<TypedDataException>();
Expand Down

0 comments on commit f142752

Please sign in to comment.