diff --git a/.editorconfig b/.editorconfig index 856e718..00158f2 100644 --- a/.editorconfig +++ b/.editorconfig @@ -147,7 +147,7 @@ csharp_style_conditional_delegate_call = true # Code block preferences # https://docs.microsoft.com/visualstudio/ide/editorconfig-language-conventions#code-block-preferences -csharp_prefer_braces = true # IDE0011 +csharp_prefer_braces = when_multiline # IDE0011 # Unused value preferences # https://docs.microsoft.com/visualstudio/ide/editorconfig-language-conventions#unused-value-preferences @@ -472,4 +472,4 @@ dotnet_diagnostic.S3459.severity = none # S3459: Unassigned members should be re dotnet_diagnostic.S3871.severity = none # S3871: Exception types should be "public" dotnet_diagnostic.S1186.severity = none # S1186: Methods should not be empty dotnet_diagnostic.S1144.severity = none # S1144: Unused private types should be removed -dotnet_diagnostic.S6608.severity = none # S6608: Indexing at Count-1 should be used instead of "Enumerable" extension method "Last" \ No newline at end of file +dotnet_diagnostic.S6608.severity = none # S6608: Indexing at Count-1 should be used instead of "Enumerable" extension method "Last" diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e4d5a2..59db4d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,22 @@ All notable changes to **NCronJob** will be documented in this file. The project ## [Unreleased] +### Added +- **Retry Mechanism:** Improved robustness of job executions, especially in cases of transient failures. Includes exponential backoff and fixed interval strategies. +- **Concurrency Control:** Introduced a `SupportsConcurrency` attribute to provide fine-grained control over the concurrency behavior of individual jobs. This attribute allows specifying the maximum degree of parallelism. +- **New WaitForJobsOrTimeout:** Added an enhanced version of `WaitForJobsOrTimeout` to the Tests project that allows time advancement for each run, providing more precise control over test execution timing. +- **Cancellation Support:** Jobs now support graceful cancellation during execution and SIGTERM. + +### Changed +- **Concurrency Management:** Implemented improved global concurrency handling techniques within `CronScheduler`. This change enhances flexibility and scalability in managing job executions. +- **Async Job Executions:** Job executions are now fully async from the moment the scheduler triggers the job, ensuring better performance and resource utilization. + +### Fixed +- **Test Framework Bugs:** Addressed specific bugs in the testing framework, ensuring that all tests are now passing and provide reliable results. + +### Contributors +- Support for concurrent jobs and retries, as well as overall improvements, implemented by [@falvarez1](https://github.com/falvarez1) in PR [#21](https://github.com/linkdotnet/NCronJob/pull/21). + ## [2.0.5] - 2024-04-19 ### Changed diff --git a/README.md b/README.md index 5def395..0d4e31e 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ jobs: - [x] Parameterized jobs - instant as well as cron jobs! - [x] Integrated in ASP.NET - Access your DI container like you would in any other service - [x] Get notified when a job is done (either successfully or with an error). +- [x] Retries - If a job fails, it will be retried. ## Not features @@ -41,7 +42,6 @@ look into a more advanced scheduler like `Hangfire` or `Quartz`. - [ ] Job persistence - Jobs are not persisted between restarts of the application. - [ ] Job history - There is no history of jobs that have been run. -- [ ] Retries - If a job fails, it is not retried. - [ ] Progress state - There is no way to track the progress of a job. The library will support notifying when a job is done, but not the progress of the job itself. - [ ] The job scheduler always uses UTC time. We might change that in the future. @@ -220,6 +220,142 @@ Services.AddNCronJob(options => }); ``` +## Retry Support + +The new Retry support provides a robust mechanism for handling transient failures by retrying failed operations. This feature is implemented using the `RetryPolicy` attribute that can be applied to any class implementing the `IJob` interface. + +### How It Works + +The `RetryPolicy` attribute allows you to specify the number of retry attempts and the strategy for handling retries. There are two built-in retry strategies: +- **ExponentialBackoff:** Increases the delay between retry attempts exponentially. +- **FixedInterval:** Keeps the delay between retry attempts consistent. + +### Using Retry Policies + +Here are examples of how to use the built-in retry policies: + +#### Example 1: Basic Retry Policy, defaults to Exponential Backoff + +```csharp +[RetryPolicy(retryCount: 4)] +public class RetryJob(ILogger logger) : IJob +{ + public async Task RunAsync(JobExecutionContext context, CancellationToken token) + { + var attemptCount = context.Attempts; + + if (attemptCount <= 3) + { + logger.LogWarning("RetryJob simulating failure."); + throw new InvalidOperationException("Simulated operation failure in RetryJob."); + } + + logger.LogInformation($"RetryJob with Id {context.Id} was attempted {attemptCount} times."); + await Task.CompletedTask; + } +} +``` + +#### Example 2: Fixed Interval + +```csharp +[RetryPolicy(4, PolicyType.FixedInterval)] +public class FixedIntervalRetryJob(ILogger logger) : IJob +{ + public async Task RunAsync(JobExecutionContext context, CancellationToken token) + { + var attemptCount = context.Attempts; + + if (attemptCount <= 3) + { + logger.LogWarning("FixedIntervalRetryJob simulating failure."); + throw new InvalidOperationException("Simulated operation failure in FixedIntervalRetryJob."); + } + + logger.LogInformation($"FixedIntervalRetryJob with Id {context.Id} was attempted {attemptCount} times."); + await Task.CompletedTask; + } +} +``` + +### Advanced: Custom Retry Policies + +You can also create custom retry policies by implementing the `IPolicyCreator` interface. This allows you to define complex retry logic tailored to your specific needs. + +```csharp +[RetryPolicy(retryCount:4, delayFactor:1)] +public class CustomPolicyJob(ILogger logger) : IJob +{ + public async Task RunAsync(JobExecutionContext context, CancellationToken token) + { + var attemptCount = context.Attempts; + + if (attemptCount <= 3) + { + logger.LogWarning("FixedIntervalRetryJob simulating failure."); + throw new InvalidOperationException("Simulated operation failure in FixedIntervalRetryJob."); + } + + logger.LogInformation($"CustomPolicyJob with Id {context.Id} was attempted {attemptCount} times."); + await Task.CompletedTask; + } +} + +public class MyCustomPolicyCreator : IPolicyCreator +{ + public IAsyncPolicy CreatePolicy(int maxRetryAttempts = 3, double delayFactor = 2) + { + return Policy.Handle() + .WaitAndRetryAsync(maxRetryAttempts, + retryAttempt => TimeSpan.FromSeconds(Math.Pow(delayFactor, retryAttempt))); + } +} +``` + +## Concurrency Support + +Concurrency support allows multiple instances of the same job type to run simultaneously, controlled by the `SupportsConcurrency` attribute. This feature is crucial for efficiently managing jobs that are capable of running in parallel without interference. + +### How It Works + +The `SupportsConcurrency` attribute specifies the maximum degree of parallelism for job instances. This means you can define how many instances of a particular job can run concurrently, optimizing performance and resource utilization based on the nature of the job and the system capabilities. + +### Using the SupportsConcurrency Attribute + +Here is an example of how to apply this attribute to a job: + +#### Example: Concurrency in Jobs + +```csharp +[SupportsConcurrency(10)] +public class ConcurrentJob : IJob +{ + private readonly ILogger logger; + + public ConcurrentJob(ILogger logger) + { + this.logger = logger; + } + + public async Task RunAsync(JobExecutionContext context, CancellationToken token) + { + logger.LogInformation($"ConcurrentJob with Id {context.Id} is running."); + // Simulate some work by delaying + await Task.Delay(5000, token); + logger.LogInformation($"ConcurrentJob with Id {context.Id} has completed."); + } +} +``` + +### Important Considerations + +#### Ensuring Job Idempotency +When using concurrency, it's essential to ensure that each job instance is idempotent. This means that even if the job is executed multiple times concurrently or sequentially, the outcome and side effects should remain consistent, without unintended duplication or conflict. + +#### Resource Allocation Caution +Jobs that are marked to support concurrency should be designed carefully to avoid contention over shared resources. This includes, but is not limited to, database connections, file handles, or any external systems. In scenarios where shared resources are unavoidable, proper synchronization mechanisms or concurrency control techniques, such as semaphores, mutexes, or transactional control, should be implemented to prevent race conditions and ensure data integrity. + + ## Support & Contributing Thanks to all [contributors](https://github.com/linkdotnet/NCronJob/graphs/contributors) and people that are creating diff --git a/sample/NCronJobSample/Jobs/.editorconfig b/sample/NCronJobSample/Jobs/.editorconfig new file mode 100644 index 0000000..8871b44 --- /dev/null +++ b/sample/NCronJobSample/Jobs/.editorconfig @@ -0,0 +1,15 @@ + +# All files +[*] +indent_style = space + +# Xml files +[*.xml] +indent_size = 2 + +[*.{cs,vb}] +dotnet_diagnostic.CA2008.severity = none +dotnet_diagnostic.CA1848.severity = none +dotnet_diagnostic.S2629.severity = none +dotnet_diagnostic.S2696.severity = none +dotnet_diagnostic.CA2254.severity = none diff --git a/sample/NCronJobSample/Jobs/ConcurrentTaskExecutorJob.cs b/sample/NCronJobSample/Jobs/ConcurrentTaskExecutorJob.cs new file mode 100644 index 0000000..ecc58d3 --- /dev/null +++ b/sample/NCronJobSample/Jobs/ConcurrentTaskExecutorJob.cs @@ -0,0 +1,84 @@ +using System.Security.Cryptography; +using LinkDotNet.NCronJob; + +namespace NCronJobSample; + +[SupportsConcurrency(10)] // Supports up to 10 concurrent instances +public partial class ConcurrentTaskExecutorJob : IJob +{ + private readonly ILogger logger; + + public ConcurrentTaskExecutorJob(ILogger logger) => this.logger = logger; + + public async Task RunAsync(JobExecutionContext context, CancellationToken token) + { + ArgumentNullException.ThrowIfNull(context); + + var jobId = context.Id.ToString(); // Unique identifier for the job instance + LogStartingJob(jobId); + + try + { + // Simulate different types of work by choosing a random task type + var taskType = RandomNumberGenerator.GetInt32(1, 4); // Randomly pick a task type between 1 and 3 + await (taskType switch + { + 1 => SimulateDataProcessing(jobId, token), + 2 => SimulateApiCall(jobId, token), + 3 => PerformCalculations(jobId, token), + _ => Task.CompletedTask + }); + + LogJobCompleted(jobId); + } + catch (OperationCanceledException) + { + LogCancellationConfirmed(jobId); + } + } + + private async Task SimulateDataProcessing(string jobId, CancellationToken token) + { + // Simulate processing chunks of data + for (var i = 0; i < 5; i++) + { + token.ThrowIfCancellationRequested(); + await Task.Delay(RandomNumberGenerator.GetInt32(200, 500), token); // Simulate work + LogProgress($"Data processing (Chunk {i + 1}/5)", jobId); + } + } + + private async Task SimulateApiCall(string jobId, CancellationToken token) + { + // Simulate asynchronous API calls + for (var i = 0; i < 3; i++) + { + token.ThrowIfCancellationRequested(); + await Task.Delay(RandomNumberGenerator.GetInt32(500, 1000), token); // Simulate API latency + LogProgress($"API Call {i + 1}/3 completed", jobId); + } + } + + private async Task PerformCalculations(string jobId, CancellationToken token) + { + // Perform some random calculations + for (var i = 0; i < 10; i++) + { + token.ThrowIfCancellationRequested(); + await Task.Delay(RandomNumberGenerator.GetInt32(100, 300), token); // Simulate calculation time + LogProgress($"Calculation {i + 1}/10", jobId); + } + } + + [LoggerMessage(LogLevel.Information, "Job {JobId} started.")] + private partial void LogStartingJob(string jobId); + + [LoggerMessage(LogLevel.Information, "Job {JobId} completed.")] + private partial void LogJobCompleted(string jobId); + + [LoggerMessage(LogLevel.Information, "Job {JobId} progress: {Message}.")] + private partial void LogProgress(string message, string jobId); + + [LoggerMessage(LogLevel.Warning, "Cancellation confirmed for Job {JobId}.")] + private partial void LogCancellationConfirmed(string jobId); +} diff --git a/sample/NCronJobSample/PrintHelloWorldJob.cs b/sample/NCronJobSample/Jobs/PrintHelloWorldJob.cs similarity index 100% rename from sample/NCronJobSample/PrintHelloWorldJob.cs rename to sample/NCronJobSample/Jobs/PrintHelloWorldJob.cs diff --git a/sample/NCronJobSample/Jobs/TestCancellationJob.cs b/sample/NCronJobSample/Jobs/TestCancellationJob.cs new file mode 100644 index 0000000..51fdfe7 --- /dev/null +++ b/sample/NCronJobSample/Jobs/TestCancellationJob.cs @@ -0,0 +1,63 @@ +using LinkDotNet.NCronJob; +using System.Security.Cryptography; + +namespace NCronJobSample; + + +[SupportsConcurrency(10)] +public partial class MultiInstanceJob : IJob +{ + private readonly ILogger logger; + + public MultiInstanceJob(ILogger logger) => this.logger = logger; + + public async Task RunAsync(JobExecutionContext context, CancellationToken token) + { + ArgumentNullException.ThrowIfNull(context); + + token.Register(() => LogCancellationRequestedInJob(context.Parameter)); + + LogMessage(context.Parameter); + + try + { + // Simulate a long-running job + for (var i = 0; i < 15; i++) // Simulate 15 units of work + { + if (token.IsCancellationRequested) + { + LogCancellationNotice(); + token.ThrowIfCancellationRequested(); // Properly handle the cancellation + } + + // Simulate work by delaying a random amount of time + var variableMs = TimeSpan.FromMilliseconds(1000 + RandomNumberGenerator.GetInt32(2000)); + await Task.Delay(variableMs, token); + + // Log each unit of work completion + LogWorkUnitCompleted(i + 1, context.Parameter); + } + } + catch (OperationCanceledException) + { + LogCancellationConfirmed(context.Parameter); + } + } + + [LoggerMessage(LogLevel.Information, "Message: {Parameter}")] + private partial void LogMessage(object? parameter); + + [LoggerMessage(LogLevel.Warning, "Job cancelled by request.")] + private partial void LogCancellationNotice(); + + [LoggerMessage(LogLevel.Information, "Cancellation confirmed. Clean-up complete for {Parameter}.")] + private partial void LogCancellationConfirmed(object? parameter); + + [LoggerMessage(LogLevel.Information, "Completed work unit {Number}/15 for {Parameter}")] + private partial void LogWorkUnitCompleted(int number, object? parameter); + + [LoggerMessage(LogLevel.Debug, "Cancellation requested for TestCancellationJob {Parameter}.")] + private partial void LogCancellationRequestedInJob(object? parameter); + +} + diff --git a/sample/NCronJobSample/Jobs/TestRetryJob.cs b/sample/NCronJobSample/Jobs/TestRetryJob.cs new file mode 100644 index 0000000..a67b751 --- /dev/null +++ b/sample/NCronJobSample/Jobs/TestRetryJob.cs @@ -0,0 +1,30 @@ +using LinkDotNet.NCronJob; + +namespace NCronJobSample; + +[RetryPolicy(retryCount: 4)] +public class TestRetryJob(ILogger logger, int maxFailuresBeforeSuccess = 3) + : IJob +{ + + /// + /// Runs the job, simulating failures based on a retry count. Will fail 3 times and then succeed. + /// + public async Task RunAsync(JobExecutionContext context, CancellationToken token) + { + ArgumentNullException.ThrowIfNull(context); + + var attemptCount = context.Attempts; + + if (attemptCount <= maxFailuresBeforeSuccess) + { + logger.LogWarning("TestRetryJob simulating failure."); + throw new InvalidOperationException("Simulated operation failure in TestRetryJob."); + } + + await Task.Delay(3000, token); + logger.LogInformation($"TestRetryJob with instance Id {context.Id} completed successfully on attempt {attemptCount}."); + await Task.CompletedTask; + } +} + diff --git a/sample/NCronJobSample/Program.cs b/sample/NCronJobSample/Program.cs index 767840e..bbc8ce7 100644 --- a/sample/NCronJobSample/Program.cs +++ b/sample/NCronJobSample/Program.cs @@ -12,11 +12,20 @@ // Add NCronJob to the container. builder.Services.AddNCronJob(n => n - // Execute the job every minute - .AddJob(p => p.WithCronExpression("* * * * *").WithParameter("Hello from NCronJob")) - + // Execute the job every 2 minutes + .AddJob(p => + p.WithCronExpression("*/2 * * * *").WithParameter("Hello from NCronJob")) // Register a handler that gets executed when the job is done .AddNotificationHandler() + + // Multiple instances of the same job with different cron expressions can be supported + // by marking the job with [SupportsConcurrency] attribute + .AddJob(p => + p.WithCronExpression("*/25 * * * * *")) + + // A job can support retries by marking it with [RetryPolicy(retryCount: 4)] attribute + .AddJob(p => + p.WithCronExpression("*/5 * * * * *")) ); var app = builder.Build(); @@ -37,4 +46,18 @@ .WithName("TriggerInstantJob") .WithOpenApi(); +app.MapPost("/trigger-instant-concurrent", (IInstantJobRegistry instantJobRegistry) => + { + instantJobRegistry.RunInstantJob(); + }) + .WithSummary("Triggers a job that can run concurrently with other instances.") + .WithDescription( + """ + This endpoint triggers an instance of 'TestCancellationJob' that is designed + to run concurrently with other instances of the same job. Each instance operates + independently, allowing parallel processing without mutual interference. + """) + .WithName("TriggerConcurrentJob") + .WithOpenApi(); + app.Run(); diff --git a/sample/NCronJobSample/appsettings.Development.json b/sample/NCronJobSample/appsettings.Development.json index 0c208ae..b51981a 100644 --- a/sample/NCronJobSample/appsettings.Development.json +++ b/sample/NCronJobSample/appsettings.Development.json @@ -1,8 +1,9 @@ { "Logging": { "LogLevel": { - "Default": "Information", - "Microsoft.AspNetCore": "Warning" + "Default": "Information", + "Microsoft.AspNetCore": "Warning", + "LinkDotNet.NCronJob": "Trace" } } } diff --git a/src/LinkDotNet.NCronJob/Configuration/ConcurrencySettings.cs b/src/LinkDotNet.NCronJob/Configuration/ConcurrencySettings.cs new file mode 100644 index 0000000..09e2883 --- /dev/null +++ b/src/LinkDotNet.NCronJob/Configuration/ConcurrencySettings.cs @@ -0,0 +1,18 @@ +namespace LinkDotNet.NCronJob; + +/// +/// Represents the configuration settings for managing concurrency within the application. +/// +/// +/// This configuration is utilized to specify the maximum number of concurrent operations +/// that the system can execute simultaneously. +/// +internal class ConcurrencySettings() +{ + /// + /// The total number of concurrent jobs that can be executed + /// by the scheduler at any one time, irrespective of the job type. + /// + public int MaxDegreeOfParallelism { get; set; } +} + diff --git a/src/LinkDotNet.NCronJob/Configuration/JobOptionBuilder.cs b/src/LinkDotNet.NCronJob/Configuration/JobOptionBuilder.cs index 3bfc676..aaa4d37 100644 --- a/src/LinkDotNet.NCronJob/Configuration/JobOptionBuilder.cs +++ b/src/LinkDotNet.NCronJob/Configuration/JobOptionBuilder.cs @@ -1,4 +1,4 @@ -using System.Diagnostics.CodeAnalysis; +using System.Collections.Concurrent; namespace LinkDotNet.NCronJob; @@ -13,14 +13,23 @@ public sealed class JobOptionBuilder /// Adds a cron expression for the given job. /// /// The cron expression that defines when the job should be executed. - /// If set to true, the cron expression can specify second-level precision. + /// + /// Specifies whether the cron expression should consider second-level precision. + /// This parameter is optional. If not provided, or set to null, it auto-detects based on the number + /// of parts in the cron expression (6 parts indicate second-level precision, otherwise minute-level precision). + /// /// Returns a that allows adding parameters to the job. - public ParameterBuilder WithCronExpression([StringSyntax(StringSyntaxAttribute.Regex)] string cronExpression, bool enableSecondPrecision = false) + public ParameterBuilder WithCronExpression(string cronExpression, bool? enableSecondPrecision = null) { + ArgumentNullException.ThrowIfNull(cronExpression); + + cronExpression = cronExpression.Trim(); + var determinedPrecision = DetermineAndValidatePrecision(cronExpression, enableSecondPrecision); + var jobOption = new JobOption { CronExpression = cronExpression, - EnableSecondPrecision = enableSecondPrecision + EnableSecondPrecision = determinedPrecision }; jobOptions.Add(jobOption); @@ -28,5 +37,21 @@ public ParameterBuilder WithCronExpression([StringSyntax(StringSyntaxAttribute.R return new ParameterBuilder(this, jobOption); } + + private static bool DetermineAndValidatePrecision(string cronExpression, bool? enableSecondPrecision) + { + var parts = cronExpression.Split(' '); + var precisionRequired = enableSecondPrecision ?? (parts.Length == 6); + + var expectedLength = precisionRequired ? 6 : 5; + if (parts.Length != expectedLength) + { + var precisionText = precisionRequired ? "second precision" : "minute precision"; + throw new ArgumentException($"Invalid cron expression format for {precisionText}.", nameof(cronExpression)); + } + + return precisionRequired; + } + internal List GetJobOptions() => jobOptions; } diff --git a/src/LinkDotNet.NCronJob/Configuration/NCronJobOptionBuilder.cs b/src/LinkDotNet.NCronJob/Configuration/NCronJobOptionBuilder.cs index 12506db..75bfac6 100644 --- a/src/LinkDotNet.NCronJob/Configuration/NCronJobOptionBuilder.cs +++ b/src/LinkDotNet.NCronJob/Configuration/NCronJobOptionBuilder.cs @@ -1,6 +1,9 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Options; using NCrontab; +using System.Reflection; +using System.Runtime; namespace LinkDotNet.NCronJob; @@ -10,8 +13,13 @@ namespace LinkDotNet.NCronJob; public sealed class NCronJobOptionBuilder { private readonly IServiceCollection services; + private readonly ConcurrencySettings settings; - internal NCronJobOptionBuilder(IServiceCollection services) => this.services = services; + internal NCronJobOptionBuilder(IServiceCollection services, ConcurrencySettings settings) + { + this.services = services; + this.settings = settings; + } /// /// Adds a job to the service collection that gets executed based on the given cron expression. @@ -33,10 +41,18 @@ public NCronJobOptionBuilder AddJob(Action? options = null) options?.Invoke(builder); var jobOptions = builder.GetJobOptions(); + var concurrencyAttribute = typeof(T).GetCustomAttribute(); + if (concurrencyAttribute != null && concurrencyAttribute.MaxDegreeOfParallelism > settings.MaxDegreeOfParallelism) + { + throw new InvalidOperationException($"The MaxDegreeOfParallelism for {typeof(T).Name} " + + $"({concurrencyAttribute.MaxDegreeOfParallelism}) cannot exceed " + + $"the global limit ({settings.MaxDegreeOfParallelism})."); + } + foreach (var option in jobOptions.Where(c => !string.IsNullOrEmpty(c.CronExpression))) { var cron = GetCronExpression(option); - var entry = new RegistryEntry(typeof(T), new(option.Parameter), cron); + var entry = new RegistryEntry(typeof(T), option.Parameter, cron); services.AddSingleton(entry); } diff --git a/src/LinkDotNet.NCronJob/Configuration/ParameterBuilder.cs b/src/LinkDotNet.NCronJob/Configuration/ParameterBuilder.cs index ca64264..3a0b617 100644 --- a/src/LinkDotNet.NCronJob/Configuration/ParameterBuilder.cs +++ b/src/LinkDotNet.NCronJob/Configuration/ParameterBuilder.cs @@ -22,8 +22,8 @@ internal ParameterBuilder(JobOptionBuilder optionBuilder, JobOption jobOption) /// The parameter that can be passed down to the job. This only applies to cron jobs.
/// When an instant job is triggered a parameter can be passed down via the interface. ///
- /// The paramter to add that will be passed to the cron job. - /// Returns a that allows adding more options (like additional cron defintions) to the job. + /// The parameter to add that will be passed to the cron job. + /// Returns a that allows adding more options (like additional cron definitions) to the job. /// /// Calling this method multiple times on the same cron expression, will overwrite the last set value. /// Therefore: diff --git a/src/LinkDotNet.NCronJob/Configuration/SupportsConcurrencyAttribute.cs b/src/LinkDotNet.NCronJob/Configuration/SupportsConcurrencyAttribute.cs new file mode 100644 index 0000000..058a25b --- /dev/null +++ b/src/LinkDotNet.NCronJob/Configuration/SupportsConcurrencyAttribute.cs @@ -0,0 +1,24 @@ +namespace LinkDotNet.NCronJob; + +/// +/// Specifies that multiple instances of the same job type can run simultaneously. +/// This attribute controls the maximum degree of parallelism allowed for instances of the job. +/// +[AttributeUsage(AttributeTargets.Class, Inherited = false, AllowMultiple = false)] +public sealed class SupportsConcurrencyAttribute : Attribute +{ + /// + /// Gets the maximum number of concurrent instances allowed for the job. + /// + public int MaxDegreeOfParallelism { get; } + + /// + /// Initializes a new instance of the class with the specified maximum degree of parallelism. + /// + /// The maximum number of concurrent instances that can run. + public SupportsConcurrencyAttribute(int maxDegreeOfParallelism) => MaxDegreeOfParallelism = maxDegreeOfParallelism; + + /// + public SupportsConcurrencyAttribute() => MaxDegreeOfParallelism = Environment.ProcessorCount; +} + diff --git a/src/LinkDotNet.NCronJob/Execution/JobExecutor.cs b/src/LinkDotNet.NCronJob/Execution/JobExecutor.cs index 098e556..4e0849d 100644 --- a/src/LinkDotNet.NCronJob/Execution/JobExecutor.cs +++ b/src/LinkDotNet.NCronJob/Execution/JobExecutor.cs @@ -1,5 +1,6 @@ using System.Diagnostics.CodeAnalysis; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; namespace LinkDotNet.NCronJob; @@ -8,18 +9,31 @@ internal sealed partial class JobExecutor : IDisposable { private readonly IServiceProvider serviceProvider; private readonly ILogger logger; + private readonly RetryHandler retryHandler; private bool isDisposed; + private CancellationTokenSource? shutdown; - public JobExecutor(IServiceProvider serviceProvider, ILogger logger) + public JobExecutor(IServiceProvider serviceProvider, + ILogger logger, + IHostApplicationLifetime lifetime, + RetryHandler retryHandler) { this.serviceProvider = serviceProvider; this.logger = logger; + this.retryHandler = retryHandler; + + lifetime.ApplicationStopping.Register(() => this.shutdown?.Cancel()); } [SuppressMessage("Reliability", "CA2000:Dispose objects before losing scope", Justification = "Service will be disposed in continuation task")] - public void RunJob(RegistryEntry run, CancellationToken stoppingToken) + public async Task RunJob(RegistryEntry run, CancellationToken stoppingToken) { + // stoppingToken is never cancelled when the job is triggered outside the BackgroundProcess, + // so we need to tie into the IHostApplicationLifetime + shutdown = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); + var stopToken = shutdown.Token; + if (isDisposed) { LogSkipAsDisposed(); @@ -29,32 +43,35 @@ public void RunJob(RegistryEntry run, CancellationToken stoppingToken) var scope = serviceProvider.CreateScope(); var job = (IJob)scope.ServiceProvider.GetRequiredService(run.Type); - ExecuteJob(run, job, scope, stoppingToken); + var jobExecutionInstance = new JobExecutionContext(run.Type, run.Output); + await ExecuteJob(jobExecutionInstance, job, scope, stopToken); } - public void Dispose() => isDisposed = true; + public void Dispose() + { + shutdown?.Dispose(); + isDisposed = true; + } - private void ExecuteJob(RegistryEntry run, IJob job, IServiceScope serviceScope, CancellationToken stoppingToken) + private async Task ExecuteJob(JobExecutionContext runContext, IJob job, IServiceScope serviceScope, CancellationToken stoppingToken) { try { - LogRunningJob(run.Type); - - // We don't want to await jobs explicitly because that - // could interfere with other job runs - job.RunAsync(run.Context, stoppingToken) - .ContinueWith( - task => AfterJobCompletionTask(task.Exception), - TaskScheduler.Default) - .ConfigureAwait(false); + LogRunningJob(job.GetType()); + + await retryHandler.ExecuteAsync(async token => await job.RunAsync(runContext, token), runContext, stoppingToken); + + stoppingToken.ThrowIfCancellationRequested(); + + await AfterJobCompletionTask(null, stoppingToken); } catch (Exception exc) when (exc is not OperationCanceledException or AggregateException) { // This part is only reached if the synchronous part of the job throws an exception - AfterJobCompletionTask(exc); + await AfterJobCompletionTask(exc, default); } - - void AfterJobCompletionTask(Exception? exc) + // This needs to be async otherwise it can deadlock or try to use the disposed scope, maybe it needs to create its own serviceScope + async Task AfterJobCompletionTask(Exception? exc, CancellationToken ct) { if (isDisposed) { @@ -62,13 +79,13 @@ void AfterJobCompletionTask(Exception? exc) return; } - var notificationServiceType = typeof(IJobNotificationHandler<>).MakeGenericType(run.Type); + var notificationServiceType = typeof(IJobNotificationHandler<>).MakeGenericType(runContext.JobType); if (serviceScope.ServiceProvider.GetService(notificationServiceType) is IJobNotificationHandler notificationService) { try { - notificationService.HandleAsync(run.Context, exc, stoppingToken).ConfigureAwait(false); + await notificationService.HandleAsync(runContext, exc, ct).ConfigureAwait(false); } catch (Exception innerExc) when (innerExc is not OperationCanceledException or AggregateException) { diff --git a/src/LinkDotNet.NCronJob/JobExecutionContext.cs b/src/LinkDotNet.NCronJob/JobExecutionContext.cs index a977dfa..ff7ca43 100644 --- a/src/LinkDotNet.NCronJob/JobExecutionContext.cs +++ b/src/LinkDotNet.NCronJob/JobExecutionContext.cs @@ -3,11 +3,23 @@ namespace LinkDotNet.NCronJob; /// /// Represents the context of a job execution. /// +/// The Type that represents the Job /// The passed in parameters to a job. -public sealed record JobExecutionContext(object? Parameter) +public sealed record JobExecutionContext(Type JobType, object? Parameter) { + /// + /// The Job Instance Identifier, generated once upon creation of the context. + /// + public Guid Id { get; } = Guid.NewGuid(); + /// /// The output of a job that can be read by the . /// public object? Output { get; set; } + + /// + /// The attempts made to execute the job for one run. Will be incremented when a retry is triggered. + /// Retries will only occur when is set on the Job. + /// + public int Attempts { get; internal set; } } diff --git a/src/LinkDotNet.NCronJob/LinkDotNet.NCronJob.csproj b/src/LinkDotNet.NCronJob/LinkDotNet.NCronJob.csproj index 414b9e5..2934bb1 100644 --- a/src/LinkDotNet.NCronJob/LinkDotNet.NCronJob.csproj +++ b/src/LinkDotNet.NCronJob/LinkDotNet.NCronJob.csproj @@ -1,4 +1,4 @@ - + net8.0;net9.0 @@ -59,6 +59,7 @@ + diff --git a/src/LinkDotNet.NCronJob/NCronJobExtensions.cs b/src/LinkDotNet.NCronJob/NCronJobExtensions.cs index cd91eef..67fa9a8 100644 --- a/src/LinkDotNet.NCronJob/NCronJobExtensions.cs +++ b/src/LinkDotNet.NCronJob/NCronJobExtensions.cs @@ -25,12 +25,17 @@ public static class NCronJobExtensions this IServiceCollection services, Action? options = null) { - var builder = new NCronJobOptionBuilder(services); + // 4 is just an arbitrary multiplier based on system observed I/O, this could come from Configuration + var settings = new ConcurrencySettings { MaxDegreeOfParallelism = Environment.ProcessorCount * 4 }; + services.AddSingleton(settings); + + var builder = new NCronJobOptionBuilder(services, settings); options?.Invoke(builder); services.AddHostedService(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(c => c.GetRequiredService()); services.TryAddSingleton(TimeProvider.System); diff --git a/src/LinkDotNet.NCronJob/Registry/CronRegistry.cs b/src/LinkDotNet.NCronJob/Registry/CronRegistry.cs index 8b26fbc..bb73c72 100644 --- a/src/LinkDotNet.NCronJob/Registry/CronRegistry.cs +++ b/src/LinkDotNet.NCronJob/Registry/CronRegistry.cs @@ -1,15 +1,20 @@ using System.Collections.Frozen; +using Microsoft.Extensions.Logging; namespace LinkDotNet.NCronJob; -internal sealed class CronRegistry : IInstantJobRegistry +internal sealed partial class CronRegistry : IInstantJobRegistry { private readonly JobExecutor jobExecutor; + private readonly ILogger logger; private readonly FrozenSet cronJobs; - public CronRegistry(IEnumerable jobs, JobExecutor jobExecutor) + public CronRegistry(IEnumerable jobs, + JobExecutor jobExecutor, + ILogger logger) { this.jobExecutor = jobExecutor; + this.logger = logger; cronJobs = jobs.Where(c => c.CrontabSchedule is not null).ToFrozenSet(); } @@ -19,8 +24,36 @@ public CronRegistry(IEnumerable jobs, JobExecutor jobExecutor) public void RunInstantJob(object? parameter = null, CancellationToken token = default) where TJob : IJob { - var executionContext = new JobExecutionContext(parameter); - var run = new RegistryEntry(typeof(TJob), executionContext, null); - jobExecutor.RunJob(run, token); + token.Register(() => LogCancellationRequested(parameter)); + + var run = new RegistryEntry(typeof(TJob), parameter, null); + + var jobName = typeof(TJob).Name; + _ = Task.Run(async () => + { + try + { + using (logger.BeginScope(new Dictionary + { + { "JobName", jobName }, + { "JobTypeFullName", typeof(TJob).FullName ?? jobName } + })) + { + await jobExecutor.RunJob(run, CancellationToken.None); + } + } + catch + { + LogCancellationNotice(jobName); + } + }, token); + } + + [LoggerMessage(LogLevel.Warning, "Job {JobName} cancelled by request.")] + private partial void LogCancellationNotice(string jobName); + + [LoggerMessage(LogLevel.Debug, "Cancellation requested for CronRegistry {Parameter}.")] + private partial void LogCancellationRequested(object? parameter); } + diff --git a/src/LinkDotNet.NCronJob/Registry/CronRegsitryEntry.cs b/src/LinkDotNet.NCronJob/Registry/CronRegsitryEntry.cs index 4f767cf..f58b488 100644 --- a/src/LinkDotNet.NCronJob/Registry/CronRegsitryEntry.cs +++ b/src/LinkDotNet.NCronJob/Registry/CronRegsitryEntry.cs @@ -4,5 +4,7 @@ namespace LinkDotNet.NCronJob; internal sealed record RegistryEntry( Type Type, - JobExecutionContext Context, - CrontabSchedule? CrontabSchedule); + object? Output, + CrontabSchedule? CrontabSchedule, + int JobExecutionCount = 0, + JobPriority Priority = JobPriority.Normal); diff --git a/src/LinkDotNet.NCronJob/Registry/IInstantJobRegistry.cs b/src/LinkDotNet.NCronJob/Registry/IInstantJobRegistry.cs index 2055b89..710a805 100644 --- a/src/LinkDotNet.NCronJob/Registry/IInstantJobRegistry.cs +++ b/src/LinkDotNet.NCronJob/Registry/IInstantJobRegistry.cs @@ -11,7 +11,8 @@ public interface IInstantJobRegistry /// An optional token to cancel the job. /// /// - /// The contents of are not serialized and deserialized. It is the reference to the -object that gets passed in. + /// This is a fire-and-forget process, the Job will be run in the background. The contents of + /// are not serialized and deserialized. It is the reference to the -object that gets passed in. /// /// /// Running a job with a parameter: diff --git a/src/LinkDotNet.NCronJob/Registry/JobPriority.cs b/src/LinkDotNet.NCronJob/Registry/JobPriority.cs new file mode 100644 index 0000000..87c7b92 --- /dev/null +++ b/src/LinkDotNet.NCronJob/Registry/JobPriority.cs @@ -0,0 +1,12 @@ +namespace LinkDotNet.NCronJob; + +/// +/// When competing for resources the Higher the priority the more +/// likely the job will be executed over others of lower priority. +/// +internal enum JobPriority +{ + Low = 0, + Normal = 1, + High = 2 +} diff --git a/src/LinkDotNet.NCronJob/RetryPolicies/ExponentialBackoffPolicyCreator.cs b/src/LinkDotNet.NCronJob/RetryPolicies/ExponentialBackoffPolicyCreator.cs new file mode 100644 index 0000000..7d0de37 --- /dev/null +++ b/src/LinkDotNet.NCronJob/RetryPolicies/ExponentialBackoffPolicyCreator.cs @@ -0,0 +1,33 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Polly; + +namespace LinkDotNet.NCronJob; + +/// +/// A policy creator that configures an exponential back-off retry policy. +/// +internal partial class ExponentialBackoffPolicyCreator : IPolicyCreator, IInitializablePolicyCreator +{ + private ILogger logger = default!; + + /// + public void Initialize(IServiceProvider serviceProvider) => + logger = serviceProvider.GetRequiredService>(); + + /// + public IAsyncPolicy CreatePolicy(int maxRetryAttempts = 3, double delayFactor = 2) => + Policy + .Handle() + .WaitAndRetryAsync( + maxRetryAttempts, + retryAttempt => TimeSpan.FromSeconds(Math.Pow(delayFactor, retryAttempt)), + onRetry: (exception, timeSpan, retryCount, context) => + { + LogRetryAttempt(exception.Message, timeSpan, retryCount); + }); + + [LoggerMessage(LogLevel.Warning, "Retry {RetryCount} due to error: {Message}. Retrying after {TimeSpan}.")] + private partial void LogRetryAttempt(string message, TimeSpan timeSpan, int retryCount); + +} diff --git a/src/LinkDotNet.NCronJob/RetryPolicies/FixedIntervalRetryPolicyCreator.cs b/src/LinkDotNet.NCronJob/RetryPolicies/FixedIntervalRetryPolicyCreator.cs new file mode 100644 index 0000000..c62b055 --- /dev/null +++ b/src/LinkDotNet.NCronJob/RetryPolicies/FixedIntervalRetryPolicyCreator.cs @@ -0,0 +1,33 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Polly; + +namespace LinkDotNet.NCronJob; + +/// +/// A policy creator that configures a fixed interval retry policy. +/// +internal partial class FixedIntervalRetryPolicyCreator : IPolicyCreator, IInitializablePolicyCreator +{ + private ILogger logger = default!; + + /// + public void Initialize(IServiceProvider serviceProvider) => + logger = serviceProvider.GetRequiredService>(); + + /// + public IAsyncPolicy CreatePolicy(int maxRetryAttempts = 3, double delayFactor = 2) => + // Here, delayFactor will represent the fixed number of seconds between retries + Policy + .Handle() + .WaitAndRetryAsync( + maxRetryAttempts, + _ => TimeSpan.FromSeconds(delayFactor), // Fixed delay between retries + onRetry: (exception, timeSpan, retryCount, context) => + { + LogRetryAttempt(exception.Message, timeSpan, retryCount); + }); + + [LoggerMessage(LogLevel.Warning, "Retry {RetryCount} due to error: {Message}. Retrying after {TimeSpan}.")] + private partial void LogRetryAttempt(string message, TimeSpan timeSpan, int retryCount); +} diff --git a/src/LinkDotNet.NCronJob/RetryPolicies/IInitializablePolicyCreator.cs b/src/LinkDotNet.NCronJob/RetryPolicies/IInitializablePolicyCreator.cs new file mode 100644 index 0000000..269730f --- /dev/null +++ b/src/LinkDotNet.NCronJob/RetryPolicies/IInitializablePolicyCreator.cs @@ -0,0 +1,17 @@ +namespace LinkDotNet.NCronJob; + +/// +/// Provides a mechanism for initializing policy creators with necessary services. +/// +/// +/// This interface is intended for use within the retry mechanism to allow policy creators to receive +/// and utilize services provided by the application's service provider, facilitating dependency injection. +/// +internal interface IInitializablePolicyCreator +{ + /// + /// Initializes the policy creator with the specified service provider. + /// + /// The service provider used to resolve dependencies needed by the policy creator. + void Initialize(IServiceProvider serviceProvider); +} diff --git a/src/LinkDotNet.NCronJob/RetryPolicies/IPolicyCreator.cs b/src/LinkDotNet.NCronJob/RetryPolicies/IPolicyCreator.cs new file mode 100644 index 0000000..5b1c227 --- /dev/null +++ b/src/LinkDotNet.NCronJob/RetryPolicies/IPolicyCreator.cs @@ -0,0 +1,18 @@ +using Polly; + +namespace LinkDotNet.NCronJob; + +/// +/// Defines a contract used to create retry policies. +/// +public interface IPolicyCreator +{ + /// + /// Creates and configures an asynchronous retry policy. + /// + /// The maximum number of retry attempts. Defaults to 3. + /// The factor that determines the delay between retries. Defaults to 2, + /// which can be used to calculate exponential back-off or other delay strategies. + /// An asynchronous retry policy configured according to the specified parameters. + IAsyncPolicy CreatePolicy(int maxRetryAttempts = 3, double delayFactor = 2); +} diff --git a/src/LinkDotNet.NCronJob/RetryPolicies/PolicyCreatorFactory.cs b/src/LinkDotNet.NCronJob/RetryPolicies/PolicyCreatorFactory.cs new file mode 100644 index 0000000..e2b2bd1 --- /dev/null +++ b/src/LinkDotNet.NCronJob/RetryPolicies/PolicyCreatorFactory.cs @@ -0,0 +1,21 @@ +using Polly; + +namespace LinkDotNet.NCronJob; + +internal class PolicyCreatorFactory(IServiceProvider serviceProvider) +{ + public IAsyncPolicy CreatePolicy(PolicyType policyType, int retryCount, double delayFactor) => + policyType switch + { + PolicyType.ExponentialBackoff => Create(retryCount, delayFactor), + PolicyType.FixedInterval => Create(retryCount, delayFactor), + _ => throw new ArgumentException("Unsupported policy type") + }; + + public IAsyncPolicy Create(int retryCount, double delayFactor) where TPolicyCreator : IPolicyCreator, new() + { + var creator = new TPolicyCreator(); + (creator as IInitializablePolicyCreator)?.Initialize(serviceProvider); + return creator.CreatePolicy(retryCount, delayFactor); + } +} diff --git a/src/LinkDotNet.NCronJob/RetryPolicies/PolicyType.cs b/src/LinkDotNet.NCronJob/RetryPolicies/PolicyType.cs new file mode 100644 index 0000000..2648976 --- /dev/null +++ b/src/LinkDotNet.NCronJob/RetryPolicies/PolicyType.cs @@ -0,0 +1,20 @@ +namespace LinkDotNet.NCronJob; + +/// +/// Defines the types of retry policies that can be applied to operations to handle transient faults by retrying failed operations. +/// +public enum PolicyType +{ + /// + /// Specifies a retry policy that implements an exponential backoff strategy. + /// This policy increases the delay between retry attempts exponentially, which helps to reduce the load on the system and increases the probability of a successful retry under high contention scenarios. + /// + ExponentialBackoff, + + /// + /// Specifies a retry policy that implements a fixed interval strategy. + /// Each retry attempt will wait for a consistent delay period between attempts regardless of the number of retries. This is useful for scenarios where the expected time for resolving a transient fault is consistent. + /// + FixedInterval +} + diff --git a/src/LinkDotNet.NCronJob/RetryPolicies/RetryHandler.cs b/src/LinkDotNet.NCronJob/RetryPolicies/RetryHandler.cs new file mode 100644 index 0000000..52ce393 --- /dev/null +++ b/src/LinkDotNet.NCronJob/RetryPolicies/RetryHandler.cs @@ -0,0 +1,69 @@ +using System.Reflection; +using Polly; +using Microsoft.Extensions.Logging; + +namespace LinkDotNet.NCronJob; + +/// +/// Manages retries for operations prone to transient failures using Polly's retry policies. +/// +/// +/// This handler is configured to use retry policies dynamically based on attributes applied to job types, +/// allowing for flexible retry strategies such as fixed intervals or exponential back-off. The handler +/// is typically registered as a singleton to efficiently use resources and ensure consistent application +/// of retry policies. Retries and their outcomes are logged to facilitate debugging and monitoring. +/// +/// +/// Usage: +/// +/// [RetryPolicy(retryCount: 3, delayFactor: 2)] +/// public class MyJob +/// { +/// public async Task RunAsync(JobExecutionContext context, CancellationToken token) +/// { +/// // Job logic that may require retries +/// } +/// } +/// +/// +/// +internal sealed partial class RetryHandler +{ + private readonly ILogger logger; + private readonly IServiceProvider serviceProvider; + + public RetryHandler( + ILogger logger, + IServiceProvider serviceProvider) + { + this.logger = logger; + this.serviceProvider = serviceProvider; + } + + public async Task ExecuteAsync(Func operation, JobExecutionContext runContext, CancellationToken cancellationToken) + { + try + { + var retryPolicyAttribute = runContext.JobType.GetCustomAttribute(); + var retryPolicy = retryPolicyAttribute?.CreatePolicy(serviceProvider) ?? Policy.NoOpAsync(); + + // Execute the operation using the given retry policy + await retryPolicy.ExecuteAsync(() => + { + runContext.Attempts++; + return operation(cancellationToken); + }); + } + catch (Exception ex) + { + LogRetryHandlerException(ex.Message); + throw; // Ensure exceptions are not swallowed if not handled internally + } + } + + [LoggerMessage(LogLevel.Error, "Error occurred during an operation with retries. {Message}")] + private partial void LogRetryHandlerException(string message); + + [LoggerMessage(LogLevel.Debug, "Attempt {RetryCount} for {JobName}")] + private partial void LogRetryAttempt(int retryCount, string jobName); +} diff --git a/src/LinkDotNet.NCronJob/RetryPolicies/RetryPolicyAttribute.cs b/src/LinkDotNet.NCronJob/RetryPolicies/RetryPolicyAttribute.cs new file mode 100644 index 0000000..a27da09 --- /dev/null +++ b/src/LinkDotNet.NCronJob/RetryPolicies/RetryPolicyAttribute.cs @@ -0,0 +1,40 @@ +using Polly; + +namespace LinkDotNet.NCronJob; + +/// +/// Decorates a class with a retry policy using an exponential back-off strategy provided by the . +/// This attribute is typically applied to classes that implement jobs or operations which are susceptible to transient failures. +/// +/// +/// The exponential back-off strategy increases the delay between retry attempts exponentially, which is useful for scenarios where repeated failures +/// are likely to be resolved by allowing more time before retrying. +/// +public sealed class RetryPolicyAttribute : RetryPolicyBaseAttribute +{ + + /// + /// Gets the type of policy creator used to generate the retry policy. + /// The type of policy determines how retries are performed, such as using + /// for exponential delay increases between retries, + /// or for consistent delay periods between retries. + /// + public PolicyType PolicyCreatorType { get; } + + /// + /// Initializes a new instance of the class with specified retry count and policy type. + /// + /// The maximum number of retry attempts. + /// The type of retry policy to create, as defined by the enum. + public RetryPolicyAttribute(int retryCount = 3, PolicyType policyCreatorType = PolicyType.ExponentialBackoff) + { + RetryCount = retryCount; + PolicyCreatorType = policyCreatorType; + } + + internal override IAsyncPolicy CreatePolicy(IServiceProvider serviceProvider) + { + var factory = new PolicyCreatorFactory(serviceProvider); + return factory.CreatePolicy(PolicyCreatorType, RetryCount, 2); + } +} diff --git a/src/LinkDotNet.NCronJob/RetryPolicies/RetryPolicyAttribute.generic.cs b/src/LinkDotNet.NCronJob/RetryPolicies/RetryPolicyAttribute.generic.cs new file mode 100644 index 0000000..6984611 --- /dev/null +++ b/src/LinkDotNet.NCronJob/RetryPolicies/RetryPolicyAttribute.generic.cs @@ -0,0 +1,47 @@ +using Polly; + +namespace LinkDotNet.NCronJob; + +/// +/// Applies a retry policy to Jobs, specifying the type of retry policy creator to configure the retry behavior dynamically. +/// +/// +/// The type of the policy creator which must implement and have a parameterless constructor. +/// This allows the attribute to instantiate the policy creator and apply the specified retry policy. +/// +/// +/// This attribute enables developers to define how retry policies are applied to specific operations or entire classes, +/// making it flexible to integrate different strategies for handling retries, such as exponential back-off or fixed interval retries. +/// +/// +/// Applying a custom retry policy to a method: +/// +/// [RetryPolicyAttribute<CustomRetryPolicyCreator>(retryCount: 5, delayFactor: 1.5)] +/// public class MyJob +/// { +/// public async Task RunAsync(JobExecutionContext context, CancellationToken token) +/// { +/// // Unreliable Job logic that may require retries +/// } +/// } +/// +/// Here, CustomRetryPolicyCreator defines how the retries are performed, and it must implement the . +/// +public sealed class RetryPolicyAttribute : RetryPolicyBaseAttribute where TPolicyCreator : IPolicyCreator, new() +{ + internal override IAsyncPolicy CreatePolicy(IServiceProvider serviceProvider) + { + var factory = new PolicyCreatorFactory(serviceProvider); + var policy = factory.Create(RetryCount, DelayFactor); + return policy; + } + + /// + public RetryPolicyAttribute(int retryCount = 3, double delayFactor = 2) + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(retryCount); + ArgumentOutOfRangeException.ThrowIfNegative(delayFactor); + this.DelayFactor = delayFactor; + this.RetryCount = retryCount; + } +} diff --git a/src/LinkDotNet.NCronJob/RetryPolicies/RetryPolicyBaseAttribute.cs b/src/LinkDotNet.NCronJob/RetryPolicies/RetryPolicyBaseAttribute.cs new file mode 100644 index 0000000..3a94770 --- /dev/null +++ b/src/LinkDotNet.NCronJob/RetryPolicies/RetryPolicyBaseAttribute.cs @@ -0,0 +1,35 @@ +using Polly; + +namespace LinkDotNet.NCronJob; + +/// +/// Abstract base class for defining retry policy attributes, encapsulating common parameters like retry count and delay factor. +/// Derived classes must implement the creation of specific retry policies tailored to operational needs. +/// +[AttributeUsage(AttributeTargets.Class, Inherited = false)] +public abstract class RetryPolicyBaseAttribute : Attribute +{ + /// + /// The factor that determines the delay between retry attempts. + /// This value can influence the duration between retries, for example, in an exponential backoff strategy, + /// where the delay increases exponentially with each attempt. + /// + public double DelayFactor { get; protected set; } + + /// + /// Gets the number of retry attempts before giving up. The default is 3 retries. + /// A retry is considered an attempt to execute the operation again after a failure. + /// + /// + /// The number of times to retry the decorated operation if it fails, before failing permanently. + /// + public int RetryCount { get; protected set; } + + /// + /// When implemented in a derived class, creates an that defines the retry behavior for an operation. + /// This method must be implemented to return a specific policy instance based on the retry settings and the operational context. + /// + /// The service provider that can be used to resolve dependencies needed by the policy. + /// An instance of configured according to the retry settings. + internal abstract IAsyncPolicy CreatePolicy(IServiceProvider serviceProvider); +} diff --git a/src/LinkDotNet.NCronJob/Scheduler/CronScheduler.LogMessage.cs b/src/LinkDotNet.NCronJob/Scheduler/CronScheduler.LogMessage.cs index a84b6b4..e3b45b0 100644 --- a/src/LinkDotNet.NCronJob/Scheduler/CronScheduler.LogMessage.cs +++ b/src/LinkDotNet.NCronJob/Scheduler/CronScheduler.LogMessage.cs @@ -6,4 +6,19 @@ internal sealed partial class CronScheduler { [LoggerMessage(LogLevel.Debug, "Next run of job '{JobType}' is at {NextRun} UTC")] private partial void LogNextJobRun(Type jobType, DateTime nextRun); + + [LoggerMessage(LogLevel.Debug, "Running job '{JobType}'.")] + private partial void LogRunningJob(Type jobType); + + [LoggerMessage(LogLevel.Debug, "Job completed successfully: '{JobType}'.")] + private partial void LogCompletedJob(Type jobType); + + [LoggerMessage(LogLevel.Warning, "Exception occurred in job {JobType}: {Message}")] + private partial void LogExceptionInJob(string message, Type jobType); + + [LoggerMessage(LogLevel.Trace, "Cancellation requested for CronScheduler from stopToken.")] + private partial void LogCancellationRequestedInJob(); + + [LoggerMessage(LogLevel.Trace, "Operation was cancelled.")] + private partial void LogCancellationOperationInJob(); } diff --git a/src/LinkDotNet.NCronJob/Scheduler/CronScheduler.cs b/src/LinkDotNet.NCronJob/Scheduler/CronScheduler.cs index 25a6cd1..8ee71db 100644 --- a/src/LinkDotNet.NCronJob/Scheduler/CronScheduler.cs +++ b/src/LinkDotNet.NCronJob/Scheduler/CronScheduler.cs @@ -1,47 +1,164 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using System.Collections.Concurrent; +using System.Reflection; namespace LinkDotNet.NCronJob; -/// -/// Represents a background service that schedules jobs based on a cron expression. -/// internal sealed partial class CronScheduler : BackgroundService { private readonly JobExecutor jobExecutor; private readonly CronRegistry registry; private readonly TimeProvider timeProvider; private readonly ILogger logger; + private readonly SemaphoreSlim semaphore; + private CancellationTokenSource? shutdown; + private readonly PriorityQueue jobQueue = + new(new JobQueueTupleComparer()); + private readonly int globalConcurrencyLimit; + private readonly ConcurrentDictionary runningJobCounts = []; + public CronScheduler( JobExecutor jobExecutor, CronRegistry registry, TimeProvider timeProvider, - ILogger logger) + ConcurrencySettings concurrencySettings, + ILoggerFactory loggerFactory, + IHostApplicationLifetime lifetime) { this.jobExecutor = jobExecutor; this.registry = registry; this.timeProvider = timeProvider; - this.logger = logger; + this.logger = loggerFactory.CreateLogger(); + this.globalConcurrencyLimit = concurrencySettings.MaxDegreeOfParallelism; + this.semaphore = new SemaphoreSlim(concurrencySettings.MaxDegreeOfParallelism); + + lifetime.ApplicationStopping.Register(() => this.shutdown?.Cancel()); + } + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + shutdown = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); + var stopToken = shutdown.Token; + stopToken.Register(LogCancellationRequestedInJob); + var runningTasks = new List(); + + ScheduleInitialJobs(); + + try + { + while (!stopToken.IsCancellationRequested && jobQueue.Count > 0) + { + // Remove completed or canceled tasks from the list to avoid memory overflow + runningTasks.RemoveAll(t => t.IsCompleted || t.IsFaulted || t.IsCanceled); + + if (jobQueue.TryPeek(out var nextJob, out var priorityTuple)) + { + var utcNow = timeProvider.GetUtcNow().DateTime; + var delay = priorityTuple.NextRunTime - utcNow; + if (delay > TimeSpan.Zero) + { + await Task.Delay(delay, timeProvider, stopToken); + } + + if (stopToken.IsCancellationRequested) + break; + + // Recheck the queue to confirm that the next job is still the correct job to execute + if (jobQueue.TryPeek(out var confirmedNextJob, out _) + && confirmedNextJob == nextJob + && CanStartJob(nextJob) + && runningTasks.Count < globalConcurrencyLimit) + { + jobQueue.Dequeue(); + UpdateRunningJobCount(nextJob.Type, 1); + + await semaphore.WaitAsync(stopToken); + var task = Task.Run(async () => + { + try + { + await ExecuteJob(nextJob, stopToken); + } + finally + { + semaphore.Release(); + UpdateRunningJobCount(nextJob.Type, -1); // Decrement count when job is done + } + }, stopToken); + + runningTasks.Add(task); + ScheduleJob(nextJob); // Reschedule immediately after execution + } + } + + // Wait for at least one task to complete if there are no available slots + if (runningTasks.Count >= globalConcurrencyLimit) + { + // This prevents starting new jobs until at least one current job finishes + await Task.WhenAny(runningTasks); + } + } + + // Wait for all remaining tasks to complete + await Task.WhenAll(runningTasks); + } + catch (OperationCanceledException) + { + LogCancellationOperationInJob(); + } } - protected override Task ExecuteAsync(CancellationToken stoppingToken) + private void ScheduleInitialJobs() { - var tasks = registry.GetAllCronJobs().Select(c => ScheduleJobAsync(c, stoppingToken)); - return Task.WhenAll(tasks); + foreach (var job in registry.GetAllCronJobs()) + { + ScheduleJob(job); + } } - private async Task ScheduleJobAsync(RegistryEntry entry, CancellationToken stoppingToken) + private void ScheduleJob(RegistryEntry job) { - while (!stoppingToken.IsCancellationRequested) + var utcNow = timeProvider.GetUtcNow().DateTime; + var nextRunTime = job.CrontabSchedule?.GetNextOccurrence(utcNow) ?? DateTime.MaxValue; + // higher means more priority + var priorityValue = (int)job.Priority; + jobQueue.Enqueue(job, (nextRunTime, priorityValue)); + } + + + private async Task ExecuteJob(RegistryEntry entry, CancellationToken stoppingToken) + { + try { - var now = timeProvider.GetUtcNow().DateTime; - var runDate = entry.CrontabSchedule!.GetNextOccurrence(now); - LogNextJobRun(entry.Type, runDate); + LogRunningJob(entry.Type); + + await jobExecutor.RunJob(entry, stoppingToken); - var delay = runDate - now; - await Task.Delay(delay, timeProvider, stoppingToken).ConfigureAwait(ConfigureAwaitOptions.ForceYielding); - jobExecutor.RunJob(entry, stoppingToken); + LogCompletedJob(entry.Type); + } + catch (Exception ex) + { + LogExceptionInJob(ex.Message, entry.Type); } } + + private bool CanStartJob(RegistryEntry jobEntry) + { + var attribute = jobEntry.Type.GetCustomAttribute(); + var maxAllowed = attribute?.MaxDegreeOfParallelism ?? 1; // Default to 1 if no attribute is found + var currentCount = runningJobCounts.GetOrAdd(jobEntry.Type, _ => 0); + + return currentCount < maxAllowed; + } + + private void UpdateRunningJobCount(Type jobType, int change) => + runningJobCounts.AddOrUpdate(jobType, change, (type, existingVal) => Math.Max(0, existingVal + change)); + + public override void Dispose() + { + shutdown?.Dispose(); + semaphore.Dispose(); + base.Dispose(); + } } diff --git a/src/LinkDotNet.NCronJob/Scheduler/JobQueueComparer.cs b/src/LinkDotNet.NCronJob/Scheduler/JobQueueComparer.cs new file mode 100644 index 0000000..23d62f5 --- /dev/null +++ b/src/LinkDotNet.NCronJob/Scheduler/JobQueueComparer.cs @@ -0,0 +1,28 @@ +namespace LinkDotNet.NCronJob; + +internal class JobQueueComparer : IComparer +{ + public int Compare(RegistryEntry? x, RegistryEntry? y) + { + if (x == null && y == null) + return 0; + if (x == null) + return -1; + if (y == null) + return 1; + + // Compare next scheduled run times + var nowDateTime = DateTime.UtcNow; + var xNextRunTime = x.CrontabSchedule?.GetNextOccurrence(nowDateTime); + var yNextRunTime = y.CrontabSchedule?.GetNextOccurrence(nowDateTime); + var timeComparison = DateTime.Compare(xNextRunTime.GetValueOrDefault(), yNextRunTime.GetValueOrDefault()); + + if (timeComparison != 0) + return timeComparison; + + // If times are the same, compare by priority (higher priority should come first) + // Reverse comparison because higher enum values should be prioritized + return y.Priority.CompareTo(x.Priority); + } +} + diff --git a/src/LinkDotNet.NCronJob/Scheduler/JobQueueTupleComparer.cs b/src/LinkDotNet.NCronJob/Scheduler/JobQueueTupleComparer.cs new file mode 100644 index 0000000..5a09dcf --- /dev/null +++ b/src/LinkDotNet.NCronJob/Scheduler/JobQueueTupleComparer.cs @@ -0,0 +1,15 @@ +namespace LinkDotNet.NCronJob; + +internal class JobQueueTupleComparer : IComparer<(DateTime NextRunTime, int Priority)> +{ + public int Compare((DateTime NextRunTime, int Priority) x, (DateTime NextRunTime, int Priority) y) + { + // First, compare by DateTime + var timeComparison = DateTime.Compare(x.NextRunTime, y.NextRunTime); + if (timeComparison != 0) + return timeComparison; + + // If times are the same, use the priority where higher values indicate higher priority + return y.Priority.CompareTo(x.Priority); + } +} diff --git a/tests/NCronJob.Tests/ExtensionsTests.cs b/tests/NCronJob.Tests/ExtensionsTests.cs index 89dfe0f..719f8e5 100644 --- a/tests/NCronJob.Tests/ExtensionsTests.cs +++ b/tests/NCronJob.Tests/ExtensionsTests.cs @@ -10,18 +10,20 @@ public class NCronJobTests public void AddingWrongCronExpressionLeadsToException() { var collection = new ServiceCollection(); - var builder = new NCronJobOptionBuilder(collection); + var settings = new ConcurrencySettings { MaxDegreeOfParallelism = Environment.ProcessorCount * 4 }; + var builder = new NCronJobOptionBuilder(collection, settings); Action act = () => builder.AddJob(o => o.WithCronExpression("not-valid")); - act.ShouldThrow(); + act.ShouldThrow(); } [Fact] public void AddingCronJobWithSecondPrecisionExpressionNotThrowException() { var collection = new ServiceCollection(); - var builder = new NCronJobOptionBuilder(collection); + var settings = new ConcurrencySettings { MaxDegreeOfParallelism = Environment.ProcessorCount * 4 }; + var builder = new NCronJobOptionBuilder(collection, settings); Action act = () => builder.AddJob(o => { @@ -31,6 +33,56 @@ public void AddingCronJobWithSecondPrecisionExpressionNotThrowException() act.ShouldNotThrow(); } + [Fact] + public void AddingNullCronExpressionThrowsArgumentNullException() + { + var builder = new JobOptionBuilder(); + Should.Throw(() => builder.WithCronExpression(null!)); + } + + [Fact] + public void AddingCronExpressionWithIncorrectSegmentCountThrowsArgumentException() + { + var builder = new JobOptionBuilder(); + Should.Throw(() => builder.WithCronExpression("* * *")); + } + + [Fact] + public void AddingValidCronExpressionWithMinutePrecisionDoesNotThrowException() + { + var builder = new JobOptionBuilder(); + Should.NotThrow(() => builder.WithCronExpression("5 * * * *")); + } + + [Fact] + public void AddingValidCronExpressionWithSecondPrecisionDoesNotThrowException() + { + var builder = new JobOptionBuilder(); + Should.NotThrow(() => builder.WithCronExpression("30 5 * * * *", true)); + } + + [Fact] + public void AddingCronExpressionWithInvalidSecondPrecisionThrowsArgumentException() + { + var builder = new JobOptionBuilder(); + Should.Throw(() => builder.WithCronExpression("5 * * * *", true)); + } + + + [Fact] + public void AutoDetectSecondPrecisionWhenNotSpecified() + { + var builder = new JobOptionBuilder(); + builder.WithCronExpression("0 0 12 * * ?"); + var options = builder.GetJobOptions(); + options.ShouldContain(o => o.CronExpression == "0 0 12 * * ?" && o.EnableSecondPrecision); + + builder.WithCronExpression("0 1 * * *"); + options = builder.GetJobOptions(); + options.ShouldContain(o => o.CronExpression == "0 1 * * *" && !o.EnableSecondPrecision); + } + + private sealed class FakeJob : IJob { public Task RunAsync(JobExecutionContext context, CancellationToken token) diff --git a/tests/NCronJob.Tests/JobOptionBuilderTests.cs b/tests/NCronJob.Tests/JobOptionBuilderTests.cs index 38ded17..f1c1ecc 100644 --- a/tests/NCronJob.Tests/JobOptionBuilderTests.cs +++ b/tests/NCronJob.Tests/JobOptionBuilderTests.cs @@ -59,4 +59,25 @@ public void ShouldAddMultipleCronJobsEvenWithoutParameters() options[1].EnableSecondPrecision.ShouldBeFalse(); options[1].Parameter.ShouldBeNull(); } + + [Fact] + public void ShouldCreateMultipleJobsWithoutAnd() + { + var builder = new JobOptionBuilder(); + builder.WithCronExpression("* * * * *") + .WithParameter("foo"); + + builder.WithCronExpression("0 * * * *") + .WithParameter("bar"); + + var options = builder.GetJobOptions(); + + options.Count.ShouldBe(2); + options[0].CronExpression.ShouldBe("* * * * *"); + options[0].EnableSecondPrecision.ShouldBeFalse(); + options[0].Parameter.ShouldBe("foo"); + options[1].CronExpression.ShouldBe("0 * * * *"); + options[1].EnableSecondPrecision.ShouldBeFalse(); + options[1].Parameter.ShouldBe("bar"); + } } diff --git a/tests/NCronJob.Tests/MockHostApplicationLifetime.cs b/tests/NCronJob.Tests/MockHostApplicationLifetime.cs new file mode 100644 index 0000000..f7b4c2b --- /dev/null +++ b/tests/NCronJob.Tests/MockHostApplicationLifetime.cs @@ -0,0 +1,14 @@ +using Microsoft.Extensions.Hosting; + +namespace NCronJob.Tests; + +public class MockHostApplicationLifetime : IHostApplicationLifetime +{ + public CancellationToken ApplicationStarted { get; set; } = new CancellationToken(false); + public CancellationToken ApplicationStopping { get; set; } = new CancellationToken(false); + public CancellationToken ApplicationStopped { get; set; } = new CancellationToken(false); + + public void StopApplication() + { + } +} diff --git a/tests/NCronJob.Tests/NCronJob.Tests.csproj b/tests/NCronJob.Tests/NCronJob.Tests.csproj index 82c3cd2..6f255cd 100644 --- a/tests/NCronJob.Tests/NCronJob.Tests.csproj +++ b/tests/NCronJob.Tests/NCronJob.Tests.csproj @@ -14,9 +14,9 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + - all @@ -25,7 +25,7 @@ - + diff --git a/tests/NCronJob.Tests/NCronJobIntegrationTests.cs b/tests/NCronJob.Tests/NCronJobIntegrationTests.cs index 3cac40d..648383a 100644 --- a/tests/NCronJob.Tests/NCronJobIntegrationTests.cs +++ b/tests/NCronJob.Tests/NCronJobIntegrationTests.cs @@ -4,8 +4,8 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Time.Testing; using Shouldly; -using TimeProviderExtensions; namespace NCronJob.Tests; @@ -14,13 +14,14 @@ public sealed class NCronJobIntegrationTests : JobIntegrationBase [Fact] public async Task CronJobThatIsScheduledEveryMinuteShouldBeExecuted() { - var fakeTimer = TimeProviderFactory.GetTimeProvider(); + var fakeTimer = new FakeTimeProvider(); ServiceCollection.AddSingleton(fakeTimer); ServiceCollection.AddNCronJob(n => n.AddJob(p => p.WithCronExpression("* * * * *"))); var provider = CreateServiceProvider(); await provider.GetRequiredService().StartAsync(CancellationToken); + fakeTimer.Advance(TimeSpan.FromMinutes(1)); var jobFinished = await WaitForJobsOrTimeout(1); jobFinished.ShouldBeTrue(); } @@ -28,21 +29,37 @@ public async Task CronJobThatIsScheduledEveryMinuteShouldBeExecuted() [Fact] public async Task AdvancingTheWholeTimeShouldHaveTenEntries() { - var fakeTimer = TimeProviderFactory.GetTimeProvider(); + var fakeTimer = new FakeTimeProvider(); ServiceCollection.AddSingleton(fakeTimer); ServiceCollection.AddNCronJob(n => n.AddJob(p => p.WithCronExpression("* * * * *"))); var provider = CreateServiceProvider(); await provider.GetRequiredService().StartAsync(CancellationToken); - var jobFinished = await WaitForJobsOrTimeout(10); + void AdvanceTime() => fakeTimer.Advance(TimeSpan.FromMinutes(1)); + var jobFinished = await WaitForJobsOrTimeout(10, AdvanceTime); + jobFinished.ShouldBeTrue(); } + [Fact] + public async Task JobsShouldCancelOnCancellation() + { + var fakeTimer = new FakeTimeProvider(); + ServiceCollection.AddSingleton(fakeTimer); + ServiceCollection.AddNCronJob(n => n.AddJob(p => p.WithCronExpression("* * * * *"))); + var provider = CreateServiceProvider(); + + await provider.GetRequiredService().StartAsync(CancellationToken); + + var jobFinished = await DoNotWaitJustCancel(10); + jobFinished.ShouldBeFalse(); + } + [Fact] public async Task EachJobRunHasItsOwnScope() { - var fakeTimer = new ManualTimeProvider(); + var fakeTimer = new FakeTimeProvider(); var storage = new Storage(); ServiceCollection.AddSingleton(fakeTimer); ServiceCollection.AddSingleton(storage); @@ -63,7 +80,7 @@ public async Task EachJobRunHasItsOwnScope() [Fact] public async Task ExecuteAnInstantJob() { - var fakeTimer = TimeProviderFactory.GetTimeProvider(); + var fakeTimer = new FakeTimeProvider(); ServiceCollection.AddSingleton(fakeTimer); ServiceCollection.AddNCronJob(n => n.AddJob()); var provider = CreateServiceProvider(); @@ -78,13 +95,15 @@ public async Task ExecuteAnInstantJob() [Fact] public async Task CronJobShouldPassDownParameter() { - var fakeTimer = TimeProviderFactory.GetTimeProvider(); + var fakeTimer = new FakeTimeProvider(); ServiceCollection.AddSingleton(fakeTimer); ServiceCollection.AddNCronJob(n => n.AddJob(p => p.WithCronExpression("* * * * *").WithParameter("Hello World"))); var provider = CreateServiceProvider(); await provider.GetRequiredService().StartAsync(CancellationToken); + fakeTimer.Advance(TimeSpan.FromMinutes(1)); + var content = await CommunicationChannel.Reader.ReadAsync(CancellationToken); content.ShouldBe("Hello World"); } @@ -92,7 +111,7 @@ public async Task CronJobShouldPassDownParameter() [Fact] public async Task InstantJobShouldGetParameter() { - var fakeTimer = TimeProviderFactory.GetTimeProvider(); + var fakeTimer = new FakeTimeProvider(); ServiceCollection.AddSingleton(fakeTimer); ServiceCollection.AddNCronJob(n => n.AddJob()); var provider = CreateServiceProvider(); @@ -107,21 +126,25 @@ public async Task InstantJobShouldGetParameter() [Fact] public async Task CronJobThatIsScheduledEverySecondShouldBeExecuted() { - var fakeTimer = TimeProviderFactory.GetTimeProvider(TimeSpan.FromSeconds(1)); + var fakeTimer = new FakeTimeProvider(); + fakeTimer.Advance(TimeSpan.FromSeconds(1)); ServiceCollection.AddSingleton(fakeTimer); ServiceCollection.AddNCronJob(n => n.AddJob(p => p.WithCronExpression("* * * * * *", true))); var provider = CreateServiceProvider(); await provider.GetRequiredService().StartAsync(CancellationToken); - var jobFinished = await WaitForJobsOrTimeout(2); + void AdvanceTime() => fakeTimer.Advance(TimeSpan.FromSeconds(1)); + var jobFinished = await WaitForJobsOrTimeout(10, AdvanceTime); + jobFinished.ShouldBeTrue(); } [Fact] public async Task CanRunSecondPrecisionAndMinutePrecisionJobs() { - var fakeTimer = TimeProviderFactory.GetTimeProvider(TimeSpan.FromSeconds(1)); + var fakeTimer = new FakeTimeProvider(); + fakeTimer.Advance(TimeSpan.FromSeconds(1)); ServiceCollection.AddSingleton(fakeTimer); ServiceCollection.AddNCronJob(n => n.AddJob( p => p.WithCronExpression("* * * * * *", true).And.WithCronExpression("* * * * *"))); @@ -129,14 +152,15 @@ public async Task CanRunSecondPrecisionAndMinutePrecisionJobs() await provider.GetRequiredService().StartAsync(CancellationToken); - var jobFinished = await WaitForJobsOrTimeout(61); + void AdvanceTime() => fakeTimer.Advance(TimeSpan.FromSeconds(1)); + var jobFinished = await WaitForJobsOrTimeout(61, AdvanceTime); jobFinished.ShouldBeTrue(); } [Fact] public async Task LongRunningJobShouldNotBlockScheduler() { - var fakeTimer = TimeProviderFactory.GetTimeProvider(); + var fakeTimer = new FakeTimeProvider(); ServiceCollection.AddSingleton(fakeTimer); ServiceCollection.AddNCronJob(n => n .AddJob(p => p.WithCronExpression("* * * * *")) @@ -145,6 +169,7 @@ public async Task LongRunningJobShouldNotBlockScheduler() await provider.GetRequiredService().StartAsync(CancellationToken); + fakeTimer.Advance(TimeSpan.FromMinutes(1)); var jobFinished = await WaitForJobsOrTimeout(1); jobFinished.ShouldBeTrue(); } @@ -152,7 +177,7 @@ public async Task LongRunningJobShouldNotBlockScheduler() [Fact] public async Task NotRegisteredJobShouldNotAbortOtherRuns() { - var fakeTimer = TimeProviderFactory.GetTimeProvider(); + var fakeTimer = new FakeTimeProvider(); ServiceCollection.AddSingleton(fakeTimer); ServiceCollection.AddNCronJob(n => n.AddJob(p => p.WithCronExpression("* * * * *"))); ServiceCollection.AddTransient(); @@ -161,21 +186,22 @@ public async Task NotRegisteredJobShouldNotAbortOtherRuns() await provider.GetRequiredService().StartAsync(CancellationToken); + fakeTimer.Advance(TimeSpan.FromMinutes(1)); var jobFinished = await WaitForJobsOrTimeout(1); jobFinished.ShouldBeTrue(); } [Fact] - public void ThrowIfJobWithDependenciesIsNotRegistered() + public async Task ThrowIfJobWithDependenciesIsNotRegistered() { ServiceCollection .AddNCronJob(n => n.AddJob(p => p.WithCronExpression("* * * * *"))); var provider = CreateServiceProvider(); - Assert.Throws(() => + await Assert.ThrowsAsync(async () => { - using var executor = new JobExecutor(provider, NullLogger.Instance); - executor.RunJob(new RegistryEntry(typeof(JobWithDependency), new JobExecutionContext(null), null), CancellationToken.None); + using var executor = provider.CreateScope().ServiceProvider.GetRequiredService(); + await executor.RunJob(new RegistryEntry(typeof(JobWithDependency), new JobExecutionContext(null!, null), null), CancellationToken.None); }); } @@ -192,16 +218,23 @@ private sealed class Storage private sealed class SimpleJob(ChannelWriter writer) : IJob { public async Task RunAsync(JobExecutionContext context, CancellationToken token) - => await writer.WriteAsync(true, token); + { + try + { + context.Output = "Job Completed"; + await writer.WriteAsync(context.Output, token); + } + catch (Exception ex) + { + await writer.WriteAsync(ex, token); + } + } } - private sealed class LongRunningJob : IJob + private sealed class LongRunningJob(TimeProvider timeProvider) : IJob { - public Task RunAsync(JobExecutionContext context, CancellationToken token) - { - Task.Delay(1000, token).GetAwaiter().GetResult(); - return Task.CompletedTask; - } + public async Task RunAsync(JobExecutionContext context, CancellationToken token) => + await Task.Delay(TimeSpan.FromSeconds(10), timeProvider, token); } private sealed class ScopedServiceJob(ChannelWriter writer, Storage storage, GuidGenerator guidGenerator) : IJob @@ -224,4 +257,4 @@ private sealed class JobWithDependency(ChannelWriter writer, GuidGenerat public async Task RunAsync(JobExecutionContext context, CancellationToken token) => await writer.WriteAsync(guidGenerator.NewGuid, token); } -} +} \ No newline at end of file diff --git a/tests/NCronJob.Tests/NCronJobNotificationHandlerTests.cs b/tests/NCronJob.Tests/NCronJobNotificationHandlerTests.cs index 933960c..30eb795 100644 --- a/tests/NCronJob.Tests/NCronJobNotificationHandlerTests.cs +++ b/tests/NCronJob.Tests/NCronJobNotificationHandlerTests.cs @@ -2,6 +2,7 @@ using LinkDotNet.NCronJob; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Time.Testing; using Shouldly; namespace NCronJob.Tests; @@ -11,7 +12,7 @@ public class NCronJobNotificationHandlerTests : JobIntegrationBase [Fact] public async Task ShouldCallNotificationHandlerWhenJobIsDone() { - var fakeTimer = TimeProviderFactory.GetTimeProvider(); + var fakeTimer = new FakeTimeProvider(); ServiceCollection.AddSingleton(fakeTimer); ServiceCollection.AddNCronJob(n => n .AddJob(p => p.WithCronExpression("* * * * *")) @@ -29,7 +30,7 @@ public async Task ShouldCallNotificationHandlerWhenJobIsDone() [Fact] public async Task ShouldPassDownExceptionToNotificationHandler() { - var fakeTimer = TimeProviderFactory.GetTimeProvider(); + var fakeTimer = new FakeTimeProvider(); ServiceCollection.AddSingleton(fakeTimer); ServiceCollection.AddNCronJob(n => n .AddJob(p => p.WithCronExpression("* * * * *")) @@ -47,7 +48,7 @@ public async Task ShouldPassDownExceptionToNotificationHandler() [Fact] public async Task HandlerThatThrowsExceptionShouldNotInfluenceOtherHandlers() { - var fakeTimer = TimeProviderFactory.GetTimeProvider(); + var fakeTimer = new FakeTimeProvider(); ServiceCollection.AddSingleton(fakeTimer); ServiceCollection.AddNCronJob(n => n .AddJob(p => p.WithCronExpression("* * * * *")) @@ -66,7 +67,7 @@ public async Task HandlerThatThrowsExceptionShouldNotInfluenceOtherHandlers() [Fact] public async Task HandlerThatThrowsExceptionInAsyncPartShouldNotInfluenceOtherHandlers() { - var fakeTimer = TimeProviderFactory.GetTimeProvider(); + var fakeTimer = new FakeTimeProvider(); ServiceCollection.AddSingleton(fakeTimer); ServiceCollection.AddNCronJob(n => n .AddJob(p => p.WithCronExpression("* * * * *")) diff --git a/tests/NCronJob.Tests/NCronJobRetryTests.cs b/tests/NCronJob.Tests/NCronJobRetryTests.cs new file mode 100644 index 0000000..f454f6b --- /dev/null +++ b/tests/NCronJob.Tests/NCronJobRetryTests.cs @@ -0,0 +1,105 @@ +using System.Threading.Channels; +using LinkDotNet.NCronJob; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Time.Testing; +using Polly; +using Shouldly; + +namespace NCronJob.Tests; + +public sealed class NCronJobRetryTests : JobIntegrationBase +{ + [Fact] + public async Task JobShouldRetryOnFailure() + { + var fakeTimer = new FakeTimeProvider(); + ServiceCollection.AddSingleton(fakeTimer); + // 3 retries PolicyType.ExponentialBackoff + ServiceCollection.AddSingleton(new MaxFailuresWrapper(3)); + ServiceCollection.AddNCronJob(n => n.AddJob(p => p.WithCronExpression("* * * * *"))); + var provider = CreateServiceProvider(); + + await provider.GetRequiredService().StartAsync(CancellationToken.None); + + fakeTimer.Advance(TimeSpan.FromMinutes(1)); + + // Validate that the job was retried the correct number of times + // Fail 3 times = 3 retries + 1 success + var attempts = await CommunicationChannel.Reader.ReadAsync(CancellationToken); + attempts.ShouldBe(4); + } + + [Fact] + public async Task JobWithCustomPolicyShouldRetryOnFailure() + { + var fakeTimer = new FakeTimeProvider(); + ServiceCollection.AddSingleton(fakeTimer); + // 3 retries custom policy MyCustomPolicyCreator + ServiceCollection.AddSingleton(new MaxFailuresWrapper(3)); + ServiceCollection.AddNCronJob(n => n.AddJob(p => p.WithCronExpression("* * * * *"))); + var provider = CreateServiceProvider(); + + await provider.GetRequiredService().StartAsync(CancellationToken.None); + + fakeTimer.Advance(TimeSpan.FromMinutes(1)); + + // Validate that the job was retried the correct number of times + // Fail 3 times = 3 retries + 1 success + var attempts = await CommunicationChannel.Reader.ReadAsync(CancellationToken); + attempts.ShouldBe(4); + } + + private sealed class MaxFailuresWrapper(int maxFailuresBeforeSuccess = 3) + { + public int MaxFailuresBeforeSuccess { get; set; } = maxFailuresBeforeSuccess; + } + + [RetryPolicy(retryCount: 4, PolicyType.ExponentialBackoff)] + private sealed class FailingJob(ChannelWriter writer, MaxFailuresWrapper maxFailuresWrapper) + : IJob + { + public async Task RunAsync(JobExecutionContext context, CancellationToken token) + { + ArgumentNullException.ThrowIfNull(context); + + var attemptCount = context.Attempts; + + if (attemptCount <= maxFailuresWrapper.MaxFailuresBeforeSuccess) + { + throw new InvalidOperationException("Job Failed"); + } + + await writer.WriteAsync(attemptCount, token); + } + } + + + [RetryPolicy(3, 1)] + private sealed class JobUsingCustomPolicy(ChannelWriter writer, MaxFailuresWrapper maxFailuresWrapper) + : IJob + { + public async Task RunAsync(JobExecutionContext context, CancellationToken token) + { + ArgumentNullException.ThrowIfNull(context); + + var attemptCount = context.Attempts; + + if (attemptCount <= maxFailuresWrapper.MaxFailuresBeforeSuccess) + { + throw new InvalidOperationException("Job Failed"); + } + + await writer.WriteAsync(attemptCount, token); + } + } + + private sealed class MyCustomPolicyCreator : IPolicyCreator + { + public IAsyncPolicy CreatePolicy(int maxRetryAttempts = 3, double delayFactor = 2) => + Policy.Handle() + .WaitAndRetryAsync(maxRetryAttempts, + retryAttempt => TimeSpan.FromSeconds(Math.Pow(delayFactor, retryAttempt))); + } + +} diff --git a/tests/NCronJob.Tests/TestHelper.cs b/tests/NCronJob.Tests/TestHelper.cs index 1abf42e..046fbd9 100644 --- a/tests/NCronJob.Tests/TestHelper.cs +++ b/tests/NCronJob.Tests/TestHelper.cs @@ -1,5 +1,8 @@ +using System.Runtime.CompilerServices; using System.Threading.Channels; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; namespace NCronJob.Tests; @@ -17,6 +20,9 @@ protected JobIntegrationBase() ServiceCollection = new(); ServiceCollection.AddLogging() .AddScoped>(_ => CommunicationChannel.Writer); + + var mockLifetime = new MockHostApplicationLifetime(); + ServiceCollection.AddSingleton(mockLifetime); } public void Dispose() @@ -36,7 +42,38 @@ protected virtual void Dispose(bool disposing) protected async Task WaitForJobsOrTimeout(int jobRuns) { - using var timeoutTcs = new CancellationTokenSource(100); + using var timeoutTcs = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + try + { + await Task.WhenAll(GetCompletionJobs(jobRuns, timeoutTcs.Token)); + return true; + } + catch + { + return false; + } + } + + protected async Task WaitForJobsOrTimeout(int jobRuns, Action timeAdvancer) + { + using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + try + { + await foreach (var jobSuccessful in GetCompletionJobsAsync(jobRuns, timeAdvancer, timeoutCts.Token)) + { + jobSuccessful.ShouldBe("Job Completed"); + } + return true; + } + catch + { + return false; + } + } + + protected async Task DoNotWaitJustCancel(int jobRuns) + { + using var timeoutTcs = new CancellationTokenSource(10); try { await Task.WhenAll(GetCompletionJobs(jobRuns, timeoutTcs.Token)); @@ -55,4 +92,14 @@ protected IEnumerable GetCompletionJobs(int expectedJobCount, Cancellation yield return CommunicationChannel.Reader.ReadAsync(cancellationToken).AsTask(); } } + + private async IAsyncEnumerable GetCompletionJobsAsync(int expectedJobCount, Action timeAdvancer, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + for (var i = 0; i < expectedJobCount; i++) + { + timeAdvancer(); + var jobResult = await CommunicationChannel.Reader.ReadAsync(cancellationToken); + yield return jobResult; + } + } } diff --git a/tests/NCronJob.Tests/TimeProviderFactory.cs b/tests/NCronJob.Tests/TimeProviderFactory.cs deleted file mode 100644 index b9e4104..0000000 --- a/tests/NCronJob.Tests/TimeProviderFactory.cs +++ /dev/null @@ -1,16 +0,0 @@ -using TimeProviderExtensions; - -namespace NCronJob.Tests; - -internal static class TimeProviderFactory -{ - public static ManualTimeProvider GetTimeProvider(TimeSpan? advanceTime = null) - => new() - { - AutoAdvanceBehavior = new AutoAdvanceBehavior - { - UtcNowAdvanceAmount = advanceTime ?? TimeSpan.Zero, - TimerAutoTriggerCount = 1, - }, - }; -}