Skip to content

Commit fe1f34f

Browse files
committed
Low-latency closure copy
This adds a new store operation 'addMultipleToStore' that reads a number of NARs and ValidPathInfos from a Source, allowing any number of store paths to be copied in a single call. This is much faster on high-latency links when copying a lot of small files, like .drv closures. For example, on a connection with an 50 ms delay: Before: $ nix copy --to 'unix:///tmp/proxy-socket?root=/tmp/dest-chroot' \ /nix/store/90jjw94xiyg5drj70whm9yll6xjj0ca9-hello-2.10.drv \ --derivation --no-check-sigs real 0m57.868s user 0m0.103s sys 0m0.056s After: real 0m0.690s user 0m0.017s sys 0m0.011s
1 parent 9957315 commit fe1f34f

File tree

8 files changed

+161
-89
lines changed

8 files changed

+161
-89
lines changed

src/libstore/daemon.cc

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -243,23 +243,6 @@ struct ClientSettings
243243
}
244244
};
245245

246-
static void writeValidPathInfo(
247-
ref<Store> store,
248-
unsigned int clientVersion,
249-
Sink & to,
250-
std::shared_ptr<const ValidPathInfo> info)
251-
{
252-
to << (info->deriver ? store->printStorePath(*info->deriver) : "")
253-
<< info->narHash.to_string(Base16, false);
254-
worker_proto::write(*store, to, info->references);
255-
to << info->registrationTime << info->narSize;
256-
if (GET_PROTOCOL_MINOR(clientVersion) >= 16) {
257-
to << info->ultimate
258-
<< info->sigs
259-
<< renderContentAddress(info->ca);
260-
}
261-
}
262-
263246
static std::vector<DerivedPath> readDerivedPaths(Store & store, unsigned int clientVersion, Source & from)
264247
{
265248
std::vector<DerivedPath> reqs;
@@ -422,9 +405,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
422405
}();
423406
logger->stopWork();
424407

425-
to << store->printStorePath(pathInfo->path);
426-
writeValidPathInfo(store, clientVersion, to, pathInfo);
427-
408+
pathInfo->write(to, *store, GET_PROTOCOL_MINOR(clientVersion));
428409
} else {
429410
HashType hashAlgo;
430411
std::string baseName;
@@ -471,6 +452,21 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
471452
break;
472453
}
473454

455+
case wopAddMultipleToStore: {
456+
bool repair, dontCheckSigs;
457+
from >> repair >> dontCheckSigs;
458+
if (!trusted && dontCheckSigs)
459+
dontCheckSigs = false;
460+
461+
logger->startWork();
462+
FramedSource source(from);
463+
store->addMultipleToStore(source,
464+
RepairFlag{repair},
465+
dontCheckSigs ? NoCheckSigs : CheckSigs);
466+
logger->stopWork();
467+
break;
468+
}
469+
474470
case wopAddTextToStore: {
475471
string suffix = readString(from);
476472
string s = readString(from);
@@ -505,17 +501,6 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
505501
break;
506502
}
507503

508-
case wopImportPaths2: {
509-
logger->startWork();
510-
auto paths = store->importPaths(from,
511-
trusted ? NoCheckSigs : CheckSigs);
512-
logger->stopWork();
513-
Strings paths2;
514-
for (auto & i : paths) paths2.push_back(store->printStorePath(i));
515-
to << paths2;
516-
break;
517-
}
518-
519504
case wopBuildPaths: {
520505
auto drvs = readDerivedPaths(*store, clientVersion, from);
521506
BuildMode mode = bmNormal;
@@ -781,7 +766,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
781766
if (info) {
782767
if (GET_PROTOCOL_MINOR(clientVersion) >= 17)
783768
to << 1;
784-
writeValidPathInfo(store, clientVersion, to, info);
769+
info->write(to, *store, GET_PROTOCOL_MINOR(clientVersion), false);
785770
} else {
786771
assert(GET_PROTOCOL_MINOR(clientVersion) >= 17);
787772
to << 0;

src/libstore/path-info.cc

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#include "path-info.hh"
2+
#include "worker-protocol.hh"
3+
4+
namespace nix {
5+
6+
ValidPathInfo ValidPathInfo::read(Source & source, const Store & store, unsigned int format)
7+
{
8+
return read(source, store, format, store.parseStorePath(readString(source)));
9+
}
10+
11+
ValidPathInfo ValidPathInfo::read(Source & source, const Store & store, unsigned int format, StorePath && path)
12+
{
13+
auto deriver = readString(source);
14+
auto narHash = Hash::parseAny(readString(source), htSHA256);
15+
ValidPathInfo info(path, narHash);
16+
if (deriver != "") info.deriver = store.parseStorePath(deriver);
17+
info.references = worker_proto::read(store, source, Phantom<StorePathSet> {});
18+
source >> info.registrationTime >> info.narSize;
19+
if (format >= 16) {
20+
source >> info.ultimate;
21+
info.sigs = readStrings<StringSet>(source);
22+
info.ca = parseContentAddressOpt(readString(source));
23+
}
24+
return info;
25+
}
26+
27+
void ValidPathInfo::write(
28+
Sink & sink,
29+
const Store & store,
30+
unsigned int format,
31+
bool includePath) const
32+
{
33+
if (includePath)
34+
sink << store.printStorePath(path);
35+
sink << (deriver ? store.printStorePath(*deriver) : "")
36+
<< narHash.to_string(Base16, false);
37+
worker_proto::write(store, sink, references);
38+
sink << registrationTime << narSize;
39+
if (format >= 16) {
40+
sink << ultimate
41+
<< sigs
42+
<< renderContentAddress(ca);
43+
}
44+
}
45+
46+
}

src/libstore/path-info.hh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@ struct ValidPathInfo
105105
ValidPathInfo(const StorePath & path, Hash narHash) : path(path), narHash(narHash) { };
106106

107107
virtual ~ValidPathInfo() { }
108+
109+
static ValidPathInfo read(Source & source, const Store & store, unsigned int format);
110+
static ValidPathInfo read(Source & source, const Store & store, unsigned int format, StorePath && path);
111+
112+
void write(Sink & sink, const Store & store, unsigned int format, bool includePath = true) const;
108113
};
109114

110115
typedef std::map<StorePath, ValidPathInfo> ValidPathInfos;

src/libstore/remote-store.cc

Lines changed: 26 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -386,23 +386,6 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
386386
}
387387

388388

389-
ref<const ValidPathInfo> RemoteStore::readValidPathInfo(ConnectionHandle & conn, const StorePath & path)
390-
{
391-
auto deriver = readString(conn->from);
392-
auto narHash = Hash::parseAny(readString(conn->from), htSHA256);
393-
auto info = make_ref<ValidPathInfo>(path, narHash);
394-
if (deriver != "") info->deriver = parseStorePath(deriver);
395-
info->references = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
396-
conn->from >> info->registrationTime >> info->narSize;
397-
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
398-
conn->from >> info->ultimate;
399-
info->sigs = readStrings<StringSet>(conn->from);
400-
info->ca = parseContentAddressOpt(readString(conn->from));
401-
}
402-
return info;
403-
}
404-
405-
406389
void RemoteStore::queryPathInfoUncached(const StorePath & path,
407390
Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept
408391
{
@@ -423,7 +406,8 @@ void RemoteStore::queryPathInfoUncached(const StorePath & path,
423406
bool valid; conn->from >> valid;
424407
if (!valid) throw InvalidPath("path '%s' is not valid", printStorePath(path));
425408
}
426-
info = readValidPathInfo(conn, path);
409+
info = std::make_shared<ValidPathInfo>(
410+
ValidPathInfo::read(conn->from, *this, GET_PROTOCOL_MINOR(conn->daemonVersion), StorePath{path}));
427411
}
428412
callback(std::move(info));
429413
} catch (...) { callback.rethrow(); }
@@ -525,8 +509,8 @@ ref<const ValidPathInfo> RemoteStore::addCAToStore(
525509
});
526510
}
527511

528-
auto path = parseStorePath(readString(conn->from));
529-
return readValidPathInfo(conn, path);
512+
return make_ref<ValidPathInfo>(
513+
ValidPathInfo::read(conn->from, *this, GET_PROTOCOL_MINOR(conn->daemonVersion)));
530514
}
531515
else {
532516
if (repair) throw Error("repairing is not supported when building through the Nix daemon protocol < 1.25");
@@ -642,6 +626,25 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
642626
}
643627

644628

629+
void RemoteStore::addMultipleToStore(
630+
Source & source,
631+
RepairFlag repair,
632+
CheckSigsFlag checkSigs)
633+
{
634+
if (GET_PROTOCOL_MINOR(getConnection()->daemonVersion) >= 32) {
635+
auto conn(getConnection());
636+
conn->to
637+
<< wopAddMultipleToStore
638+
<< repair
639+
<< !checkSigs;
640+
conn.withFramedSink([&](Sink & sink) {
641+
source.drainInto(sink);
642+
});
643+
} else
644+
Store::addMultipleToStore(source, repair, checkSigs);
645+
}
646+
647+
645648
StorePath RemoteStore::addTextToStore(const string & name, const string & s,
646649
const StorePathSet & references, RepairFlag repair)
647650
{
@@ -885,16 +888,6 @@ void RemoteStore::queryMissing(const std::vector<DerivedPath> & targets,
885888
}
886889

887890

888-
StorePaths RemoteStore::importPaths(Source & source, CheckSigsFlag checkSigs)
889-
{
890-
auto conn(getConnection());
891-
conn->to << wopImportPaths2;
892-
source.drainInto(conn->to);
893-
conn.processStderr();
894-
return worker_proto::read(*this, conn->from, Phantom<StorePaths> {});
895-
}
896-
897-
898891
void RemoteStore::connect()
899892
{
900893
auto conn(getConnection());
@@ -1021,14 +1014,14 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source *
10211014
return nullptr;
10221015
}
10231016

1024-
void ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun)
1017+
void ConnectionHandle::withFramedSink(std::function<void(Sink & sink)> fun)
10251018
{
10261019
(*this)->to.flush();
10271020

10281021
std::exception_ptr ex;
10291022

1030-
/* Handle log messages / exceptions from the remote on a
1031-
separate thread. */
1023+
/* Handle log messages / exceptions from the remote on a separate
1024+
thread. */
10321025
std::thread stderrThread([&]()
10331026
{
10341027
try {
@@ -1061,7 +1054,6 @@ void ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun)
10611054
stderrThread.join();
10621055
if (ex)
10631056
std::rethrow_exception(ex);
1064-
10651057
}
10661058

10671059
}

src/libstore/remote-store.hh

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ public:
7878
void addToStore(const ValidPathInfo & info, Source & nar,
7979
RepairFlag repair, CheckSigsFlag checkSigs) override;
8080

81+
void addMultipleToStore(
82+
Source & source,
83+
RepairFlag repair,
84+
CheckSigsFlag checkSigs) override;
85+
8186
StorePath addTextToStore(const string & name, const string & s,
8287
const StorePathSet & references, RepairFlag repair) override;
8388

@@ -112,8 +117,6 @@ public:
112117
StorePathSet & willBuild, StorePathSet & willSubstitute, StorePathSet & unknown,
113118
uint64_t & downloadSize, uint64_t & narSize) override;
114119

115-
StorePaths importPaths(Source & source, CheckSigsFlag checkSigs) override;
116-
117120
void connect() override;
118121

119122
unsigned int getProtocol() override;
@@ -153,8 +156,6 @@ protected:
153156

154157
virtual void narFromPath(const StorePath & path, Sink & sink) override;
155158

156-
ref<const ValidPathInfo> readValidPathInfo(ConnectionHandle & conn, const StorePath & path);
157-
158159
private:
159160

160161
std::atomic_bool failed{false};

src/libstore/store-api.cc

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,20 @@ StorePath Store::addToStore(const string & name, const Path & _srcPath,
250250
}
251251

252252

253+
void Store::addMultipleToStore(
254+
Source & source,
255+
RepairFlag repair,
256+
CheckSigsFlag checkSigs)
257+
{
258+
auto expected = readNum<uint64_t>(source);
259+
for (uint64_t i = 0; i < expected; ++i) {
260+
auto info = ValidPathInfo::read(source, *this, 16);
261+
info.ultimate = false;
262+
addToStore(info, source, repair, checkSigs);
263+
}
264+
}
265+
266+
253267
/*
254268
The aim of this function is to compute in one pass the correct ValidPathInfo for
255269
the files that we are trying to add to the store. To accomplish that in one
@@ -771,6 +785,19 @@ const Store::Stats & Store::getStats()
771785
}
772786

773787

788+
static std::string makeCopyPathMessage(
789+
std::string_view srcUri,
790+
std::string_view dstUri,
791+
std::string_view storePath)
792+
{
793+
return srcUri == "local" || srcUri == "daemon"
794+
? fmt("copying path '%s' to '%s'", storePath, dstUri)
795+
: dstUri == "local" || dstUri == "daemon"
796+
? fmt("copying path '%s' from '%s'", storePath, srcUri)
797+
: fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri);
798+
}
799+
800+
774801
void copyStorePath(
775802
Store & srcStore,
776803
Store & dstStore,
@@ -780,14 +807,10 @@ void copyStorePath(
780807
{
781808
auto srcUri = srcStore.getUri();
782809
auto dstUri = dstStore.getUri();
783-
810+
auto storePathS = srcStore.printStorePath(storePath);
784811
Activity act(*logger, lvlInfo, actCopyPath,
785-
srcUri == "local" || srcUri == "daemon"
786-
? fmt("copying path '%s' to '%s'", srcStore.printStorePath(storePath), dstUri)
787-
: dstUri == "local" || dstUri == "daemon"
788-
? fmt("copying path '%s' from '%s'", srcStore.printStorePath(storePath), srcUri)
789-
: fmt("copying path '%s' from '%s' to '%s'", srcStore.printStorePath(storePath), srcUri, dstUri),
790-
{srcStore.printStorePath(storePath), srcUri, dstUri});
812+
makeCopyPathMessage(srcUri, dstUri, storePathS),
813+
{storePathS, srcUri, dstUri});
791814
PushActivity pact(act.id);
792815

793816
auto info = srcStore.queryPathInfo(storePath);
@@ -896,19 +919,31 @@ std::map<StorePath, StorePath> copyPaths(
896919
for (auto & path : storePaths)
897920
pathsMap.insert_or_assign(path, path);
898921

899-
// FIXME: Temporary hack to copy closures in a single round-trip.
900-
if (dynamic_cast<RemoteStore *>(&dstStore)) {
901-
if (!missing.empty()) {
902-
auto source = sinkToSource([&](Sink & sink) {
903-
srcStore.exportPaths(missing, sink);
904-
});
905-
dstStore.importPaths(*source, NoCheckSigs);
922+
Activity act(*logger, lvlInfo, actCopyPaths, fmt("copying %d paths", missing.size()));
923+
924+
auto sorted = srcStore.topoSortPaths(missing);
925+
std::reverse(sorted.begin(), sorted.end());
926+
927+
auto source = sinkToSource([&](Sink & sink) {
928+
sink << sorted.size();
929+
for (auto & storePath : sorted) {
930+
auto srcUri = srcStore.getUri();
931+
auto dstUri = dstStore.getUri();
932+
auto storePathS = srcStore.printStorePath(storePath);
933+
Activity act(*logger, lvlInfo, actCopyPath,
934+
makeCopyPathMessage(srcUri, dstUri, storePathS),
935+
{storePathS, srcUri, dstUri});
936+
PushActivity pact(act.id);
937+
938+
auto info = srcStore.queryPathInfo(storePath);
939+
info->write(sink, srcStore, 16);
940+
srcStore.narFromPath(storePath, sink);
906941
}
907-
return pathsMap;
908-
}
942+
});
909943

910-
Activity act(*logger, lvlInfo, actCopyPaths, fmt("copying %d paths", missing.size()));
944+
dstStore.addMultipleToStore(*source, repair, checkSigs);
911945

946+
#if 0
912947
std::atomic<size_t> nrDone{0};
913948
std::atomic<size_t> nrFailed{0};
914949
std::atomic<uint64_t> bytesExpected{0};
@@ -986,6 +1021,8 @@ std::map<StorePath, StorePath> copyPaths(
9861021
nrDone++;
9871022
showProgress();
9881023
});
1024+
#endif
1025+
9891026
return pathsMap;
9901027
}
9911028

0 commit comments

Comments
 (0)