-
Notifications
You must be signed in to change notification settings - Fork 2
Activity heartbeat and cancellation
Activity heartbeats are the pulses which worker sends to Amazon SWF to let it know that worker is alive and doing its job. If Amazon SWF does not receive the heartbeat within configured interval it will raise Activity Timedout event. By default workflow will be failed on activity timedout event but you can configure it to take other appropriate action e.g. reschedule the activity. To enable the heartbeat support you need to:
-
Provide heartbeat timeout: Heartbeat timeout represent the interval within which Amazon SWF should receive pulse from worker. You can provide heartbeat timeout in ActivityDescriptionAttribute and it will be used while registering the activity with Amazon SWF or you can provide/override the heartbeat timeout while scheduling the activity. Following examples shows both approaches:
// When you will register the following activity with Amazon SWF its default heartbeat timeout will be 2 seconds. [ActivityDescription("1.0", DefaultHeartbeatTimeoutInSeconds=2)] public class TranscodeActivity : Activity { [Execute] public string Transcode(string input) { // transcode the file return Complete(transcodedPath); } } //In following example TranscodeActivity will be scheduled with heartbeat timeout of 3 second. ```cs public class TranscodeWorkflow : Workflow { public TranscodeWorkflow() { ScheduleActivity<TranscodeActivity>() .WithTimeouts(_=>new ActivityTimeouts(){ HeartbeatTimeout = TimeSpan.FromSeconds(3)}); } }
-
Enable heartbeat in activity: Other than configuring the heartbeat timeout, as show above, you also need to enable the heartbeat on activity for it to send pulses to Amazon SWF, when it start execution. You can use either of following approaches to enable the heartbeat on a activity:
-
Use EnableHeartbeatAttribute: It also allow you to provide the heartbeat interval. If you do not provide
heartbeat interval then it will be equal to ActivityDescriptionAttribute.DefaultHeartbeatTimeoutInSeconds.
Generally it is good idea to keep heartbeat interval shorter then heartbeat timeout otherwise activity can occasionally timedout if pulse are sent just near the timed out period.
[ActivityDescription("1.0", DefaultHeartbeatTimeoutInSeconds=2)] [EnableHeartbeat] //Or [EnableHeartbeat(IntervalInMilliSeconds = 1000)]; public class TranscodeActivity : Activity { [Execute] public string Transcode(string input) { // transcode the file return Complete(transcodedPath); } }
-
Using ActivityHeartbeat's API: You can also enable the heartbeat dynamically on activity as shown below in
example:
[ActivityDescription("1.0", DefaultHeartbeatTimeoutInSeconds=2)] public class TranscodeActivity : Activity { public TranscodeActivity() { Hearbeat.Enable(TimeSpan.FromSecond(1)); } [Execute] public string Transcode(string input) { // transcode the file return Complete(transcodedPath); } }
-
Use EnableHeartbeatAttribute: It also allow you to provide the heartbeat interval. If you do not provide
heartbeat interval then it will be equal to ActivityDescriptionAttribute.DefaultHeartbeatTimeoutInSeconds.
Generally it is good idea to keep heartbeat interval shorter then heartbeat timeout otherwise activity can occasionally timedout if pulse are sent just near the timed out period.
You can provide heartbeat progress details to Amazon SWF as shown in following example. Last reported details message will be available to you in ActivityTimedoutEvent.Details property only if Amazon SWF has timedout the activity.
[ActivityDescription("1.0")]
public class OrderActivity: Activity
{
private string _progressDetail;
public OrderActivity()
{
Hearbeat.ProvideDetails(()=>_progressDetail);
}
[ExecuteMethod]
public async Task<AsynResponse> OnExecute(ActivityArgs args, CancellationToken token)
{
_progressDetail = "starting";
await do.ioworkfow
_progressDetail = "logged request";
token.ThrowWhenCancelled();
await do.anotheriowork
_progressDetail = "finished some other work";
//...
return Complete("done");
}
}
Heartbeat can also inform you if currently executing activity is no more tracked in Amazon SWF. It can happen because either activity has timedout by SWF or workflow is terminated in Amazon SWF. In such case you do not want to keep worker CPU busy on any intensive operation. You can handle this scenario by handling the ActivityHearbeat.ActivityTerminated event as shown in following example:
public class TranscodeActivity: Activity
{
private volatile bool _terminated;
public OrderActivity()
{
Hearbeat.ActivityTerminated+=(s,e)= _terminated = true;
}
[ExecuteMethod]
public async Task<AsynResponse> OnExecute(ActivityArgs args)
{
//do some work
if(_terminated) Defer;
//do cpu intensive work;
//...
return Complete("done");
}
}
Activity heartbeats are quite useful in providing the elasticity in your workflows. If you shutdown the worker machine then worker running on these machine will stop sending heartbeat to Amazon SWF and as a result activity will be timed out and you can easily configure workflow as shown in following example to reschedule the activity for it be processed on other live worker machines:
[WorkflowDescription("1.0")]
public class TranscodeWorkflow : Workflow
{
public TranscodeWorkflow()
{
ScheduleActivity<TranscodeActivity>().OnTimeout(Reschedule);
}
}
//Or you can restart the workflow as shown below
[WorkflowDescription("1.0")]
public class TranscodeWorkflow : Workflow
{
public TranscodeWorkflow()
{
ScheduleActivity<TranscodeActivity>().OnTimeout(_=>RestartWorkflow());
}
}
Heartbeat also play important role in collaborative cancellation of activities. When worker is sending the heartbeat to AmazonSWF it also check if running activity is requested to be cancelled. Guflow gives you easy way to respond to cancellation request in your activity.
- One of the approach is to pass CancellationToken as one of the argument to your activity execution method. Guflow will signal CancellationToken as when it will detect that activity is requested to to be cancelled. You can use usual .NET approach to cancel the activity as shown in following example:
[ActivityDescription("1.0")] public class OrderActivit: Activity { [ExecuteMethod] public async Task<AsynResponse> OnExecute(ActivityArgs args, CancellationToken token) { await do.ioworkfow token.ThrowWhenCancelled(); await do.anotheriowork //... return Complete("done"); } }
- Other approach is to register the event handler with for ActivityHearbeat.CancellationRequested event as shown in below example:
[ActivityDescription("1.0")] public class OrderActivit: Activity { private volatile bool _cancelled; public OrderActivity() { Hearbeat.CancellationRequested+= (s,e)=> _cancelled = true; } [ExecuteMethod] public async Task<AsynResponse> OnExecute(ActivityArgs args) { await do.ioworkfow if(_cancelled) return Cancel("detail"); await do.anotheriowork //... return Complete("done"); } }
Guflow
- Prerequisite
- Installation
-
Workflows
- Creating first workflow
- Registration
- Hosting
- Start workflow
- Schedule activities
- Schedule timers
- Schedule lambda function
- Schedule child workflows
- Lambda functions vs activities
- Workflow input
- Workflow actions
- Signals
- Workflow branches
- Deflow algorithm
- Workflow events
- Query APIs
- Custom polling strategy
- Things to take care of
- Activites
- Unit testing
- Performance & scalability
- Error handling
- Logging
- Debugging
- Tutorial
- Release notes