// PortableThreadPool.HillClimbing.cs
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Diagnostics;
using System.Diagnostics.Tracing;
namespace System.Threading
{
internal sealed partial class PortableThreadPool
{
/// <summary>
/// Hill climbing algorithm used for determining the number of threads needed for the thread pool.
/// </summary>
private sealed partial class HillClimbing
{
// Number of entries in the circular transition log (_log).
private const int LogCapacity = 200;
// Default bounds, in milliseconds, of the randomized sample interval. They are the defaults for the
// SampleIntervalLow/SampleIntervalHigh settings and the fallback when the configured low exceeds the configured high.
private const int DefaultSampleIntervalMsLow = 10;
private const int DefaultSampleIntervalMsHigh = 200;
// Read once from the "System.Threading.ThreadPool.HillClimbing.Disable" setting (default: false).
// NOTE(review): presumably consulted by callers outside this file to bypass hill climbing - confirm.
public static readonly bool IsDisabled = AppContextConfigHelper.GetBooleanConfig("System.Threading.ThreadPool.HillClimbing.Disable", false);
// The shared hill climbing instance.
// SOS's ThreadPool command depends on this name
public static readonly HillClimbing ThreadPoolHillClimber = new HillClimbing();
/// <summary>
/// The state of the hill climbing algorithm, or the reason for a thread count transition, as recorded in
/// the transition log and reported through the runtime event source.
/// </summary>
/// <remarks>
/// SOS's ThreadPool command depends on the enum values, and LogTransition casts them directly to
/// NativeRuntimeEventSource.ThreadAdjustmentReasonMap, so the numeric values are spelled out explicitly
/// and must not be changed or reordered.
/// </remarks>
public enum StateOrTransition
{
    /// <summary>Initial state used by Update until a wave measurement has been made.</summary>
    Warmup = 0,

    /// <summary>Used by Update when the observed thread count differs from the last recorded one.</summary>
    Initializing = 1,

    // Not produced by the code visible in this file.
    RandomMove = 2,

    /// <summary>Set by Update when the thread wave component is non-zero and a ratio could be computed.</summary>
    ClimbingMove = 3,

    // Not produced by the code visible in this file.
    ChangePoint = 4,

    /// <summary>Set by Update when the thread wave component is zero.</summary>
    Stabilizing = 5,

    // Not produced by the code visible in this file; presumably supplied by callers of ForceChange.
    Starvation = 6,

    // Not produced by the code visible in this file; presumably supplied by callers of ForceChange.
    ThreadTimedOut = 7,

    /// <summary>Transitions with this reason do not re-randomize the sample interval in ChangeThreadCount.</summary>
    CooperativeBlocking = 8,
}
// One record of the circular transition log, filled in by LogTransition each time the thread count changes.
// SOS's ThreadPool command depends on the names of all fields
private struct LogEntry
{
// Environment.TickCount at the time the transition was logged.
public int tickCount;
// The state or reason associated with the transition.
public StateOrTransition stateOrTransition;
// The thread count that was switched to.
public int newControlSetting;
// Number of history samples available at the time, rounded down to a whole multiple of the wave period.
public int lastHistoryCount;
// The throughput value passed to LogTransition, narrowed to float.
public float lastHistoryMean;
}
//
// Tuning parameters. All are read once in the constructor from the
// "System.Threading.ThreadPool.HillClimbing.*" configuration settings.
//
// Period, in samples, of the square wave added to the thread count ("WavePeriod", default 4).
private readonly int _wavePeriod;
// Length of the sample history: _wavePeriod multiplied by "WaveHistorySize" (default 8).
private readonly int _samplesToMeasure;
// "Bias" / 100 (default 0.15); scales the thread wave that is subtracted from the throughput wave in Update.
private readonly double _targetThroughputRatio;
// "TargetSignalToNoiseRatio" / 100 (default 3.0).
private readonly double _targetSignalToNoiseRatio;
// "MaxChangePerSecond" (default 4); multiplied by the sample duration to get the gain of a move.
private readonly double _maxChangePerSecond;
// "MaxChangePerSample" (default 20); upper bound applied to a single move.
private readonly double _maxChangePerSample;
// "MaxWaveMagnitude" (default 20); upper bound of the thread wave magnitude.
private readonly int _maxThreadWaveMagnitude;
// Lower bound, in milliseconds, of the randomized sample interval ("SampleIntervalLow").
private readonly int _sampleIntervalMsLow;
// "WaveMagnitudeMultiplier" / 100 (default 1.0).
private readonly double _threadMagnitudeMultiplier;
// Upper bound (inclusive), in milliseconds, of the randomized sample interval ("SampleIntervalHigh").
private readonly int _sampleIntervalMsHigh;
// "ErrorSmoothingFactor" / 100 (default 0.01); weight of the newest estimate in the moving average of throughput noise.
private readonly double _throughputErrorSmoothingFactor;
// "GainExponent" / 100 (default 2.0); exponent of the non-linear gain applied to a move.
private readonly double _gainExponent;
// "MaxSampleErrorPercent" / 100 (default 0.15); a sample whose estimated error reaches this value is accumulated instead of used.
private readonly double _maxSampleError;
//
// Mutable algorithm state.
//
// The base thread count; the square wave is added on top of it to produce the actual thread count.
private double _currentControlSetting;
// Number of samples added to the history so far.
private long _totalSamples;
// The thread count most recently recorded by ChangeThreadCount.
private int _lastThreadCount;
// Moving average of the throughput error estimate; starts at zero.
private double _averageThroughputNoise;
// Elapsed time and completions since the thread count last changed; used to compute the throughput that is logged with a transition.
private double _secondsElapsedSinceLastChange;
private double _completionsSinceLastChange;
// Data carried over from samples that were not yet accurate enough to be used on their own.
private int _accumulatedCompletionCount;
private double _accumulatedSampleDurationSeconds;
// Circular histories of throughput samples and of the thread counts they were measured at, both of length _samplesToMeasure.
private readonly double[] _samples;
private readonly double[] _threadCounts;
// The current sample interval in milliseconds; re-randomized when the thread count changes.
private int _currentSampleMs;
private readonly Random.XoshiroImpl _randomIntervalGenerator = new Random.XoshiroImpl();
// Circular transition log: _logStart is the index of the oldest entry, _logSize the number of valid entries.
private readonly LogEntry[] _log = new LogEntry[LogCapacity]; // SOS's ThreadPool command depends on this name
private int _logStart; // SOS's ThreadPool command depends on this name
private int _logSize; // SOS's ThreadPool command depends on this name
/// <summary>
/// Initializes the hill climber from the <c>System.Threading.ThreadPool.HillClimbing.*</c> configuration
/// settings, substituting the built-in defaults for values the algorithm cannot operate with.
/// </summary>
public HillClimbing()
{
    const int DefaultWavePeriod = 4;
    const int DefaultWaveHistorySize = 8;

    // Update() evaluates "_totalSamples / (_wavePeriod / 2)"; with integer division a wave period below 2
    // makes the divisor zero and throws DivideByZeroException. GetWaveComponent() also asserts that the
    // period it measures is at least 2. Treat smaller configured values as invalid and use the default.
    int wavePeriod = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.WavePeriod", DefaultWavePeriod, false);
    _wavePeriod = wavePeriod >= 2 ? wavePeriod : DefaultWavePeriod;

    _maxThreadWaveMagnitude = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.MaxWaveMagnitude", 20, false);
    _threadMagnitudeMultiplier = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.WaveMagnitudeMultiplier", 100, false) / 100.0;

    // The history must hold at least one wave period: Update() indexes it with
    // "_totalSamples % _samplesToMeasure", which throws DivideByZeroException for an empty history.
    int waveHistorySize = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.WaveHistorySize", DefaultWaveHistorySize, false);
    if (waveHistorySize < 1)
    {
        waveHistorySize = DefaultWaveHistorySize;
    }
    _samplesToMeasure = _wavePeriod * waveHistorySize;

    // Percentages in the configuration are converted to fractions.
    _targetThroughputRatio = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.Bias", 15, false) / 100.0;
    _targetSignalToNoiseRatio = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.TargetSignalToNoiseRatio", 300, false) / 100.0;
    _maxChangePerSecond = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.MaxChangePerSecond", 4, false);
    _maxChangePerSample = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.MaxChangePerSample", 20, false);

    // The sample interval range is only honored when it is well-formed (low <= high);
    // otherwise both bounds revert to their defaults.
    int sampleIntervalMsLow = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.SampleIntervalLow", DefaultSampleIntervalMsLow, false);
    int sampleIntervalMsHigh = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.SampleIntervalHigh", DefaultSampleIntervalMsHigh, false);
    if (sampleIntervalMsLow <= sampleIntervalMsHigh)
    {
        _sampleIntervalMsLow = sampleIntervalMsLow;
        _sampleIntervalMsHigh = sampleIntervalMsHigh;
    }
    else
    {
        _sampleIntervalMsLow = DefaultSampleIntervalMsLow;
        _sampleIntervalMsHigh = DefaultSampleIntervalMsHigh;
    }

    _throughputErrorSmoothingFactor = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.ErrorSmoothingFactor", 1, false) / 100.0;
    _gainExponent = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.GainExponent", 200, false) / 100.0;
    _maxSampleError = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.MaxSampleErrorPercent", 15, false) / 100.0;

    _samples = new double[_samplesToMeasure];
    _threadCounts = new double[_samplesToMeasure];

    // The upper bound passed to Next is "+ 1" so that _sampleIntervalMsHigh itself can be chosen.
    _currentSampleMs = _randomIntervalGenerator.Next(_sampleIntervalMsLow, _sampleIntervalMsHigh + 1);
}
/// <summary>
/// Feeds one throughput sample into the hill climbing algorithm and computes the thread count to use next.
/// </summary>
/// <param name="currentThreadCount">The current number of threads. If it differs from the count this instance last
/// recorded, the difference is absorbed through <see cref="ForceChange"/> before the sample is processed.</param>
/// <param name="sampleDurationSeconds">Length of the sample interval, in seconds.</param>
/// <param name="numCompletions">Number of work item completions counted during the sample interval.</param>
/// <returns>
/// The new thread count, clamped to the thread pool's minimum goal and maximum, and the length in milliseconds of
/// the next sample interval. When the sample is not accurate enough yet, its data is accumulated and
/// (<paramref name="currentThreadCount"/>, 10) is returned so that a little more data is collected first.
/// </returns>
public (int newThreadCount, int newSampleMs) Update(int currentThreadCount, double sampleDurationSeconds, int numCompletions)
{
//
// If someone changed the thread count without telling us, update our records accordingly.
//
if (currentThreadCount != _lastThreadCount)
ForceChange(currentThreadCount, StateOrTransition.Initializing);
//
// Update the cumulative stats for this thread count
//
_secondsElapsedSinceLastChange += sampleDurationSeconds;
_completionsSinceLastChange += numCompletions;
//
// Add in any data we've already collected about this sample
//
sampleDurationSeconds += _accumulatedSampleDurationSeconds;
numCompletions += _accumulatedCompletionCount;
//
// We need to make sure we're collecting reasonably accurate data. Since we're just counting the end
// of each work item, we are going to be missing some data about what really happened during the
// sample interval. The count produced by each thread includes an initial work item that may have
// started well before the start of the interval, and each thread may have been running some new
// work item for some time before the end of the interval, which did not yet get counted. So
// our count is going to be off by +/- threadCount work items.
//
// The exception is that the thread that reported to us last time definitely wasn't running any work
// at that time, and the thread that's reporting now definitely isn't running a work item now. So
// we really only need to consider threadCount-1 threads.
//
// Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
//
// We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
// of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction,
// then the next one likely will be too. The one after that will include the sum of the completions
// we missed in the previous samples, and so will be 33% positive. So every three samples we'll have
// two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency
// range we're targeting, which will not be filtered by the frequency-domain translation.
//
// The very first sample (_totalSamples == 0) is always accepted. The division is done in floating point,
// so numCompletions == 0 yields infinity (or NaN for a single thread) rather than throwing.
//
if (_totalSamples > 0 && ((currentThreadCount - 1.0) / numCompletions) >= _maxSampleError)
{
// Not accurate enough yet. Let's accumulate the data so far, and tell the ThreadPool
// to collect a little more (the second tuple element is the next sample interval in milliseconds).
_accumulatedSampleDurationSeconds = sampleDurationSeconds;
_accumulatedCompletionCount = numCompletions;
return (currentThreadCount, 10);
}
//
// We've got enough data for our sample; reset our accumulators for next time.
//
_accumulatedSampleDurationSeconds = 0;
_accumulatedCompletionCount = 0;
//
// Add the current thread count and throughput sample to our history
//
double throughput = numCompletions / sampleDurationSeconds;
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadAdjustmentSample(throughput);
}
// The histories are circular buffers of length _samplesToMeasure, indexed by the running sample number.
int sampleIndex = (int)(_totalSamples % _samplesToMeasure);
_samples[sampleIndex] = throughput;
_threadCounts[sampleIndex] = currentThreadCount;
_totalSamples++;
//
// Set up defaults for our metrics. If no wave measurement can be made below, these defaults result in a
// move of zero and the Warmup state.
//
Complex threadWaveComponent = default;
Complex throughputWaveComponent = default;
double throughputErrorEstimate = 0;
Complex ratio = default;
double confidence = 0;
StateOrTransition state = StateOrTransition.Warmup;
//
// How many samples will we use? It must be at least the three wave periods we're looking for, and it must also be a whole
// multiple of the primary wave's period; otherwise the frequency we're looking for will fall between two frequency bands
// in the Fourier analysis, and we won't be able to measure it accurately.
//
// NOTE(review): the check below only requires more than one wave period (i.e. at least two, since sampleCount is a
// multiple of _wavePeriod), not three as stated above - confirm which is intended.
//
int sampleCount = ((int)Math.Min(_totalSamples - 1, _samplesToMeasure)) / _wavePeriod * _wavePeriod;
if (sampleCount > _wavePeriod)
{
//
// Average the throughput and thread count samples, so we can scale the wave magnitudes later.
//
double sampleSum = 0;
double threadSum = 0;
for (int i = 0; i < sampleCount; i++)
{
sampleSum += _samples[(_totalSamples - sampleCount + i) % _samplesToMeasure];
threadSum += _threadCounts[(_totalSamples - sampleCount + i) % _samplesToMeasure];
}
double averageThroughput = sampleSum / sampleCount;
double averageThreadCount = threadSum / sampleCount;
if (averageThroughput > 0 && averageThreadCount > 0)
{
//
// Calculate the periods of the adjacent frequency bands we'll be using to measure noise levels.
// We want the two adjacent Fourier frequency bands.
//
double adjacentPeriod1 = sampleCount / (((double)sampleCount / _wavePeriod) + 1);
double adjacentPeriod2 = sampleCount / (((double)sampleCount / _wavePeriod) - 1);
//
// Get the three different frequency components of the throughput (scaled by average
// throughput). Our "error" estimate (the amount of noise that might be present in the
// frequency band we're really interested in) is the larger of the adjacent bands.
//
throughputWaveComponent = GetWaveComponent(_samples, sampleCount, _wavePeriod) / averageThroughput;
throughputErrorEstimate = (GetWaveComponent(_samples, sampleCount, adjacentPeriod1) / averageThroughput).Abs();
// The lower-frequency adjacent band is only measurable if its period fits within the samples we have.
if (adjacentPeriod2 <= sampleCount)
{
throughputErrorEstimate = Math.Max(throughputErrorEstimate, (GetWaveComponent(_samples, sampleCount, adjacentPeriod2) / averageThroughput).Abs());
}
//
// Do the same for the thread counts, so we have something to compare to. We don't measure thread count
// noise, because there is none; these are exact measurements.
//
threadWaveComponent = GetWaveComponent(_threadCounts, sampleCount, _wavePeriod) / averageThreadCount;
//
// Update our moving average of the throughput noise. We'll use this later as feedback to
// determine the new size of the thread wave. A value of zero means "no average yet", in which
// case the first estimate seeds the average.
//
if (_averageThroughputNoise == 0)
_averageThroughputNoise = throughputErrorEstimate;
else
_averageThroughputNoise = (_throughputErrorSmoothingFactor * throughputErrorEstimate) + ((1.0 - _throughputErrorSmoothingFactor) * _averageThroughputNoise);
if (threadWaveComponent.Abs() > 0)
{
//
// Adjust the throughput wave so it's centered around the target wave, and then calculate the adjusted throughput/thread ratio.
//
ratio = (throughputWaveComponent - (_targetThroughputRatio * threadWaveComponent)) / threadWaveComponent;
state = StateOrTransition.ClimbingMove;
}
else
{
// No thread wave was measured, so there is nothing to compare the throughput wave against.
ratio = new Complex(0, 0);
state = StateOrTransition.Stabilizing;
}
//
// Calculate how confident we are in the ratio. More noise == less confident. This has
// the effect of slowing down movements that might be affected by random noise.
//
double noiseForConfidence = Math.Max(_averageThroughputNoise, throughputErrorEstimate);
if (noiseForConfidence > 0)
confidence = (threadWaveComponent.Abs() / noiseForConfidence) / _targetSignalToNoiseRatio;
else
confidence = 1.0; //there is no noise!
}
}
//
// We use just the real part of the complex ratio we just calculated. If the throughput signal
// is exactly in phase with the thread signal, this will be the same as taking the magnitude of
// the complex move and moving that far up. If they're 180 degrees out of phase, we'll move
// backward (because this indicates that our changes are having the opposite of the intended effect).
// If they're 90 degrees out of phase, we won't move at all, because we can't tell whether we're
// having a negative or positive effect on throughput.
//
double move = Math.Min(1.0, Math.Max(-1.0, ratio.Real));
//
// Apply our confidence multiplier (clamped to [0, 1]).
//
move *= Math.Min(1.0, Math.Max(0.0, confidence));
//
// Now apply non-linear gain, such that values around zero are attenuated, while higher values
// are enhanced. This allows us to move quickly if we're far away from the target, but more slowly
// if we're getting close, giving us rapid ramp-up without wild oscillations around the target.
// The sign of the move is preserved; only upward moves are capped by _maxChangePerSample.
//
double gain = _maxChangePerSecond * sampleDurationSeconds;
move = Math.Pow(Math.Abs(move), _gainExponent) * (move >= 0.0 ? 1 : -1) * gain;
move = Math.Min(move, _maxChangePerSample);
//
// If the result was positive, and CPU utilization is above CpuUtilizationHigh (> 95% according to the
// original comment; the constant is defined outside this file), refuse the move.
//
PortableThreadPool threadPoolInstance = ThreadPoolInstance;
if (move > 0.0 && threadPoolInstance._cpuUtilization > CpuUtilizationHigh)
move = 0.0;
//
// Apply the move to our control setting
//
_currentControlSetting += move;
//
// Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of
// the throughput error. This average starts at zero, so we'll start with a nice safe little wave at first.
// The magnitude is rounded to the nearest integer and kept within [1, _maxThreadWaveMagnitude].
//
int newThreadWaveMagnitude = (int)(0.5 + (_currentControlSetting * _averageThroughputNoise * _targetSignalToNoiseRatio * _threadMagnitudeMultiplier * 2.0));
newThreadWaveMagnitude = Math.Min(newThreadWaveMagnitude, _maxThreadWaveMagnitude);
newThreadWaveMagnitude = Math.Max(newThreadWaveMagnitude, 1);
//
// Make sure our control setting is within the ThreadPool's limits. When some threads are blocked due to
// cooperative blocking, ensure that hill climbing does not decrease the thread count below the expected
// minimum. The minimum is applied last, so it wins if the two limits conflict.
//
int maxThreads = threadPoolInstance._maxThreads;
int minThreads = threadPoolInstance.MinThreadsGoal;
_currentControlSetting = Math.Min(maxThreads - newThreadWaveMagnitude, _currentControlSetting);
_currentControlSetting = Math.Max(minThreads, _currentControlSetting);
//
// Calculate the new thread count (control setting + square wave). The wave alternates between 0 and
// newThreadWaveMagnitude every (_wavePeriod / 2) samples; note that "_wavePeriod / 2" is an integer division.
//
int newThreadCount = (int)(_currentControlSetting + newThreadWaveMagnitude * ((_totalSamples / (_wavePeriod / 2)) % 2));
//
// Make sure the new thread count doesn't exceed the ThreadPool's limits
//
newThreadCount = Math.Min(maxThreads, newThreadCount);
newThreadCount = Math.Max(minThreads, newThreadCount);
//
// Record these numbers for posterity
//
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadAdjustmentStats(sampleDurationSeconds, throughput, threadWaveComponent.Real, throughputWaveComponent.Real,
throughputErrorEstimate, _averageThroughputNoise, ratio.Real, confidence, _currentControlSetting, (ushort)newThreadWaveMagnitude);
}
//
// If all of this caused an actual change in thread count, log that as well. ChangeThreadCount reads the
// per-change statistics to compute the logged throughput, so they are reset only after the call.
//
if (newThreadCount != currentThreadCount)
{
ChangeThreadCount(newThreadCount, state);
_secondsElapsedSinceLastChange = 0;
_completionsSinceLastChange = 0;
}
//
// Return the new thread count and sample interval. This is randomized to prevent correlations with other periodic
// changes in throughput. Among other things, this prevents us from getting confused by Hill Climbing instances
// running in other processes.
//
// If we're at minThreads, and we seem to be hurting performance by going higher, we can't go any lower to fix this. So
// we'll simply stay at minThreads much longer (up to 10 times the current sample interval), and only occasionally try
// a higher value.
//
int newSampleInterval;
if (ratio.Real < 0.0 && newThreadCount == minThreads)
newSampleInterval = (int)(0.5 + _currentSampleMs * (10.0 * Math.Min(-ratio.Real, 1.0)));
else
newSampleInterval = _currentSampleMs;
return (newThreadCount, newSampleInterval);
}
/// <summary>
/// Records <paramref name="newThreadCount"/> as the current thread count, picks a new random sample
/// interval where appropriate, and logs the transition together with the throughput observed since the
/// previous change.
/// </summary>
/// <param name="newThreadCount">The thread count being switched to.</param>
/// <param name="state">The state or reason associated with the change.</param>
private void ChangeThreadCount(int newThreadCount, StateOrTransition state)
{
    _lastThreadCount = newThreadCount;

    // Cooperative blocking transitions can be noisy, so they keep the current sample interval.
    bool pickNewSampleInterval = state != StateOrTransition.CooperativeBlocking;
    if (pickNewSampleInterval)
    {
        _currentSampleMs = _randomIntervalGenerator.Next(_sampleIntervalMsLow, _sampleIntervalMsHigh + 1);
    }

    // Report zero throughput when no time has been accounted for since the last change.
    double throughputSinceLastChange = 0;
    if (_secondsElapsedSinceLastChange > 0)
    {
        throughputSinceLastChange = _completionsSinceLastChange / _secondsElapsedSinceLastChange;
    }

    LogTransition(newThreadCount, throughputSinceLastChange, state);
}
/// <summary>
/// Appends a transition to the circular log, overwriting the oldest entry once the log is full, and
/// reports the adjustment through the runtime event source when it is enabled.
/// </summary>
/// <param name="newThreadCount">The thread count being switched to.</param>
/// <param name="throughput">The throughput to record with the transition.</param>
/// <param name="stateOrTransition">The state or reason associated with the transition.</param>
private void LogTransition(int newThreadCount, double throughput, StateOrTransition stateOrTransition)
{
    // _log is used as a circular array; the slot to write is the one just past the newest entry.
    // When the log is full that is the slot of the oldest entry.
    int slot = (_logStart + _logSize) % LogCapacity;

    bool logIsFull = _logSize == LogCapacity;
    if (logIsFull)
    {
        _logStart = (_logStart + 1) % LogCapacity;
        _logSize--; // hide this slot while we update it
    }

    // Number of samples currently available in the history.
    long availableSamples = Math.Min(_totalSamples, _samplesToMeasure);

    ref LogEntry entry = ref _log[slot];
    entry.tickCount = Environment.TickCount;
    entry.stateOrTransition = stateOrTransition;
    entry.newControlSetting = newThreadCount;
    // Round down to a whole number of wave periods.
    entry.lastHistoryCount = (int)(availableSamples / _wavePeriod) * _wavePeriod;
    entry.lastHistoryMean = (float)throughput;

    // Make the entry visible again only after it has been completely filled in.
    _logSize++;

    if (!NativeRuntimeEventSource.Log.IsEnabled())
    {
        return;
    }

    NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadAdjustmentAdjustment(
        throughput,
        (uint)newThreadCount,
        (NativeRuntimeEventSource.ThreadAdjustmentReasonMap)stateOrTransition);
}
/// <summary>
/// Informs the hill climber of a thread count that was set outside of <c>Update</c>. The control setting
/// is shifted by the same amount as the thread count and the transition is logged. Does nothing when the
/// thread count is unchanged.
/// </summary>
/// <param name="newThreadCount">The thread count now in effect.</param>
/// <param name="state">The state or reason associated with the change.</param>
public void ForceChange(int newThreadCount, StateOrTransition state)
{
    if (_lastThreadCount == newThreadCount)
    {
        return;
    }

    int threadCountDelta = newThreadCount - _lastThreadCount;
    _currentControlSetting += threadCountDelta;
    ChangeThreadCount(newThreadCount, state);
}
/// <summary>
/// Measures the component with the given period in the most recent <paramref name="numSamples"/> entries
/// of a circular history, using the Goertzel algorithm
/// (see http://en.wikipedia.org/wiki/Goertzel_algorithm).
/// </summary>
/// <param name="samples">The circular history to analyze; it is indexed relative to the running sample count.</param>
/// <param name="numSamples">How many of the most recent samples to include.</param>
/// <param name="period">The period of the wave to measure, in samples.</param>
/// <returns>The complex wave component, divided by <paramref name="numSamples"/>.</returns>
private Complex GetWaveComponent(double[] samples, int numSamples, double period)
{
    Debug.Assert(numSamples >= period); // can't measure a wave that doesn't fit
    Debug.Assert(period >= 2); // can't measure above the Nyquist frequency
    Debug.Assert(numSamples <= samples.Length); // can't measure more samples than we have

    // Angular frequency of the sinusoid with the given period, in radians per sample.
    double angularStep = 2 * Math.PI / period;
    double cosStep = Math.Cos(angularStep);
    double feedback = 2 * cosStep;

    // The two most recent values of the recurrence.
    double previous = 0;
    double beforePrevious = 0;

    // Position of the oldest sample to include, as a running sample number.
    long firstSample = _totalSamples - numSamples;
    int offset = 0;
    while (offset < numSamples)
    {
        double current = feedback * previous - beforePrevious + samples[(firstSample + offset) % _samplesToMeasure];
        beforePrevious = previous;
        previous = current;
        offset++;
    }

    return new Complex(previous - beforePrevious * cosStep, beforePrevious * Math.Sin(angularStep)) / numSamples;
}
}
}
}