Skip to content

Commit

Permalink
Use streams for subscribing to signals.
Browse files Browse the repository at this point in the history
Fixes #81
  • Loading branch information
robert-ancell committed Sep 14, 2020
1 parent f5ae86c commit 1e398df
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 229 deletions.
63 changes: 45 additions & 18 deletions bin/dart-dbus.dart
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ String generateRemoteObjectClass(DBusIntrospectNode node) {
return null;
}

var classes = <String>[];
var methods = <String>[];

for (var interface in node.interfaces) {
Expand All @@ -495,7 +496,9 @@ String generateRemoteObjectClass(DBusIntrospectNode node) {
}

for (var signal in interface.signals) {
methods.add(generateRemoteSignalSubscription(interface, signal));
classes.add(generateRemoteSignalClass(className, interface, signal));
methods
.add(generateRemoteSignalSubscription(className, interface, signal));
}
}

Expand All @@ -506,8 +509,9 @@ String generateRemoteObjectClass(DBusIntrospectNode node) {
source += '\n';
source += methods.join('\n');
source += '}\n';
classes.add(source);

return source;
return classes.join('\n');
}

/// Generate methods for the remote [property].
Expand Down Expand Up @@ -607,40 +611,63 @@ String generateRemoteMethodCall(
return source;
}

/// Generates a class to contain a signal response.
String generateRemoteSignalClass(String classPrefix,
DBusIntrospectInterface interface, DBusIntrospectSignal signal) {
var properties = <String>[];
var params = <String>[];
var index = 0;
for (var arg in signal.args) {
var type = getDartType(arg.type);
var argName = arg.name ?? 'arg_${index}';
var valueName = 'values[${index}]';
var convertedValue = type.dbusToNative(valueName);
properties
.add(' ${type.nativeType} get ${argName} => ${convertedValue};\n');
params.add('this.${argName}');
index++;
}

var source = '';
source += '/// Signal data for ${interface.name}.${signal.name}.\n';
source += 'class ${classPrefix}${signal.name} extends DBusSignal{\n';
source += properties.join();
source += '\n';
source +=
' ${classPrefix}${signal.name}(DBusSignal signal) : super(signal.sender, signal.path, signal.interface, signal.member, signal.values);\n';
source += '}\n';

return source;
}

/// Generates a method to subscribe to a signal.
String generateRemoteSignalSubscription(
String generateRemoteSignalSubscription(String classPrefix,
DBusIntrospectInterface interface, DBusIntrospectSignal signal) {
var argValues = <String>[];
var argsList = <String>[];
var index = 0;
var valueChecks = <String>[];
if (signal.args.isEmpty) {
valueChecks.add('values.isEmpty');
valueChecks.add('signal.values.isEmpty');
} else {
valueChecks.add('values.length == ${signal.args.length}');
valueChecks.add('signal.values.length == ${signal.args.length}');
}
for (var arg in signal.args) {
var type = getDartType(arg.type);
var argName = arg.name ?? 'arg_${index}';
argsList.add('${type.nativeType} ${argName}');
var valueName = 'values[${index}]';
var valueName = 'signal.values[${index}]';
valueChecks
.add("${valueName}.signature == DBusSignature('${arg.type.value}')");
var convertedValue = type.dbusToNative(valueName);
argValues.add(convertedValue);
index++;
}

var source = '';
source += ' /// Subscribes to ${interface.name}.${signal.name}\n';
source += ' /// Subscribes to ${interface.name}.${signal.name}.\n';
source +=
' Future<DBusSignalSubscription> subscribe${signal.name}(void Function(${argsList.join(', ')}) callback) async {\n';
' Stream<${classPrefix}${signal.name}> subscribe${signal.name}() async* {\n';
source +=
" return await subscribeSignal('${interface.name}', '${signal.name}', (values) {\n";
" var signals = await subscribeSignal('${interface.name}', '${signal.name}');\n";
source += ' await for (var signal in signals) {\n';
source += ' if (${valueChecks.join(' && ')}) {\n';
source += ' callback(${argValues.join(', ')});\n';
source += ' yield ${classPrefix}${signal.name}(signal);\n';
source += ' }\n';
source += ' });\n';
source += ' }\n';
source += ' }\n';

return source;
Expand Down
39 changes: 14 additions & 25 deletions example/object_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,20 @@ void main() async {
var object = DBusRemoteObject(client, 'org.freedesktop.NetworkManager',
DBusObjectPath('/org/freedesktop'));

await object.subscribeObjectManagerSignals(
interfacesAddedCallback: interfacesAdded,
interfacesRemovedCallback: interfacesRemoved,
propertiesChangedCallback: propertiesChanged);
object.subscribeObjectManagerSignals().listen((signal) {
if (signal is DBusObjectManagerInterfacesAddedSignal) {
print('${signal.changedPath.value}');
printInterfacesAndProperties(signal.interfacesAndProperties);
} else if (signal is DBusObjectManagerInterfacesRemovedSignal) {
for (var interface in signal.interfaces) {
print('${signal.changedPath.value} removed interfaces ${interface}');
}
} else if (signal is DBusPropertiesChangedSignal) {
print('${signal.path.value}');
printInterfacesAndProperties(
{signal.propertiesInterface: signal.changedProperties});
}
});

var objects = await object.getManagedObjects();
objects.forEach((objectPath, interfacesAndProperties) {
Expand All @@ -17,27 +27,6 @@ void main() async {
});
}

void interfacesAdded(DBusObjectPath objectPath,
Map<String, Map<String, DBusValue>> interfacesAndProperties) {
print('${objectPath.value}');
printInterfacesAndProperties(interfacesAndProperties);
}

void interfacesRemoved(DBusObjectPath objectPath, List<String> interfaces) {
for (var interface in interfaces) {
print('${objectPath.value} removed interfaces ${interface}');
}
}

void propertiesChanged(
DBusObjectPath objectPath,
String interfaceName,
Map<String, DBusValue> changedProperties,
List<String> invalidatedProperties) {
print('${objectPath.value}');
printInterfacesAndProperties({interfaceName: changedProperties});
}

void printInterfacesAndProperties(
Map<String, Map<String, DBusValue>> interfacesAndProperties) {
interfacesAndProperties.forEach((interface, properties) {
Expand Down
5 changes: 2 additions & 3 deletions example/properties.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ void main() async {
print('${address.toNative()}');
}

await object.subscribePropertiesChanged(
(interface, changedProperties, invalidatedProperties) {
properties.forEach((name, value) {
await object.subscribePropertiesChanged().listen((signal) {
signal.changedProperties.forEach((name, value) {
print('${name}: ${value.toNative()}');
});
});
Expand Down
8 changes: 5 additions & 3 deletions example/signals.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ void main(List<String> args) async {
if (mode == 'client') {
var object = DBusRemoteObject(client, 'com.canonical.DBusDart',
DBusObjectPath('/com/canonical/DBusDart'));
await object.subscribeSignal('com.canonical.DBusDart', 'Ping', (values) {
var count = (values[0] as DBusUint64).value;
var signals =
await object.subscribeSignal('com.canonical.DBusDart', 'Ping');
await for (var signal in signals) {
var count = (signal.values[0] as DBusUint64).value;
print('Ping ${count}!');
});
}
} else if (mode == 'server') {
await client.requestName('com.canonical.DBusDart');
var object = TestObject();
Expand Down
124 changes: 58 additions & 66 deletions lib/src/dbus_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,61 @@ import 'getuid.dart';
// FIXME: Use more efficient data store than List<int>?
// FIXME: Use ByteData more efficiently - don't copy when reading/writing

typedef SignalCallback = Function(DBusObjectPath path, String interface,
String member, List<DBusValue> values);

class _MethodCall {
int serial;
var completer = Completer<DBusMethodResponse>();

_MethodCall(this.serial);
}

/// A subscription to D-Bus signals.
class DBusSignalSubscription {
/// Senders sibscribed to.
/// A signal received from a client.
class DBusSignal {
/// Client that sent the signal.
final String sender;

/// Interface subscribed to.
/// Path of the object emitting the signal.
final DBusObjectPath path;

/// Interface emitting the signal.
final String interface;

/// Member subscribed to.
/// Signal name;
final String member;

/// Path subscribed to.
final DBusObjectPath path;
/// Values associated with the signal.
final List<DBusValue> values;

const DBusSignal(
this.sender, this.path, this.interface, this.member, this.values);
}

/// Root path subscribed to.
class _DBusSignalSubscription {
final DBusClient client;
final String sender;
final String interface;
final String member;
final DBusObjectPath path;
final DBusObjectPath pathNamespace;
StreamController controller;

/// Function called when signals are received.
SignalCallback callback;
Stream<DBusSignal> get stream => controller.stream;

/// Creates
DBusSignalSubscription(this.sender, this.interface, this.member, this.path,
this.pathNamespace, this.callback);
_DBusSignalSubscription(this.client, this.sender, this.interface, this.member,
this.path, this.pathNamespace) {
controller =
StreamController<DBusSignal>(onListen: onListen, onCancel: onCancel);
}

void onListen() {
client._addMatch(client._makeMatchRule(
'signal', sender, interface, member, path, pathNamespace));
}

Future onCancel() async {
await client._removeMatch(client._makeMatchRule(
'signal', sender, interface, member, path, pathNamespace));
client._signalSubscriptions.remove(this);
}
}

/// A client connection to a D-Bus server.
Expand All @@ -62,7 +84,7 @@ class DBusClient {
Completer _connectCompleter;
var _lastSerial = 0;
final _methodCalls = <_MethodCall>[];
final _signalSubscriptions = <DBusSignalSubscription>[];
final _signalSubscriptions = <_DBusSignalSubscription>[];
final _objectTree = DBusObjectTree();
final _matchRules = <String, int>{};
final _ownedNames = <String, String>{};
Expand Down Expand Up @@ -254,30 +276,16 @@ class DBusClient {
return await _callMethod(destination, path, interface, member, values);
}

/// Subscribe to signals on the D-Bus and call [callback] when one is received.
///
/// Setting [sender], [interface], [member] or [path] will filter signals that match the given values.
///
/// When the subscription is no longer needed call [unsubscribeSignals].
Future<DBusSignalSubscription> subscribeSignals(
SignalCallback callback, {
/// Subscribe to signals that match [sender], [interface], [member], [path] and/or [pathNamespace].
Stream<DBusSignal> subscribeSignals({
String sender,
String interface,
String member,
DBusObjectPath path,
DBusObjectPath pathNamespace,
}) async {
var subscription = DBusSignalSubscription(
sender, interface, member, path, pathNamespace, callback);

// Update match rules on the D-Bus server.
await _addMatch(_makeMatchRule(
'signal',
subscription.sender,
subscription.interface,
subscription.member,
subscription.path,
subscription.pathNamespace));
}) async* {
var subscription = _DBusSignalSubscription(
this, sender, interface, member, path, pathNamespace);

// Get the unique name of the sender (as this is the name the messages will use).
if (sender != null && !_ownedNames.containsValue(sender)) {
Expand All @@ -289,25 +297,9 @@ class DBusClient {

_signalSubscriptions.add(subscription);

return subscription;
}

/// Unsubscribe a [subscription] previously set using [subscribeSignals].
void unsubscribeSignals(DBusSignalSubscription subscription) async {
if (!_signalSubscriptions.contains(subscription)) {
throw 'Attempted to remove unknown signal subscription';
await for (var signal in subscription.stream) {
yield signal;
}

/// Unsubscribe on the server
await _removeMatch(_makeMatchRule(
'signal',
subscription.sender,
subscription.interface,
subscription.member,
subscription.path,
subscription.pathNamespace));

_signalSubscriptions.remove(subscription);
}

/// Emits a signal from a D-Bus object.
Expand Down Expand Up @@ -400,24 +392,24 @@ class DBusClient {
_connectCompleter.complete();

// Listen for name ownership changes - we need these to match incoming signals.
await subscribeSignals(_handleNameOwnerChanged,
var signals = await subscribeSignals(
sender: 'org.freedesktop.DBus',
interface: 'org.freedesktop.DBus',
member: 'NameOwnerChanged');
signals.listen(_handleNameOwnerChanged);
}

/// Handles the org.freedesktop.DBus.NameOwnerChanged signal and updates the table of known names.
void _handleNameOwnerChanged(DBusObjectPath path, String interface,
String member, List<DBusValue> values) {
if (values.length != 3 ||
values[0].signature != DBusSignature('s') ||
values[1].signature != DBusSignature('s') ||
values[2].signature != DBusSignature('s')) {
throw 'AddMatch returned invalid result: ${values}';
void _handleNameOwnerChanged(DBusSignal signal) {
if (signal.values.length != 3 ||
signal.values[0].signature != DBusSignature('s') ||
signal.values[1].signature != DBusSignature('s') ||
signal.values[2].signature != DBusSignature('s')) {
throw 'AddMatch returned invalid result: ${signal.values}';
}

var name = (values[0] as DBusString).value;
var newOwner = (values[2] as DBusString).value;
var name = (signal.values[0] as DBusString).value;
var newOwner = (signal.values[2] as DBusString).value;
if (newOwner != '') {
_ownedNames[name] = newOwner;
} else {
Expand Down Expand Up @@ -624,8 +616,8 @@ class DBusClient {
continue;
}

subscription.callback(
message.path, message.interface, message.member, message.values);
subscription.controller.add(DBusSignal(message.sender, message.path,
message.interface, message.member, message.values));
}
}

Expand Down
Loading

0 comments on commit 1e398df

Please sign in to comment.