Skip to content

Commit

Permalink
add a way to tag s3 nars
Browse files Browse the repository at this point in the history
if a nar contains a nix-support/tags.json file of the form {key: string
value}, then the nar file (plus .narinfo and other accompanying files)
is tagged as "nix:key" -> "value".
  • Loading branch information
symphorien committed Oct 30, 2023
1 parent a6e5879 commit 8c538f4
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 28 deletions.
38 changes: 29 additions & 9 deletions src/libstore/binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ void BinaryCacheStore::init()

void BinaryCacheStore::upsertFile(const std::string & path,
std::string && data,
const std::string & mimeType)
const std::string & mimeType,
std::map<std::string, std::string> tags)
{
upsertFile(path, std::make_shared<std::stringstream>(std::move(data)), mimeType);
upsertFile(path, std::make_shared<std::stringstream>(std::move(data)), mimeType, tags);
}

void BinaryCacheStore::getFile(const std::string & path,
Expand Down Expand Up @@ -104,11 +105,11 @@ std::string BinaryCacheStore::narInfoFileFor(const StorePath & storePath)
return std::string(storePath.hashPart()) + ".narinfo";
}

void BinaryCacheStore::writeNarInfo(ref<NarInfo> narInfo)
void BinaryCacheStore::writeNarInfo(ref<NarInfo> narInfo, std::map<std::string, std::string> tags)
{
auto narInfoFile = narInfoFileFor(narInfo->path);

upsertFile(narInfoFile, narInfo->to_string(*this), "text/x-nix-narinfo");
upsertFile(narInfoFile, narInfo->to_string(*this), "text/x-nix-narinfo", tags);

{
auto state_(state.lock());
Expand Down Expand Up @@ -137,6 +138,8 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(

AutoDelete autoDelete(fnTemp);

const std::string tagsFile = "/nix-support/tags.json";

auto now1 = std::chrono::steady_clock::now();

/* Read the NAR simultaneously into a CompressionSink+FileSink (to
Expand All @@ -151,7 +154,8 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(
auto compressionSink = makeCompressionSink(compression, teeSinkCompressed, parallelCompression, compressionLevel);
TeeSink teeSinkUncompressed { *compressionSink, narHashSink };
TeeSource teeSource { narSource, teeSinkUncompressed };
narAccessor = makeNarAccessor(teeSource);
// index the nar, and keep the content of tagsFile in memory in case it is present.
narAccessor = makeNarAccessor(teeSource, [&tagsFile](Path& path) { return path == tagsFile; });
compressionSink->finish();
fileSink.flush();
}
Expand Down Expand Up @@ -190,6 +194,22 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(
printStorePath(info.path), printStorePath(ref));
}

/* load tags if applicable */
std::map<std::string, std::string> tags;
if (narAccessor->stat(tagsFile).type == FSAccessor::tRegular) {
std::string tags_str = narAccessor->readFile(tagsFile);
try {
nlohmann::json tags_json = nlohmann::json::parse(tags_str);
for (const auto& kv: tags_json.items()) {
std::string key = kv.key();
std::string value = kv.value().template get<std::string>();
tags[key] = value;
}
} catch (const nlohmann::json::exception& e) {
printMsg(lvlWarn, "could not read nar tags from %s in %s: %s", tagsFile, printStorePath(info.path), e.what());
}
}

/* Optionally write a JSON file containing a listing of the
contents of the NAR. */
if (writeNARListing) {
Expand All @@ -198,7 +218,7 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(
{"root", listNar(ref<FSAccessor>(narAccessor), "", true)},
};

upsertFile(std::string(info.path.hashPart()) + ".ls", j.dump(), "application/json");
upsertFile(std::string(info.path.hashPart()) + ".ls", j.dump(), "application/json", tags);
}

/* Optionally maintain an index of DWARF debug info files
Expand All @@ -225,7 +245,7 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(

printMsg(lvlTalkative, "creating debuginfo link from '%s' to '%s'", key, target);

upsertFile(key, json.dump(), "application/json");
upsertFile(key, json.dump(), "application/json", tags);
};

std::regex regex1("^[0-9a-f]{2}$");
Expand Down Expand Up @@ -263,7 +283,7 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(
stats.narWrite++;
upsertFile(narInfo->url,
std::make_shared<std::fstream>(fnTemp, std::ios_base::in | std::ios_base::binary),
"application/x-nix-nar");
"application/x-nix-nar", tags);
} else
stats.narWriteAverted++;

Expand All @@ -274,7 +294,7 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(
/* Atomically write the NAR info file.*/
if (secretKey) narInfo->sign(*this, *secretKey);

writeNarInfo(narInfo);
writeNarInfo(narInfo, tags);

stats.narInfoWrite++;

Expand Down
8 changes: 5 additions & 3 deletions src/libstore/binary-cache-store.hh
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,14 @@ public:

virtual void upsertFile(const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType) = 0;
const std::string & mimeType,
std::map<std::string, std::string> tags = {}) = 0;

void upsertFile(const std::string & path,
// FIXME: use std::string_view
std::string && data,
const std::string & mimeType);
const std::string & mimeType,
std::map<std::string, std::string> tags = {});

/**
* Dump the contents of the specified file to a sink.
Expand All @@ -105,7 +107,7 @@ private:

std::string narInfoFileFor(const StorePath & storePath);

void writeNarInfo(ref<NarInfo> narInfo);
void writeNarInfo(ref<NarInfo> narInfo, std::map<std::string, std::string> tags={});

ref<const ValidPathInfo> addToStoreCommon(
Source & narSource, RepairFlag repair, CheckSigsFlag checkSigs,
Expand Down
5 changes: 4 additions & 1 deletion src/libstore/http-binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,11 @@ class HttpBinaryCacheStore : public virtual HttpBinaryCacheStoreConfig, public v

void upsertFile(const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType) override
const std::string & mimeType,
std::map<std::string, std::string> tags) override
{
(void)tags;

auto req = makeRequest(path);
req.data = StreamToSourceAdapter(istream).drain();
req.mimeType = mimeType;
Expand Down
5 changes: 4 additions & 1 deletion src/libstore/local-binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,11 @@ class LocalBinaryCacheStore : public virtual LocalBinaryCacheStoreConfig, public

void upsertFile(const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType) override
const std::string & mimeType,
std::map<std::string, std::string> tags) override
{
(void)tags;

auto path2 = binaryCacheDir + "/" + path;
static std::atomic<int> counter{0};
Path tmp = fmt("%s.tmp.%d.%d", path2, getpid(), ++counter);
Expand Down
32 changes: 25 additions & 7 deletions src/libstore/nar-accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ struct NarMember

/* If this is a directory, all the children of the directory. */
std::map<std::string, NarMember> children;

std::optional<std::string> content;
};

struct NarAccessor : public FSAccessor
Expand All @@ -33,6 +35,8 @@ struct NarAccessor : public FSAccessor

NarMember root;

std::optional<std::function<bool(Path&)>> loadContentsFilter;

struct NarIndexer : ParseSink, Source
{
NarAccessor & acc;
Expand All @@ -44,6 +48,8 @@ struct NarAccessor : public FSAccessor

uint64_t pos = 0;

std::optional<Path> currentRegularFile;

NarIndexer(NarAccessor & acc, Source & source)
: acc(acc), source(source)
{ }
Expand Down Expand Up @@ -72,10 +78,13 @@ struct NarAccessor : public FSAccessor
void createRegularFile(const Path & path) override
{
createMember(path, {FSAccessor::Type::tRegular, false, 0, 0});
currentRegularFile = path;
}

void closeRegularFile() override
{ }
{
currentRegularFile = std::nullopt;
}

void isExecutable() override
{
Expand All @@ -90,7 +99,13 @@ struct NarAccessor : public FSAccessor
}

void receiveContents(std::string_view data) override
{ }
{
if (acc.loadContentsFilter != std::nullopt) {
if (acc.loadContentsFilter.value()(currentRegularFile.value())) {
parents.top()->content = std::string(data);
}
}
}

void createSymlink(const Path & path, const std::string & target) override
{
Expand All @@ -113,7 +128,7 @@ struct NarAccessor : public FSAccessor
parseDump(indexer, indexer);
}

NarAccessor(Source & source)
NarAccessor(Source & source, std::optional<std::function<bool(Path&)>> _loadContentsFilter): loadContentsFilter(_loadContentsFilter)
{
NarIndexer indexer(*this, source);
parseDump(indexer, indexer);
Expand Down Expand Up @@ -213,8 +228,11 @@ struct NarAccessor : public FSAccessor

if (getNarBytes) return getNarBytes(i.start, i.size);

assert(nar);
return std::string(*nar, i.start, i.size);
if (nar) return std::string(*nar, i.start, i.size);

if (i.content != std::nullopt) return i.content.value();

throw Error("content of path %1% is not available with this NarAccessor", path);
}

std::string readLink(const Path & path) override
Expand All @@ -231,9 +249,9 @@ ref<FSAccessor> makeNarAccessor(std::string && nar)
return make_ref<NarAccessor>(std::move(nar));
}

ref<FSAccessor> makeNarAccessor(Source & source)
ref<FSAccessor> makeNarAccessor(Source & source, std::optional<std::function<bool(Path&)>> loadContentsFilter)
{
return make_ref<NarAccessor>(source);
return make_ref<NarAccessor>(source, loadContentsFilter);
}

ref<FSAccessor> makeLazyNarAccessor(const std::string & listing,
Expand Down
12 changes: 11 additions & 1 deletion src/libstore/nar-accessor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <functional>

#include <nlohmann/json_fwd.hpp>
#include <optional>
#include "fs-accessor.hh"

namespace nix {
Expand All @@ -16,7 +17,16 @@ struct Source;
*/
ref<FSAccessor> makeNarAccessor(std::string && nar);

ref<FSAccessor> makeNarAccessor(Source & source);
/**
* Return an object that provides access to the contents of a NAR
* file.
*
* the readFile() member function only works for files where loadContentsFilter
* return true. The content of those files is stored in memory. For
* other files, and if loadContentsFilter is nullopt, readFile() throws
* an error.
*/
ref<FSAccessor> makeNarAccessor(Source & source, std::optional<std::function<bool(Path&)>> loadContentsFilter = std::nullopt);

/**
* Create a NAR accessor from a NAR listing (in the format produced by
Expand Down
34 changes: 28 additions & 6 deletions src/libstore/s3-binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "globals.hh"
#include "compression.hh"
#include "filetransfer.hh"
#include "url.hh"

#include <aws/core/Aws.h>
#include <aws/core/VersionConfig.h>
Expand Down Expand Up @@ -347,12 +348,27 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
void uploadFile(const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
const std::string & contentEncoding)
const std::string & contentEncoding,
std::map<std::string, std::string> tags)
{
istream->seekg(0, istream->end);
auto size = istream->tellg();
istream->seekg(0, istream->beg);

std::map<std::string, std::string> actual_tags;
const std::string prefix = "nix:";
for (const auto& kv: tags) {
if (kv.first.size() > 128 - prefix.size()) {
printMsg(lvlWarn, "tag %s in %s is too long for s3 upload", kv.first, path);
continue;
}
if (kv.second.size() > 256) {
printMsg(lvlWarn, "tag value %s in %s is too long for s3 upload", kv.second, path);
continue;
}
actual_tags[prefix + kv.first] = kv.second;
}

auto maxThreads = std::thread::hardware_concurrency();

static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>
Expand Down Expand Up @@ -418,6 +434,11 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
if (contentEncoding != "")
request.SetContentEncoding(contentEncoding);

if (!actual_tags.empty()) {
request.SetTagging(encodeQuery(actual_tags));
}


request.SetBody(istream);

auto result = checkAws(fmt("AWS error uploading '%s'", path),
Expand All @@ -440,7 +461,8 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual

void upsertFile(const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType) override
const std::string & mimeType,
std::map<std::string, std::string> tags) override
{
auto compress = [&](std::string compression)
{
Expand All @@ -449,13 +471,13 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
};

if (narinfoCompression != "" && hasSuffix(path, ".narinfo"))
uploadFile(path, compress(narinfoCompression), mimeType, narinfoCompression);
uploadFile(path, compress(narinfoCompression), mimeType, narinfoCompression, tags);
else if (lsCompression != "" && hasSuffix(path, ".ls"))
uploadFile(path, compress(lsCompression), mimeType, lsCompression);
uploadFile(path, compress(lsCompression), mimeType, lsCompression, tags);
else if (logCompression != "" && hasPrefix(path, "log/"))
uploadFile(path, compress(logCompression), mimeType, logCompression);
uploadFile(path, compress(logCompression), mimeType, logCompression, tags);
else
uploadFile(path, istream, mimeType, "");
uploadFile(path, istream, mimeType, "", tags);
}

void getFile(const std::string & path, Sink & sink) override
Expand Down
1 change: 1 addition & 0 deletions src/libutil/url.hh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ std::string percentDecode(std::string_view in);
std::string percentEncode(std::string_view s, std::string_view keep="");

std::map<std::string, std::string> decodeQuery(const std::string & query);
std::string encodeQuery(const std::map<std::string, std::string> & params);

ParsedURL parseURL(const std::string & url);

Expand Down

0 comments on commit 8c538f4

Please sign in to comment.