Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion capio/server/capio_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#include "common/logger.hpp"
#include "common/requests.hpp"
#include "common/semaphore.hpp"
#include "utils/capio_file.hpp"
#include "storage/capio_file.hpp"
#include "utils/common.hpp"
#include "utils/env.hpp"
#include "utils/types.hpp"
Expand Down
4 changes: 2 additions & 2 deletions capio/server/include/handlers/close.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ inline void handle_close(int tid, int fd) {
LOG("File was closed", path.c_str());

if (CapioCLEngine::get().getCommitRule(path) == capiocl::commitRules::ON_CLOSE &&
c_file.is_closed()) {
c_file.closed()) {
LOG("Capio File %s is closed and commit rule is on_close. setting it to complete",
path.c_str());
c_file.set_complete();
c_file.setComplete();
c_file.commit();
}

Expand Down
6 changes: 3 additions & 3 deletions capio/server/include/handlers/exig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ inline void handle_exit_group(int tid) {
LOG("Handling file %s", path.c_str());
if (CapioCLEngine::get().getCommitRule(path) == capiocl::commitRules::ON_TERMINATION) {
CapioFile &c_file = storage_manager->get(path);
if (c_file.is_dir()) {
if (c_file.directory()) {
LOG("file %s is dir", path.c_str());
long int n_committed = c_file.n_files_expected;
if (n_committed <= c_file.n_files) {
LOG("Setting file %s to complete", path.c_str());
c_file.set_complete();
c_file.setComplete();
}
} else {
LOG("Setting file %s to complete", path.c_str());
c_file.set_complete();
c_file.setComplete();
c_file.commit();
}
c_file.close();
Expand Down
15 changes: 7 additions & 8 deletions capio/server/include/handlers/getdents.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@ inline void request_remote_getdents(int tid, int fd, off64_t count) {
CapioFile &c_file = storage_manager->get(tid, fd);
off64_t offset = storage_manager->getFileOffset(tid, fd);
off64_t end_of_read = offset + count;
off64_t end_of_sector = c_file.get_sector_end(offset);
off64_t end_of_sector = c_file.getSectorEnd(offset);

if (c_file.is_complete() &&
(end_of_read <= end_of_sector ||
(end_of_sector == -1 ? 0 : end_of_sector) == c_file.real_file_size)) {
if (c_file.complete() && (end_of_read <= end_of_sector ||
(end_of_sector == -1 ? 0 : end_of_sector) == c_file.real_file_size)) {
LOG("Handling local read");
send_dirent_to_client(tid, fd, c_file, offset, count);
} else if (end_of_read <= end_of_sector) {
LOG("?");
c_file.create_buffer_if_needed(storage_manager->getPath(tid, fd), false);
client_manager->replyToClient(tid, offset, c_file.get_buffer(), count);
c_file.createBufferIfNeeded(storage_manager->getPath(tid, fd), false);
client_manager->replyToClient(tid, offset, c_file.getBuffer(), count);
storage_manager->setFileOffset(tid, fd, offset + count);
} else {
LOG("Delegating to backend remote read");
Expand All @@ -53,6 +52,7 @@ inline void handle_getdents(int tid, int fd, long int count) {
if (strcmp(std::get<0>(get_file_location(path_to_check)), node_name) == 0) {
handle_getdents(tid, fd, count);
} else {

request_remote_getdents(tid, fd, count);
}
});
Expand All @@ -63,8 +63,7 @@ inline void handle_getdents(int tid, int fd, long int count) {
off64_t offset = storage_manager->getFileOffset(tid, fd);
send_dirent_to_client(tid, fd, c_file, offset, count);
} else {
LOG("File is remote");
LOG("Delegating to backend remote read");
LOG("File is remote. Delegating to backend remote read");
request_remote_getdents(tid, fd, count);
}
}
Expand Down
32 changes: 15 additions & 17 deletions capio/server/include/handlers/read.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ inline void handle_pending_read(int tid, int fd, long int process_offset, long i

const std::filesystem::path &path = storage_manager->getPath(tid, fd);
CapioFile &c_file = storage_manager->get(path);
off64_t end_of_sector = c_file.get_sector_end(process_offset);
off64_t end_of_sector = c_file.getSectorEnd(process_offset);
off64_t end_of_read = process_offset + count;

off64_t bytes_read;
Expand All @@ -30,8 +30,8 @@ inline void handle_pending_read(int tid, int fd, long int process_offset, long i
bytes_read = end_of_sector - process_offset;
}

c_file.create_buffer_if_needed(path, false);
client_manager->replyToClient(tid, process_offset, c_file.get_buffer(), bytes_read);
c_file.createBufferIfNeeded(path, false);
client_manager->replyToClient(tid, process_offset, c_file.getBuffer(), bytes_read);
storage_manager->setFileOffset(tid, fd, process_offset + bytes_read);

// TODO: check if the file was moved to the disk
Expand All @@ -47,13 +47,13 @@ inline void handle_local_read(int tid, int fd, off64_t count, bool is_prod) {
off64_t process_offset = storage_manager->getFileOffset(tid, fd);

// if a process is the producer of a file, then the file is always complete for that process
const bool file_complete = c_file.is_complete() || is_prod;
const bool file_complete = c_file.complete() || is_prod;

if (!(file_complete || CapioCLEngine::get().isFirable(path))) {
// wait for file to be completed and then do what is done inside handle pending read
LOG("Data is not available yet. Starting async thread to wait for file availability");
std::thread t([&c_file, tid, fd, count, process_offset] {
c_file.wait_for_completion();
c_file.waitForCompletion();
handle_pending_read(tid, fd, process_offset, count);
});
t.detach();
Expand All @@ -63,7 +63,7 @@ inline void handle_local_read(int tid, int fd, off64_t count, bool is_prod) {
LOG("Data can be served. Condition met: %s %s", file_complete ? "c_file.is_complete()" : "",
CapioCLEngine::get().isFirable(path) ? "CapioCLEngine::get().isFirable(path)" : "");

const off64_t end_of_sector = c_file.get_sector_end(process_offset);
const off64_t end_of_sector = c_file.getSectorEnd(process_offset);
if (end_of_sector == -1) {
LOG("End of sector is -1. returning process_offset without serving data");
client_manager->replyToClient(tid, process_offset);
Expand All @@ -74,7 +74,7 @@ inline void handle_local_read(int tid, int fd, off64_t count, bool is_prod) {
LOG("Mode is NO_UPDATE, but not enough data is available. Awaiting for data on "
"a separate thread before sending it to client");
std::thread t([&c_file, tid, fd, count, process_offset] {
c_file.wait_for_data(process_offset + count);
c_file.waitForData(process_offset + count);
handle_pending_read(tid, fd, process_offset, count);
});
t.detach();
Expand All @@ -85,8 +85,8 @@ inline void handle_local_read(int tid, int fd, off64_t count, bool is_prod) {
const auto read_size = std::min(count, end_of_sector - process_offset);
LOG("Requested read within end of sector, and data is available. Serving %ld bytes", read_size);

c_file.create_buffer_if_needed(path, false);
client_manager->replyToClient(tid, process_offset, c_file.get_buffer(), read_size);
c_file.createBufferIfNeeded(path, false);
client_manager->replyToClient(tid, process_offset, c_file.getBuffer(), read_size);
storage_manager->setFileOffset(tid, fd, process_offset + read_size);
}

Expand All @@ -97,18 +97,17 @@ inline void request_remote_read(int tid, int fd, off64_t count) {
CapioFile &c_file = storage_manager->get(path);
off64_t offset = storage_manager->getFileOffset(tid, fd);
off64_t end_of_read = offset + count;
off64_t end_of_sector = c_file.get_sector_end(offset);
off64_t end_of_sector = c_file.getSectorEnd(offset);

if (c_file.is_complete() &&
(end_of_read <= end_of_sector ||
(end_of_sector == -1 ? 0 : end_of_sector) == c_file.real_file_size)) {
if (c_file.complete() && (end_of_read <= end_of_sector ||
(end_of_sector == -1 ? 0 : end_of_sector) == c_file.real_file_size)) {
LOG("Handling local read");
handle_local_read(tid, fd, count, true);
} else if (end_of_read <= end_of_sector) {
LOG("Data is present locally and can be served to client");
c_file.create_buffer_if_needed(path, false);
c_file.createBufferIfNeeded(path, false);

client_manager->replyToClient(tid, offset, c_file.get_buffer(), count);
client_manager->replyToClient(tid, offset, c_file.getBuffer(), count);
storage_manager->setFileOffset(tid, fd, offset + count);
} else {
LOG("Delegating to backend remote read");
Expand Down Expand Up @@ -151,8 +150,7 @@ inline void handle_read(int tid, int fd, off64_t count) {
LOG("File is local. handling local read");
handle_local_read(tid, fd, count, is_prod);
} else {
LOG("File is remote");
LOG("Delegating to backend remote read");
LOG("File is remote. Delegating to backend remote read");
request_remote_read(tid, fd, count);
}
}
Expand Down
4 changes: 2 additions & 2 deletions capio/server/include/handlers/seek.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void handle_seek_data(int tid, int fd, off64_t offset) {
START_LOG(gettid(), "call(tid=%d, fd=%d, offset=%ld)", tid, fd, offset);

CapioFile &c_file = storage_manager->get(tid, fd);
offset = c_file.seek_data(offset);
offset = c_file.seekData(offset);
storage_manager->setFileOffset(tid, fd, offset);
client_manager->replyToClient(tid, offset);
}
Expand All @@ -35,7 +35,7 @@ inline void handle_seek_hole(int tid, int fd, off64_t offset) {
START_LOG(gettid(), "call(tid=%d, fd=%d, offset=%ld)", tid, fd, offset);

CapioFile &c_file = storage_manager->get(tid, fd);
offset = c_file.seek_hole(offset);
offset = c_file.seekHole(offset);
storage_manager->setFileOffset(tid, fd, offset);
client_manager->replyToClient(tid, offset);
}
Expand Down
14 changes: 7 additions & 7 deletions capio/server/include/handlers/stat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ void wait_for_file_completion(int tid, const std::filesystem::path &path) {
CapioFile &c_file = storage_manager->get(path);

// if file is streamable
if (c_file.is_complete() || CapioCLEngine::get().isFirable(path) ||
if (c_file.complete() || CapioCLEngine::get().isFirable(path) ||
strcmp(std::get<0>(get_file_location(path)), node_name) == 0) {

client_manager->replyToClient(tid, c_file.get_file_size());
client_manager->replyToClient(tid, static_cast<int>(c_file.is_dir() ? 1 : 0));
client_manager->replyToClient(tid, c_file.getFileSize());
client_manager->replyToClient(tid, static_cast<int>(c_file.directory() ? 1 : 0));

} else {
handle_remote_stat_request(tid, path);
Expand Down Expand Up @@ -72,15 +72,15 @@ inline void reply_stat(int tid, const std::filesystem::path &path) {
LOG("File is now present from remote node. retrieving file again.");
file_location_opt = get_file_location_opt(path);
}
if (c_file.is_complete() || strcmp(std::get<0>(file_location_opt->get()), node_name) == 0 ||
if (c_file.complete() || strcmp(std::get<0>(file_location_opt->get()), node_name) == 0 ||
CapioCLEngine::get().isFirable(path) || capio_dir == path) {
LOG("Sending response to client");
client_manager->replyToClient(tid, c_file.get_file_size());
client_manager->replyToClient(tid, static_cast<int>(c_file.is_dir() ? 1 : 0));
client_manager->replyToClient(tid, c_file.getFileSize());
client_manager->replyToClient(tid, static_cast<int>(c_file.directory() ? 1 : 0));
} else {
LOG("Delegating backend to reply to remote stats");
// send a request for file. then start a thread to wait for the request completion
c_file.create_buffer_if_needed(path, false);
c_file.createBufferIfNeeded(path, false);
handle_remote_stat_request(tid, path);
}
}
Expand Down
2 changes: 1 addition & 1 deletion capio/server/include/handlers/unlink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ void unlink_handler(const char *const str) {
const auto c_file_opt = storage_manager->tryGet(path);
if (c_file_opt) { // TODO: it works only in the local case
CapioFile &c_file = c_file_opt->get();
if (c_file.is_deletable()) {
if (c_file.deletable()) {
storage_manager->remove(path);
delete_from_files_location(path);
}
Expand Down
10 changes: 5 additions & 5 deletions capio/server/include/handlers/write.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ void write_handler(const char *const str) {
off64_t end_of_write = offset + count;
const std::filesystem::path &path = storage_manager->getPath(tid, fd);
CapioFile &c_file = storage_manager->get(path);
off64_t file_shm_size = c_file.get_buf_size();
off64_t file_shm_size = c_file.getBufferSize();
SPSCQueue &data_buf = client_manager->getClientToServerDataBuffers(tid);

c_file.create_buffer_if_needed(path, true);
c_file.createBufferIfNeeded(path, true);
if (end_of_write > file_shm_size) {
c_file.expand_buffer(end_of_write);
c_file.expandBuffer(end_of_write);
}
c_file.read_from_queue(data_buf, offset, count);
c_file.readFromQueue(data_buf, offset, count);

client_manager->registerProducedFile(tid, path);
c_file.insert_sector(offset, end_of_write);
c_file.insertSector(offset, end_of_write);
if (c_file.first_write) {
c_file.first_write = false;
write_file_location(path);
Expand Down
34 changes: 17 additions & 17 deletions capio/server/include/remote/handlers/read.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ inline void serve_remote_read(const std::filesystem::path &path, const std::stri
// Send all the rest of the file not only the number of bytes requested
// Useful for caching
CapioFile &c_file = storage_manager->get(path);
long int nbytes = c_file.get_stored_size() - offset;
long int nbytes = c_file.getStoredSize() - offset;
off64_t prefetch_data_size = get_prefetch_data_size();

if (prefetch_data_size != 0 && nbytes > prefetch_data_size) {
nbytes = prefetch_data_size;
}
const off64_t file_size = c_file.get_stored_size();
const off64_t file_size = c_file.getStoredSize();

// send request
serve_remote_read_request(tid, fd, count, nbytes, file_size, complete, is_getdents, dest);
// send data
backend->send_file(c_file.get_buffer() + offset, nbytes, dest);
backend->send_file(c_file.getBuffer() + offset, nbytes, dest);
}

inline void handle_read_reply(int tid, int fd, long count, off64_t file_size, off64_t nbytes,
Expand All @@ -45,11 +45,11 @@ inline void handle_read_reply(int tid, int fd, long count, off64_t file_size, of
CapioFile &c_file = storage_manager->get(path);
off64_t offset = storage_manager->getFileOffset(tid, fd);
c_file.real_file_size = file_size;
c_file.insert_sector(offset, offset + nbytes);
c_file.set_complete(complete);
c_file.insertSector(offset, offset + nbytes);
c_file.setComplete(complete);

off64_t end_of_sector = c_file.get_sector_end(offset);
c_file.create_buffer_if_needed(path, false);
off64_t end_of_sector = c_file.getSectorEnd(offset);
c_file.createBufferIfNeeded(path, false);
off64_t bytes_read;
off64_t end_of_read = offset + count;
if (end_of_sector > end_of_read) {
Expand All @@ -61,7 +61,7 @@ inline void handle_read_reply(int tid, int fd, long count, off64_t file_size, of
if (is_getdents) {
send_dirent_to_client(tid, fd, c_file, offset, bytes_read);
} else {
client_manager->replyToClient(tid, offset, c_file.get_buffer(), count);
client_manager->replyToClient(tid, offset, c_file.getBuffer(), count);
storage_manager->setFileOffset(tid, fd, offset + count);
}
}
Expand All @@ -74,8 +74,8 @@ void wait_for_data(const std::filesystem::path &path, const std::string &dest, i

const CapioFile &c_file = storage_manager->get(path);
// wait that nbytes are written
c_file.wait_for_data(offset + count);
serve_remote_read(path, dest, tid, fd, count, offset, c_file.is_complete(), is_getdents);
c_file.waitForData(offset + count);
serve_remote_read(path, dest, tid, fd, count, offset, c_file.complete(), is_getdents);
}

inline void handle_remote_read(const std::filesystem::path &path, const std::string &source,
Expand All @@ -85,9 +85,9 @@ inline void handle_remote_read(const std::filesystem::path &path, const std::str
path.c_str(), source.c_str(), tid, fd, count, offset, is_getdents ? "true" : "false");

CapioFile &c_file = storage_manager->get(path);
bool data_available = (offset + count <= c_file.get_stored_size());
if (c_file.is_complete() || (CapioCLEngine::get().isFirable(path) && data_available)) {
serve_remote_read(path, source, tid, fd, count, offset, c_file.is_complete(), is_getdents);
bool data_available = (offset + count <= c_file.getStoredSize());
if (c_file.complete() || (CapioCLEngine::get().isFirable(path) && data_available)) {
serve_remote_read(path, source, tid, fd, count, offset, c_file.complete(), is_getdents);
} else {
std::thread t(wait_for_data, path, source, tid, fd, count, offset, is_getdents);
t.detach();
Expand All @@ -107,14 +107,14 @@ inline void handle_remote_read_reply(const std::string &source, int tid, int fd,
off64_t offset = storage_manager->getFileOffset(tid, fd);
CapioFile &c_file = storage_manager->get(path);

c_file.create_buffer_if_needed(path, false);
c_file.createBufferIfNeeded(path, false);
if (nbytes != 0) {
auto file_shm_size = c_file.get_buf_size();
auto file_shm_size = c_file.getBufferSize();
auto file_size_recv = offset + nbytes;
if (file_size_recv > file_shm_size) {
c_file.expand_buffer(file_size_recv);
c_file.expandBuffer(file_size_recv);
}
c_file.read_from_node(source, offset, nbytes);
c_file.readFromNode(source, offset, nbytes);
nbytes *= sizeof(char);
}
handle_read_reply(tid, fd, count, file_size, nbytes, complete, is_getdents);
Expand Down
8 changes: 4 additions & 4 deletions capio/server/include/remote/handlers/stat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ inline void serve_remote_stat(const std::filesystem::path &path, const std::stri
source_tid);

const CapioFile &c_file = storage_manager->get(path);
off64_t file_size = c_file.get_file_size();
bool is_dir = c_file.is_dir();
off64_t file_size = c_file.getFileSize();
bool is_dir = c_file.directory();
serve_remote_stat_request(path, source_tid, file_size, is_dir, dest);
}

Expand All @@ -26,7 +26,7 @@ void wait_for_completion(const std::filesystem::path &path, int source_tid,
dest.c_str());

const CapioFile &c_file = storage_manager->get(path);
c_file.wait_for_completion();
c_file.waitForCompletion();
LOG("File %s has been completed. serving stats data", path.c_str());
serve_remote_stat(path, dest, source_tid);
}
Expand All @@ -39,7 +39,7 @@ inline void handle_remote_stat(int source_tid, const std::filesystem::path &path
const auto c_file = storage_manager->tryGet(path);
if (c_file) {
LOG("File %s is present on capio file system", path.c_str());
if (c_file->get().is_complete() || CapioCLEngine::get().isFirable(path)) {
if (c_file->get().complete() || CapioCLEngine::get().isFirable(path)) {
LOG("file is complete. serving file");
serve_remote_stat(path, dest, source_tid);
} else { // wait for completion
Expand Down
Loading
Loading