Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: provide a way to listen to progress and logger messages #741

Merged
merged 4 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import 'dart:async';

import 'package:at_utils/at_logger.dart';
import 'package:logging/logging.dart';

class StreamingLoggingHandler implements LoggingHandler {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A new at_logger LoggingHandler so we have a way to provide a stream of log messages

final LoggingHandler _wrappedLoggingHandler;
final StreamController<String> _logSC = StreamController.broadcast();

StreamingLoggingHandler(this._wrappedLoggingHandler);

@override
void call(LogRecord record) {
_wrappedLoggingHandler.call(record);
_logSC.add('${record.level.name}'
'|${record.time}'
'|${record.loggerName}'
'|${record.message}');
}

Stream<String> get stream => _logSC.stream;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class SshnpDartPureImpl extends SshnpCore
required super.atClient,
required super.params,
required AtSshKeyPair? identityKeyPair,
required super.logStream,
}) {
this.identityKeyPair = identityKeyPair;
_sshnpdChannel = SshnpdDefaultChannel(
Expand Down Expand Up @@ -52,7 +53,9 @@ class SshnpDartPureImpl extends SshnpCore
/// Ensure that sshnp is initialized
await callInitialization();

logger.info('Sending request to sshnpd');
var msg = 'Sending session request to device daemon';
logger.info(msg);
sendProgress(msg);

/// Send an ssh request to sshnpd
await notify(
Expand Down Expand Up @@ -81,14 +84,17 @@ class SshnpDartPureImpl extends SshnpCore
);

/// Wait for a response from sshnpd
sendProgress('Waiting for response from the device daemon');
var acked = await sshnpdChannel.waitForDaemonResponse();
if (acked != SshnpdAck.acknowledged) {
throw SshnpError('sshnpd did not acknowledge the request');
throw SshnpError('No response from the device daemon');
} else {
sendProgress('Received response from the device daemon');
}

if (sshnpdChannel.ephemeralPrivateKey == null) {
throw SshnpError(
'Expected an ephemeral private key from sshnpd, but it was not set',
'Expected an ephemeral private key from device daemon, but it was not set',
);
}

Expand All @@ -102,13 +108,15 @@ class SshnpDartPureImpl extends SshnpCore
await keyUtil.addKeyPair(keyPair: ephemeralKeyPair);

/// Start srv
sendProgress('Creating connection to socket rendezvous');
SSHSocket? sshSocket = await srvdChannel.runSrv(
directSsh: true,
sessionAESKeyString: sshnpdChannel.sessionAESKeyString,
sessionIVString: sshnpdChannel.sessionIVString,
);

/// Start the initial tunnel
sendProgress('Starting tunnel session');
tunnelSshClient = await startInitialTunnelSession(
ephemeralKeyPairIdentifier: ephemeralKeyPair.identifier,
sshSocket: sshSocket,
Expand Down Expand Up @@ -142,9 +150,11 @@ class SshnpDartPureImpl extends SshnpCore
'Cannot execute runShell, tunnel has not yet been created');
}

sendProgress('Starting user session');
SSHClient userSession =
await startUserSession(tunnelSession: tunnelSshClient!);

sendProgress('Starting remote shell');
SSHSession shell = await userSession.shell();

return SSHSessionAsSshnpRemoteProcess(shell);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class SshnpOpensshLocalImpl extends SshnpCore
SshnpOpensshLocalImpl({
required super.atClient,
required super.params,
required super.logStream,
}) {
_sshnpdChannel = SshnpdDefaultChannel(
atClient: atClient,
Expand Down Expand Up @@ -49,7 +50,9 @@ class SshnpOpensshLocalImpl extends SshnpCore
/// Ensure that sshnp is initialized
await callInitialization();

logger.info('Sending request to sshnpd');
var msg = 'Sending session request to device daemon';
logger.info(msg);
sendProgress(msg);

/// Send an ssh request to sshnpd
await notify(
Expand Down Expand Up @@ -78,9 +81,12 @@ class SshnpOpensshLocalImpl extends SshnpCore
);

/// Wait for a response from sshnpd
sendProgress('Waiting for response from the device daemon');
var acked = await sshnpdChannel.waitForDaemonResponse();
if (acked != SshnpdAck.acknowledged) {
throw SshnpError('sshnpd did not acknowledge the request');
throw SshnpError('No response from the device daemon');
} else {
sendProgress('Received response from the device daemon');
}

if (sshnpdChannel.ephemeralPrivateKey == null) {
Expand All @@ -95,6 +101,7 @@ class SshnpOpensshLocalImpl extends SshnpCore
await server.close();

/// Start srv
sendProgress('Creating connection to socket rendezvous');
await srvdChannel.runSrv(
directSsh: true,
localRvPort: localRvPort,
Expand All @@ -113,6 +120,7 @@ class SshnpOpensshLocalImpl extends SshnpCore
await keyUtil.addKeyPair(keyPair: ephemeralKeyPair);

/// Start the initial tunnel
sendProgress('Starting tunnel session');
Process? bean = await startInitialTunnelSession(
ephemeralKeyPairIdentifier: ephemeralKeyPair.identifier,
localRvPort: localRvPort,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class SshnpUnsignedImpl extends SshnpCore
SshnpUnsignedImpl({
required super.atClient,
required super.params,
required super.logStream,
}) {
if (Platform.isWindows) {
throw SshnpError(
Expand Down Expand Up @@ -90,7 +91,7 @@ class SshnpUnsignedImpl extends SshnpCore
..sharedBy = params.clientAtSign
..sharedWith = params.sshnpdAtSign
..metadata = (Metadata()..ttl = 10000),
'$localPort ${srvdChannel.clientPort} ${keyUtil.username} ${srvdChannel.host} $sessionId',
'$localPort ${srvdChannel.daemonPort} ${keyUtil.username} ${srvdChannel.host} $sessionId',
Copy link
Contributor Author

@gkc gkc Jan 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the legacy impl code consistent with the current impls (dart, openssh)

checkForFinalDeliveryStatus: false,
waitForFinalDeliveryStatus: false,
);
Expand Down
29 changes: 26 additions & 3 deletions packages/dart/noports_core/lib/src/sshnp/sshnp.dart
Original file line number Diff line number Diff line change
@@ -1,32 +1,44 @@
import 'dart:async';

import 'package:at_client/at_client.dart' hide StringBuffer;
import 'package:at_utils/at_logger.dart';
import 'package:noports_core/src/common/streaming_logging_handler.dart';
import 'package:noports_core/sshnp_foundation.dart';

abstract interface class SshnpRemoteProcess {
Future<void> get done;

Stream<List<int>> get stderr;

StreamSink<List<int>> get stdin;

Stream<List<int>> get stdout;
}

abstract interface class Sshnp {
static final StreamingLoggingHandler _slh =
StreamingLoggingHandler(AtSignLogger.defaultLoggingHandler);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make one of our StreamingLoggingHandlers, and wrap the current defaultLoggingHandler in it


/// Legacy v3.x.x client
@Deprecated(
'Legacy unsigned client - only for connecting with ^3.0.0 daemons')
factory Sshnp.unsigned({
required AtClient atClient,
required SshnpParams params,
}) {
return SshnpUnsignedImpl(atClient: atClient, params: params);
AtSignLogger.defaultLoggingHandler = _slh;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set the defaultLoggingHandler to our StreamingLoggingHandler

return SshnpUnsignedImpl(
atClient: atClient, params: params, logStream: _slh.stream);
}

/// Think of this as the "default" client - calls openssh
factory Sshnp.openssh({
required AtClient atClient,
required SshnpParams params,
}) {
return SshnpOpensshLocalImpl(atClient: atClient, params: params);
AtSignLogger.defaultLoggingHandler = _slh;
return SshnpOpensshLocalImpl(
atClient: atClient, params: params, logStream: _slh.stream);
}

/// Uses a dartssh2 ssh client - requires that you pass in the identity keypair
Expand All @@ -35,8 +47,12 @@ abstract interface class Sshnp {
required SshnpParams params,
required AtSshKeyPair? identityKeyPair,
}) {
AtSignLogger.defaultLoggingHandler = _slh;
var sshnp = SshnpDartPureImpl(
atClient: atClient, params: params, identityKeyPair: identityKeyPair);
atClient: atClient,
params: params,
identityKeyPair: identityKeyPair,
logStream: _slh.stream);
if (identityKeyPair != null) {
sshnp.keyUtil.addKeyPair(keyPair: identityKeyPair);
}
Expand Down Expand Up @@ -72,4 +88,11 @@ abstract interface class Sshnp {
/// - Iterable<String> of atSigns of sshnpd that did not respond
/// - Map<String, dynamic> where the keys are all atSigns included in the maps, and the values being their device info
Future<SshnpDeviceList> listDevices();

/// Yields a string every time something interesting happens with regards to
/// progress towards establishing the ssh connection.
Stream<String>? get progressStream;

/// Yields every log message that is written to [stderr]
Stream<String>? get logStream;
XavierChanth marked this conversation as resolved.
Show resolved Hide resolved
}
32 changes: 30 additions & 2 deletions packages/dart/noports_core/lib/src/sshnp/sshnp_core.dart
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,27 @@ abstract class SshnpCore
@protected
SshnpdChannel get sshnpdChannel;

final StreamController<String> _progressStreamController =
StreamController<String>.broadcast();

/// Yields a string every time something interesting happens with regards to
/// progress towards establishing the ssh connection.
@override
Stream<String>? get progressStream => _progressStreamController.stream;

/// Yields every log message that is written to [stderr]
@override
final Stream<String>? logStream;

/// Subclasses should use this method to generate progress messages
sendProgress(String message) {
_progressStreamController.add(message);
}

SshnpCore({
required this.atClient,
required this.params,
this.logStream,
}) : sessionId = Uuid().v4(),
namespace = '${params.device}.${DefaultArgs.namespace}',
localPort = params.localPort {
Expand All @@ -80,6 +98,7 @@ abstract class SshnpCore
@mustCallSuper
Future<void> initialize() async {
if (!isSafeToInitialize) return;

logger.info('Initializing SshnpCore');

/// Start the sshnpd payload handler
Expand All @@ -88,11 +107,14 @@ abstract class SshnpCore
if (params.discoverDaemonFeatures) {
late Map<String, dynamic> pingResponse;
try {
sendProgress('Pinging daemon to discover features');
pingResponse =
await sshnpdChannel.ping().timeout(Duration(seconds: 10));
} catch (e) {
logger.severe(
'No ping response from ${params.device}${params.sshnpdAtSign}');
var msg =
'No ping response from ${params.device}${params.sshnpdAtSign}';
sendProgress(msg);
logger.severe(msg);
rethrow;
}

Expand All @@ -112,16 +134,22 @@ abstract class SshnpCore
}

/// Set the remote username to use for the ssh session
sendProgress('Resolving remote username for user session');
remoteUsername = await sshnpdChannel.resolveRemoteUsername();

/// Set the username to use for the initial ssh tunnel
sendProgress('Resolving remote username for tunnel session');
tunnelUsername = await sshnpdChannel.resolveTunnelUsername(
remoteUsername: remoteUsername);

/// Shares the public key if required
if (params.sendSshPublicKey) {
sendProgress('Sharing ssh public key');
}
await sshnpdChannel.sharePublicKeyIfRequired(identityKeyPair);

/// Retrieve the srvd host and port pair
sendProgress('Fetching host and port from srvd');
await srvdChannel.callInitialization();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ abstract class SrvdChannel<T> with AsyncInitialization, AtClientBindings {
if (params.host.startsWith('@')) {
srv = srvGenerator(
host,
daemonPort!, // everything was backwards back then
clientPort,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the legacy impl code consistent with the current impls (dart, openssh)

localPort: params.localSshdPort,
bindLocalPort: false,
);
Expand Down Expand Up @@ -212,7 +212,7 @@ abstract class SrvdChannel<T> with AsyncInitialization, AtClientBindings {
counter++;
if (counter > 150) {
logger.warning('Timed out waiting for srvd response');
throw ('Connection timeout to srvd $host service\nhint: make sure host is valid and online');
throw ('Connection timeout to srvd $host service');
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class SshClientHelper {
// Ensure we are connected and authenticated correctly
logger.info('awaiting SSHClient.ping');
await client.ping().catchError((e) => throw e);
logger.info('SSHClient.ping complete');
} catch (e, s) {
throw SshnpError(
'Failed to authenticate as $username@$host:$port : $e',
Expand Down
26 changes: 19 additions & 7 deletions packages/dart/sshnoports/bin/sshnp.dart
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ void main(List<String> args) async {
throw e;
});

// A listen progress listener for the CLI
// Will only log if verbose is false, since if verbose is true
// there will already be a boatload of log messages
logProgress(String s) {
if (!(params?.verbose ?? true)) {
stderr.writeln('${DateTime.now()} : $s');
}
}

sshnp.progressStream?.listen((s) => logProgress(s));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a simple progress listener into the sshnp CLI so that, if run in non-verbose mode, the user will get enough output to keep them from wondering what is going on and why. (If run in verbose mode, a normal user gets too much information)

if (params.listDevices) {
stderr.writeln('Searching for devices...');
var deviceList = await sshnp.listDevices();
Expand Down Expand Up @@ -114,6 +125,7 @@ void main(List<String> args) async {
stdout.write('$res\n');
exit(0);
} else {
logProgress('Starting user session');
Process process = await Process.start(
res.command,
res.args,
Expand All @@ -130,22 +142,22 @@ void main(List<String> args) async {
printUsage(error: error);
exit(1);
} on SshnpError catch (error, stackTrace) {
stderr.writeln(error.toString());
stderr.writeln('\nError : $error');
if (params?.verbose ?? true) {
stderr.writeln('\nStack Trace: ${stackTrace.toString()}');
stderr.writeln('\nStack Trace: $stackTrace');
}
exit(1);
} catch (error, stackTrace) {
stderr.writeln(error.toString());
stderr.writeln('\nStack Trace: ${stackTrace.toString()}');
stderr.writeln('\nError : $error');
stderr.writeln('\nStack Trace: $stackTrace');
exit(1);
}
}, (Object error, StackTrace stackTrace) async {
if (error is SSHError) {
stderr.writeln('\nError: ${error.toString()}');
stderr.writeln('\n\nError: $error');
} else {
stderr.writeln('Error: ${error.toString()}');
stderr.writeln('\nStack Trace: ${stackTrace.toString()}');
stderr.writeln('\nError: $error');
stderr.writeln('\nStack Trace: $stackTrace');
}
exit(1);
});
Expand Down