Skip to content

Commit

Permalink
[ DDS ] Add getStreamHistory RPC and package:vm_service extensions
Browse files Browse the repository at this point in the history
Adds a DDS RPC which allows for stream history to be manually requested
in addition to being sent upon initial stream subscription.

Also adds an initial implementation of
package:dds/vm_service_extensions.dart, which adds DDS functionality to
the `VmService` class.

Fixes #44505

Change-Id: I198a6fd7fca15f131a6fdd95e7860a6f98ef06a7
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/177182
Reviewed-by: Kenzie Schmoll <kenzieschmoll@google.com>
Commit-Queue: Ben Konyi <bkonyi@google.com>
  • Loading branch information
bkonyi authored and commit-bot@chromium.org committed Dec 30, 2020
1 parent b3f22d0 commit 34b4c91
Show file tree
Hide file tree
Showing 10 changed files with 195 additions and 7 deletions.
5 changes: 5 additions & 0 deletions pkg/dds/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 1.7.0
- Added `package:dds/vm_service_extensions.dart`, which adds DDS functionality to
`package:vm_service` when imported.
- Added `getStreamHistory` RPC.

# 1.6.1
- Fixed unhandled `StateError` that could be thrown if the VM service disconnected
while a request was outstanding.
Expand Down
29 changes: 27 additions & 2 deletions pkg/dds/dds_protocol.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Dart Development Service Protocol 1.1
# Dart Development Service Protocol 1.2

This document describes _version 1.1_ of the Dart Development Service Protocol.
This document describes _version 1.2_ of the Dart Development Service Protocol.
This protocol is an extension of the Dart VM Service Protocol and implements it
in it's entirety. For details on the VM Service Protocol, see the [Dart VM Service Protocol Specification][service-protocol].

Expand Down Expand Up @@ -103,6 +103,19 @@ disabled.

See [Size](#size).


### getStreamHistory

```
StreamHistory getStreamHistory(string streamId)
```

The _getStreamHistory_ RPC is used to retrieve historical events for streams
which support event history (see [Streams](#streams) for a list of supported
streams).

See [StreamHistory](#streamhistory).

### requirePermissionToResume

```
Expand Down Expand Up @@ -189,12 +202,24 @@ class Size extends Response {

A simple object representing a size response.

### StreamHistory

```
class StreamHistory extends Response {
// A list of historical Events for a stream.
List<Event> history;
}
```

See [getStreamHistory](#getStreamHistory).

## Revision History

version | comments
------- | --------
1.0 | Initial revision
1.1 | Added `getDartDevelopmentServiceVersion` RPC.
1.2 | Added `getStreamHistory` RPC.

[resume]: https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md#resume
[success]: https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md#success
Expand Down
2 changes: 1 addition & 1 deletion pkg/dds/lib/dds.dart
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ abstract class DartDevelopmentService {

/// The version of the DDS protocol supported by this [DartDevelopmentService]
/// instance.
static const String protocolVersion = '1.1';
static const String protocolVersion = '1.2';
}

class DartDevelopmentServiceException implements Exception {
Expand Down
14 changes: 14 additions & 0 deletions pkg/dds/lib/src/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,20 @@ class DartDevelopmentServiceClient {
(parameters) => dds.isolateManager.resumeIsolate(this, parameters),
);

_clientPeer.registerMethod('getStreamHistory', (parameters) {
final stream = parameters['stream'].asString;
final events = dds.streamManager.getStreamHistory(stream);
if (events == null) {
throw json_rpc.RpcException.invalidParams(
"Event history is not collected for stream '$stream'",
);
}
return <String, dynamic>{
'type': 'StreamHistory',
'history': events,
};
});

_clientPeer.registerMethod(
'getLogHistorySize',
(parameters) => {
Expand Down
9 changes: 9 additions & 0 deletions pkg/dds/lib/src/stream_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@ class StreamManager {
}
}

List<Map<String, dynamic>> getStreamHistory(String stream) {
if (!loggingRepositories.containsKey(stream)) {
return null;
}
return [
for (final event in loggingRepositories[stream]()) event,
];
}

/// Unsubscribes `client` from a stream.
///
/// If `client` is the last client to unsubscribe from `stream`, DDS will
Expand Down
84 changes: 84 additions & 0 deletions pkg/dds/lib/vm_service_extensions.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:collection';

import 'package:meta/meta.dart';
import 'package:vm_service/src/vm_service.dart';

extension DdsExtension on VmService {
static bool _factoriesRegistered = false;
static Version _ddsVersion;

/// The _getDartDevelopmentServiceVersion_ RPC is used to determine what version of
/// the Dart Development Service Protocol is served by a DDS instance.
///
/// The result of this call is cached for subsequent invocations.
Future<Version> getDartDevelopmentServiceVersion() async {
if (_ddsVersion == null) {
_ddsVersion =
await _callHelper<Version>('getDartDevelopmentServiceVersion');
}
return _ddsVersion;
}

/// Retrieve the event history for `stream`.
///
/// If `stream` does not have event history collected, a parameter error is
/// returned.
Future<StreamHistory> getStreamHistory(String stream) async {
if (!(await _versionCheck(1, 2))) {
throw UnimplementedError('getStreamHistory requires DDS version 1.2');
}
return _callHelper<StreamHistory>('getStreamHistory', args: {
'stream': stream,
});
}

Future<bool> _versionCheck(int major, int minor) async {
if (_ddsVersion == null) {
_ddsVersion = await getDartDevelopmentServiceVersion();
}
return ((_ddsVersion.major == major && _ddsVersion.minor >= minor) ||
(_ddsVersion.major > major));
}

Future<T> _callHelper<T>(String method,
{String isolateId, Map args = const {}}) {
if (!_factoriesRegistered) {
_registerFactories();
}
return callMethod(
method,
args: {
if (isolateId != null) 'isolateId': isolateId,
...args,
},
).then((e) => e as T);
}

static void _registerFactories() {
addTypeFactory('StreamHistory', StreamHistory.parse);
_factoriesRegistered = true;
}
}

/// A collection of historical [Event]s from some stream.
class StreamHistory extends Response {
static StreamHistory parse(Map<String, dynamic> json) =>
json == null ? null : StreamHistory._fromJson(json);

StreamHistory({@required List<Event> history}) : _history = history;

StreamHistory._fromJson(Map<String, dynamic> json)
: _history = List<Event>.from(
createServiceObject(json['history'], const ['Event']) as List ??
[]) {
type = json['type'];
}

/// Historical [Event]s for a stream.
List<Event> get history => UnmodifiableListView(_history);
final List<Event> _history;
}
4 changes: 2 additions & 2 deletions pkg/dds/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ description: >-
A library used to spawn the Dart Developer Service, used to communicate with
a Dart VM Service instance.
version: 1.6.1
version: 1.7.0

homepage: https://github.com/dart-lang/sdk/tree/master/pkg/dds

Expand All @@ -20,10 +20,10 @@ dependencies:
shelf_web_socket: ^0.2.3
sse: ^3.5.0
stream_channel: ^2.0.0
vm_service: ^5.0.0
web_socket_channel: ^1.1.0

dev_dependencies:
shelf_static: ^0.2.8
test: ^1.0.0
vm_service: ^5.0.0
webdriver: ^2.1.2
7 changes: 5 additions & 2 deletions pkg/dds/test/common/test_helper.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import 'dart:io';

Uri remoteVmServiceUri;

Future<Process> spawnDartProcess(String script) async {
Future<Process> spawnDartProcess(
String script, {
bool pauseOnStart = true,
}) async {
final executable = Platform.executable;
final tmpDir = await Directory.systemTemp.createTemp('dart_service');
final serviceInfoUri = tmpDir.uri.resolve('service_info.json');
Expand All @@ -16,7 +19,7 @@ Future<Process> spawnDartProcess(String script) async {
final arguments = [
'--disable-dart-dev',
'--observe=0',
'--pause-isolates-on-start',
if (pauseOnStart) '--pause-isolates-on-start',
'--write-service-info=$serviceInfoUri',
...Platform.executableArguments,
Platform.script.resolve(script).toString(),
Expand Down
8 changes: 8 additions & 0 deletions pkg/dds/test/get_stream_history_script.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import 'dart:developer';

void main() {
for (int i = 0; i < 10; ++i) {
log(i.toString());
}
debugger();
}
40 changes: 40 additions & 0 deletions pkg/dds/test/get_stream_history_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:io';

import 'package:dds/dds.dart';
import 'package:dds/vm_service_extensions.dart';
import 'package:test/test.dart';
import 'package:vm_service/vm_service_io.dart';
import 'common/test_helper.dart';

void main() {
Process process;
DartDevelopmentService dds;

setUp(() async {
process = await spawnDartProcess('get_stream_history_script.dart',
pauseOnStart: false);
});

tearDown(() async {
await dds?.shutdown();
process?.kill();
dds = null;
process = null;
});

test('getStreamHistory returns log history', () async {
dds = await DartDevelopmentService.startDartDevelopmentService(
remoteVmServiceUri,
);
expect(dds.isRunning, true);
final service = await vmServiceConnectUri(dds.wsUri.toString());
final result = await service.getStreamHistory('Logging');
expect(result, isNotNull);
expect(result, isA<StreamHistory>());
expect(result.history.length, 10);
});
}

0 comments on commit 34b4c91

Please sign in to comment.