Long running operation and stashing guidance #3071
I understand the desire not to make the grains reentrant in general, but have you considered allowing specific calls to interleave using the AlwaysInterleaveAttribute?
Understood. Yes, this deviates from Orleans task scheduling. It sounds like you're refactoring the logic to conform to Orleans task scheduling, but in areas that need this specific task execution pattern you may want to consider the AsyncSerialExecutor utility. This utility ensures serial execution of actions. It was initially developed to help with the development and maintenance of reentrant grains or interleaved calls, but it may also fit your needs.
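For readers who want to see the idea without the Orleans utility, here is a minimal sketch in plain TPL. `SerialExecutor` and `AddNext` are hypothetical names, not the AsyncSerialExecutor API. The sketch chains each action behind the previous one, so actions run one at a time and in submission order. The demo also reproduces the ordering effect discussed in this thread: a tell queued behind an in-flight request is not visible to that request.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for Orleans' AsyncSerialExecutor (not its actual API):
// each queued action starts only after the previous one has fully completed.
public sealed class SerialExecutor
{
    private readonly object _lock = new object();
    private Task _tail = Task.CompletedTask;

    public Task<T> AddNext<T>(Func<Task<T>> action)
    {
        lock (_lock)
        {
            // Chain the action behind the current tail, running it on the thread pool.
            Task<T> next = _tail.ContinueWith(_ => action(), CancellationToken.None,
                TaskContinuationOptions.None, TaskScheduler.Default).Unwrap();
            // The tail ignores faults so one failing action does not stall the queue.
            _tail = next.ContinueWith(_ => { }, TaskScheduler.Default);
            return next;
        }
    }
}

public static class OrderingDemo
{
    // A tell (6) queued behind an in-flight request (5) is applied only after 5 finishes.
    public static async Task<(int resultOf5, int stateAfter6)> Run()
    {
        var executor = new SerialExecutor();
        int state = 0;

        Task<int> request5 = executor.AddNext(async () =>
        {
            await Task.Delay(100);   // e.g. awaiting a call to another actor
            return state;            // reads state after the await
        });
        Task<int> tell6 = executor.AddNext(() =>
        {
            state = 42;              // the update carried by the tell
            return Task.FromResult(state);
        });

        int resultOf5 = await request5;
        await tell6;
        return (resultOf5, state);
    }
}
```

Here `resultOf5` is 0 and the final state is 42: the request never sees the tell's update because the tell only starts once the request has completed.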
From my understanding, this should not cause a deadlock, but the result returned in 7 will not include the information delivered in the 6 tell, because Actor(State) will not process the 6 tell until 5 is completed in 7. To get the results of the 6 tell into Actor(State) before the results of 5 are returned in 7, the 6 tell call needs to be marked AlwaysInterleave.
@jason-bragg thank you for the detailed responses. Both the AlwaysInterleaveAttribute and AsyncSerialExecutor look very useful based on where we are today. I need to do a bit more research into the AlwaysInterleaveAttribute, but if it does what I think it does, it will save a lot of time until the problem can be solved properly. I suspect the best long-term resolution is to create a new actor responsible for querying and aggregating results, so that state-change messages flow Actor(State) > Actor(Forecast) > Actor(ResultsAggregator), and any request for results goes directly to Actor(ResultsAggregator).
Could you explain why the operation won't cause a deadlock but will return incorrect results? Some context may be missing: in step 7, Actor(State) awaits the response and manipulates it before returning it to the client. This example may describe the implementation more accurately:

```csharp
using System.Threading.Tasks;
using Orleans;

public interface IForecastGrain : IGrainWithIntegerKey
{
    Task Forecast(int parameter, IStateGrain actorRef);
    Task<int> GetResults(int parameter);
}

public interface IStateGrain : IGrainWithIntegerKey
{
    Task SetDetails(int parameter);
    Task<int> GetResults(int criteria);
    Task ForecastComplete(int result);
}

public class StateGrain : Grain, IStateGrain
{
    private int _parameter = 1;
    private bool _isForecasting;
    private bool _requiresAdditionalForecast;

    public Task SetDetails(int parameter)
    {
        // Modify state
        _parameter += parameter;
        // Only forecast if not busy
        if (!_isForecasting)
        {
            ForecastDetails();
        }
        else
        {
            _requiresAdditionalForecast = true;
        }
        return TaskDone.Done;
    }

    public async Task<int> GetResults(int criteria)
    {
        var grain = GrainFactory.GetGrain<IForecastGrain>(1);
        // Get results
        var val = await grain.GetResults(criteria);
        // Post processing
        return val * _parameter;
    }

    public Task ForecastComplete(int result)
    {
        // Update result based on forecast complete
        _parameter -= result;
        _isForecasting = false;
        if (_requiresAdditionalForecast)
        {
            _requiresAdditionalForecast = false;
            ForecastDetails();
        }
        return TaskDone.Done;
    }

    private void ForecastDetails()
    {
        _isForecasting = true;
        var grain = GrainFactory.GetGrain<IForecastGrain>(1);
        // Fire and forget (FireAndForget is a project-specific extension method)
        grain.Forecast(_parameter, this.AsReference<IStateGrain>()).FireAndForget(r => _isForecasting = false);
    }
}

public class ForecastGrain : Grain, IForecastGrain
{
    private int _parameter = 1;

    public async Task Forecast(int parameter, IStateGrain actorRef)
    {
        // In the real implementation this work is done on the .NET thread pool
        await Task.Delay(60000);
        _parameter *= parameter;
        actorRef.ForecastComplete(_parameter).FireAndForget();
    }

    public Task<int> GetResults(int parameter)
    {
        return Task.FromResult(_parameter * parameter);
    }
}
```
In the code sample there could be blocking (and possibly timeouts), but not deadlocks. Blocking occurs because while the ForecastGrain is performing Forecast(..) it cannot process any other messages, so all calls to its GetResults(..) wait until the forecast is complete, and its ForecastComplete callback is queued on the StateGrain only after all other calls already queued for that grain. So given the following calls to StateGrain:
The first SetDetails (1) would modify the state and kick off a forecast.
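The blocking described above can be modeled outside Orleans. In this sketch (all names hypothetical), a `SemaphoreSlim(1, 1)` stands in for a non-reentrant activation, and `Task.WhenAny` with a short delay stands in for a caller-side response timeout. The caller gives up while GetResults waits behind the forecast, but the queued call still runs once the forecast finishes, so nothing is deadlocked.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical model of a non-reentrant grain: the semaphore lets one request run at a
// time, standing in for Orleans' per-activation scheduling.
public sealed class BusyForecaster
{
    private readonly SemaphoreSlim _turn = new SemaphoreSlim(1, 1);
    private int _parameter = 1;

    // Keeps the turn for the whole forecast, like awaiting Task.Delay inside Forecast(..).
    public async Task Forecast(int parameter, TimeSpan duration)
    {
        await _turn.WaitAsync();
        try
        {
            await Task.Delay(duration);
            _parameter *= parameter;
        }
        finally
        {
            _turn.Release();
        }
    }

    public async Task<int> GetResults(int parameter)
    {
        await _turn.WaitAsync();
        try
        {
            return _parameter * parameter;
        }
        finally
        {
            _turn.Release();
        }
    }
}

public static class TimeoutDemo
{
    // The 50 ms "timeout" expires while GetResults waits behind the forecast,
    // yet the queued call still completes afterwards: blocking, not deadlock.
    public static async Task<(bool timedOut, int eventualResult)> Run()
    {
        var grain = new BusyForecaster();
        Task forecast = grain.Forecast(2, TimeSpan.FromMilliseconds(500));
        Task<int> results = grain.GetResults(3);

        Task first = await Task.WhenAny(results, Task.Delay(TimeSpan.FromMilliseconds(50)));
        bool timedOut = first != results;

        await forecast;
        return (timedOut, await results);   // 2 * 3 once the forecast has finished
    }
}
```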
I don't think the behavior you seek is hard; it's just not clear what behavior you want.
I'm confused by the code
In this approach, did I miss something?
Again, thanks for the feedback. The pattern suggested in Sergey's post is what I am attempting to achieve. I have a follow-up question about the suggestion. The documentation suggests creating a Task using Task.Run, awaiting that task and then returning to the caller. My understanding is that awaiting the task dispatched on the .NET thread pool won't complete until the forecast completes, so the return value won't be available until that Task completes. Is the suggestion to start the Task and store it within Actor(Forecast)? I believe there are two possible solutions; could you let me know whether either summarises the suggestion?

```csharp
public Task<bool> BeginForecast(int parameter, IStateGrain actorRef)
{
    if (_forecastingInProgress)
    {
        return Task.FromResult(false);
    }
    var orleansTs = TaskScheduler.Current;
    _forecastingInProgress = true;
    // Should this be:
    // - _forecastingTask = Task.Run
    // OR
    // - await Task.Run
    _forecastingTask = Task.Run(async () =>
    {
        await Task.Delay(60000);
        await Task.Factory.StartNew(async () =>
        {
            var grain = GrainFactory.GetGrain<IForecastGrain>(this.GetPrimaryKeyLong());
            var res = _parameter * parameter;
            await grain.CompleteForecast(res, actorRef);
        }, CancellationToken.None, TaskCreationOptions.None, scheduler: orleansTs);
    });
    return Task.FromResult(true);
}
```

```csharp
public async Task<bool> BeginForecast2(int parameter, IStateGrain actorRef)
{
    if (_forecastingInProgress)
    {
        return false;
    }
    var orleansTs = TaskScheduler.Current;
    _forecastingInProgress = true;
    // Should this be:
    // - _forecastingTask = Task.Run
    // OR
    // - await Task.Run
    await Task.Factory.StartNew(async () =>
    {
        await Task.Delay(60000);
        await Task.Factory.StartNew(async () =>
        {
            var grain = GrainFactory.GetGrain<IForecastGrain>(this.GetPrimaryKeyLong());
            var res = _parameter * parameter;
            await grain.CompleteForecast(res, actorRef);
        }, CancellationToken.None, TaskCreationOptions.None, scheduler: orleansTs);
    }, CancellationToken.None, TaskCreationOptions.None, scheduler: TaskScheduler.Default);
    return true;
}
```

I've extended the example to represent the code more accurately. I've also attempted to incorporate the suggestions, but I'm not sure how to achieve the "and return to the caller" part quickly.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;

public interface IForecastGrain : IGrainWithIntegerKey
{
    Task<bool> BeginForecast(int parameter, IStateGrain actorRef);
    Task<int> GetResults(int parameter);
    Task CompleteForecast(int result, IStateGrain actorRef);
}

public interface IStateGrain : IGrainWithIntegerKey
{
    Task SetDetails(int parameter);
    Task<ForecastResults> GetResults(int criteria);
    [AlwaysInterleave]
    Task ForecastComplete(int result);
}

public class StateGrain : Grain, IStateGrain
{
    private readonly CachedForecastResults _cachedResults = new CachedForecastResults();
    private bool _isForecasting;
    private bool _requiresAdditionalForecast;
    private int _state;
    private IDisposable _timer;

    public async Task SetDetails(int newState)
    {
        // Modify state
        _state += newState;
        // Only forecast if not busy
        if (!_isForecasting)
        {
            await ForecastDetails();
        }
        else
        {
            _requiresAdditionalForecast = true;
        }
    }

    public async Task<ForecastResults> GetResults(int criteria)
    {
        var grain = GrainFactory.GetGrain<IForecastGrain>(1);
        // Get results
        var val = await _cachedResults.GetOrAdd(criteria, () => grain.GetResults(criteria));
        // Post processing
        return new ForecastResults(_state, val);
    }

    public async Task ForecastComplete(int result)
    {
        _cachedResults.Clear();
        // Update result based on forecast complete
        _isForecasting = false;
        if (_requiresAdditionalForecast)
        {
            _requiresAdditionalForecast = false;
            await ForecastDetails();
        }
    }

    private async Task ForecastDetails()
    {
        _isForecasting = true;
        var grain = GrainFactory.GetGrain<IForecastGrain>(1);
        // Ask the forecast grain to start; it replies at once with whether it accepted
        var result = await grain.BeginForecast(_state, this.AsReference<IStateGrain>());
        if (result)
        {
            // Accepted: stop retrying, and clear the field so a later refusal can re-register
            _timer?.Dispose();
            _timer = null;
            _isForecasting = true;
        }
        else
        {
            // Busy: retry every 10 seconds
            _requiresAdditionalForecast = true;
            if (_timer == null)
            {
                _timer = this.RegisterTimer(o => ForecastDetails(), null, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(10));
            }
        }
    }
}

public class ForecastGrain : Grain, IForecastGrain
{
    private int _parameter = 1;
    private bool _forecastingInProgress;
    private Task _forecastingTask;

    public Task<bool> BeginForecast(int parameter, IStateGrain actorRef)
    {
        if (_forecastingInProgress)
        {
            return Task.FromResult(false);
        }
        var orleansTs = TaskScheduler.Current;
        _forecastingInProgress = true;
        // Should this be:
        // - _forecastingTask = Task.Run
        // OR
        // - await Task.Run
        _forecastingTask = Task.Run(async () =>
        {
            await Task.Delay(60000);
            await Task.Factory.StartNew(async () =>
            {
                var grain = GrainFactory.GetGrain<IForecastGrain>(this.GetPrimaryKeyLong());
                var res = _parameter * parameter;
                await grain.CompleteForecast(res, actorRef);
            }, CancellationToken.None, TaskCreationOptions.None, scheduler: orleansTs);
        });
        return Task.FromResult(true);
    }

    public Task<int> GetResults(int parameter)
    {
        return Task.FromResult(_parameter * parameter);
    }

    public async Task CompleteForecast(int result, IStateGrain actorRef)
    {
        _parameter = result;
        _forecastingInProgress = false;
        await actorRef.ForecastComplete(result);
    }
}

public class CachedForecastResults
{
    private Dictionary<int, int> _results = new Dictionary<int, int>();

    public async Task<int> GetOrAdd(int parm, Func<Task<int>> getFunc)
    {
        if (_results.ContainsKey(parm))
        {
            return _results[parm];
        }
        var res = await getFunc();
        _results.Add(parm, res);
        return res;
    }

    public void Clear()
    {
        _results.Clear();
    }
}

[Immutable]
public class ForecastResults
{
    public ForecastResults(int currentState, int result)
    {
        CurrentState = currentState;
        Result = result;
    }

    public int CurrentState { get; private set; }
    public int Result { get; private set; }
}
```
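One detail worth checking in both BeginForecast variants: with an async lambda, `Task.Factory.StartNew` returns a `Task<Task>`, so a single `await` completes as soon as the lambda reaches its first incomplete await, not when the body finishes. This is standard TPL behavior rather than anything Orleans-specific. The standalone sketch below (the `StartNewDemo` name is illustrative) shows the difference and the usual fix, `Unwrap()`:

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class StartNewDemo
{
    public static async Task<(bool sawCompletionWithoutUnwrap, bool sawCompletionWithUnwrap)> Run()
    {
        bool firstDone = false;
        // With an async lambda, StartNew returns Task<Task>. Awaiting it once only waits until
        // the lambda reaches its first incomplete await and hands back the inner task.
        Task<Task> outer = Task.Factory.StartNew(async () =>
        {
            await Task.Delay(200);
            firstDone = true;
        }, CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);
        Task inner = await outer;
        bool sawCompletionWithoutUnwrap = firstDone;   // still false: the body is mid-delay
        await inner;                                   // now the body has really finished

        bool secondDone = false;
        // Unwrap() (or "await await") gives a task that tracks the whole async body.
        await Task.Factory.StartNew(async () =>
        {
            await Task.Delay(200);
            secondDone = true;
        }, CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default).Unwrap();

        return (sawCompletionWithoutUnwrap, secondDone);
    }
}
```

Applied to the grain code, `await Task.Factory.StartNew(async () => { ... }, ..., scheduler: orleansTs).Unwrap();` would make the outer task wait for CompleteForecast as well.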
You are right, and I realize it's a bit confusing for this case. It was written primarily with another case in mind: to avoid running blocking operations on the silo's scheduler threads. If those operations are relatively fast, awaiting them is the most straightforward thing to do. Your case is different: you are executing long-running computations. I hope this clears it up a little.
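For long-running work, the shape being discussed is to start the computation, keep the `Task` in a field, return to the caller right away, and report the result later through a separate call (CompleteForecast/ForecastComplete in this thread). A minimal sketch outside Orleans, assuming a hypothetical `ForecastRunner` type and an `onComplete` callback in place of the grain call:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of "begin, store the task, return at once".
public sealed class ForecastRunner
{
    private Task _forecastingTask;   // kept so the runner knows a forecast is in flight

    // Returns immediately: true if a forecast was started, false if one is already running.
    public bool BeginForecast(int parameter, Func<int, Task> onComplete)
    {
        if (_forecastingTask != null && !_forecastingTask.IsCompleted)
        {
            return false;
        }
        _forecastingTask = RunForecastAsync(parameter, onComplete);
        return true;
    }

    private static async Task RunForecastAsync(int parameter, Func<int, Task> onComplete)
    {
        // The long computation runs on the thread pool; only the copied argument is used.
        int result = await Task.Run(() =>
        {
            Thread.Sleep(200);       // stands in for a CPU-heavy forecast
            return parameter * 2;
        });
        // Report completion separately, e.g. a CompleteForecast/ForecastComplete call.
        await onComplete(result);
    }
}
```

Inside a grain, the code after `await Task.Run(...)` would resume on the activation's task scheduler rather than on the thread pool, because Orleans runs grain code on its own TaskScheduler; in this standalone sketch it simply resumes on the thread pool.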
@sergeybykov You said
Why does an observer between two grains need additional polling from the State grain?
@AlexMeakin I had an issue which was partially similar. In my case the additional state changes didn't need to be stored for a later start of another long-running operation, but that doesn't make much difference. I implemented it differently from what Sergey suggests, but the BeginOperation approach is good. You could even use a simple SMS stream from the Forecast grain to publish the results to subscribers, or use observers. The alternative approach I chose, because it made the code more linear in that case, was to mark the long-running grain (a.k.a. Forecast) as Reentrant, and then when I received BeginOperation calls, I created
My case was a MatchMaker grain. My approach is more complex than using a stream/observer, but the calling code gets the result linearly, and some may find the API surface a bit nicer. Back then I was even more of a beginner in Orleans than I am now, so I wanted to avoid additional complex features like observers/streams, especially in unit and integration tests, which might have contributed to my choices as well.
Observer messages are inherently unreliable because the caller gets no indication if a message gets lost due to a connectivity issue or a node failure, and hence doesn't know to resend it. Even though the probability of such an occurrence is fairly low, it's not zero. Periodic polling covers for that by ensuring no operation will be left in an unfinished state.
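A small sketch of why polling covers for a lost push notification, under assumed names (`ForecastStatus`, `WaitForCompletion`): the waiter polls the status, then wakes on whichever comes first, the notification or the next poll tick. In the demo the notification never arrives, yet completion is still detected.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical status holder: a "poll" reads it, independent of any push notification.
public sealed class ForecastStatus
{
    private volatile bool _done;
    public void MarkDone() => _done = true;
    public Task<bool> IsDoneAsync() => Task.FromResult(_done);
}

public static class PollingDemo
{
    // Polls the status, then waits for either the push notification or the next poll tick,
    // so a lost notification only delays the caller instead of stranding it.
    public static async Task<bool> WaitForCompletion(
        ForecastStatus status, Task notification, TimeSpan pollInterval, TimeSpan giveUpAfter)
    {
        DateTime deadline = DateTime.UtcNow + giveUpAfter;
        while (DateTime.UtcNow < deadline)
        {
            if (await status.IsDoneAsync())
            {
                return true;
            }
            await Task.WhenAny(notification, Task.Delay(pollInterval));
        }
        return await status.IsDoneAsync();
    }

    // The forecast finishes, but its notification is "lost": the task below never completes.
    public static Task<bool> LostNotificationStillDetected()
    {
        var status = new ForecastStatus();
        Task lostNotification = new TaskCompletionSource<bool>().Task;
        _ = Task.Run(async () =>
        {
            await Task.Delay(100);
            status.MarkDone();
        });
        return WaitForCompletion(status, lostNotification,
            TimeSpan.FromMilliseconds(20), TimeSpan.FromSeconds(5));
    }
}
```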
@sergeybykov Fair enough, thanks. I'll update the observer docs to reflect this.
Looks like this can be closed now.
It sounds like that
What I found out when trying to replicate the above example is that

```csharp
await Task.Factory.StartNew(async () =>
{
    var grain = GrainFactory.GetGrain<IForecastGrain>(this.GetPrimaryKeyLong());
    var res = _parameter * parameter;
    await grain.CompleteForecast(res, actorRef);
}, CancellationToken.None, TaskCreationOptions.None, scheduler: orleansTs);
```

is scheduling a Task on the
What did I miss here? (I am running Orleans 2.2.0)
@ztl8702 in the case that the task is very long running, you could use
@ReubenBond thanks! My follow-up question is: how do I properly interact with the Orleans runtime from within the
It appears that:
(The latter three require
So it also appears that:
Just want to check that my understanding is correct. :)
Your solutions (point 2 & 3) are both fine approaches.
Yes, that is correct.
Yes. I would go further and suggest that you never access the internals of a grain object from the
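One way to follow this advice is to copy whatever state the background work needs before calling `Task.Run`, use only those copies on the thread pool, and hop back to the captured scheduler (the `orleansTs` pattern used earlier in the thread) to update fields. The sketch below is a standalone model: `ConcurrentExclusiveSchedulerPair.ExclusiveScheduler` stands in for the Orleans activation scheduler, and `ForecastGrainModel` is hypothetical.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical model: the "grain" only touches its fields on one exclusive scheduler,
// which stands in for the Orleans activation scheduler.
public sealed class ForecastGrainModel
{
    private int _parameter = 1;
    public int Parameter => _parameter;

    // Call this while running on the grain's scheduler.
    public Task<int> Forecast(int parameter)
    {
        TaskScheduler grainScheduler = TaskScheduler.Current;   // like `orleansTs` in the thread
        int snapshot = _parameter;                              // copy state before leaving
        return Task.Run(() =>
        {
            // Thread pool: work only with the copies, never with grain fields.
            int result = snapshot * parameter;
            // Hop back onto the grain's scheduler to update state.
            return Task.Factory.StartNew(() =>
            {
                _parameter = result;
                return result;
            }, CancellationToken.None, TaskCreationOptions.None, grainScheduler);
        });
    }
}

public static class SchedulerDemo
{
    public static async Task<(int result, int parameter)> Run()
    {
        var pair = new ConcurrentExclusiveSchedulerPair();
        var grain = new ForecastGrainModel();
        // Invoke the "grain method" on the exclusive scheduler so TaskScheduler.Current is right.
        int result = await Task.Factory.StartNew(() => grain.Forecast(7),
            CancellationToken.None, TaskCreationOptions.None, pair.ExclusiveScheduler).Unwrap();
        int parameter = await Task.Factory.StartNew(() => grain.Parameter,
            CancellationToken.None, TaskCreationOptions.None, pair.ExclusiveScheduler);
        return (result, parameter);
    }
}
```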
Got it: essentially the code within
Thanks for helping out!
Hi
We are currently refactoring our application to use Orleans and have a use case we are struggling to implement.
Background
The application in question is an interactive forecasting application that has been around for approximately 15 years. The application is being refactored to improve performance and scalability, and to re-platform to web technologies. The initial phase of development migrated the Domain Driven Design application to our own Actor framework. Our actor framework uses a variant of a Limited Concurrency Task Scheduler to ensure only one task is executed at a time per Actor. The current implementation uses await inconsistently: in certain circumstances the application expects await to interleave, in others it relies on deterministic ordering. Specifically, calling await within an Actor expects tasks to be scheduled sequentially, but calling await on another Actor expects the caller to interleave (be re-entrant). This is leading to a lot of complexity in the code.
The web application uses a reactive architecture: users issue commands to the server, which are validated quickly; once a command is validated, the user is informed that processing has started. Users are notified of forecast completion via web sockets. Forecasting is a reasonably time-consuming operation (0.5 to 120 seconds, depending on data volumes). Forecasting is completed on the .NET thread pool, not the Orleans or activation scheduler. The forecast duration can be longer than the default message timeout in Orleans.
In an attempt to resolve the interleaving problems and support horizontal scalability, the application is being further refactored to Orleans. Generally, our grains are not re-entrant, to reduce complexity.
Use case
It should be possible for users to update the forecast parameters even whilst forecasting is in progress. When an initial update is made, forecasting should start. If a forecast is in progress when the parameters are updated, the changes should be stashed and persisted until the current forecast completes. Once the original forecast completes, any pending changes are aggregated and a new forecast should start, which includes all of the stashed changes. Our users understand that the application is eventually consistent.
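The stashing rule can be expressed as a small state machine independent of Orleans. This is a sketch under assumed names (`ForecastStash`, `StartedForecasts`), mirroring the `_isForecasting`/`_requiresAdditionalForecast` flags used in the grain code in this thread: at most one forecast is in flight, and any updates that arrive meanwhile are folded into a single follow-up forecast.

```csharp
using System.Collections.Generic;

// Hypothetical, Orleans-free model of the stashing rule.
public sealed class ForecastStash
{
    private int _state;
    private bool _isForecasting;
    private bool _hasPendingChanges;

    // The state value each started forecast ran with, in order.
    public List<int> StartedForecasts { get; } = new List<int>();

    public void SetDetails(int delta)
    {
        _state += delta;                  // apply (and, in the real grain, persist) the change
        if (_isForecasting)
        {
            _hasPendingChanges = true;    // stash: picked up when the current forecast completes
        }
        else
        {
            StartForecast();
        }
    }

    public void ForecastComplete()
    {
        _isForecasting = false;
        if (_hasPendingChanges)
        {
            _hasPendingChanges = false;
            StartForecast();              // one forecast that covers every stashed change
        }
    }

    private void StartForecast()
    {
        _isForecasting = true;
        StartedForecasts.Add(_state);     // real system: tell Actor(Forecast) to begin
    }
}
```

For example, SetDetails(1) starts a forecast with state 1; SetDetails(2) and SetDetails(3) arrive during it and are stashed; the first ForecastComplete then starts one forecast with state 6, and the second ForecastComplete starts nothing.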
Current Progress
The refactor to Orleans is currently in development, and to support the use case above, two actors have been created: Actor(State) and Actor(Forecast). Actor(State) is responsible for manipulating state, persisting state, tracking whether a forecast is in progress and starting a new forecast when appropriate. Actor(Forecast) is responsible for forecasting the State from Actor(State). Once forecasting has completed, the results are stored in Actor(Forecast). All clients require a combination of the state and results when displaying information to users, so all requests for results are proxied via Actor(State).
To support the use case above, forecasting has been refactored to the Tell pattern. When an update is received, the state is updated and persisted by Actor(State). Once persisted, Actor(State) tells (via fire and forget *) Actor(Forecast) to start processing. Once forecasting finishes, Actor(Forecast) tells Actor(State) it has completed. This approach frees up Actor(State)'s mailbox during forecasting. Actor(Forecast) sends aggregated results back to Actor(State), which stores them in memory so basic metrics can be returned while forecasting is in progress. When Actor(State) doesn't have the appropriate data available, it asks Actor(Forecast) for the results so they can be returned to the client.
NOTE: The end goal is to move all forecast interactions to be Tell based, but this is a reasonably large amount of work. In the short term the application needs to be available.
Normally the following happens:
Required Guidance
Does the current implementation lead to deadlocks in the following case?
In this case, is Actor(State) blocked because it is awaiting the results from Actor(Forecast), while Actor(Forecast) can't complete because it is trying to send a message to Actor(State)? Or does the operation complete because Actor(Forecast) is telling (fire and forget) Actor(State) it has finished, so its Task completes?
If this does lead to a deadlock, what is the best way to resolve the problem? I'd prefer to avoid making the Grains re-entrant. It may be possible to create a short lived re-entrant grain responsible for mediating between the two Actors, but the performance may be bad at scale.
* Fire and forget calls the target grain but doesn't await the response. All fire-and-forget actions use a method similar to Ignore in https://github.com/dotnet/orleans/blob/master/src/Orleans/Async/TaskExtensions.cs. The main difference is that our method logs all exceptions to ELK and supports a continuation. The continuation is used to recover from failures, either by retrying or by informing the source that the operation failed. This pattern is loosely based on Akka's Tell.
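A sketch of a helper along the lines of this footnote, assuming a caller-supplied `log` delegate in place of the ELK logging (the name `FireAndForget` and its parameters are illustrative, not the project's actual code). The optional `scheduler` parameter matters in grain code: a continuation that touches grain fields should run on the activation's scheduler, for example by passing `TaskScheduler.Current`, rather than on the thread pool.

```csharp
using System;
using System.Threading.Tasks;

public static class FireAndForgetExtensions
{
    // Hypothetical helper: observes the task so a failure is logged rather than lost, then
    // runs an optional continuation (e.g. retry, or tell the source the operation failed).
    // Returns the continuation task only so tests can wait on it; callers normally ignore it.
    public static Task FireAndForget(this Task task, Action<Exception> log,
        Action<Task> continuation = null, TaskScheduler scheduler = null)
    {
        return task.ContinueWith(t =>
        {
            if (t.IsFaulted)
            {
                log(t.Exception.GetBaseException());
            }
            continuation?.Invoke(t);
        }, scheduler ?? TaskScheduler.Default);
    }
}
```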