Skip to content

Commit

Permalink
Merge pull request #668 from NixOS/notifications
Browse files Browse the repository at this point in the history
Turn hydra-notify into a daemon
  • Loading branch information
edolstra committed Aug 14, 2019
2 parents f13a2cb + 92d8d6b commit f17cd94
Show file tree
Hide file tree
Showing 22 changed files with 239 additions and 227 deletions.
17 changes: 17 additions & 0 deletions hydra-module.nix
Expand Up @@ -379,6 +379,23 @@ in
};
};

# Long-running hydra-notify service. It is pulled in by multi-user.target
# and is only started together with, and ordered after, hydra-init.service.
systemd.services.hydra-notify =
{ wantedBy = [ "multi-user.target" ];
requires = [ "hydra-init.service" ];
after = [ "hydra-init.service" ];
# Restart the unit whenever the generated Hydra configuration changes.
restartTriggers = [ hydraConf ];
# Shared Hydra environment, plus the queue runner's pgpass file for
# database access (this unit runs as the queue runner's user, see below).
environment = env // {
PGPASSFILE = "${baseDir}/pgpass-queue-runner"; # grrr
};
serviceConfig =
# The leading "@" is systemd syntax: the token after the executable path
# ("hydra-notify") is passed to the process as argv[0].
{ ExecStart = "@${cfg.package}/bin/hydra-notify hydra-notify";
# FIXME: run this under a less privileged user?
User = "hydra-queue-runner";
# Always bring the service back up, waiting 5 seconds between attempts.
Restart = "always";
RestartSec = 5;
};
};

# If there is less than a certain amount of free disk space, stop
# the queue/evaluator to prevent builds from failing or aborting.
systemd.services.hydra-check-space =
Expand Down
59 changes: 27 additions & 32 deletions src/hydra-queue-runner/builder.cc
Expand Up @@ -99,6 +99,8 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
unsigned int maxSilentTime, buildTimeout;
unsigned int repeats = step->isDeterministic ? 1 : 0;

auto conn(dbPool.get());

{
std::set<Build::ptr> dependents;
std::set<Step::ptr> steps;
Expand All @@ -122,8 +124,10 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,

for (auto build2 : dependents) {
if (build2->drvPath == step->drvPath) {
build = build2;
enqueueNotificationItem({NotificationItem::Type::BuildStarted, build->id});
build = build2;
pqxx::work txn(*conn);
notifyBuildStarted(txn, build->id);
txn.commit();
}
{
auto i = jobsetRepeats.find(std::make_pair(build2->projectName, build2->jobsetName));
Expand All @@ -144,8 +148,6 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,

bool quit = buildId == buildOne && step->drvPath == buildDrvPath;

auto conn(dbPool.get());

RemoteResult result;
BuildOutput res;
unsigned int stepNr = 0;
Expand All @@ -170,11 +172,6 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
} catch (...) {
ignoreException();
}

/* Asynchronously run plugins. FIXME: if we're killed,
plugin actions might not be run. Need to ensure
at-least-once semantics. */
enqueueNotificationItem({NotificationItem::Type::StepFinished, buildId, {}, stepNr, result.logFile});
}
});

Expand Down Expand Up @@ -231,6 +228,11 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
time_t stepStopTime = time(0);
if (!result.stopTime) result.stopTime = stepStopTime;

/* For standard failures, we don't care about the error
message. */
if (result.stepStatus != bsAborted)
result.errorMsg = "";

/* Account the time we spent building this step by dividing it
among the jobsets that depend on it. */
{
Expand All @@ -243,6 +245,13 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
}
}

/* Finish the step in the database. */
if (stepNr) {
pqxx::work txn(*conn);
finishBuildStep(txn, result, buildId, stepNr, machine->sshName);
txn.commit();
}

/* The step had a hopefully temporary failure (e.g. network
issue). Retry a number of times. */
if (result.canRetry) {
Expand All @@ -256,11 +265,6 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
}
if (retry) {
auto mc = startDbUpdate();
{
pqxx::work txn(*conn);
finishBuildStep(txn, result, buildId, stepNr, machine->sshName);
txn.commit();
}
stepFinished = true;
if (quit) exit(1);
return sRetry;
Expand Down Expand Up @@ -315,8 +319,6 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,

pqxx::work txn(*conn);

finishBuildStep(txn, result, buildId, stepNr, machine->sshName);

for (auto & b : direct) {
printMsg(lvlInfo, format("marking build %1% as succeeded") % b->id);
markSucceededBuild(txn, b, res, buildId != b->id || result.isCached,
Expand All @@ -342,8 +344,12 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,

/* Send notification about the builds that have this step as
the top-level. */
for (auto id : buildIDs)
enqueueNotificationItem({NotificationItem::Type::BuildFinished, id});
{
pqxx::work txn(*conn);
for (auto id : buildIDs)
notifyBuildFinished(txn, id, {});
txn.commit();
}

/* Wake up any dependent steps that have no other
dependencies. */
Expand All @@ -369,11 +375,6 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,

} else {

/* For standard failures, we don't care about the error
message. */
if (result.stepStatus != bsAborted)
result.errorMsg = "";

/* Register failure in the database for all Build objects that
directly or indirectly depend on this step. */

Expand Down Expand Up @@ -419,11 +420,6 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
result.stepStatus, result.errorMsg, buildId == build2->id ? 0 : buildId);
}

if (result.stepStatus != bsCachedFailure && !stepFinished) {
assert(stepNr);
finishBuildStep(txn, result, buildId, stepNr, machine->sshName);
}

/* Mark all builds that depend on this derivation as failed. */
for (auto & build2 : indirect) {
if (build2->finishedInDB) continue;
Expand Down Expand Up @@ -462,11 +458,10 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,

/* Send notification about this build and its dependents. */
{
auto notificationSenderQueue_(notificationSenderQueue.lock());
notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::BuildFinished, buildId, dependentIDs});
pqxx::work txn(*conn);
notifyBuildFinished(txn, buildId, dependentIDs);
txn.commit();
}
notificationSenderWakeup.notify_one();

}

// FIXME: keep stats about aborted steps?
Expand Down
107 changes: 17 additions & 90 deletions src/hydra-queue-runner/hydra-queue-runner.cc
Expand Up @@ -268,6 +268,9 @@ unsigned int State::createBuildStep(pqxx::work & txn, time_t startTime, BuildID
("insert into BuildStepOutputs (build, stepnr, name, path) values ($1, $2, $3, $4)")
(buildId)(stepNr)(output.first)(output.second.path).exec();

if (status == bsBusy)
txn.exec(fmt("notify step_started, '%d\t%d'", buildId, stepNr));

return stepNr;
}

Expand Down Expand Up @@ -299,6 +302,9 @@ void State::finishBuildStep(pqxx::work & txn, const RemoteResult & result,
(result.timesBuilt, result.timesBuilt > 0)
(result.isNonDeterministic, result.timesBuilt > 1)
.exec();
assert(result.logFile.find('\t') == std::string::npos);
txn.exec(fmt("notify step_finished, '%d\t%d\t%s'",
buildId, stepNr, result.logFile));
}


Expand Down Expand Up @@ -450,74 +456,20 @@ bool State::checkCachedFailure(Step::ptr step, Connection & conn)
}


void State::notificationSender()
void State::notifyBuildStarted(pqxx::work & txn, BuildID buildId)
{
while (true) {
try {

NotificationItem item;
{
auto notificationSenderQueue_(notificationSenderQueue.lock());
while (notificationSenderQueue_->empty())
notificationSenderQueue_.wait(notificationSenderWakeup);
item = notificationSenderQueue_->front();
notificationSenderQueue_->pop();
}

MaintainCount<counter> mc(nrNotificationsInProgress);

printMsg(lvlChatty, format("sending notification about build %1%") % item.id);

auto now1 = std::chrono::steady_clock::now();

Pid pid = startProcess([&]() {
Strings argv;
switch (item.type) {
case NotificationItem::Type::BuildStarted:
argv = {"hydra-notify", "build-started", std::to_string(item.id)};
for (auto id : item.dependentIds)
argv.push_back(std::to_string(id));
break;
case NotificationItem::Type::BuildFinished:
argv = {"hydra-notify", "build-finished", std::to_string(item.id)};
for (auto id : item.dependentIds)
argv.push_back(std::to_string(id));
break;
case NotificationItem::Type::StepFinished:
argv = {"hydra-notify", "step-finished", std::to_string(item.id), std::to_string(item.stepNr), item.logPath};
break;
};
printMsg(lvlChatty, "Executing hydra-notify " + concatStringsSep(" ", argv));
execvp("hydra-notify", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast
throw SysError("cannot start hydra-notify");
});

int res = pid.wait();

if (!statusOk(res))
throw Error("notification about build %d failed: %s", item.id, statusToString(res));

auto now2 = std::chrono::steady_clock::now();

if (item.type == NotificationItem::Type::BuildFinished) {
auto conn(dbPool.get());
pqxx::work txn(*conn);
txn.parameterized
("update Builds set notificationPendingSince = null where id = $1")
(item.id)
.exec();
txn.commit();
}
txn.exec(fmt("notify build_started, '%s'", buildId));
}

nrNotificationTimeMs += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
nrNotificationsDone++;

} catch (std::exception & e) {
nrNotificationsFailed++;
printMsg(lvlError, format("notification sender: %1%") % e.what());
sleep(5);
}
}
/* Queue a PostgreSQL `build_finished` notification on `txn`.

   The payload is `buildId` followed by every entry of `dependentIds`,
   each number terminated by a single space.  Nothing is committed here;
   the caller owns the transaction.

   NOTE(review): PostgreSQL limits the NOTIFY payload size (8000 bytes in
   the default configuration), so a very long `dependentIds` list could
   make this statement fail -- confirm whether that can happen. */
void State::notifyBuildFinished(pqxx::work & txn, BuildID buildId,
    const std::vector<BuildID> & dependentIds)
{
    auto message = fmt("%d ", buildId);
    for (const auto & dependentId : dependentIds) {
        message += fmt("%d ", dependentId);
    }

    // FIXME: apparently parameterized() doesn't support NOTIFY.
    const auto statement = fmt("notify build_finished, '%s'", message);
    txn.exec(statement);
}


Expand Down Expand Up @@ -589,13 +541,6 @@ void State::dumpStatus(Connection & conn, bool log)
root.attr("nrDbConnections", dbPool.count());
root.attr("nrActiveDbUpdates", nrActiveDbUpdates);
root.attr("memoryTokensInUse", memoryTokens.currentUse());
root.attr("nrNotificationsDone", nrNotificationsDone);
root.attr("nrNotificationsFailed", nrNotificationsFailed);
root.attr("nrNotificationsInProgress", nrNotificationsInProgress);
root.attr("nrNotificationsPending", notificationSenderQueue.lock()->size());
root.attr("nrNotificationTimeMs", nrNotificationTimeMs);
uint64_t nrNotificationsTotal = nrNotificationsDone + nrNotificationsFailed;
root.attr("nrNotificationTimeAvgMs", nrNotificationsTotal == 0 ? 0.0 : (float) nrNotificationTimeMs / nrNotificationsTotal);

{
auto nested = root.object("machines");
Expand Down Expand Up @@ -843,24 +788,6 @@ void State::run(BuildID buildOne)

std::thread(&State::dispatcher, this).detach();

/* Idem for notification sending. */
auto maxConcurrentNotifications = config->getIntOption("max-concurrent-notifications", 2);
for (uint64_t i = 0; i < maxConcurrentNotifications; ++i)
std::thread(&State::notificationSender, this).detach();

/* Enqueue notification items for builds that were finished
previously, but for which we didn't manage to send
notifications. */
{
auto conn(dbPool.get());
pqxx::work txn(*conn);
auto res = txn.parameterized("select id from Builds where notificationPendingSince > 0").exec();
for (auto const & row : res) {
auto id = row["id"].as<BuildID>();
enqueueNotificationItem({NotificationItem::Type::BuildFinished, id});
}
}

/* Periodically clean up orphaned busy steps in the database. */
std::thread([&]() {
while (true) {
Expand Down
6 changes: 2 additions & 4 deletions src/hydra-queue-runner/queue-monitor.cc
Expand Up @@ -193,13 +193,12 @@ bool State::getQueuedBuilds(Connection & conn,
(build->id)
((int) (ex.step->drvPath == build->drvPath ? bsFailed : bsDepFailed))
(time(0)).exec();
notifyBuildFinished(txn, build->id, {});
txn.commit();
build->finishedInDB = true;
nrBuildsDone++;
}

enqueueNotificationItem({NotificationItem::Type::BuildFinished, build->id});

return;
}

Expand Down Expand Up @@ -230,13 +229,12 @@ bool State::getQueuedBuilds(Connection & conn,
time_t now = time(0);
printMsg(lvlInfo, format("marking build %1% as succeeded (cached)") % build->id);
markSucceededBuild(txn, build, res, true, now, now);
notifyBuildFinished(txn, build->id, {});
txn.commit();
}

build->finishedInDB = true;

enqueueNotificationItem({NotificationItem::Type::BuildFinished, build->id});

return;
}

Expand Down
40 changes: 4 additions & 36 deletions src/hydra-queue-runner/state.hh
Expand Up @@ -347,39 +347,6 @@ private:
counter bytesSent{0};
counter bytesReceived{0};
counter nrActiveDbUpdates{0};
counter nrNotificationsDone{0};
counter nrNotificationsFailed{0};
counter nrNotificationsInProgress{0};
counter nrNotificationTimeMs{0};

/* Notification sender work queue. FIXME: if hydra-queue-runner is
killed before it has finished sending notifications about a
build, then the notifications may be lost. It would be better
to mark builds with pending notification in the database. */
/* One unit of work for the notification sender threads: describes a
   single hydra-notify invocation. */
struct NotificationItem
{
/* Which hydra-notify sub-command the item maps to (build-started,
   build-finished or step-finished in notificationSender()). */
enum class Type : char {
BuildStarted,
BuildFinished,
StepFinished,
};
Type type;
/* ID of the build the notification is about. */
BuildID id;
/* Extra build IDs appended to the command line for BuildStarted and
   BuildFinished items; not passed for StepFinished. */
std::vector<BuildID> dependentIds;
/* Step number; only passed to hydra-notify for StepFinished items. */
unsigned int stepNr;
/* Log file path; only passed to hydra-notify for StepFinished items. */
nix::Path logPath;
};
nix::Sync<std::queue<NotificationItem>> notificationSenderQueue;
std::condition_variable notificationSenderWakeup;

void enqueueNotificationItem(const NotificationItem && item)
{
{
auto notificationSenderQueue_(notificationSenderQueue.lock());
notificationSenderQueue_->emplace(item);
}
notificationSenderWakeup.notify_one();
}

/* Specific build to do for --build-one (testing only). */
BuildID buildOne;
Expand Down Expand Up @@ -540,9 +507,10 @@ private:

bool checkCachedFailure(Step::ptr step, Connection & conn);

/* Thread that asynchronously invokes hydra-notify to send build
notifications. */
void notificationSender();
/* Issue a PostgreSQL `notify build_started` statement on `txn`, with
   `buildId` as the payload.  The caller is responsible for committing
   the transaction. */
void notifyBuildStarted(pqxx::work & txn, BuildID buildId);

/* Issue a PostgreSQL `notify build_finished` statement on `txn`.  The
   payload is `buildId` followed by the IDs in `dependentIds`.  The
   caller is responsible for committing the transaction. */
void notifyBuildFinished(pqxx::work & txn, BuildID buildId,
const std::vector<BuildID> & dependentIds);

/* Acquire the global queue runner lock, or null if somebody else
has it. */
Expand Down

0 comments on commit f17cd94

Please sign in to comment.