Skip to content

Commit

Permalink
Merge pull request #170 from HDFGroup/chogan/error_handling
Browse files Browse the repository at this point in the history
Another error handling pass
  • Loading branch information
ChristopherHogan committed Apr 9, 2021
2 parents 5bcd25a + 8fca6fd commit c981906
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 118 deletions.
3 changes: 0 additions & 3 deletions src/api/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ Status Bucket::RenameBlob(const std::string &old_name,
Status ret;

if (IsBlobNameTooLong(new_name)) {
// TODO(chogan): @errorhandling
ret = BLOB_NAME_TOO_LONG;
LOG(ERROR) << ret.Msg();
return ret;
Expand Down Expand Up @@ -248,7 +247,6 @@ Status Bucket::Rename(const std::string &new_name, const Context &ctx) {
Status ret;

if (IsBucketNameTooLong(new_name)) {
// TODO(chogan): @errorhandling
ret = BUCKET_NAME_TOO_LONG;
LOG(ERROR) << ret.Msg();
return ret;
Expand Down Expand Up @@ -317,7 +315,6 @@ Status Bucket::Destroy(const Context &ctx) {
if (destroyed) {
id_.as_int = 0;
} else {
// TODO(chogan): @errorhandling
result = BUCKET_IN_USE;
LOG(ERROR) << result.Msg();
}
Expand Down
11 changes: 2 additions & 9 deletions src/api/vbucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ Status VBucket::Link(std::string blob_name, std::string bucket_name,
}
}
} else {
// TODO(hari): @errorhandling
ret = BLOB_NOT_IN_BUCKET;
LOG(ERROR) << ret.Msg();
}
Expand Down Expand Up @@ -94,7 +93,6 @@ Status VBucket::Unlink(std::string blob_name, std::string bucket_name,
}
}
if (!found) {
// TODO(hari): @errorhandling
ret = BLOB_NOT_LINKED_TO_VBUCKET;
LOG(ERROR) << ret.Msg();
}
Expand Down Expand Up @@ -170,7 +168,6 @@ Status VBucket::Attach(Trait* trait, Context& ctx) {
}
attached_traits_.push_back(trait);
} else {
// TODO(hari): @errorhandling throw trait already exists.
ret = TRAIT_EXISTS_ALREADY;
LOG(ERROR) << ret.Msg();
}
Expand Down Expand Up @@ -216,7 +213,6 @@ Status VBucket::Detach(Trait* trait, Context& ctx) {
}
attached_traits_.erase(selected_trait_iter);
} else {
// TODO(hari): @errorhandling throw trait not valid.
ret = TRAIT_NOT_VALID;
LOG(ERROR) << ret.Msg();
}
Expand Down Expand Up @@ -290,20 +286,18 @@ Status VBucket::Delete(Context& ctx) {
auto blob_id =
GetBlobId(&hermes_->context_, &hermes_->rpc_, ci->second,
bucket_id);
// TODO(hari): @errorhandling check return of StdIoPersistBlob
ret = StdIoPersistBlob(&hermes_->context_, &hermes_->rpc_,
&hermes_->trans_arena_, blob_id, file,
iter->second);
if (!ret.Succeeded())
if (!ret.Succeeded()) {
LOG(ERROR) << ret.Msg();
}
} else {
// TODO(hari): @errorhandling map doesnt have the blob linked.
ret = BLOB_NOT_LINKED_IN_MAP;
LOG(ERROR) << ret.Msg();
}

} else {
// TODO(hari): @errorhandling offset_map should not be empty
ret = OFFSET_MAP_EMPTY;
LOG(ERROR) << ret.Msg();
}
Expand All @@ -324,7 +318,6 @@ Status VBucket::Delete(Context& ctx) {
if (file != nullptr) {
fflush(file);
if (fclose(file) != 0) {
// TODO(chogan): @errorhandling
ret = FCLOSE_FAILED;
LOG(ERROR) << ret.Msg() << strerror(errno);
}
Expand Down
160 changes: 74 additions & 86 deletions src/buffer_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -732,10 +732,13 @@ Device *InitDevices(Arena *arena, Config *config) {
if (path_length == 0) {
device->is_byte_addressable = true;
} else {
// TODO(chogan): @errorhandling
assert(path_length < kMaxPathLength);
snprintf(device->mount_point, path_length + 1, "%s",
config->mount_points[i].c_str());
if (path_length < kMaxPathLength) {
snprintf(device->mount_point, path_length + 1, "%s",
config->mount_points[i].c_str());
} else {
LOG(ERROR) << "Mount point path " << config->mount_points[i]
<< " exceeds max length of " << kMaxPathLength << std::endl;
}
}
}

Expand Down Expand Up @@ -1183,8 +1186,7 @@ void SerializeBufferPoolToFile(SharedMemoryContext *context, FILE *file) {
int result = fwrite(context->shm_base, context->shm_size, 1, file);

if (result < 1) {
// TODO(chogan): @errorhandling
perror("Failed to serialize BufferPool to file");
FailedLibraryCall("fwrite");
}
}

Expand All @@ -1196,11 +1198,17 @@ void MakeFullShmemName(char *dest, const char *base) {
char *username = getenv("USER");
if (username) {
size_t username_length = strlen(username);
[[maybe_unused]] size_t total_length =
base_name_length + username_length + 1;
// TODO(chogan): @errorhandling
assert(total_length < kMaxBufferPoolShmemNameLength);
snprintf(dest + base_name_length, username_length + 1, "%s", username);
size_t total_length = base_name_length + username_length + 1;

if (total_length < kMaxBufferPoolShmemNameLength) {
snprintf(dest + base_name_length, username_length + 1, "%s", username);
} else {
LOG(ERROR) << "Shared memory name " << base << username
<< " exceeds max length of " << kMaxBufferPoolShmemNameLength
<< std::endl;
}
} else {
// TODO(chogan): Use pid?
}
}

Expand Down Expand Up @@ -1283,22 +1291,24 @@ u8 *InitSharedMemory(const char *shmem_name, size_t total_size) {
shm_open(shmem_name, O_CREAT | O_RDWR | O_TRUNC, S_IRUSR | S_IWUSR);

if (shmem_fd >= 0) {
[[maybe_unused]] int ftruncate_result = ftruncate(shmem_fd, total_size);
// TODO(chogan): @errorhandling
assert(ftruncate_result == 0);
int ftruncate_result = ftruncate(shmem_fd, total_size);
if (ftruncate_result != 0) {
FailedLibraryCall("ftruncate");
}

result = (u8 *)mmap(0, total_size, PROT_READ | PROT_WRITE, MAP_SHARED,
shmem_fd, 0);
// TODO(chogan): @errorhandling
close(shmem_fd);

if (result == MAP_FAILED) {
FailedLibraryCall("mmap");
}
if (close(shmem_fd) == -1) {
FailedLibraryCall("close");
}
} else {
// TODO(chogan): @errorhandling
assert(!"shm_open failed\n");
FailedLibraryCall("shm_open");
}

// TODO(chogan): @errorhandling
assert(result);

return result;
}

Expand Down Expand Up @@ -1353,17 +1363,15 @@ void CloseBufferingFiles(SharedMemoryContext *context) {
if (context->open_streams[device_id][slab]) {
int fclose_result = fclose(context->open_streams[device_id][slab]);
if (fclose_result != 0) {
// TODO(chogan): @errorhandling
HERMES_NOT_IMPLEMENTED_YET;
FailedLibraryCall("fclose");
}
}
}
}

if (context->swap_file) {
if (fclose(context->swap_file) != 0) {
// TODO(chogan): @errorhandling
HERMES_NOT_IMPLEMENTED_YET;
FailedLibraryCall("fclose");
}
}
}
Expand Down Expand Up @@ -1405,12 +1413,12 @@ size_t LocalWriteBufferById(SharedMemoryContext *context, BufferID id,
context->open_streams[device->id][slab_index] = file;
}
fseek(file, header->data_offset, SEEK_SET);
[[maybe_unused]] size_t items_written = fwrite(at, write_size, 1, file);
// TODO(chogan): @errorhandling
assert(items_written == 1);
size_t items_written = fwrite(at, write_size, 1, file);
if (items_written != 1) {
FailedLibraryCall("fwrite");
}
if (fflush(file) != 0) {
// TODO(chogan): @errorhandling
LOG(WARNING) << "fflush failed\n";
FailedLibraryCall("fflush");
}
// fsync(fileno(file));
}
Expand Down Expand Up @@ -1480,8 +1488,9 @@ size_t LocalReadBufferById(SharedMemoryContext *context, BufferID id,
fseek(file, header->data_offset, SEEK_SET);
size_t items_read = fread((u8 *)blob->data + read_offset, read_size, 1,
file);
// TODO(chogan): @errorhandling
assert(items_read == 1);
if (items_read != 1) {
FailedLibraryCall("fwrite");
}
result = items_read * read_size;
}
UnlockBuffer(header);
Expand All @@ -1505,8 +1514,10 @@ size_t ReadBlobFromBuffers(SharedMemoryContext *context, RpcContext *rpc,
"RemoteBulkReadBufferById",
blob->data + total_bytes_read,
buffer_sizes[i], id);
// TODO(chogan): @errorhandling
assert(bytes_transferred == buffer_sizes[i]);
if (bytes_transferred != buffer_sizes[i]) {
LOG(ERROR) << "BulkRead only transferred " << bytes_transferred
<< " out of " << buffer_sizes[i] << " bytes" << std::endl;
}
bytes_read += bytes_transferred;
} else {
std::vector<u8> data =
Expand All @@ -1522,8 +1533,11 @@ size_t ReadBlobFromBuffers(SharedMemoryContext *context, RpcContext *rpc,
}
total_bytes_read += bytes_read;
}
// TODO(chogan): @errorhandling
assert(total_bytes_read == blob->size);

if (total_bytes_read != blob->size) {
LOG(ERROR) << __func__ << "expected to read a Blob of size " << blob->size
<< " but only read " << total_bytes_read << std::endl;
}

return total_bytes_read;
}
Expand Down Expand Up @@ -1558,52 +1572,39 @@ size_t ReadBlobById(SharedMemoryContext *context, RpcContext *rpc, Arena *arena,
return result;
}

int OpenSwapFile(SharedMemoryContext *context, u32 node_id) {
int result = 0;

void OpenSwapFile(SharedMemoryContext *context, u32 node_id) {
if (!context->swap_file) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
std::string swap_path = GetSwapFilename(mdm, node_id);
context->swap_file = fopen(swap_path.c_str(), "a+");

if (!context->swap_file) {
// TODO(chogan): @errorhandling
result = 1;
FailedLibraryCall("fopen");
}
}

return result;
}

SwapBlob WriteToSwap(SharedMemoryContext *context, Blob blob, u32 node_id,
BucketID bucket_id) {
SwapBlob result = {};

if (OpenSwapFile(context, node_id) == 0) {
if (fseek(context->swap_file, 0, SEEK_END) != 0) {
// TODO(chogan): @errorhandling
HERMES_NOT_IMPLEMENTED_YET;
}
OpenSwapFile(context, node_id);
if (fseek(context->swap_file, 0, SEEK_END) != 0) {
FailedLibraryCall("fseek");
}

long int file_position = ftell(context->swap_file);
if (file_position == -1) {
// TODO(chogan): @errorhandling
HERMES_NOT_IMPLEMENTED_YET;
}
result.offset = file_position;
long int file_position = ftell(context->swap_file);
if (file_position == -1) {
FailedLibraryCall("ftell");
}
result.offset = file_position;

if (fwrite(blob.data, 1, blob.size, context->swap_file) != blob.size) {
// TODO(chogan): @errorhandling
HERMES_NOT_IMPLEMENTED_YET;
}
if (fwrite(blob.data, 1, blob.size, context->swap_file) != blob.size) {
FailedLibraryCall("fwrite");
}

if (fflush(context->swap_file) != 0) {
// TODO(chogan): @errorhandling
HERMES_NOT_IMPLEMENTED_YET;
}
} else {
// TODO(chogan): @errorhandling
HERMES_NOT_IMPLEMENTED_YET;
if (fflush(context->swap_file) != 0) {
FailedLibraryCall("fflush");
}

result.node_id = node_id;
Expand Down Expand Up @@ -1631,21 +1632,14 @@ SwapBlob PutToSwap(SharedMemoryContext *context, RpcContext *rpc,
size_t ReadFromSwap(SharedMemoryContext *context, Blob blob,
SwapBlob swap_blob) {
u32 node_id = swap_blob.node_id;
if (OpenSwapFile(context, node_id) == 0) {
if (fseek(context->swap_file, swap_blob.offset, SEEK_SET) != 0) {
// TODO(chogan): @errorhandling
HERMES_NOT_IMPLEMENTED_YET;
}

if (fread(blob.data, 1, swap_blob.size, context->swap_file) !=
swap_blob.size) {
// TODO(chogan): @errorhandling
HERMES_NOT_IMPLEMENTED_YET;
}
OpenSwapFile(context, node_id);
if (fseek(context->swap_file, swap_blob.offset, SEEK_SET) != 0) {
FailedLibraryCall("fseek");
}

} else {
// TODO(chogan): @errorhandling
HERMES_NOT_IMPLEMENTED_YET;
if (fread(blob.data, 1, swap_blob.size, context->swap_file) !=
swap_blob.size) {
FailedLibraryCall("fread");
}

return swap_blob.size;
Expand Down Expand Up @@ -1715,26 +1709,22 @@ api::Status StdIoPersistBucket(SharedMemoryContext *context, RpcContext *rpc,
// they were `Put`, but once we have a Trait that represents a file
// mapping, we'll need pwrite and offsets.
if (fwrite(data.data(), 1, num_bytes, file) != num_bytes) {
// TODO(chogan): @errorhandling
result = STDIO_FWRITE_FAILED;
LOG(ERROR) << result.Msg() << strerror(errno);
break;
}
} else {
// TODO(chogan): @errorhandling
result = READ_BLOB_FAILED;
LOG(ERROR) << result.Msg();
break;
}
}

if (fclose(file) != 0) {
// TODO(chogan): @errorhandling
result = STDIO_FCLOSE_FAILED;
LOG(ERROR) << result.Msg() << strerror(errno);
}
} else {
// TODO(chogan): @errorhandling
result = STDIO_FOPEN_FAILED;
LOG(ERROR) << result.Msg() << strerror(errno);
}
Expand Down Expand Up @@ -1763,22 +1753,20 @@ api::Status StdIoPersistBlob(SharedMemoryContext *context, RpcContext *rpc,
LOG(INFO) << "STDIO Flush to file: " << " offset: " << offset
<< " of size:" << num_bytes << "." << std::endl;
if (fwrite(data.data(), 1, num_bytes, file) != num_bytes) {
// TODO(chogan): @errorhandling
result = STDIO_FWRITE_FAILED;
LOG(ERROR) << result.Msg() << strerror(errno);
}
} else {
// TODO(chogan): @errorhandling
result = STDIO_OFFSET_ERROR;
LOG(ERROR) << result.Msg() << strerror(errno);
}
}

} else {
// TODO(chogan): @errorhandling
result = INVALID_FILE;
LOG(ERROR) << result.Msg();
}

return result;
}

Expand Down

0 comments on commit c981906

Please sign in to comment.