Skip to content

Commit

Permalink
Merge pull request #164 from HDFGroup/chogan/issue_160
Browse files Browse the repository at this point in the history
Fix allocated size of buffering files
  • Loading branch information
ChristopherHogan committed Mar 22, 2021
2 parents 04f0eae + eb0ce3d commit 833a555
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 109 deletions.
52 changes: 40 additions & 12 deletions src/buffer_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,15 @@ i32 GetSlabBufferSize(SharedMemoryContext *context, DeviceID device_id,
if (slab_index < pool->num_slabs[device_id]) {
result = slab_sizes[slab_index];
} else {
// TODO(chogan): @logging
LOG(WARNING) << "Requested info for a non-existent slab "
<< "(requested slab index: " << slab_index
<< " , max index: " << pool->num_slabs[device_id]
<< std::endl;
}
} else {
// TODO(chogan): @logging
LOG(WARNING) << "Requested info for a non-existent Device "
<< "(requested id: " << device_id << " , max id: "
<< pool->num_devices << std::endl;
}

return result;
Expand Down Expand Up @@ -401,7 +406,6 @@ std::atomic<u32> *GetAvailableBuffersArray(SharedMemoryContext *context,
return result;
}

#if 0
static u32 GetNumBuffersAvailable(SharedMemoryContext *context,
DeviceID device_id, int slab_index) {
std::atomic<u32> *buffers_available = GetAvailableBuffersArray(context,
Expand All @@ -414,6 +418,7 @@ static u32 GetNumBuffersAvailable(SharedMemoryContext *context,
return result;
}

#if 0
static u64 GetNumBytesRemaining(SharedMemoryContext *context,
DeviceID device_id, int slab_index) {
u32 num_free_buffers = GetNumBuffersAvailable(context, device_id, slab_index);
Expand Down Expand Up @@ -1199,6 +1204,18 @@ void MakeFullShmemName(char *dest, const char *base) {
}
}

/**
 * Opens a file with fopen and treats a failure to open it as fatal.
 *
 * On failure, the path is logged at ERROR severity, perror is called to
 * print the description of the current errno, and a final message is logged
 * at FATAL severity (presumably ending the process; this depends on the
 * logging library's FATAL semantics).
 *
 * @param fname Path of the file to open.
 * @param mode Mode string passed unchanged to fopen.
 * @return The stream returned by fopen.
 */
FILE *FopenOrTerminate(const char *fname, const char *mode) {
  FILE *stream = fopen(fname, mode);

  // Common case: the file opened, so hand the stream straight back.
  if (stream) {
    return stream;
  }

  // Failure path: report which file could not be opened, then the reason.
  LOG(ERROR) << "Failed to open file at " << fname << ": ";
  perror(nullptr);
  LOG(FATAL) << "Terminating...";

  return stream;
}

void InitFilesForBuffering(SharedMemoryContext *context, bool make_space) {
BufferPool *pool = GetBufferPoolFromContext(context);
context->buffering_filenames.resize(pool->num_devices);
Expand Down Expand Up @@ -1228,20 +1245,31 @@ void InitFilesForBuffering(SharedMemoryContext *context, bool make_space) {

const char *buffering_fname =
context->buffering_filenames[device_id][slab].c_str();
FILE *buffering_file = fopen(buffering_fname, "w+");
FILE *buffering_file = FopenOrTerminate(buffering_fname, "w+");

if (make_space) {
if (device->has_fallocate) {
// TODO(chogan): Use posix_fallocate when it is available
} else {
// TODO(chogan): Some Devices require file initialization on each
// node, and some are shared (burst buffers) and only require one rank
// to initialize them
if (device->is_shared) {
// TODO(chogan): Some Devices require file initialization on each
// node, and some are shared (burst buffers) and only require one
// rank to initialize them
HERMES_NOT_IMPLEMENTED_YET;
}

Target *target = GetTarget(context, device_id);
[[maybe_unused]] int ftruncate_result =
ftruncate(fileno(buffering_file), target->capacity);
// TODO(chogan): @errorhandling
assert(ftruncate_result == 0);
u32 num_buffers = GetNumBuffersAvailable(context, device_id, slab);
i32 buffer_size = GetSlabBufferSize(context, device_id, slab);
size_t this_slabs_capacity = num_buffers * buffer_size;

int ftruncate_result = ftruncate(fileno(buffering_file),
this_slabs_capacity);
if (ftruncate_result) {
LOG(ERROR) << "Failed to allocate buffering file at "
<< buffering_fname << ": ";
perror(nullptr);
LOG(FATAL) << "Terminating...";
}
}
}
context->open_streams[device_id][slab] = buffering_file;
Expand Down
2 changes: 2 additions & 0 deletions src/buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ struct Device {
* Device
*/
bool has_fallocate;
/** True if the device is shared among multiple ranks (e.g., burst buffers) */
bool is_shared;
/** The directory where buffering files can be created. Zero terminated. */
char mount_point[kMaxPathLength];
};
Expand Down
2 changes: 1 addition & 1 deletion test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ target_link_libraries(bp ${LIBRT} hermes MPI::MPI_CXX
$<$<BOOL:${HERMES_RPC_THALLIUM}>:thallium>)
target_compile_definitions(bp
PRIVATE $<$<BOOL:${HERMES_RPC_THALLIUM}>:HERMES_RPC_THALLIUM>)
add_test(NAME "TestBufferPool" COMMAND "${CMAKE_BINARY_DIR}/bin/bp" "-b" "-s")
add_test(NAME "TestBufferPool" COMMAND "${CMAKE_BINARY_DIR}/bin/bp")
set_tests_properties("TestBufferPool" PROPERTIES ENVIRONMENT
LSAN_OPTIONS=suppressions=${CMAKE_CURRENT_SOURCE_DIR}/data/asan.supp)

Expand Down
Loading

0 comments on commit 833a555

Please sign in to comment.