Skip to content

Commit

Permalink
Integrated bang_linked_list into com, and made the cascading changes
Browse files Browse the repository at this point in the history
required.

git-svn-id: https://subversion.cs.uiuc.edu/svn/bang/eoh2009@62 69d76c3e-0761-0410-948c-9895a8bb34fc
  • Loading branch information
nbysani2 committed Jan 26, 2009
1 parent 9eeda52 commit ee243b2
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 157 deletions.
149 changes: 27 additions & 122 deletions src/base/bang-com.c
Expand Up @@ -21,33 +21,18 @@
#include<sys/socket.h>
#include<unistd.h>

/**
* A linked list of requests of peer send threads.
*/
typedef struct _request_node {
BANG_request request;
/**
* The next node in the request list.
*/
struct _request_node *next;
} request_node;

/**
* Holds requests of the peers in a linked list.
*/
typedef struct {
/**
* The lock on adding or removing requests.
*/
pthread_mutex_t lock;
/**
* Signals the thread that a new requests has been added.
*/
sem_t num_requests;
/**
* A linked list of requests
*/
request_node *head;
BANG_linked_list *requests;
} BANG_requests;

/**
Expand Down Expand Up @@ -107,41 +92,14 @@ typedef struct {
int *keys;
} peer_set;

/**
* \brief Allocates and returns a new request node.
*/
static request_node* new_request_node();

/**
* \param head The head of the list to be freed.
*
* \brief Frees resources used by a request list started at
* head.
*/
static void free_request_list(request_node *head);

/**
* \return The head of the request node list starting at head.
*
* \brief Pops off the head of the linked list.
*/
static request_node* pop_request(request_node **head);

/**
* \param node appends node to list started at head
* \param head start of the request node list
*
* \brief Appends a node to the head linked list.
*/
static void append_request(request_node **head, request_node *node);

/**
* \param self The peer to be requested.
* \param request Request to be given to the peer.
*
* \brief Adds a request to a peer structure.
*/
static void request_peer(peer *self, BANG_request request);
static void request_peer(peer *self, BANG_request *request);

/**
* \param peer_id Id to be used for the peer.
Expand Down Expand Up @@ -271,54 +229,18 @@ static void free_BANG_requests(BANG_requests *requests);

static peer_set *peers;

static request_node* new_request_node() {
request_node *new_node = (request_node*) calloc(1,sizeof(request_node));
new_node->next = NULL;
return new_node;
}

static void free_request_list(request_node *head) {
if (head == NULL) return;
if (head->next != NULL)
free_request_list(head->next);
free(head);
}

static void append_request(request_node **head, request_node *node) {
if (*head == NULL) {
*head = node;
} else {
request_node *cur;
for (cur = *head; cur->next != NULL; cur = cur->next);
cur->next = node;
}
}

static request_node* pop_request(request_node **head) {
request_node *temp = *head;
if (temp->next == NULL) {
*head = NULL;
return temp;
static BANG_requests* new_BANG_requests() {
BANG_requests *new = (BANG_requests*) calloc(1,sizeof(BANG_requests));

} else {
*head = temp->next;
temp->next = NULL;
return temp;
}
}
new->requests = new_BANG_linked_list();
sem_init(&(new->num_requests),0,0);

static BANG_requests* new_BANG_requests() {
BANG_requests *requests = (BANG_requests*) calloc(1,sizeof(BANG_requests));
requests->head = NULL;
pthread_mutex_init(&(requests->lock),NULL);
sem_init(&(requests->num_requests),0,0);
return requests;
return new;
}

static void free_BANG_requests(BANG_requests *requests) {
pthread_mutex_destroy(&(requests->lock));
sem_destroy(&(requests->num_requests));
free_request_list(requests->head);
free_BANG_linked_list(requests->requests,&free_BANG_request);
}

static void free_peer(peer *p) {
Expand Down Expand Up @@ -350,14 +272,8 @@ static peer* new_peer(int peer_id, int socket) {
return new;
}

static void request_peer(peer *to_be_requested, BANG_request request) {
pthread_mutex_lock(&(to_be_requested->requests->lock));

request_node *temp = new_request_node();
temp->request = request;
append_request(&(to_be_requested->requests->head),temp);

pthread_mutex_unlock(&(to_be_requested->requests->lock));
static void request_peer(peer *to_be_requested, BANG_request *request) {
BANG_linked_list_push(to_be_requested->requests->requests,request);
sem_post(&(to_be_requested->requests->num_requests));
}

Expand All @@ -371,11 +287,7 @@ static void peer_stop(peer *p) {

/* We need to close the receive thread in a more roundabout way, since it may be waiting
* on a semaphore in which case it will never close */
BANG_request request;

request.type = BANG_CLOSE_REQUEST;
request.length = 0;
request.request = NULL;
BANG_request *request = new_BANG_request(BANG_CLOSE_REQUEST,NULL,0);

request_peer(p,request);

Expand Down Expand Up @@ -533,8 +445,7 @@ static void catch_request_all(int signal, int num_requests, void **vrequest) {
BANG_request **to_request = (BANG_request**) vrequest;
int i = 0;
for (; i < num_requests; ++i) {
BANG_request_all(*(to_request[i]));
free(to_request[i]);
BANG_request_all(to_request[i]);
}
free(to_request);
}
Expand Down Expand Up @@ -578,12 +489,12 @@ void BANG_remove_peer(int peer_id) {
free(peer_send.args);
}

void BANG_request_peer_id(int peer_id, BANG_request request) {
void BANG_request_peer_id(int peer_id, BANG_request *request) {
peer *to_be_requested = peer_set_get_peer(peers,peer_id);
request_peer(to_be_requested,request);
}

void BANG_request_all(BANG_request request) {
void BANG_request_all(BANG_request *request) {
unsigned int i = 0;

/* TODO: Do this a better a way, have a peer set function? */
Expand Down Expand Up @@ -740,10 +651,9 @@ static char read_job_message(peer *self) {
* it may be possible that modules don't fit in memory though this seems
* _very_ unlikely.
*/
static void send_module(peer *self, BANG_request request) {
FILE *fd = fopen((char*)request.request,"r");
static void send_module(peer *self, BANG_request *request) {
FILE *fd = fopen((char*)request->request,"r");
if (fd == NULL) {
free(request.request);
return;
}

Expand All @@ -769,14 +679,12 @@ static void send_module(peer *self, BANG_request request) {
} while (reading >= UPDATE_SIZE);

fclose(fd);
free(request.request);
}

static void send_module_peer_request(peer *self, BANG_request request) {
unsigned int header = BANG_WANT_MODULE;
write_message(self,&header,LENGTH_OF_LENGTHS);
write_message(self,&(request.request),request.length);
free(request.request);
static void send_module_peer_request(peer *self, BANG_request *request) {
BANG_header header = BANG_WANT_MODULE;
write_message(self,&header,LENGTH_OF_HEADER);
write_message(self,request->request,request->length);
}

static void read_peer_thread_self_close(peer *self) {
Expand Down Expand Up @@ -914,20 +822,18 @@ void* BANG_read_peer_thread(void *self_info) {

void* BANG_write_peer_thread(void *self_info) {
peer *self = (peer*)self_info;
request_node *current;
BANG_request *current;
char sending = 1;
BANG_header header;

while (sending) {
sem_wait(&(self->requests->num_requests));
pthread_mutex_lock(&(self->requests->lock));
current = pop_request(&(self->requests->head));
pthread_mutex_unlock(&(self->requests->lock));
current = BANG_linked_list_dequeue(self->requests->requests);

/*
* TODO: act on current request
*/
switch (current->request.type) {
switch (current->type) {
case BANG_CLOSE_REQUEST:
header = BANG_BYE;
write_message(self,&header,LENGTH_OF_HEADER);
Expand All @@ -943,9 +849,8 @@ void* BANG_write_peer_thread(void *self_info) {
* possibly put in own method */
header = BANG_DEBUG_MESSAGE;
write_message(self,&header,LENGTH_OF_HEADER);
write_message(self,&(current->request.length),LENGTH_OF_LENGTHS);
write_message(self,&(current->request.request),current->request.length);
free(current->request.request);
write_message(self,&(current->length),LENGTH_OF_LENGTHS);
write_message(self,current->request,current->length);
break;

case BANG_SEND_MODULE_REQUEST:
Expand All @@ -959,12 +864,12 @@ void* BANG_write_peer_thread(void *self_info) {
default:
/*ERROR!*/
#ifdef BDEBUG_1
fprintf(stderr,"BANG ERROR:\t%d is not a request type that we take care of!\n",current->request.type);
fprintf(stderr,"BANG ERROR:\t%d is not a request type that we take care of!\n",current->type);
#endif
break;
}

free(current);
free_BANG_request(current);
}

#ifdef BDEBUG_1
Expand Down
4 changes: 2 additions & 2 deletions src/base/bang-com.h
Expand Up @@ -46,14 +46,14 @@ void* BANG_write_peer_thread(void *self_info);
*
* \brief Adds a request to the peer_id peer.
*/
void BANG_request_peer_id(int peer_id, BANG_request request);
void BANG_request_peer_id(int peer_id, BANG_request *request);

/**
* \param request The request to add.
*
* \brief Adds request to all peers.
*/
void BANG_request_all(BANG_request request);
void BANG_request_all(BANG_request *request);

/**
* \param peer_id Peer to remove.
Expand Down

0 comments on commit ee243b2

Please sign in to comment.