Skip to content

Commit

Permalink
Add support for s3:// URIs
Browse files Browse the repository at this point in the history
This adds support for s3:// URIs in all places where Nix allows URIs,
e.g. in builtins.fetchurl, builtins.fetchTarball, <nix/fetchurl.nix>
and NIX_PATH. It allows fetching resources from private S3 buckets,
using credentials obtained from the standard places (i.e. AWS_*
environment variables, ~/.aws/credentials and the EC2 metadata
server). This may not be super-useful in general, but since we already
depend on aws-sdk-cpp, it's a cheap feature to add.
  • Loading branch information
edolstra committed Feb 14, 2017
1 parent 62ff5ad commit 9ff9c3f
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 63 deletions.
28 changes: 27 additions & 1 deletion src/libstore/download.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "hash.hh"
#include "store-api.hh"
#include "archive.hh"
#include "s3.hh"

#include <unistd.h>
#include <fcntl.h>
Expand Down Expand Up @@ -480,6 +481,31 @@ struct CurlDownloader : public Downloader
std::function<void(const DownloadResult &)> success,
std::function<void(std::exception_ptr exc)> failure) override
{
/* Ugly hack to support s3:// URIs. */
if (hasPrefix(request.uri, "s3://")) {
// FIXME: do this on a worker thread
sync2async<DownloadResult>(success, failure, [&]() {
#ifdef ENABLE_S3
S3Helper s3Helper;
auto slash = request.uri.find('/', 5);
if (slash == std::string::npos)
throw nix::Error("bad S3 URI ‘%s’", request.uri);
std::string bucketName(request.uri, 5, slash - 5);
std::string key(request.uri, slash + 1);
// FIXME: implement ETag
auto s3Res = s3Helper.getObject(bucketName, key);
DownloadResult res;
if (!s3Res.data)
throw DownloadError(NotFound, fmt("S3 object ‘%s’ does not exist", request.uri));
res.data = s3Res.data;
return res;
#else
throw nix::Error("cannot download ‘%s’ because Nix is not built with S3 support", request.uri);
#endif
});
return;
}

auto item = std::make_shared<DownloadItem>(*this, request);
item->success = success;
item->failure = failure;
Expand Down Expand Up @@ -629,7 +655,7 @@ bool isUri(const string & s)
size_t pos = s.find("://");
if (pos == string::npos) return false;
string scheme(s, 0, pos);
return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git";
return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3";
}


Expand Down
2 changes: 1 addition & 1 deletion src/libstore/download.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ struct DownloadRequest

struct DownloadResult
{
bool cached;
bool cached = false;
std::string etag;
std::string effectiveUrl;
std::shared_ptr<std::string> data;
Expand Down
141 changes: 80 additions & 61 deletions src/libstore/s3-binary-cache-store.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#if ENABLE_S3
#if __linux__

#include "s3.hh"
#include "s3-binary-cache-store.hh"
#include "nar-info.hh"
#include "nar-info-disk-cache.hh"
Expand All @@ -18,15 +18,6 @@

namespace nix {

struct istringstream_nocopy : public std::stringstream
{
istringstream_nocopy(const std::string & s)
{
rdbuf()->pubsetbuf(
(char *) s.data(), s.size());
}
};

struct S3Error : public Error
{
Aws::S3::S3Errors err;
Expand Down Expand Up @@ -60,21 +51,81 @@ static void initAWS()
});
}

/* Construct a helper holding a shared AWS client configuration and an
   S3 client built from it.  'config' is declared before 'client' in
   s3.hh, so makeConfig() has completed by the time *config is
   dereferenced here (members initialize in declaration order). */
S3Helper::S3Helper()
    : config(makeConfig())
    , client(make_ref<Aws::S3::S3Client>(*config))
{
}

/* Build the client configuration shared by all S3 requests, making
   sure the AWS SDK has been initialized first. */
ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig()
{
    initAWS();
    auto cfg = make_ref<Aws::Client::ClientConfiguration>();
    cfg->region = Aws::Region::US_EAST_1; // FIXME: make configurable
    cfg->requestTimeoutMs = 600 * 1000;   // i.e. 10 minutes
    return cfg;
}

/* Fetch the object 'key' from the S3 bucket 'bucketName'.  Returns a
   result whose 'data' field is null if the object does not exist; any
   other AWS error is rethrown.  'durationMs' records the wall-clock
   time of the request either way. */
S3Helper::DownloadResult S3Helper::getObject(
    const std::string & bucketName, const std::string & key)
{
    debug("fetching ‘s3://%s/%s’...", bucketName, key);

    auto request =
        Aws::S3::Model::GetObjectRequest()
        .WithBucket(bucketName)
        .WithKey(key);

    /* Buffer the response body in a stringstream so it can be copied
       into a std::string below. */
    request.SetResponseStreamFactory([&]() {
        return Aws::New<std::stringstream>("STRINGSTREAM");
    });

    DownloadResult res;

    auto now1 = std::chrono::steady_clock::now();

    try {

        auto result = checkAws(fmt("AWS error fetching ‘%s’", key),
            client->GetObject(request));

        /* The cast is safe: the response stream factory above always
           produces a std::stringstream. */
        res.data = std::make_shared<std::string>(
            dynamic_cast<std::stringstream &>(result.GetBody()).str());

    } catch (S3Error & e) {
        /* A missing key is not an error here; leave res.data null to
           signal "not found" to the caller. */
        if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw;
    }

    auto now2 = std::chrono::steady_clock::now();

    res.durationMs = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();

    return res;
}

#if __linux__

This comment has been minimized.

Copy link
@ip1981

ip1981 Sep 28, 2017

Why is it linux-only?

This comment has been minimized.

Copy link
@edolstra

edolstra Sep 28, 2017

Author Member

I don't think that this is the case anymore.


/* An input stream that reads directly from the memory of an existing
   std::string, avoiding the copy that std::istringstream would make.
   The caller must keep 's' alive (and unmodified) for the lifetime of
   the stream, since the stream buffer points into s.data().
   NOTE(review): relies on std::stringbuf::setbuf(), whose effect is
   implementation-defined ([stringbuf.virtuals]); this works with
   libstdc++ but may be a no-op with other standard libraries —
   confirm before relying on it off Linux. */
struct istringstream_nocopy : public std::stringstream
{
    explicit istringstream_nocopy(const std::string & s)
    {
        rdbuf()->pubsetbuf(
            (char *) s.data(), s.size());
    }
};

struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
{
std::string bucketName;

ref<Aws::Client::ClientConfiguration> config;
ref<Aws::S3::S3Client> client;

Stats stats;

S3Helper s3Helper;

S3BinaryCacheStoreImpl(
const Params & params, const std::string & bucketName)
: S3BinaryCacheStore(params)
, bucketName(bucketName)
, config(makeConfig())
, client(make_ref<Aws::S3::S3Client>(*config))
{
diskCache = getNarInfoDiskCache();
}
Expand All @@ -84,31 +135,22 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
return "s3://" + bucketName;
}

ref<Aws::Client::ClientConfiguration> makeConfig()
{
initAWS();
auto res = make_ref<Aws::Client::ClientConfiguration>();
res->region = Aws::Region::US_EAST_1; // FIXME: make configurable
res->requestTimeoutMs = 600 * 1000;
return res;
}

void init() override
{
if (!diskCache->cacheExists(getUri(), wantMassQuery_, priority)) {

/* Create the bucket if it doesn't already exist. */
// FIXME: HeadBucket would be more appropriate, but doesn't return
// an easily parsed 404 message.
auto res = client->GetBucketLocation(
auto res = s3Helper.client->GetBucketLocation(
Aws::S3::Model::GetBucketLocationRequest().WithBucket(bucketName));

if (!res.IsSuccess()) {
if (res.GetError().GetErrorType() != Aws::S3::S3Errors::NO_SUCH_BUCKET)
throw Error(format("AWS error checking bucket ‘%s’: %s") % bucketName % res.GetError().GetMessage());

checkAws(format("AWS error creating bucket ‘%s’") % bucketName,
client->CreateBucket(
s3Helper.client->CreateBucket(
Aws::S3::Model::CreateBucketRequest()
.WithBucket(bucketName)
.WithCreateBucketConfiguration(
Expand Down Expand Up @@ -146,7 +188,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
{
stats.head++;

auto res = client->HeadObject(
auto res = s3Helper.client->HeadObject(
Aws::S3::Model::HeadObjectRequest()
.WithBucket(bucketName)
.WithKey(path));
Expand Down Expand Up @@ -179,7 +221,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
auto now1 = std::chrono::steady_clock::now();

auto result = checkAws(format("AWS error uploading ‘%s’") % path,
client->PutObject(request));
s3Helper.client->PutObject(request));

auto now2 = std::chrono::steady_clock::now();

Expand All @@ -198,42 +240,18 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
sync2async<std::shared_ptr<std::string>>(success, failure, [&]() {
debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path);

auto request =
Aws::S3::Model::GetObjectRequest()
.WithBucket(bucketName)
.WithKey(path);

request.SetResponseStreamFactory([&]() {
return Aws::New<std::stringstream>("STRINGSTREAM");
});

stats.get++;

try {

auto now1 = std::chrono::steady_clock::now();

auto result = checkAws(format("AWS error fetching ‘%s’") % path,
client->GetObject(request));

auto now2 = std::chrono::steady_clock::now();
auto res = s3Helper.getObject(bucketName, path);

auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str();
stats.getBytes += res.data ? res.data->size() : 0;
stats.getTimeMs += res.durationMs;

auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
if (res.data)
printTalkative("downloaded ‘s3://%s/%s’ (%d bytes) in %d ms",
bucketName, path, res.data->size(), res.durationMs);

printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms")
% bucketName % path % res.size() % duration);

stats.getBytes += res.size();
stats.getTimeMs += duration;

return std::make_shared<std::string>(res);

} catch (S3Error & e) {
if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return std::shared_ptr<std::string>();
throw;
}
return res.data;
});
}

Expand All @@ -246,7 +264,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
debug(format("listing bucket ‘s3://%s’ from key ‘%s’...") % bucketName % marker);

auto res = checkAws(format("AWS error listing bucket ‘%s’") % bucketName,
client->ListObjects(
s3Helper.client->ListObjects(
Aws::S3::Model::ListObjectsRequest()
.WithBucket(bucketName)
.WithDelimiter("/")
Expand Down Expand Up @@ -281,7 +299,8 @@ static RegisterStoreImplementation regStore([](
return store;
});

#endif

}

#endif
#endif
33 changes: 33 additions & 0 deletions src/libstore/s3.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#if ENABLE_S3

#include "ref.hh"

#include <memory>
#include <string>

/* Forward declarations so that users of S3Helper don't have to pull
   in the (heavy) AWS SDK headers. */
namespace Aws { namespace Client { class ClientConfiguration; } }
namespace Aws { namespace S3 { class S3Client; } }

namespace nix {

/* Small wrapper around the AWS SDK holding a client configuration and
   an S3 client built from it, shared between the s3:// downloader and
   the S3 binary cache store. */
struct S3Helper
{
    ref<Aws::Client::ClientConfiguration> config;
    ref<Aws::S3::S3Client> client;

    S3Helper();

    /* Initialize the AWS SDK (once per process) and return the
       configuration used to construct 'client'. */
    ref<Aws::Client::ClientConfiguration> makeConfig();

    struct DownloadResult
    {
        /* Null if the requested object does not exist. */
        std::shared_ptr<std::string> data;
        /* Wall-clock duration of the request. */
        unsigned int durationMs = 0;
    };

    /* Fetch the object 'key' from the S3 bucket 'bucketName'. */
    DownloadResult getObject(
        const std::string & bucketName, const std::string & key);
};

}

#endif
1 change: 1 addition & 0 deletions src/libutil/logging.hh
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ extern Verbosity verbosity; /* suppress msgs > this */

#define printError(args...) printMsg(lvlError, args)
#define printInfo(args...) printMsg(lvlInfo, args)
#define printTalkative(args...) printMsg(lvlTalkative, args)
#define debug(args...) printMsg(lvlDebug, args)
#define vomit(args...) printMsg(lvlVomit, args)

Expand Down

4 comments on commit 9ff9c3f

@copumpkin
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edolstra this is very cool, thanks. Can binary cache retrievals also use this?

@edolstra
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Binary cache retrievals don't need this because you can already use s3://<bucketName> as a binary cache.

@copumpkin
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, cool, didn't realize that already worked. I'll have to try that out, thanks again!

@nixos-discourse
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has been mentioned on NixOS Discourse. There might be relevant details there:

https://discourse.nixos.org/t/creating-a-derivation-from-aws-s3-files/16869/2

Please sign in to comment.