diff --git a/src/base/bang-com.c b/src/base/bang-com.c index 874e4f2..9999075 100644 --- a/src/base/bang-com.c +++ b/src/base/bang-com.c @@ -95,15 +95,14 @@ typedef struct { * A lock on peers structure and affiliate things. It can have multiple readers, * but only one writer. */ - BANG_rw_syncro *peers_lock; + BANG_rw_syncro *lck; unsigned int peer_count; unsigned int current_peers; size_t size; - - peer **peers = NULL; - int *keys = NULL; + peer **peers; + int *keys; } peer_set; /** @@ -155,6 +154,10 @@ static peer* new_peer(int peer_id, int socket); */ static void free_peer(peer *p); +static void peer_stop(peer *p); + +static void peer_start(peer *p); + static peer_set* new_peer_set(); static void free_peer_set(); @@ -165,21 +168,25 @@ static void free_peer_set(); * \brief Gets the location of the peer in the in * the peers array. */ -static int peer_set_get_key(int peer_id); +static int internal_peer_set_get_key(peer_set *ps, int peer_id); /** * \param peers The set to add the peer to. - * \param new_peer A peer to be added to the set. + * \param socket The socket of the new peer. * * \return The peer_id of the peer. * * \brief Adds new_peer to the peers array. */ -static int peer_set_add_peer(peer_set *peers, peer *new_peer); +static int peer_set_create_peer(peer_set *ps, int socket); -static void peer_set_remove_peer_by_id(peer_set *peers, int peer_id); +static void peer_set_remove_peer_by_id(peer_set *ps, int peer_id); -static void peer_set_remove_peer(peer_set *peers, peer *dead_peer); +static void peer_set_remove_peer(peer_set *ps, peer *dead_peer); + +static void peer_set_start_peer_by_id(peer_set *ps, int peer_id); + +static void peer_set_stop_peer_by_id(peer_set *ps, int peer_id); /** * \param self The peer to be closed. @@ -215,7 +222,7 @@ static BANG_requests* new_BANG_requests(); */ static void free_BANG_requests(BANG_requests *requests); -static peer_set peers; +static peer_set *peers; static request_node* new_request_node() { request_node *new_node = (request_node*) calloc(1,sizeof(request_node)); @@ -230,6 +237,16 @@ static void free_request_list(request_node *head) { free(head); } +static void append_request(request_node **head, request_node *node) { + if (*head == NULL) { + *head = node; + } else { + request_node *cur; + for (cur = *head; cur->next != NULL; cur = cur->next); + cur->next = node; + } +} + static request_node* pop_request(request_node **head) { request_node *temp = *head; if (temp->next == NULL) { @@ -243,34 +260,24 @@ static request_node* pop_request(request_node **head) { } } -static void append_request(request_node **head, request_node *node) { - if (*head == NULL) { - *head = node; - } else { - request_node *cur; - for (cur = *head; cur->next != NULL; cur = cur->next); - cur->next = node; - } +static BANG_requests* new_BANG_requests() { + BANG_requests *requests = (BANG_requests*) calloc(1,sizeof(BANG_requests)); + requests->head = NULL; + pthread_mutex_init(&(requests->lock),NULL); + sem_init(&(requests->num_requests),0,0); + return requests; +} + +static void free_BANG_requests(BANG_requests *requests) { + pthread_mutex_destroy(&(requests->lock)); + sem_destroy(&(requests->num_requests)); + free_request_list(requests->head); } static void free_peer(peer *p) { #ifdef BDEBUG_1 fprintf(stderr,"Freeing a peer with peer_id %d.\n",p->peer_id); #endif - pthread_cancel(p->receive_thread); - - /* We need to close the receive thread in a more roundabout way, since it may be waiting - * on a semaphore in which case it will never close */ - BANG_request request; - - request.type = BANG_CLOSE_REQUEST; - request.length = 0; - request.request = NULL; - - request_peer(p,request); - - pthread_join(p->receive_thread,NULL); - pthread_join(p->send_thread,NULL); free_BANG_requests(p->requests); @@ -279,32 +286,21 @@ static void free_peer(peer *p) { free(p); } -static void read_peer_thread_self_close(peer *self) { - BANG_sigargs args; - args.args = calloc(1,sizeof(int)); - *((int*)args.args) = self->peer_id; - args.length = sizeof(int); +static peer* new_peer(int peer_id, int socket) { + peer *new; - BANG_read_lock(peers_lock); - BANG_send_signal(BANG_PEER_DISCONNECTED,&args,1); - free(args.args); - BANG_read_unlock(peers_lock); + new = (peer*) calloc(1,sizeof(peer)); - pthread_exit(NULL); -} + new->peer_id = peer_id; + new->socket = socket; -static BANG_requests* new_BANG_requests() { - BANG_requests *requests = (BANG_requests*) calloc(1,sizeof(BANG_requests)); - requests->head = NULL; - pthread_mutex_init(&(requests->lock),NULL); - sem_init(&(requests->num_requests),0,0); - return requests; -} + new->requests = new_BANG_requests(); -static void free_BANG_requests(BANG_requests *requests) { - pthread_mutex_destroy(&(requests->lock)); - sem_destroy(&(requests->num_requests)); - free_request_list(requests->head); + /* Set up the poll struct. */ + new->pfd.fd = socket; + new->pfd.events = POLLIN | POLLERR | POLLHUP | POLLNVAL; + + return new; } static void request_peer(peer *to_be_requested, BANG_request request) { @@ -318,35 +314,26 @@ static void request_peer(peer *to_be_requested, BANG_request request) { sem_post(&(to_be_requested->requests->num_requests)); } -static int intcmp(const void *i1, const void *i2) { - return *((int*) i1) - *((int*) i2); +static void peer_start(peer *p) { + pthread_create(&(p->receive_thread),NULL,BANG_read_peer_thread,p); + pthread_create(&(p->send_thread),NULL,BANG_write_peer_thread,p); } -static int get_key_with_peer_id(int peer_id) { - int pos = -1; - - int *ptr = bsearch(&peer_id,keys,current_peers,sizeof(int),&intcmp); - if (ptr != NULL) - pos = keys - ptr; - - return pos; -} - -static peer* new_peer(int peer_id, int socket) { - peer *new; - - new = (peer*) calloc(1,sizeof(peer)); +static void peer_stop(peer *p) { + pthread_cancel(p->receive_thread); - new->peer_id = peer_id; - new->socket = socket; + /* We need to close the receive thread in a more roundabout way, since it may be waiting + * on a semaphore in which case it will never close */ + BANG_request request; - new->requests = new_BANG_requests(); + request.type = BANG_CLOSE_REQUEST; + request.length = 0; + request.request = NULL; - /* Set up the poll struct. */ - new->pfd.fd = socket; - new->pfd.events = POLLIN | POLLERR | POLLHUP | POLLNVAL; + request_peer(p,request); - return new; + pthread_join(p->receive_thread,NULL); + pthread_join(p->send_thread,NULL); } static peer_set* new_peer_set() { @@ -363,42 +350,97 @@ static peer_set* new_peer_set() { return new; } -static void free_peer_set(peer_set *peers) { +static void free_peer_set(peer_set *ps) { unsigned int i = 0; - BANG_write_lock(peers->lck); + BANG_write_lock(ps->lck); - for (i = 0; i < current_peers; ++i) { - free_peer(peers[i]); + for (i = 0; i < ps->current_peers; ++i) { + free_peer(ps->peers[i]); } - BANG_write_unlock(peers->lck); + BANG_write_unlock(ps->lck); - free_BANG_rw_syncro(peers->ck); - free(peers->keys); - free(peers->peers); - free(peers); + free_BANG_rw_syncro(ps->lck); + free(ps->keys); + free(ps->peers); + free(ps); } -static int peer_set_add_peer(peer_set *peers, peer *new) { - BANG_write_lock(peers->lck); +static int intcmp(const void *i1, const void *i2) { + return *((int*) i1) - *((int*) i2); +} - int current_key = ++(peers->current_peers); +static int internal_peer_set_get_key(peer_set *ps, int peer_id) { + int pos = -1; + + int *ptr = bsearch(&peer_id,ps->keys,ps->current_peers,sizeof(int),&intcmp); + if (ptr != NULL) + pos = ps->keys - ptr; + + return pos; +} + +static int peer_set_create_peer(peer_set *ps, int socket) { + BANG_write_lock(ps->lck); + + int new_id = ++(ps->peer_count); + int current_key = ++(ps->current_peers); + + peer *new = new_peer(new_id,socket); /* Grow the array if it is too small. */ - if (peers->size < peers->current_peers) { - peer->size *= 2; + if (ps->size < ps->current_peers) { + ps->size *= 2; + + ps->keys = realloc(ps->keys,ps->size * sizeof(int)); + ps->peers = realloc(ps->peers,ps->size * sizeof(peer*)); + } + + ps->keys[current_key] = new->peer_id; + ps->peers[current_key] = new; - keys = realloc(peers->keys,peers->size * sizeof(int)); - peers = realloc(peers->peers,peers->size * sizeof(peer*)); + BANG_write_unlock(ps->lck); + + return new_id; +} + +static void peer_set_remove_peer_by_id(peer_set *ps, int peer_id) { + BANG_write_lock(ps->lck); + + int pos = internal_peer_set_get_key(ps, peer_id); + + if (pos == -1) return; + + free_peer(ps->peers[pos]); + ps->peers[pos] = NULL; + + for (;((unsigned int)pos) < ps->current_peers - 1; ++pos) { + ps->peers[pos] = ps->peers[pos + 1]; + ps->keys[pos] = ps->keys[pos + 1]; } - keys[current_key] = new->peer_id; - peers[current_key] = new; + --(ps->current_peers); + + BANG_write_unlock(ps->lck); +} + +static void peer_set_start_peer_by_id(peer_set *ps, int peer_id) { + BANG_read_lock(ps->lck); - BANG_write_unlock(peers->lck); + int key = internal_peer_set_get_key(ps, peer_id); + peer_start(ps->peers[key]); - return current_key; + BANG_read_unlock(ps->lck); +} + +static void peer_set_stop_peer_by_id(peer_set *ps, int peer_id) { + BANG_read_lock(ps->lck); + + int key = internal_peer_set_get_key(ps, peer_id); + peer_stop(ps->peers[key]); + + BANG_read_unlock(ps->lck); } static void catch_add_peer(int signal, int num_sockets, void **socket) { @@ -438,28 +480,20 @@ static void catch_request_all(int signal, int num_requests, void **vrequest) { } void BANG_add_peer(int socket) { - BANG_write_lock(peers_lock); - - int current_id = peer_count++; - - peer *new = new_peer(current_id,socket); - - int key = add_peer_to_peers(new); + int peer_id = peer_set_create_peer(peers, socket); #ifdef BDEBUG_1 fprintf(stderr,"Threads being started at %d.\n",key); #endif - pthread_create(&(peers[key]->receive_thread),NULL,BANG_read_peer_thread,peers[key]); - pthread_create(&(peers[key]->send_thread),NULL,BANG_write_peer_thread,peers[key]); + peer_set_start_peer_by_id(peers,peer_id); - BANG_write_unlock(peers_lock); /** * Send out that we successfully started the peer threads. */ BANG_sigargs args; args.args = calloc(1,sizeof(int)); - *((int*)args.args) = current_id; + *((int*)args.args) = peer_id; args.length = sizeof(int); BANG_send_signal(BANG_PEER_ADDED,&args,1); @@ -472,28 +506,8 @@ void BANG_remove_peer(int peer_id) { fprintf(stderr,"Removing peer %d.\n",peer_id); #endif - peer_set_remove_by_id(peer_id); - - BANG_write_lock(peers_lock); - - int pos = get_key_with_peer_id(peer_id); - if (pos == -1) return; - - free_peer(peers[pos]); - peers[pos] = NULL; - - for (;((unsigned int)pos) < current_peers - 1; ++pos) { - peers[pos] = peers[pos + 1]; - keys[pos] = keys[pos + 1]; - } - - --current_peers; - - - peers = (peer**) realloc(peers,current_peers * sizeof(peer*)); - keys = (int*) realloc(keys,current_peers * sizeof(int)); - - BANG_write_unlock(peers_lock); + peer_set_stop_peer_by_id(peers,peer_id); + peer_set_remove_peer_by_id(peers,peer_id); BANG_sigargs peer_send; peer_send.args = calloc(1,sizeof(int)); @@ -657,6 +671,20 @@ static void send_module_peer_request(peer *self, BANG_request request) { free(request.request); } +static void read_peer_thread_self_close(peer *self) { + BANG_sigargs args; + args.args = calloc(1,sizeof(int)); + *((int*)args.args) = self->peer_id; + args.length = sizeof(int); + + BANG_read_lock(peers_lock); + BANG_send_signal(BANG_PEER_DISCONNECTED,&args,1); + free(args.args); + BANG_read_unlock(peers_lock); + + pthread_exit(NULL); +} + static unsigned int write_message(peer *self, void *message, unsigned int length) { unsigned int written = 0; int write_return = 0;