Skip to content

Commit

Permalink
Implement batching for WorkflowsPurger (#22)
Browse files Browse the repository at this point in the history
Implement batching for WorkflowsPurger

---------

Co-authored-by: Viktor Shevchenko <viktor.shevchenko@idealscorp.com>
  • Loading branch information
1 parent f04ba30 commit f70b43c
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 16 deletions.
6 changes: 6 additions & 0 deletions src/WorkflowCore/Models/WorkflowOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,18 @@ public WorkflowOptions(IServiceCollection services)
public bool EnablePolling { get; set; } = true;
public bool EnableLifeCycleEventsPublisher { get; set; } = true;
public EventsPurgerOptions EventsPurgerOptions { get; private set; } = new EventsPurgerOptions(batchSize: 100);
public WorkflowsPurgerOptions WorkflowsPurgerOptions { get; private set; } = new WorkflowsPurgerOptions(batchSize: 100);

public void SetEventsPurgerOptions(EventsPurgerOptions eventsPurgerOptions)
{
EventsPurgerOptions = eventsPurgerOptions;
}

public void SetWorkflowsPurgerOptions(WorkflowsPurgerOptions workflowsPurgerOptions)
{
WorkflowsPurgerOptions = workflowsPurgerOptions;
}

public void UsePersistence(Func<IServiceProvider, IPersistenceProvider> factory)
{
PersistanceFactory = factory;
Expand Down
22 changes: 22 additions & 0 deletions src/WorkflowCore/Models/WorkflowsPurgerOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;

namespace WorkflowCore.Models
{
public class WorkflowsPurgerOptions
{
public int BatchSize { get; } = 100;
public int DeleteCommandTimeoutSeconds { get; } = 120;

public WorkflowsPurgerOptions(int batchSize, int deleteCommandTimeoutSeconds = 120)
{
if (batchSize < 0)
throw new ArgumentOutOfRangeException("Batch size shoud be greater than 0");

if (deleteCommandTimeoutSeconds < 0)
throw new ArgumentOutOfRangeException("Timeout shoud be greater than 0");

BatchSize = batchSize;
DeleteCommandTimeoutSeconds = deleteCommandTimeoutSeconds;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,55 @@ namespace WorkflowCore.Persistence.EntityFramework.Services
public class WorkflowPurger : IWorkflowPurger
{
private readonly IWorkflowDbContextFactory _contextFactory;
private readonly WorkflowsPurgerOptions _options;

public WorkflowPurger(IWorkflowDbContextFactory contextFactory)
public WorkflowPurger(IWorkflowDbContextFactory contextFactory, WorkflowsPurgerOptions options)
{
_contextFactory = contextFactory;
_options = options;
}

public async Task PurgeWorkflows(WorkflowStatus status, DateTime olderThan, CancellationToken cancellationToken = default)
{
var olderThanUtc = olderThan.ToUniversalTime();
using (var db = ConstructDbContext())
{
var workflows = await db.Set<PersistedWorkflow>().Where(x => x.Status == status && x.CompleteTime < olderThanUtc).ToListAsync(cancellationToken);
foreach (var wf in workflows)
{
foreach (var pointer in wf.ExecutionPointers)
int deleteEvents = _options.BatchSize;
db.Database.SetCommandTimeout(_options.DeleteCommandTimeoutSeconds);

#if NET6_0_OR_GREATER
while(deleteEvents != 0)
{
foreach (var extAttr in pointer.ExtensionAttributes)
deleteEvents = await db.Set<PersistedWorkflow>()
.Where(x => x.Status == status && x.CompleteTime < olderThanUtc)
.Take(_options.BatchSize)
.ExecuteDeleteAsync(cancellationToken);
}
#else
while (deleteEvents != 0)
{
var workflows = db.Set<PersistedWorkflow>()
.Where(x => x.Status == status && x.CompleteTime < olderThanUtc)
.Take(_options.BatchSize);

foreach (var wf in workflows)
{
db.Remove(extAttr);
foreach (var pointer in wf.ExecutionPointers)
{
foreach (var extAttr in pointer.ExtensionAttributes)
{
db.Remove(extAttr);
}

db.Remove(pointer);
}
db.Remove(wf);
}

db.Remove(pointer);
deleteEvents = await db.SaveChangesAsync(cancellationToken);
}
db.Remove(wf);
}

await db.SaveChangesAsync(cancellationToken);
#endif
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public static class ServiceCollectionExtensions
public static WorkflowOptions UseMySQL(this WorkflowOptions options, string connectionString, bool canCreateDB, bool canMigrateDB, Action<MySqlDbContextOptionsBuilder> mysqlOptionsAction = null)
{
options.UsePersistence(sp => new EntityFrameworkPersistenceProvider(new MysqlContextFactory(connectionString, mysqlOptionsAction), canCreateDB, canMigrateDB));
options.Services.AddTransient<IWorkflowPurger>(sp => new WorkflowPurger(new MysqlContextFactory(connectionString, mysqlOptionsAction)));
options.Services.AddTransient<IWorkflowPurger>(sp => new WorkflowPurger(new MysqlContextFactory(connectionString, mysqlOptionsAction), options.WorkflowsPurgerOptions));
options.Services.AddTransient<IEventsPurger>(sp => new EventsPurger(new MysqlContextFactory(connectionString, mysqlOptionsAction), options.EventsPurgerOptions));
return options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ static EntityFrameworkPersistenceProvider GetPersistenceFactory(string connectio
}

options.UsePersistence(sp => GetPersistenceFactory(connectionString, canCreateDB, canMigrateDB, schemaName));
options.Services.AddTransient<IWorkflowPurger>(sp => new WorkflowPurger(new PostgresContextFactory(connectionString, schemaName)));
options.Services.AddTransient<IWorkflowPurger>(sp => new WorkflowPurger(new PostgresContextFactory(connectionString, schemaName), options.WorkflowsPurgerOptions));
options.Services.AddTransient<IEventsPurger>(sp => new EventsPurger(new PostgresContextFactory(connectionString, schemaName), options.EventsPurgerOptions));
options.Services.AddTransient<IExtendedPersistenceProvider>(sp => GetPersistenceFactory(connectionString, canCreateDB, canMigrateDB, schemaName));
return options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public static class ServiceCollectionExtensions
public static WorkflowOptions UseSqlServer(this WorkflowOptions options, string connectionString, bool canCreateDB, bool canMigrateDB, Action<DbConnection> initAction = null)
{
options.UsePersistence(sp => new EntityFrameworkPersistenceProvider(new SqlContextFactory(connectionString, initAction), canCreateDB, canMigrateDB));
options.Services.AddTransient<IWorkflowPurger>(sp => new WorkflowPurger(new SqlContextFactory(connectionString, initAction)));
options.Services.AddTransient<IWorkflowPurger>(sp => new WorkflowPurger(new SqlContextFactory(connectionString, initAction), options.WorkflowsPurgerOptions));
options.Services.AddTransient<IEventsPurger>(sp => new EventsPurger(new SqlContextFactory(connectionString, initAction), options.EventsPurgerOptions));
return options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public static class ServiceCollectionExtensions
public static WorkflowOptions UseSqlite(this WorkflowOptions options, string connectionString, bool canCreateDB)
{
options.UsePersistence(sp => new EntityFrameworkPersistenceProvider(new SqliteContextFactory(connectionString), canCreateDB, false));
options.Services.AddTransient<IWorkflowPurger>(sp => new WorkflowPurger(new SqliteContextFactory(connectionString)));
options.Services.AddTransient<IWorkflowPurger>(sp => new WorkflowPurger(new SqliteContextFactory(connectionString), options.WorkflowsPurgerOptions));
return options;
}
}
Expand Down

0 comments on commit f70b43c

Please sign in to comment.