Skip to content

Commit

Permalink
daos: dsync and dcmp support
Browse files Browse the repository at this point in the history
Added daos support to dsync.
Added daos support to dcmp.

`dsync.c`
- Added daos options
- Modified IO calls to use `mfu_file_*`

`dcmp.c`
- Added daos options

`mfu_io`
- Added `mfu_file_rmdir` and `daos_rmdir`

`mfu_flist_unlink`
- Added `mfu_file_t` param

`mfu_daos.c`
- Fixed error checking for DAOS args

Signed-off-by: Dalton Bohning <daltonx.bohning@intel.com>
  • Loading branch information
Dalton Bohning authored and daltonbohning committed Jan 25, 2021
1 parent e6eb529 commit 950d387
Show file tree
Hide file tree
Showing 13 changed files with 523 additions and 151 deletions.
13 changes: 13 additions & 0 deletions doc/rst/dcmp.1.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ OPTIONS
"GB" can immediately follow the number without spaces (e.g., 64MB).
The default chunksize is 1MB.

.. option:: --daos-prefix PREFIX

Specify the DAOS prefix to be used. This is only necessary
if comparing a subset of a POSIX container in DAOS using a
Unified Namespace path.

.. option:: --daos-api API

Specify the DAOS API to be used. By default, the API is automatically
determined based on the container type, where POSIX containers use the
DFS API, and all other containers use the DAOS object API.
Values must be in {DFS, DAOS}.

.. option:: -s, --direct

Use O_DIRECT to avoid caching file data.
Expand Down
13 changes: 13 additions & 0 deletions doc/rst/dsync.1.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ OPTIONS
"GB" can immediately follow the number without spaces (e.g., 64MB).
The default chunksize is 64MB.

.. option:: --daos-prefix PREFIX

Specify the DAOS prefix to be used. This is only necessary
if copying a subset of a POSIX container in DAOS using a
Unified Namespace path.

.. option:: --daos-api API

Specify the DAOS API to be used. By default, the API is automatically
determined based on the container type, where POSIX containers use the
DFS API, and all other containers use the DAOS object API.
Values must be in {DFS, DAOS}.

.. option:: -c, --contents

Compare files byte-by-byte rather than checking size and mtime
Expand Down
25 changes: 20 additions & 5 deletions src/common/mfu_daos.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,14 @@ static int daos_check_args(
if (have_src_pool || have_src_cont || have_dst_pool || have_dst_cont
|| have_prefix) {
*flag_daos_args = 1;
} else {
}
if ((have_src_path && (strncmp(src_path, "daos:", 5) == 0)) ||
(have_dst_path && (strncmp(dst_path, "daos:", 5) == 0))) {
*flag_daos_args = 1;
}
if (*flag_daos_args == 0) {
return 0;
}
}

/* Determine whether the source and destination
* use the same pool and container */
Expand Down Expand Up @@ -143,7 +148,7 @@ static int daos_parse_path(
} else {
strcpy(path, dattr.da_rel_path);
}
} else if (strncmp(path, "daos://", 7) == 0) {
} else if (strncmp(path, "daos:", 5) == 0) {
/* Actual error, since we expect a daos path */
rc = -1;
} else {
Expand Down Expand Up @@ -246,7 +251,7 @@ static int daos_set_paths(
if (src_rc == 0) {
argpaths[0] = da->src_path = strdup(src_path);
} else if (src_rc == -1) {
MFU_LOG(MFU_LOG_ERR, "Failed to parse DAOS source path.");
MFU_LOG(MFU_LOG_ERR, "Failed to parse DAOS source path: daos://<pool>/<cont>[/<path>]");
return 1;
}
}
Expand All @@ -257,7 +262,7 @@ static int daos_set_paths(
if (dst_rc == 0) {
argpaths[1] = da->dst_path = strdup(dst_path);
} else if (dst_rc == -1) {
MFU_LOG(MFU_LOG_ERR, "Failed to parse DAOS destination path.");
MFU_LOG(MFU_LOG_ERR, "Failed to parse DAOS destination path: daos://<pool>/<cont>[/<path>]");
return 1;
}
}
Expand Down Expand Up @@ -703,6 +708,16 @@ int daos_setup(
local_daos_error = true;
}

/* Do a preliminary check on the DAOS args */
if (!local_daos_error) {
tmp_rc = daos_check_args(rank, argpaths, da, &flag_daos_args);
if (tmp_rc != 0) {
MFU_LOG(MFU_LOG_ERR, "Invalid DAOS args: "
MFU_ERRF, MFU_ERRP(-MFU_ERR_DAOS_INVAL_ARG));
local_daos_error = true;
}
}

/* Figure out if daos path is the src or dst,
* using UNS path, then chop off UNS path
* prefix since the path is mapped to the root
Expand Down
2 changes: 1 addition & 1 deletion src/common/mfu_flist.h
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ void mfu_flist_metadata_apply(

/* unlink all items in flist,
* if traceless=1, restore timestamps on parent directories after unlinking children */
void mfu_flist_unlink(mfu_flist flist, bool traceless);
void mfu_flist_unlink(mfu_flist flist, bool traceless, mfu_file_t* mfu_file);

typedef struct {
uid_t getuid; /* result from getuid */
Expand Down
88 changes: 43 additions & 45 deletions src/common/mfu_flist_remove.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,38 +64,33 @@ static void remove_progress_fn(const uint64_t* vals, int count, int complete, in

/* removes name by calling rmdir, unlink, or remove depending
* on item type */
static void remove_type(char type, const char* name)
static void remove_type(char type, const char* name, mfu_file_t* mfu_file)
{
/* TODO: don't print message if errno == ENOENT (file already gone) */
if (type == 'd') {
int rc = mfu_rmdir(name);
if (rc != 0) {
int rc = mfu_file_rmdir(name, mfu_file);
if (rc != 0 && errno != ENOENT) {
MFU_LOG(MFU_LOG_ERR, "Failed to rmdir `%s' (errno=%d %s)",
name, errno, strerror(errno)
);
name, errno, strerror(errno));
}
}
else if (type == 'f') {
int rc = mfu_unlink(name);
if (rc != 0) {
int rc = mfu_file_unlink(name, mfu_file);
if (rc != 0 && errno != ENOENT) {
MFU_LOG(MFU_LOG_ERR, "Failed to unlink `%s' (errno=%d %s)",
name, errno, strerror(errno)
);
name, errno, strerror(errno));
}
}
else if (type == 'u') {
int rc = remove(name);
if (rc != 0) {
int rc = mfu_file_remove(name, mfu_file);
if (rc != 0 && errno != ENOENT) {
MFU_LOG(MFU_LOG_ERR, "Failed to remove `%s' (errno=%d %s)",
name, errno, strerror(errno)
);
name, errno, strerror(errno));
}
}
else {
/* print error */
MFU_LOG(MFU_LOG_ERR, "Unknown type=%c name=%s",
type, name
);
type, name);
}

return;
Expand All @@ -106,7 +101,7 @@ static void remove_type(char type, const char* name)
****************************/

/* for given depth, just remove the files we know about */
static void remove_direct(mfu_flist list, uint64_t* rmcount)
static void remove_direct(mfu_flist list, uint64_t* rmcount, mfu_file_t* mfu_file)
{
/* each process directly removes its elements */
uint64_t idx;
Expand All @@ -120,13 +115,13 @@ static void remove_direct(mfu_flist list, uint64_t* rmcount)

/* delete item */
if (type == MFU_TYPE_DIR) {
remove_type('d', name);
remove_type('d', name, mfu_file);
}
else if (type == MFU_TYPE_FILE || type == MFU_TYPE_LINK) {
remove_type('f', name);
remove_type('f', name, mfu_file);
}
else {
remove_type('u', name);
remove_type('u', name, mfu_file);
}

/* increment number of items we have deleted
Expand All @@ -146,12 +141,12 @@ static void remove_direct(mfu_flist list, uint64_t* rmcount)

/* for given depth, evenly spread the files among processes for
* improved load balancing */
static void remove_spread(mfu_flist flist, uint64_t* rmcount)
static void remove_spread(mfu_flist flist, uint64_t* rmcount, mfu_file_t* mfu_file)
{
/* evenly spread flist among processes,
* execute direct delete, and free temp list */
mfu_flist newlist = mfu_flist_spread(flist);
remove_direct(newlist, rmcount);
remove_direct(newlist, rmcount, mfu_file);
mfu_flist_free(&newlist);
return;
}
Expand All @@ -163,14 +158,14 @@ static void remove_spread(mfu_flist flist, uint64_t* rmcount)
/* for given depth, evenly spread the files among processes for
* improved load balancing and sort items by path name to help
* cluster items in the same directory to the same process */
static void remove_spread_sort(mfu_flist flist, uint64_t* rmcount)
static void remove_spread_sort(mfu_flist flist, uint64_t* rmcount, mfu_file_t* mfu_file)
{
/* evenly spread flist among processes,
* sort by path name, execute direct delete, and free temp list */
mfu_flist spread = mfu_flist_spread(flist);
mfu_flist sorted = mfu_flist_sort("name", spread);

remove_direct(sorted, rmcount);
remove_direct(sorted, rmcount, mfu_file);

mfu_flist_free(&sorted);
mfu_flist_free(&spread);
Expand Down Expand Up @@ -203,13 +198,13 @@ static int map_name(mfu_flist flist, uint64_t idx, int ranks, const void* args)
return rank;
}

static void remove_map(mfu_flist list, uint64_t* rmcount)
static void remove_map(mfu_flist list, uint64_t* rmcount, mfu_file_t* mfu_file)
{
/* remap files based on parent directory */
mfu_flist newlist = mfu_flist_remap(list, map_name, NULL);

/* at this point, we can directly remove files in our list */
remove_direct(newlist, rmcount);
remove_direct(newlist, rmcount, mfu_file);

/* free list of remapped files */
mfu_flist_free(&newlist);
Expand All @@ -225,7 +220,7 @@ static void remove_map(mfu_flist list, uint64_t* rmcount)
/* for each depth, sort files by filename and then remove, to test
* whether it matters to limit the number of directories each process
* has to reference (e.g., locking) */
static void remove_sort(mfu_flist list, uint64_t* rmcount)
static void remove_sort(mfu_flist list, uint64_t* rmcount, mfu_file_t* mfu_file)
{
/* bail out if total count is 0 */
uint64_t all_count = mfu_flist_global_size(list);
Expand Down Expand Up @@ -298,7 +293,7 @@ static void remove_sort(mfu_flist list, uint64_t* rmcount)
ptr++;

/* delete item */
remove_type(type, name);
remove_type(type, name, mfu_file);
delcount++;
}

Expand Down Expand Up @@ -328,6 +323,7 @@ static void remove_sort(mfu_flist list, uint64_t* rmcount)
/* globals needed for libcircle callback routines */
static mfu_flist circle_list; /* list of items we're deleting */
static uint64_t circle_count; /* number of items local process has removed */
static mfu_file_t* circle_mfu_file; /* mfu_file for I/O functions */

static void remove_create(CIRCLE_handle* handle)
{
Expand Down Expand Up @@ -375,19 +371,20 @@ static void remove_process(CIRCLE_handle* handle)

char item = path[0];
char* name = &path[1];
remove_type(item, name);
remove_type(item, name, circle_mfu_file);
circle_count++;

return;
}

/* insert all items to be removed into libcircle for
* dynamic load balancing */
static void remove_libcircle(mfu_flist list, uint64_t* rmcount)
static void remove_libcircle(mfu_flist list, uint64_t* rmcount, mfu_file_t* mfu_file)
{
/* set globals for libcircle callbacks */
circle_list = list;
circle_count = 0;
circle_mfu_file = mfu_file;

/* initialize libcircle */
CIRCLE_init(0, NULL, CIRCLE_SPLIT_EQUAL | CIRCLE_CREATE_GLOBAL | CIRCLE_TERM_TREE);
Expand Down Expand Up @@ -479,24 +476,24 @@ static mfu_remove_algos select_algo(void)
return algo;
}

static void remove_by_algo(mfu_remove_algos algo, mfu_flist flist, uint64_t* count)
static void remove_by_algo(mfu_remove_algos algo, mfu_flist flist, uint64_t* count, mfu_file_t* mfu_file)
{
switch (algo) {
case DIRECT:
remove_direct(flist, count);
remove_direct(flist, count, mfu_file);
break;
case SPREAD:
remove_spread(flist, count);
remove_spread(flist, count, mfu_file);
break;
case MAP:
remove_map(flist, count);
remove_map(flist, count, mfu_file);
break;
case SORT:
//remove_sort(flist, count);
remove_spread_sort(flist, count);
remove_spread_sort(flist, count, mfu_file);
break;
case LIBCIRCLE:
remove_libcircle(flist, count);
remove_libcircle(flist, count, mfu_file);
break;
}

Expand All @@ -506,12 +503,12 @@ static void remove_by_algo(mfu_remove_algos algo, mfu_flist flist, uint64_t* cou
/* removes list of items, sets write bits on directories from
* top-to-bottom, then removes items one level at a time starting
* from the deepest */
void mfu_flist_unlink(mfu_flist flist, bool traceless)
void mfu_flist_unlink(mfu_flist flist, bool traceless, mfu_file_t* mfu_file)
{
uint64_t idx;

/* allow override algorithm choice via environment variable */
mfu_remove_algos algo = select_algo();;
mfu_remove_algos algo = select_algo();

/* wait for all tasks and start timer */
MPI_Barrier(MPI_COMM_WORLD);
Expand Down Expand Up @@ -567,9 +564,10 @@ void mfu_flist_unlink(mfu_flist flist, bool traceless)
/* stat the parent directory */
struct stat st;
char* pdir = strings[idx];
int status = mfu_lstat(pdir, &st);
int status = mfu_file_lstat(pdir, &st, mfu_file);
if (status != 0) {
MFU_LOG(MFU_LOG_DBG, "mfu_lstat(%s): %d", pdir, status);
MFU_LOG(MFU_LOG_DBG, "mfu_file_lstat() file: '%s' (errno=%d %s)",
pdir, errno, strerror(errno));
continue;
}

Expand Down Expand Up @@ -644,7 +642,7 @@ void mfu_flist_unlink(mfu_flist flist, bool traceless)
/* set the bit if needed */
if (set_write_bit) {
const char* name = mfu_flist_file_get_name(list, idx);
int rc = chmod(name, S_IRWXU);
int rc = mfu_file_chmod(name, S_IRWXU, mfu_file);
if (rc != 0) {
MFU_LOG(MFU_LOG_ERR, "Failed to chmod directory `%s' (errno=%d %s)",
name, errno, strerror(errno)
Expand All @@ -665,7 +663,7 @@ void mfu_flist_unlink(mfu_flist flist, bool traceless)

/* remove all non directory (leaf) items */
uint64_t count = 0;
remove_by_algo(algo, flist_nondirs, &count);
remove_by_algo(algo, flist_nondirs, &count, mfu_file);

/* remove directories starting from deepest level */
int level;
Expand All @@ -675,7 +673,7 @@ void mfu_flist_unlink(mfu_flist flist, bool traceless)

/* remove items at this level */
uint64_t count = 0;
remove_by_algo(algo, list, &count);
remove_by_algo(algo, list, &count, mfu_file);

/* wait for all procs to finish before we start
* with items at next level */
Expand Down Expand Up @@ -705,9 +703,9 @@ void mfu_flist_unlink(mfu_flist flist, bool traceless)

/* restore timestamps */
const char* pdir = mfu_flist_file_get_name(newlist, idx);
if(mfu_utimensat(AT_FDCWD, pdir, times, AT_SYMLINK_NOFOLLOW) != 0) {
if(mfu_file_utimensat(AT_FDCWD, pdir, times, AT_SYMLINK_NOFOLLOW, mfu_file) != 0) {
MFU_LOG(MFU_LOG_DBG,
"Failed to changeback timestamps with utimesat() `%s' (errno=%d %s)",
"Failed to changeback timestamps with mfu_file_utimesat() `%s' (errno=%d %s)",
pdir, errno, strerror(errno)
);
}
Expand Down

0 comments on commit 950d387

Please sign in to comment.