Skip to content

Commit

Permalink
add a way to tag s3 nars
Browse files Browse the repository at this point in the history
If a NAR contains a nix-support/tags.json file of the form {key: string
value}, then the NAR file (plus the .narinfo and other accompanying files)
is tagged as "nix:key" -> "value".
  • Loading branch information
symphorien committed Nov 3, 2023
1 parent 66cb364 commit 5bf011f
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 28 deletions.
40 changes: 31 additions & 9 deletions src/libstore/binary-cache-store.cc
@@ -1,5 +1,6 @@
#include "archive.hh"
#include "binary-cache-store.hh"
#include "canon-path.hh"
#include "compression.hh"
#include "derivations.hh"
#include "source-accessor.hh"
Expand Down Expand Up @@ -61,9 +62,10 @@ void BinaryCacheStore::init()

void BinaryCacheStore::upsertFile(const std::string & path,
std::string && data,
const std::string & mimeType)
const std::string & mimeType,
std::map<std::string, std::string> tags)
{
upsertFile(path, std::make_shared<std::stringstream>(std::move(data)), mimeType);
upsertFile(path, std::make_shared<std::stringstream>(std::move(data)), mimeType, tags);
}

void BinaryCacheStore::getFile(const std::string & path,
Expand Down Expand Up @@ -104,11 +106,11 @@ std::string BinaryCacheStore::narInfoFileFor(const StorePath & storePath)
return std::string(storePath.hashPart()) + ".narinfo";
}

void BinaryCacheStore::writeNarInfo(ref<NarInfo> narInfo)
void BinaryCacheStore::writeNarInfo(ref<NarInfo> narInfo, std::map<std::string, std::string> tags)
{
auto narInfoFile = narInfoFileFor(narInfo->path);

upsertFile(narInfoFile, narInfo->to_string(*this), "text/x-nix-narinfo");
upsertFile(narInfoFile, narInfo->to_string(*this), "text/x-nix-narinfo", tags);

{
auto state_(state.lock());
Expand Down Expand Up @@ -137,6 +139,8 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(

AutoDelete autoDelete(fnTemp);

const CanonPath tagsFile { "/nix-support/tags.json" };

auto now1 = std::chrono::steady_clock::now();

/* Read the NAR simultaneously into a CompressionSink+FileSink (to
Expand All @@ -151,7 +155,8 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(
auto compressionSink = makeCompressionSink(compression, teeSinkCompressed, parallelCompression, compressionLevel);
TeeSink teeSinkUncompressed { *compressionSink, narHashSink };
TeeSource teeSource { narSource, teeSinkUncompressed };
narAccessor = makeNarAccessor(teeSource);
// index the nar, and keep the content of tagsFile in memory in case it is present.
narAccessor = makeNarAccessor(teeSource, [&tagsFile](CanonPath& path) { return path == tagsFile; });
compressionSink->finish();
fileSink.flush();
}
Expand Down Expand Up @@ -190,6 +195,23 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(
printStorePath(info.path), printStorePath(ref));
}

/* Load tags from the NAR's nix-support/tags.json, if it is present and
   is a regular file. The file is expected to contain a single JSON
   object whose values are all strings. Tags are applied
   all-or-nothing: if the document is malformed, is not an object, or
   contains a non-string value, a warning is printed and no tags are
   used. This avoids tagging with whichever subset of keys happened to
   be iterated before the error. */
std::map<std::string, std::string> tags;
auto lstat = narAccessor->maybeLstat(tagsFile);
if (lstat.has_value() && lstat->type == SourceAccessor::Type::tRegular) {
    std::string tags_str = narAccessor->readFile(tagsFile);
    try {
        const nlohmann::json tags_json = nlohmann::json::parse(tags_str);
        if (tags_json.is_object()) {
            // Collect into a scratch map so that a type error part-way
            // through leaves `tags` empty.
            std::map<std::string, std::string> parsed;
            for (const auto & kv : tags_json.items()) {
                // get<std::string>() throws a json type_error for
                // non-string values; it is handled below.
                parsed[kv.key()] = kv.value().get<std::string>();
            }
            tags.swap(parsed);
        } else {
            // Arrays would otherwise be accepted with their indices
            // ("0", "1", ...) as tag names.
            printMsg(lvlWarn, "could not read nar tags from %s in %s: %s", tagsFile, printStorePath(info.path), "top-level JSON value is not an object");
        }
    } catch (const nlohmann::json::exception & e) {
        printMsg(lvlWarn, "could not read nar tags from %s in %s: %s", tagsFile, printStorePath(info.path), e.what());
    }
}

/* Optionally write a JSON file containing a listing of the
contents of the NAR. */
if (writeNARListing) {
Expand All @@ -198,7 +220,7 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(
{"root", listNar(ref<SourceAccessor>(narAccessor), CanonPath::root, true)},
};

upsertFile(std::string(info.path.hashPart()) + ".ls", j.dump(), "application/json");
upsertFile(std::string(info.path.hashPart()) + ".ls", j.dump(), "application/json", tags);
}

/* Optionally maintain an index of DWARF debug info files
Expand All @@ -225,7 +247,7 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(

printMsg(lvlTalkative, "creating debuginfo link from '%s' to '%s'", key, target);

upsertFile(key, json.dump(), "application/json");
upsertFile(key, json.dump(), "application/json", tags);
};

std::regex regex1("^[0-9a-f]{2}$");
Expand Down Expand Up @@ -263,7 +285,7 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(
stats.narWrite++;
upsertFile(narInfo->url,
std::make_shared<std::fstream>(fnTemp, std::ios_base::in | std::ios_base::binary),
"application/x-nix-nar");
"application/x-nix-nar", tags);
} else
stats.narWriteAverted++;

Expand All @@ -274,7 +296,7 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(
/* Atomically write the NAR info file.*/
if (secretKey) narInfo->sign(*this, *secretKey);

writeNarInfo(narInfo);
writeNarInfo(narInfo, tags);

stats.narInfoWrite++;

Expand Down
8 changes: 5 additions & 3 deletions src/libstore/binary-cache-store.hh
Expand Up @@ -73,12 +73,14 @@ public:

virtual void upsertFile(const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType) = 0;
const std::string & mimeType,
std::map<std::string, std::string> tags = {}) = 0;

void upsertFile(const std::string & path,
// FIXME: use std::string_view
std::string && data,
const std::string & mimeType);
const std::string & mimeType,
std::map<std::string, std::string> tags = {});

/**
* Dump the contents of the specified file to a sink.
Expand All @@ -105,7 +107,7 @@ private:

std::string narInfoFileFor(const StorePath & storePath);

void writeNarInfo(ref<NarInfo> narInfo);
void writeNarInfo(ref<NarInfo> narInfo, std::map<std::string, std::string> tags={});

ref<const ValidPathInfo> addToStoreCommon(
Source & narSource, RepairFlag repair, CheckSigsFlag checkSigs,
Expand Down
5 changes: 4 additions & 1 deletion src/libstore/http-binary-cache-store.cc
Expand Up @@ -130,8 +130,11 @@ class HttpBinaryCacheStore : public virtual HttpBinaryCacheStoreConfig, public v

void upsertFile(const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType) override
const std::string & mimeType,
std::map<std::string, std::string> tags) override
{
(void)tags;

auto req = makeRequest(path);
req.data = StreamToSourceAdapter(istream).drain();
req.mimeType = mimeType;
Expand Down
5 changes: 4 additions & 1 deletion src/libstore/local-binary-cache-store.cc
Expand Up @@ -56,8 +56,11 @@ class LocalBinaryCacheStore : public virtual LocalBinaryCacheStoreConfig, public

void upsertFile(const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType) override
const std::string & mimeType,
std::map<std::string, std::string> tags) override
{
(void)tags;

auto path2 = binaryCacheDir + "/" + path;
static std::atomic<int> counter{0};
Path tmp = fmt("%s.tmp.%d.%d", path2, getpid(), ++counter);
Expand Down
34 changes: 27 additions & 7 deletions src/libstore/nar-accessor.cc
Expand Up @@ -17,6 +17,8 @@ struct NarMember

/* If this is a directory, all the children of the directory. */
std::map<std::string, NarMember> children;

std::optional<std::string> content;
};

struct NarAccessor : public SourceAccessor
Expand All @@ -27,6 +29,8 @@ struct NarAccessor : public SourceAccessor

NarMember root;

std::optional<std::function<bool(CanonPath&)>> loadContentsFilter;

struct NarIndexer : ParseSink, Source
{
NarAccessor & acc;
Expand All @@ -38,6 +42,8 @@ struct NarAccessor : public SourceAccessor

uint64_t pos = 0;

std::optional<CanonPath> currentRegularFile;

NarIndexer(NarAccessor & acc, Source & source)
: acc(acc), source(source)
{ }
Expand Down Expand Up @@ -66,10 +72,13 @@ struct NarAccessor : public SourceAccessor
/* ParseSink callback for the start of a regular file entry at `path`.
   Registers a regular-file member for it and records the path in
   currentRegularFile, which receiveContents() passes to
   acc.loadContentsFilter to decide whether to keep the file's data. */
void createRegularFile(const Path & path) override
{
// NOTE(review): the trailing `false, 0, 0` are presumably the executable
// flag, size and NAR offset, filled in by later callbacks — confirm
// against the member's stat type, which is not visible here.
createMember(path, {Type::tRegular, false, 0, 0});
// Remember which file subsequent receiveContents() calls belong to.
currentRegularFile = CanonPath(path);
}

void closeRegularFile() override
{ }
{
currentRegularFile = std::nullopt;
}

void isExecutable() override
{
Expand All @@ -85,7 +94,13 @@ struct NarAccessor : public SourceAccessor
}

void receiveContents(std::string_view data) override
{ }
{
    /* Keep the data in memory only when a filter was supplied and it
       selects the regular file currently being parsed.
       currentRegularFile.value() throws std::bad_optional_access if no
       regular file is open. */
    if (acc.loadContentsFilter != std::nullopt) {
        if (acc.loadContentsFilter.value()(currentRegularFile.value())) {
            /* Append rather than assign: this callback receives a
               string_view and may be invoked several times for one file
               if the parser delivers the data in chunks. Assigning would
               keep only the last chunk. */
            auto & content = parents.top()->content;
            if (!content) content.emplace();
            content->append(data);
        }
    }
}

void createSymlink(const Path & path, const std::string & target) override
{
Expand All @@ -110,7 +125,7 @@ struct NarAccessor : public SourceAccessor
parseDump(indexer, indexer);
}

NarAccessor(Source & source)
NarAccessor(Source & source, std::optional<std::function<bool(CanonPath&)>> _loadContentsFilter): loadContentsFilter(_loadContentsFilter)
{
NarIndexer indexer(*this, source);
parseDump(indexer, indexer);
Expand Down Expand Up @@ -200,8 +215,13 @@ struct NarAccessor : public SourceAccessor

if (getNarBytes) return getNarBytes(*i.stat.narOffset, *i.stat.fileSize);

assert(nar);
return std::string(*nar, *i.stat.narOffset, *i.stat.fileSize);
if (nar) {
return std::string(*nar, *i.stat.narOffset, *i.stat.fileSize);
}

if (i.content != std::nullopt) return i.content.value();

throw Error("content of path %1% is not available with this NarAccessor", path);
}

std::string readLink(const CanonPath & path) override
Expand All @@ -218,9 +238,9 @@ ref<SourceAccessor> makeNarAccessor(std::string && nar)
return make_ref<NarAccessor>(std::move(nar));
}

ref<SourceAccessor> makeNarAccessor(Source & source)
ref<SourceAccessor> makeNarAccessor(Source & source, std::optional<std::function<bool(CanonPath&)>> loadContentsFilter)
{
return make_ref<NarAccessor>(source);
return make_ref<NarAccessor>(source, loadContentsFilter);
}

ref<SourceAccessor> makeLazyNarAccessor(const std::string & listing,
Expand Down
12 changes: 11 additions & 1 deletion src/libstore/nar-accessor.hh
Expand Up @@ -4,6 +4,7 @@
#include "source-accessor.hh"

#include <functional>
#include <optional>

#include <nlohmann/json_fwd.hpp>

Expand All @@ -17,7 +18,16 @@ struct Source;
*/
ref<SourceAccessor> makeNarAccessor(std::string && nar);

ref<SourceAccessor> makeNarAccessor(Source & source);
/**
* Return an object that provides access to the contents of a NAR
* file.
*
* The readFile() member function only works for files for which
* loadContentsFilter returns true. The content of those files is stored
* in memory. For other files, or if loadContentsFilter is nullopt,
* readFile() throws an error.
*/
ref<SourceAccessor> makeNarAccessor(Source & source, std::optional<std::function<bool(CanonPath&)>> loadContentsFilter = std::nullopt);

/**
* Create a NAR accessor from a NAR listing (in the format produced by
Expand Down
34 changes: 28 additions & 6 deletions src/libstore/s3-binary-cache-store.cc
Expand Up @@ -7,6 +7,7 @@
#include "globals.hh"
#include "compression.hh"
#include "filetransfer.hh"
#include "url.hh"

#include <aws/core/Aws.h>
#include <aws/core/VersionConfig.h>
Expand Down Expand Up @@ -347,12 +348,27 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
void uploadFile(const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
const std::string & contentEncoding)
const std::string & contentEncoding,
std::map<std::string, std::string> tags)
{
istream->seekg(0, istream->end);
auto size = istream->tellg();
istream->seekg(0, istream->beg);

/* Translate the store-level tags into S3 object tags. Every key is
   prefixed with "nix:". Tags that would violate S3's limits are
   skipped with a warning instead of being sent, because an invalid
   tag set would make the whole upload fail:
     - key (including the prefix): at most 128 characters
     - value: at most 256 characters
     - at most 10 tags per object
   Sizes are measured in bytes, which is conservative for non-ASCII
   text. `tags` is a std::map, so when there are too many tags the
   ones kept are the first in key order. */
std::map<std::string, std::string> actual_tags;
const std::string prefix = "nix:";
const size_t maxTagKeySize = 128;
const size_t maxTagValueSize = 256;
const size_t maxTagsPerObject = 10;
for (const auto & kv : tags) {
    if (kv.first.size() > maxTagKeySize - prefix.size()) {
        printMsg(lvlWarn, "tag %s in %s is too long for s3 upload", kv.first, path);
        continue;
    }
    if (kv.second.size() > maxTagValueSize) {
        printMsg(lvlWarn, "tag value %s in %s is too long for s3 upload", kv.second, path);
        continue;
    }
    if (actual_tags.size() >= maxTagsPerObject) {
        printMsg(lvlWarn, "tag %s in %s is dropped: s3 allows at most %s tags per object", kv.first, path, maxTagsPerObject);
        continue;
    }
    actual_tags[prefix + kv.first] = kv.second;
}

auto maxThreads = std::thread::hardware_concurrency();

static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>
Expand Down Expand Up @@ -418,6 +434,11 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
if (contentEncoding != "")
request.SetContentEncoding(contentEncoding);

if (!actual_tags.empty()) {
request.SetTagging(encodeQuery(actual_tags));
}


request.SetBody(istream);

auto result = checkAws(fmt("AWS error uploading '%s'", path),
Expand All @@ -440,7 +461,8 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual

void upsertFile(const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType) override
const std::string & mimeType,
std::map<std::string, std::string> tags) override
{
auto compress = [&](std::string compression)
{
Expand All @@ -449,13 +471,13 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
};

if (narinfoCompression != "" && hasSuffix(path, ".narinfo"))
uploadFile(path, compress(narinfoCompression), mimeType, narinfoCompression);
uploadFile(path, compress(narinfoCompression), mimeType, narinfoCompression, tags);
else if (lsCompression != "" && hasSuffix(path, ".ls"))
uploadFile(path, compress(lsCompression), mimeType, lsCompression);
uploadFile(path, compress(lsCompression), mimeType, lsCompression, tags);
else if (logCompression != "" && hasPrefix(path, "log/"))
uploadFile(path, compress(logCompression), mimeType, logCompression);
uploadFile(path, compress(logCompression), mimeType, logCompression, tags);
else
uploadFile(path, istream, mimeType, "");
uploadFile(path, istream, mimeType, "", tags);
}

void getFile(const std::string & path, Sink & sink) override
Expand Down
1 change: 1 addition & 0 deletions src/libutil/url.hh
Expand Up @@ -27,6 +27,7 @@ std::string percentDecode(std::string_view in);
std::string percentEncode(std::string_view s, std::string_view keep="");

std::map<std::string, std::string> decodeQuery(const std::string & query);
std::string encodeQuery(const std::map<std::string, std::string> & params);

ParsedURL parseURL(const std::string & url);

Expand Down

0 comments on commit 5bf011f

Please sign in to comment.