Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Diamond graphs for observable aren't composable? #484

Closed
iam opened this issue Feb 13, 2019 · 2 comments
Closed

Diamond graphs for observable aren't composable? #484

iam opened this issue Feb 13, 2019 · 2 comments

Comments

@iam
Copy link
Contributor

iam commented Feb 13, 2019

When trying to design an algorithm that requires turning a regular rx chain into a diamond (imagine there's no multi-threading necessary here), there seems to be a lack of composability:

     A
   /   \
  B     C
   \   /
     D
     |
     E

(A, B, C, D, E are some linear chains)

The (only working) approach I could figure out to this would look like:

auto A = ... | publish();
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C);
auto E = D | ...;

E | subscribe(...);
A.connect();                    // <<<<< not composable

The problem is that calling connect doesn't compose, so the whole graph can't just be expressed as an observable<typeof(E)>.

What's the best practice to solve this? Should we change some operators to make this easier?


RxJava has autoConnect which rxcpp seems to lack.

ref_count would almost work except it calls connect too soon (when B is subscribed, but before waiting for C).


Possible solutions:

  • Add auto_connect to connectable_observable (problem: it doesn't seem to call 'unsubscribe' ever in RxJava. Not sure if this is acceptable?)
auto A = ... | publish() | auto_connect(2);
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C);
auto E = D | ...;

E | subscribe(...);
  • Change ref_count to take a threshold value
    ref_count(N): call connect() when reaching >=N subscribers.
    unsubscribe from source when reaching 0 subscribers
    (Today's behavior is simply N=1).
auto A = ... | publish() | ref_count(2);
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C);
auto E = D | ...;

E | subscribe(...);
  • Change ref_count to take another observable
    ref_count(Obs): subscribes/unsubscribes to its source as normal. also call Obs.connect() when reaching >=1 subscribers.
    (Today's behavior is simply Obs=source).
auto A = ... | publish();
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C) | ref_count(A);
auto E = D | ...;

E | subscribe(...);
  • Add some kind of new operator to avoid publish?
auto A = <src> | fork(2);
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C);
auto E = D | ...;

E.subscribe(...);
/*
  calls D.subscribe which:
  1) calls B.subscribe which calls A.subscribe -> the fork doesn't let it through
  2) calls C.subscribe which calls A.subscribe -> the fork N==2, so call the <src> subscribe

  and then the whole graph is subscribed at <src>
*/

fork(N) waits for N subscribers before subscribing to source. it forwards all the observer callbacks to all its subscribers. unsubscribe when subscriber count reaches 0.

(and this could also avoid all the mutex overhead of publish/ref_count).


What do you think? I'd be happy to write a PR if I knew what was the best way.

This has been bothering me for quite some time in some code I wrote for a project of mine, I ended up returning a pair<observable<T>, connectable_observable<Y>> the design of which "broke" immediately once I tried to integrate this into another larger observable chain.

@kirkshoop
Copy link
Member

I like 'Change ref_count to take another observable' the best.

avoid all the mutex overhead of publish/ref_count

To do this write a new subject that does no synchronization and use that with the multicast operator to replace publish. This only works if there is only one thread (or external synchronization) for both the producer and all calls to subscribe - also be careful of reentrancy.

iam added a commit to iam/rxcpp that referenced this issue Feb 15, 2019
The existing `connectable_observable.ref_count()` operator calls
connect on the source when it's subscribed to.

Generalize this by allowing an optional parameter `other`, i.e.
`observable.ref_count(connectable_observable other)` to be used as the
connect target.

Useful for implementing diamond graphs while retaining composability:

```
     A
   /   \
  B     C
   \   /
     D
     |
     E

auto A = ... | publish();
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C) | ref_count(A);
auto E = D | ...;

E | subscribe(...);
```

Resolves: ReactiveX#484
@iam
Copy link
Contributor Author

iam commented Feb 15, 2019

Sounds good to me, I uploaded a PR with that approach you suggested works best.

iam added a commit to iam/rxcpp that referenced this issue Feb 27, 2019
The existing `connectable_observable.ref_count()` operator calls
connect on the source when it's subscribed to.

Generalize this by allowing an optional parameter `other`, i.e.
`observable.ref_count(connectable_observable other)` to be used as the
connect target.

Useful for implementing diamond graphs while retaining composability:

```
     A
   /   \
  B     C
   \   /
     D
     |
     E

auto A = ... | publish();
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C) | ref_count(A);
auto E = D | ...;

E | subscribe(...);
```

Resolves: ReactiveX#484
kirkshoop pushed a commit that referenced this issue Mar 6, 2019
The existing `connectable_observable.ref_count()` operator calls
connect on the source when it's subscribed to.

Generalize this by allowing an optional parameter `other`, i.e.
`observable.ref_count(connectable_observable other)` to be used as the
connect target.

Useful for implementing diamond graphs while retaining composability:

```
     A
   /   \
  B     C
   \   /
     D
     |
     E

auto A = ... | publish();
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C) | ref_count(A);
auto E = D | ...;

E | subscribe(...);
```

Resolves: #484
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants