Skip to content

Commit

Permalink
Abort unsupported build steps
Browse files Browse the repository at this point in the history
If we don't see machine that supports a build step for
'max_unsupported_time' seconds, the step is aborted. The default is 0,
which is appropriate for Hydra installations that don't provision
missing machines dynamically.
  • Loading branch information
edolstra committed Mar 26, 2020
1 parent 5b73100 commit f5cdbfe
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 84 deletions.
178 changes: 95 additions & 83 deletions src/hydra-queue-runner/builder.cc
Expand Up @@ -376,109 +376,121 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
}
}

} else {
} else
failStep(*conn, step, buildId, result, machine, stepFinished, quit);

/* Register failure in the database for all Build objects that
directly or indirectly depend on this step. */
// FIXME: keep stats about aborted steps?
nrStepsDone++;
totalStepTime += stepStopTime - stepStartTime;
totalStepBuildTime += result.stopTime - result.startTime;
machine->state->nrStepsDone++;
machine->state->totalStepTime += stepStopTime - stepStartTime;
machine->state->totalStepBuildTime += result.stopTime - result.startTime;

std::vector<BuildID> dependentIDs;
if (quit) exit(0); // testing hack; FIXME: this won't run plugins

while (true) {
/* Get the builds and steps that depend on this step. */
std::set<Build::ptr> indirect;
{
auto steps_(steps.lock());
std::set<Step::ptr> steps;
getDependents(step, indirect, steps);

/* If there are no builds left, delete all referring
steps from ‘steps’. As for the success case, we can
be certain no new referrers can be added. */
if (indirect.empty()) {
for (auto & s : steps) {
printMsg(lvlDebug, "finishing build step ‘%s’",
localStore->printStorePath(s->drvPath));
steps_->erase(s->drvPath);
}
}
}
return sDone;
}

if (indirect.empty() && stepFinished) break;

/* Update the database. */
{
auto mc = startDbUpdate();
void State::failStep(
Connection & conn,
Step::ptr step,
BuildID buildId,
const RemoteResult & result,
Machine::ptr machine,
bool & stepFinished,
bool & quit)
{
/* Register failure in the database for all Build objects that
directly or indirectly depend on this step. */

pqxx::work txn(*conn);
std::vector<BuildID> dependentIDs;

/* Create failed build steps for every build that
depends on this, except when this step is cached
and is the top-level of that build (since then it's
redundant with the build's isCachedBuild field). */
for (auto & build2 : indirect) {
if ((result.stepStatus == bsCachedFailure && build2->drvPath == step->drvPath) ||
(result.stepStatus != bsCachedFailure && buildId == build2->id) ||
build2->finishedInDB)
continue;
createBuildStep(txn, 0, build2->id, step, machine->sshName,
result.stepStatus, result.errorMsg, buildId == build2->id ? 0 : buildId);
while (true) {
/* Get the builds and steps that depend on this step. */
std::set<Build::ptr> indirect;
{
auto steps_(steps.lock());
std::set<Step::ptr> steps;
getDependents(step, indirect, steps);

/* If there are no builds left, delete all referring
steps from ‘steps’. As for the success case, we can
be certain no new referrers can be added. */
if (indirect.empty()) {
for (auto & s : steps) {
printMsg(lvlDebug, "finishing build step ‘%s’",
localStore->printStorePath(s->drvPath));
steps_->erase(s->drvPath);
}
}
}

/* Mark all builds that depend on this derivation as failed. */
for (auto & build2 : indirect) {
if (build2->finishedInDB) continue;
printMsg(lvlError, format("marking build %1% as failed") % build2->id);
txn.parameterized
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5, notificationPendingSince = $4 where id = $1 and finished = 0")
(build2->id)
((int) (build2->drvPath != step->drvPath && result.buildStatus() == bsFailed ? bsDepFailed : result.buildStatus()))
(result.startTime)
(result.stopTime)
(result.stepStatus == bsCachedFailure ? 1 : 0).exec();
nrBuildsDone++;
}
if (indirect.empty() && stepFinished) break;

/* Remember failed paths in the database so that they
won't be built again. */
if (result.stepStatus != bsCachedFailure && result.canCache)
for (auto & path : step->drv->outputPaths())
txn.parameterized("insert into FailedPaths values ($1)")(localStore->printStorePath(path)).exec();
/* Update the database. */
{
auto mc = startDbUpdate();

txn.commit();
pqxx::work txn(conn);

/* Create failed build steps for every build that
depends on this, except when this step is cached
and is the top-level of that build (since then it's
redundant with the build's isCachedBuild field). */
for (auto & build : indirect) {
if ((result.stepStatus == bsCachedFailure && build->drvPath == step->drvPath) ||
((result.stepStatus != bsCachedFailure && result.stepStatus != bsUnsupported) && buildId == build->id) ||
build->finishedInDB)
continue;
createBuildStep(txn,
0, build->id, step, machine ? machine->sshName : "",
result.stepStatus, result.errorMsg, buildId == build->id ? 0 : buildId);
}

stepFinished = true;

/* Remove the indirect dependencies from ‘builds’. This
will cause them to be destroyed. */
for (auto & b : indirect) {
auto builds_(builds.lock());
b->finishedInDB = true;
builds_->erase(b->id);
dependentIDs.push_back(b->id);
if (buildOne == b->id) quit = true;
/* Mark all builds that depend on this derivation as failed. */
for (auto & build : indirect) {
if (build->finishedInDB) continue;
printMsg(lvlError, format("marking build %1% as failed") % build->id);
txn.parameterized
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5, notificationPendingSince = $4 where id = $1 and finished = 0")
(build->id)
((int) (build->drvPath != step->drvPath && result.buildStatus() == bsFailed ? bsDepFailed : result.buildStatus()))
(result.startTime)
(result.stopTime)
(result.stepStatus == bsCachedFailure ? 1 : 0).exec();
nrBuildsDone++;
}
}

/* Send notification about this build and its dependents. */
{
pqxx::work txn(*conn);
notifyBuildFinished(txn, buildId, dependentIDs);
/* Remember failed paths in the database so that they
won't be built again. */
if (result.stepStatus != bsCachedFailure && result.canCache)
for (auto & path : step->drv->outputPaths())
txn.parameterized("insert into FailedPaths values ($1)")(localStore->printStorePath(path)).exec();

txn.commit();
}
}

// FIXME: keep stats about aborted steps?
nrStepsDone++;
totalStepTime += stepStopTime - stepStartTime;
totalStepBuildTime += result.stopTime - result.startTime;
machine->state->nrStepsDone++;
machine->state->totalStepTime += stepStopTime - stepStartTime;
machine->state->totalStepBuildTime += result.stopTime - result.startTime;
stepFinished = true;

if (quit) exit(0); // testing hack; FIXME: this won't run plugins
/* Remove the indirect dependencies from ‘builds’. This
will cause them to be destroyed. */
for (auto & b : indirect) {
auto builds_(builds.lock());
b->finishedInDB = true;
builds_->erase(b->id);
dependentIDs.push_back(b->id);
if (buildOne == b->id) quit = true;
}
}

return sDone;
/* Send notification about this build and its dependents. */
{
pqxx::work txn(conn);
notifyBuildFinished(txn, buildId, dependentIDs);
txn.commit();
}
}


Expand Down
85 changes: 85 additions & 0 deletions src/hydra-queue-runner/dispatcher.cc
Expand Up @@ -300,6 +300,8 @@ system_time State::doDispatch()

} while (keepGoing);

abortUnsupported();

return sleepUntil;
}

Expand All @@ -314,6 +316,89 @@ void State::wakeDispatcher()
}


void State::abortUnsupported()
{
/* Make a copy of 'runnable' and 'machines' so we don't block them
very long. */
auto runnable2 = *runnable.lock();
auto machines2 = *machines.lock();

system_time now = std::chrono::system_clock::now();
auto now2 = time(0);

std::unordered_set<Step::ptr> aborted;

for (auto & wstep : runnable2) {
auto step(wstep.lock());
if (!step) continue;

bool supported = false;
for (auto & machine : machines2) {
if (machine.second->supportsStep(step)) {
step->state.lock()->lastSupported = now;
supported = true;
break;
}
}

if (!supported
&& std::chrono::duration_cast<std::chrono::seconds>(now - step->state.lock()->lastSupported).count() >= maxUnsupportedTime)
{
printError("aborting unsupported build step '%s' (type '%s')",
localStore->printStorePath(step->drvPath),
step->systemType);

aborted.insert(step);

auto conn(dbPool.get());

std::set<Build::ptr> dependents;
std::set<Step::ptr> steps;
getDependents(step, dependents, steps);

/* Maybe the step got cancelled. */
if (dependents.empty()) continue;

/* Find the build that has this step as the top-level (if
any). */
Build::ptr build;
for (auto build2 : dependents) {
if (build2->drvPath == step->drvPath)
build = build2;
}
if (!build) build = *dependents.begin();

bool stepFinished = false;
bool quit = false;

failStep(
*conn, step, build->id,
RemoteResult {
.stepStatus = bsUnsupported,
.errorMsg = fmt("unsupported system type '%s'",
step->systemType),
.startTime = now2,
.stopTime = now2,
},
nullptr, stepFinished, quit);

if (quit) exit(1);
}
}

/* Clean up 'runnable'. */
{
auto runnable_(runnable.lock());
for (auto i = runnable_->begin(); i != runnable_->end(); ) {
if (aborted.count(i->lock()))
i = runnable_->erase(i);
else
++i;
}
}
}


void Jobset::addStep(time_t startTime, time_t duration)
{
auto steps_(steps.lock());
Expand Down
1 change: 1 addition & 0 deletions src/hydra-queue-runner/hydra-queue-runner.cc
Expand Up @@ -46,6 +46,7 @@ std::string getEnvOrDie(const std::string & key)

State::State()
: config(std::make_unique<::Config>())
, maxUnsupportedTime(config->getIntOption("max_unsupported_time", 0))
, dbPool(config->getIntOption("max_db_connections", 128))
, memoryTokens(config->getIntOption("nar_buffer_size", getMemSize() / 2))
, maxOutputSize(config->getIntOption("max_output_size", 2ULL << 30))
Expand Down
20 changes: 19 additions & 1 deletion src/hydra-queue-runner/state.hh
Expand Up @@ -68,7 +68,7 @@ struct RemoteResult
std::unique_ptr<nix::TokenServer::Token> tokens;
std::shared_ptr<nix::FSAccessor> accessor;

BuildStatus buildStatus()
BuildStatus buildStatus() const
{
return stepStatus == bsCachedFailure ? bsFailed : stepStatus;
}
Expand Down Expand Up @@ -198,6 +198,10 @@ struct Step

/* The time at which this step became runnable. */
system_time runnableSince;

/* The time that we last saw a machine that supports this
step. */
system_time lastSupported = std::chrono::system_clock::now();
};

std::atomic_bool finished{false}; // debugging
Expand Down Expand Up @@ -303,6 +307,9 @@ private:
const float retryBackoff = 3.0;
const unsigned int maxParallelCopyClosure = 4;

/* Time in seconds before unsupported build steps are aborted. */
const unsigned int maxUnsupportedTime = 0;

nix::Path hydraData, logDir;

bool useSubstitutes = false;
Expand Down Expand Up @@ -483,6 +490,15 @@ private:
Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::StorePath> & finishedDrvs,
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);

void failStep(
Connection & conn,
Step::ptr step,
BuildID buildId,
const RemoteResult & result,
Machine::ptr machine,
bool & stepFinished,
bool & quit);

Jobset::ptr createJobset(pqxx::work & txn,
const std::string & projectName, const std::string & jobsetName);

Expand All @@ -497,6 +513,8 @@ private:

void wakeDispatcher();

void abortUnsupported();

void builder(MachineReservation::ptr reservation);

/* Perform the given build step. Return true if the step is to be
Expand Down

0 comments on commit f5cdbfe

Please sign in to comment.