Skip to content

Commit

Permalink
Merge pull request #3067 from garrettbslone/content-s3
Browse files Browse the repository at this point in the history
content-s3: add toml config hook
  • Loading branch information
mergify[bot] committed Aug 7, 2020
2 parents 17df5bc + 960e1e5 commit e1d9de7
Show file tree
Hide file tree
Showing 8 changed files with 369 additions and 37 deletions.
248 changes: 224 additions & 24 deletions src/modules/content-s3/content-s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@
#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <jansson.h>
#include <flux/core.h>

#include "src/common/libutil/blobref.h"
#include "src/common/libutil/log.h"

#include "src/common/libcontent/content-util.h"

#include "src/common/libtomlc99/toml.h"
#include "src/common/libutil/tomltk.h"

#include "src/common/libyuarel/yuarel.h"

#include "s3.h"

struct content_s3 {
Expand All @@ -27,6 +33,216 @@ struct content_s3 {
const char *hashfun;
};

static void s3_config_destroy (struct s3_config *ctx)
{
if (ctx) {
int saved_errno = errno;
free (ctx->bucket);
free (ctx->hostname);
free (ctx->access_key);
free (ctx->secret_key);
free (ctx);
errno = saved_errno;
}
}

/* Destroy module context.
*/
static void content_s3_destroy (struct content_s3 *ctx)
{
if (ctx) {
int saved_errno = errno;
flux_msg_handler_delvec (ctx->handlers);
s3_config_destroy (ctx->cfg);
free (ctx);
errno = saved_errno;
}

s3_cleanup ();
}

static int parse_credentials (struct s3_config *cfg,
const char *cred_file,
char *errbuff,
int eb_size)
{
struct tomltk_error toml_error;
toml_table_t *tbl;
const char *raw;
char *access_key;
char *secret_key;
int saved_errno;

if (!(tbl = tomltk_parse_file (cred_file, &toml_error))) {
errno = EINVAL;
snprintf (errbuff, eb_size, "toml parse failed: %s", toml_error.errbuf);
goto error;
}

if (!(raw = toml_raw_in (tbl, "secret-access-key"))) {
errno = EINVAL;
snprintf (errbuff, eb_size, "failed to parse secret key");
goto error;
}

if (toml_rtos (raw, &secret_key)) {
errno = EINVAL;
snprintf (errbuff, eb_size, "failed to parse secret key");
goto error;
}

if (!(raw = toml_raw_in (tbl, "access-key-id"))) {
free (secret_key);
errno = EINVAL;
snprintf (errbuff, eb_size, "failed to parse access key");
goto error;
}

if (toml_rtos (raw, &access_key)) {
free (secret_key);
errno = EINVAL;
snprintf (errbuff, eb_size, "failed to parse access key");
goto error;
}

cfg->secret_key = secret_key;
cfg->access_key = access_key;

return 0;

error:
saved_errno = errno;
toml_free (tbl);
errno = saved_errno;
return -1;
}

static char *hostport (const char *host, int port)
{
char *s;
if (port == 0) {
if (!(s = strdup (host)))
return NULL;
}
else {
if (asprintf (&s, "%s:%d", host, port) < 0)
return NULL;
}
return s;
}

static struct s3_config *parse_config (const flux_conf_t *conf,
char *errbuff,
int eb_size)
{
struct s3_config *cfg;
flux_conf_error_t error;
const char *uri = NULL;
const char *bucket = NULL;
const char *cred_file = NULL;
int is_virtual_host = 0;
struct yuarel yuri;
char *cpy = NULL;
int saved_errno;

if (!(cfg = calloc (1, sizeof (*cfg))))
return NULL;

cfg->retries = 5;
cfg->is_secure = 0;

if (flux_conf_unpack (conf,
&error,
"{s:{s:s, s:s, s:s, s?:b !} }",
"content-s3",
"credential-file",
&cred_file,
"bucket",
&bucket,
"uri",
&uri,
"virtual-host-style",
&is_virtual_host) < 0) {
snprintf(errbuff, eb_size, "%s", error.errbuf);
goto error;
}

if (!(cpy = strdup (uri)))
goto error;

if (yuarel_parse (&yuri, cpy) < 0) {
snprintf(errbuff, eb_size, "failed to parse uri");
errno = EINVAL;
goto error;
}

if (!(cfg->hostname = hostport (yuri.host, yuri.port))) {
snprintf(errbuff, eb_size, "failed to form hostname");
errno = ENOMEM;
goto error;
}

if (!(cfg->bucket = strdup (bucket)))
goto error;

if (!strncmp (yuri.scheme, "https", 5))
cfg->is_secure = 1;

cfg->is_virtual_host = is_virtual_host;

if (parse_credentials (cfg, cred_file, errbuff, eb_size))
goto error;

free (cpy);
return cfg;

error:
saved_errno = errno;
free (cpy);
errno = saved_errno;
s3_config_destroy (cfg);
return NULL;
}

/* Broker is sending us a new config object because 'flux config reload'
* was run. Parse it and respond with human readable errors.
* If events are posted, block until they complete so that:
* - any KVS commit errors are captured by 'flux config reload'
* - tests can look for eventlog entry after running 'flux config reload'
*/
static void config_reload_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct s3_config *cfg;
const flux_conf_t *conf;
char errbuf[256];
const char *errstr = NULL;

if (flux_conf_reload_decode (msg, &conf) < 0)
goto error;
if (!(cfg = parse_config (conf, errbuf, sizeof (errbuf))) ){
errstr = errbuf;
goto error;
}
free (cfg);
flux_log (h, LOG_WARNING, "config-reload: changes will not take effect until next flux restart");

if (flux_set_conf (h, flux_conf_incref (conf)) < 0) {
errstr = "error updating cached configuration";
goto error;
}
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "error responding to config-reload request");

return;

error:
if (flux_respond_error (h, msg, errno, errstr) < 0)
flux_log_error (h, "error responding to config-reload request");
}

/* Handle a content-backing.load request from the rank 0 broker's
* content-cache service. The raw request payload is a blobref string,
* including NULL terminator. The raw response payload is the blob content.
Expand Down Expand Up @@ -178,30 +394,17 @@ static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "content-backing.store", store_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "kvs-checkpoint.get", checkpoint_get_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "kvs-checkpoint.put", checkpoint_put_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "content-s3.config-reload", config_reload_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

/* Destroy module context.
*/
static void content_s3_destroy (struct content_s3 *ctx)
{
if(ctx) {
int saved_errno = errno;
flux_msg_handler_delvec (ctx->handlers);
free (ctx->cfg);
free (ctx);
errno = saved_errno;
}

s3_cleanup();
}

/* Create the s3 context, initalize the connection, and
* create the working bucket
*/
static struct content_s3 *content_s3_create (flux_t *h)
{
const char *errstr = NULL;
char errbuff[256];
struct content_s3 *ctx;

if (!(ctx = calloc (1, sizeof (*ctx))))
Expand All @@ -213,14 +416,11 @@ static struct content_s3 *content_s3_create (flux_t *h)
goto error;
}

if (!(ctx->cfg = calloc (1, sizeof (*ctx->cfg))))
if (!(ctx->cfg = parse_config (flux_get_conf (h), errbuff, sizeof (errbuff)))) {
errstr = errbuff;
flux_log (h, LOG_ERR, "content-s3 parsing config file: %s", errstr);
goto error;

ctx->cfg->retries = 5;
ctx->cfg->bucket = getenv("S3_BUCKET");
ctx->cfg->access_key = getenv("S3_ACCESS_KEY_ID");
ctx->cfg->secret_key = getenv("S3_SECRET_ACCESS_KEY");
ctx->cfg->hostname = getenv("S3_HOSTNAME");
}

if (s3_init (ctx->cfg, &errstr) < 0) {
flux_log_error (h, "content-s3 init");
Expand All @@ -238,8 +438,8 @@ static struct content_s3 *content_s3_create (flux_t *h)
return ctx;

error:
content_s3_destroy(ctx);
return ctx;
content_s3_destroy (ctx);
return NULL;
}

int mod_main (flux_t *h, int argc, char **argv)
Expand Down
13 changes: 9 additions & 4 deletions src/modules/content-s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,19 @@ int s3_init (struct s3_config *cfg, const char **errstr)
{
S3Status status = S3_initialize ("s3", S3_INIT_ALL, cfg->hostname);

if (cfg->is_virtual_host)
uri_style = S3UriStyleVirtualHost;
if (cfg->is_secure)
protocol = S3ProtocolHTTPS;

if (status != S3StatusOK) {
errno = ECONNREFUSED;
if (errstr)
*errstr = S3_get_status_name (status);

return -1;
}

return 0;
}

Expand Down Expand Up @@ -151,7 +156,7 @@ int s3_put (struct s3_config *cfg, const char *key, const void *data, size_t siz
{
int retries = cfg->retries;
S3Status status = S3StatusOK;

S3ResponseHandler resp_hndl = {
.propertiesCallback = &response_props_cb,
.completeCallback = &response_complete_cb
Expand Down Expand Up @@ -240,7 +245,7 @@ int s3_get (struct s3_config *cfg, const char *key, void **datap, size_t *sizep,
.count = 0,
.status = status
};

if (strlen (key) == 0 || strchr (key, '/') || !strcmp (key, "..") || !strcmp (key, ".")) {
errno = EINVAL;
if (errstr)
Expand All @@ -250,7 +255,7 @@ int s3_get (struct s3_config *cfg, const char *key, void **datap, size_t *sizep,
}

do {
S3_get_object (&bucket_ctx,
S3_get_object (&bucket_ctx,
key,
NULL, // getConditions (NULL for none)
0, // startByte
Expand Down
2 changes: 2 additions & 0 deletions src/modules/content-s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
struct s3_config {
int retries; // number of times to retry each operation
int is_secure;
int is_virtual_host;
char *bucket; // the bucket name for the instance to use
char *access_key; // access key id string
char *secret_key; // secret access key id string
Expand Down
2 changes: 2 additions & 0 deletions src/modules/content-s3/test/load.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ int main (int argc, char **argv)
fprintf(stderr, "calloc error");

cfg->retries = 5;
cfg->is_virtual_host = 0;
cfg->is_secure = 0;
cfg->bucket = getenv("S3_BUCKET");
cfg->access_key = getenv("S3_ACCESS_KEY_ID");
cfg->secret_key = getenv("S3_SECRET_ACCESS_KEY");
Expand Down
2 changes: 2 additions & 0 deletions src/modules/content-s3/test/store.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ int main (int argc, char **argv)
fprintf(stderr, "calloc error");

cfg->retries = 5;
cfg->is_virtual_host = 0;
cfg->is_secure = 0;
cfg->bucket = getenv("S3_BUCKET");
cfg->access_key = getenv("S3_ACCESS_KEY_ID");
cfg->secret_key = getenv("S3_SECRET_ACCESS_KEY");
Expand Down

0 comments on commit e1d9de7

Please sign in to comment.