Skip to content

Commit

Permalink
dw_store: Incapsulated storage infos in view of multi-disk (issue tom…
Browse files Browse the repository at this point in the history
…cucinotta#20), advanced disk operations (issue tomcucinotta#21)
  • Loading branch information
deRemo committed Sep 8, 2023
1 parent 4ae69c4 commit 811fbff
Showing 1 changed file with 40 additions and 24 deletions.
64 changes: 40 additions & 24 deletions src/dw_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ typedef struct {
int core_id; // core pinning
} thread_info_t;

typedef struct {
int storage_fd;

size_t max_storage_size;
size_t storage_offset; //TODO: mutual exclusion here to avoid race conditions in per-client thread mode
size_t storage_eof; //TODO: same here
} storage_info_t;

#define MAX_CONNS 16

conn_info_t conns[MAX_CONNS];
Expand All @@ -69,6 +77,8 @@ conn_info_t conns[MAX_CONNS];
pthread_t workers[MAX_THREADS];
thread_info_t thread_infos[MAX_THREADS];

storage_info_t storage_info;

typedef struct {
in_addr_t inaddr; // target IP
uint16_t port; // target port (for multiple nodes on same host)
Expand Down Expand Up @@ -410,42 +420,42 @@ void compute_for(unsigned long usecs) {

unsigned long blk_size = 0;

void store(int conn_id, size_t bytes) {
void store(storage_info_t* storage_info, unsigned char* buf, size_t bytes) {
// generate the data to be stored
if (use_odirect) bytes = (bytes + blk_size - 1) / blk_size * blk_size;
cw_log("STORE: storing %lu bytes\n", bytes);

//write, otherwise over-write
if (storage_offset + bytes > max_storage_size) {
lseek(storage_fd, 0, SEEK_SET);
storage_offset = 0;
if (storage_info->storage_offset + bytes > storage_info->max_storage_size) {
lseek(storage_info->storage_fd, 0, SEEK_SET);
storage_info->storage_offset = 0;
}

safe_write(storage_fd, conns[conn_id].store_buf, bytes);
safe_write(storage_info->storage_fd, buf, bytes);

storage_offset += bytes;
storage_info->storage_offset += bytes;

fsync(storage_fd);
fsync(storage_info->storage_fd);

if (storage_offset > storage_eof) {
storage_eof = storage_offset;
if (storage_info->storage_offset > storage_info->storage_eof) {
storage_info->storage_eof = storage_info->storage_offset;

if (storage_eof > max_storage_size) {
storage_eof = max_storage_size;
if (storage_info->storage_eof > storage_info->max_storage_size) {
storage_info->storage_eof = storage_info->max_storage_size;
}
}
}

void load(int conn_id, size_t bytes, size_t* leftovers) {
void load(storage_info_t* storage_info, unsigned char* buf, size_t bytes, size_t* leftovers) {
cw_log("LOAD: loading %lu bytes\n", bytes);

if (storage_offset + bytes > storage_eof){
lseek(storage_fd, 0, SEEK_SET);
storage_offset = 0;
if (storage_info->storage_offset + bytes > storage_info->storage_eof){
lseek(storage_info->storage_fd, 0, SEEK_SET);
storage_info->storage_offset = 0;
}

*leftovers = safe_read(storage_fd, conns[conn_id].store_buf, bytes);
storage_offset += bytes;
*leftovers = safe_read(storage_info->storage_fd, buf, bytes);
storage_info->storage_offset += bytes;
}

void close_and_forget(int epollfd, int sock) {
Expand Down Expand Up @@ -506,14 +516,14 @@ int process_messages(int conn_id) {
fprintf(stderr, "Error: Cannot execute STORE cmd because no storage path has been defined\n");
exit(EXIT_FAILURE);
}
store(conn_id, m->cmds[i].u.store_nbytes);
store(&storage_info, conns[conn_id].store_buf, m->cmds[i].u.store_nbytes);
} else if (m->cmds[i].cmd == LOAD) {
if (!storage_path) {
fprintf(stderr, "Error: Cannot execute LOAD cmd because no storage path has been defined\n");
exit(EXIT_FAILURE);
}
size_t leftovers;
load(conn_id, m->cmds[i].u.load_nbytes, &leftovers);
load(&storage_info, conns[conn_id].store_buf, m->cmds[i].u.load_nbytes, &leftovers);
} else {
fprintf(stderr, "Error: Unknown cmd: %d\n", m->cmds[0].cmd);
return 0;
Expand Down Expand Up @@ -831,6 +841,12 @@ int main(int argc, char *argv[]) {
cpu_set_t mask;
struct sockaddr_in serverAddr;

// Storage info defaults
storage_info.storage_fd = -1;
storage_info.max_storage_size = MAX_STORAGE_SIZE;
storage_info.storage_offset = 0; //TODO: mutual exclusion here to avoid race conditions in per-client thread mode
storage_info.storage_eof = 0; //TODO: same here

argc--;
argv++;
while (argc > 0) {
Expand Down Expand Up @@ -866,7 +882,7 @@ int main(int argc, char *argv[]) {
} else if (strcmp(argv[0], "-m") == 0 ||
strcmp(argv[0], "--max-storage-size") == 0) {
assert(argc >= 2);
max_storage_size = atoi(argv[1]);
storage_info.max_storage_size = atoi(argv[1]);
argc--;
argv++;
} else if (strcmp(argv[0], "--threads") == 0) {
Expand Down Expand Up @@ -918,9 +934,9 @@ int main(int argc, char *argv[]) {
if (storage_path) {
int flags = O_RDWR | O_CREAT | O_TRUNC;
if (use_odirect) flags |= O_DIRECT;
sys_check(storage_fd = open(storage_path, flags, S_IRUSR | S_IWUSR));
sys_check(storage_info.storage_fd = open(storage_path, flags, S_IRUSR | S_IWUSR));
struct stat s;
sys_check(fstat(storage_fd, &s));
sys_check(fstat(storage_info.storage_fd, &s));
blk_size = s.st_blksize;
cw_log("blk_size = %lu\n", blk_size);
}
Expand Down Expand Up @@ -1012,8 +1028,8 @@ int main(int argc, char *argv[]) {
}

// termination clean-ups
if (storage_fd >= 0) {
close(storage_fd);
if (storage_info.storage_fd >= 0) {
close(storage_info.storage_fd);
}

return 0;
Expand Down

0 comments on commit 811fbff

Please sign in to comment.