/
Actor.cs
275 lines (223 loc) · 8.47 KB
/
Actor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
using System.Collections.Concurrent;
using NewLife.Log;
namespace NewLife.Model;
/// <summary>无锁并行编程模型</summary>
/// <remarks>
/// 文档 https://newlifex.com/core/actor
///
/// 独立线程轮询消息队列,简单设计避免影响默认线程池。
/// 适用于任务颗粒较大的场合,例如IO操作。
/// </remarks>
public interface IActor
{
/// <summary>添加消息,驱动内部处理</summary>
/// <param name="message">消息</param>
/// <param name="sender">发送者</param>
/// <returns>返回待处理消息数</returns>
Int32 Tell(Object message, IActor? sender = null);
}
/// <summary>Actor上下文</summary>
public class ActorContext
{
/// <summary>发送者</summary>
public IActor? Sender { get; set; }
/// <summary>消息</summary>
public Object? Message { get; set; }
}
/// <summary>无锁并行编程模型</summary>
/// <remarks>
/// 独立线程轮询消息队列,简单设计避免影响默认线程池。
/// </remarks>
public abstract class Actor : DisposeBase, IActor
{
#region 属性
/// <summary>名称</summary>
public String Name { get; set; }
/// <summary>是否启用</summary>
public Boolean Active { get; private set; }
/// <summary>受限容量。最大可堆积的消息数,默认Int32.MaxValue</summary>
public Int32 BoundedCapacity { get; set; } = Int32.MaxValue;
/// <summary>批大小。每次处理消息数,默认1,大于1表示启用批量处理模式</summary>
public Int32 BatchSize { get; set; } = 1;
/// <summary>是否长时间运行。长时间运行任务使用独立线程,默认false</summary>
public Boolean LongRunning { get; set; }
/// <summary>存放消息的邮箱。默认FIFO实现,外部可覆盖</summary>
protected BlockingCollection<ActorContext>? MailBox { get; set; }
/// <summary>
/// 性能追踪器
/// </summary>
public ITracer? Tracer { get; set; }
/// <summary>
/// 父级性能追踪器。用于把内外调用链关联起来
/// </summary>
public ISpan? TracerParent { get; set; }
private Task? _task;
private Exception? _error;
private CancellationTokenSource? _source;
///// <summary>已完成任务</summary>
//public static Task CompletedTask { get; } = Task.CompletedTask;
#endregion
#region 构造
/// <summary>实例化</summary>
public Actor() => Name = GetType().Name.TrimEnd("Actor");
/// <summary>销毁</summary>
/// <param name="disposing"></param>
protected override void Dispose(Boolean disposing)
{
base.Dispose(disposing);
_error = null;
Stop(0);
if (_source != null)
{
_source.Cancel();
_source.TryDispose();
}
_task.TryDispose();
MailBox.TryDispose();
}
/// <summary>已重载。显示名称</summary>
/// <returns></returns>
public override String ToString() => Name;
#endregion
#region 方法
/// <summary>通知开始处理</summary>
/// <remarks>
/// 添加消息时自动触发
/// </remarks>
public virtual Task? Start()
{
if (Active) return _task;
lock (this)
{
if (Active) return _task;
if (Tracer == null && TracerParent is DefaultSpan ds) Tracer = ds.Builder?.Tracer;
using var span = Tracer?.NewSpan("actor:Start", Name);
_source = new CancellationTokenSource();
MailBox ??= new BlockingCollection<ActorContext>(BoundedCapacity);
// 启动异步
if (_task == null)
{
lock (this)
{
_task ??= OnStart();
}
}
Active = true;
return _task;
}
}
/// <summary>开始时,返回执行线程包装任务</summary>
/// <returns></returns>
protected virtual Task OnStart() => Task.Factory.StartNew(DoActorWork, LongRunning ? TaskCreationOptions.LongRunning : TaskCreationOptions.None);
/// <summary>通知停止添加消息,并等待处理完成</summary>
/// <param name="msTimeout">等待的毫秒数。0表示不等待,-1表示无限等待</param>
public virtual Boolean Stop(Int32 msTimeout = 0)
{
using var span = Tracer?.NewSpan("actor:Stop", $"{Name} msTimeout={msTimeout}");
try
{
MailBox?.CompleteAdding();
if (msTimeout > 0 && _source != null && !_source.IsCancellationRequested)
_source.CancelAfter(msTimeout);
if (_error != null) throw _error;
if (msTimeout == 0 || _task == null) return true;
return _task.Wait(msTimeout);
}
catch (Exception ex)
{
span?.SetError(ex, null);
throw;
}
}
/// <summary>添加消息,驱动内部处理</summary>
/// <param name="message">消息</param>
/// <param name="sender">发送者</param>
/// <returns>返回待处理消息数</returns>
public virtual Int32 Tell(Object message, IActor? sender = null)
{
//using var span = Tracer?.NewSpan("actor:Tell", Name);
if (!Active)
{
if (_error != null) throw _error;
// 自动开始
Start();
if (!Active) throw new ObjectDisposedException(nameof(Actor));
}
var box = MailBox ?? throw new ArgumentNullException(nameof(MailBox));
box.Add(new ActorContext { Sender = sender, Message = message });
return box.Count;
}
/// <summary>循环消费消息</summary>
private void DoActorWork()
{
DefaultSpan.Current = TracerParent;
using var span = Tracer?.NewSpan("actor:Loop", Name);
try
{
Loop();
}
catch (OperationCanceledException) { }
catch (InvalidOperationException) { /*CompleteAdding后Take会抛出IOE异常*/}
catch (Exception ex)
{
span?.SetError(ex, null);
_error = ex;
XTrace.WriteException(ex);
}
Active = false;
}
/// <summary>循环消费消息</summary>
protected virtual void Loop()
{
var box = MailBox;
if (box == null || _source == null) return;
var span = DefaultSpan.Current;
var token = _source.Token;
while (!_source.IsCancellationRequested && !box.IsCompleted)
{
if (BatchSize <= 1)
{
var ctx = box.Take(token);
var task = ReceiveAsync(ctx, token);
task?.Wait(token);
if (span != null) span.Value++;
}
else
{
var list = new List<ActorContext>();
// 阻塞取一个
var ctx = box.Take(token);
list.Add(ctx);
// 不阻塞取一批
for (var i = 1; i < BatchSize; i++)
{
if (!box.TryTake(out ctx)) break;
list.Add(ctx);
}
if (span != null) span.Value += list.Count;
var task = ReceiveAsync(list.ToArray(), token);
task?.Wait(token);
}
}
}
#if NET45
/// <summary>处理消息。批大小为1时使用该方法</summary>
/// <param name="context">上下文</param>
/// <param name="cancellationToken">取消通知</param>
protected virtual Task ReceiveAsync(ActorContext context, CancellationToken cancellationToken) => Task.FromResult(0);
/// <summary>批量处理消息。批大小大于1时使用该方法</summary>
/// <param name="contexts">上下文集合</param>
/// <param name="cancellationToken">取消通知</param>
protected virtual Task ReceiveAsync(ActorContext[] contexts, CancellationToken cancellationToken) => Task.FromResult(0);
#else
/// <summary>处理消息。批大小为1时使用该方法</summary>
/// <param name="context">上下文</param>
/// <param name="cancellationToken">取消通知</param>
protected virtual Task ReceiveAsync(ActorContext context, CancellationToken cancellationToken) => Task.CompletedTask;
/// <summary>批量处理消息。批大小大于1时使用该方法</summary>
/// <param name="contexts">上下文集合</param>
/// <param name="cancellationToken">取消通知</param>
protected virtual Task ReceiveAsync(ActorContext[] contexts, CancellationToken cancellationToken) => Task.CompletedTask;
#endif
#endregion
}