Skip to content

Commit

Permalink
Add an option for flutter daemon to listen on a TCP port
Browse files Browse the repository at this point in the history
Added a new class DaemonConnection to reuse the connection handling
between daemon server and client, and handle connection with different
medium (stdio, socket).

Added a new option `listen-on-tcp-port` to the flutter daemon command,
when passed, the daemon will accept commands on a port instead of stdio.
  • Loading branch information
chingjun committed Dec 16, 2021
1 parent 840e109 commit 759e9cf
Show file tree
Hide file tree
Showing 6 changed files with 610 additions and 261 deletions.
7 changes: 5 additions & 2 deletions packages/flutter_tools/lib/src/commands/attach.dart
Expand Up @@ -18,6 +18,7 @@ import '../base/io.dart';
import '../build_info.dart';
import '../commands/daemon.dart';
import '../compile.dart';
import '../daemon.dart';
import '../device.dart';
import '../device_port_forwarder.dart';
import '../fuchsia/fuchsia_device.dart';
Expand Down Expand Up @@ -235,8 +236,10 @@ known, it can be explicitly provided to attach via the command-line, e.g.

final Daemon daemon = boolArg('machine')
? Daemon(
stdinCommandStream,
stdoutCommandResponse,
DaemonConnection(
daemonStreams: StdioDaemonStreams(),
logger: globals.logger,
),
notifyingLogger: (globals.logger is NotifyingLogger)
? globals.logger as NotifyingLogger
: NotifyingLogger(verbose: globals.logger.isVerbose, parent: globals.logger),
Expand Down
199 changes: 93 additions & 106 deletions packages/flutter_tools/lib/src/commands/daemon.dart
Expand Up @@ -18,7 +18,7 @@ import '../base/logger.dart';
import '../base/terminal.dart';
import '../base/utils.dart';
import '../build_info.dart';
import '../convert.dart';
import '../daemon.dart';
import '../device.dart';
import '../device_port_forwarder.dart';
import '../emulator.dart';
Expand All @@ -41,7 +41,13 @@ const String protocolVersion = '0.6.1';
/// It can be shutdown with a `daemon.shutdown` command (or by killing the
/// process).
class DaemonCommand extends FlutterCommand {
DaemonCommand({ this.hidden = false });
DaemonCommand({ this.hidden = false }) {
argParser.addOption(
'listen-on-tcp-port',
help: 'If specified, the daemon will be listening for commands on the specified port instead of stdio.',
valueHelp: 'port',
);
}

@override
final String name = 'daemon';
Expand All @@ -57,9 +63,25 @@ class DaemonCommand extends FlutterCommand {

@override
Future<FlutterCommandResult> runCommand() async {
if (argResults['listen-on-tcp-port'] != null) {
final String port = stringArg('listen-on-tcp-port');
await _DaemonServer(
port: int.parse(port),
logger: StdoutLogger(
terminal: globals.terminal,
stdio: globals.stdio,
outputPreferences: globals.outputPreferences,
),
notifyingLogger: asLogger<NotifyingLogger>(globals.logger),
).run();
return FlutterCommandResult.success();
}
globals.printStatus('Starting device daemon...');
final Daemon daemon = Daemon(
stdinCommandStream, stdoutCommandResponse,
DaemonConnection(
daemonStreams: StdioDaemonStreams(),
logger: globals.logger,
),
notifyingLogger: asLogger<NotifyingLogger>(globals.logger),
);
final int code = await daemon.onExit;
Expand All @@ -70,14 +92,57 @@ class DaemonCommand extends FlutterCommand {
}
}

typedef DispatchCommand = void Function(Map<String, dynamic> command);
class _DaemonServer {
_DaemonServer({
this.port,
this.logger,
this.notifyingLogger,
});

int port;

/// Stdout logger used to print general server-related errors.
Logger logger;

// Logger that sends the message to the other end of daemon connection.
NotifyingLogger notifyingLogger;

Future<void> run() async {
final ServerSocket serverSocket = await ServerSocket.bind(InternetAddress.loopbackIPv4, port);
logger.printStatus('Daemon server listening on ${serverSocket.port}');

final StreamSubscription<Socket> subscription = serverSocket.listen(
(Socket socket) async {
// We have to listen to socket.done. Otherwise when the connection is
// reset, we will receive an uncatchable exception.
// https://github.com/dart-lang/sdk/issues/25518
final Future<void> socketDone = socket.done.catchError((dynamic error, StackTrace stackTrace) {
logger.printError('Socket error: $error');
logger.printTrace('$stackTrace');
});
final Daemon daemon = Daemon(
DaemonConnection(
daemonStreams: TcpDaemonStreams(socket),
logger: logger,
),
notifyingLogger: notifyingLogger,
);
await daemon.onExit;
await socketDone;
},
);

// Wait indefinitely until the server closes.
await subscription.asFuture<void>();
await subscription.cancel();
}
}

typedef CommandHandler = Future<dynamic> Function(Map<String, dynamic> args);

class Daemon {
Daemon(
Stream<Map<String, dynamic>> commandStream,
this.sendCommand, {
this.connection, {
this.notifyingLogger,
this.logToStdout = false,
}) {
Expand All @@ -89,26 +154,26 @@ class Daemon {
_registerDomain(devToolsDomain = DevToolsDomain(this));

// Start listening.
_commandSubscription = commandStream.listen(
_commandSubscription = connection.incomingCommands.listen(
_handleRequest,
onDone: () {
shutdown();
if (!_onExitCompleter.isCompleted) {
_onExitCompleter.complete(0);
}
},
);
}

DaemonConnection connection;

DaemonDomain daemonDomain;
AppDomain appDomain;
DeviceDomain deviceDomain;
EmulatorDomain emulatorDomain;
DevToolsDomain devToolsDomain;
StreamSubscription<Map<String, dynamic>> _commandSubscription;
int _outgoingRequestId = 1;
final Map<String, Completer<dynamic>> _outgoingRequestCompleters = <String, Completer<dynamic>>{};

final DispatchCommand sendCommand;
final NotifyingLogger notifyingLogger;
final bool logToStdout;

Expand All @@ -134,62 +199,27 @@ class Daemon {

try {
final String method = request['method'] as String;
if (method != null) {
if (!method.contains('.')) {
throw 'method not understood: $method';
}

final String prefix = method.substring(0, method.indexOf('.'));
final String name = method.substring(method.indexOf('.') + 1);
if (_domainMap[prefix] == null) {
throw 'no domain for method: $method';
}

_domainMap[prefix].handleCommand(name, id, castStringKeyedMap(request['params']) ?? const <String, dynamic>{});
} else {
// If there was no 'method' field then it's a response to a daemon-to-editor request.
final Completer<dynamic> completer = _outgoingRequestCompleters[id.toString()];
if (completer == null) {
throw 'unexpected response with id: $id';
}
_outgoingRequestCompleters.remove(id.toString());
assert(method != null);
if (!method.contains('.')) {
throw 'method not understood: $method';
}

if (request['error'] != null) {
completer.completeError(request['error']);
} else {
completer.complete(request['result']);
}
final String prefix = method.substring(0, method.indexOf('.'));
final String name = method.substring(method.indexOf('.') + 1);
if (_domainMap[prefix] == null) {
throw 'no domain for method: $method';
}
} on Exception catch (error, trace) {
_send(<String, dynamic>{
'id': id,
'error': _toJsonable(error),
'trace': '$trace',
});
}
}

Future<dynamic> sendRequest(String method, [ dynamic args ]) {
final Map<String, dynamic> map = <String, dynamic>{'method': method};
if (args != null) {
map['params'] = _toJsonable(args);
_domainMap[prefix].handleCommand(name, id, castStringKeyedMap(request['params']) ?? const <String, dynamic>{});
} on Exception catch (error, trace) {
connection.sendErrorResponse(id, _toJsonable(error), trace);
}

final int id = _outgoingRequestId++;
final Completer<dynamic> completer = Completer<dynamic>();

map['id'] = id.toString();
_outgoingRequestCompleters[id.toString()] = completer;

_send(map);
return completer.future;
}

void _send(Map<String, dynamic> map) => sendCommand(map);

Future<void> shutdown({ dynamic error }) async {
await devToolsDomain?.dispose();
await _commandSubscription?.cancel();
await connection.dispose();
for (final Domain domain in _domainMap.values) {
await domain.dispose();
}
Expand Down Expand Up @@ -225,30 +255,16 @@ abstract class Domain {
}
throw 'command not understood: $name.$command';
}).then<dynamic>((dynamic result) {
if (result == null) {
_send(<String, dynamic>{'id': id});
} else {
_send(<String, dynamic>{'id': id, 'result': _toJsonable(result)});
}
}).catchError((dynamic error, dynamic trace) {
_send(<String, dynamic>{
'id': id,
'error': _toJsonable(error),
'trace': '$trace',
});
daemon.connection.sendResponse(id, _toJsonable(result));
}).catchError((Object error, StackTrace stackTrace) {
daemon.connection.sendErrorResponse(id, _toJsonable(error), stackTrace);
});
}

void sendEvent(String name, [ dynamic args ]) {
final Map<String, dynamic> map = <String, dynamic>{'event': name};
if (args != null) {
map['params'] = _toJsonable(args);
}
_send(map);
daemon.connection.sendEvent(name, args != null ? _toJsonable(args) : null);
}

void _send(Map<String, dynamic> map) => daemon._send(map);

String _getStringArg(Map<String, dynamic> args, String name, { bool required = false }) {
if (required && !args.containsKey(name)) {
throw '$name is required';
Expand Down Expand Up @@ -346,7 +362,7 @@ class DaemonDomain extends Domain {
/// --web-allow-expose-url switch. The client may return the same URL back if
/// tunnelling is not required for a given URL.
Future<String> exposeUrl(String url) async {
final dynamic res = await daemon.sendRequest('app.exposeUrl', <String, String>{'url': url});
final dynamic res = await daemon.connection.sendRequest('app.exposeUrl', <String, String>{'url': url});
if (res is Map<String, dynamic> && res['url'] is String) {
return res['url'] as String;
} else {
Expand Down Expand Up @@ -907,35 +923,6 @@ class DevToolsDomain extends Domain {
}
}

Stream<Map<String, dynamic>> get stdinCommandStream => globals.stdio.stdin
.transform<String>(utf8.decoder)
.transform<String>(const LineSplitter())
.where((String line) => line.startsWith('[{') && line.endsWith('}]'))
.map<Map<String, dynamic>>((String line) {
line = line.substring(1, line.length - 1);
return castStringKeyedMap(json.decode(line));
});

void stdoutCommandResponse(Map<String, dynamic> command) {
globals.stdio.stdoutWrite(
'[${jsonEncodeObject(command)}]\n',
fallback: (String message, dynamic error, StackTrace stack) {
throwToolExit('Failed to write daemon command response to stdout: $error');
},
);
}

String jsonEncodeObject(dynamic object) {
return json.encode(object, toEncodable: _toEncodable);
}

dynamic _toEncodable(dynamic object) {
if (object is OperationResult) {
return _operationResultToMap(object);
}
return object;
}

Future<Map<String, dynamic>> _deviceToMap(Device device) async {
return <String, dynamic>{
'id': device.id,
Expand Down Expand Up @@ -970,7 +957,7 @@ dynamic _toJsonable(dynamic obj) {
return obj;
}
if (obj is OperationResult) {
return obj;
return _operationResultToMap(obj);
}
if (obj is ToolExit) {
return obj.message;
Expand Down
7 changes: 5 additions & 2 deletions packages/flutter_tools/lib/src/commands/run.dart
Expand Up @@ -14,6 +14,7 @@ import '../base/common.dart';
import '../base/file_system.dart';
import '../base/utils.dart';
import '../build_info.dart';
import '../daemon.dart';
import '../device.dart';
import '../features.dart';
import '../globals.dart' as globals;
Expand Down Expand Up @@ -556,8 +557,10 @@ class RunCommand extends RunCommandBase {
throwToolExit('"--machine" does not support "-d all".');
}
final Daemon daemon = Daemon(
stdinCommandStream,
stdoutCommandResponse,
DaemonConnection(
daemonStreams: StdioDaemonStreams(),
logger: globals.logger,
),
notifyingLogger: (globals.logger is NotifyingLogger)
? globals.logger as NotifyingLogger
: NotifyingLogger(verbose: globals.logger.isVerbose, parent: globals.logger),
Expand Down

0 comments on commit 759e9cf

Please sign in to comment.