Skip to content

Commit

Permalink
Reduce substitution memory consumption
Browse files Browse the repository at this point in the history
copyStorePath() now pipes the output of srcStore->narFromPath()
directly into dstStore->addToStore(). The sink used by the former is
converted into a source usable by the latter using
boost::coroutine2. This is based on [1].

This reduces the maximum resident size of

  $ nix build --store ~/my-nix/ /nix/store/b0zlxla7dmy1iwc3g459rjznx59797xy-binutils-2.28.1 --substituters file:///tmp/binary-cache-xz/ --no-require-sigs

from 418592 KiB to 53416 KiB. (The previous commit also reduced the
runtime from ~4.2s to ~3.4s, not sure why.) A further improvement will
be to download files into a Sink.

[1] master...Mathnerd314:dump-fix-coroutine#diff-dcbcac55a634031f9cc73707da6e4b18

Issue #1969.
  • Loading branch information
edolstra committed Mar 16, 2018
1 parent 3e6b194 commit 48662d1
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 37 deletions.
34 changes: 22 additions & 12 deletions src/libstore/local-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -964,20 +964,11 @@ void LocalStore::invalidatePath(State & state, const Path & path)
}


void LocalStore::addToStore(const ValidPathInfo & info, const ref<std::string> & nar,
void LocalStore::addToStore(const ValidPathInfo & info, Source & source,
RepairFlag repair, CheckSigsFlag checkSigs, std::shared_ptr<FSAccessor> accessor)
{
assert(info.narHash);

Hash h = hashString(htSHA256, *nar);
if (h != info.narHash)
throw Error("hash mismatch importing path '%s'; expected hash '%s', got '%s'",
info.path, info.narHash.to_string(), h.to_string());

if (nar->size() != info.narSize)
throw Error("size mismatch importing path '%s'; expected %s, got %s",
info.path, info.narSize, nar->size());

if (requireSigs && checkSigs && !info.checkSignatures(*this, publicKeys))
throw Error("cannot add path '%s' because it lacks a valid signature", info.path);

Expand All @@ -999,8 +990,27 @@ void LocalStore::addToStore(const ValidPathInfo & info, const ref<std::string> &

deletePath(realPath);

StringSource source(*nar);
restorePath(realPath, source);
/* While restoring the path from the NAR, compute the hash
of the NAR. */
HashSink hashSink(htSHA256);

LambdaSource wrapperSource([&](unsigned char * data, size_t len) -> size_t {
size_t n = source.read(data, len);
hashSink(data, n);
return n;
});

restorePath(realPath, wrapperSource);

auto hashResult = hashSink.finish();

if (hashResult.first != info.narHash)
throw Error("hash mismatch importing path '%s'; expected hash '%s', got '%s'",
info.path, info.narHash.to_string(), hashResult.first.to_string());

if (hashResult.second != info.narSize)
throw Error("size mismatch importing path '%s'; expected %s, got %s",
info.path, info.narSize, hashResult.second);

autoGC();

Expand Down
2 changes: 1 addition & 1 deletion src/libstore/local-store.hh
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public:
void querySubstitutablePathInfos(const PathSet & paths,
SubstitutablePathInfos & infos) override;

void addToStore(const ValidPathInfo & info, const ref<std::string> & nar,
void addToStore(const ValidPathInfo & info, Source & source,
RepairFlag repair, CheckSigsFlag checkSigs,
std::shared_ptr<FSAccessor> accessor) override;

Expand Down
49 changes: 28 additions & 21 deletions src/libstore/store-api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -590,40 +590,32 @@ void copyStorePath(ref<Store> srcStore, ref<Store> dstStore,

uint64_t total = 0;

auto progress = [&](size_t len) {
total += len;
act.progress(total, info->narSize);
};

struct MyStringSink : StringSink
{
typedef std::function<void(size_t)> Callback;
Callback callback;
MyStringSink(Callback callback) : callback(callback) { }
void operator () (const unsigned char * data, size_t len) override
{
StringSink::operator ()(data, len);
callback(len);
};
};

MyStringSink sink(progress);
srcStore->narFromPath({storePath}, sink);

// FIXME
#if 0
if (!info->narHash) {
auto info2 = make_ref<ValidPathInfo>(*info);
info2->narHash = hashString(htSHA256, *sink.s);
if (!info->narSize) info2->narSize = sink.s->size();
info = info2;
}
#endif

if (info->ultimate) {
auto info2 = make_ref<ValidPathInfo>(*info);
info2->ultimate = false;
info = info2;
}

dstStore->addToStore(*info, sink.s, repair, checkSigs);
auto source = sinkToSource([&](Sink & sink) {
LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
sink(data, len);
total += len;
act.progress(total, info->narSize);
});
srcStore->narFromPath({storePath}, wrapperSink);
});

dstStore->addToStore(*info, *source, repair, checkSigs);
}


Expand Down Expand Up @@ -808,6 +800,21 @@ std::string makeFixedOutputCA(bool recursive, const Hash & hash)
}


void Store::addToStore(const ValidPathInfo & info, Source & narSource,
RepairFlag repair, CheckSigsFlag checkSigs,
std::shared_ptr<FSAccessor> accessor)
{
addToStore(info, make_ref<std::string>(narSource.drain()), repair, checkSigs, accessor);
}

void Store::addToStore(const ValidPathInfo & info, const ref<std::string> & nar,
RepairFlag repair, CheckSigsFlag checkSigs,
std::shared_ptr<FSAccessor> accessor)
{
StringSource source(*nar);
addToStore(info, source, repair, checkSigs, accessor);
}

}


Expand Down
7 changes: 6 additions & 1 deletion src/libstore/store-api.hh
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,14 @@ public:
virtual bool wantMassQuery() { return false; }

/* Import a path into the store. */
virtual void addToStore(const ValidPathInfo & info, Source & narSource,
RepairFlag repair = NoRepair, CheckSigsFlag checkSigs = CheckSigs,
std::shared_ptr<FSAccessor> accessor = 0);

// FIXME: remove
virtual void addToStore(const ValidPathInfo & info, const ref<std::string> & nar,
RepairFlag repair = NoRepair, CheckSigsFlag checkSigs = CheckSigs,
std::shared_ptr<FSAccessor> accessor = 0) = 0;
std::shared_ptr<FSAccessor> accessor = 0);

/* Copy the contents of a path to the store and register the
validity the resulting path. The resulting path is returned.
Expand Down
2 changes: 1 addition & 1 deletion src/libutil/local.mk
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ libutil_DIR := $(d)

libutil_SOURCES := $(wildcard $(d)/*.cc)

libutil_LDFLAGS = $(LIBLZMA_LIBS) -lbz2 -pthread $(OPENSSL_LIBS) $(LIBBROTLI_LIBS)
libutil_LDFLAGS = $(LIBLZMA_LIBS) -lbz2 -pthread $(OPENSSL_LIBS) $(LIBBROTLI_LIBS) -lboost_context

libutil_LIBS = libformat

Expand Down
63 changes: 63 additions & 0 deletions src/libutil/serialise.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <cerrno>
#include <memory>

#include <boost/coroutine2/coroutine.hpp>


namespace nix {

Expand Down Expand Up @@ -88,6 +90,23 @@ void Source::operator () (unsigned char * data, size_t len)
}


std::string Source::drain()
{
std::string s;
std::vector<unsigned char> buf(8192);
while (true) {
size_t n;
try {
n = read(buf.data(), buf.size());
s.append((char *) buf.data(), n);
} catch (EndOfFile &) {
break;
}
}
return s;
}


size_t BufferedSource::read(unsigned char * data, size_t len)
{
if (!buffer) buffer = decltype(buffer)(new unsigned char[bufSize]);
Expand Down Expand Up @@ -138,6 +157,50 @@ size_t StringSource::read(unsigned char * data, size_t len)
}


std::unique_ptr<Source> sinkToSource(std::function<void(Sink &)> fun)
{
struct SinkToSource : Source
{
typedef boost::coroutines2::coroutine<std::string> coro_t;

coro_t::pull_type coro;

SinkToSource(std::function<void(Sink &)> fun)
: coro([&](coro_t::push_type & yield) {
LambdaSink sink([&](const unsigned char * data, size_t len) {
if (len) yield(std::string((const char *) data, len));
});
fun(sink);
})
{
}

std::string cur;
size_t pos = 0;

size_t read(unsigned char * data, size_t len) override
{
if (!coro)
throw EndOfFile("coroutine has finished");

if (pos == cur.size()) {
if (!cur.empty()) coro();
cur = std::move(coro.get());
pos = 0;
}

auto n = std::min(cur.size() - pos, len);
memcpy(data, (unsigned char *) cur.data() + pos, n);
pos += n;

return n;
}
};

return std::make_unique<SinkToSource>(fun);
}


void writePadding(size_t len, Sink & sink)
{
if (len % 8) {
Expand Down
23 changes: 23 additions & 0 deletions src/libutil/serialise.hh
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ struct Source
virtual size_t read(unsigned char * data, size_t len) = 0;

virtual bool good() { return true; }

std::string drain();
};


Expand Down Expand Up @@ -191,6 +193,27 @@ struct LambdaSink : Sink
};


/* Convert a function into a source. */
struct LambdaSource : Source
{
typedef std::function<size_t(unsigned char *, size_t)> lambda_t;

lambda_t lambda;

LambdaSource(const lambda_t & lambda) : lambda(lambda) { }

size_t read(unsigned char * data, size_t len) override
{
return lambda(data, len);
}
};


/* Convert a function that feeds data into a Sink into a Source. The
Source executes the function as a coroutine. */
std::unique_ptr<Source> sinkToSource(std::function<void(Sink &)> fun);


void writePadding(size_t len, Sink & sink);
void writeString(const unsigned char * buf, size_t len, Sink & sink);

Expand Down
2 changes: 1 addition & 1 deletion src/nix-daemon/nix-daemon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ static void performOp(TunnelLogger * logger, ref<LocalStore> store,
parseDump(tee, tee.source);

logger->startWork();
store->addToStore(info, tee.source.data, (RepairFlag) repair,
store.cast<Store>()->addToStore(info, tee.source.data, (RepairFlag) repair,
dontCheckSigs ? NoCheckSigs : CheckSigs, nullptr);
logger->stopWork();
break;
Expand Down

0 comments on commit 48662d1

Please sign in to comment.