Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename DistributedJob to AtomicJob #69

Merged
merged 1 commit into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions docs/DbProviders.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ var order = new Order(args);
db.Orders.Add(order);
await db.SaveChangesAsync();

var job = await distributedJobFactory.CreateJobAsync(new EfCoreSteppingDbContext(db));
var job = await atomicJobFactory.CreateJobAsync(new EfCoreSteppingDbContext(db));

job.AddStep(new SendOrderCreatedEmailStep(order));
job.AddStep(new SendOrderCreatedSmsStep(order));

await job.StartAsync(); // it will commit the DB transaction
```

Use methods of `IAdvancedDistributedJob`: (it's equivalent to the above)
Use methods of `IAdvancedAtomicJob`: (it's equivalent to the above)

```csharp
var db = serviceProvider.GetRequiredService<MyDbContext>();
Expand All @@ -41,7 +41,7 @@ var order = new Order(args);
db.Orders.Add(order);
await db.SaveChangesAsync();

var job = await distributedJobFactory.CreateAdvancedJobAsync(new EfCoreSteppingDbContext(db));
var job = await atomicJobFactory.CreateAdvancedJobAsync(new EfCoreSteppingDbContext(db));

job.AddStep(new SendOrderCreatedEmailStep(order));
job.AddStep(new SendOrderCreatedSmsStep(order));
Expand Down Expand Up @@ -72,15 +72,15 @@ var order = new Order(args);
var collection = steppingDbContext.Database.GetCollection<Order>("Orders");
await collection.InsertOneAsync(order);

var job = await distributedJobFactory.CreateJobAsync(steppingDbContext);
var job = await atomicJobFactory.CreateJobAsync(steppingDbContext);

job.AddStep(new SendOrderCreatedEmailStep(order));
job.AddStep(new SendOrderCreatedSmsStep(order));

await job.StartAsync(); // it will commit the DB transaction
```

Use methods of `IAdvancedDistributedJob`: (it's equivalent to the above)
Use methods of `IAdvancedAtomicJob`: (it's equivalent to the above)

```csharp
var client = new MongoClient(connectionString);
Expand All @@ -95,7 +95,7 @@ var order = new Order(args);
var collection = steppingDbContext.Database.GetCollection<Order>("Orders");
await collection.InsertOneAsync(order);

var job = await distributedJobFactory.CreateJobAsync(steppingDbContext);
var job = await atomicJobFactory.CreateJobAsync(steppingDbContext);

job.AddStep(new SendOrderCreatedEmailStep(order));
job.AddStep(new SendOrderCreatedSmsStep(order));
Expand Down
8 changes: 4 additions & 4 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ Stepping is a distributed [BASE](https://en.wikipedia.org/wiki/Eventual_consiste

We have provided documentation for the following languages: [English](./README.md), [简体中文](./README.zh-CN.md).

## What are `Job` and `Step` in Stepping?
## What are `AtomicJob` and `Step`?

`Job` is a distributed transaction unit, and `Step` is a specific task inside a job.
`AtomicJob` is a distributed atomic job unit, and `Step` is a specific task inside a job.

A job contains one or many steps, and the transaction manager will execute them in order. If step 1 fails, it will be retried until success, and then step 2 starts to execute.

Expand Down Expand Up @@ -40,7 +40,7 @@ Stepping also supports the "multi-tenant with multi-DB" scenario, meaning it wor
The transaction manager will eventually complete the added steps:

```csharp
var job = await distributedJobFactory.CreateJobAsync();
var job = await atomicJobFactory.CreateJobAsync();

job.AddStep(new RequestBank1TransferOutStep(args)); // step with args
job.AddStep<RequestBank2TransferInStep>(); // step without args
Expand All @@ -61,7 +61,7 @@ var order = new Order(args);
db.Orders.Add(order);
await db.SaveChangesAsync();

var job = await distributedJobFactory.CreateJobAsync(new EfCoreSteppingDbContext(db));
var job = await atomicJobFactory.CreateJobAsync(new EfCoreSteppingDbContext(db));

job.AddStep(new SendOrderCreatedEmailStep(order));
job.AddStep(new SendOrderCreatedSmsStep(order));
Expand Down
8 changes: 4 additions & 4 deletions docs/README.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ Stepping 是一个基于 [BASE](https://en.wikipedia.org/wiki/Eventual_consisten

我们已为以下语言提供了文档:[English](./README.md),[简体中文](./README.zh-CN.md)。

## Stepping 中 `Job` 和 `Step` 是什么?
## `AtomicJob` 和 `Step` 是什么?

`Job` 是一个分布式事务单元,而 `Step` 是 job 中一个特定的任务。
`AtomicJob` 是一个分布式原子作业单元,而 `Step` 是 job 中一个特定的任务。

一个 job(作业)包含了一个或多个 step(步骤),事务管理器会按顺序执行步骤。如果步骤 1 失败了,它将重试直到成功,然后开始执行步骤 2。

Expand Down Expand Up @@ -40,7 +40,7 @@ Stepping 也支持“多租户且多数据库”的场景,这意味着无论
事务管理器会最终完成添加的步骤:

```csharp
var job = await distributedJobFactory.CreateJobAsync();
var job = await atomicJobFactory.CreateJobAsync();

job.AddStep(new RequestBank1TransferOutStep(args)); // 带参数的步骤
job.AddStep<RequestBank2TransferInStep>(); // 不带参数的步骤
Expand All @@ -61,7 +61,7 @@ var order = new Order(args);
db.Orders.Add(order);
await db.SaveChangesAsync();

var job = await distributedJobFactory.CreateJobAsync(new EfCoreSteppingDbContext(db));
var job = await atomicJobFactory.CreateJobAsync(new EfCoreSteppingDbContext(db));

job.AddStep(new SendOrderCreatedEmailStep(order));
job.AddStep(new SendOrderCreatedSmsStep(order));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ private static IServiceCollection AddSteppingServices(this IServiceCollection se
services.TryAddTransient<ISteppingJsonSerializer, NewtonsoftSteppingJsonSerializer>();
services.TryAddTransient<NewtonsoftSteppingJsonSerializer>();

services.TryAddTransient<IDistributedJobGidGenerator, DistributedJobGidGenerator>();
services.TryAddTransient<DistributedJobGidGenerator>();
services.TryAddTransient<IAtomicJobGidGenerator, AtomicJobGidGenerator>();
services.TryAddTransient<AtomicJobGidGenerator>();

services.TryAddTransient<IDistributedJobFactory, DistributedJobFactory>();
services.TryAddTransient<DistributedJobFactory>();
services.TryAddTransient<IAtomicJobFactory, AtomicJobFactory>();
services.TryAddTransient<AtomicJobFactory>();

services.TryAddTransient<IStepArgsSerializer, JsonStepArgsSerializer>();
services.TryAddTransient<JsonStepArgsSerializer>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace Stepping.Core.Jobs;

public class DistributedJob : IAdvancedDistributedJob
public class AtomicJob : IAdvancedAtomicJob
{
public virtual string Gid { get; }
public virtual List<IStep> Steps { get; } = new();
Expand All @@ -25,7 +25,7 @@ public class DistributedJob : IAdvancedDistributedJob
/// You should set <see cref="DbContext"/> for eventual consistency
/// when the current session has DB-write operations in the DB transaction.
/// </summary>
internal DistributedJob(
internal AtomicJob(
string gid,
ISteppingDbContext? dbContext,
IServiceProvider serviceProvider)
Expand All @@ -44,14 +44,14 @@ internal DistributedJob(
}
}

public virtual IDistributedJob AddStep<TStep>(TStep step) where TStep : IStep
public virtual IAtomicJob AddStep<TStep>(TStep step) where TStep : IStep
{
Steps.Add(step);

return this;
}

public virtual IDistributedJob AddStep<TStep>() where TStep : IStepWithoutArgs
public virtual IAtomicJob AddStep<TStep>() where TStep : IStepWithoutArgs
{
Steps.Add(StepResolver.Resolve<TStep>());

Expand Down
32 changes: 32 additions & 0 deletions src/Stepping.Core/Stepping/Core/Jobs/AtomicJobFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using Microsoft.Extensions.DependencyInjection;
using Stepping.Core.Databases;

namespace Stepping.Core.Jobs;

public class AtomicJobFactory : IAtomicJobFactory
{
protected IServiceProvider ServiceProvider { get; }

public AtomicJobFactory(IServiceProvider serviceProvider)
{
ServiceProvider = serviceProvider;
}

public virtual async Task<IAtomicJob> CreateJobAsync(string? gid, ISteppingDbContext? dbContext)
{
return new AtomicJob(gid ?? await GenerateGidAsync(), dbContext, ServiceProvider);
}

public virtual async Task<IAdvancedAtomicJob> CreateAdvancedJobAsync(string? gid,
ISteppingDbContext? dbContext)
{
return new AtomicJob(gid ?? await GenerateGidAsync(), dbContext, ServiceProvider);
}

protected virtual async Task<string> GenerateGidAsync()
{
var generator = ServiceProvider.GetRequiredService<IAtomicJobGidGenerator>();

return await generator.CreateAsync();
}
}
12 changes: 12 additions & 0 deletions src/Stepping.Core/Stepping/Core/Jobs/AtomicJobFactoryExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using Stepping.Core.Databases;

namespace Stepping.Core.Jobs;

public static class AtomicJobFactoryExtensions
{
public static Task<IAtomicJob> CreateJobAsync(this IAtomicJobFactory factory,
ISteppingDbContext dbContext) => factory.CreateJobAsync(null, dbContext);

public static Task<IAdvancedAtomicJob> CreateAdvancedJobAsync(this IAtomicJobFactory factory,
ISteppingDbContext dbContext) => factory.CreateAdvancedJobAsync(null, dbContext);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Stepping.Core.Jobs;

public class DistributedJobGidGenerator : IDistributedJobGidGenerator
public class AtomicJobGidGenerator : IAtomicJobGidGenerator
{
public Task<string> CreateAsync() => Task.FromResult(Guid.NewGuid().ToString());
}
32 changes: 0 additions & 32 deletions src/Stepping.Core/Stepping/Core/Jobs/DistributedJobFactory.cs

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Stepping.Core.Jobs;

public interface IAdvancedDistributedJob : IDistributedJob
public interface IAdvancedAtomicJob : IAtomicJob
{
/// <summary>
/// Send "prepare" to TM, insert a barrier record to DB.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Stepping.Core.Jobs;

public interface IDistributedJob
public interface IAtomicJob
{
string Gid { get; }

Expand All @@ -21,12 +21,12 @@ public interface IDistributedJob
/// <summary>
/// Add a step for the job to do in order.
/// </summary>
IDistributedJob AddStep<TStep>(TStep step) where TStep : IStep;
IAtomicJob AddStep<TStep>(TStep step) where TStep : IStep;

/// <summary>
/// Add a step for the job to do in order.
/// </summary>
IDistributedJob AddStep<TStep>() where TStep : IStepWithoutArgs;
IAtomicJob AddStep<TStep>() where TStep : IStepWithoutArgs;

/// <summary>
/// Send "prepare" to TM, insert a barrier record to DB, commit the DB transaction, and send "submit" to TM.
Expand Down
10 changes: 10 additions & 0 deletions src/Stepping.Core/Stepping/Core/Jobs/IAtomicJobFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using Stepping.Core.Databases;

namespace Stepping.Core.Jobs;

public interface IAtomicJobFactory
{
Task<IAtomicJob> CreateJobAsync(string? gid = null, ISteppingDbContext? dbContext = null);

Task<IAdvancedAtomicJob> CreateAdvancedJobAsync(string? gid = null, ISteppingDbContext? dbContext = null);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Stepping.Core.Jobs;

public interface IDistributedJobGidGenerator
public interface IAtomicJobGidGenerator
{
Task<string> CreateAsync();
}
10 changes: 0 additions & 10 deletions src/Stepping.Core/Stepping/Core/Jobs/IDistributedJobFactory.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace Stepping.Core.TransactionManagers;

public interface ITmClient
{
Task PrepareAsync(IDistributedJob job, CancellationToken cancellationToken = default);
Task PrepareAsync(IAtomicJob job, CancellationToken cancellationToken = default);

Task SubmitAsync(IDistributedJob job, CancellationToken cancellationToken = default);
Task SubmitAsync(IAtomicJob job, CancellationToken cancellationToken = default);
}
Loading