Skip to content

Commit

Permalink
Reland "[ Service / package:dds ] Add stream support to package:dds a…
Browse files Browse the repository at this point in the history
…nd enable DDS for VM service tests"

This reverts commit cccddf3.

Change-Id: Iabde3542d5be33ffabf50efd9226597aef876ab7
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/143961
Reviewed-by: Alexander Aprelev <aam@google.com>
Commit-Queue: Ben Konyi <bkonyi@google.com>
  • Loading branch information
bkonyi authored and commit-bot@chromium.org committed Apr 17, 2020
1 parent 40661e3 commit 5b19445
Show file tree
Hide file tree
Showing 40 changed files with 708 additions and 133 deletions.
2 changes: 1 addition & 1 deletion DEPS
Expand Up @@ -146,7 +146,7 @@ vars = {
"usage_tag": "3.4.0",
"watcher_rev": "0.9.7+14",
"web_components_rev": "8f57dac273412a7172c8ade6f361b407e2e4ed02",
"web_socket_channel_tag": "1.0.15",
"web_socket_channel_tag": "1.1.0",
"WebCore_rev": "fb11e887f77919450e497344da570d780e078bc8",
"yaml_tag": "2.2.0",
"zlib_rev": "c44fb7248079cc3d5563b14b3f758aee60d6b415",
Expand Down
9 changes: 9 additions & 0 deletions pkg/dds/lib/dds.dart
Expand Up @@ -7,15 +7,24 @@
library dds;

import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';

import 'package:async/async.dart';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'package:pedantic/pedantic.dart';
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as io;
import 'package:shelf_proxy/shelf_proxy.dart';
import 'package:shelf_web_socket/shelf_web_socket.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

part 'src/binary_compatible_peer.dart';
part 'src/client.dart';
part 'src/dds_impl.dart';
part 'src/stream_manager.dart';

/// An intermediary between a Dart VM service and its clients that offers
/// additional functionality on top of the standard VM service protocol.
Expand Down
60 changes: 60 additions & 0 deletions pkg/dds/lib/src/binary_compatible_peer.dart
@@ -0,0 +1,60 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

part of dds;

/// Adds support for binary events send from the VM service, which are not part
/// of the official JSON RPC 2.0 specification.
///
/// A binary event from the VM service has the form:
/// ```
/// type BinaryEvent {
/// dataOffset : uint32,
/// metadata : uint8[dataOffset-4],
/// data : uint8[],
/// }
/// ```
/// where `metadata` is the JSON body of the event.
///
/// [_BinaryCompatiblePeer] assumes that only stream events can contain a
/// binary payload (e.g., clients cannot send a `BinaryEvent` to the VM service).
class _BinaryCompatiblePeer extends json_rpc.Peer {
_BinaryCompatiblePeer(WebSocketChannel ws, _StreamManager streamManager)
: super(
ws.transform<String>(
StreamChannelTransformer(
StreamTransformer<dynamic, String>.fromHandlers(
handleData: (data, EventSink<String> sink) =>
_transformStream(streamManager, data, sink)),
StreamSinkTransformer<String, dynamic>.fromHandlers(
handleData: (String data, EventSink<dynamic> sink) {
sink.add(data);
},
),
),
),
);

static void _transformStream(
_StreamManager streamManager, dynamic data, EventSink<String> sink) {
if (data is String) {
// Non-binary messages come in as Strings. Simply forward to the sink.
sink.add(data);
} else if (data is Uint8List) {
// Only binary events will result in `data` being of type Uint8List. We
// need to manually forward them here.
final bytesView =
ByteData.view(data.buffer, data.offsetInBytes, data.lengthInBytes);
const metadataOffset = 4;
final dataOffset = bytesView.getUint32(0, Endian.little);
final metadataLength = dataOffset - metadataOffset;
final metadata = Utf8Decoder().convert(new Uint8List.view(
bytesView.buffer,
bytesView.offsetInBytes + metadataOffset,
metadataLength));
final decodedMetadata = json.decode(metadata);
streamManager.streamNotify(decodedMetadata['params']['streamId'], data);
}
}
}
68 changes: 68 additions & 0 deletions pkg/dds/lib/src/client.dart
@@ -0,0 +1,68 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

part of dds;

/// Representation of a single DDS client which manages the connection and
/// DDS request intercepting / forwarding.
class _DartDevelopmentServiceClient {
_DartDevelopmentServiceClient(
this.dds,
this.ws,
json_rpc.Peer vmServicePeer,
) : _vmServicePeer = vmServicePeer {
_clientPeer = json_rpc.Peer(ws.cast<String>());
_registerJsonRpcMethods();
}

/// Start receiving JSON RPC requests from the client.
///
/// Returned future completes when the peer is closed.
Future<void> listen() => _clientPeer.listen().then(
(_) => dds.streamManager.clientDisconnect(this),
);

/// Close the connection to the client.
Future<void> close() async {
// Cleanup the JSON RPC server for this connection if DDS has shutdown.
await _clientPeer.close();
}

/// Send a JSON RPC notification to the client.
void sendNotification(String method, [dynamic parameters]) async {
if (_clientPeer.isClosed) {
return;
}
_clientPeer.sendNotification(method, parameters);
}

/// Registers handlers for JSON RPC methods which need to be intercepted by
/// DDS as well as fallback request forwarder.
void _registerJsonRpcMethods() {
_clientPeer.registerMethod('streamListen', (parameters) async {
final streamId = parameters['streamId'].asString;
await dds.streamManager.streamListen(this, streamId);
return _success;
});

_clientPeer.registerMethod('streamCancel', (parameters) async {
final streamId = parameters['streamId'].asString;
await dds.streamManager.streamCancel(this, streamId);
return _success;
});

// Unless otherwise specified, the request is forwarded to the VM service.
_clientPeer.registerFallback((parameters) async =>
await _vmServicePeer.sendRequest(parameters.method, parameters.asMap));
}

static const _success = <String, dynamic>{
'type': 'Success',
};

final _DartDevelopmentService dds;
final json_rpc.Peer _vmServicePeer;
final WebSocketChannel ws;
json_rpc.Peer _clientPeer;
}
54 changes: 42 additions & 12 deletions pkg/dds/lib/src/dds_impl.dart
Expand Up @@ -5,12 +5,20 @@
part of dds;

class _DartDevelopmentService implements DartDevelopmentService {
_DartDevelopmentService(this._remoteVmServiceUri, this._uri);
_DartDevelopmentService(this._remoteVmServiceUri, this._uri) {
_streamManager = _StreamManager(this);
}

Future<void> startService() async {
// Establish the connection to the VM service.
_vmServiceSocket = await WebSocket.connect(remoteVmServiceWsUri.toString());
_vmServiceStream = _vmServiceSocket.asBroadcastStream();
_vmServiceSocket = WebSocketChannel.connect(remoteVmServiceWsUri);
_vmServiceClient = _BinaryCompatiblePeer(_vmServiceSocket, _streamManager);
// Setup the JSON RPC client with the VM service.
unawaited(_vmServiceClient.listen());

// Setup stream event handling.
streamManager.listen();

// Once we have a connection to the VM service, we're ready to spawn the intermediary.
await _startDDSServer();
}
Expand All @@ -28,8 +36,20 @@ class _DartDevelopmentService implements DartDevelopmentService {

/// Stop accepting requests after gracefully handling existing requests.
Future<void> shutdown() async {
// Don't accept anymore HTTP requests.
await _server.close();
await _vmServiceSocket.close();

// Close all incoming websocket connections.
final futures = <Future>[];
for (final client in _clients) {
futures.add(client.close());
}
await Future.wait(futures);

// Close connection to VM service.
await _vmServiceSocket.sink.close();

_done.complete();
}

// Attempt to upgrade HTTP requests to a websocket before processing them as
Expand All @@ -38,16 +58,18 @@ class _DartDevelopmentService implements DartDevelopmentService {
Cascade _handlers() => Cascade().add(_webSocketHandler()).add(_httpHandler());

Handler _webSocketHandler() => webSocketHandler((WebSocketChannel ws) {
// TODO(bkonyi): actually process requests instead of blindly forwarding them.
_vmServiceStream.listen(
(event) => ws.sink.add(event),
onDone: () => ws.sink.close(),
final client = _DartDevelopmentServiceClient(
this,
ws,
_vmServiceClient,
);
ws.stream.listen((event) => _vmServiceSocket.add(event));
_clients.add(client);
client.listen().then((_) => _clients.remove(client));
});

Handler _httpHandler() {
// TODO(bkonyi): actually process requests instead of blindly forwarding them.
// DDS doesn't support any HTTP requests itself, so we just forward all of
// them to the VM service.
final cascade = Cascade().add(proxyHandler(remoteVmServiceUri));
return cascade.handler;
}
Expand Down Expand Up @@ -79,7 +101,15 @@ class _DartDevelopmentService implements DartDevelopmentService {

bool get isRunning => _uri != null;

WebSocket _vmServiceSocket;
Stream _vmServiceStream;
Future<void> get done => _done.future;
Completer _done = Completer<void>();

_StreamManager get streamManager => _streamManager;
_StreamManager _streamManager;

final List<_DartDevelopmentServiceClient> _clients = [];

json_rpc.Peer _vmServiceClient;
WebSocketChannel _vmServiceSocket;
HttpServer _server;
}
113 changes: 113 additions & 0 deletions pkg/dds/lib/src/stream_manager.dart
@@ -0,0 +1,113 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

part of dds;

class _StreamManager {
_StreamManager(this.dds);

/// Send `streamNotify` notifications to clients subscribed to `streamId`.
///
/// If `data` is of type `Uint8List`, the notification is assumed to be a
/// binary event and is forwarded directly over the subscriber's websocket.
/// Otherwise, the event is sent via the JSON RPC client.
void streamNotify(String streamId, data) {
if (streamListeners.containsKey(streamId)) {
final listeners = streamListeners[streamId];
final isBinaryData = data is Uint8List;
for (final listener in listeners) {
if (isBinaryData) {
listener.ws.sink.add(data);
} else {
listener.sendNotification('streamNotify', data);
}
}
}
}

/// Start listening for `streamNotify` events from the VM service and forward
/// them to the clients which have subscribed to the stream.
void listen() => dds._vmServiceClient.registerMethod(
'streamNotify',
(parameters) {
final streamId = parameters['streamId'].asString;
streamNotify(streamId, parameters.value);
},
);

/// Subscribes `client` to a stream.
///
/// If `client` is the first client to listen to `stream`, DDS will send a
/// `streamListen` request for `stream` to the VM service.
Future<void> streamListen(
_DartDevelopmentServiceClient client,
String stream,
) async {
assert(stream != null && stream.isNotEmpty);
if (!streamListeners.containsKey(stream)) {
// This will return an RPC exception if the stream doesn't exist. This
// will throw and the exception will be forwarded to the client.
final result = await dds._vmServiceClient.sendRequest('streamListen', {
'streamId': stream,
});
assert(result['type'] == 'Success');
streamListeners[stream] = <_DartDevelopmentServiceClient>[];
}
if (streamListeners[stream].contains(client)) {
throw kStreamAlreadySubscribedException;
}
streamListeners[stream].add(client);
}

/// Unsubscribes `client` from a stream.
///
/// If `client` is the last client to unsubscribe from `stream`, DDS will
/// send a `streamCancel` request for `stream` to the VM service.
Future<void> streamCancel(
_DartDevelopmentServiceClient client,
String stream,
) async {
assert(stream != null && stream.isNotEmpty);
final listeners = streamListeners[stream];
if (listeners == null || !listeners.contains(client)) {
throw kStreamNotSubscribedException;
}
listeners.remove(client);
if (listeners.isEmpty) {
streamListeners.remove(stream);
final result = await dds._vmServiceClient.sendRequest('streamCancel', {
'streamId': stream,
});
assert(result['type'] == 'Success');
} else {
streamListeners[stream] = listeners;
}
}

/// Cleanup stream subscriptions for `client` when it has disconnected.
void clientDisconnect(_DartDevelopmentServiceClient client) {
for (final streamId in streamListeners.keys.toList()) {
streamCancel(client, streamId);
}
}

// These error codes must be kept in sync with those in vm/json_stream.h and
// vmservice.dart.
static const kStreamAlreadySubscribed = 103;
static const kStreamNotSubscribed = 104;

// Keep these messages in sync with the VM service.
static final kStreamAlreadySubscribedException = json_rpc.RpcException(
kStreamAlreadySubscribed,
'Stream already subscribed',
);

static final kStreamNotSubscribedException = json_rpc.RpcException(
kStreamNotSubscribed,
'Stream not subscribed',
);

final _DartDevelopmentService dds;
final streamListeners = <String, List<_DartDevelopmentServiceClient>>{};
}
4 changes: 3 additions & 1 deletion pkg/dds/pubspec.yaml
Expand Up @@ -11,13 +11,15 @@ environment:
sdk: '>=2.6.0 <3.0.0'

dependencies:
async: ^2.4.1
json_rpc_2: ^2.1.0
pedantic: ^1.7.0
shelf: ^0.7.5
shelf_proxy: ^0.1.0+7
shelf_web_socket: ^0.2.3
stream_channel: ^2.0.0
web_socket_channel: ^1.1.0

dev_dependencies:
pedantic: ^1.7.0
test: ^1.0.0
vm_service: ^4.0.0

0 comments on commit 5b19445

Please sign in to comment.