Skip to content

Commit

Permalink
Merge pull request #741 from atsign-foundation/gkc/feat/progress-and-…
Browse files Browse the repository at this point in the history
…logger-listeners

feat: provide a way to listen to progress and logger messages
  • Loading branch information
XavierChanth committed Jan 26, 2024
2 parents b803642 + b5f55ae commit ac23723
Show file tree
Hide file tree
Showing 13 changed files with 129 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import 'dart:async';

import 'package:at_utils/at_logger.dart';
import 'package:logging/logging.dart';

class StreamingLoggingHandler implements LoggingHandler {
final LoggingHandler _wrappedLoggingHandler;
final StreamController<String> _logSC = StreamController.broadcast();

StreamingLoggingHandler(this._wrappedLoggingHandler);

@override
void call(LogRecord record) {
_wrappedLoggingHandler.call(record);
_logSC.add('${record.level.name}'
'|${record.time}'
'|${record.loggerName}'
'|${record.message}');
}

Stream<String> get stream => _logSC.stream;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class SshnpDartPureImpl extends SshnpCore
required super.atClient,
required super.params,
required AtSshKeyPair? identityKeyPair,
required super.logStream,
}) {
this.identityKeyPair = identityKeyPair;
_sshnpdChannel = SshnpdDefaultChannel(
Expand Down Expand Up @@ -52,7 +53,9 @@ class SshnpDartPureImpl extends SshnpCore
/// Ensure that sshnp is initialized
await callInitialization();

logger.info('Sending request to sshnpd');
var msg = 'Sending session request to the device daemon';
logger.info(msg);
sendProgress(msg);

/// Send an ssh request to sshnpd
await notify(
Expand Down Expand Up @@ -81,14 +84,17 @@ class SshnpDartPureImpl extends SshnpCore
);

/// Wait for a response from sshnpd
sendProgress('Waiting for response from the device daemon');
var acked = await sshnpdChannel.waitForDaemonResponse();
if (acked != SshnpdAck.acknowledged) {
throw SshnpError('sshnpd did not acknowledge the request');
throw SshnpError('No response from the device daemon');
} else {
sendProgress('Received response from the device daemon');
}

if (sshnpdChannel.ephemeralPrivateKey == null) {
throw SshnpError(
'Expected an ephemeral private key from sshnpd, but it was not set',
'Expected an ephemeral private key from device daemon, but it was not set',
);
}

Expand All @@ -102,13 +108,15 @@ class SshnpDartPureImpl extends SshnpCore
await keyUtil.addKeyPair(keyPair: ephemeralKeyPair);

/// Start srv
sendProgress('Creating connection to socket rendezvous');
SSHSocket? sshSocket = await srvdChannel.runSrv(
directSsh: true,
sessionAESKeyString: sshnpdChannel.sessionAESKeyString,
sessionIVString: sshnpdChannel.sessionIVString,
);

/// Start the initial tunnel
sendProgress('Starting tunnel session');
tunnelSshClient = await startInitialTunnelSession(
ephemeralKeyPairIdentifier: ephemeralKeyPair.identifier,
sshSocket: sshSocket,
Expand Down Expand Up @@ -142,9 +150,11 @@ class SshnpDartPureImpl extends SshnpCore
'Cannot execute runShell, tunnel has not yet been created');
}

sendProgress('Starting user session');
SSHClient userSession =
await startUserSession(tunnelSession: tunnelSshClient!);

sendProgress('Starting remote shell');
SSHSession shell = await userSession.shell();

return SSHSessionAsSshnpRemoteProcess(shell);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class SshnpOpensshLocalImpl extends SshnpCore
SshnpOpensshLocalImpl({
required super.atClient,
required super.params,
required super.logStream,
}) {
_sshnpdChannel = SshnpdDefaultChannel(
atClient: atClient,
Expand Down Expand Up @@ -49,7 +50,9 @@ class SshnpOpensshLocalImpl extends SshnpCore
/// Ensure that sshnp is initialized
await callInitialization();

logger.info('Sending request to sshnpd');
var msg = 'Sending session request to the device daemon';
logger.info(msg);
sendProgress(msg);

/// Send an ssh request to sshnpd
await notify(
Expand Down Expand Up @@ -78,9 +81,12 @@ class SshnpOpensshLocalImpl extends SshnpCore
);

/// Wait for a response from sshnpd
sendProgress('Waiting for response from the device daemon');
var acked = await sshnpdChannel.waitForDaemonResponse();
if (acked != SshnpdAck.acknowledged) {
throw SshnpError('sshnpd did not acknowledge the request');
throw SshnpError('No response from the device daemon');
} else {
sendProgress('Received response from the device daemon');
}

if (sshnpdChannel.ephemeralPrivateKey == null) {
Expand All @@ -95,6 +101,7 @@ class SshnpOpensshLocalImpl extends SshnpCore
await server.close();

/// Start srv
sendProgress('Creating connection to socket rendezvous');
await srvdChannel.runSrv(
directSsh: true,
localRvPort: localRvPort,
Expand All @@ -113,6 +120,7 @@ class SshnpOpensshLocalImpl extends SshnpCore
await keyUtil.addKeyPair(keyPair: ephemeralKeyPair);

/// Start the initial tunnel
sendProgress('Starting tunnel session');
Process? bean = await startInitialTunnelSession(
ephemeralKeyPairIdentifier: ephemeralKeyPair.identifier,
localRvPort: localRvPort,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class SshnpUnsignedImpl extends SshnpCore
SshnpUnsignedImpl({
required super.atClient,
required super.params,
required super.logStream,
}) {
if (Platform.isWindows) {
throw SshnpError(
Expand Down Expand Up @@ -90,7 +91,7 @@ class SshnpUnsignedImpl extends SshnpCore
..sharedBy = params.clientAtSign
..sharedWith = params.sshnpdAtSign
..metadata = (Metadata()..ttl = 10000),
'$localPort ${srvdChannel.clientPort} ${keyUtil.username} ${srvdChannel.host} $sessionId',
'$localPort ${srvdChannel.daemonPort} ${keyUtil.username} ${srvdChannel.host} $sessionId',
checkForFinalDeliveryStatus: false,
waitForFinalDeliveryStatus: false,
);
Expand Down
29 changes: 26 additions & 3 deletions packages/dart/noports_core/lib/src/sshnp/sshnp.dart
Original file line number Diff line number Diff line change
@@ -1,32 +1,44 @@
import 'dart:async';

import 'package:at_client/at_client.dart' hide StringBuffer;
import 'package:at_utils/at_logger.dart';
import 'package:noports_core/src/common/streaming_logging_handler.dart';
import 'package:noports_core/sshnp_foundation.dart';

abstract interface class SshnpRemoteProcess {
Future<void> get done;

Stream<List<int>> get stderr;

StreamSink<List<int>> get stdin;

Stream<List<int>> get stdout;
}

abstract interface class Sshnp {
static final StreamingLoggingHandler _slh =
StreamingLoggingHandler(AtSignLogger.defaultLoggingHandler);

/// Legacy v3.x.x client
@Deprecated(
'Legacy unsigned client - only for connecting with ^3.0.0 daemons')
factory Sshnp.unsigned({
required AtClient atClient,
required SshnpParams params,
}) {
return SshnpUnsignedImpl(atClient: atClient, params: params);
AtSignLogger.defaultLoggingHandler = _slh;
return SshnpUnsignedImpl(
atClient: atClient, params: params, logStream: _slh.stream);
}

/// Think of this as the "default" client - calls openssh
factory Sshnp.openssh({
required AtClient atClient,
required SshnpParams params,
}) {
return SshnpOpensshLocalImpl(atClient: atClient, params: params);
AtSignLogger.defaultLoggingHandler = _slh;
return SshnpOpensshLocalImpl(
atClient: atClient, params: params, logStream: _slh.stream);
}

/// Uses a dartssh2 ssh client - requires that you pass in the identity keypair
Expand All @@ -35,8 +47,12 @@ abstract interface class Sshnp {
required SshnpParams params,
required AtSshKeyPair? identityKeyPair,
}) {
AtSignLogger.defaultLoggingHandler = _slh;
var sshnp = SshnpDartPureImpl(
atClient: atClient, params: params, identityKeyPair: identityKeyPair);
atClient: atClient,
params: params,
identityKeyPair: identityKeyPair,
logStream: _slh.stream);
if (identityKeyPair != null) {
sshnp.keyUtil.addKeyPair(keyPair: identityKeyPair);
}
Expand Down Expand Up @@ -72,4 +88,11 @@ abstract interface class Sshnp {
/// - Iterable<String> of atSigns of sshnpd that did not respond
/// - Map<String, dynamic> where the keys are all atSigns included in the maps, and the values being their device info
Future<SshnpDeviceList> listDevices();

/// Yields a string every time something interesting happens with regards to
/// progress towards establishing the ssh connection.
Stream<String>? get progressStream;

/// Yields every log message that is written to [stderr]
Stream<String>? get logStream;
}
32 changes: 30 additions & 2 deletions packages/dart/noports_core/lib/src/sshnp/sshnp_core.dart
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,27 @@ abstract class SshnpCore
@protected
SshnpdChannel get sshnpdChannel;

final StreamController<String> _progressStreamController =
StreamController<String>.broadcast();

/// Yields a string every time something interesting happens with regards to
/// progress towards establishing the ssh connection.
@override
Stream<String>? get progressStream => _progressStreamController.stream;

/// Yields every log message that is written to [stderr]
@override
final Stream<String>? logStream;

/// Subclasses should use this method to generate progress messages
sendProgress(String message) {
_progressStreamController.add(message);
}

SshnpCore({
required this.atClient,
required this.params,
this.logStream,
}) : sessionId = Uuid().v4(),
namespace = '${params.device}.${DefaultArgs.namespace}',
localPort = params.localPort {
Expand All @@ -80,6 +98,7 @@ abstract class SshnpCore
@mustCallSuper
Future<void> initialize() async {
if (!isSafeToInitialize) return;

logger.info('Initializing SshnpCore');

/// Start the sshnpd payload handler
Expand All @@ -88,11 +107,14 @@ abstract class SshnpCore
if (params.discoverDaemonFeatures) {
late Map<String, dynamic> pingResponse;
try {
sendProgress('Pinging daemon to discover features');
pingResponse =
await sshnpdChannel.ping().timeout(Duration(seconds: 10));
} catch (e) {
logger.severe(
'No ping response from ${params.device}${params.sshnpdAtSign}');
var msg =
'No ping response from ${params.device}${params.sshnpdAtSign}';
sendProgress(msg);
logger.severe(msg);
rethrow;
}

Expand All @@ -112,16 +134,22 @@ abstract class SshnpCore
}

/// Set the remote username to use for the ssh session
sendProgress('Resolving remote username for user session');
remoteUsername = await sshnpdChannel.resolveRemoteUsername();

/// Set the username to use for the initial ssh tunnel
sendProgress('Resolving remote username for tunnel session');
tunnelUsername = await sshnpdChannel.resolveTunnelUsername(
remoteUsername: remoteUsername);

/// Shares the public key if required
if (params.sendSshPublicKey) {
sendProgress('Sharing ssh public key');
}
await sshnpdChannel.sharePublicKeyIfRequired(identityKeyPair);

/// Retrieve the srvd host and port pair
sendProgress('Fetching host and port from srvd');
await srvdChannel.callInitialization();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ abstract class SrvdChannel<T> with AsyncInitialization, AtClientBindings {
if (params.host.startsWith('@')) {
srv = srvGenerator(
host,
daemonPort!, // everything was backwards back then
clientPort,
localPort: params.localSshdPort,
bindLocalPort: false,
);
Expand Down Expand Up @@ -212,7 +212,7 @@ abstract class SrvdChannel<T> with AsyncInitialization, AtClientBindings {
counter++;
if (counter > 150) {
logger.warning('Timed out waiting for srvd response');
throw ('Connection timeout to srvd $host service\nhint: make sure host is valid and online');
throw ('Connection timeout to srvd $host service');
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class SshClientHelper {
// Ensure we are connected and authenticated correctly
logger.info('awaiting SSHClient.ping');
await client.ping().catchError((e) => throw e);
logger.info('SSHClient.ping complete');
} catch (e, s) {
throw SshnpError(
'Failed to authenticate as $username@$host:$port : $e',
Expand Down
1 change: 1 addition & 0 deletions packages/dart/noports_core/test/sshnp/sshnp_core_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void main() {
when(() => mockParams.localPort).thenReturn(0);
when(() => mockParams.verbose).thenReturn(verbose);
when(() => mockParams.discoverDaemonFeatures).thenReturn(false);
when(() => mockParams.sendSshPublicKey).thenReturn(false);
when(() => mockAtClient.getPreferences()).thenReturn(null);
when(() => mockAtClient.setPreferences(any())).thenReturn(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ void main() {
when(() => mockParams.authenticateClientToRvd).thenReturn(true);
when(() => mockParams.encryptRvdTraffic).thenReturn(true);
when(() => mockParams.discoverDaemonFeatures).thenReturn(false);
when(() => mockParams.sendSshPublicKey).thenReturn(false);

when(subscribeInvocation)
.thenAnswer((_) => notificationStreamController.stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ void main() {
when(() => mockParams.localPort).thenReturn(0);
when(() => mockParams.verbose).thenReturn(false);
when(() => mockParams.discoverDaemonFeatures).thenReturn(false);
when(() => mockParams.sendSshPublicKey).thenReturn(false);
when(() => mockAtClient.getPreferences()).thenReturn(null);
when(() => mockAtClient.setPreferences(any())).thenReturn(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ void main() {
when(() => mockParams.authenticateClientToRvd).thenReturn(true);
when(() => mockParams.encryptRvdTraffic).thenReturn(true);
when(() => mockParams.discoverDaemonFeatures).thenReturn(false);
when(() => mockParams.sendSshPublicKey).thenReturn(false);
when(subscribeInvocation)
.thenAnswer((_) => notificationStreamController.stream);
}
Expand Down

0 comments on commit ac23723

Please sign in to comment.