Skip to content

Commit

Permalink
Make 'nix copy' to file:// binary caches run in constant memory
Browse files Browse the repository at this point in the history
  • Loading branch information
edolstra committed Jul 13, 2020
1 parent 400f1a9 commit fc84c35
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 87 deletions.
112 changes: 76 additions & 36 deletions src/libstore/binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ void BinaryCacheStore::init()
}
}

void BinaryCacheStore::upsertFile(const std::string & path,
const std::string & data,
const std::string & mimeType)
{
StringSource source(data);
upsertFile(path, source, mimeType);
}

void BinaryCacheStore::getFile(const std::string & path,
Callback<std::shared_ptr<std::string>> callback) noexcept
{
Expand Down Expand Up @@ -113,13 +121,70 @@ void BinaryCacheStore::writeNarInfo(ref<NarInfo> narInfo)
diskCache->upsertNarInfo(getUri(), hashPart, std::shared_ptr<NarInfo>(narInfo));
}

/* Open `path` read-only (with O_CLOEXEC) and return the descriptor
   wrapped in an AutoCloseFD. Throws SysError if open() fails. */
AutoCloseFD openFile(const Path & path)
{
    /* open() reports failure by returning -1; 0 is a perfectly valid
       descriptor. Testing `!fd` on the raw int would miss every real
       error and reject a legitimate descriptor 0. */
    int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC);
    if (fd == -1)
        throw SysError("opening file '%1%'", path);
    return fd;
}

/* An FdSource that reads from the file at `path`. The descriptor is
   obtained from openFile() and stored in `fd2`. */
struct FileSource : FdSource
{
/* Holds the descriptor returned by openFile(). Presumably closes it
   on destruction (AutoCloseFD) -- confirm against its definition. */
AutoCloseFD fd2;

FileSource(const Path & path)
: fd2(openFile(path))
{
/* `fd` is not declared in this struct, so it is presumably the
   descriptor member inherited from FdSource -- confirm. It is
   assigned here in the body because base classes are initialized
   before members, i.e. before fd2 holds a descriptor. */
fd = fd2.get();
}
};

void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource,
RepairFlag repair, CheckSigsFlag checkSigs, std::shared_ptr<FSAccessor> accessor)
{
// FIXME: See if we can use the original source to reduce memory usage.
auto nar = make_ref<std::string>(narSource.drain());
assert(info.narHash && info.narSize);

if (!repair && isValidPath(info.path)) {
// FIXME: copyNAR -> null sink
narSource.drain();
return;
}

auto [fdTemp, fnTemp] = createTempFile();

auto now1 = std::chrono::steady_clock::now();

HashSink fileHashSink(htSHA256);

{
FdSink fileSink(fdTemp.get());
TeeSink teeSink(fileSink, fileHashSink);
auto compressionSink = makeCompressionSink(compression, teeSink);
copyNAR(narSource, *compressionSink);
compressionSink->finish();
}

auto now2 = std::chrono::steady_clock::now();

auto narInfo = make_ref<NarInfo>(info);
narInfo->narSize = info.narSize;
narInfo->narHash = info.narHash;
narInfo->compression = compression;
auto [fileHash, fileSize] = fileHashSink.finish();
narInfo->fileHash = fileHash;
narInfo->fileSize = fileSize;
narInfo->url = "nar/" + narInfo->fileHash.to_string(Base32, false) + ".nar"
+ (compression == "xz" ? ".xz" :
compression == "bzip2" ? ".bz2" :
compression == "br" ? ".br" :
"");

if (!repair && isValidPath(info.path)) return;
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
printMsg(lvlTalkative, "copying path '%1%' (%2% bytes, compressed %3$.1f%% in %4% ms) to binary cache",
printStorePath(narInfo->path), info.narSize,
((1.0 - (double) fileSize / info.narSize) * 100.0),
duration);

/* Verify that all references are valid. This may do some .narinfo
reads, but typically they'll already be cached. */
Expand All @@ -132,16 +197,7 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource
printStorePath(info.path), printStorePath(ref));
}

assert(nar->compare(0, narMagic.size(), narMagic) == 0);

auto narInfo = make_ref<NarInfo>(info);

narInfo->narSize = nar->size();
narInfo->narHash = hashString(htSHA256, *nar);

if (info.narHash && info.narHash != narInfo->narHash)
throw Error("refusing to copy corrupted path '%1%' to binary cache", printStorePath(info.path));

#if 0
auto accessor_ = std::dynamic_pointer_cast<RemoteFSAccessor>(accessor);

auto narAccessor = makeNarAccessor(nar);
Expand All @@ -166,27 +222,9 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource

upsertFile(std::string(info.path.to_string()) + ".ls", jsonOut.str(), "application/json");
}
#endif

/* Compress the NAR. */
narInfo->compression = compression;
auto now1 = std::chrono::steady_clock::now();
auto narCompressed = compress(compression, *nar, parallelCompression);
auto now2 = std::chrono::steady_clock::now();
narInfo->fileHash = hashString(htSHA256, *narCompressed);
narInfo->fileSize = narCompressed->size();

auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
printMsg(lvlTalkative, "copying path '%1%' (%2% bytes, compressed %3$.1f%% in %4% ms) to binary cache",
printStorePath(narInfo->path), narInfo->narSize,
((1.0 - (double) narCompressed->size() / nar->size()) * 100.0),
duration);

narInfo->url = "nar/" + narInfo->fileHash.to_string(Base32, false) + ".nar"
+ (compression == "xz" ? ".xz" :
compression == "bzip2" ? ".bz2" :
compression == "br" ? ".br" :
"");

#if 0
/* Optionally maintain an index of DWARF debug info files
consisting of JSON files named 'debuginfo/<build-id>' that
specify the NAR file and member containing the debug info. */
Expand Down Expand Up @@ -243,16 +281,18 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource
threadPool.process();
}
}
#endif

/* Atomically write the NAR file. */
if (repair || !fileExists(narInfo->url)) {
stats.narWrite++;
upsertFile(narInfo->url, *narCompressed, "application/x-nix-nar");
FileSource source(fnTemp);
upsertFile(narInfo->url, source, "application/x-nix-nar");
} else
stats.narWriteAverted++;

stats.narWriteBytes += nar->size();
stats.narWriteCompressedBytes += narCompressed->size();
stats.narWriteBytes += info.narSize;
stats.narWriteCompressedBytes += fileSize;
stats.narWriteCompressionTimeMs += duration;

/* Atomically write the NAR info file.*/
Expand Down
6 changes: 5 additions & 1 deletion src/libstore/binary-cache-store.hh
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@ public:
virtual bool fileExists(const std::string & path) = 0;

virtual void upsertFile(const std::string & path,
const std::string & data,
Source & source,
const std::string & mimeType) = 0;

void upsertFile(const std::string & path,
const std::string & data,
const std::string & mimeType);

/* Note: subclasses must implement at least one of the two
following getFile() methods. */

Expand Down
2 changes: 1 addition & 1 deletion src/libstore/daemon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
if (GET_PROTOCOL_MINOR(clientVersion) >= 21)
source = std::make_unique<TunnelSource>(from, to);
else {
TeeSink tee(from);
TeeParseSink tee(from);
parseDump(tee, tee.source);
saved = std::move(*tee.source.data);
source = std::make_unique<StringSource>(saved);
Expand Down
33 changes: 8 additions & 25 deletions src/libstore/export-import.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,6 @@

namespace nix {

/* Sink adapter that passes every chunk on to `writeSink` and then
   feeds the same chunk to an internal HashSink (htSHA256), so the
   hash of the data written so far can be read via currentHash(). */
struct HashAndWriteSink : Sink
{
    Sink & writeSink;
    HashSink hashSink;

    HashAndWriteSink(Sink & writeSink)
        : writeSink(writeSink)
        , hashSink(htSHA256)
    { }

    /* Forward the chunk to the wrapped sink first, then hash it. */
    virtual void operator () (const unsigned char * data, size_t len)
    {
        writeSink(data, len);
        hashSink(data, len);
    }

    /* Returns the first element of the pair produced by
       HashSink::currentHash(). */
    Hash currentHash()
    {
        auto current = hashSink.currentHash();
        return current.first;
    }
};

void Store::exportPaths(const StorePathSet & paths, Sink & sink)
{
auto sorted = topoSortPaths(paths);
Expand All @@ -47,23 +29,24 @@ void Store::exportPath(const StorePath & path, Sink & sink)
{
auto info = queryPathInfo(path);

HashAndWriteSink hashAndWriteSink(sink);
HashSink hashSink(htSHA256);
TeeSink teeSink(sink, hashSink);

narFromPath(path, hashAndWriteSink);
narFromPath(path, teeSink);

/* Refuse to export paths that have changed. This prevents
filesystem corruption from spreading to other machines.
Don't complain if the stored hash is zero (unknown). */
Hash hash = hashAndWriteSink.currentHash();
Hash hash = hashSink.currentHash().first;
if (hash != info->narHash && info->narHash != Hash(*info->narHash.type))
throw Error("hash of path '%s' has changed from '%s' to '%s'!",
printStorePath(path), info->narHash.to_string(Base32, true), hash.to_string(Base32, true));

hashAndWriteSink
teeSink
<< exportMagic
<< printStorePath(path);
writeStorePaths(*this, hashAndWriteSink, info->references);
hashAndWriteSink
writeStorePaths(*this, teeSink, info->references);
teeSink
<< (info->deriver ? printStorePath(*info->deriver) : "")
<< 0;
}
Expand All @@ -77,7 +60,7 @@ StorePaths Store::importPaths(Source & source, std::shared_ptr<FSAccessor> acces
if (n != 1) throw Error("input doesn't look like something created by 'nix-store --export'");

/* Extract the NAR from the source. */
TeeSink tee(source);
TeeParseSink tee(source);
parseDump(tee, tee.source);

uint32_t magic = readInt(source);
Expand Down
4 changes: 2 additions & 2 deletions src/libstore/http-binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ class HttpBinaryCacheStore : public BinaryCacheStore
}

void upsertFile(const std::string & path,
const std::string & data,
Source & source,
const std::string & mimeType) override
{
auto req = FileTransferRequest(cacheUri + "/" + path);
req.data = std::make_shared<string>(data); // FIXME: inefficient
req.data = std::make_shared<string>(source.drain());
req.mimeType = mimeType;
try {
getFileTransfer()->upload(req);
Expand Down
30 changes: 11 additions & 19 deletions src/libstore/local-binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,17 @@ class LocalBinaryCacheStore : public BinaryCacheStore
bool fileExists(const std::string & path) override;

void upsertFile(const std::string & path,
const std::string & data,
const std::string & mimeType) override;
Source & source,
const std::string & mimeType)
{
auto path2 = binaryCacheDir + "/" + path;
Path tmp = path2 + ".tmp." + std::to_string(getpid());
AutoDelete del(tmp, false);
writeFile(tmp, source);
if (rename(tmp.c_str(), path2.c_str()))
throw SysError("renaming '%1%' to '%2%'", tmp, path2);
del.cancel();
}

void getFile(const std::string & path, Sink & sink) override
{
Expand Down Expand Up @@ -70,28 +79,11 @@ void LocalBinaryCacheStore::init()
BinaryCacheStore::init();
}

static void atomicWrite(const Path & path, const std::string & s)
{
Path tmp = path + ".tmp." + std::to_string(getpid());
AutoDelete del(tmp, false);
writeFile(tmp, s);
if (rename(tmp.c_str(), path.c_str()))
throw SysError("renaming '%1%' to '%2%'", tmp, path);
del.cancel();
}

/* Report whether `path`, taken relative to binaryCacheDir, exists
   according to pathExists(). */
bool LocalBinaryCacheStore::fileExists(const std::string & path)
{
    auto fullPath = binaryCacheDir + "/" + path;
    return pathExists(fullPath);
}

/* Store `data` as the file `path` relative to binaryCacheDir by
   delegating to atomicWrite(). `mimeType` is not used here. */
void LocalBinaryCacheStore::upsertFile(const std::string & path,
    const std::string & data,
    const std::string & mimeType)
{
    auto destination = binaryCacheDir + "/" + path;
    atomicWrite(destination, data);
}

static RegisterStoreImplementation regStore([](
const std::string & uri, const Store::Params & params)
-> std::shared_ptr<Store>
Expand Down
3 changes: 2 additions & 1 deletion src/libstore/s3-binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,10 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
stats.put++;
}

void upsertFile(const std::string & path, const std::string & data,
void upsertFile(const std::string & path, Source & source,
const std::string & mimeType) override
{
auto data = source.drain();
if (narinfoCompression != "" && hasSuffix(path, ".narinfo"))
uploadFile(path, *compress(narinfoCompression, data), mimeType, narinfoCompression);
else if (lsCompression != "" && hasSuffix(path, ".ls"))
Expand Down
4 changes: 2 additions & 2 deletions src/libutil/archive.hh
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ struct ParseSink
virtual void createSymlink(const Path & path, const string & target) { };
};

struct TeeSink : ParseSink
struct TeeParseSink : ParseSink
{
TeeSource source;

TeeSink(Source & source) : source(source) { }
TeeParseSink(Source & source) : source(source) { }
};

void parseDump(ParseSink & sink, Source & source);
Expand Down
13 changes: 13 additions & 0 deletions src/libutil/serialise.hh
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,19 @@ struct StringSource : Source
};


/* A sink that duplicates its input: each chunk it receives is written
   to `sink1` and then to `sink2`. Both sinks are held by reference. */
struct TeeSink : Sink
{
    Sink & sink1;
    Sink & sink2;

    TeeSink(Sink & sink1, Sink & sink2)
        : sink1(sink1)
        , sink2(sink2)
    { }

    /* Forward the chunk to both sinks, first sink1 and then sink2. */
    virtual void operator () (const unsigned char * data, size_t len)
    {
        sink1(data, len);
        sink2(data, len);
    }
};


/* Adapter class of a Source that saves all data read to `s'. */
struct TeeSource : Source
{
Expand Down

0 comments on commit fc84c35

Please sign in to comment.