Skip to content

Commit

Permalink
mfu: improve archive scan to handle multiple EOF markers
Browse files: browse the repository at this point in the history
Signed-off-by: Adam Moody <moody20@llnl.gov>
  • Loading branch information
adammoody committed Jan 23, 2021
1 parent febfe64 commit a6b6516
Showing 1 changed file with 84 additions and 68 deletions.
152 changes: 84 additions & 68 deletions src/common/mfu_flist_archive.c
Original file line number Diff line number Diff line change
Expand Up @@ -3108,7 +3108,7 @@ static int index_entries_distread_parallelscan(
size_t bytes_remaining = ptr_end - ptr;
char* ptr_found = (char*) memmem(ptr, bytes_remaining, "ustar", strlen("ustar"));
if (ptr_found == NULL) {
/* no "ustar" foudn in the rest of the buffer */
/* no "ustar" found in the rest of the buffer */
break;
}

Expand Down Expand Up @@ -3153,9 +3153,18 @@ static int index_entries_distread_parallelscan(
struct archive_entry* entry;
int r = archive_read_next_header(a, &entry);
if (r == ARCHIVE_EOF) {
/* hit end of the archive, advance past "end-of-archive" markers
/* hit end of the archive, compute size of "end-of-archive" marker */
uint64_t eof_start = (uint64_t) archive_read_header_position(a);
uint64_t eof_end = (uint64_t) archive_filter_bytes(a, -1);
uint64_t eof_size = eof_end - eof_start;

/* advance past "end-of-archive" marker
* in case another archive starts just beyond */
ptr += 2 * 512;
ptr += eof_size;

/* compute file offset to start of next archive, if any */
max_offset = pos + buf_offset + eof_start + eof_size;

break;
}
if (r != ARCHIVE_OK) {
Expand All @@ -3165,9 +3174,9 @@ static int index_entries_distread_parallelscan(
* If this is actually a corrupt entry, then the rank
* following us will also fail trying to process it */
if (! found_one) {
/* failed to process the first entry in the archive,
* so skip past this "ustar" instance on to the next */
ptr = ptr_found + strlen("ustar");
/* failed to process the first entry in the archive,
* so skip past this "ustar" instance on to the next */
ptr = ptr_found + strlen("ustar");
}
break;
}
Expand Down Expand Up @@ -3214,27 +3223,30 @@ static int index_entries_distread_parallelscan(
/* TODO: also record entry sizes, add to headers, and then check that starting
* position on each rank matches where the rank before left off */

/* initiate archive object for reading */
struct archive* a = archive_read_new();

/* we want all the format supports */
// archive_read_support_filter_bzip2(a);
// archive_read_support_filter_gzip(a);
// archive_read_support_filter_compress(a);
archive_read_support_format_tar(a);

/* advance file position to offset based on our starting offset */
pos = starting_pos;

/* variables to count number of entries we find,
* and byte offset within archive for each one */
* and byte offset within archive for each one,
* start with 1024 entries, we'll grow this list of offsets as needed */
uint64_t count = 0;
uint64_t* offsets = NULL;
size_t maxcount = 1024;
uint64_t* offsets = (uint64_t*) malloc(maxcount * sizeof(uint64_t));

/* so long as starting position does not lie past our region,
* and so long as we have not already hit an error, process any
* entries from our region of the file */
if (pos < offset_last && rc == MFU_SUCCESS) {
int break_outer = 0;
while (bufsize > 0 && pos < offset_last && rc == MFU_SUCCESS && !break_outer) {
/* initiate archive object for reading */
struct archive* a = archive_read_new();

/* we want all the format supports */
// archive_read_support_filter_bzip2(a);
// archive_read_support_filter_gzip(a);
// archive_read_support_filter_compress(a);
archive_read_support_format_tar(a);

/* count number of bytes we may be starting before or after the start
* of our assigned region, which may be positive or negative */
int64_t adjustment = (int64_t)pos - (int64_t)offset_start;
Expand All @@ -3259,8 +3271,6 @@ static int index_entries_distread_parallelscan(
/* Skip through entries in our buffer, counting them
* and recording their byte offset within the file as
* we go. */
size_t maxcount = 1024;
offsets = (uint64_t*) malloc(maxcount * sizeof(uint64_t));
while (pos < offset_last && rc == MFU_SUCCESS) {
/* increase our buffer capacity if needed */
if (count >= maxcount) {
Expand All @@ -3273,7 +3283,11 @@ static int index_entries_distread_parallelscan(
struct archive_entry* entry;
int r = archive_read_next_header(a, &entry);
if (r == ARCHIVE_EOF) {
/* hit an early end of the archive */
/* hit end of the archive, compute size of "end-of-archive" marker,
* and advance past it */
uint64_t eof_start = (uint64_t) archive_read_header_position(a);
uint64_t eof_end = (uint64_t) archive_filter_bytes(a, -1);
pos += (eof_end - eof_start);
break;
}
if (r != ARCHIVE_OK) {
Expand All @@ -3282,6 +3296,14 @@ static int index_entries_distread_parallelscan(
* on to the next rank to see if it has better luck.
* If it actually is a corrupt entry, then the rank
* following us will also fail trying to process it */
if (offset_last == file_size) {
/* our section of the file runs to the end,
* so this must be an actual error */
rc = MFU_FAILURE;
}

/* break the outer loop to pass the ball to the next rank */
break_outer = 1;
break;
}

Expand All @@ -3304,33 +3326,21 @@ static int index_entries_distread_parallelscan(
/* advance to the next item */
count++;
}
}

/* have the last rank check that we processed the full file */
if (rank == (ranks - 1) && offset_last == file_size && rc == MFU_SUCCESS) {
/* if we're the last rank and things are still successful up
* to this point, check that pos reached the last byte
* of the archive */
uint64_t termination_bytes = 2 * 512;
pos += termination_bytes;
if (pos != offset_last) {
MFU_LOG(MFU_LOG_ERR, "Failed to process full archive '%s'",
filename);
rc = MFU_FAILURE;
}
/* done reading the archive, so free resources */
archive_read_close(a);
archive_read_free(a);
}

/* done reading the archive, so free resources */
archive_read_close(a);
archive_read_free(a);

/* get maximum offset from all ranks before us to use as our starting position */
uint64_t max_pos = 0;
MPI_Allreduce(&pos, &max_pos, 1, MPI_UINT64_T, MPI_MAX, MPI_COMM_WORLD);
*out_starting_pos = max_pos;

/* check whether anyone failed */
if (! mfu_alltrue(rc == MFU_SUCCESS, MPI_COMM_WORLD)) {
/* release the offset array */
mfu_free(&offsets);
return MFU_FAILURE;
}

Expand Down Expand Up @@ -3359,15 +3369,6 @@ static int index_entries_distread_linearscan(
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &ranks);

/* initiate archive object for reading */
struct archive* a = archive_read_new();

/* we want all the format supports */
// archive_read_support_filter_bzip2(a);
// archive_read_support_filter_gzip(a);
// archive_read_support_filter_compress(a);
archive_read_support_format_tar(a);

/* direct scan: rank 0 starts by reading its buffer, and then it sends the
* file offset at which rank 1 should pick up and continue */
uint64_t starting_pos = *out_starting_pos;
Expand All @@ -3389,14 +3390,26 @@ static int index_entries_distread_linearscan(
uint64_t pos = starting_pos;

/* variables to count number of entries we find,
* and byte offset within archive for each one */
* and byte offset within archive for each one,
* start with 1024 entries, we'll grow this list of offsets as needed */
uint64_t count = 0;
uint64_t* offsets = NULL;
size_t maxcount = 1024;
uint64_t* offsets = (uint64_t*) malloc(maxcount * sizeof(uint64_t));

/* so long as starting position does not lie past our region,
* and so long as we have not already hit an error, process any
* entries from our region of the file */
if (pos < offset_last && rc == MFU_SUCCESS) {
int break_outer = 0;
while (pos < offset_last && rc == MFU_SUCCESS && !break_outer) {
/* initiate archive object for reading */
struct archive* a = archive_read_new();

/* we want all the format supports */
// archive_read_support_filter_bzip2(a);
// archive_read_support_filter_gzip(a);
// archive_read_support_filter_compress(a);
archive_read_support_format_tar(a);

/* count number of bytes we may be starting before or after the start
* of our assigned region, may be positive or negative */
int64_t adjustment = (int64_t)pos - (int64_t)offset_start;
Expand Down Expand Up @@ -3425,8 +3438,6 @@ static int index_entries_distread_linearscan(
/* Skip through entries in our buffer, counting them
* and recording their byte offset within the file as
* we go. */
size_t maxcount = 1024;
offsets = (uint64_t*) malloc(maxcount * sizeof(uint64_t));
while (pos < offset_last && rc == MFU_SUCCESS) {
/* increase our buffer capacity if needed */
if (count >= maxcount) {
Expand All @@ -3439,7 +3450,11 @@ static int index_entries_distread_linearscan(
struct archive_entry* entry;
int r = archive_read_next_header(a, &entry);
if (r == ARCHIVE_EOF) {
/* hit end of the archive */
/* hit end of the current archive, but there could be another,
* update position past end of the current archive */
uint64_t archive_offset = offset_start + adjustment;
uint64_t eof_offset = (uint64_t) archive_filter_bytes(a, -1);
pos = archive_offset + eof_offset;
break;
}
if (r != ARCHIVE_OK) {
Expand All @@ -3448,6 +3463,14 @@ static int index_entries_distread_linearscan(
* on to the next rank to see if it has better luck.
* If it actually is a corrupt entry, then the rank
* following us will also fail trying to process it */
if (offset_last == file_size) {
/* our section of the file runs to the end,
* so this must be an actual error */
rc = MFU_FAILURE;
}

/* break the outer loop to pass the ball to the next rank */
break_outer = 1;
break;
}

Expand All @@ -3470,35 +3493,28 @@ static int index_entries_distread_linearscan(
/* advance to the next item */
count++;
}

/* close the archive that we opened */
archive_read_close(a);

/* done reading the archive, so free resources */
archive_read_free(a);
}

/* send the starting position for the rank to our right */
if (rank < ranks - 1) {
MPI_Send(&pos, 1, MPI_UINT64_T, rank + 1, 0, MPI_COMM_WORLD);
MPI_Send(&rc, 1, MPI_INT, rank + 1, 0, MPI_COMM_WORLD);
} else if (rc == MFU_SUCCESS && offset_last == file_size) {
/* if we're the last rank and things are still successful up
* to this point, check that pos reached the last byte
* of the archive */
uint64_t termination_bytes = 2 * 512;
pos += termination_bytes;
if (pos != offset_last) {
MFU_LOG(MFU_LOG_ERR, "Failed to process full archive '%s'",
filename);
rc = MFU_FAILURE;
}
}

/* done reading the archive, so free resources */
archive_read_close(a);
archive_read_free(a);

/* get starting position for the next round, if any */
*out_starting_pos = pos;
MPI_Bcast(out_starting_pos, 1, MPI_UINT64_T, ranks-1, MPI_COMM_WORLD);

/* check whether anyone failed */
if (! mfu_alltrue(rc == MFU_SUCCESS, MPI_COMM_WORLD)) {
/* release the offset array */
mfu_free(&offsets);
return MFU_FAILURE;
}

Expand Down

0 comments on commit a6b6516

Please sign in to comment.