diff --git a/src/WorkflowCore/Models/WorkflowOptions.cs b/src/WorkflowCore/Models/WorkflowOptions.cs index e44597d4c..d0b6d63fb 100644 --- a/src/WorkflowCore/Models/WorkflowOptions.cs +++ b/src/WorkflowCore/Models/WorkflowOptions.cs @@ -40,12 +40,18 @@ public WorkflowOptions(IServiceCollection services) public bool EnablePolling { get; set; } = true; public bool EnableLifeCycleEventsPublisher { get; set; } = true; public EventsPurgerOptions EventsPurgerOptions { get; private set; } = new EventsPurgerOptions(batchSize: 100); + public WorkflowsPurgerOptions WorkflowsPurgerOptions { get; private set; } = new WorkflowsPurgerOptions(batchSize: 100); public void SetEventsPurgerOptions(EventsPurgerOptions eventsPurgerOptions) { EventsPurgerOptions = eventsPurgerOptions; } + public void SetWorkflowsPurgerOptions(WorkflowsPurgerOptions workflowsPurgerOptions) + { + WorkflowsPurgerOptions = workflowsPurgerOptions; + } + public void UsePersistence(Func factory) { PersistanceFactory = factory; diff --git a/src/WorkflowCore/Models/WorkflowsPurgerOptions.cs b/src/WorkflowCore/Models/WorkflowsPurgerOptions.cs new file mode 100644 index 000000000..0d8745f72 --- /dev/null +++ b/src/WorkflowCore/Models/WorkflowsPurgerOptions.cs @@ -0,0 +1,22 @@ +using System; + +namespace WorkflowCore.Models +{ + public class WorkflowsPurgerOptions + { + public int BatchSize { get; } = 100; + public int DeleteCommandTimeoutSeconds { get; } = 120; + + public WorkflowsPurgerOptions(int batchSize, int deleteCommandTimeoutSeconds = 120) + { + if (batchSize < 0) + throw new ArgumentOutOfRangeException("Batch size shoud be greater than 0"); + + if (deleteCommandTimeoutSeconds < 0) + throw new ArgumentOutOfRangeException("Timeout shoud be greater than 0"); + + BatchSize = batchSize; + DeleteCommandTimeoutSeconds = deleteCommandTimeoutSeconds; + } + } +} diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/WorkflowPurger.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/WorkflowPurger.cs index 904895809..57feca256 100644 --- a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/WorkflowPurger.cs +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/WorkflowPurger.cs @@ -13,33 +13,55 @@ namespace WorkflowCore.Persistence.EntityFramework.Services public class WorkflowPurger : IWorkflowPurger { private readonly IWorkflowDbContextFactory _contextFactory; + private readonly WorkflowsPurgerOptions _options; - public WorkflowPurger(IWorkflowDbContextFactory contextFactory) + public WorkflowPurger(IWorkflowDbContextFactory contextFactory, WorkflowsPurgerOptions options) { _contextFactory = contextFactory; + _options = options; } - + public async Task PurgeWorkflows(WorkflowStatus status, DateTime olderThan, CancellationToken cancellationToken = default) { var olderThanUtc = olderThan.ToUniversalTime(); using (var db = ConstructDbContext()) { - var workflows = await db.Set().Where(x => x.Status == status && x.CompleteTime < olderThanUtc).ToListAsync(cancellationToken); - foreach (var wf in workflows) - { - foreach (var pointer in wf.ExecutionPointers) + int deleteEvents = _options.BatchSize; + db.Database.SetCommandTimeout(_options.DeleteCommandTimeoutSeconds); + + #if NET6_0_OR_GREATER + while(deleteEvents != 0) { - foreach (var extAttr in pointer.ExtensionAttributes) + deleteEvents = await db.Set() + .Where(x => x.Status == status && x.CompleteTime < olderThanUtc) + .Take(_options.BatchSize) + .ExecuteDeleteAsync(cancellationToken); + } + #else + while (deleteEvents != 0) + { + var workflows = db.Set() + .Where(x => x.Status == status && x.CompleteTime < olderThanUtc) + .Take(_options.BatchSize); + + foreach (var wf in workflows) { - db.Remove(extAttr); + foreach (var pointer in wf.ExecutionPointers) + { + foreach (var extAttr in pointer.ExtensionAttributes) + { + db.Remove(extAttr); + } + + db.Remove(pointer); + } + db.Remove(wf); } - db.Remove(pointer); + deleteEvents = await db.SaveChangesAsync(cancellationToken); } - db.Remove(wf); - } - await db.SaveChangesAsync(cancellationToken); + #endif } } diff --git a/src/providers/WorkflowCore.Persistence.MySQL/ServiceCollectionExtensions.cs b/src/providers/WorkflowCore.Persistence.MySQL/ServiceCollectionExtensions.cs index 3c885b964..e54a3be88 100644 --- a/src/providers/WorkflowCore.Persistence.MySQL/ServiceCollectionExtensions.cs +++ b/src/providers/WorkflowCore.Persistence.MySQL/ServiceCollectionExtensions.cs @@ -12,7 +12,7 @@ public static class ServiceCollectionExtensions public static WorkflowOptions UseMySQL(this WorkflowOptions options, string connectionString, bool canCreateDB, bool canMigrateDB, Action mysqlOptionsAction = null) { options.UsePersistence(sp => new EntityFrameworkPersistenceProvider(new MysqlContextFactory(connectionString, mysqlOptionsAction), canCreateDB, canMigrateDB)); - options.Services.AddTransient(sp => new WorkflowPurger(new MysqlContextFactory(connectionString, mysqlOptionsAction))); + options.Services.AddTransient(sp => new WorkflowPurger(new MysqlContextFactory(connectionString, mysqlOptionsAction), options.WorkflowsPurgerOptions)); options.Services.AddTransient(sp => new EventsPurger(new MysqlContextFactory(connectionString, mysqlOptionsAction), options.EventsPurgerOptions)); return options; } diff --git a/src/providers/WorkflowCore.Persistence.PostgreSQL/ServiceCollectionExtensions.cs b/src/providers/WorkflowCore.Persistence.PostgreSQL/ServiceCollectionExtensions.cs index 841f2e937..57c45f437 100644 --- a/src/providers/WorkflowCore.Persistence.PostgreSQL/ServiceCollectionExtensions.cs +++ b/src/providers/WorkflowCore.Persistence.PostgreSQL/ServiceCollectionExtensions.cs @@ -17,7 +17,7 @@ static EntityFrameworkPersistenceProvider GetPersistenceFactory(string connectio } options.UsePersistence(sp => GetPersistenceFactory(connectionString, canCreateDB, canMigrateDB, schemaName)); - options.Services.AddTransient(sp => new WorkflowPurger(new PostgresContextFactory(connectionString, schemaName))); + options.Services.AddTransient(sp => new WorkflowPurger(new PostgresContextFactory(connectionString, schemaName), options.WorkflowsPurgerOptions)); options.Services.AddTransient(sp => new EventsPurger(new PostgresContextFactory(connectionString, schemaName), options.EventsPurgerOptions)); options.Services.AddTransient(sp => GetPersistenceFactory(connectionString, canCreateDB, canMigrateDB, schemaName)); return options; diff --git a/src/providers/WorkflowCore.Persistence.SqlServer/ServiceCollectionExtensions.cs b/src/providers/WorkflowCore.Persistence.SqlServer/ServiceCollectionExtensions.cs index 10aa7ede1..8591903cb 100644 --- a/src/providers/WorkflowCore.Persistence.SqlServer/ServiceCollectionExtensions.cs +++ b/src/providers/WorkflowCore.Persistence.SqlServer/ServiceCollectionExtensions.cs @@ -12,7 +12,7 @@ public static class ServiceCollectionExtensions public static WorkflowOptions UseSqlServer(this WorkflowOptions options, string connectionString, bool canCreateDB, bool canMigrateDB, Action initAction = null) { options.UsePersistence(sp => new EntityFrameworkPersistenceProvider(new SqlContextFactory(connectionString, initAction), canCreateDB, canMigrateDB)); - options.Services.AddTransient(sp => new WorkflowPurger(new SqlContextFactory(connectionString, initAction))); + options.Services.AddTransient(sp => new WorkflowPurger(new SqlContextFactory(connectionString, initAction), options.WorkflowsPurgerOptions)); options.Services.AddTransient(sp => new EventsPurger(new SqlContextFactory(connectionString, initAction), options.EventsPurgerOptions)); return options; } diff --git a/src/providers/WorkflowCore.Persistence.Sqlite/ServiceCollectionExtensions.cs b/src/providers/WorkflowCore.Persistence.Sqlite/ServiceCollectionExtensions.cs index 0d5dd5a8e..983e83cf6 100644 --- a/src/providers/WorkflowCore.Persistence.Sqlite/ServiceCollectionExtensions.cs +++ b/src/providers/WorkflowCore.Persistence.Sqlite/ServiceCollectionExtensions.cs @@ -12,7 +12,7 @@ public static class ServiceCollectionExtensions public static WorkflowOptions UseSqlite(this WorkflowOptions options, string connectionString, bool canCreateDB) { options.UsePersistence(sp => new EntityFrameworkPersistenceProvider(new SqliteContextFactory(connectionString), canCreateDB, false)); - options.Services.AddTransient(sp => new WorkflowPurger(new SqliteContextFactory(connectionString))); + options.Services.AddTransient(sp => new WorkflowPurger(new SqliteContextFactory(connectionString), options.WorkflowsPurgerOptions)); return options; } }