Skip to content

Commit

Permalink
Port to clang (#16)
Browse files Browse the repository at this point in the history
* Builds with clang and gcc. Passes tests

* Changed socket non-block code to portable variant

* Replaced accept4() with accept() for posix compatability.

* Second attempt at cleaning out accept4()

* Replaced boot time clock with monotonic clock for POSIX compatability.
  • Loading branch information
magnusfeuer committed Sep 26, 2019
1 parent 92b2b14 commit 9d11a2b
Show file tree
Hide file tree
Showing 16 changed files with 264 additions and 221 deletions.
32 changes: 16 additions & 16 deletions list_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ static uint8_t _test_sequence(test_list* list, test_id_t start, test_id_t stop)
return 0;
}

static int _compare_pid(test_id_t existing_pid, test_id_t new_pid)
static int _compare_pid(test_id_t existing_pid, test_id_t new_pid, void* user_data)
{
if (existing_pid > new_pid)
return 1;
Expand All @@ -107,7 +107,7 @@ static int _compare_pid(test_id_t existing_pid, test_id_t new_pid)

}

static int _find_pid(test_id_t pid1, test_id_t pid2)
static int _find_pid(test_id_t pid1, test_id_t pid2, void* user_data)
{
return pid1 == pid2;

Expand Down Expand Up @@ -224,41 +224,41 @@ void run_list_tests()
}


test_list_insert_sorted(&p1, 2, _compare_pid);
test_list_insert_sorted(&p1, 2, _compare_pid, 0);
if (_test_sequence(&p1, 2, 2)) {
puts("Failed list test 4.1.\n");
exit(255);
}

test_list_insert_sorted(&p1, 1, _compare_pid);
test_list_insert_sorted(&p1, 1, _compare_pid, 0);
if (_test_sequence(&p1, 2, 1)) {
puts("Failed list test 4.2.\n");
exit(255);
}

test_list_insert_sorted(&p1, 3, _compare_pid);
test_list_insert_sorted(&p1, 3, _compare_pid, 0);
if (_test_sequence(&p1, 3, 1)) {
puts("Failed list test 4.3.\n");
exit(255);
}

test_list_insert_sorted(&p1, 7, _compare_pid);
test_list_insert_sorted(&p1, 6, _compare_pid);
test_list_insert_sorted(&p1, 5, _compare_pid);
test_list_insert_sorted(&p1, 7, _compare_pid, 0);
test_list_insert_sorted(&p1, 6, _compare_pid, 0);
test_list_insert_sorted(&p1, 5, _compare_pid, 0);

test_list_insert_sorted(&p1, 4, _compare_pid);
test_list_insert_sorted(&p1, 4, _compare_pid, 0);
if (_test_sequence(&p1, 7, 1)) {
puts("Failed list test 4.4.\n");
exit(255);
}

test_list_insert_sorted(&p1, 8, _compare_pid);
test_list_insert_sorted(&p1, 8, _compare_pid, 0);
if (_test_sequence(&p1, 8, 1)) {
puts("Failed list test 4.5.\n");
exit(255);
}

node = test_list_find_node(&p1, 1, _find_pid);
node = test_list_find_node(&p1, 1, _find_pid, 0);

if (node->data != 1) {
puts("Failed list test 5.1.\n");
Expand All @@ -267,35 +267,35 @@ void run_list_tests()

test_list_delete(node);

node = test_list_find_node(&p1, 1, _find_pid);
node = test_list_find_node(&p1, 1, _find_pid, 0);
if (node) {
puts("Failed list test 5.2.\n");
exit(255);
}

node = test_list_find_node(&p1, 2, _find_pid);
node = test_list_find_node(&p1, 2, _find_pid, 0);
if (node->data != 2) {
puts("Failed list test 5.3.\n");
exit(255);
}

test_list_delete(node);

node = test_list_find_node(&p1, 2, _find_pid);
node = test_list_find_node(&p1, 2, _find_pid, 0);
if (node) {
puts("Failed list test 5.4.\n");
exit(255);
}

node = test_list_find_node(&p1, 7, _find_pid);
node = test_list_find_node(&p1, 7, _find_pid, 0);
if (node->data != 7) {
puts("Failed list test 5.5.\n");
exit(255);
}

test_list_delete(node);

node = test_list_find_node(&p1, 7, _find_pid);
node = test_list_find_node(&p1, 7, _find_pid, 0);
if (node) {
puts("Failed list test 5.6.\n");
exit(255);
Expand Down
134 changes: 65 additions & 69 deletions pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ void pub_init_subscriber(pub_subscriber_t* sub, pub_context_t* ctx, user_data_t
pub_sub_list_push_tail(&ctx->subscribers, sub);
}

static int _find_subcriber(pub_subscriber_t* a, pub_subscriber_t* b, void* user_data)
{
return a == b;
}


// Clean up all pending data.
void pub_reset_subscriber(pub_subscriber_t* sub,
Expand All @@ -82,14 +87,26 @@ void pub_reset_subscriber(pub_subscriber_t* sub,


snode = pub_sub_list_find_node(&sub->context->subscribers, sub,
lambda(int, (pub_subscriber_t* a, pub_subscriber_t* b) {
return a == b;
}));
_find_subcriber,
0);
assert(snode);
pub_sub_list_delete(snode);
}


static int _compare_pid(pub_packet_t* new_pack, pub_packet_t* existing_pack, void* user_data)
{
if (new_pack->pid > existing_pack->pid)
return 1;

if (new_pack->pid < existing_pack->pid)
return -1;

return 0;
}



static packet_id_t pub_queue_packet_with_pid(pub_context_t* ctx,
packet_id_t pid,
void* payload,
Expand All @@ -116,16 +133,8 @@ static packet_id_t pub_queue_packet_with_pid(pub_context_t* ctx,
ppack->parent_node =
pub_packet_list_insert_sorted(&ctx->queued,
ppack,
lambda(int, (pub_packet_t* new_pack, pub_packet_t* existing_pack) {
if (new_pack->pid > existing_pack->pid)
return 1;

if (new_pack->pid < existing_pack->pid)
return -1;

return 0;
}
));
_compare_pid,
0);

return ppack->pid;
}
Expand Down Expand Up @@ -203,16 +212,7 @@ void pub_packet_sent(pub_context_t* ctx,
// Sorted on ascending pid.
pub_packet_list_insert_sorted_node(&ctx->inflight,
pack->parent_node,
lambda(int, (pub_packet_t* new_pack, pub_packet_t* existing_pack) {
if (new_pack->pid > existing_pack->pid)
return 1;

if (new_pack->pid < existing_pack->pid)
return -1;

return 0;
}
));
_compare_pid, 0);

// Traverse all subscribers and insert pack into their
// inflight list.
Expand All @@ -225,17 +225,8 @@ void pub_packet_sent(pub_context_t* ctx,
// Insert the new pub_packet_t in the descending
// packet_id sorted list of the subscriber's inflight packets.
pub_packet_list_insert_sorted(&sub->inflight,
pack,
lambda(int, (pub_packet_t* new_pack, pub_packet_t* existing_pack) {
if (new_pack->pid < existing_pack->pid)
return -1;

if (new_pack->pid > existing_pack->pid)
return 1;

return 0;
}
));
pack,
_compare_pid, 0);
pack->ref_count++;
sub_node = pub_sub_list_next(sub_node);
}
Expand Down Expand Up @@ -311,17 +302,18 @@ void pub_get_timed_out_subscribers(pub_context_t* ctx,
usec_timestamp_t timeout_period, // Number of usecs until timeout
pub_sub_list_t* result)
{
// Traverse all subscribers.
pub_sub_list_for_each(&ctx->subscribers,
// For each subscriber, check if their oldest inflight packet has a sent_ts
// timestamp older than max_age. If so, add the subscriber to result.
lambda(uint8_t, (pub_sub_node_t* sub_node, void* udata) {
if (pub_packet_list_size(&sub_node->data->inflight) &&
pub_packet_list_tail(&sub_node->data->inflight)->data->send_ts + timeout_period <= current_ts)
pub_sub_list_push_tail(result, sub_node->data);
return 1;
}), 0);
pub_sub_node_t* sub_node = pub_sub_list_head(&ctx->subscribers);


// Traverse all subscribers.
while(sub_node) {
// For each subscriber, check if their oldest inflight packet has a sent_ts
// timestamp older than max_age. If so, add the subscriber to result.
if (pub_packet_list_size(&sub_node->data->inflight) &&
pub_packet_list_tail(&sub_node->data->inflight)->data->send_ts + timeout_period <= current_ts)
pub_sub_list_push_tail(result, sub_node->data);
sub_node = pub_sub_list_next(sub_node);
}
}


Expand All @@ -330,17 +322,16 @@ void pub_get_timed_out_packets(pub_subscriber_t* sub,
usec_timestamp_t timeout_period, // Number of usecs until timeout
pub_packet_list_t* result)
{
pub_packet_node_t* pack_node = pub_packet_list_head(&sub->inflight);

// Traverse all inflight packets for subscriber until we find one that is not timed out.x
pub_packet_list_for_each_rev(&sub->inflight,
// For each packet, check if their oldest inflight packet has a sent_ts
// timestamp older than max_age. If so, add it to result.
lambda(uint8_t, (pub_packet_node_t* pnode, void* udata) {
if (pnode->data->send_ts + timeout_period <= current_ts) {
pub_packet_list_push_tail(result, pnode->data);
return 1;
}
return 0;
}), 0);
while(pack_node &&
pack_node->data->send_ts + timeout_period <= current_ts) {
pub_packet_list_push_tail(result, pack_node->data);
pack_node = pub_packet_list_next(pack_node);
}

return;
}


Expand All @@ -349,26 +340,31 @@ void pub_get_timed_out_packets(pub_subscriber_t* sub,
int pub_get_oldest_unackowledged_packet(pub_context_t* ctx, usec_timestamp_t* timeout_ack)
{
usec_timestamp_t oldest = -1;
pub_sub_node_t* sub_node = 0;

if (!ctx || !timeout_ack)
return 0;

sub_node = pub_sub_list_head(&ctx->subscribers);


// Traverse all subscribers.
pub_sub_list_for_each(&ctx->subscribers,
// Check if the oldest inflight packet of this subscriber is older
// than the oldest inflight packet found so far.
lambda(uint8_t, (pub_sub_node_t* sub_node, void* udata) {
pub_packet_list_t* lst = &sub_node->data->inflight;
pub_packet_t* pack = 0;
if (!pub_packet_list_size(lst))
return 1;

pack = pub_packet_list_tail(lst)->data;
if (oldest == -1 || pack->send_ts < oldest) {
oldest = pack->send_ts;
}
return 1;
}), 0);
while(sub_node) {
// Check if the oldest inflight packet of this subscriber is older
// than the oldest inflight packet found so far.
pub_packet_list_t* lst = &sub_node->data->inflight;
pub_packet_t* pack = 0;
if (!pub_packet_list_size(lst)) {
sub_node = pub_sub_list_next(sub_node);
continue;
}
pack = pub_packet_list_tail(lst)->data;
if (oldest == -1 || pack->send_ts < oldest) {
oldest = pack->send_ts;
}
sub_node = pub_sub_list_next(sub_node);
}


*timeout_ack = oldest;
return 1;
Expand Down
7 changes: 0 additions & 7 deletions reliable_multicast.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,6 @@ typedef union {
#define user_data_i32(_i32) ((user_data_t) { .i32 = _i32 })
#define user_data_ptr(_ptr) ((user_data_t) { .ptr = _ptr })

// Used for iterators etc.
#define lambda(return_type, function_body) \
({ \
return_type __fn__ function_body \
__fn__; \
})


extern usec_timestamp_t rmc_usec_monotonic_timestamp(void);

Expand Down
19 changes: 14 additions & 5 deletions rmc_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <assert.h>
Expand Down Expand Up @@ -262,7 +263,7 @@ int rmc_conn_connect_tcp_by_address(rmc_connection_vector_t* conn_vec,
rmc_index_t c_ind = RMC_NIL_INDEX;
int res = 0;
struct sockaddr_in sock_addr;

int desc_flags = 0;
assert(conn_vec);

sock_addr = (struct sockaddr_in) {
Expand All @@ -277,11 +278,14 @@ int rmc_conn_connect_tcp_by_address(rmc_connection_vector_t* conn_vec,
return ENOMEM;


conn_vec->connections[c_ind].descriptor = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);

conn_vec->connections[c_ind].descriptor = socket(AF_INET, SOCK_STREAM, 0);
if (conn_vec->connections[c_ind].descriptor == -1)
return errno;

// Set nonblocking mode on the socket.
desc_flags = fcntl(conn_vec->connections[c_ind].descriptor, F_GETFL, 0);
fcntl(conn_vec->connections[c_ind].descriptor, F_SETFL, desc_flags | O_NONBLOCK);

RMC_LOG_INDEX_COMMENT(c_ind, "Connecting to control tcp addr[%s:%d]", inet_ntoa(sock_addr.sin_addr), ntohs(sock_addr.sin_port));

res = connect(conn_vec->connections[c_ind].descriptor,
Expand Down Expand Up @@ -338,20 +342,25 @@ int rmc_conn_process_accept(int listen_descriptor,
rmc_index_t c_ind = -1;
int tr = 1;
int sock_err = 0;
int desc_flags = 0;

// Find a free slot.
c_ind = _get_free_slot(conn_vec);

if (c_ind == RMC_NIL_INDEX)
return ENOMEM;

conn_vec->connections[c_ind].descriptor = accept4(listen_descriptor,
conn_vec->connections[c_ind].descriptor = accept(listen_descriptor,
(struct sockaddr*) &src_addr,
&addr_len, SOCK_NONBLOCK);
&addr_len);

if (conn_vec->connections[c_ind].descriptor == -1)
return errno;

// Set nonblocking mode on the socket.
desc_flags = fcntl(conn_vec->connections[c_ind].descriptor, F_GETFL, 0);
fcntl(conn_vec->connections[c_ind].descriptor, F_SETFL, desc_flags | O_NONBLOCK);

RMC_LOG_INDEX_COMMENT(c_ind, "%s:%d assigned to index %d",
inet_ntoa(src_addr.sin_addr), ntohs(src_addr.sin_port), c_ind);

Expand Down

0 comments on commit 9d11a2b

Please sign in to comment.