This repository has been archived by the owner on Nov 1, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 510
/
PortableThreadPool.WorkerThread.cs
268 lines (242 loc) · 12.2 KB
/
PortableThreadPool.WorkerThread.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
namespace System.Threading
{
internal partial class PortableThreadPool
{
/// <summary>
/// The worker thread infrastructure for the CLR thread pool.
/// </summary>
private static class WorkerThread
{
/// <summary>
/// Semaphore for controlling how many threads are currently working.
/// Created with zero initial signals, a maximum count of MaxPossibleThreadCount, and SemaphoreSpinCount spins.
/// </summary>
private static readonly LowLevelLifoSemaphore s_semaphore = new LowLevelLifoSemaphore(0, MaxPossibleThreadCount, SemaphoreSpinCount);
/// <summary>
/// Maximum number of spins a thread pool worker thread performs before waiting for work.
/// Read from the "ThreadPool_UnfairSemaphoreSpinLimit" config value, defaulting to 70.
/// </summary>
private static int SemaphoreSpinCount
{
get => AppContextConfigHelper.GetInt16Config("ThreadPool_UnfairSemaphoreSpinLimit", 70, false);
}
/// <summary>
/// Entry point for a thread pool worker thread. Loops dequeuing and dispatching work while
/// requests keep arriving; when the semaphore wait times out, the thread removes itself from
/// the pool counts, notifies hill climbing, and exits.
/// </summary>
private static void WorkerThreadStart()
{
PortableThreadPoolEventSource log = PortableThreadPoolEventSource.Log;
if (log.IsEnabled())
{
log.WorkerThreadStart(ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts).numExistingThreads);
}
while (true)
{
// Keep processing as long as the semaphore wait is satisfied before its timeout.
while (WaitForRequest())
{
if (TakeActiveRequest())
{
// Record the tick count at which work was last dequeued.
Volatile.Write(ref ThreadPoolInstance._separated.lastDequeueTime, Environment.TickCount);
if (ThreadPoolWorkQueue.Dispatch())
{
// If the queue runs out of work for us, we need to update the number of working workers to reflect that we are done working for now
RemoveWorkingWorker();
}
}
else
{
// If we woke up but couldn't find a request, we need to update the number of working workers to reflect that we are done working for now
RemoveWorkingWorker();
}
}
ThreadPoolInstance._hillClimbingThreadAdjustmentLock.Acquire();
try
{
// At this point, the thread's wait timed out. We are shutting down this thread.
// We are going to decrement the number of existing threads to no longer include this one
// and then change the max number of threads in the thread pool to reflect that we don't need as many
// as we had. Finally, we are going to tell hill climbing that we changed the max number of threads.
ThreadCounts counts = ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts);
while (true)
{
if (counts.numExistingThreads == counts.numProcessingWork)
{
// In this case, enough work came in that this thread should not time out and should go back to work.
break;
}
ThreadCounts newCounts = counts;
newCounts.numExistingThreads--;
// Clamp the thread count goal so it stays at least _minThreads and no more than the remaining existing threads.
newCounts.numThreadsGoal = Math.Max(ThreadPoolInstance._minThreads, Math.Min(newCounts.numExistingThreads, newCounts.numThreadsGoal));
ThreadCounts oldCounts = ThreadCounts.CompareExchangeCounts(ref ThreadPoolInstance._separated.counts, newCounts, counts);
if (oldCounts == counts)
{
HillClimbing.ThreadPoolHillClimber.ForceChange(newCounts.numThreadsGoal, HillClimbing.StateOrTransition.ThreadTimedOut);
if (log.IsEnabled())
{
log.WorkerThreadStop(newCounts.numExistingThreads);
}
return;
}
// The counts changed under us; retry with the freshly observed counts.
counts = oldCounts;
}
}
finally
{
ThreadPoolInstance._hillClimbingThreadAdjustmentLock.Release();
}
}
}
/// <summary>
/// Waits on the semaphore for a request to work, up to the worker thread timeout.
/// </summary>
/// <returns>true if this thread was woken up before it timed out; false on timeout.</returns>
private static bool WaitForRequest()
{
    PortableThreadPoolEventSource eventSource = PortableThreadPoolEventSource.Log;
    if (eventSource.IsEnabled())
    {
        var existingThreadCount = ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts).numExistingThreads;
        eventSource.WorkerThreadWait(existingThreadCount);
    }
    return s_semaphore.Wait(ThreadPoolThreadTimeoutMs);
}
/// <summary>
/// Reduces the number of working workers by one, but may add back a worker (possibly this
/// thread) if a thread request comes in while this thread is being marked as not working.
/// </summary>
private static void RemoveWorkingWorker()
{
    // Decrement numProcessingWork with a CAS retry loop.
    ThreadCounts observedCounts = ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts);
    while (true)
    {
        ThreadCounts decrementedCounts = observedCounts;
        decrementedCounts.numProcessingWork--;
        ThreadCounts countsBeforeUpdate = ThreadCounts.CompareExchangeCounts(ref ThreadPoolInstance._separated.counts, decrementedCounts, observedCounts);
        if (countsBeforeUpdate == observedCounts)
        {
            break;
        }
        // Another thread changed the counts concurrently; retry against the fresh value.
        observedCounts = countsBeforeUpdate;
    }

    // It's possible that we decided we had thread requests just before a request came in,
    // but reduced the worker count *after* the request came in. In that case the notification
    // of the request may have been missed, so wake up a thread (maybe this one!) if there is
    // work to do.
    if (ThreadPoolInstance._numRequestedWorkers > 0)
    {
        MaybeAddWorkingWorker();
    }
}
/// <summary>
/// If the count of workers processing work is below the thread count goal, marks one more
/// worker as processing work and ensures a thread exists to do it — either by releasing a
/// waiting thread from the semaphore or by creating a new worker thread.
/// </summary>
internal static void MaybeAddWorkingWorker()
{
ThreadCounts counts = ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts);
ThreadCounts newCounts;
while (true)
{
newCounts = counts;
// Increment the number of workers processing work, but never above the thread count goal
// and never below the currently observed value.
newCounts.numProcessingWork = Math.Max(counts.numProcessingWork, Math.Min((short)(counts.numProcessingWork + 1), counts.numThreadsGoal));
// Ensure there are at least as many existing threads as workers processing work.
newCounts.numExistingThreads = Math.Max(counts.numExistingThreads, newCounts.numProcessingWork);
if (newCounts == counts)
{
// Nothing to change (already at the goal); no thread needs to be released or created.
return;
}
ThreadCounts oldCounts = ThreadCounts.CompareExchangeCounts(ref ThreadPoolInstance._separated.counts, newCounts, counts);
if (oldCounts == counts)
{
break;
}
// The counts changed under us; retry with the freshly observed counts.
counts = oldCounts;
}
// toCreate: new threads that must be started; toRelease: waiting threads that can be woken up.
int toCreate = newCounts.numExistingThreads - counts.numExistingThreads;
int toRelease = newCounts.numProcessingWork - counts.numProcessingWork;
if (toRelease > 0)
{
s_semaphore.Release(toRelease);
}
while (toCreate > 0)
{
if (TryCreateWorkerThread())
{
toCreate--;
}
else
{
// Thread creation failed; back the remaining reservation out of the counts via CAS.
counts = ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts);
while (true)
{
newCounts = counts;
newCounts.numProcessingWork -= (short)toCreate;
newCounts.numExistingThreads -= (short)toCreate;
ThreadCounts oldCounts = ThreadCounts.CompareExchangeCounts(ref ThreadPoolInstance._separated.counts, newCounts, counts);
if (oldCounts == counts)
{
break;
}
counts = oldCounts;
}
toCreate = 0;
}
}
}
/// <summary>
/// Determines whether the current thread should stop processing work on the thread pool.
/// A thread should stop processing work when work remains only because there are more worker
/// threads in the thread pool than we currently want.
/// </summary>
/// <returns>Whether or not this thread should stop processing work even if there is still work in the queue.</returns>
internal static bool ShouldStopProcessingWorkNow()
{
    ThreadCounts observedCounts = ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts);
    while (true)
    {
        // More threads processing work than the thread count goal means hill climbing decided
        // to shrink the pool, so stop processing if the decremented count can be published.
        // Having more existing threads than the goal is fine; the idle ones will eventually
        // time out if the goal is not raised again. This differs from the original CoreCLR
        // code this implementation was ported from, which turns a processing thread into a
        // retired thread and checks for pending requests like RemoveWorkingWorker; there are
        // no retired threads here, so only the count of threads processing work matters.
        if (observedCounts.numProcessingWork <= observedCounts.numThreadsGoal)
        {
            return false;
        }
        ThreadCounts decrementedCounts = observedCounts;
        decrementedCounts.numProcessingWork--;
        ThreadCounts countsBeforeUpdate = ThreadCounts.CompareExchangeCounts(ref ThreadPoolInstance._separated.counts, decrementedCounts, observedCounts);
        if (countsBeforeUpdate == observedCounts)
        {
            return true;
        }
        // Another thread changed the counts concurrently; retry against the fresh value.
        observedCounts = countsBeforeUpdate;
    }
}
/// <summary>
/// Attempts to atomically claim one outstanding work request by decrementing the
/// requested-worker count.
/// </summary>
/// <returns>true if a pending request was claimed; false if none remained.</returns>
private static bool TakeActiveRequest()
{
    int observedRequests = ThreadPoolInstance._numRequestedWorkers;
    while (observedRequests > 0)
    {
        int requestsBeforeExchange = Interlocked.CompareExchange(ref ThreadPoolInstance._numRequestedWorkers, observedRequests - 1, observedRequests);
        if (requestsBeforeExchange == observedRequests)
        {
            return true;
        }
        // Lost the race; retry with the value another thread published.
        observedRequests = requestsBeforeExchange;
    }
    return false;
}
/// <summary>
/// Attempts to create and start a new background worker thread running WorkerThreadStart.
/// </summary>
/// <returns>true if the thread was started; false if creation failed due to
/// ThreadStartException or OutOfMemoryException.</returns>
private static bool TryCreateWorkerThread()
{
    try
    {
        Thread workerThread = new Thread(WorkerThreadStart)
        {
            IsThreadPoolThread = true,
            IsBackground = true
        };
        workerThread.Start();
        return true;
    }
    catch (ThreadStartException)
    {
    }
    catch (OutOfMemoryException)
    {
    }
    return false;
}
}
}
}