From 9d11a2bfea37829a4543f82d0afe2a665a5b729e Mon Sep 17 00:00:00 2001 From: Magnus Feuer Date: Thu, 26 Sep 2019 12:31:48 -0700 Subject: [PATCH] Port to clang (#16) * Builds with clang and gcc. Passes tests * Changed socket non-block code to portable variant * Replaced accept4() with accept() for posix compatability. * Second attempt at cleaning out accept4() * Replaced boot time clock with monotonic clock for POSIX compatability. --- list_test.c | 32 +++++------ pub.c | 134 +++++++++++++++++++++---------------------- reliable_multicast.h | 7 --- rmc_connection.c | 19 ++++-- rmc_list.h | 27 ++++++--- rmc_list_template.h | 40 ++++++++----- rmc_proto_test_pub.c | 70 ++++++++++++---------- rmc_pub_context.c | 6 +- rmc_pub_packet.c | 38 ++++++------ rmc_pub_read.c | 11 ++-- rmc_pub_write.c | 2 +- rmc_sub_context.c | 2 +- rmc_sub_packet.c | 35 ++++++----- rmc_sub_read.c | 12 ++-- sub.c | 48 +++++++++------- time.c | 2 +- 16 files changed, 264 insertions(+), 221 deletions(-) diff --git a/list_test.c b/list_test.c index e2e5150..2deac43 100644 --- a/list_test.c +++ b/list_test.c @@ -95,7 +95,7 @@ static uint8_t _test_sequence(test_list* list, test_id_t start, test_id_t stop) return 0; } -static int _compare_pid(test_id_t existing_pid, test_id_t new_pid) +static int _compare_pid(test_id_t existing_pid, test_id_t new_pid, void* user_data) { if (existing_pid > new_pid) return 1; @@ -107,7 +107,7 @@ static int _compare_pid(test_id_t existing_pid, test_id_t new_pid) } -static int _find_pid(test_id_t pid1, test_id_t pid2) +static int _find_pid(test_id_t pid1, test_id_t pid2, void* user_data) { return pid1 == pid2; @@ -224,41 +224,41 @@ void run_list_tests() } - test_list_insert_sorted(&p1, 2, _compare_pid); + test_list_insert_sorted(&p1, 2, _compare_pid, 0); if (_test_sequence(&p1, 2, 2)) { puts("Failed list test 4.1.\n"); exit(255); } - test_list_insert_sorted(&p1, 1, _compare_pid); + test_list_insert_sorted(&p1, 1, _compare_pid, 0); if (_test_sequence(&p1, 2, 1)) { puts("Failed list test 4.2.\n"); exit(255); } - test_list_insert_sorted(&p1, 3, _compare_pid); + test_list_insert_sorted(&p1, 3, _compare_pid, 0); if (_test_sequence(&p1, 3, 1)) { puts("Failed list test 4.3.\n"); exit(255); } - test_list_insert_sorted(&p1, 7, _compare_pid); - test_list_insert_sorted(&p1, 6, _compare_pid); - test_list_insert_sorted(&p1, 5, _compare_pid); + test_list_insert_sorted(&p1, 7, _compare_pid, 0); + test_list_insert_sorted(&p1, 6, _compare_pid, 0); + test_list_insert_sorted(&p1, 5, _compare_pid, 0); - test_list_insert_sorted(&p1, 4, _compare_pid); + test_list_insert_sorted(&p1, 4, _compare_pid, 0); if (_test_sequence(&p1, 7, 1)) { puts("Failed list test 4.4.\n"); exit(255); } - test_list_insert_sorted(&p1, 8, _compare_pid); + test_list_insert_sorted(&p1, 8, _compare_pid, 0); if (_test_sequence(&p1, 8, 1)) { puts("Failed list test 4.5.\n"); exit(255); } - node = test_list_find_node(&p1, 1, _find_pid); + node = test_list_find_node(&p1, 1, _find_pid, 0); if (node->data != 1) { puts("Failed list test 5.1.\n"); @@ -267,13 +267,13 @@ void run_list_tests() test_list_delete(node); - node = test_list_find_node(&p1, 1, _find_pid); + node = test_list_find_node(&p1, 1, _find_pid, 0); if (node) { puts("Failed list test 5.2.\n"); exit(255); } - node = test_list_find_node(&p1, 2, _find_pid); + node = test_list_find_node(&p1, 2, _find_pid, 0); if (node->data != 2) { puts("Failed list test 5.3.\n"); exit(255); @@ -281,13 +281,13 @@ void run_list_tests() test_list_delete(node); - node = test_list_find_node(&p1, 2, _find_pid); + node = test_list_find_node(&p1, 2, _find_pid, 0); if (node) { puts("Failed list test 5.4.\n"); exit(255); } - node = test_list_find_node(&p1, 7, _find_pid); + node = test_list_find_node(&p1, 7, _find_pid, 0); if (node->data != 7) { puts("Failed list test 5.5.\n"); exit(255); @@ -295,7 +295,7 @@ void run_list_tests() test_list_delete(node); - node = test_list_find_node(&p1, 7, _find_pid); + node = test_list_find_node(&p1, 7, _find_pid, 0); if (node) { puts("Failed list test 5.6.\n"); exit(255); diff --git a/pub.c b/pub.c index 9019be6..b483f18 100644 --- a/pub.c +++ b/pub.c @@ -65,6 +65,11 @@ void pub_init_subscriber(pub_subscriber_t* sub, pub_context_t* ctx, user_data_t pub_sub_list_push_tail(&ctx->subscribers, sub); } +static int _find_subcriber(pub_subscriber_t* a, pub_subscriber_t* b, void* user_data) +{ + return a == b; +} + // Clean up all pending data. void pub_reset_subscriber(pub_subscriber_t* sub, @@ -82,14 +87,26 @@ void pub_reset_subscriber(pub_subscriber_t* sub, snode = pub_sub_list_find_node(&sub->context->subscribers, sub, - lambda(int, (pub_subscriber_t* a, pub_subscriber_t* b) { - return a == b; - })); + _find_subcriber, + 0); assert(snode); pub_sub_list_delete(snode); } +static int _compare_pid(pub_packet_t* new_pack, pub_packet_t* existing_pack, void* user_data) +{ + if (new_pack->pid > existing_pack->pid) + return 1; + + if (new_pack->pid < existing_pack->pid) + return -1; + + return 0; +} + + + static packet_id_t pub_queue_packet_with_pid(pub_context_t* ctx, packet_id_t pid, void* payload, @@ -116,16 +133,8 @@ static packet_id_t pub_queue_packet_with_pid(pub_context_t* ctx, ppack->parent_node = pub_packet_list_insert_sorted(&ctx->queued, ppack, - lambda(int, (pub_packet_t* new_pack, pub_packet_t* existing_pack) { - if (new_pack->pid > existing_pack->pid) - return 1; - - if (new_pack->pid < existing_pack->pid) - return -1; - - return 0; - } - )); + _compare_pid, + 0); return ppack->pid; } @@ -203,16 +212,7 @@ void pub_packet_sent(pub_context_t* ctx, // Sorted on ascending pid. pub_packet_list_insert_sorted_node(&ctx->inflight, pack->parent_node, - lambda(int, (pub_packet_t* new_pack, pub_packet_t* existing_pack) { - if (new_pack->pid > existing_pack->pid) - return 1; - - if (new_pack->pid < existing_pack->pid) - return -1; - - return 0; - } - )); + _compare_pid, 0); // Traverse all subscribers and insert pack into their // inflight list. @@ -225,17 +225,8 @@ void pub_packet_sent(pub_context_t* ctx, // Insert the new pub_packet_t in the descending // packet_id sorted list of the subscriber's inflight packets. pub_packet_list_insert_sorted(&sub->inflight, - pack, - lambda(int, (pub_packet_t* new_pack, pub_packet_t* existing_pack) { - if (new_pack->pid < existing_pack->pid) - return -1; - - if (new_pack->pid > existing_pack->pid) - return 1; - - return 0; - } - )); + pack, + _compare_pid, 0); pack->ref_count++; sub_node = pub_sub_list_next(sub_node); } @@ -311,17 +302,18 @@ void pub_get_timed_out_subscribers(pub_context_t* ctx, usec_timestamp_t timeout_period, // Number of usecs until timeout pub_sub_list_t* result) { - // Traverse all subscribers. - pub_sub_list_for_each(&ctx->subscribers, - // For each subscriber, check if their oldest inflight packet has a sent_ts - // timestamp older than max_age. If so, add the subscriber to result. - lambda(uint8_t, (pub_sub_node_t* sub_node, void* udata) { - if (pub_packet_list_size(&sub_node->data->inflight) && - pub_packet_list_tail(&sub_node->data->inflight)->data->send_ts + timeout_period <= current_ts) - pub_sub_list_push_tail(result, sub_node->data); - return 1; - }), 0); + pub_sub_node_t* sub_node = pub_sub_list_head(&ctx->subscribers); + + // Traverse all subscribers. + while(sub_node) { + // For each subscriber, check if their oldest inflight packet has a sent_ts + // timestamp older than max_age. If so, add the subscriber to result. + if (pub_packet_list_size(&sub_node->data->inflight) && + pub_packet_list_tail(&sub_node->data->inflight)->data->send_ts + timeout_period <= current_ts) + pub_sub_list_push_tail(result, sub_node->data); + sub_node = pub_sub_list_next(sub_node); + } } @@ -330,17 +322,16 @@ void pub_get_timed_out_packets(pub_subscriber_t* sub, usec_timestamp_t timeout_period, // Number of usecs until timeout pub_packet_list_t* result) { + pub_packet_node_t* pack_node = pub_packet_list_head(&sub->inflight); + // Traverse all inflight packets for subscriber until we find one that is not timed out.x - pub_packet_list_for_each_rev(&sub->inflight, - // For each packet, check if their oldest inflight packet has a sent_ts - // timestamp older than max_age. If so, add it to result. - lambda(uint8_t, (pub_packet_node_t* pnode, void* udata) { - if (pnode->data->send_ts + timeout_period <= current_ts) { - pub_packet_list_push_tail(result, pnode->data); - return 1; - } - return 0; - }), 0); + while(pack_node && + pack_node->data->send_ts + timeout_period <= current_ts) { + pub_packet_list_push_tail(result, pack_node->data); + pack_node = pub_packet_list_next(pack_node); + } + + return; } @@ -349,26 +340,31 @@ void pub_get_timed_out_packets(pub_subscriber_t* sub, int pub_get_oldest_unackowledged_packet(pub_context_t* ctx, usec_timestamp_t* timeout_ack) { usec_timestamp_t oldest = -1; + pub_sub_node_t* sub_node = 0; if (!ctx || !timeout_ack) return 0; + sub_node = pub_sub_list_head(&ctx->subscribers); + + // Traverse all subscribers. - pub_sub_list_for_each(&ctx->subscribers, - // Check if the oldest inflight packet of this subscriber is older - // than the oldest inflight packet found so far. - lambda(uint8_t, (pub_sub_node_t* sub_node, void* udata) { - pub_packet_list_t* lst = &sub_node->data->inflight; - pub_packet_t* pack = 0; - if (!pub_packet_list_size(lst)) - return 1; - - pack = pub_packet_list_tail(lst)->data; - if (oldest == -1 || pack->send_ts < oldest) { - oldest = pack->send_ts; - } - return 1; - }), 0); + while(sub_node) { + // Check if the oldest inflight packet of this subscriber is older + // than the oldest inflight packet found so far. + pub_packet_list_t* lst = &sub_node->data->inflight; + pub_packet_t* pack = 0; + if (!pub_packet_list_size(lst)) { + sub_node = pub_sub_list_next(sub_node); + continue; + } + pack = pub_packet_list_tail(lst)->data; + if (oldest == -1 || pack->send_ts < oldest) { + oldest = pack->send_ts; + } + sub_node = pub_sub_list_next(sub_node); + } + *timeout_ack = oldest; return 1; diff --git a/reliable_multicast.h b/reliable_multicast.h index 4ddd2a2..c590b85 100644 --- a/reliable_multicast.h +++ b/reliable_multicast.h @@ -37,13 +37,6 @@ typedef union { #define user_data_i32(_i32) ((user_data_t) { .i32 = _i32 }) #define user_data_ptr(_ptr) ((user_data_t) { .ptr = _ptr }) -// Used for iterators etc. -#define lambda(return_type, function_body) \ - ({ \ - return_type __fn__ function_body \ - __fn__; \ - }) - extern usec_timestamp_t rmc_usec_monotonic_timestamp(void); diff --git a/rmc_connection.c b/rmc_connection.c index 8df8c40..73730e2 100644 --- a/rmc_connection.c +++ b/rmc_connection.c @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -262,7 +263,7 @@ int rmc_conn_connect_tcp_by_address(rmc_connection_vector_t* conn_vec, rmc_index_t c_ind = RMC_NIL_INDEX; int res = 0; struct sockaddr_in sock_addr; - + int desc_flags = 0; assert(conn_vec); sock_addr = (struct sockaddr_in) { @@ -277,11 +278,14 @@ int rmc_conn_connect_tcp_by_address(rmc_connection_vector_t* conn_vec, return ENOMEM; - conn_vec->connections[c_ind].descriptor = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); - + conn_vec->connections[c_ind].descriptor = socket(AF_INET, SOCK_STREAM, 0); if (conn_vec->connections[c_ind].descriptor == -1) return errno; + // Set nonblocking mode on the socket. + desc_flags = fcntl(conn_vec->connections[c_ind].descriptor, F_GETFL, 0); + fcntl(conn_vec->connections[c_ind].descriptor, F_SETFL, desc_flags | O_NONBLOCK); + RMC_LOG_INDEX_COMMENT(c_ind, "Connecting to control tcp addr[%s:%d]", inet_ntoa(sock_addr.sin_addr), ntohs(sock_addr.sin_port)); res = connect(conn_vec->connections[c_ind].descriptor, @@ -338,6 +342,7 @@ int rmc_conn_process_accept(int listen_descriptor, rmc_index_t c_ind = -1; int tr = 1; int sock_err = 0; + int desc_flags = 0; // Find a free slot. c_ind = _get_free_slot(conn_vec); @@ -345,13 +350,17 @@ int rmc_conn_process_accept(int listen_descriptor, if (c_ind == RMC_NIL_INDEX) return ENOMEM; - conn_vec->connections[c_ind].descriptor = accept4(listen_descriptor, + conn_vec->connections[c_ind].descriptor = accept(listen_descriptor, (struct sockaddr*) &src_addr, - &addr_len, SOCK_NONBLOCK); + &addr_len); if (conn_vec->connections[c_ind].descriptor == -1) return errno; + // Set nonblocking mode on the socket. + desc_flags = fcntl(conn_vec->connections[c_ind].descriptor, F_GETFL, 0); + fcntl(conn_vec->connections[c_ind].descriptor, F_SETFL, desc_flags | O_NONBLOCK); + RMC_LOG_INDEX_COMMENT(c_ind, "%s:%d assigned to index %d", inet_ntoa(src_addr.sin_addr), ntohs(src_addr.sin_port), c_ind); diff --git a/rmc_list.h b/rmc_list.h index 95051e7..de9c6d8 100644 --- a/rmc_list.h +++ b/rmc_list.h @@ -78,29 +78,42 @@ extern NODETYPE* LISTTYPE##_find_node(LISTTYPE* list, \ DATATYPE data, \ int (*compare_func)(DATATYPE needle, \ - DATATYPE haystack)); \ + DATATYPE haystack, \ + void* user_data), \ + void* user_data); \ \ extern NODETYPE* LISTTYPE##_find_node_rev(LISTTYPE* list, \ DATATYPE data, \ int (*compare_func)(DATATYPE needle, \ - DATATYPE haystack)); \ + DATATYPE haystack, \ + void* user_data), \ + void* user_data); \ + \ extern NODETYPE* LISTTYPE##_insert_sorted(LISTTYPE* list, \ DATATYPE new_elem, \ int (*compare_func)(DATATYPE new_elem, \ - DATATYPE existing_elem)); \ - \ + DATATYPE existing_elem, \ + void* user_data), \ + void* user_data); \ extern NODETYPE* LISTTYPE##_insert_sorted_rev(LISTTYPE* list, \ DATATYPE new_elem, \ int (*compare_func)(DATATYPE new_elem, \ - DATATYPE existing_elem)); \ + DATATYPE existing_elem, \ + void* user_data), \ + void* user_data); \ \ extern NODETYPE* LISTTYPE##_insert_sorted_node(LISTTYPE* list, \ NODETYPE* node, \ int (*compare_func)(DATATYPE new_elem, \ - DATATYPE existing_elem)); \ + DATATYPE existing_elem, \ + void* user_data), \ + void* user_data); \ + \ extern NODETYPE* LISTTYPE##_insert_sorted_node_rev(LISTTYPE* list, \ NODETYPE* node, \ int (*compare_func)(DATATYPE new_elem, \ - DATATYPE existing_elem)); \ + DATATYPE existing_elem, \ + void* user_data), \ + void* user_data); #endif // __RMC_LIST_H__ diff --git a/rmc_list_template.h b/rmc_list_template.h index 335921e..b075d12 100644 --- a/rmc_list_template.h +++ b/rmc_list_template.h @@ -306,7 +306,9 @@ NODETYPE* LISTTYPE##_find_node(LISTTYPE* list, \ DATATYPE data, \ int (*compare_func)(DATATYPE needle, \ - DATATYPE haystack)) \ + DATATYPE haystack, \ + void* user_data), \ + void* user_data) \ { \ NODETYPE* node = 0; \ \ @@ -314,7 +316,7 @@ node = LISTTYPE##_head(list); \ \ while(node) { \ - int res = (*compare_func)(data, node->data); \ + int res = (*compare_func)(data, node->data, user_data); \ \ if (res == -1) \ return 0; \ @@ -328,9 +330,11 @@ } \ \ NODETYPE* LISTTYPE##_find_node_rev(LISTTYPE* list, \ - DATATYPE data, \ - int (*compare_func)(DATATYPE needle, \ - DATATYPE haystack)) \ + DATATYPE data, \ + int (*compare_func)(DATATYPE needle, \ + DATATYPE haystack, \ + void* user_data), \ + void* user_data) \ { \ NODETYPE* node = 0; \ \ @@ -338,7 +342,7 @@ node = LISTTYPE##_tail(list); \ \ while(node) { \ - int res = (*compare_func)(data, node->data); \ + int res = (*compare_func)(data, node->data, user_data); \ \ if (res == -1) \ return 0; \ @@ -355,7 +359,9 @@ NODETYPE* LISTTYPE##_insert_sorted_node(LISTTYPE* list, \ NODETYPE* new_node, \ int (*compare_func)(DATATYPE new_elem, \ - DATATYPE existing_elem)) \ + DATATYPE existing_elem, \ + void* user_data), \ + void* user_data) \ { \ NODETYPE* node = 0; \ \ @@ -363,7 +369,7 @@ node = LISTTYPE##_head(list); \ \ while(node) { \ - if ((*compare_func)(new_node->data, node->data) >= 0) { \ + if ((*compare_func)(new_node->data, node->data, user_data) >= 0) { \ return LISTTYPE##_insert_before_node(node, new_node); \ } \ node = LISTTYPE##_next(node); \ @@ -375,7 +381,9 @@ NODETYPE* LISTTYPE##_insert_sorted_node_rev(LISTTYPE* list, \ NODETYPE* new_node, \ int (*compare_func)(DATATYPE new_elem, \ - DATATYPE existing_elem)) \ + DATATYPE existing_elem, \ + void* user_data), \ + void* user_data) \ { \ NODETYPE* node = 0; \ \ @@ -383,7 +391,7 @@ node = LISTTYPE##_tail(list); \ \ while(node) { \ - if ((*compare_func)(new_node->data, node->data) >= 0) { \ + if ((*compare_func)(new_node->data, node->data, user_data) >= 0) { \ return LISTTYPE##_insert_after_node(node, new_node); \ } \ node = LISTTYPE##_prev(node); \ @@ -396,7 +404,9 @@ NODETYPE* LISTTYPE##_insert_sorted(LISTTYPE* list, \ DATATYPE new_elem, \ int (*compare_func)(DATATYPE new_elem, \ - DATATYPE existing_elem)) \ + DATATYPE existing_elem, \ + void* user_data), \ + void* user_data) \ { \ NODETYPE* new_node = 0; \ \ @@ -406,13 +416,15 @@ assert(new_node); \ \ new_node->data = new_elem; \ - return LISTTYPE##_insert_sorted_node(list, new_node, compare_func); \ + return LISTTYPE##_insert_sorted_node(list, new_node, compare_func, user_data); \ } \ \ NODETYPE* LISTTYPE##_insert_sorted_rev(LISTTYPE* list, \ DATATYPE new_elem, \ int (*compare_func)(DATATYPE new_elem, \ - DATATYPE existing_elem)) \ + DATATYPE existing_elem, \ + void* user_data), \ + void* user_data) \ { \ NODETYPE* new_node = 0; \ assert(list); \ @@ -421,7 +433,7 @@ assert(new_node); \ \ new_node->data = new_elem; \ - return LISTTYPE##_insert_sorted_node_rev(list, new_node, compare_func); \ + return LISTTYPE##_insert_sorted_node_rev(list, new_node, compare_func, user_data); \ } \ \ inline void LISTTYPE##_empty(LISTTYPE* list) \ diff --git a/rmc_proto_test_pub.c b/rmc_proto_test_pub.c index a57b911..71cf0ce 100644 --- a/rmc_proto_test_pub.c +++ b/rmc_proto_test_pub.c @@ -125,6 +125,7 @@ void queue_test_data(rmc_pub_context_t* ctx, uint8_t* payload, int payload_len, // Patch node with the correct pid. // Find the correct payload and update its pid /* TO BE REINTRODUCED WHEN WE DO PACKET FLIPPING ON SENDER SIDE + Replace lambda() with a static function in this file and provide a pointer to it. pub_packet_list_for_each(&ctx->pub_ctx.queued, lambda (uint8_t, (pub_packet_node_t* node, void* dt) { pub_packet_t *pack = node->data; @@ -144,7 +145,39 @@ void queue_test_data(rmc_pub_context_t* ctx, uint8_t* payload, int payload_len, } +// Shared between _subscriber_connect_callback() and test_rmc_proto_pub() +int subscriber_count = 0; +static uint8_t _subscriber_connect_callback(rmc_pub_context_t*ctx, + uint32_t remote_addr, + in_port_t remote_port) +{ + char* addr_str = inet_ntoa( (struct in_addr) { .s_addr = htonl( remote_addr) }); + RMC_LOG_INFO("Subscriber [%s:%d] connected", + addr_str, + remote_port); + if (!subscriber_count) + rmc_log_set_start_time(); + + subscriber_count++; + return 1; +} +static void _subscriber_disconnect_callback(rmc_pub_context_t*ctx, + uint32_t remote_addr, + in_port_t remote_port) +{ + char* addr_str = inet_ntoa( (struct in_addr) { .s_addr = htonl( remote_addr) }); + RMC_LOG_INFO("Subscriber [%s:%d] disconnected", + addr_str, + remote_port); + subscriber_count--; + return; +} + +static void _free_payload(void* pl, payload_len_t len, user_data_t dt) +{ + free(pl); +} void test_rmc_proto_pub(char* mcast_group_addr, char* mcast_if_addr, @@ -165,7 +198,6 @@ void test_rmc_proto_pub(char* mcast_group_addr, uint64_t signal_ind = 0; uint64_t packet_ind = 0; uint8_t *conn_vec_mem = 0; - int subscriber_count = 0; usec_timestamp_t current_ts = 0; @@ -188,36 +220,12 @@ void test_rmc_proto_pub(char* mcast_group_addr, (user_data_t) { .i32 = epollfd }, poll_add, poll_modify, poll_remove, RMC_MAX_CONNECTIONS, - lambda(void, (void* pl, payload_len_t len, user_data_t dt) { free(pl); })); - - - rmc_pub_set_subscriber_connect_callback(ctx, - lambda(uint8_t, (rmc_pub_context_t*ctx, - uint32_t remote_addr, - in_port_t remote_port) { - char* addr_str = inet_ntoa( (struct in_addr) { .s_addr = htonl( remote_addr) }); - RMC_LOG_INFO("Subscriber [%s:%d] connected", - addr_str, - remote_port); - if (!subscriber_count) - rmc_log_set_start_time(); - - subscriber_count++; - return 1; - })); - - - rmc_pub_set_subscriber_disconnect_callback(ctx, - lambda(void, (rmc_pub_context_t*ctx, - uint32_t remote_addr, - in_port_t remote_port) { - char* addr_str = inet_ntoa( (struct in_addr) { .s_addr = htonl( remote_addr) }); - RMC_LOG_INFO("Subscriber [%s:%d] disconnected", - addr_str, - remote_port); - subscriber_count--; - return; - })); + _free_payload); + + + rmc_pub_set_subscriber_connect_callback(ctx, _subscriber_connect_callback); + rmc_pub_set_subscriber_disconnect_callback(ctx, _subscriber_disconnect_callback); + // Send an announcement every 0.3 second. rmc_pub_set_announce_interval(ctx, 300000); diff --git a/rmc_pub_context.c b/rmc_pub_context.c index e279d73..4c77812 100644 --- a/rmc_pub_context.c +++ b/rmc_pub_context.c @@ -284,7 +284,7 @@ int rmc_pub_deactivate_context(rmc_pub_context_t* ctx) rmc_conn_get_max_index_in_use(&ctx->conn_vec, &max); - if (max != -1) { + if (max != RMC_NIL_INDEX) { for(ind = 0; ind <= max; ++ind) { rmc_connection_t* conn = rmc_conn_find_by_index(&ctx->conn_vec, ind); @@ -468,8 +468,8 @@ uint32_t rmc_pub_get_socket_count(rmc_pub_context_t* ctx) return 0; return rmc_pub_get_subscriber_count(ctx) + - (ctx->mcast_send_descriptor != -1)?1:0 + - (ctx->listen_descriptor != -1 )?1:0; + ((ctx->mcast_send_descriptor != -1)?1:0) + + ((ctx->listen_descriptor != -1 )?1:0); } int rmc_pub_throttling(rmc_pub_context_t* ctx, diff --git a/rmc_pub_packet.c b/rmc_pub_packet.c index 389a3fe..98ae2a1 100644 --- a/rmc_pub_packet.c +++ b/rmc_pub_packet.c @@ -86,28 +86,30 @@ int rmc_pub_traffic_suspended(rmc_pub_context_t* ctx) return ctx->traffic_suspended?EBUSY:0; } + +static void _payload_free_generic(void* payload, payload_len_t payload_len, user_data_t user_data) +{ + free(payload); +} + int rmc_pub_packet_ack(rmc_pub_context_t* ctx, rmc_connection_t* conn, packet_id_t pid) { pub_packet_ack(&ctx->subscribers[conn->connection_index], pid, // packet free function invoked when the last subscriber acks the packet. - // Also resumes suspende traffic - lambda(void, (void* payload, payload_len_t payload_len, user_data_t user_data) { - // If we currently have suspended traffic, - // check if we are to resume it. - if (ctx->traffic_suspended) { - if (pub_get_unacknowledged_packet_count(&ctx->pub_ctx) <= ctx->traffic_resume_threshold) { - RMC_LOG_INFO("Resuming traffic"); - ctx->traffic_suspended = 0; - } else - RMC_LOG_DEBUG("Still suspended unacked[%d] threshold[%d]", - pub_get_unacknowledged_packet_count(&ctx->pub_ctx), - ctx->traffic_resume_threshold); - } - if (ctx->payload_free) - (*ctx->payload_free)(payload, payload_len, user_data); - else - free(payload); - })); + ctx->payload_free?ctx->payload_free:_payload_free_generic); + + // If we currently have suspended traffic, + // check if we are to resume it. + if (ctx->traffic_suspended) { + if (pub_get_unacknowledged_packet_count(&ctx->pub_ctx) <= ctx->traffic_resume_threshold) { + RMC_LOG_INFO("Resuming traffic"); + ctx->traffic_suspended = 0; + } else + RMC_LOG_DEBUG("Still suspended unacked[%d] threshold[%d]", + pub_get_unacknowledged_packet_count(&ctx->pub_ctx), + ctx->traffic_resume_threshold); + } + return 0; } diff --git a/rmc_pub_read.c b/rmc_pub_read.c index 79aa79f..68c22b9 100644 --- a/rmc_pub_read.c +++ b/rmc_pub_read.c @@ -82,6 +82,10 @@ static int process_control_message(rmc_connection_t* conn, user_data_t user_data return 0; } +static void _generic_payload_free(void* payload, payload_len_t payload_len, user_data_t user_data) +{ + free(payload); +} int rmc_pub_close_connection(rmc_pub_context_t* ctx, rmc_index_t s_ind) { @@ -108,12 +112,7 @@ int rmc_pub_close_connection(rmc_pub_context_t* ctx, rmc_index_t s_ind) RMC_LOG_INDEX_INFO(s_ind, "rmc_pub_close_connection() - ok"); pub_reset_subscriber(&ctx->subscribers[s_ind], - lambda(void, (void* payload, payload_len_t payload_len, user_data_t user_data) { - if (ctx->payload_free) - (*ctx->payload_free)(payload, payload_len, user_data); - else - free(payload); - })); + ctx->payload_free?ctx->payload_free:_generic_payload_free); return 0; } diff --git a/rmc_pub_write.c b/rmc_pub_write.c index b6a52e9..c16e524 100644 --- a/rmc_pub_write.c +++ b/rmc_pub_write.c @@ -333,7 +333,7 @@ int rmc_pub_context_get_pending(rmc_pub_context_t* ctx, rmc_conn_get_max_index_in_use(&ctx->conn_vec, &max_ind); // If we have no subscribers, just return immediately - if (max_ind == -1) + if (max_ind == RMC_NIL_INDEX) // Return EBUSY if we have pending data to transmit return busy?EBUSY:0; diff --git a/rmc_sub_context.c b/rmc_sub_context.c index d77a1d4..d18a6f8 100644 --- a/rmc_sub_context.c +++ b/rmc_sub_context.c @@ -365,5 +365,5 @@ uint32_t rmc_sub_get_socket_count(rmc_sub_context_t* ctx) return 0; return rmc_sub_get_publisher_count(ctx) + - (ctx->mcast_recv_descriptor != -1)?1:0; + ((ctx->mcast_recv_descriptor != -1)?1:0); } diff --git a/rmc_sub_packet.c b/rmc_sub_packet.c index 8315bbd..6329934 100644 --- a/rmc_sub_packet.c +++ b/rmc_sub_packet.c @@ -13,6 +13,20 @@ #include #include +static int _compare_oldest_unackowledged_packet(rmc_index_t n_ind, rmc_index_t o_ind, void* user_data) +{ + rmc_sub_context_t* ctx = (rmc_sub_context_t*) user_data; + usec_timestamp_t n_oldest; + usec_timestamp_t o_oldest; + + n_oldest = sub_oldest_unacknowledged_packet(&ctx->publishers[n_ind]); + o_oldest = sub_oldest_unacknowledged_packet(&ctx->publishers[o_ind]); + + return (n_oldest < o_oldest)?-1: + ((n_oldest > o_oldest)?1:0); + +} + int rmc_sub_packet_received(rmc_sub_context_t* ctx, rmc_index_t index, // Into ctx->connections and ctx->publishers @@ -33,17 +47,8 @@ int rmc_sub_packet_received(rmc_sub_context_t* ctx, if (!sub_oldest_unacknowledged_packet(pub)) rmc_index_list_insert_sorted_rev(&ctx->pub_ack_list, index, - lambda(int, (rmc_index_t n_ind, rmc_index_t o_ind) { - usec_timestamp_t n_oldest; - usec_timestamp_t o_oldest; - - n_oldest = sub_oldest_unacknowledged_packet(&ctx->publishers[n_ind]); - o_oldest = sub_oldest_unacknowledged_packet(&ctx->publishers[o_ind]); - - return (n_oldest < o_oldest)?-1: - ((n_oldest > o_oldest)?1:0); - - })); + _compare_oldest_unackowledged_packet, + ctx); sub_packet_received(&ctx->publishers[index], pid, @@ -75,6 +80,10 @@ sub_packet_t* rmc_sub_get_next_dispatch_ready(rmc_sub_context_t* ctx) return 0; } +static int _compare_packet(sub_packet_t* needle, sub_packet_t* haystack, void* user_dat) +{ + return needle == haystack; +} // Caller still need to free pack->payload int rmc_sub_packet_dispatched_keep_payload(rmc_sub_context_t* ctx, sub_packet_t* pack) @@ -86,9 +95,7 @@ int rmc_sub_packet_dispatched_keep_payload(rmc_sub_context_t* ctx, sub_packet_t* node = sub_packet_list_find_node(&ctx->dispatch_ready, pack, - lambda(int, (sub_packet_t* needle, sub_packet_t* haystack) { - return needle == haystack; - })); + _compare_packet, 0); if (!node) return ENOENT; diff --git a/rmc_sub_read.c b/rmc_sub_read.c index 3394a7e..a77ee4c 100644 --- a/rmc_sub_read.c +++ b/rmc_sub_read.c @@ -347,7 +347,10 @@ static int process_cmd_packet(rmc_connection_t* conn, user_data_t user_data) return 0; } - +static void _payload_free_generic(void* payload, payload_len_t payload_len, user_data_t user_data) +{ + free(payload); +} int rmc_sub_close_connection(rmc_sub_context_t* ctx, rmc_index_t s_ind) { @@ -356,12 +359,7 @@ int rmc_sub_close_connection(rmc_sub_context_t* ctx, rmc_index_t s_ind) rmc_conn_close_connection(&ctx->conn_vec, s_ind); sub_reset_publisher(&ctx->publishers[s_ind], - lambda(void, (void* payload, payload_len_t payload_len, user_data_t user_data) { - if (ctx->payload_free) - (*ctx->payload_free)(payload, payload_len, user_data); - else - free(payload); - })); + ctx->payload_free?ctx->payload_free:_payload_free_generic); return 0; } diff --git a/sub.c b/sub.c index 5d60659..34d0e95 100644 --- a/sub.c +++ b/sub.c @@ -33,6 +33,26 @@ static void _free_pending_packet(sub_packet_t* ppack) free((void*) ppack); } +static int _is_duplicate(sub_packet_t* needle, + sub_packet_t* haystack, + void* user_data) +{ + int *is_duplicate = (int*) user_data; + // We found an actual duplicate! + if (needle->pid == haystack->pid) { + *is_duplicate = 1; + return 1; + } + + // Since received_pid is sorted, we will know when + // we will never find pid in the list + if (needle->pid > haystack->pid) { + *is_duplicate = 0; + return -1; + } + return 0; +} + int sub_packet_is_duplicate(sub_publisher_t* pub, packet_id_t pid) { sub_packet_t cmp_pack = { .pid = pid }; @@ -53,22 +73,7 @@ int sub_packet_is_duplicate(sub_publisher_t* pub, packet_id_t pid) sub_packet_list_find_node_rev(&pub->received_pid, &cmp_pack, - lambda(int, (sub_packet_t* needle, - sub_packet_t* haystack) { - // We found an actual duplicate! - if (needle->pid == haystack->pid) { - is_duplicate = 1; - return 1; - } - - // Since received_pid is sorted, we will know when - // we will never find pid in the list - if (needle->pid > haystack->pid) { - is_duplicate = 0; - return -1; - } - return 0; - })); + _is_duplicate, &is_duplicate); if (is_duplicate == 1) { return 1; @@ -77,6 +82,11 @@ int sub_packet_is_duplicate(sub_publisher_t* pub, packet_id_t pid) } +static int _compare_pid(sub_packet_t* n_dt, sub_packet_t* o_dt, void* user_data) { + return (n_dt->pid < o_dt->pid)?-1: + ((n_dt->pid > o_dt->pid)?1: + 0); +} int sub_packet_received(sub_publisher_t* pub, packet_id_t pid, void* payload, payload_len_t payload_len, @@ -102,11 +112,7 @@ int sub_packet_received(sub_publisher_t* pub, packet_id_t pid, // the received list than the beginning sub_packet_list_insert_sorted_rev(&pub->received_pid, pack, - lambda(int, (sub_packet_t* n_dt, sub_packet_t* o_dt) { - return (n_dt->pid < o_dt->pid)?-1: - ((n_dt->pid > o_dt->pid)?1: - 0); - })); + _compare_pid, 0); if (store_receive_interval_data) sub_packet_add_to_received_interval(pub, pid); diff --git a/time.c b/time.c index 219addd..a94918c 100644 --- a/time.c +++ b/time.c @@ -13,7 +13,7 @@ usec_timestamp_t rmc_usec_monotonic_timestamp(void) { struct timespec res; - clock_gettime(CLOCK_BOOTTIME, &res); + clock_gettime(CLOCK_MONOTONIC, &res); return (usec_timestamp_t) res.tv_sec * 1000000 + res.tv_nsec / 1000; }