Support for Reactive Extensions within grains #315

Closed
ReubenBond opened this issue Apr 10, 2015 · 13 comments

@ReubenBond
Member

Currently, Orleans does not support Reactive Extensions.

This proposal is for us to set a SynchronizationContext within grain calls, so that Rx works when using .ObserveOn() - I think it's fine if the SynchronizationContext is lazily created on request, to make it pay-for-what-you-use rather than incurring a per-grain/per-call cost.

This bit me in the foot today. I worked around it by shifting to a non-Rx implementation, but the Rx-based implementation was much more natural.

@gabikliot
Contributor

What exactly do you mean by not supporting Reactive Extensions? It is correct that Orleans does not set or provide any IScheduler for Rx operations. And you want to support that by setting the SynchronizationContext? How would you do it lazily? Can it be done by the application code inside the grain? You have access to TaskScheduler.Current, so any translation from TPL to an Rx IScheduler should be possible, right?

So basically what I am asking is:

  1. Is it correct to say that Orleans does not do anything to help with RX scheduler, but also does not prevent the application from using and setting it correctly? Which is quite different from saying: does not support.
  2. What does it mean for Orleans to actively support Rx?

I thought that by being 100% TPL compatible we support anything that TPL supports (in the sense that "support" means "does not prevent from using").

EDIT: Can't we just use System.Reactive.Concurrency.Scheduler.TaskPool (https://msdn.microsoft.com/en-us/library/system.reactive.concurrency.scheduler.taskpool(v=vs.103).aspx)?

@veikkoeeva
Contributor

@gabikliot, @ReubenBond I had this same issue. Some concrete meat to the discussion and a way to achieve this at SO: Is it in general dubious to call Task.Factory.StartNew(async () => {}) in Subscribe.

Hmm, I wonder if I had a bit smarter code. I'll check when I get home. :)

Edit: Yes, basically what's in the SO post, but with a modest modification:

var x = source.SelectMany(i =>
{
    return Task.Factory.StartNew(async () =>
    {
        return await...
    }, CancellationToken.None, TaskCreationOptions.None, orleansScheduler).Unwrap().ToObservable();
}).Publish().Connect();

@gabikliot
Contributor

Please let me make sure I understand the solution (bear with me, I am new to Rx internals).

  1. One way to solve the problem is to avoid ObserveOn and just use Subscribe, or Publish with Connect. That way we are not consuming in a different asynchronous context, as would be the case with ObserveOn, but in the current thread/task context. This would work out of the box in Orleans, because TPL works.
  2. Do consume with ObserveOn. Since ObserveOn needs its own Rx IScheduler or a SynchronizationContext, and since Orleans does not set the SynchronizationContext, one needs to do the following:
var factory = new TaskFactory(TaskScheduler.Current); // Grabs the Orleans task scheduler
var rxScheduler = new TaskPoolScheduler(factory);
someObservable.ObserveOn(rxScheduler)...

Is that correct?
If what I wrote above is indeed correct, then the only remaining question is whether Orleans should also set the SynchronizationContext. The concern is that it will impact perf, as TPL will propagate it along all Task calls. @ReubenBond had an idea on how to do this lazily?

For the context, this is also relevant: https://orleans.codeplex.com/discussions/560493.

@gabikliot
Contributor

Just for context, we think this is an important question, since in our Streaming we basically support "Rx-like" async streams. A grain can now subscribe to a stream with its IAsyncObserver. Our IAsyncObserver is currently not polymorphically compatible with regular synchronous Rx, but one can write a thin shim/conversion layer from our "async-Rx" to .NET sync Rx.
So what I would like to ensure, at least, is that nothing prevents anyone from doing so, and that it is indeed possible with respect to all the threading issues.

The next question, for the future, would be how we can take this "conversion layer" and put it inside Orleans.

@veikkoeeva
Contributor

@gabikliot

  1. Do consume in ObserveOn. Since ObserveOn needs its own RX IScheduler or SynchronizationContext and since Orleans does not set the SynchronizationContext, one needs to do the following:

That is how I understand it. I have wondered about the relation between Orleans and ConfigureAwait(false). As I understand it, calling it like that prevents capturing the SynchronizationContext. I wonder if there could be ConfigureAwait(true) in Orleans, so that it would provide a synchronization context to those who need it (F# async and Rx, for example, I think). Though it feels one could write an Orleans compatible IScheduler for Rx.

@gabikliot
Contributor

Though it feels one could write an Orleans compatible IScheduler for Rx

That was my feeling as well.
Or some kind of extension where we capture the SynchronizationContext, just like you wrote.

@gabikliot
Contributor

@ReubenBond, I thought about how we can make progress on this issue.
Maybe you can contribute some example code: a small, isolated example of Rx usage that does not currently work in Orleans. It can be a simple unit test that can be added to our test project.

Then we can play around with the ideas above (using System.Reactive.Concurrency.Scheduler.TaskPool or setting SynchronizationContext). We will also measure any perf. impact and perhaps will be able to solve this issue quickly.

@ReubenBond
Member Author

I think this could be implemented as you suggested, @gabikliot, by using TaskPoolScheduler,
something like so

var rxScheduler = new TaskPoolScheduler(new TaskFactory(RuntimeContext.Current));

I'll close this issue now. We can always re-open it if someone becomes interested/affected. Maybe if/when #940 is implemented and there's more impetus for Rx.

@gabikliot
Contributor

Sounds good.

@iskandersierra

Hello @gabikliot, @ReubenBond, how is this issue progressing? I'm interested in developing some reactive logic independent of Orleans, and then using Orleans/Akka/Service Fabric to express actors. Is there at least some guide on how to address this problem in Orleans? Is the following code you posted before enough?

var factory = new TaskFactory(TaskScheduler.Current); // Grabs the Orleans task scheduler
var rxScheduler = new TaskPoolScheduler(factory);
someObservable.ObserveOn(rxScheduler)...

Thanks a lot

@talarari

We tried using the suggested workarounds and they don't work.
When we use Rx inside a grain it locks the grain and all requests to it time out.
Is this supported in any way? Rx makes life a lot easier for certain things, we very much would like to use it in grains.

@ReubenBond
Member Author

I'll close this for now, since it's been unanswered (by me, mostly, my apologies) for a long period. Subscribing from within OnActivateAsync or a grain method, and providing the current TaskScheduler for both SubscribeOn and ObserveOn ought to work, though. If not, let's get a repro project and investigate in a new issue. Subscribing from a grain's constructor likely wouldn't work, at least not in the current version of Orleans, since it does not run on the grain's scheduler. If there is a deadlock then it most likely indicates that the grain is blocking synchronously on some code, or that a method is waiting to process some non-ending stream within a method (and therefore not exiting the async method) for a grain which is not marked [Reentrant] or a method which is not marked [AlwaysInterleave]. Either way, if that arises we can use a repro project and a new issue to investigate.

@ghost ghost locked as resolved and limited conversation to collaborators Sep 30, 2021
@ReubenBond
Member Author

ReubenBond commented Dec 16, 2022

Courtesy of @akourbat, here is a sample demonstrating Rx in Orleans: https://github.com/ReubenBond/TestOrleansRx

The pertinent bit is this:

var rxScheduler = new TaskPoolScheduler(new TaskFactory(TaskScheduler.Current));

Observable.Interval(TimeSpan.FromSeconds(1))
    .ObserveOn(rxScheduler)
    .Subscribe(x => _ticksSubj.OnNext(x));

Note that this leaks the subscription (the IDisposable returned by Subscribe), so you'd want to be holding a reference to that and disposing it somewhere (e.g., on grain deactivation).

Also note: do not call SubscribeOn(rxScheduler), since it seems to use a blocking loop internally.
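
To make the two notes above concrete, here is a minimal sketch of a grain that keeps the subscription and disposes it on deactivation. It is not taken from the linked sample: the grain and interface names (ITickerGrain, TickerGrain, GetLastTick) are hypothetical, and the lifecycle signatures assume the Orleans 7+ overloads of OnActivateAsync and OnDeactivateAsync together with the System.Reactive package. Older Orleans versions use parameterless overrides instead.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Orleans;
using Orleans.Runtime;

// Hypothetical grain interface, for illustration only.
public interface ITickerGrain : IGrainWithGuidKey
{
    Task<long> GetLastTick();
}

// Hypothetical grain; assumes Orleans 7+ lifecycle method signatures.
public class TickerGrain : Grain, ITickerGrain
{
    // Holds the subscription so it can be disposed instead of leaked.
    private IDisposable _subscription = Disposable.Empty;
    private long _lastTick = -1;

    public override Task OnActivateAsync(CancellationToken cancellationToken)
    {
        // Per the discussion above, TaskScheduler.Current is expected to be
        // the grain's scheduler when read inside OnActivateAsync.
        var rxScheduler = new TaskPoolScheduler(new TaskFactory(TaskScheduler.Current));

        // ObserveOn only; SubscribeOn(rxScheduler) is avoided as noted above.
        _subscription = Observable.Interval(TimeSpan.FromSeconds(1))
            .ObserveOn(rxScheduler)
            .Subscribe(x => _lastTick = x);

        return base.OnActivateAsync(cancellationToken);
    }

    public override Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
    {
        // Stop receiving ticks once the grain is deactivated.
        _subscription.Dispose();
        return base.OnDeactivateAsync(reason, cancellationToken);
    }

    public Task<long> GetLastTick() => Task.FromResult(_lastTick);
}
```

The field starts as Disposable.Empty so that deactivation is safe even if activation never reached the Subscribe call.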
