Signal Streams aren't broadcast by default #293

Closed
Taha-Firoz opened this issue Oct 27, 2021 · 9 comments

Comments

@Taha-Firoz
Contributor

Signal streams aren't generated as broadcast streams by default. This causes a runtime error when the stream is listened to again after a widget is destroyed and rebuilt.

This is the code that is currently generated. It crashes if a widget is rebuilt and the stream is listened to again:

somesignal = DBusRemoteObjectSignalStream(
            object: this,
            interface: 'com.foo.bar',
            name: 'somesignal',
            signature: DBusSignature('a(ss)'))
        .map((signal) =>
            OrgFreedesktopDBusIntrospectablesomesignal(signal));

The crash doesn't occur with the following code, since the stream is now built as a broadcast stream and can be listened to multiple times:

somesignal = DBusRemoteObjectSignalStream(
            object: this,
            interface: 'com.foo.bar',
            name: 'somesignal',
            signature: DBusSignature('a(ss)'))
        .asBroadcastStream()
        .map((signal) =>
            OrgFreedesktopDBusIntrospectablesomesignal(signal));
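
To illustrate the difference without a D-Bus connection, here is a minimal sketch using plain `dart:async`. `makeSignalStream` is a hypothetical stand-in for the generated stream, not part of dbus.dart; it only assumes that the generated stream behaves like a mapped `StreamController` stream.

```dart
import 'dart:async';

/// Hypothetical stand-in for a generated signal stream (not dbus.dart API).
Stream<String> makeSignalStream({required bool broadcast}) {
  final controller = broadcast
      ? StreamController<String>.broadcast()
      : StreamController<String>();
  // map() keeps the broadcast-ness of the stream it wraps.
  return controller.stream.map((name) => 'signal: $name');
}

void main() {
  // Single-subscription: a second listen() throws a StateError.
  final single = makeSignalStream(broadcast: false);
  single.listen((_) {});
  try {
    single.listen((_) {});
  } on StateError catch (e) {
    print('second listen failed: ${e.message}');
  }

  // Broadcast: several listeners may subscribe to the same stream.
  final shared = makeSignalStream(broadcast: true);
  shared.listen((_) {});
  shared.listen((_) {});
  print('isBroadcast: ${shared.isBroadcast}');
}
```
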
@robert-ancell
Collaborator

Thanks @Taha-Firoz! When I first implemented the streams it seemed like I should implement the simplest case, and leave it to the API user in case they needed to use something more complex. However, I was less experienced in Dart at the time, so this assumption needs to be revisited. Is there any downside to always making signals broadcast? @jpnurmi perhaps you have a good idea on this too?

@jpnurmi
Contributor

jpnurmi commented Oct 28, 2021

If I understood correctly, there may be two issues here: a) the widget should clean up after itself to avoid multiple subscriptions at a time, and b) standalone events should be broadcast.

a) Even though you want to subscribe manually instead of using Flutter's StreamBuilder, its source code still acts as a good example of how to manage stream subscriptions. In short, initState() calls listen(), didUpdateWidget() calls cancel() and listen(), and dispose() calls cancel(). This way, rebuilding the widget would not lead to multiple subscriptions at a time.

b) There's a good explanation for choosing between single-subscription vs. broadcast streams in the Stream docs

Single-subscription streams are generally used for streaming chunks of larger contiguous data like file I/O.
...
Broadcast streams are used for independent events/observers.

In the case of D-Bus events, would they always belong to the latter category?
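
The lifecycle in a) can be sketched in plain Dart, without Flutter. `SignalListener` is a hypothetical helper whose methods mirror `initState()`, `didUpdateWidget()`, and `dispose()`; it is an illustration under those assumptions, not StreamBuilder's actual implementation.

```dart
import 'dart:async';

/// Hypothetical helper mirroring the State lifecycle described above.
class SignalListener<T> {
  SignalListener(this._stream, this._onSignal);

  Stream<T> _stream;
  final void Function(T) _onSignal;
  StreamSubscription<T>? _subscription;

  bool get isListening => _subscription != null;

  /// Corresponds to initState(): subscribe once.
  void start() {
    _subscription ??= _stream.listen(_onSignal);
  }

  /// Corresponds to didUpdateWidget(): resubscribe only if the stream changed.
  Future<void> update(Stream<T> stream) async {
    if (identical(stream, _stream)) return;
    await stop();
    _stream = stream;
    start();
  }

  /// Corresponds to dispose(): cancel the active subscription, if any.
  Future<void> stop() async {
    final subscription = _subscription;
    _subscription = null;
    await subscription?.cancel();
  }
}
```

This pattern only prevents overlapping subscriptions. A single-subscription stream still cannot be listened to again after `cancel()`, so calling `start()` after `stop()` on the same stream needs a broadcast stream.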

@Taha-Firoz
Contributor Author

@jpnurmi Well, I was doing at least two of those things: I would listen in initState() and cancel in dispose(). However, when using the streams generated by Dart D-Bus, I would still get the runtime error "Stream has already been listened to."

@Taha-Firoz
Contributor Author

@robert-ancell Any way you can guide me into making a PR for this?

@robert-ancell
Collaborator

b) There's a good explanation for choosing between single-subscription vs. broadcast streams in the Stream docs

Single-subscription streams are generally used for streaming chunks of larger contiguous data like file I/O.
...
Broadcast streams are used for independent events/observers.

In the case of D-Bus events, would they always belong to the latter category?

I think yes, which suggests we definitely should be using broadcast streams by default.

@robert-ancell
Collaborator

@robert-ancell Any way you can guide me into making a PR for this?

Any particular help you need?

@Taha-Firoz
Contributor Author

This is what I was looking for 😅 I can just replace the regular stream constructors with broadcast stream ones.

@jpnurmi
Contributor

jpnurmi commented Dec 6, 2021

Should the controller in DBusSignalStream be changed to a broadcast stream too?

For example:

import 'package:dbus/dbus.dart';
import 'package:upower/upower.dart';

void main() async {
  final bus = DBusClient.system(); // start app
  var client = UPowerClient(bus: bus); // provide

  await client.connect(); // enter a page
  await client.close(); // leave a page

  await client.connect(); // enter a page again => **boom**
  await client.close(); // ...leave a page

  await bus.close(); // close app
}

Results in:

StateError: Bad state: Stream has already been listened to.
#0      _StreamController._subscribe (dart:async/stream_controller.dart:635:7)
#1      _ControllerStream._createSubscription (dart:async/stream_controller.dart:786:19)
#2      _StreamImpl.listen (dart:async/stream_impl.dart:473:9)
#3      DBusSignalStream.listen
#4      new _ForwardingStreamSubscription (dart:async/stream_pipe.dart:114:10)
#5      _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:86:16)
#6      _ForwardingStream.listen (dart:async/stream_pipe.dart:81:12)
#7      UPowerClient.connect

The next problem is in DBusClient._removeMatch() but that's another issue.
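
As a rough model of why a broadcast controller would help here, the sketch below runs two connect/close cycles against a `StreamController.broadcast()`. `FakeSignalSource` and `connectAndClose` are hypothetical names; the real DBusSignalStream is implemented differently, and the counters only mark where match rules might be added or removed.

```dart
import 'dart:async';

/// Hypothetical stand-in for a signal source (not the real DBusSignalStream).
class FakeSignalSource {
  FakeSignalSource() {
    _controller = StreamController<String>.broadcast(
      onListen: () {
        matchesAdded++; // assumed place where a match rule would be added
      },
      onCancel: () {
        matchesRemoved++; // assumed place where it would be removed
      },
    );
  }

  late final StreamController<String> _controller;
  int matchesAdded = 0;
  int matchesRemoved = 0;

  Stream<String> get stream => _controller.stream;
}

/// Models one connect()/close() cycle of a client built on the source.
void connectAndClose(FakeSignalSource source) {
  final subscription = source.stream.listen((_) {});
  unawaited(subscription.cancel());
}

void main() {
  final source = FakeSignalSource();
  connectAndClose(source); // enter and leave a page
  connectAndClose(source); // enter again: no StateError with broadcast
  print('added: ${source.matchesAdded}, removed: ${source.matchesRemoved}');
}
```

A broadcast controller calls `onListen` again when a listener subscribes after the previous one has cancelled, which is what makes the second cycle possible.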

@jpnurmi
Contributor

jpnurmi commented Dec 6, 2021

I opened a couple of PRs that make it possible to open and close D-Bus-based clients (NM, UPower, BlueZ, ...) multiple times.

Here's another example, using NetworkManager:

import 'package:dbus/dbus.dart';
import 'package:nm/nm.dart';

void main() async {
  final bus = DBusClient.system();
  final client = NetworkManagerClient(bus: bus);

  await client.connect();
  print('Running NetworkManager ${client.version}');
  await client.close();

  await client.connect();
  print('Running NetworkManager ${client.version}');
  await client.close();

  await bus.close();
}

This runs fine with #308 and #309 applied, but I'm not sure if I'm just fixing the symptoms.

robert-ancell pushed a commit that referenced this issue Dec 7, 2021
Reset the reference count before awaiting the RemoveMatch() D-Bus call
to prevent that a reference goes missing if _addMatch() increases the
reference count during the await.

Related issue: #293
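
The ordering described in this commit message can be sketched with a hypothetical reference-counted registry. `MatchRegistry` and its members are illustrative names that are not taken from dbus.dart, and the bus call is simulated with a zero-length delay.

```dart
import 'dart:async';

/// Hypothetical reference-counted match registry (not dbus.dart code).
class MatchRegistry {
  final _counts = <String, int>{};
  final log = <String>[];

  int count(String rule) => _counts[rule] ?? 0;

  Future<void> addMatch(String rule) async {
    final count = (_counts[rule] ?? 0) + 1;
    _counts[rule] = count;
    if (count == 1) {
      await _call('AddMatch', rule);
    }
  }

  Future<void> removeMatch(String rule) async {
    final count = _counts[rule];
    if (count == null) return;
    if (count > 1) {
      _counts[rule] = count - 1;
      return;
    }
    // Reset the count before awaiting, so an addMatch() that runs during
    // the await starts from zero and issues its own AddMatch call.
    _counts.remove(rule);
    await _call('RemoveMatch', rule);
  }

  Future<void> _call(String method, String rule) async {
    log.add('$method $rule');
    await Future<void>.delayed(Duration.zero); // simulated bus round trip
  }
}
```

If the count were reset only after the await, a reference taken by `addMatch()` during that await would be wiped out by the reset, which matches the problem the commit message describes.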
robert-ancell pushed a commit that referenced this issue Jan 14, 2022
3 participants