Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Retry Mechanism and Concurrency Control for Jobs #21

Merged
merged 15 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ csharp_style_conditional_delegate_call = true

# Code block preferences
# https://docs.microsoft.com/visualstudio/ide/editorconfig-language-conventions#code-block-preferences
csharp_prefer_braces = true # IDE0011
csharp_prefer_braces = when_multiline # IDE0011

# Unused value preferences
# https://docs.microsoft.com/visualstudio/ide/editorconfig-language-conventions#unused-value-preferences
Expand Down Expand Up @@ -472,4 +472,4 @@ dotnet_diagnostic.S3459.severity = none # S3459: Unassigned members should be re
dotnet_diagnostic.S3871.severity = none # S3871: Exception types should be "public"
dotnet_diagnostic.S1186.severity = none # S1186: Methods should not be empty
dotnet_diagnostic.S1144.severity = none # S1144: Unused private types should be removed
dotnet_diagnostic.S6608.severity = none # S6608: Indexing at Count-1 should be used instead of "Enumerable" extension method "Last"
dotnet_diagnostic.S6608.severity = none # S6608: Indexing at Count-1 should be used instead of "Enumerable" extension method "Last"
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,22 @@ All notable changes to **NCronJob** will be documented in this file. The project

## [Unreleased]

### Added
- **Retry Mechanism:** Improved robustness of job executions, especially in cases of transient failures. Includes exponential backoff and fixed interval strategies.
falvarez1 marked this conversation as resolved.
Show resolved Hide resolved
- **Concurrency Control:** Introduced a `SupportsConcurrency` attribute to provide fine-grained control over the concurrency behavior of individual jobs. This attribute allows specifying the maximum degree of parallelism.
- **New WaitForJobsOrTimeout:** Added an enhanced version of `WaitForJobsOrTimeout` to the Tests project that allows time advancement for each run, providing more precise control over test execution timing.
- **Cancellation Support:** Jobs now support graceful cancellation during execution and SIGTERM.

### Changed
- **Concurrency Management:** Implemented improved global concurrency handling techniques within `CronScheduler`. This change enhances flexibility and scalability in managing job executions.
- **Async Job Executions:** Job executions are now fully async from the moment the scheduler triggers the job, ensuring better performance and resource utilization.

### Fixed
- **Test Framework Bugs:** Addressed specific bugs in the testing framework, ensuring that all tests are now passing and provide reliable results.

### Contributors
- Support for concurrent jobs and retries, as well as overall improvements, implemented by [@falvarez1](https://github.com/falvarez1) in PR [#21](https://github.com/linkdotnet/NCronJob/pull/21).
falvarez1 marked this conversation as resolved.
Show resolved Hide resolved

## [2.0.5] - 2024-04-19

### Changed
Expand Down
138 changes: 137 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ jobs:
- [x] Parameterized jobs - instant as well as cron jobs!
- [x] Integrated in ASP.NET - Access your DI container like you would in any other service
- [x] Get notified when a job is done (either successfully or with an error).
- [x] Retries - If a job fails, it will be retried.

## Not features

Expand All @@ -41,7 +42,6 @@ look into a more advanced scheduler like `Hangfire` or `Quartz`.

- [ ] Job persistence - Jobs are not persisted between restarts of the application.
- [ ] Job history - There is no history of jobs that have been run.
- [ ] Retries - If a job fails, it is not retried.
- [ ] Progress state - There is no way to track the progress of a job. The library will support notifying when a job is
done, but not the progress of the job itself.
- [ ] The job scheduler always uses UTC time. We might change that in the future.
Expand Down Expand Up @@ -220,6 +220,142 @@ Services.AddNCronJob(options =>
});
```

## Retry Support
linkdotnet marked this conversation as resolved.
Show resolved Hide resolved

The new Retry support provides a robust mechanism for handling transient failures by retrying failed operations. This feature is implemented using the `RetryPolicy` attribute that can be applied to any class implementing the `IJob` interface.

### How It Works

The `RetryPolicy` attribute allows you to specify the number of retry attempts and the strategy for handling retries. There are two built-in retry strategies:
- **ExponentialBackoff:** Increases the delay between retry attempts exponentially.
- **FixedInterval:** Keeps the delay between retry attempts consistent.

### Using Retry Policies

Here are examples of how to use the built-in retry policies:

#### Example 1: Basic Retry Policy, defaults to Exponential Backoff

```csharp
[RetryPolicy(retryCount: 4)]
public class RetryJob(ILogger<RetryJob> logger) : IJob
{
public async Task RunAsync(JobExecutionContext context, CancellationToken token)
{
var attemptCount = context.Attempts;

if (attemptCount <= 3)
{
logger.LogWarning("RetryJob simulating failure.");
throw new InvalidOperationException("Simulated operation failure in RetryJob.");
}

logger.LogInformation($"RetryJob with Id {context.Id} was attempted {attemptCount} times.");
await Task.CompletedTask;
}
}
```

#### Example 2: Fixed Interval

```csharp
[RetryPolicy(4, PolicyType.FixedInterval)]
public class FixedIntervalRetryJob(ILogger<FixedIntervalRetryJob> logger) : IJob
{
public async Task RunAsync(JobExecutionContext context, CancellationToken token)
{
var attemptCount = context.Attempts;

if (attemptCount <= 3)
{
logger.LogWarning("FixedIntervalRetryJob simulating failure.");
throw new InvalidOperationException("Simulated operation failure in FixedIntervalRetryJob.");
}

logger.LogInformation($"FixedIntervalRetryJob with Id {context.Id} was attempted {attemptCount} times.");
await Task.CompletedTask;
}
}
```

### Advanced: Custom Retry Policies

You can also create custom retry policies by implementing the `IPolicyCreator` interface. This allows you to define complex retry logic tailored to your specific needs.

```csharp
[RetryPolicy<MyCustomPolicyCreator>(retryCount:4, delayFactor:1)]
public class CustomPolicyJob(ILogger<CustomPolicyJob> logger) : IJob
{
public async Task RunAsync(JobExecutionContext context, CancellationToken token)
{
var attemptCount = context.Attempts;

if (attemptCount <= 3)
{
logger.LogWarning("FixedIntervalRetryJob simulating failure.");
throw new InvalidOperationException("Simulated operation failure in FixedIntervalRetryJob.");
}

logger.LogInformation($"CustomPolicyJob with Id {context.Id} was attempted {attemptCount} times.");
await Task.CompletedTask;
}
}

public class MyCustomPolicyCreator : IPolicyCreator
{
public IAsyncPolicy CreatePolicy(int maxRetryAttempts = 3, double delayFactor = 2)
{
return Policy.Handle<Exception>()
.WaitAndRetryAsync(maxRetryAttempts,
retryAttempt => TimeSpan.FromSeconds(Math.Pow(delayFactor, retryAttempt)));
}
}
```

## Concurrency Support

Concurrency support allows multiple instances of the same job type to run simultaneously, controlled by the `SupportsConcurrency` attribute. This feature is crucial for efficiently managing jobs that are capable of running in parallel without interference.

### How It Works

The `SupportsConcurrency` attribute specifies the maximum degree of parallelism for job instances. This means you can define how many instances of a particular job can run concurrently, optimizing performance and resource utilization based on the nature of the job and the system capabilities.

### Using the SupportsConcurrency Attribute

Here is an example of how to apply this attribute to a job:

#### Example: Concurrency in Jobs

```csharp
[SupportsConcurrency(10)]
public class ConcurrentJob : IJob
{
private readonly ILogger<ConcurrentJob> logger;

public ConcurrentJob(ILogger<ConcurrentJob> logger)
{
this.logger = logger;
}

public async Task RunAsync(JobExecutionContext context, CancellationToken token)
{
logger.LogInformation($"ConcurrentJob with Id {context.Id} is running.");
// Simulate some work by delaying
await Task.Delay(5000, token);
logger.LogInformation($"ConcurrentJob with Id {context.Id} has completed.");
}
}
```

### Important Considerations

#### Ensuring Job Idempotency
When using concurrency, it's essential to ensure that each job instance is idempotent. This means that even if the job is executed multiple times concurrently or sequentially, the outcome and side effects should remain consistent, without unintended duplication or conflict.

#### Resource Allocation Caution
Jobs that are marked to support concurrency should be designed carefully to avoid contention over shared resources. This includes, but is not limited to, database connections, file handles, or any external systems. In scenarios where shared resources are unavoidable, proper synchronization mechanisms or concurrency control techniques, such as semaphores, mutexes, or transactional control, should be implemented to prevent race conditions and ensure data integrity.


## Support & Contributing

Thanks to all [contributors](https://github.com/linkdotnet/NCronJob/graphs/contributors) and people that are creating
Expand Down
15 changes: 15 additions & 0 deletions sample/NCronJobSample/Jobs/.editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@

# All files
[*]
indent_style = space

# Xml files
[*.xml]
indent_size = 2

[*.{cs,vb}]
dotnet_diagnostic.CA2008.severity = none
dotnet_diagnostic.CA1848.severity = none
dotnet_diagnostic.S2629.severity = none
dotnet_diagnostic.S2696.severity = none
dotnet_diagnostic.CA2254.severity = none
84 changes: 84 additions & 0 deletions sample/NCronJobSample/Jobs/ConcurrentTaskExecutorJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using System.Security.Cryptography;
using LinkDotNet.NCronJob;

namespace NCronJobSample;

[SupportsConcurrency(10)] // Supports up to 10 concurrent instances
public partial class ConcurrentTaskExecutorJob : IJob
{
private readonly ILogger<ConcurrentTaskExecutorJob> logger;

public ConcurrentTaskExecutorJob(ILogger<ConcurrentTaskExecutorJob> logger) => this.logger = logger;

public async Task RunAsync(JobExecutionContext context, CancellationToken token)
{
ArgumentNullException.ThrowIfNull(context);

var jobId = context.Id.ToString(); // Unique identifier for the job instance
LogStartingJob(jobId);

try
{
// Simulate different types of work by choosing a random task type
var taskType = RandomNumberGenerator.GetInt32(1, 4); // Randomly pick a task type between 1 and 3
await (taskType switch
{
1 => SimulateDataProcessing(jobId, token),
2 => SimulateApiCall(jobId, token),
3 => PerformCalculations(jobId, token),
_ => Task.CompletedTask
});

LogJobCompleted(jobId);
}
catch (OperationCanceledException)
{
LogCancellationConfirmed(jobId);
}
}

private async Task SimulateDataProcessing(string jobId, CancellationToken token)
{
// Simulate processing chunks of data
for (var i = 0; i < 5; i++)
{
token.ThrowIfCancellationRequested();
await Task.Delay(RandomNumberGenerator.GetInt32(200, 500), token); // Simulate work
LogProgress($"Data processing (Chunk {i + 1}/5)", jobId);
}
}

private async Task SimulateApiCall(string jobId, CancellationToken token)
{
// Simulate asynchronous API calls
for (var i = 0; i < 3; i++)
{
token.ThrowIfCancellationRequested();
await Task.Delay(RandomNumberGenerator.GetInt32(500, 1000), token); // Simulate API latency
LogProgress($"API Call {i + 1}/3 completed", jobId);
}
}

private async Task PerformCalculations(string jobId, CancellationToken token)
{
// Perform some random calculations
for (var i = 0; i < 10; i++)
{
token.ThrowIfCancellationRequested();
await Task.Delay(RandomNumberGenerator.GetInt32(100, 300), token); // Simulate calculation time
LogProgress($"Calculation {i + 1}/10", jobId);
}
}

[LoggerMessage(LogLevel.Information, "Job {JobId} started.")]
private partial void LogStartingJob(string jobId);

[LoggerMessage(LogLevel.Information, "Job {JobId} completed.")]
private partial void LogJobCompleted(string jobId);

[LoggerMessage(LogLevel.Information, "Job {JobId} progress: {Message}.")]
private partial void LogProgress(string message, string jobId);

[LoggerMessage(LogLevel.Warning, "Cancellation confirmed for Job {JobId}.")]
private partial void LogCancellationConfirmed(string jobId);
}
63 changes: 63 additions & 0 deletions sample/NCronJobSample/Jobs/TestCancellationJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using LinkDotNet.NCronJob;
using System.Security.Cryptography;

namespace NCronJobSample;


[SupportsConcurrency(10)]
public partial class MultiInstanceJob : IJob
{
private readonly ILogger<MultiInstanceJob> logger;

public MultiInstanceJob(ILogger<MultiInstanceJob> logger) => this.logger = logger;

public async Task RunAsync(JobExecutionContext context, CancellationToken token)
{
ArgumentNullException.ThrowIfNull(context);

token.Register(() => LogCancellationRequestedInJob(context.Parameter));

LogMessage(context.Parameter);

try
{
// Simulate a long-running job
for (var i = 0; i < 15; i++) // Simulate 15 units of work
{
if (token.IsCancellationRequested)
{
LogCancellationNotice();
token.ThrowIfCancellationRequested(); // Properly handle the cancellation
}

// Simulate work by delaying a random amount of time
var variableMs = TimeSpan.FromMilliseconds(1000 + RandomNumberGenerator.GetInt32(2000));
await Task.Delay(variableMs, token);

// Log each unit of work completion
LogWorkUnitCompleted(i + 1, context.Parameter);
}
}
catch (OperationCanceledException)
{
LogCancellationConfirmed(context.Parameter);
}
}

[LoggerMessage(LogLevel.Information, "Message: {Parameter}")]
private partial void LogMessage(object? parameter);

[LoggerMessage(LogLevel.Warning, "Job cancelled by request.")]
private partial void LogCancellationNotice();

[LoggerMessage(LogLevel.Information, "Cancellation confirmed. Clean-up complete for {Parameter}.")]
private partial void LogCancellationConfirmed(object? parameter);

[LoggerMessage(LogLevel.Information, "Completed work unit {Number}/15 for {Parameter}")]
private partial void LogWorkUnitCompleted(int number, object? parameter);

[LoggerMessage(LogLevel.Debug, "Cancellation requested for TestCancellationJob {Parameter}.")]
private partial void LogCancellationRequestedInJob(object? parameter);

}

30 changes: 30 additions & 0 deletions sample/NCronJobSample/Jobs/TestRetryJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using LinkDotNet.NCronJob;

namespace NCronJobSample;

[RetryPolicy(retryCount: 4)]
public class TestRetryJob(ILogger<TestRetryJob> logger, int maxFailuresBeforeSuccess = 3)
: IJob
{

/// <summary>
/// Runs the job, simulating failures based on a retry count. Will fail 3 times and then succeed.
/// </summary>
public async Task RunAsync(JobExecutionContext context, CancellationToken token)
{
ArgumentNullException.ThrowIfNull(context);

var attemptCount = context.Attempts;

if (attemptCount <= maxFailuresBeforeSuccess)
{
logger.LogWarning("TestRetryJob simulating failure.");
throw new InvalidOperationException("Simulated operation failure in TestRetryJob.");
}

await Task.Delay(3000, token);
logger.LogInformation($"TestRetryJob with instance Id {context.Id} completed successfully on attempt {attemptCount}.");
await Task.CompletedTask;
}
}

Loading