Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
More moving around and adjusting com source code.
  • Loading branch information
Nikhil Samith Bysani committed Jan 20, 2009
1 parent 6cf692c commit 076b1a7
Showing 1 changed file with 116 additions and 92 deletions.
208 changes: 116 additions & 92 deletions src/base/bang-com.c
Expand Up @@ -159,6 +159,14 @@ static peer_set* new_peer_set();

static void free_peer_set();

/**
* \param peer_id The id of a peer.
*
* \brief Gets the location of the peer in the in
* the peers array.
*/
static int peer_set_get_key(int peer_id);

/**
* \param peers The set to add the peer to.
* \param new_peer A peer to be added to the set.
Expand All @@ -171,15 +179,7 @@ static int peer_set_add_peer(peer_set *peers, peer *new_peer);

static void peer_set_remove_peer_by_id(peer_set *peers, int peer_id);

static void peer_set_remover_peer(peer_set *peers, peer *dead_peer);

/**
* \param peer_id The id of a peer.
*
* \brief Gets the location of the peer in the in
* the peers array.
*/
static int get_key_with_peer_id(int peer_id);
static void peer_set_remove_peer(peer_set *peers, peer *dead_peer);

/**
* \param self The peer to be closed.
Expand Down Expand Up @@ -293,47 +293,6 @@ static void read_peer_thread_self_close(peer *self) {
pthread_exit(NULL);
}

static void* extract_message(peer *self, unsigned int length) {
void *message = (char*) calloc(length,1);
int check_read;
unsigned int have_read = 0;
char extracting = 1;

while (extracting) {
if (poll(&(self->pfd),1,-1) != -1 && self->pfd.revents & POLLIN) {
check_read = read(self->socket,message + have_read,length);

if (check_read <= 0) {
#ifdef BDEBUG_1
fprintf(stderr,"Read on the socket has returned an error.\n");
#endif

free(message);
message = NULL;
extracting = 0;
} else {
have_read += check_read;
extracting = (have_read >= length) ? 0 : 1;
}
} else {
#ifdef BDEBUG_1

fprintf(stderr,"POLLIN:\t%x.\n",POLLIN);
fprintf(stderr,"POLLOUT:\t%x.\n",POLLOUT);
fprintf(stderr,"POLLERR:\t%x.\n",POLLERR);
fprintf(stderr,"POLLHUP:\t%x.\n",POLLHUP);
fprintf(stderr,"POLLNVAL:\t%x.\n",POLLNVAL);
fprintf(stderr,"Something went wrong on poll, revents %x.\n",self->pfd.revents);
#endif
free(message);
message = NULL;
extracting = 0;
}
}

return message;
}

static BANG_requests* new_BANG_requests() {
BANG_requests *requests = (BANG_requests*) calloc(1,sizeof(BANG_requests));
requests->head = NULL;
Expand Down Expand Up @@ -390,15 +349,55 @@ static peer* new_peer(int peer_id, int socket) {
return new;
}

static int add_peer_to_peers(peer *new) {
int current_key = ++current_peers;
static peer_set* new_peer_set() {
peer_set *new = calloc(1,sizeof(peer_set));

keys = (int*) realloc(keys,current_peers * sizeof(int));
peers = (peer**) realloc(peers,current_peers * sizeof(peer*));
new->lck = new_BANG_rw_syncro();
new->peer_count = 0;
new->current_peers = 0;
new->size = 2;

new->peers = calloc(new->size,sizeof(peers));
new->keys = calloc(new->size,sizeof(int));

return new;
}

static void free_peer_set(peer_set *peers) {
unsigned int i = 0;

BANG_write_lock(peers->lck);

for (i = 0; i < current_peers; ++i) {
free_peer(peers[i]);
}

BANG_write_unlock(peers->lck);

free_BANG_rw_syncro(peers->ck);
free(peers->keys);
free(peers->peers);
free(peers);
}

static int peer_set_add_peer(peer_set *peers, peer *new) {
BANG_write_lock(peers->lck);

int current_key = ++(peers->current_peers);

/* Grow the array if it is too small. */
if (peers->size < peers->current_peers) {
peer->size *= 2;

keys = realloc(peers->keys,peers->size * sizeof(int));
peers = realloc(peers->peers,peers->size * sizeof(peer*));
}

keys[current_key] = new->peer_id;
peers[current_key] = new;

BANG_write_unlock(peers->lck);

return current_key;
}

Expand Down Expand Up @@ -469,14 +468,12 @@ void BANG_add_peer(int socket) {
}

void BANG_remove_peer(int peer_id) {
/*
* this lock is needed when trying to change the
* peers structure
*/
#ifdef BDEBUG_1
fprintf(stderr,"Removing peer %d.\n",peer_id);
#endif

peer_set_remove_by_id(peer_id);

BANG_write_lock(peers_lock);

int pos = get_key_with_peer_id(peer_id);
Expand Down Expand Up @@ -533,7 +530,7 @@ void BANG_com_init() {
BANG_install_sighandler(BANG_PEER_CONNECTED,&catch_add_peer);
BANG_install_sighandler(BANG_PEER_DISCONNECTED,&catch_remove_peer);
BANG_install_sighandler(BANG_REQUEST_ALL,&catch_request_all);
peers_lock = new_BANG_rw_syncro();
peers = new_peer_set();
}

void BANG_com_close() {
Expand All @@ -546,21 +543,7 @@ void BANG_com_close() {
fprintf(stderr,"BANG com closing.\n");
#endif
free_peer_set(peers);

unsigned int i = 0;
BANG_write_lock(peers_lock);

for (i = 0; i < current_peers; ++i) {
free_peer(peers[i]);
}

BANG_write_unlock(peers_lock);

free_BANG_rw_syncro(peers_lock);
keys = NULL;
peers = NULL;
current_peers = 0;
peer_count = 0;
}

/*
Expand Down Expand Up @@ -624,25 +607,6 @@ static char read_module_message(peer *self) {
return 1;
}

static unsigned int write_message(peer *self, void *message, unsigned int length) {
unsigned int written = 0;
int write_return = 0;
char writing = 1;
while (writing) {
write_return = write(self->socket,message,length);
if (write_return >= 0) {
written += write_return;
if (written >= length)
writing = 0;
} else {
writing = 0;
/* ERROR !*/
}
}

return written;
}

/**
* \param self The thread sending the module.
* \param request The module path to send.
Expand Down Expand Up @@ -693,6 +657,66 @@ static void send_module_peer_request(peer *self, BANG_request request) {
free(request.request);
}

static unsigned int write_message(peer *self, void *message, unsigned int length) {
unsigned int written = 0;
int write_return = 0;
char writing = 1;
while (writing) {
write_return = write(self->socket,message,length);
if (write_return >= 0) {
written += write_return;
if (written >= length)
writing = 0;
} else {
writing = 0;
/* ERROR !*/
}
}

return written;
}

static void* extract_message(peer *self, unsigned int length) {
void *message = (char*) calloc(length,1);
int check_read;
unsigned int have_read = 0;
char extracting = 1;

while (extracting) {
if (poll(&(self->pfd),1,-1) != -1 && self->pfd.revents & POLLIN) {
check_read = read(self->socket,message + have_read,length);

if (check_read <= 0) {
#ifdef BDEBUG_1
fprintf(stderr,"Read on the socket has returned an error.\n");
#endif

free(message);
message = NULL;
extracting = 0;
} else {
have_read += check_read;
extracting = (have_read >= length) ? 0 : 1;
}
} else {
#ifdef BDEBUG_1

fprintf(stderr,"POLLIN:\t%x.\n",POLLIN);
fprintf(stderr,"POLLOUT:\t%x.\n",POLLOUT);
fprintf(stderr,"POLLERR:\t%x.\n",POLLERR);
fprintf(stderr,"POLLHUP:\t%x.\n",POLLHUP);
fprintf(stderr,"POLLNVAL:\t%x.\n",POLLNVAL);
fprintf(stderr,"Something went wrong on poll, revents %x.\n",self->pfd.revents);
#endif
free(message);
message = NULL;
extracting = 0;
}
}

return message;
}

void* BANG_read_peer_thread(void *self_info) {
peer *self = (peer*)self_info;

Expand Down

0 comments on commit 076b1a7

Please sign in to comment.