Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvs: Add src and dst namespace to kvs copy & move #1936

Merged
merged 6 commits into from Jan 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 6 additions & 4 deletions doc/man1/flux-kvs.adoc
Expand Up @@ -132,11 +132,13 @@ an RFC 11 snapshot reference.
Create an empty directory and commit the change. If 'key' exists,
it is overwritten.

*copy* 'source' 'destination'::
Copy 'source' key to 'destination' key. If a directory is copied, a new
reference is created; it is unnecessary for *copy* to recurse into 'source'.
*copy* [-S src-ns] [-D dst-ns] 'source' 'destination'::
Copy 'source' key to 'destination' key. Optionally, specify a source
and/or destination namespace to override the one specified via 'N'.
If a directory is copied, a new reference is created; it is
unnecessary for *copy* to recurse into 'source'.

*move* 'source' 'destination'::
*move* [-S src-ns] [-D dst-ns] 'source' 'destination'::
Like *copy*, but 'source' is unlinked after the copy.

*dropcache* [--all]::
Expand Down
28 changes: 24 additions & 4 deletions src/cmd/flux-kvs.c
Expand Up @@ -205,6 +205,16 @@ static struct optparse_option getroot_opts[] = {
OPTPARSE_TABLE_END
};

static struct optparse_option copy_opts[] = {
{ .name = "src-namespace", .key = 'S', .has_arg = 1,
.usage = "Specify source key's namespace",
},
{ .name = "dst-namespace", .key = 'D', .has_arg = 1,
.usage = "Specify destination key's namespace",
},
OPTPARSE_TABLE_END
};

static struct optparse_subcommand subcommands[] = {
{ "namespace-create",
"name [name...]",
Expand Down Expand Up @@ -288,14 +298,14 @@ static struct optparse_subcommand subcommands[] = {
"Copy source key to destination key",
cmd_copy,
0,
NULL
copy_opts
},
{ "move",
"source destination",
"Move source key to destination key",
cmd_move,
0,
NULL
copy_opts
},
{ "dropcache",
"[--all]",
Expand Down Expand Up @@ -1619,14 +1629,19 @@ int cmd_copy (optparse_t *p, int argc, char **argv)
int optindex;
flux_future_t *f;
const char *srckey, *dstkey;
const char *srcns, *dstns;

optindex = optparse_option_index (p);
if (optindex != (argc - 2))
log_msg_exit ("copy: specify srckey dstkey");

srcns = optparse_get_str (p, "src-namespace", NULL);
dstns = optparse_get_str (p, "dst-namespace", NULL);

srckey = argv[optindex];
dstkey = argv[optindex + 1];

if (!(f = flux_kvs_copy (h, srckey, dstkey, 0))
if (!(f = flux_kvs_copy (h, srcns, srckey, dstns, dstkey, 0))
|| flux_future_get (f, NULL) < 0)
log_err_exit ("flux_kvs_copy");
flux_future_destroy (f);
Expand All @@ -1640,16 +1655,21 @@ int cmd_move (optparse_t *p, int argc, char **argv)
int optindex;
flux_future_t *f;
const char *srckey, *dstkey;
const char *srcns, *dstns;

h = (flux_t *)optparse_get_data (p, "flux_handle");

optindex = optparse_option_index (p);
if (optindex != (argc - 2))
log_msg_exit ("move: specify srckey dstkey");

srcns = optparse_get_str (p, "src-namespace", NULL);
dstns = optparse_get_str (p, "dst-namespace", NULL);

srckey = argv[optindex];
dstkey = argv[optindex + 1];

if (!(f = flux_kvs_move (h, srckey, dstkey, 0))
if (!(f = flux_kvs_move (h, srcns, srckey, dstns, dstkey, 0))
|| flux_future_get (f, NULL) < 0)
log_err_exit ("flux_kvs_move");
flux_future_destroy (f);
Expand Down
2 changes: 2 additions & 0 deletions src/common/libkvs/Makefile.am
Expand Up @@ -16,12 +16,14 @@ noinst_LTLIBRARIES = libkvs.la
libkvs_la_SOURCES = \
kvs.c \
kvs_lookup.c \
kvs_lookup_private.h \
kvs_getroot.c \
kvs_dir.c \
kvs_dir_private.h \
kvs_classic.c \
kvs_watch.c \
kvs_commit.c \
kvs_commit_private.h \
kvs_txn.c \
kvs_txn_private.h \
treeobj.h \
Expand Down
21 changes: 15 additions & 6 deletions src/common/libkvs/kvs_commit.c
Expand Up @@ -46,19 +46,18 @@ flux_future_t *flux_kvs_fence (flux_t *h, int flags, const char *name,
"ops", ops);
}

flux_future_t *flux_kvs_commit (flux_t *h, int flags, flux_kvs_txn_t *txn)
flux_future_t *flux_kvs_commit_ns (flux_t *h,
const char *namespace,
int flags,
flux_kvs_txn_t *txn)
{
const char *namespace;
json_t *ops;

if (!txn) {
if (!txn || !namespace) {
errno = EINVAL;
return NULL;
}

if (!(namespace = flux_kvs_get_namespace (h)))
return NULL;

if (!(ops = txn_get_ops (txn))) {
errno = EINVAL;
return NULL;
Expand All @@ -71,6 +70,16 @@ flux_future_t *flux_kvs_commit (flux_t *h, int flags, flux_kvs_txn_t *txn)
"ops", ops);
}

flux_future_t *flux_kvs_commit (flux_t *h, int flags, flux_kvs_txn_t *txn)
{
const char *namespace;

if (!(namespace = flux_kvs_get_namespace (h)))
return NULL;

return flux_kvs_commit_ns (h, namespace, flags, txn);
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
23 changes: 23 additions & 0 deletions src/common/libkvs/kvs_commit_private.h
@@ -0,0 +1,23 @@
/************************************************************\
* Copyright 2019 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#ifndef _KVS_COMMIT_PRIVATE_H
#define _KVS_COMMIT_PRIVATE_H

flux_future_t *flux_kvs_commit_ns (flux_t *h,
const char *namespace,
int flags,
flux_kvs_txn_t *txn);

#endif /* !_KVS_COMMIT_PRIVATE_H */

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
77 changes: 60 additions & 17 deletions src/common/libkvs/kvs_copy.c
Expand Up @@ -21,33 +21,44 @@
#include <flux/core.h>

#include "kvs_copy.h"
#include "kvs_commit_private.h"
#include "kvs_lookup_private.h"

struct copy_context {
int commit_flags;
char *srcns;
char *srckey;
char *dstns;
char *dstkey;
};

static void copy_context_destroy (struct copy_context *ctx)
{
if (ctx) {
int saved_errno = errno;
free (ctx->srcns);
free (ctx->srckey);
free (ctx->dstns);
free (ctx->dstkey);
free (ctx);
errno = saved_errno;
}
}

static struct copy_context *copy_context_create (const char *srckey,
static struct copy_context *copy_context_create (const char *srcns,
const char *srckey,
const char *dstns,
const char *dstkey,
int commit_flags)
{
struct copy_context *ctx;

if (!(ctx = calloc (1, sizeof (*ctx))))
return NULL;
if (!(ctx->srckey = strdup (srckey)) || !(ctx->dstkey = strdup (dstkey))) {
if ((srcns && !(ctx->srcns = strdup (srcns)))
|| !(ctx->srckey = strdup (srckey))
|| (dstns && !(ctx->dstns = strdup (dstns)))
|| !(ctx->dstkey = strdup (dstkey))) {
copy_context_destroy (ctx);
return NULL;
}
Expand Down Expand Up @@ -76,8 +87,14 @@ static void copy_continuation (flux_future_t *f, void *arg)
goto error;
if (flux_kvs_txn_unlink (txn, 0, ctx->srckey) < 0)
goto error;
if (!(f2 = flux_kvs_commit (h, ctx->commit_flags, txn)))
goto error;
if (ctx->srcns) {
if (!(f2 = flux_kvs_commit_ns (h, ctx->srcns, ctx->commit_flags, txn)))
goto error;
}
else {
if (!(f2 = flux_kvs_commit (h, ctx->commit_flags, txn)))
goto error;
}
if (flux_future_continue (f, f2) < 0) {
flux_future_destroy (f2);
goto error;
Expand Down Expand Up @@ -110,8 +127,14 @@ static void lookup_continuation (flux_future_t *f, void *arg)
goto error;
if (flux_kvs_txn_put_treeobj (txn, 0, ctx->dstkey, val) < 0)
goto error;
if (!(f2 = flux_kvs_commit (h, ctx->commit_flags, txn)))
goto error;
if (ctx->dstns) {
if (!(f2 = flux_kvs_commit_ns (h, ctx->dstns, ctx->commit_flags, txn)))
goto error;
}
else {
if (!(f2 = flux_kvs_commit (h, ctx->commit_flags, txn)))
goto error;
}
if (flux_future_continue (f, f2) < 0) {
flux_future_destroy (f2);
goto error;
Expand All @@ -124,9 +147,12 @@ static void lookup_continuation (flux_future_t *f, void *arg)
flux_kvs_txn_destroy (txn);
}

flux_future_t *flux_kvs_copy (flux_t *h, const char *srckey,
const char *dstkey,
int commit_flags)
flux_future_t *flux_kvs_copy (flux_t *h,
const char *srcns,
const char *srckey,
const char *dstns,
const char *dstkey,
int commit_flags)
{
struct copy_context *ctx;
flux_future_t *f1;
Expand All @@ -136,9 +162,19 @@ flux_future_t *flux_kvs_copy (flux_t *h, const char *srckey,
errno = EINVAL;
return NULL;
}
if (!(f1 = flux_kvs_lookup (h, FLUX_KVS_TREEOBJ, srckey)))
return NULL;
if (!(ctx = copy_context_create (srckey, dstkey, commit_flags)))
if (srcns) {
if (!(f1 = flux_kvs_lookup_ns (h, srcns, FLUX_KVS_TREEOBJ, srckey)))
return NULL;
}
else {
if (!(f1 = flux_kvs_lookup (h, FLUX_KVS_TREEOBJ, srckey)))
return NULL;
}
if (!(ctx = copy_context_create (srcns,
srckey,
dstns,
dstkey,
commit_flags)))
goto error;
if (flux_aux_set (h, NULL, ctx, (flux_free_f)copy_context_destroy) < 0) {
copy_context_destroy (ctx);
Expand All @@ -152,9 +188,12 @@ flux_future_t *flux_kvs_copy (flux_t *h, const char *srckey,
return NULL;
}

flux_future_t *flux_kvs_move (flux_t *h, const char *srckey,
const char *dstkey,
int commit_flags)
flux_future_t *flux_kvs_move (flux_t *h,
const char *srcns,
const char *srckey,
const char *dstns,
const char *dstkey,
int commit_flags)
{
struct copy_context *ctx;
flux_future_t *f1;
Expand All @@ -164,9 +203,13 @@ flux_future_t *flux_kvs_move (flux_t *h, const char *srckey,
errno = EINVAL;
return NULL;
}
if (!(f1 = flux_kvs_copy (h, srckey, dstkey, commit_flags)))
if (!(f1 = flux_kvs_copy (h, srcns, srckey, dstns, dstkey, commit_flags)))
return NULL;
if (!(ctx = copy_context_create (srckey, dstkey, commit_flags)))
if (!(ctx = copy_context_create (srcns,
srckey,
dstns,
dstkey,
commit_flags)))
goto error;
if (flux_aux_set (h, NULL, ctx, (flux_free_f)copy_context_destroy) < 0) {
copy_context_destroy (ctx);
Expand Down
37 changes: 25 additions & 12 deletions src/common/libkvs/kvs_copy.h
Expand Up @@ -15,26 +15,39 @@
extern "C" {
#endif

/* Create a copy of 'srckey' at 'dstkey'.
/* Create a copy of 'srckey' at 'dstkey'. Read from / write to the
* specified namespaces. If a namespace is not specified (i.e. NULL),
* the namespace from flux_kvs_get_namespace() will be used.
*
* Due to the hash-tree design of the KVS, dstkey is by definition a
* "deep copy" (or writable snapshot) of all content below srckey.
* The copy operation has a low overhead since it only copies a single
* directory entry. 'srckey' and 'dstkey' may be in different namespaces.
* directory entry.
*
* Returns future on success, NULL on failure with errno set.
*/
flux_future_t *flux_kvs_copy (flux_t *h, const char *srckey,
const char *dstkey,
int commit_flags);
flux_future_t *flux_kvs_copy (flux_t *h,
const char *srcns,
const char *srckey,
const char *dstns,
const char *dstkey,
int commit_flags);

/* Move 'srckey' to 'dstkey'.
* This is a copy followed by an unlink on 'srckey'.
* 'srckey' and 'dstkey' may be in different namespaces.
* The copy and unlink are not atomic.
/* Move 'srckey' to 'dstkey'. Read from / write to the
* specified namespaces. If a namespace is not specified (i.e. NULL),
* the namespace from flux_kvs_get_namespace() will be used.
*
* This is a copy followed by an unlink on 'srckey'. The copy and
* unlink are not atomic.
*
* Returns future on success, NULL on failure with errno set.
*/
flux_future_t *flux_kvs_move (flux_t *h, const char *srckey,
const char *dstkey,
int commit_flags);
flux_future_t *flux_kvs_move (flux_t *h,
const char *srcns,
const char *srckey,
const char *dstns,
const char *dstkey,
int commit_flags);

#ifdef __cplusplus
}
Expand Down