Skip to content

Commit

Permalink
rename --synchronous to --direct, handle O_DIRECT short read/writes
Browse files Browse the repository at this point in the history
Signed-off-by: Adam Moody <moody20@llnl.gov>
  • Loading branch information
adammoody committed Sep 22, 2020
1 parent 867cb5e commit 492b278
Show file tree
Hide file tree
Showing 12 changed files with 531 additions and 286 deletions.
4 changes: 4 additions & 0 deletions doc/rst/dcmp.1.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ OPTIONS

Enable base checks and normal stdout results when --output is used.

.. option:: -s, --direct

Use O_DIRECT to avoid caching file data.

.. option:: --progress N

Print progress message to stdout approximately every N seconds.
Expand Down
5 changes: 2 additions & 3 deletions doc/rst/dcp.1.rst
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,9 @@ OPTIONS

Preserve permissions, group, timestamps, and extended attributes.

.. option:: -s, --synchronous
.. option:: -s, --direct

Use synchronous read/write calls (open files with O_DIRECT).
This also avoids caching the file data on the client nodes.
Use O_DIRECT to avoid caching file data.

.. option:: -S, --sparse

Expand Down
4 changes: 4 additions & 0 deletions doc/rst/dsync.1.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ OPTIONS

Delete extraneous files from destination.

.. option:: -s, --direct

Use O_DIRECT to avoid caching file data.

.. option:: --link-dest DIR

Create hardlink in DEST to files in DIR when file is unchanged
Expand Down
216 changes: 89 additions & 127 deletions src/common/mfu_flist_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@ static void mfu_copy_open_file(const char* file, int read_flag,
/* open the new file */
if (read_flag) {
int flags = O_RDONLY;
if (mfu_copy_opts->synchronous) {
if (mfu_copy_opts->direct) {
flags |= O_DIRECT;
}
/* set obj or fd here */
mfu_file_open(file, flags, mfu_file);
} else {
int flags = O_WRONLY | O_CREAT;
if (mfu_copy_opts->synchronous) {
if (mfu_copy_opts->direct) {
flags |= O_DIRECT;
}
/* set obj or fd here */
Expand Down Expand Up @@ -1556,30 +1556,6 @@ static int mfu_is_all_null(const char* buf, uint64_t buf_size)
return 1;
}

/* when using sparse files, we need to write the last byte if the
* hole is adjacent to EOF, so we need to detect whether we're at
* the end of the file */
static int mfu_is_eof(const char* file, mfu_file_t* mfu_file)
{
/* read one byte from fd to determine whether this is EOF.
* This is not efficient, but it is the only reliable way */
char buf[1];
ssize_t num_of_bytes_read = mfu_file_read(file, buf, 1, mfu_file);

/* return if we detect EOF */
if(! num_of_bytes_read) {
return 1;
}

/* otherwise, we're not at EOF yet, seek back one byte */
if(mfu_file_lseek(file, mfu_file, -1, SEEK_CUR) == (off_t)-1) {
MFU_LOG(MFU_LOG_ERR, "Couldn't seek in path `%s' (errno=%d %s)",
file, errno, strerror(errno));
return -1;
}
return 0;
}

static int mfu_copy_file_normal(
const char* src,
const char* dest,
Expand All @@ -1590,129 +1566,124 @@ static int mfu_copy_file_normal(
mfu_file_t* mfu_src_file,
mfu_file_t* mfu_dst_file)
{
/* There is no equivalent "lseek" in daos, it is part of read, so skip that
* for a DAOS path */
/* seek to offset in source file */
if(mfu_file_lseek(src, mfu_src_file, offset, SEEK_SET) == (off_t)-1) {
MFU_LOG(MFU_LOG_ERR, "Couldn't seek in source path `%s' (errno=%d %s)", src, errno, strerror(errno));
return -1;
}

/* seek to offset in destination file */
if (mfu_file_lseek(dest, mfu_dst_file, offset, SEEK_SET) == (off_t)-1) {
MFU_LOG(MFU_LOG_ERR, "Couldn't seek in destination path `%s' (errno=%d %s)", dest, errno, strerror(errno));
return -1;
}

/* set buffer and buffer size */
size_t buf_size = mfu_copy_opts->block_size;
void* buf = mfu_copy_opts->block_buf1;

/* for O_DIRECT, check that length is multiple of block_size */
if (mfu_copy_opts->direct && /* using O_DIRECT */
offset + length < file_size && /* not at end of file */
length % buf_size != 0) /* length not an integer multiple of block size */
{
MFU_ABORT(-1, "O_DIRECT requires chunk size to be integer multiple of block size %llu",
buf_size);
}

/* initialize our starting offset within the file */
off_t off = offset;

/* write data */
size_t total_bytes = 0;
while(total_bytes < (size_t)length) {
/* determine number of bytes that we
* can read = max(buf size, remaining chunk) */
size_t left_to_read = (size_t)length - total_bytes;

if(left_to_read > buf_size) {
left_to_read = buf_size;
uint64_t total_bytes = 0;
while (total_bytes < length) {
/* determine number of bytes to read,
* O_DIRECT requires read operation of certain size blocks,
* even if we know that would run past the end of the file */
size_t left_to_read = buf_size;
if (! mfu_copy_opts->direct) {
uint64_t remainder = length - total_bytes;
if (remainder < (uint64_t) buf_size) {
left_to_read = (size_t) remainder;
}
}

if(mfu_copy_opts->synchronous) {
/* O_DIRECT requires particular read sizes */
left_to_read = buf_size;
/* read data from source file */
ssize_t bytes_read = mfu_file_pread(src, buf, left_to_read, off, mfu_src_file);

/* If we're using O_DIRECT, deal with short reads.
* Retry with same buffer and offset since those must
* be aligned at block boundaries. */
while (mfu_copy_opts->direct && /* using O_DIRECT */
bytes_read > 0 && /* read was not an error or eof */
bytes_read < left_to_read && /* shorter than requested */
(off + bytes_read) < file_size) /* not at end of file */
{
/* TODO: probably should retry a limited number of times then abort */
bytes_read = mfu_file_pread(src, buf, left_to_read, off, mfu_src_file);
}

/* read data from source file */
ssize_t num_of_bytes_read = mfu_file_read(src, buf, left_to_read, mfu_src_file);
/* check for an error */
if (bytes_read < 0) {
MFU_LOG(MFU_LOG_ERR, "Read error when copying from `%s' to `%s' (errno=%d %s)",
src, dest, errno, strerror(errno));
return -1;
}

/* check for EOF */
if(! num_of_bytes_read) {
break;
/* check for early EOF */
if (bytes_read == 0) {
MFU_LOG(MFU_LOG_ERR, "Source file `%s' shorter than expected %llu (errno=%d %s)",
src, file_size, errno, strerror(errno));
return -1;
}

/* compute number of bytes to write */
size_t bytes_to_write = (size_t) num_of_bytes_read;
if(mfu_copy_opts->synchronous) {
size_t bytes_to_write = (size_t) bytes_read;
if (mfu_copy_opts->direct) {
/* O_DIRECT requires particular write sizes,
* ok to write beyond end of file so long as
* we truncate in cleanup step */
size_t remainder = buf_size - (size_t) num_of_bytes_read;
if(remainder > 0) {
size_t remainder = buf_size - (size_t) bytes_read;
if (remainder > 0) {
/* zero out the end of the buffer for security,
* don't want to leave data from another file at end of
* current file if we fail before truncating */
char* bufzero = ((char*)buf + num_of_bytes_read);
char* bufzero = ((char*)buf + bytes_read);
memset(bufzero, 0, remainder);
}

/* assumes buf_size is magic size for O_DIRECT */
bytes_to_write = buf_size;
}

/* Write data to destination file.
* Do nothing for a hole in the middle of a file,
* because write of next chunk will create one for us.
* Write only the last byte to create the hole,
* if the hole is next to EOF. */
/* TODO: add code for daos api to support sparse files? */
ssize_t num_of_bytes_written = (ssize_t)bytes_to_write;
/* If in sparse mode, skip writing out blocks that are all 0.
* Rely on posix hole semantics to account for those 0 values instead.
* If this hole is at the end of the file, the truncate below will
* set the file size correctly. */
int skip_write = 0;
if (mfu_copy_opts->sparse && mfu_is_all_null(buf, bytes_to_write)) {
/* TODO: isn't there a better way to know if we're at EOF,
* e.g., by using file size? */
/* determine whether we're at the end of the file */
int end_of_file = mfu_is_eof(src, mfu_src_file);
if (end_of_file < 0) {
/* hit an error while looking for EOF */
return -1;
}
skip_write = 1;
}

/* if we're at the end of the file, write out a byte,
* otherwise just seek out destination file pointer
* ahead without writing anything */
if (end_of_file) {
/* seek to last byte position in file */
if(mfu_file_lseek(dest, mfu_dst_file, (off_t)bytes_to_write - 1, SEEK_CUR) == (off_t)-1) {
MFU_LOG(MFU_LOG_ERR, "Couldn't seek in destination path `%s' (errno=%d %s)",
dest, errno, strerror(errno));
/* write data to destination file if needed */
if (! skip_write) {
/* we loop to account for short writes */
ssize_t n = 0;
while (n < bytes_to_write) {
/* write bytes to destination file */
ssize_t bytes_written = mfu_file_pwrite(dest, ((char*)buf) + n, bytes_to_write - n, off + n, mfu_dst_file);

/* check for an error */
if (bytes_written < 0) {
MFU_LOG(MFU_LOG_ERR, "Write error when copying from `%s' to `%s' (errno=%d %s)",
src, dest, errno, strerror(errno));
return -1;
}

/* write out a single byte */
mfu_file_write(dest, buf, 1, mfu_dst_file);
} else {
/* this section of the destination file is all 0,
* seek past this section */
if(mfu_file_lseek(dest, mfu_dst_file, (off_t)bytes_to_write, SEEK_CUR) == (off_t)-1) {
MFU_LOG(MFU_LOG_ERR, "Couldn't seek in destination path `%s' (errno=%d %s)",
dest, errno, strerror(errno));
return -1;
/* So long as we're not using O_DIRECT, we can handle short writes
* by advancing by the number of bytes written. For O_DIRECT, we
* need to keep buffer, file offset, and amount to write aligned
* on block boundaries, so just retry the entire operation. */
if (!mfu_copy_opts->direct || bytes_written == bytes_to_write) {
n += bytes_written;
}
}
} else {
/* write bytes to destination file */
num_of_bytes_written = mfu_file_write(dest, buf, bytes_to_write, mfu_dst_file);
}

/* check for an error */
if(num_of_bytes_written < 0) {
MFU_LOG(MFU_LOG_ERR, "Write error when copying from `%s' to `%s' (errno=%d %s)",
src, dest, errno, strerror(errno));
return -1;
}

/* check that we wrote the same number of bytes that we read */
if((size_t)num_of_bytes_written != bytes_to_write) {
MFU_LOG(MFU_LOG_ERR, "Write error when copying from `%s' to `%s'",
src, dest);
return -1;
}

total_bytes += (size_t) num_of_bytes_read;
/* update current offset and accumulate number of bytes copied */
off += (off_t) bytes_read;
total_bytes += (uint64_t) bytes_read;

/* update number of bytes we have copied for progress messages */
copy_count += (uint64_t) num_of_bytes_read;
copy_count += (uint64_t) bytes_read;
mfu_progress_update(&copy_count, copy_prog);
}

Expand All @@ -1722,33 +1693,25 @@ static int mfu_copy_file_normal(

#if 0
/* force data to file system */
if(total_bytes > 0) {
if (total_bytes > 0) {
mfu_fsync(dest, out_fd);
}
#endif

/* no need to truncate if sparse file is enabled,
* since we truncated files when they were first created */
if (mfu_copy_opts->sparse) {
return 0;
}

/* if we wrote the last chunk, truncate the file */
off_t last_written = offset + length;
off_t file_size_offt = (off_t) file_size;
if (last_written >= file_size_offt || file_size == 0) {
/* Use ftruncate() here rather than truncate(), because grouplock
* of Lustre would cause block to truncate() since the fd is different
* from the out_fd. */
if(mfu_file_ftruncate(mfu_dst_file, file_size_offt) < 0) {
if (mfu_file_ftruncate(mfu_dst_file, file_size_offt) < 0) {
MFU_LOG(MFU_LOG_ERR, "Failed to truncate destination file: %s (errno=%d %s)",
dest, errno, strerror(errno));
return -1;
}
}

/* we don't bother closing the file because our cache does it for us */

return 0;
}

Expand All @@ -1764,7 +1727,7 @@ static int mfu_copy_file_fiemap(
mfu_file_t* mfu_dst_file)
{
*normal_copy_required = true;
if (mfu_copy_opts->synchronous) {
if (mfu_copy_opts->direct) {
goto fail_normal_copy;
}

Expand Down Expand Up @@ -1938,9 +1901,7 @@ static int mfu_copy_file(
mfu_file_t* mfu_dst_file)
{
int ret;
bool normal_copy_required;


/* open the input file */
mfu_copy_open_file(src, 1, &mfu_copy_src_cache,
mfu_copy_opts, mfu_src_file);
Expand All @@ -1959,6 +1920,7 @@ static int mfu_copy_file(
}

if (mfu_copy_opts->sparse) {
bool normal_copy_required;
ret = mfu_copy_file_fiemap(src, dest, offset, length, file_size,
&normal_copy_required, mfu_copy_opts,
mfu_src_file, mfu_dst_file);
Expand Down Expand Up @@ -2721,7 +2683,7 @@ static int mfu_fill_file(
}

/* compute number of bytes to write */
if (mfu_copy_opts->synchronous) {
if (mfu_copy_opts->direct) {
/* O_DIRECT requires particular write sizes,
* ok to write beyond end of file so long as
* we truncate in cleanup step */
Expand Down Expand Up @@ -3184,7 +3146,7 @@ mfu_copy_opts_t* mfu_copy_opts_new(void)
opts->preserve = false;

/* By default, don't use O_DIRECT. */
opts->synchronous = false;
opts->direct = false;

/* By default, don't use sparse file. */
opts->sparse = false;
Expand Down

0 comments on commit 492b278

Please sign in to comment.