Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkgs/dart_mcp_server/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Add the abillity to limit the output of `analyze_files` to a set of paths.
* Stop reporting non-zero exit codes from command line tools as tool errors.
* Add descriptions for pub tools, add support for `pub deps` and `pub outdated`.
* Fix a bug in hot_reload ([#290](https://github.com/dart-lang/ai/issues/290)).

# 0.1.0 (Dart SDK 3.9.0)

Expand Down
137 changes: 84 additions & 53 deletions pkgs/dart_mcp_server/lib/src/mixins/dtd.dart
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ base mixin DartToolingDaemonSupport
return _callOnVmService(
callback: (vmService) async {
final appListener = await _AppListener.forVmService(vmService, this);
if (!appListener.registeredServices.contains(_flutterDriverService)) {
if (!appListener.registeredServices.containsKey(
_flutterDriverService,
)) {
return _flutterDriverNotRegistered;
}
final vm = await vmService.getVM();
Expand Down Expand Up @@ -358,36 +360,17 @@ base mixin DartToolingDaemonSupport
Future<CallToolResult> hotReload(CallToolRequest request) async {
return _callOnVmService(
callback: (vmService) async {
final appListener = await _AppListener.forVmService(vmService, this);
if (request.arguments?['clearRuntimeErrors'] == true) {
(await _AppListener.forVmService(vmService, this)).errorLog.clear();
appListener.errorLog.clear();
}

final vm = await vmService.getVM();
ReloadReport? report;
StreamSubscription<Event>? serviceStreamSubscription;
try {
final hotReloadMethodNameCompleter = Completer<String?>();
serviceStreamSubscription = vmService
.onEvent(EventStreams.kService)
.listen((Event e) {
if (e.kind == EventKind.kServiceRegistered) {
final serviceName = e.service!;
if (serviceName == 'reloadSources') {
// This may look something like 's0.reloadSources'.
hotReloadMethodNameCompleter.complete(e.method);
}
}
});

await vmService.streamListen(EventStreams.kService);

final hotReloadMethodName = await hotReloadMethodNameCompleter.future
.timeout(
const Duration(milliseconds: 1000),
onTimeout: () async {
return null;
},
);
try {
final hotReloadMethodName = await appListener
.waitForServiceRegistration('reloadSources');

/// If we haven't seen a specific one, we just call the default one.
if (hotReloadMethodName == null) {
Expand All @@ -406,9 +389,12 @@ base mixin DartToolingDaemonSupport
report = ReloadReport(success: false);
}
}
} finally {
await serviceStreamSubscription?.cancel();
await vmService.streamCancel(EventStreams.kService);
} catch (e) {
// Handle potential errors during the process
return CallToolResult(
isError: true,
content: [TextContent(text: 'Hot reload failed: $e')],
);
}
final success = report.success == true;
return CallToolResult(
Expand Down Expand Up @@ -1066,7 +1052,12 @@ class _AppListener {
/// A broadcast stream of all errors that come in after you start listening.
Stream<String> get errorsStream => _errorsController.stream;

final Set<String> registeredServices;
/// A map of service names to the names of their methods.
final Map<String, String?> registeredServices;

/// A map of service names to completers that should be fired when the service
/// is registered.
final _pendingServiceRequests = <String, List<Completer<String?>>>{};

/// Controller for the [errorsStream].
final StreamController<String> _errorsController;
Expand Down Expand Up @@ -1105,9 +1096,36 @@ class _AppListener {
final errorLog = ErrorLog();
errorsController.stream.listen(errorLog.add);
final subscriptions = <StreamSubscription<void>>[];
final registeredServices = <String>{};
final registeredServices = <String, String?>{};
final pendingServiceRequests = <String, List<Completer<String?>>>{};

try {
subscriptions.addAll([
vmService.onServiceEvent.listen((Event e) {
switch (e.kind) {
case EventKind.kServiceRegistered:
final serviceName = e.service!;
registeredServices[serviceName] = e.method;
// If there are any pending requests for this service, complete
// them.
if (pendingServiceRequests.containsKey(serviceName)) {
for (final completer
in pendingServiceRequests[serviceName]!) {
completer.complete(e.method);
}
pendingServiceRequests.remove(serviceName);
}
case EventKind.kServiceUnregistered:
registeredServices.remove(e.service!);
}
}),
vmService.onIsolateEvent.listen((e) {
switch (e.kind) {
case EventKind.kServiceExtensionAdded:
registeredServices[e.extensionRPC!] = null;
}
}),
]);
subscriptions.add(
vmService.onExtensionEventWithHistory.listen((Event e) {
if (e.extensionKind == 'Flutter.Error') {
Expand Down Expand Up @@ -1135,23 +1153,6 @@ class _AppListener {
}),
);

subscriptions.addAll([
vmService.onServiceEvent.listen((Event e) {
switch (e.kind) {
case EventKind.kServiceRegistered:
registeredServices.add(e.service!);
case EventKind.kServiceUnregistered:
registeredServices.remove(e.service!);
}
}),
vmService.onIsolateEvent.listen((e) {
switch (e.kind) {
case EventKind.kServiceExtensionAdded:
registeredServices.add(e.extensionRPC!);
}
}),
]);

await [
vmService.streamListen(EventStreams.kExtension),
vmService.streamListen(EventStreams.kIsolate),
Expand All @@ -1161,7 +1162,9 @@ class _AppListener {

final vm = await vmService.getVM();
final isolate = await vmService.getIsolate(vm.isolates!.first.id!);
registeredServices.addAll(isolate.extensionRPCs ?? []);
for (final extension in isolate.extensionRPCs ?? <String>[]) {
registeredServices[extension] = null;
}
} catch (e) {
logger.log(LoggingLevel.error, 'Error subscribing to app errors: $e');
}
Expand All @@ -1175,18 +1178,46 @@ class _AppListener {
}();
}

/// Returns a future that completes with the registered method name for the
/// given [serviceName].
Future<String?> waitForServiceRegistration(
String serviceName, {
Duration timeout = const Duration(seconds: 1),
}) async {
if (registeredServices.containsKey(serviceName)) {
return registeredServices[serviceName];
}
final completer = Completer<String?>();
_pendingServiceRequests.putIfAbsent(serviceName, () => []).add(completer);

return completer.future.timeout(
timeout,
onTimeout: () {
// Important: Clean up the completer from the list on timeout.
_pendingServiceRequests[serviceName]?.remove(completer);
if (_pendingServiceRequests[serviceName]?.isEmpty ?? false) {
_pendingServiceRequests.remove(serviceName);
}
return null; // Return null on timeout
},
);
}

Future<void> shutdown() async {
errorLog.clear();
registeredServices.clear();
await _errorsController.close();
await Future.wait(_subscriptions.map((s) => s.cancel()));
try {
await _vmService.streamCancel(EventStreams.kExtension);
await _vmService.streamCancel(EventStreams.kIsolate);
await _vmService.streamCancel(EventStreams.kStderr);
await _vmService.streamCancel(EventStreams.kService);
await [
_vmService.streamCancel(EventStreams.kExtension),
_vmService.streamCancel(EventStreams.kIsolate),
_vmService.streamCancel(EventStreams.kStderr),
_vmService.streamCancel(EventStreams.kService),
].wait;
} on RPCError catch (_) {
// The vm service might already be disposed in which causes these to fail.
// The vm service might already be disposed which could cause these to
// fail.
}
}
}
Expand Down