Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interop between Akka.Streams and IObservable #3112

Merged
merged 9 commits into from
Oct 22, 2018

Conversation

Horusiath
Copy link
Contributor

@Horusiath Horusiath commented Sep 16, 2017

Motivation

Currently Akka.Streams is pretty self-contained - Rx.NET Observables adoption is much wider: partially probably due to fact that IObservable<T> is part of standard library and reactive streams are much younger and not so well-known. As it doesn't bring any extra dependencies, and may help with interop with existing technologies (i.e. SignalR for .NET Core 2.0 supports observables natively), I've decided to make this PR.

API

This PR aims to introduce 2 new methods:

  • Sink.AsObservable<T>() (already implemented with tests) - allows to materialize stream as IObservable<T>, which can be used for multi-observers scenarios (internally it's using immutable structures + interlocked, so it should be thread-safe as well, but that's harder to check).
  • Source.FromObservable<T>(IObservable<T> observable, int maxCapacity, OverflowStrategy strategy) (in progress) it works in a similar way to how Source.FromEvent works right now - actually I want to merge those implementations together as C# events can be described as observables easily.

@@ -1003,7 +1003,7 @@ private Action<TEventArgs> SetupOverflowStrategy(OverflowStrategy overflowStrate
/// <param name="addHandler">Action which attaches given event handler to the underlying .NET event.</param>
/// <param name="removeHandler">Action which detaches given event handler to the underlying .NET event.</param>
/// <param name="maxBuffer">Maximum size of the buffer, used in situation when amount of emitted events is higher than current processing capabilities of the downstream.</param>
/// <param name="overflowStrategy">Overflow strategy used, when buffer (size specified by <paramref name="maxBuffer"/>) has been overflown.</param>
/// <param name="overflowStrategy">Overflow strategy used, when b{uffer (size specified by <paramref name="maxBuffer"/>) has been overflown.</param>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure?

@alexvaluyskiy
Copy link
Contributor

Related #2354

@alexvaluyskiy
Copy link
Contributor

@Horusiath Probably we need an integration with System.Threading.Tasks.Channels in the future too

@Horusiath
Copy link
Contributor Author

When building Source.FromObservable I've run into following problem - since observables don't have any notion of demand/backpressure, they got pretty straightforward way of working order of OnNext/OnError/OnCompleted signals:

var subject = new Subject<int>();
subject.Subscribe(
    onNext: x => Console.WriteLine($"Event: {x}"), 
    onError: e => Console.WriteLine($"Error: {e.Message}"), 
    onCompleted: () => Console.WriteLine("Completed"));

subject.OnNext(1);
subject.OnError(new Exception("hello"));
subject.OnNext(2);

// this will produce: 
//  - Event: 1
//  - Error: hello

When using reactive streams, we do have a backpressure - this means that no new events will be send downstream until an explicit demand will be signalized using subscription.Request(n). However backpressure works only on events! This means that errors and completion signals are being send using a side lane, as soon as they come. This means that errors and completions may came out of order.

What does it mean in the current implementation?:

var subject = new Subject<int>();
var probe = this.CreateManualSubscriberProbe<int>();
Source.FromObservable(subject).To(Sink.FromSubscriber(probe)).Run(_materializer);

var subscription = probe.ExpectSubscription();

subject.OnNext(1);
subject.OnError(new Exception("hello"));
subject.OnNext(2);

subscription.Request(2);

// what user may expect:
subscription.ExpectNext(1); // this won't happen as error is not backpressured and will be delivered immediatelly
subscription.ExpectError();

Ofc, there's an alternative approach - we could buffer errors/completions just like we buffer normal events - but this would complicate buffering and overflow strategy.

I think, that we should allow current behavior, simply because subscription.Request should happen immediately after subscribing to an observable.

@Horusiath Horusiath changed the title [WIP] Interop betweek Akka.Streams and IObservable Interop betweek Akka.Streams and IObservable Sep 17, 2017
@alexvaluyskiy
Copy link
Contributor

I think a documentation for this integration should be a part of the PR

@Aaronontheweb
Copy link
Member

@Horusiath is this still in a "do not merge" state or is it ready for review?

@Horusiath
Copy link
Contributor Author

@Aaronontheweb it's ready to be reviewed. There are some issues with Mono runtime (as usual).

@Aaronontheweb
Copy link
Member

There are some issues with Mono runtime (as usual).

Grrr... I'll take a look at this and see what I can dig up on those Mono issues.

.RunForEach(Console.WriteLine, materializer);
```

You may notice two extra parameters here. One of the advantages of Akka.Streams (and reactive streams in general) over Reactive Extensions in notion of backpressure - absent in Rx.NET. This puts a constraint of rate limiting the events incoming form upstream. If an observable will be producing events faster, than downstream is able to consume them, source stage will start to buffer them up to a provided `maxBufferCapacity` limit. Once that limit is reached, an overflow strategy will be applied. There are several different overflow strategies to choose from:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is “in notion” correct? Sounds a bit strange to me or should it have been “is notion” ?

foreach (var disposer in old.Values) disposer.Dispose(unregister: false);
},
onUpstreamFailure: e =>
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don’t we need to remove observers in case of failure too?

Copy link
Contributor Author

@Horusiath Horusiath Sep 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we 100% sure that upstream failure should cause removal? What about Observable.Catch<>?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if the observable catches the failure the Sink is dead so it wouldn’t produce any new items

_maxBuffer = maxBuffer;
_overflowStrategy = overflowStrategy;
Shape = new SourceShape<TEventArgs>(Out);
this._observable = observable;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe want to remove “this”, haven’t used it anywhere else so...

@Horusiath
Copy link
Contributor Author

I couldn't debug this shit to identify the Mono issue. Any ideas?

return new TestDisposer(this);
}

public void Event(T element) => Observer.OnNext(element);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to put `[MethodImpl(MethodImplOptions.NoInlining)] here. It could on Mono

@nesc58
Copy link

nesc58 commented Feb 13, 2018

Something new? This interop would be very nice.

@Aaronontheweb
Copy link
Member

@neekgreen we need to disable Mono compilation on these builds. I'm sure the Mono runtime can run with these changes just fine, but the Mono compiler cannot.

@Danthar Danthar added this to the 1.4.0 milestone Apr 18, 2018
@vasily-kirichenko
Copy link
Contributor

Any plans to include it to 1.4?

@Horusiath
Copy link
Contributor Author

I'll rebase it and push again - the only reason why it didn't land so far was some unrecognized issue on linux/mono.

@Danthar Danthar merged commit 771103c into akkadotnet:dev Oct 22, 2018
@Aaronontheweb Aaronontheweb modified the milestones: 1.4.0, 1.3.10 Oct 23, 2018
@Aaronontheweb
Copy link
Member

Since this was merged into dev, it's going into the v1.3.10 release 🥂

@Aaronontheweb Aaronontheweb changed the title Interop betweek Akka.Streams and IObservable Interop between Akka.Streams and IObservable Nov 1, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

7 participants