Skip to content

Commit

Permalink
feat: make remote stack traces available separately instead of throwi…
Browse files Browse the repository at this point in the history
…ng with them

Closes #272
  • Loading branch information
blaugold committed Jan 26, 2022
1 parent 0ebc362 commit cfce0de
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 50 deletions.
9 changes: 9 additions & 0 deletions packages/cbl/lib/src/errors.dart
@@ -1,3 +1,4 @@
import 'service/channel.dart' as channel;
import 'support/utils.dart';

/// Base class for custom exceptions in the `cbl` package.
Expand All @@ -8,6 +9,14 @@ abstract class CouchbaseLiteException implements Exception {
/// An optional error code specifies the cause of this exception.
Object? get code => null;

/// The [StackTrace] from the execution context where this exception
/// originated from.
///
/// When an exception originates in a different execution context than the
/// current one, this property contains the original stack trace, if it is
/// available.
StackTrace? get remoteStackTrace => channel.remoteStackTrace(this);

String get _typeName;

@override
Expand Down
146 changes: 96 additions & 50 deletions packages/cbl/lib/src/service/channel.dart
Expand Up @@ -43,6 +43,21 @@ typedef MessageContextRestorer = void Function(
void Function() restore,
);

/// Returns the remote [StackTrace] for an exception that was emitted by a
/// [Channel].
///
/// When a channel emits an exception, it has the stack trace of the point
/// where the channel was used. This function returns the stack trace of the
/// exception when it was thrown on the other end of the channel.
StackTrace? remoteStackTrace(Object exception) {
if (!_isValidExpandoKey(exception)) {
return null;
}
return _remoteStackTraceExpando[exception];
}

final _remoteStackTraceExpando = Expando<StackTrace>();

/// The status of a [Channel].
enum ChannelStatus {
initial,
Expand Down Expand Up @@ -110,28 +125,38 @@ class Channel {
ChannelStatus get status => _status;

final _callHandlers = <Type, _UntypedCallHandler>{};
final _callCompleter = <int, Completer>{};
final _callCompleter = <int, Completer<_Message>>{};

final _streamHandlers = <Type, _UntypedStreamHandler>{};
final _streamControllers = <int, StreamController>{};
final _streamControllers = <int, StreamController<_Message>>{};
final _streamSubscriptions = <int, StreamSubscription>{};

/// Makes a call to an endpoint at other side of the channel.
Future<R> call<R>(Request<R> request) {
Future<R> call<R>(Request<R> request) async {
_checkIsOpen();
final id = _generateConversationId();
final completer = _callCompleter[id] = Completer<Object?>();
final completer = _callCompleter[id] = Completer<_Message>();
_sendMessage(_CallRequest(id, request, _captureMessageContext()));
return completer.future.then(_checkType);

final message = await completer.future;
if (message is _CallSuccess) {
return message.data as R;
}
if (message is _CallError) {
// ignore: only_throw_errors
throw message.error;
}

throw StateError('Unexpected message: $message');
}

/// Returns a stream for an endpoint at the other side of the channel.
Stream<R> stream<R>(Request<R> request) {
_checkIsOpen();
final id = _generateConversationId();
// ignore: close_sinks
late StreamController<Object?> controller;
controller = StreamController<Object?>(
late StreamController<_Message> controller;
controller = StreamController<_Message>(
onListen: () {
_streamControllers[id] = controller;
_sendMessage(_ListenToStream(id, request, _captureMessageContext()));
Expand All @@ -144,7 +169,17 @@ class Channel {
},
);

return controller.stream.map(_checkType);
return controller.stream.map((message) {
if (message is _StreamData) {
return message.data as R;
}
if (message is _StreamError) {
// ignore: only_throw_errors
throw message.error;
}

throw StateError('Unexpected message: $message');
});
}

/// Adds a call endpoint to this side of the channel.
Expand Down Expand Up @@ -204,12 +239,24 @@ class Channel {
}

_restoreMessageContext(message.context, () {
// Associate the remote stack trace with the returned error.
// This can only be done for error that are not of a primitive type,
// which should be the case usually.
if (message is _ErrorMessage) {
final error = message.error;
if (_isValidExpandoKey(error)) {
final stackTrace = message.stackTrace;
if (stackTrace != null) {
_remoteStackTraceExpando[error] = stackTrace;
}
}
}

// Handle the message.
if (message is _CallRequest) {
_handleCallRequest(message);
} else if (message is _CallSuccess) {
_handleCallSuccessResponse(message);
} else if (message is _CallError) {
_handleCallErrorResponse(message);
} else if (message is _CallSuccess || message is _CallError) {
_handleCallResponse(message);
} else if (message is _ListenToStream) {
_handleListenToStream(message);
} else if (message is _PauseStream) {
Expand All @@ -218,10 +265,8 @@ class Channel {
_handleResumeStream(message);
} else if (message is _CancelStream) {
_handleCancelStream(message);
} else if (message is _StreamEvent) {
} else if (message is _StreamData || message is _StreamError) {
_handleStreamEvent(message);
} else if (message is _StreamError) {
_handleStreamError(message);
} else if (message is _StreamDone) {
_handleStreamDone(message);
}
Expand Down Expand Up @@ -292,13 +337,8 @@ class Channel {
);
}

void _handleCallSuccessResponse(_CallSuccess message) =>
_takeCallCompleter(message)?.complete(message.result);

void _handleCallErrorResponse(_CallError message) {
_takeCallCompleter(message)
?.completeError(message.error, message.stackTrace);
}
void _handleCallResponse(_Message message) =>
_takeCallCompleter(message)?.complete(message);

void _handleListenToStream(_ListenToStream message) {
final handler = _getStreamHandler(message);
Expand All @@ -312,7 +352,7 @@ class Channel {
.asyncExpand((stream) => stream)
.listen(
(event) {
_sendStreamEvent(message.conversationId, event);
_sendStreamData(message.conversationId, event);
},
// ignore: avoid_types_on_closure_parameters
onError: (Object error, StackTrace stackTrace) =>
Expand All @@ -330,11 +370,8 @@ class Channel {
void _handleCancelStream(_CancelStream message) =>
_takeStreamSubscription(message)?.cancel();

void _handleStreamEvent(_StreamEvent message) =>
_getStreamController(message)?.add(message.result);

void _handleStreamError(_StreamError message) => _getStreamController(message)
?.addError(message.error, message.stackTrace);
void _handleStreamEvent(_Message message) =>
_getStreamController(message)?.add(message);

void _handleStreamDone(_StreamDone message) {
_getStreamController(message)?.close();
Expand Down Expand Up @@ -415,15 +452,15 @@ class Channel {
if (message is _CallRequest) {
print('-> ${message.request.runtimeType}');
} else if (message is _CallSuccess) {
print('<- ${message.result?.runtimeType}');
print('<- ${message.data?.runtimeType}');
} else if (message is _CallError) {
print('!- ${message.error}');
} else if (message is _ListenToStream) {
print('=> ${message.request.runtimeType}');
} else if (message is _CancelStream) {
print('=|');
} else if (message is _StreamEvent) {
print('=> ${message.result?.runtimeType}');
} else if (message is _StreamData) {
print('=> ${message.data?.runtimeType}');
} else if (message is _StreamDone) {
print('|=');
} else if (message is _StreamError) {
Expand All @@ -434,10 +471,10 @@ class Channel {
_transport.sink.add(message);
}

void _sendCallSuccess(int conversationId, Object? result) =>
void _sendCallSuccess(int conversationId, Object? data) =>
_sendMessage(_CallSuccess(
conversationId,
result,
data,
_captureMessageContext(),
));

Expand All @@ -453,10 +490,10 @@ class Channel {
_captureMessageContext(),
));

void _sendStreamEvent(int conversationId, Object? result) =>
_sendMessage(_StreamEvent(
void _sendStreamData(int conversationId, Object? data) =>
_sendMessage(_StreamData(
conversationId,
result,
data,
_captureMessageContext(),
));

Expand Down Expand Up @@ -551,7 +588,7 @@ SerializationRegistry channelSerializationRegistry() => SerializationRegistry()
.._addProtocolMessage('PauseStream', _PauseStream.deserialize)
.._addProtocolMessage('ResumeStream', _ResumeStream.deserialize)
.._addProtocolMessage('CancelStream', _CancelStream.deserialize)
.._addProtocolMessage('StreamEvent', _StreamEvent.deserialize)
.._addProtocolMessage('StreamData', _StreamData.deserialize)
.._addProtocolMessage('StreamError', _StreamError.deserialize)
.._addProtocolMessage('StreamDone', _StreamDone.deserialize);

Expand Down Expand Up @@ -604,20 +641,20 @@ abstract class _RequestMessage extends _Message {
abstract class _SuccessMessage extends _Message {
_SuccessMessage(
int conversationId,
this.result,
this.data,
Object? context,
) : super(conversationId, context);

_SuccessMessage.deserialize(StringMap map, SerializationContext context)
: result = context.deserializePolymorphic(map['result']),
: data = context.deserializePolymorphic(map['data']),
super.deserialize(map);

final Object? result;
final Object? data;

@override
StringMap serialize(SerializationContext context) => {
...super.serialize(context),
'result': context.serializePolymorphic(result),
'data': context.serializePolymorphic(data),
};
}

Expand Down Expand Up @@ -667,8 +704,8 @@ class _CallRequest extends _RequestMessage {
}

class _CallSuccess extends _SuccessMessage {
_CallSuccess(int conversationId, Object? result, Object? context)
: super(conversationId, result, context);
_CallSuccess(int conversationId, Object? data, Object? context)
: super(conversationId, data, context);

_CallSuccess._fromJson(StringMap map, SerializationContext context)
: super.deserialize(map, context);
Expand Down Expand Up @@ -746,16 +783,15 @@ class _CancelStream extends _Message {
_CancelStream._fromJson(map);
}

class _StreamEvent extends _SuccessMessage {
_StreamEvent(int conversationId, Object? response, Object? context)
: super(conversationId, response, context);
class _StreamData extends _SuccessMessage {
_StreamData(int conversationId, Object? data, Object? context)
: super(conversationId, data, context);

_StreamEvent._fromJson(StringMap map, SerializationContext context)
_StreamData._fromJson(StringMap map, SerializationContext context)
: super.deserialize(map, context);

static _StreamEvent deserialize(
StringMap map, SerializationContext context) =>
_StreamEvent._fromJson(map, context);
static _StreamData deserialize(StringMap map, SerializationContext context) =>
_StreamData._fromJson(map, context);
}

class _StreamError extends _ErrorMessage {
Expand Down Expand Up @@ -783,3 +819,13 @@ class _StreamDone extends _Message {
static _StreamDone deserialize(StringMap map, SerializationContext context) =>
_StreamDone._fromJson(map);
}

bool _isValidExpandoKey(Object? value) {
// `dart:ffi` values are not allowed as expando keys, too, but they cannot
// be sent over a channel anyway.
if (value == null || value is String || value is num || value is bool) {
return false;
}

return true;
}

0 comments on commit cfce0de

Please sign in to comment.