-
Notifications
You must be signed in to change notification settings - Fork 2k
/
DeploymentBasedQueueBalancer.cs
183 lines (168 loc) · 7.89 KB
/
DeploymentBasedQueueBalancer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Orleans.Runtime;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Orleans.Streams
{
    /// <summary>
    /// DeploymentBasedQueueBalancer is a stream queue balancer that uses deployment information to
    /// help balance queue distribution.
    /// DeploymentBasedQueueBalancer uses the deployment configuration to determine how many silos
    /// to expect and uses a silo status oracle to determine which of the silos are available. With
    /// this information it tries to balance the queues using a best fit resource balancing algorithm.
    /// </summary>
    public class DeploymentBasedQueueBalancer : QueueBalancerBase, IStreamQueueBalancer
    {
        private readonly ISiloStatusOracle siloStatusOracle;
        private readonly IDeploymentConfiguration deploymentConfig;
        private readonly DeploymentBasedQueueBalancerOptions options;

        // Other silos mapped to an "immature" bit: true while the silo is inside its maturity
        // period (see RecordImmatureSilo), false once it has matured or if it was already
        // active when this balancer was constructed.
        private readonly ConcurrentDictionary<SiloAddress, bool> immatureSilos;

        // Every queue reported by the queue mapper; populated in Initialize.
        private List<QueueId> allQueues;

        // True until this silo's own maturity period (options.SiloMaturityPeriod) has elapsed
        // after Initialize; while true, GetMyQueues uses the ideal distribution.
        private bool isStarting;

        /// <summary>
        /// Creates a deployment-based queue balancer.
        /// </summary>
        /// <param name="siloStatusOracle">Source of silo membership/status information and of this silo's own address and name.</param>
        /// <param name="deploymentConfig">Source of the full list of silo names expected in the deployment.</param>
        /// <param name="options">Balancer options (maturity period, fixed vs. dynamic distribution).</param>
        /// <param name="services">Service provider forwarded to the base class.</param>
        /// <param name="logger">Logger forwarded to the base class.</param>
        /// <exception cref="ArgumentNullException">
        /// Thrown when <paramref name="siloStatusOracle"/>, <paramref name="deploymentConfig"/>, or <paramref name="options"/> is null.
        /// </exception>
        public DeploymentBasedQueueBalancer(
            ISiloStatusOracle siloStatusOracle,
            IDeploymentConfiguration deploymentConfig,
            DeploymentBasedQueueBalancerOptions options,
            IServiceProvider services,
            ILogger<DeploymentBasedQueueBalancer> logger)
            : base(services, logger)
        {
            this.siloStatusOracle = siloStatusOracle ?? throw new ArgumentNullException(nameof(siloStatusOracle));
            this.deploymentConfig = deploymentConfig ?? throw new ArgumentNullException(nameof(deploymentConfig));
            // Guard options like the other dependencies: a null here would otherwise surface only as a
            // swallowed exception inside the fire-and-forget NotifyAfterStart, leaving isStarting stuck at true.
            this.options = options ?? throw new ArgumentNullException(nameof(options));
            isStarting = true;

            // Record all already-active silos (other than this one) as mature. Even if they are not
            // mature yet, they will be by the time this silo matures (i.e. once isStarting becomes false).
            immatureSilos = new ConcurrentDictionary<SiloAddress, bool>(
                from s in siloStatusOracle.GetApproximateSiloStatuses(true).Keys
                where !s.Equals(siloStatusOracle.SiloAddress)
                select new KeyValuePair<SiloAddress, bool>(s, false));
        }

        /// <summary>
        /// Factory that resolves the named <see cref="DeploymentBasedQueueBalancerOptions"/> from
        /// <see cref="IOptionsMonitor{TOptions}"/> and constructs a balancer through the service provider.
        /// </summary>
        /// <param name="services">Service provider used to resolve options and remaining constructor dependencies.</param>
        /// <param name="name">Name of the options instance to resolve.</param>
        /// <param name="deploymentConfiguration">Deployment configuration passed to the constructor.</param>
        /// <returns>A new <see cref="DeploymentBasedQueueBalancer"/>.</returns>
        public static IStreamQueueBalancer Create(IServiceProvider services, string name, IDeploymentConfiguration deploymentConfiguration)
        {
            var options = services.GetRequiredService<IOptionsMonitor<DeploymentBasedQueueBalancerOptions>>().Get(name);
            return ActivatorUtilities.CreateInstance<DeploymentBasedQueueBalancer>(services, options, deploymentConfiguration);
        }

        /// <summary>
        /// Captures the full queue list from <paramref name="queueMapper"/> and starts this silo's
        /// maturity timer before delegating to the base implementation.
        /// </summary>
        /// <param name="queueMapper">Mapper that supplies all queues to balance.</param>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="queueMapper"/> is null.</exception>
        public override Task Initialize(IStreamQueueMapper queueMapper)
        {
            if (queueMapper == null)
            {
                throw new ArgumentNullException(nameof(queueMapper));
            }

            this.allQueues = queueMapper.GetAllQueues().ToList();
            // Fire-and-forget: after the maturity period, leave the starting state and notify listeners.
            NotifyAfterStart().Ignore();
            return base.Initialize(queueMapper);
        }

        /// <summary>
        /// Waits for this silo's maturity period, clears the starting flag, then notifies listeners
        /// so they can re-query their queues.
        /// </summary>
        private async Task NotifyAfterStart()
        {
            await Task.Delay(this.options.SiloMaturityPeriod);
            isStarting = false;
            await NotifyListeners();
        }

        /// <summary>
        /// Marks <paramref name="updatedSilo"/> as immature for the configured maturity period,
        /// then marks it as mature.
        /// </summary>
        private async Task RecordImmatureSilo(SiloAddress updatedSilo)
        {
            immatureSilos[updatedSilo] = true; // record as immature
            await Task.Delay(this.options.SiloMaturityPeriod);
            immatureSilos[updatedSilo] = false; // record as mature
        }

        /// <summary>
        /// Returns the queues this silo should currently own.
        /// </summary>
        /// <remarks>
        /// When the options request a fixed distribution, or this silo is still starting, the ideal
        /// distribution over all configured silos is used. Otherwise queues are distributed across
        /// currently active, non-immature silos. Any queue that the ideal distribution assigns to an
        /// immature silo is then removed, so that the new silo can take it over once it matures.
        /// </remarks>
        /// <returns>This silo's queues, or an empty sequence if this silo's name is not in the distribution.</returns>
        public override IEnumerable<QueueId> GetMyQueues()
        {
            BestFitBalancer<string, QueueId> balancer = GetBalancer();
            bool useIdealDistribution = this.options.IsFixed || isStarting;
            Dictionary<string, List<QueueId>> distribution = useIdealDistribution
                ? balancer.IdealDistribution
                : balancer.GetDistribution(GetActiveSilos(siloStatusOracle, immatureSilos));

            if (!distribution.TryGetValue(siloStatusOracle.SiloName, out var myQueues))
            {
                return Enumerable.Empty<QueueId>();
            }

            if (!useIdealDistribution)
            {
                HashSet<QueueId> queuesOfImmatureSilos = GetQueuesOfImmatureSilos(siloStatusOracle, immatureSilos, balancer.IdealDistribution);
                // Filter out queues that belong to immature silos.
                myQueues.RemoveAll(queue => queuesOfImmatureSilos.Contains(queue));
            }

            return myQueues;
        }

        /// <summary>
        /// Returns the names of silos the oracle reports as active, excluding those whose immature
        /// bit is currently set. Silos whose name cannot be resolved are skipped.
        /// </summary>
        private static List<string> GetActiveSilos(ISiloStatusOracle siloStatusOracle, ConcurrentDictionary<SiloAddress, bool> immatureSilos)
        {
            var activeSiloNames = new List<string>();
            foreach (var kvp in siloStatusOracle.GetApproximateSiloStatuses(true))
            {
                // Include the silo unless it is currently flagged immature (absent or false both count as mature).
                if (!(immatureSilos.TryGetValue(kvp.Key, out var immatureBit) && immatureBit))
                {
                    if (siloStatusOracle.TryGetSiloName(kvp.Key, out var siloName))
                    {
                        activeSiloNames.Add(siloName);
                    }
                }
            }

            return activeSiloNames;
        }

        /// <summary>
        /// Builds a best-fit balancer from the silo names currently reported by the deployment
        /// configuration and all known queues. A new balancer is constructed on every call, so
        /// silos added to or removed from the deployment configuration are reflected immediately.
        /// </summary>
        private BestFitBalancer<string, QueueId> GetBalancer()
        {
            var allSiloNames = deploymentConfig.GetAllSiloNames();
            return new BestFitBalancer<string, QueueId>(allSiloNames, allQueues);
        }

        /// <summary>
        /// Collects every queue that <paramref name="idealDistribution"/> assigns to a silo whose
        /// immature bit is currently set. Silos whose name cannot be resolved are skipped.
        /// </summary>
        private static HashSet<QueueId> GetQueuesOfImmatureSilos(ISiloStatusOracle siloStatusOracle,
            ConcurrentDictionary<SiloAddress, bool> immatureSilos,
            Dictionary<string, List<QueueId>> idealDistribution)
        {
            HashSet<QueueId> queuesOfImmatureSilos = new HashSet<QueueId>();
            // Take only entries whose immature status bit is set.
            foreach (var silo in immatureSilos.Where(s => s.Value))
            {
                if (siloStatusOracle.TryGetSiloName(silo.Key, out var siloName)
                    && idealDistribution.TryGetValue(siloName, out var queues))
                {
                    queuesOfImmatureSilos.UnionWith(queues);
                }
            }

            return queuesOfImmatureSilos;
        }

        /// <summary>
        /// Reacts to a membership change by starting maturity tracking for newly seen silos
        /// (fire-and-forget).
        /// </summary>
        protected override void OnClusterMembershipChange(HashSet<SiloAddress> activeSilos)
        {
            SignalClusterChange(activeSilos).Ignore();
        }

        /// <summary>
        /// Starts a maturity timer for every active silo (other than this one) not seen before.
        /// If this silo has finished starting, it notifies listeners immediately. If any timers were
        /// started, it notifies listeners again once all of them have completed.
        /// </summary>
        private async Task SignalClusterChange(HashSet<SiloAddress> activeSilos)
        {
            List<Task> tasks = new List<Task>();
            // Look at all currently active silos, not including this one.
            foreach (var silo in activeSilos)
            {
                // TryAdd atomically claims a previously unseen silo (marking it immature), so
                // overlapping membership notifications cannot start two maturity timers for it.
                if (!silo.Equals(siloStatusOracle.SiloAddress) && immatureSilos.TryAdd(silo, true))
                {
                    tasks.Add(RecordImmatureSilo(silo));
                }
            }

            if (!isStarting)
            {
                // Notify unconditionally; GetMyQueues() works out what actually changed.
                await NotifyListeners();
            }

            if (tasks.Count > 0)
            {
                await Task.WhenAll(tasks);
                // Newly seen silos have matured: notify unconditionally and let GetMyQueues() handle the changes.
                await this.NotifyListeners();
            }
        }
    }
}