Skip to content

Commit

Permalink
hydra-queue-runner: Improve dispatcher
Browse files Browse the repository at this point in the history
We now take the machine speed factor into account, just like
build-remote.pl.
  • Loading branch information
edolstra committed Jun 17, 2015
1 parent 3855131 commit a40ca6b
Showing 1 changed file with 84 additions and 62 deletions.
146 changes: 84 additions & 62 deletions src/hydra-queue-runner/hydra-queue-runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <thread>
#include <cmath>
#include <chrono>
#include <algorithm>

#include <pqxx/pqxx>

Expand Down Expand Up @@ -159,13 +160,7 @@ struct Machine
unsigned int maxJobs = 1;
float speedFactor = 1.0;

Sync<unsigned int> currentJobs;

Machine()
{
auto currentJobs_(currentJobs.lock());
*currentJobs_ = 0;
}
std::atomic<unsigned int> currentJobs{0};

bool supportsStep(Step::ptr step)
{
Expand All @@ -187,13 +182,11 @@ struct MachineReservation
Machine::ptr machine;
MachineReservation(Machine::ptr machine) : machine(machine)
{
auto currentJobs_(machine->currentJobs.lock());
(*currentJobs_)++;
machine->currentJobs++;
}
~MachineReservation()
{
auto currentJobs_(machine->currentJobs.lock());
if (*currentJobs_ > 0) (*currentJobs_)--;
machine->currentJobs--;
}
};

Expand Down Expand Up @@ -284,8 +277,6 @@ class State

void wakeDispatcher();

MachineReservation::ptr findMachine(Step::ptr step);

void builder(Step::ptr step, MachineReservation::ptr reservation);

/* Perform the given build step. Return true if the step is to be
Expand Down Expand Up @@ -878,49 +869,98 @@ void State::dispatcher()

auto sleepUntil = system_time::max();

{
auto runnable_(runnable.lock());
printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
bool keepGoing;

/* FIXME: we're holding the runnable lock too long
here. This could be more efficient. */
do {
/* Bail out when there are no slots left. */
std::vector<Machine::ptr> machinesSorted;
{
auto machines_(machines.lock());
machinesSorted.insert(machinesSorted.end(),
machines_->begin(), machines_->end());
}

system_time now = std::chrono::system_clock::now();
/* Sort the machines by a combination of speed factor and
available slots. Prioritise the available machines as
follows:
for (auto i = runnable_->begin(); i != runnable_->end(); ) {
auto step = i->lock();
- First by load divided by speed factor, rounded to the
nearest integer. This causes fast machines to be
preferred over slow machines with similar loads.
/* Delete dead steps. */
if (!step) {
i = runnable_->erase(i);
continue;
}
- Then by speed factor.
/* Skip previously failed steps that aren't ready to
be retried. */
- Finally by load. */
sort(machinesSorted.begin(), machinesSorted.end(),
[](const Machine::ptr & a, const Machine::ptr & b) -> bool
{
auto step_(step->state.lock());
if (step_->tries > 0 && step_->after > now) {
if (step_->after < sleepUntil)
sleepUntil = step_->after;
float ta = roundf(a->currentJobs / a->speedFactor);
float tb = roundf(b->currentJobs / b->speedFactor);
return
ta != tb ? ta > tb :
a->speedFactor != b->speedFactor ? a->speedFactor > b->speedFactor :
a->maxJobs > b->maxJobs;
});

/* Find a machine with a free slot and find a step to run
on it. Once we find such a pair, we restart the outer
loop because the machine sorting will have changed. */
keepGoing = false;
system_time now = std::chrono::system_clock::now();

for (auto & machine : machinesSorted) {
// FIXME: can we lose a wakeup if a builder exits concurrently?
if (machine->currentJobs >= machine->maxJobs) continue;

auto runnable_(runnable.lock());
printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());

/* FIXME: we're holding the runnable lock too long
here. This could be more efficient. */

for (auto i = runnable_->begin(); i != runnable_->end(); ) {
auto step = i->lock();

/* Delete dead steps. */
if (!step) {
i = runnable_->erase(i);
continue;
}

/* Can this machine do this step? */
if (!machine->supportsStep(step)) {
++i;
continue;
}
}

auto reservation = findMachine(step);
if (!reservation) {
printMsg(lvlDebug, format("cannot execute step ‘%1%’ right now") % step->drvPath);
++i;
continue;
}
/* Skip previously failed steps that aren't ready
to be retried. */
{
auto step_(step->state.lock());
if (step_->tries > 0 && step_->after > now) {
if (step_->after < sleepUntil)
sleepUntil = step_->after;
++i;
continue;
}
}

i = runnable_->erase(i);
/* Make a slot reservation and start a thread to
do the build. */
auto reservation = std::make_shared<MachineReservation>(machine);
i = runnable_->erase(i);

auto builderThread = std::thread(&State::builder, this, step, reservation);
builderThread.detach(); // FIXME?
auto builderThread = std::thread(&State::builder, this, step, reservation);
builderThread.detach(); // FIXME?

keepGoing = true;
break;
}

if (keepGoing) break;
}
}

} while (keepGoing);

/* Sleep until we're woken up (either because a runnable build
is added, or because a build finishes). */
Expand All @@ -944,23 +984,6 @@ void State::wakeDispatcher()
}


MachineReservation::ptr State::findMachine(Step::ptr step)
{
auto machines_(machines.lock());

for (auto & machine : *machines_) {
if (!machine->supportsStep(step)) continue;
{
auto currentJobs_(machine->currentJobs.lock());
if (*currentJobs_ >= machine->maxJobs) continue;
}
return std::make_shared<MachineReservation>(machine);
}

return 0;
}


void State::builder(Step::ptr step, MachineReservation::ptr reservation)
{
bool retry = true;
Expand Down Expand Up @@ -1274,9 +1297,8 @@ void State::dumpStatus()
{
auto machines_(machines.lock());
for (auto & m : *machines_) {
auto currentJobs_(m->currentJobs.lock());
printMsg(lvlError, format("machine %1%: %2%/%3% active")
% m->sshName % *currentJobs_ % m->maxJobs);
% m->sshName % m->currentJobs % m->maxJobs);
}
}
}
Expand Down

0 comments on commit a40ca6b

Please sign in to comment.