diff --git a/pkgs/dart_mcp_server/CHANGELOG.md b/pkgs/dart_mcp_server/CHANGELOG.md index d5fb6d0..f153498 100644 --- a/pkgs/dart_mcp_server/CHANGELOG.md +++ b/pkgs/dart_mcp_server/CHANGELOG.md @@ -15,6 +15,7 @@ * Add the abillity to limit the output of `analyze_files` to a set of paths. * Stop reporting non-zero exit codes from command line tools as tool errors. * Add descriptions for pub tools, add support for `pub deps` and `pub outdated`. +* Fix a bug in hot_reload ([#290](https://github.com/dart-lang/ai/issues/290)). # 0.1.0 (Dart SDK 3.9.0) diff --git a/pkgs/dart_mcp_server/lib/src/mixins/dtd.dart b/pkgs/dart_mcp_server/lib/src/mixins/dtd.dart index d1ec204..693823b 100644 --- a/pkgs/dart_mcp_server/lib/src/mixins/dtd.dart +++ b/pkgs/dart_mcp_server/lib/src/mixins/dtd.dart @@ -177,7 +177,9 @@ base mixin DartToolingDaemonSupport return _callOnVmService( callback: (vmService) async { final appListener = await _AppListener.forVmService(vmService, this); - if (!appListener.registeredServices.contains(_flutterDriverService)) { + if (!appListener.registeredServices.containsKey( + _flutterDriverService, + )) { return _flutterDriverNotRegistered; } final vm = await vmService.getVM(); @@ -358,36 +360,17 @@ base mixin DartToolingDaemonSupport Future hotReload(CallToolRequest request) async { return _callOnVmService( callback: (vmService) async { + final appListener = await _AppListener.forVmService(vmService, this); if (request.arguments?['clearRuntimeErrors'] == true) { - (await _AppListener.forVmService(vmService, this)).errorLog.clear(); + appListener.errorLog.clear(); } final vm = await vmService.getVM(); ReloadReport? report; - StreamSubscription? serviceStreamSubscription; - try { - final hotReloadMethodNameCompleter = Completer(); - serviceStreamSubscription = vmService - .onEvent(EventStreams.kService) - .listen((Event e) { - if (e.kind == EventKind.kServiceRegistered) { - final serviceName = e.service!; - if (serviceName == 'reloadSources') { - // This may look something like 's0.reloadSources'. - hotReloadMethodNameCompleter.complete(e.method); - } - } - }); - await vmService.streamListen(EventStreams.kService); - - final hotReloadMethodName = await hotReloadMethodNameCompleter.future - .timeout( - const Duration(milliseconds: 1000), - onTimeout: () async { - return null; - }, - ); + try { + final hotReloadMethodName = await appListener + .waitForServiceRegistration('reloadSources'); /// If we haven't seen a specific one, we just call the default one. if (hotReloadMethodName == null) { @@ -406,9 +389,12 @@ base mixin DartToolingDaemonSupport report = ReloadReport(success: false); } } - } finally { - await serviceStreamSubscription?.cancel(); - await vmService.streamCancel(EventStreams.kService); + } catch (e) { + // Handle potential errors during the process + return CallToolResult( + isError: true, + content: [TextContent(text: 'Hot reload failed: $e')], + ); } final success = report.success == true; return CallToolResult( @@ -1066,7 +1052,12 @@ class _AppListener { /// A broadcast stream of all errors that come in after you start listening. Stream get errorsStream => _errorsController.stream; - final Set registeredServices; + /// A map of service names to the names of their methods. + final Map registeredServices; + + /// A map of service names to completers that should be fired when the service + /// is registered. + final _pendingServiceRequests = >>{}; /// Controller for the [errorsStream]. final StreamController _errorsController; @@ -1105,9 +1096,36 @@ class _AppListener { final errorLog = ErrorLog(); errorsController.stream.listen(errorLog.add); final subscriptions = >[]; - final registeredServices = {}; + final registeredServices = {}; + final pendingServiceRequests = >>{}; try { + subscriptions.addAll([ + vmService.onServiceEvent.listen((Event e) { + switch (e.kind) { + case EventKind.kServiceRegistered: + final serviceName = e.service!; + registeredServices[serviceName] = e.method; + // If there are any pending requests for this service, complete + // them. + if (pendingServiceRequests.containsKey(serviceName)) { + for (final completer + in pendingServiceRequests[serviceName]!) { + completer.complete(e.method); + } + pendingServiceRequests.remove(serviceName); + } + case EventKind.kServiceUnregistered: + registeredServices.remove(e.service!); + } + }), + vmService.onIsolateEvent.listen((e) { + switch (e.kind) { + case EventKind.kServiceExtensionAdded: + registeredServices[e.extensionRPC!] = null; + } + }), + ]); subscriptions.add( vmService.onExtensionEventWithHistory.listen((Event e) { if (e.extensionKind == 'Flutter.Error') { @@ -1135,23 +1153,6 @@ class _AppListener { }), ); - subscriptions.addAll([ - vmService.onServiceEvent.listen((Event e) { - switch (e.kind) { - case EventKind.kServiceRegistered: - registeredServices.add(e.service!); - case EventKind.kServiceUnregistered: - registeredServices.remove(e.service!); - } - }), - vmService.onIsolateEvent.listen((e) { - switch (e.kind) { - case EventKind.kServiceExtensionAdded: - registeredServices.add(e.extensionRPC!); - } - }), - ]); - await [ vmService.streamListen(EventStreams.kExtension), vmService.streamListen(EventStreams.kIsolate), @@ -1161,7 +1162,9 @@ class _AppListener { final vm = await vmService.getVM(); final isolate = await vmService.getIsolate(vm.isolates!.first.id!); - registeredServices.addAll(isolate.extensionRPCs ?? []); + for (final extension in isolate.extensionRPCs ?? []) { + registeredServices[extension] = null; + } } catch (e) { logger.log(LoggingLevel.error, 'Error subscribing to app errors: $e'); } @@ -1175,18 +1178,46 @@ class _AppListener { }(); } + /// Returns a future that completes with the registered method name for the + /// given [serviceName]. + Future waitForServiceRegistration( + String serviceName, { + Duration timeout = const Duration(seconds: 1), + }) async { + if (registeredServices.containsKey(serviceName)) { + return registeredServices[serviceName]; + } + final completer = Completer(); + _pendingServiceRequests.putIfAbsent(serviceName, () => []).add(completer); + + return completer.future.timeout( + timeout, + onTimeout: () { + // Important: Clean up the completer from the list on timeout. + _pendingServiceRequests[serviceName]?.remove(completer); + if (_pendingServiceRequests[serviceName]?.isEmpty ?? false) { + _pendingServiceRequests.remove(serviceName); + } + return null; // Return null on timeout + }, + ); + } + Future shutdown() async { errorLog.clear(); registeredServices.clear(); await _errorsController.close(); await Future.wait(_subscriptions.map((s) => s.cancel())); try { - await _vmService.streamCancel(EventStreams.kExtension); - await _vmService.streamCancel(EventStreams.kIsolate); - await _vmService.streamCancel(EventStreams.kStderr); - await _vmService.streamCancel(EventStreams.kService); + await [ + _vmService.streamCancel(EventStreams.kExtension), + _vmService.streamCancel(EventStreams.kIsolate), + _vmService.streamCancel(EventStreams.kStderr), + _vmService.streamCancel(EventStreams.kService), + ].wait; } on RPCError catch (_) { - // The vm service might already be disposed in which causes these to fail. + // The vm service might already be disposed which could cause these to + // fail. } } }