Skip to content

Commit

Permalink
WorkerSupervisor enable the reduction of Worker (#751) (#1156)
Browse files Browse the repository at this point in the history
* Avoid NullReferenceException in case of job continuation (#819)
* WorkerSupervisor enable the reduction of Worker  (#751)

Worker supervisor should also stop worker if max number of worker has decreased
* Removed not used methods from IWorkerSupervisor interface
* Added observability property, to read number of workers from test
* Added tests for WorkerSupervisor
* [Refactoring] change accessibility to private for methods CreateWorker and StopWorker
* Extend WorkerSupervisor tests and logging when redusing MaxWorker
* Set WorkerSupervisor interval via optional parameter to optimize testing

Co-authored-by: Alexander Köpke <koepalex@xdigital.de>
Co-authored-by: Marc Schier <marcschier@users.noreply.github.com>
  • Loading branch information
3 people committed May 12, 2021
1 parent 960802a commit a85b368
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 43 deletions.
Expand Up @@ -106,6 +106,12 @@ public class Worker : IWorker, IDisposable {
_logger.Information("Stopping worker...");
_heartbeatTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);

// Inform services, that this worker has stopped working, so orchestrator can reassign job
if (_jobProcess != null) {
_jobProcess.Status = WorkerStatus.Stopped;
await SendHeartbeatWithoutResetTimer(); // need to be send before cancel the CancellationToken
}

// Stop worker
_cts.Cancel();
await _worker;
Expand Down Expand Up @@ -142,6 +148,12 @@ public class Worker : IWorker, IDisposable {
/// </summary>
/// <param name="sender"></param>
private async void HeartbeatTimer_ElapsedAsync(object sender) {

await SendHeartbeatWithoutResetTimer();
Try.Op(() => _heartbeatTimer.Change(_heartbeatInterval, Timeout.InfiniteTimeSpan));
}

private async Task SendHeartbeatWithoutResetTimer() {
try {
_logger.Debug("Sending heartbeat...");

Expand All @@ -155,10 +167,9 @@ public class Worker : IWorker, IDisposable {
return; // Done
}
catch (Exception ex) {
_logger.Information(ex, "Could not send worker heartbeat.");
_logger.Debug(ex, "Could not send worker heartbeat.");
kModuleExceptions.WithLabels(AgentId, ex.Source, ex.GetType().FullName, ex.Message, ex.StackTrace, "Could not send worker hearbeat").Inc();
}
Try.Op(() => _heartbeatTimer.Change(_heartbeatInterval, Timeout.InfiniteTimeSpan));
}

/// <summary>
Expand Down Expand Up @@ -226,6 +237,8 @@ public class Worker : IWorker, IDisposable {

// Check if the job is to be continued with new configuration settings
if (_jobProcess.JobContinuation == null) {
_jobProcess.Status = WorkerStatus.Stopped;
await SendHeartbeatWithoutResetTimer();
_jobProcess = null;
break;
}
Expand All @@ -234,6 +247,11 @@ public class Worker : IWorker, IDisposable {
if (jobProcessInstruction?.Job?.JobConfiguration == null ||
jobProcessInstruction?.ProcessMode == null) {
_logger.Information("Job continuation invalid, continue listening...");
if (_jobProcess != null) {
_jobProcess.Status = WorkerStatus.Stopped;
await SendHeartbeatWithoutResetTimer();
}

_jobProcess = null;
break;
}
Expand All @@ -242,6 +260,11 @@ public class Worker : IWorker, IDisposable {
}
catch (OperationCanceledException) {
_logger.Information("Processing cancellation received ...");
if (_jobProcess != null) {
_jobProcess.Status = WorkerStatus.Stopped;
await SendHeartbeatWithoutResetTimer();
}

_jobProcess = null;
}
finally {
Expand All @@ -262,7 +285,7 @@ private class JobProcess : IDisposable {
public JobProcessingInstructionModel JobContinuation { get; private set; }

/// <inheritdoc/>
public WorkerStatus Status { get; private set; } = WorkerStatus.Stopped;
public WorkerStatus Status { get; internal set; } = WorkerStatus.Stopped;

/// <inheritdoc/>
public JobInfoModel Job => _currentJobProcessInstruction.Job;
Expand Down
Expand Up @@ -26,12 +26,18 @@ public class WorkerSupervisor : IWorkerSupervisor, IDisposable {
/// <param name="lifetimeScope"></param>
/// <param name="agentConfigProvider"></param>
/// <param name="logger"></param>
/// <param name="timerDelayInSeconds">The time the supervisor is waiting before ensure worker</param>
public WorkerSupervisor(ILifetimeScope lifetimeScope,
IAgentConfigProvider agentConfigProvider, ILogger logger) {
IAgentConfigProvider agentConfigProvider, ILogger logger, int timerDelayInSeconds = 10) {

if (timerDelayInSeconds <= 0) {
timerDelayInSeconds = 10;
}

_lifetimeScope = lifetimeScope;
_agentConfigProvider = agentConfigProvider;
_logger = logger;
_ensureWorkerRunningTimer = new Timer(TimeSpan.FromSeconds(10).TotalMilliseconds);
_ensureWorkerRunningTimer = new Timer(TimeSpan.FromSeconds(timerDelayInSeconds).TotalMilliseconds);
_ensureWorkerRunningTimer.Elapsed += EnsureWorkerRunningTimer_ElapsedAsync;
}

Expand All @@ -56,26 +62,36 @@ public class WorkerSupervisor : IWorkerSupervisor, IDisposable {
}

/// <inheritdoc/>
public Task<IWorker> CreateWorker() {
public int NumberOfWorkers => _instances.Count;

/// <summary>
/// Create worker
/// </summary>
/// <returns></returns>
private Task<IWorker> CreateWorker() {
var maxWorkers = _agentConfigProvider.Config?.MaxWorkers ?? kDefaultWorkers;
if (_instances.Count >= maxWorkers) {
throw new MaxWorkersReachedException(maxWorkers);
}

var childScope = _lifetimeScope.BeginLifetimeScope();
var worker = childScope.Resolve<IWorker>(new NamedParameter("workerInstance", _instances.Count));
_instances[worker] = childScope;
_logger.Information("Creating new worker with id {WorkerId}", worker.WorkerId);

return Task.FromResult(worker);
}

/// <inheritdoc/>
public async Task StopWorker(string workerId) {
if (!_instances.Keys.Any(w => w.WorkerId == workerId)) {
throw new WorkerNotFoundException(workerId);
}

var worker = _instances.Where(w => w.Key.WorkerId == workerId).Single();

/// <summary>
/// Stop a single worker
/// </summary>
/// <returns>awaitable task</returns>
private async Task StopWorker() {
// sort workers, so that a worker in state Stopped, Stopping or WaitingForJob will terminate first
var worker = _instances.OrderBy(kvp => kvp.Key.Status).First();
var workerId = worker.Key.WorkerId;
_logger.Information("Stopping worker with id {WorkerId}", workerId);

await worker.Key.StopAsync();
worker.Value.Dispose();
_instances.Remove(worker.Key);
Expand Down Expand Up @@ -107,26 +123,34 @@ public class WorkerSupervisor : IWorkerSupervisor, IDisposable {
/// </summary>
/// <returns></returns>
private async Task EnsureWorkersAsync() {
var workerStartTasks = new List<Task>();

while (true) {
var workers = _agentConfigProvider.Config?.MaxWorkers ?? kDefaultWorkers;
while (_instances.Count < workers) {
_logger.Information("Creating new worker...");
var worker = await CreateWorker();
}

foreach (var stoppedWorker in _instances.Keys.Where(s => s.Status == WorkerStatus.Stopped)) {
_logger.Information("Starting worker '{workerId}'...", stoppedWorker.WorkerId);
workerStartTasks.Add(stoppedWorker.StartAsync());
}
await Task.WhenAll(workerStartTasks);
// the configuration might have been changed by workers execution
var newWorkers = _agentConfigProvider.Config?.MaxWorkers ?? kDefaultWorkers;
if (workers >= newWorkers) {
break;
}
if (_agentConfigProvider.Config?.MaxWorkers <= 0) {
_logger.Error("MaxWorker can't be zero or negative! using default value {DefaultMaxWorkers}", kDefaultWorkers);
_agentConfigProvider.Config.MaxWorkers = kDefaultWorkers;
}

var workers = _agentConfigProvider.Config?.MaxWorkers ?? kDefaultWorkers;
var delta = workers - _instances.Count;

// start new worker if necessary
while (delta > 0) {
var worker = await CreateWorker();
delta--;
}

// terminate running worker if necessary
while (delta < 0) {
await StopWorker();
delta++;
}

//restart stopped worker if necessary
var workerStartTasks = new List<Task>();
foreach (var stoppedWorker in _instances.Keys.Where(s => s.Status == WorkerStatus.Stopped)) {
_logger.Information("Starting worker '{workerId}'...", stoppedWorker.WorkerId);
workerStartTasks.Add(stoppedWorker.StartAsync());
}
await Task.WhenAll(workerStartTasks);
}

private const int kDefaultWorkers = 5; // TODO - single listener, dynamic workers.
Expand Down
Expand Up @@ -12,16 +12,8 @@ namespace Microsoft.Azure.IIoT.Agent.Framework {
public interface IWorkerSupervisor : IHostProcess {

/// <summary>
/// Create worker
/// The amount of workers currently running
/// </summary>
/// <returns></returns>
Task<IWorker> CreateWorker();

/// <summary>
/// Stop worker
/// </summary>
/// <param name="workerId"></param>
/// <returns></returns>
Task StopWorker(string workerId);
int NumberOfWorkers { get; }
}
}

0 comments on commit a85b368

Please sign in to comment.