Load Splitting? #493

Closed
shinayser opened this issue Aug 19, 2020 · 3 comments

Hey Guys!

I was wondering, is it possible to do load balancing with rxdart?
Imagine the following situation: I have one stream producer and many listeners, but I want the produced data to be split among the listeners, so that each event goes to exactly one of them.

Like, listeners are A, B and C:

Emits [0], A receives but not B and C
Then it emits [1], B receives but not A and C
Then it emits [2], C receives but not A and B
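
As a rough illustration of the distribution described above (not of any rxdart API), the round-robin assignment can be sketched on a plain list of events. The helper name `distributeRoundRobin` is hypothetical and exists only for this example:

```dart
/// Hypothetical helper: assigns each event to one of [listenerCount]
/// listeners in round-robin order and returns one list per listener.
List<List<T>> distributeRoundRobin<T>(Iterable<T> events, int listenerCount) {
  // One bucket per listener; bucket 0 plays the role of A, 1 of B, 2 of C.
  final buckets = List.generate(listenerCount, (_) => <T>[]);
  var index = 0;
  for (final event in events) {
    buckets[index].add(event);
    // Advance to the next listener, wrapping around after the last one.
    index = (index + 1) % listenerCount;
  }
  return buckets;
}
```

Calling `distributeRoundRobin([0, 1, 2, 3, 4], 3)` gives `[[0, 3], [1, 4], [2]]`: A receives 0 and 3, B receives 1 and 4, C receives 2.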

shinayser changed the title from "Load Balacing?" to "Load Splitting?" on Aug 19, 2020
shinayser (Author) commented Aug 19, 2020

I just worked out a solution for this and posted it as a gist. Perhaps you could add it to the library itself?

https://gist.github.com/shinayser/519dedda4626e0c80203bc0673880963

hoc081098 (Collaborator) commented Aug 21, 2020

Since Dart 2.9, we can use the Stream.multi constructor. Here is my implementation:

import 'dart:async';

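/// Splits the events of this stream among its listeners in round-robin
/// order, so that each event reaches exactly one non-paused listener.
extension SplitExtension<T> on Stream<T> {
  Stream<T> split() {
    final controllers = <MultiStreamController<T>>[];
    var index = -1;
    var done = false;
    // Nullable: it is only assigned once the first listener subscribes,
    // which is required for this to compile under sound null safety.
    StreamSubscription<T>? subscription;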

    return Stream.multi(
      (controller) {
        if (done) {
          return controller.closeSync();
        }

        final wasEmpty = controllers.isEmpty;
        controllers.add(controller);

        if (wasEmpty) {
          subscription = listen(
            (value) {
              // Drop the event if every listener is currently paused.
              if (controllers.every((s) => s.isPaused)) {
                return;
              }

              while (true) {
                index = (index + 1) % controllers.length;
                final controller = controllers[index];

                if (!controller.isPaused) {
                  controller.addSync(value);
                  return;
                }
              }
            },
            onError: (e, StackTrace st) {
              if (controllers.every((s) => s.isPaused)) {
                return;
              }

              while (true) {
                index = (index + 1) % controllers.length;
                final controller = controllers[index];

                if (!controller.isPaused) {
                  controller.addErrorSync(e, st);
                  return;
                }
              }
            },
            onDone: () {
              done = true;
              controllers.forEach((c) {
                c.onCancel = null;
                c.closeSync();
              });
              controllers.clear();
            },
          );
        }

        controller.onCancel = () {
          controllers.remove(controller);

          if (controllers.isEmpty) {
            subscription?.cancel();
            subscription = null;
            done = true;
          }
        };
      },
      isBroadcast: true,
    );
  }
}
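
The implementation above relies on Stream.multi invoking its callback once for every new listener, each time with a fresh controller. A minimal sketch of just that behavior, with a hypothetical function name `perListenerGreeting` chosen for this example, might look like this:

```dart
import 'dart:async';

/// Hypothetical demo: every listener gets its own controller, so the
/// stream can decide per listener which events to deliver.
Stream<String> perListenerGreeting() {
  var listenerCount = 0;
  return Stream.multi((controller) {
    // This callback runs once per listen() call on the returned stream.
    final id = listenerCount++;
    controller.add('listener $id');
    controller.close();
  });
}
```

Listening twice to the same returned stream yields `listener 0` for the first subscription and `listener 1` for the second, which is what lets `split()` keep a list of per-listener controllers and pick one per event.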

shinayser (Author) commented Aug 21, 2020

Your implementation is great, much better than mine actually! I just think that errors should be emitted to EVERY listener and not only one, since this can cause some side effects.
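
One possible way to do that, as a simplified sketch rather than a drop-in replacement: data events still go to one listener at a time, while errors are forwarded to all of them. The names `SplitBroadcastErrors` and `splitBroadcastingErrors` are hypothetical, and pause handling is left out to keep the example short.

```dart
import 'dart:async';

extension SplitBroadcastErrors<T> on Stream<T> {
  /// Hypothetical variant: data is split round-robin among listeners,
  /// errors are delivered to every listener. Paused listeners are not
  /// skipped here; events sent to them are buffered by their controller.
  Stream<T> splitBroadcastingErrors() {
    final controllers = <MultiStreamController<T>>[];
    var index = -1;
    var done = false;
    StreamSubscription<T>? subscription;

    return Stream.multi((controller) {
      if (done) {
        controller.closeSync();
        return;
      }
      controllers.add(controller);

      // Subscribe to the source only once, when the first listener arrives.
      subscription ??= listen(
        (value) {
          if (controllers.isEmpty) return;
          index = (index + 1) % controllers.length;
          controllers[index].addSync(value);
        },
        onError: (Object error, StackTrace stackTrace) {
          // Iterate over a copy: a listener may cancel while handling it.
          for (final c in controllers.toList()) {
            c.addErrorSync(error, stackTrace);
          }
        },
        onDone: () {
          done = true;
          for (final c in controllers.toList()) {
            c.onCancel = null;
            c.closeSync();
          }
          controllers.clear();
        },
      );

      controller.onCancel = () {
        controllers.remove(controller);
        if (controllers.isEmpty) {
          subscription?.cancel();
          subscription = null;
          done = true;
        }
      };
    }, isBroadcast: true);
  }
}
```

With two listeners A and B and a source that emits 0, 1, an error, and then 2, A should receive 0, the error, and 2, while B should receive 1 and the error.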
