-
Notifications
You must be signed in to change notification settings - Fork 0
/
Pipeline.cs
150 lines (133 loc) · 5.69 KB
/
Pipeline.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace Ncqrs.EventBus
{
/// <summary>
/// Moves elements from an <see cref="IBrowsableElementStore"/> through three stages, each
/// driven by its own long-running task and blocking queue: demultiplexing, processing and
/// post-processing (marking the element as processed in the store).
/// </summary>
/// <remarks>
/// The fetch policy is re-evaluated by a timer that is armed in the constructor (first tick
/// immediately, then once per second) and additionally after every demultiplexer action.
/// NOTE(review): this means fetch-policy evaluation begins at construction time, before
/// <see cref="Start"/> is called, and may run concurrently on the timer thread and the
/// demultiplexer thread - confirm that ElementFetcher tolerates both.
/// </remarks>
public class Pipeline
{
    /// <summary>Number of processing tasks started by <see cref="StartProcessor"/>.</summary>
    private const int MaxDegreeOfParallelismForProcessing = 1;

    private readonly ElementFetcher _fetcher;
    private readonly PipelineProcessor _processor;
    private readonly string _name;
    private readonly IBrowsableElementStore _elementStore;
    private readonly Demultiplexer _demultiplexer;

    /// <summary>Demultiplexed elements waiting to be handed to the processor.</summary>
    private readonly BlockingCollection<IProcessingElement> _preProcessingQueue = new BlockingCollection<IProcessingElement>();

    /// <summary>Processed elements waiting to be marked as processed in the element store.</summary>
    private readonly BlockingCollection<IProcessingElement> _postProcessingQueue = new BlockingCollection<IProcessingElement>();

    /// <summary>
    /// Work items for the demultiplexer task. Every call into <see cref="_demultiplexer"/>
    /// (demultiplex a fetched element, mark an element as processed) is queued here and
    /// executed by the single demultiplexer task.
    /// </summary>
    private readonly BlockingCollection<Action> _preDemultiplexingQueue = new BlockingCollection<Action>();

    private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
    private readonly Timer _fetchTimer;

    /// <summary>
    /// Creates a pipeline and wires the fetcher, demultiplexer and processor events together.
    /// </summary>
    /// <param name="name">Pipeline name; passed to the fetcher and used when marking processed elements in the store.</param>
    /// <param name="elementProcessor">Processor wrapped by the internal <c>PipelineProcessor</c>.</param>
    /// <param name="elementStore">Store that elements are fetched from and in which processed elements are marked.</param>
    /// <param name="fetchPolicy">Policy handed to the internal <c>ElementFetcher</c>.</param>
    public Pipeline(string name, IElementProcessor elementProcessor, IBrowsableElementStore elementStore, IFetchPolicy fetchPolicy)
    {
        _name = name;
        _elementStore = elementStore;

        _demultiplexer = new Demultiplexer();
        _demultiplexer.EventDemultiplexed += OnDemultiplexed;

        _processor = new PipelineProcessor(elementProcessor);
        _processor.EventProcessed += OnEventProcessed;

        _fetcher = new ElementFetcher(fetchPolicy, _elementStore, name);
        _fetcher.ElementFetched += OnElementFetched;

        // Created last so that every field the callback touches is already assigned:
        // with a zero due time the first tick can fire straight away.
        _fetchTimer = new Timer(x => EvaluateFetchPolicy(), null, TimeSpan.Zero, TimeSpan.FromSeconds(1));
    }

    /// <summary>
    /// Creates a pipeline with the default fetch policy whose store is wrapped in a
    /// <c>LazyMarkingBrowsableElementStore</c>.
    /// </summary>
    /// <param name="name">Pipeline name.</param>
    /// <param name="elementProcessor">Processor that handles each element.</param>
    /// <param name="elementStore">Underlying store to wrap.</param>
    /// <returns>A new, not yet started pipeline.</returns>
    public static Pipeline CreateWithLazyElementMarking(string name, IElementProcessor elementProcessor, IBrowsableElementStore elementStore)
    {
        return Create(name, elementProcessor, new LazyMarkingBrowsableElementStore(elementStore));
    }

    /// <summary>
    /// Creates a pipeline that uses a <c>ThresholdedFetchPolicy</c> constructed with
    /// a minimum of 10 pending events and a batch size of 20.
    /// </summary>
    /// <param name="name">Pipeline name.</param>
    /// <param name="elementProcessor">Processor that handles each element.</param>
    /// <param name="elementStore">Store to fetch elements from.</param>
    /// <returns>A new, not yet started pipeline.</returns>
    public static Pipeline Create(string name, IElementProcessor elementProcessor, IBrowsableElementStore elementStore)
    {
        const int minimumPendingEvents = 10;
        const int batchSize = 20;
        return new Pipeline(name, elementProcessor, elementStore, new ThresholdedFetchPolicy(minimumPendingEvents, batchSize));
    }

    /// <summary>Queues a demultiplexed element for processing.</summary>
    private void OnDemultiplexed(object sender, ElementDemultiplexedEventArgs e)
    {
        _preProcessingQueue.Add(e.DemultiplexedElement);
    }

    /// <summary>
    /// Reports a processed element back to the demultiplexer (via its work queue) and
    /// queues it for marking in the store.
    /// </summary>
    private void OnEventProcessed(object sender, ElementProcessedEventArgs e)
    {
        _preDemultiplexingQueue.Add(() => _demultiplexer.MarkAsProcessed(e.ProcessedElement));
        _postProcessingQueue.Add(e.ProcessedElement);
    }

    /// <summary>Queues a freshly fetched element for demultiplexing.</summary>
    private void OnElementFetched(object sender, ElementFetchedEventArgs e)
    {
        _preDemultiplexingQueue.Add(() => _demultiplexer.Demultiplex(e.ProcessingElement));
    }

    /// <summary>
    /// Starts the processing, demultiplexing and post-processing tasks and triggers an
    /// immediate fetch-policy evaluation.
    /// </summary>
    public void Start()
    {
        StartProcessor();
        StartDemultiplexer();
        StartPostProcessor();
        EvaluateFetchPolicy();
    }

    /// <summary>
    /// Stops the pipeline: shuts down the periodic fetch-policy timer and cancels the
    /// worker tasks. May be called more than once.
    /// </summary>
    public void Stop()
    {
        // Without this the timer keeps evaluating the fetch policy once per second after
        // the consumers have gone, so elements could still be fetched into a queue nobody
        // drains. A tick that was already queued may still run once after Dispose().
        _fetchTimer.Dispose();
        _cancellation.Cancel();
    }

    /// <summary>Starts the single long-running demultiplexer task.</summary>
    private void StartDemultiplexer()
    {
        // Resolves to StartNew(Action<object>, object state, TaskCreationOptions):
        // the token is delivered to the worker as its state argument.
        Task.Factory.StartNew(DemultiplexEventsAndEvaluateFetchPolicy, _cancellation.Token, TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Demultiplexer loop: executes each queued action and re-evaluates the fetch policy
    /// after it, until the token is cancelled.
    /// </summary>
    /// <param name="cancellationToken">A boxed <see cref="CancellationToken"/> supplied as task state.</param>
    private void DemultiplexEventsAndEvaluateFetchPolicy(object cancellationToken)
    {
        try
        {
            var token = (CancellationToken)cancellationToken;
            foreach (var action in _preDemultiplexingQueue.GetConsumingEnumerable(token))
            {
                action();
                EvaluateFetchPolicy();
            }
        }
        catch (OperationCanceledException)
        {
            Debug.WriteLine("DemultiplexEventsAndEvaluateFetchPolicy operation cancelled.");
        }
    }

    /// <summary>
    /// Asks the fetcher to evaluate its policy against the current number of queued
    /// demultiplexer work items.
    /// </summary>
    private void EvaluateFetchPolicy()
    {
        _fetcher.EvaluateFetchPolicy(new PipelineState(_preDemultiplexingQueue.Count));
    }

    /// <summary>
    /// Starts <see cref="MaxDegreeOfParallelismForProcessing"/> long-running processing tasks.
    /// </summary>
    private void StartProcessor()
    {
        for (int i = 0; i < MaxDegreeOfParallelismForProcessing; i++)
        {
            // Token is passed as task state (see StartDemultiplexer).
            Task.Factory.StartNew(ProcessElements, _cancellation.Token, TaskCreationOptions.LongRunning);
        }
    }

    /// <summary>
    /// Processing loop: hands each demultiplexed element to the processor until the token
    /// is cancelled.
    /// </summary>
    /// <param name="cancellationToken">A boxed <see cref="CancellationToken"/> supplied as task state.</param>
    private void ProcessElements(object cancellationToken)
    {
        try
        {
            var token = (CancellationToken)cancellationToken;
            foreach (var element in _preProcessingQueue.GetConsumingEnumerable(token))
            {
                _processor.ProcessNext(element);
            }
        }
        catch (OperationCanceledException)
        {
            Debug.WriteLine("ProcessElements operation cancelled.");
        }
    }

    /// <summary>Starts the single long-running post-processing task.</summary>
    private void StartPostProcessor()
    {
        // Token is passed as task state (see StartDemultiplexer).
        Task.Factory.StartNew(PostProcessElements, _cancellation.Token, TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Post-processing loop: marks each processed element in the store under this
    /// pipeline's name until the token is cancelled.
    /// </summary>
    /// <param name="cancellationToken">A boxed <see cref="CancellationToken"/> supplied as task state.</param>
    private void PostProcessElements(object cancellationToken)
    {
        try
        {
            var token = (CancellationToken)cancellationToken;
            foreach (var element in _postProcessingQueue.GetConsumingEnumerable(token))
            {
                _elementStore.MarkLastProcessedElement(_name, element);
            }
        }
        catch (OperationCanceledException)
        {
            Debug.WriteLine("PostProcessElements operation cancelled.");
        }
    }
}
}