Skip to content

Commit

Permalink
Update to reflect BinaryCacheStore changes
Browse files Browse the repository at this point in the history
BinaryCacheStore no longer implements buildPaths() and ensurePath(),
so we need to use copyPath() / copyClosure().
  • Loading branch information
edolstra committed Oct 7, 2016
1 parent fb0d2d2 commit ee2e9f5
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 32 deletions.
3 changes: 2 additions & 1 deletion hydra-module.nix
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ in
hydra_logo ${cfg.logo}
''}
gc_roots_dir ${cfg.gcRootsDir}
use-substitutes ${if cfg.useSubstitutes then "1" else "0"}
'';

environment.systemPackages = [ cfg.package ];
Expand Down Expand Up @@ -325,7 +326,7 @@ in
IN_SYSTEMD = "1"; # to get log severity levels
};
serviceConfig =
{ ExecStart = "@${cfg.package}/bin/hydra-queue-runner hydra-queue-runner -v --option build-use-substitutes ${if cfg.useSubstitutes then "true" else "false"}";
{ ExecStart = "@${cfg.package}/bin/hydra-queue-runner hydra-queue-runner -v";
ExecStopPost = "${cfg.package}/bin/hydra-queue-runner --unlock";
User = "hydra-queue-runner";
Restart = "always";
Expand Down
2 changes: 1 addition & 1 deletion src/hydra-queue-runner/build-remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ void State::buildRemote(ref<Store> destStore,
a no-op for regular stores, but for the binary cache store,
this will copy the inputs to the binary cache from the local
store. */
destStore->buildPaths(basicDrv.inputSrcs);
copyClosure(ref<Store>(localStore), destStore, basicDrv.inputSrcs);

/* Copy the input closure. */
if (/* machine->sshName != "localhost" */ true) {
Expand Down
16 changes: 8 additions & 8 deletions src/hydra-queue-runner/hydra-queue-runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,6 @@ MaintainCount State::startDbUpdate()
}


ref<Store> State::getLocalStore()
{
return ref<Store>(_localStore);
}


ref<Store> State::getDestStore()
{
return ref<Store>(_destStore);
Expand Down Expand Up @@ -797,14 +791,20 @@ void State::run(BuildID buildOne)
if (!lock)
throw Error("hydra-queue-runner is already running");

_localStore = openStore();
localStore = openStore();

if (hydraConfig["store_uri"] == "") {
_destStore = _localStore;
_destStore = localStore;
} else {
_destStore = openStoreAt(hydraConfig["store_uri"]);
}

auto isTrue = [](const std::string & s) {
return s == "1" || s == "true";
};

useSubstitutes = isTrue(hydraConfig["use-substitutes"]);

{
auto conn(dbPool.get());
clearBusy(*conn, 0);
Expand Down
47 changes: 31 additions & 16 deletions src/hydra-queue-runner/queue-monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ void State::queueMonitorLoop()
receiver buildsBumped(*conn, "builds_bumped");
receiver jobsetSharesChanged(*conn, "jobset_shares_changed");

auto localStore = getLocalStore();
auto destStore = getDestStore();

unsigned int lastBuildId = 0;

while (true) {
bool done = getQueuedBuilds(*conn, localStore, destStore, lastBuildId);
bool done = getQueuedBuilds(*conn, destStore, lastBuildId);

/* Sleep until we get notification from the database about an
event. */
Expand Down Expand Up @@ -69,7 +68,7 @@ struct PreviousFailure : public std::exception {
};


bool State::getQueuedBuilds(Connection & conn, ref<Store> localStore,
bool State::getQueuedBuilds(Connection & conn,
ref<Store> destStore, unsigned int & lastBuildId)
{
printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId);
Expand Down Expand Up @@ -415,29 +414,44 @@ Step::ptr State::createStep(ref<Store> destStore,
bool valid = true;
PathSet outputs = step->drv.outputPaths();
DerivationOutputs missing;
PathSet missingPaths;
for (auto & i : step->drv.outputs)
if (!destStore->isValidPath(i.second.path)) {
valid = false;
missing[i.first] = i.second;
missingPaths.insert(i.second.path);
}

/* Try to substitute the missing paths. Note: can't use the more
efficient querySubstitutablePaths() here because upstream Hydra
servers don't allow it (they have "WantMassQuery: 0"). */
assert(missing.size() == missingPaths.size());
if (!missing.empty() && settings.useSubstitutes) {
SubstitutablePathInfos infos;
destStore->querySubstitutablePathInfos(missingPaths, infos); // FIXME
if (infos.size() == missingPaths.size()) {
/* Try to copy the missing paths from the local store or from
substitutes. */
if (!missing.empty()) {

size_t avail = 0;
for (auto & i : missing) {
if (/* localStore != destStore && */ localStore->isValidPath(i.second.path))
avail++;
else if (useSubstitutes) {
SubstitutablePathInfos infos;
localStore->querySubstitutablePathInfos({i.second.path}, infos);
if (infos.size() == 1)
avail++;
}
}

if (missing.size() == avail) {
valid = true;
for (auto & i : missing) {
try {
printMsg(lvlInfo, format("substituting output ‘%1%’ of ‘%2%’") % i.second.path % drvPath);

time_t startTime = time(0);
destStore->ensurePath(i.second.path);

if (localStore->isValidPath(i.second.path))
printInfo("copying output ‘%1%’ of ‘%2%’ from local store", i.second.path, drvPath);
else {
printInfo("substituting output ‘%1%’ of ‘%2%’", i.second.path, drvPath);
localStore->ensurePath(i.second.path);
// FIXME: should copy directly from substituter to destStore.
}

copyClosure(ref<Store>(localStore), destStore, {i.second.path});

time_t stopTime = time(0);

{
Expand All @@ -448,6 +462,7 @@ Step::ptr State::createStep(ref<Store> destStore,
}

} catch (Error & e) {
printError("while copying/substituting output ‘%s’ of ‘%s’: %s", i.second.path, drvPath, e.what());
valid = false;
break;
}
Expand Down
10 changes: 4 additions & 6 deletions src/hydra-queue-runner/state.hh
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ private:

std::map<std::string, std::string> hydraConfig;

bool useSubstitutes = false;

/* The queued builds. */
typedef std::map<BuildID, Build::ptr> Builds;
nix::Sync<Builds> builds;
Expand Down Expand Up @@ -374,7 +376,7 @@ private:

std::atomic<time_t> lastDispatcherCheck{0};

std::shared_ptr<nix::Store> _localStore;
std::shared_ptr<nix::Store> localStore;
std::shared_ptr<nix::Store> _destStore;

/* Token server to prevent threads from allocating too many big
Expand All @@ -401,10 +403,6 @@ private:

MaintainCount startDbUpdate();

/* Return a store object that can access derivations produced by
hydra-evaluator. */
nix::ref<nix::Store> getLocalStore();

/* Return a store object to store build results. */
nix::ref<nix::Store> getDestStore();

Expand Down Expand Up @@ -436,7 +434,7 @@ private:
void queueMonitorLoop();

/* Check the queue for new builds. */
bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> localStore,
bool getQueuedBuilds(Connection & conn,
nix::ref<nix::Store> destStore, unsigned int & lastBuildId);

/* Handle cancellation, deletion and priority bumps. */
Expand Down

0 comments on commit ee2e9f5

Please sign in to comment.