Skip to content

Commit

Permalink
feat: make Channel streams pausable
Browse files Browse the repository at this point in the history
  • Loading branch information
blaugold committed Nov 1, 2021
1 parent dc5c49d commit 1453997
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 12 deletions.
64 changes: 54 additions & 10 deletions packages/cbl/lib/src/service/channel.dart
Expand Up @@ -107,6 +107,8 @@ class Channel {
_streamControllers[id] = controller;
_sendMessage(_ListenToStream(id, request));
},
onPause: () => _sendMessage(_PauseStream(id)),
onResume: () => _sendMessage(_ResumeStream(id)),
onCancel: () {
_streamControllers.remove(id);
_sendMessage(_CancelStream(id));
Expand Down Expand Up @@ -180,6 +182,10 @@ class Channel {
_handleCallErrorResponse(message);
} else if (message is _ListenToStream) {
_handleListenToStream(message);
} else if (message is _PauseStream) {
_handlePauseStream(message);
} else if (message is _ResumeStream) {
_handleResumeStream(message);
} else if (message is _CancelStream) {
_handleCancelStream(message);
} else if (message is _StreamEvent) {
Expand Down Expand Up @@ -284,14 +290,14 @@ class Channel {
);
}

void _handleCancelStream(_CancelStream message) {
final subscription = _takeStreamSubscription(message);
if (subscription == null) {
return;
}
void _handlePauseStream(_PauseStream message) =>
_getStreamSubscription(message)?.pause();

subscription.cancel();
}
void _handleResumeStream(_ResumeStream message) =>
_getStreamSubscription(message)?.resume();

void _handleCancelStream(_CancelStream message) =>
_takeStreamSubscription(message)?.cancel();

void _handleStreamEvent(_StreamEvent message) =>
_getStreamController(message)?.add(message.result);
Expand Down Expand Up @@ -354,6 +360,12 @@ class Channel {
// the event is ignored.
_streamControllers[message.conversationId];

StreamSubscription? _getStreamSubscription(_Message message) =>
// It's possible that a stream never created a subscription, for example
// when the request could not be deserialized. In those cases the
// event is ignored.
_streamSubscriptions[message.conversationId];

StreamSubscription? _takeStreamSubscription(_Message message) =>
// It's possible that a stream never created a subscription, for example
// when the request could not be deserialized. In those cases the
Expand Down Expand Up @@ -485,6 +497,8 @@ SerializationRegistry channelSerializationRegistry() => SerializationRegistry()
.._addProtocolMessage('CallSuccess', _CallSuccess.deserialize)
.._addProtocolMessage('CallError', _CallError.deserialize)
.._addProtocolMessage('ListenToStream', _ListenToStream.deserialize)
.._addProtocolMessage('PauseStream', _PauseStream.deserialize)
.._addProtocolMessage('ResumeStream', _ResumeStream.deserialize)
.._addProtocolMessage('CancelStream', _CancelStream.deserialize)
.._addProtocolMessage('StreamEvent', _StreamEvent.deserialize)
.._addProtocolMessage('StreamError', _StreamError.deserialize)
Expand Down Expand Up @@ -579,7 +593,9 @@ class _CallRequest extends _RequestMessage {
: super.deserialize(map, context);

static _CallRequest deserialize(
StringMap map, SerializationContext context) =>
StringMap map,
SerializationContext context,
) =>
_CallRequest._fromJson(map, context);
}

Expand All @@ -591,7 +607,9 @@ class _CallSuccess extends _SuccessMessage {
: super.deserialize(map, context);

static _CallSuccess deserialize(
StringMap map, SerializationContext context) =>
StringMap map,
SerializationContext context,
) =>
_CallSuccess._fromJson(map, context);
}

Expand All @@ -614,10 +632,36 @@ class _ListenToStream extends _RequestMessage {
: super.deserialize(map, context);

static _ListenToStream deserialize(
StringMap map, SerializationContext context) =>
StringMap map,
SerializationContext context,
) =>
_ListenToStream._fromJson(map, context);
}

class _PauseStream extends _Message {
_PauseStream(int conversationId) : super(conversationId);

_PauseStream._fromJson(StringMap map) : super.deserialize(map);

static _PauseStream deserialize(
StringMap map,
SerializationContext context,
) =>
_PauseStream._fromJson(map);
}

class _ResumeStream extends _Message {
_ResumeStream(int conversationId) : super(conversationId);

_ResumeStream._fromJson(StringMap map) : super.deserialize(map);

static _ResumeStream deserialize(
StringMap map,
SerializationContext context,
) =>
_ResumeStream._fromJson(map);
}

class _CancelStream extends _Message {
_CancelStream(int conversationId) : super(conversationId);

Expand Down
43 changes: 41 additions & 2 deletions packages/cbl_e2e_tests/lib/src/service/channel_test.dart
Expand Up @@ -126,6 +126,43 @@ void main() {
]),
);
});

channelTest('pause and resume stream', () async {
final channel = await openTestChannel();

var isPaused = false;
var events = 0;
final sub = channel.stream(InfiniteStream()).listen((event) {
expect(isPaused, isFalse);
events++;
});

// Verify that the stream is emitting events.
while (events <= 0) {
await Future<void>.delayed(InfiniteStream.interval);
}

// Pause the stream.
sub.pause();
await Future<void>.delayed(InfiniteStream.interval * 2);
isPaused = true;

// Wait a view intervals while stream is paused to verify that the stream
// is paused.
await Future<void>.delayed(InfiniteStream.interval * 10);

// Resume the stream
isPaused = false;
events = 0;
sub.resume();

// Verify that the stream is emitting events again.
while (events <= 0) {
await Future<void>.delayed(InfiniteStream.interval);
}

await sub.cancel();
});
});
}

Expand Down Expand Up @@ -230,8 +267,8 @@ void registerTestHandlers(Channel channel) {
(EchoRequest req) => Stream.value('Input: ${req.input}'))
..addStreamEndpoint((ThrowTestError _) =>
Stream<void>.error(const TestError('Oops'), StackTrace.current))
..addStreamEndpoint((InfiniteStream _) =>
Stream<void>.periodic(const Duration(milliseconds: 100)));
..addStreamEndpoint(
(InfiniteStream _) => Stream<void>.periodic(InfiniteStream.interval));
}

class TestIsolateConfig {
Expand Down Expand Up @@ -317,6 +354,8 @@ class ThrowTestError extends Request<Null> {
}

class InfiniteStream extends Request<Null> {
static const interval = Duration(milliseconds: 10);

@override
StringMap serialize(SerializationContext context) => {};

Expand Down

0 comments on commit 1453997

Please sign in to comment.