Skip to content

Commit

Permalink
Fixed some bugs on poller mode at shutdown from another thread.
Browse files Browse the repository at this point in the history
Test also test for pool mode, but fails.
  • Loading branch information
davidmoreno committed Feb 11, 2012
1 parent 69fa1d8 commit b01b1b1
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/onion/onion.c
Expand Up @@ -754,7 +754,7 @@ static onion_connection_status onion_connection_read(onion_request *req){
errno=0;
r = req->server->read(req->socket, buffer, sizeof(buffer));
if (r<=0){ // error reading.
ONION_DEBUG("Read %d bytes, errno %d %s", r, errno, strerror(errno));
ONION_DEBUG0("Read %d bytes, errno %d %s", r, errno, strerror(errno));
if (errno==ECONNRESET)
ONION_DEBUG("Connection reset by peer."); // Ok, this is more or less normal.
else if (errno==EAGAIN){
Expand Down
29 changes: 25 additions & 4 deletions src/onion/poller.c
Expand Up @@ -31,6 +31,8 @@
#include "log.h"
#include "types.h"
#include "poller.h"
#include <sys/socket.h>
#include <sys/eventfd.h>

#ifdef HAVE_PTHREADS
# define __USE_UNIX98
Expand All @@ -46,8 +48,9 @@

struct onion_poller_t{
int fd;
int stop;
int eventfd; ///< fd to signal internal changes on poller.
int n;
char stop;
#ifdef HAVE_PTHREADS
pthread_mutex_t mutex;
int npollers;
Expand Down Expand Up @@ -133,6 +136,10 @@ void onion_poller_slot_set_timeout(onion_poller_slot *el, int timeout){
ONION_DEBUG0("Set timeout to %d, %d s", el->timeout_limit, el->timeout);
}

static int onion_poller_empty_helper(void *_){
return 0;
}

/**
* @short Returns a poller object that helps polling on sockets and files
* @memberof onion_poller_t
Expand All @@ -149,9 +156,14 @@ onion_poller *onion_poller_new(int n){
free(p);
return NULL;
}
p->stop=0;
p->eventfd=eventfd(0,EFD_CLOEXEC);
p->head=NULL;
p->n=0;
p->stop=0;

onion_poller_slot *ev=onion_poller_slot_new(p->eventfd,onion_poller_empty_helper,NULL);
onion_poller_add(p,ev);

#ifdef HAVE_PTHREADS
p->npollers=0;
pthread_mutexattr_t attr;
Expand Down Expand Up @@ -239,7 +251,7 @@ int onion_poller_remove(onion_poller *poller, int fd){

poller->head=el->next;
pthread_mutex_unlock(&poller->mutex);
onion_poller_slot_free(el);
return 0;
}
Expand All @@ -248,6 +260,12 @@ int onion_poller_remove(onion_poller *poller, int fd){
ONION_DEBUG0("Removed from tail %p",el);
onion_poller_slot *t=el->next;
el->next=t->next;

if (poller->head->next==NULL){ // This means only eventfd is here.
ONION_DEBUG0("Removed last, stopping poll");
onion_poller_stop(poller);
}

pthread_mutex_unlock(&poller->mutex);

onion_poller_slot_free(t);
Expand Down Expand Up @@ -412,6 +430,9 @@ void onion_poller_poll(onion_poller *p){
* @memberof onion_poller_t
*/
void onion_poller_stop(onion_poller *p){
p->stop=1;
ONION_DEBUG0("Stopping poller");
p->stop=1;
char data[8]={0,0,0,0, 0,0,0,1};
write(p->eventfd,data,8);
}

57 changes: 49 additions & 8 deletions tests/01-internal/06-onion.c
Expand Up @@ -30,7 +30,9 @@

const int MAX_TIME=120;

char processed;
int processed;
pthread_mutex_t processed_mutex = PTHREAD_MUTEX_INITIALIZER;

volatile char okexit=0;
onion *o;
FILE *null_file=NULL;
Expand All @@ -43,7 +45,9 @@ typedef struct{
}params_t;

onion_response_codes process_request(void *_, onion_request *req, onion_response *res){
pthread_mutex_lock(&processed_mutex);
processed++;
pthread_mutex_unlock(&processed_mutex);
onion_response_write0(res, "Done");

return OCS_PROCESSED;
Expand Down Expand Up @@ -71,6 +75,7 @@ int curl_get(const char *url){
}

void *do_requests(params_t *t){
ONION_DEBUG("Do %d petitions",t->n_requests);
int i;
usleep(t->wait_s*1000000);
for(i=0;i<t->n_requests;i++){
Expand All @@ -92,23 +97,34 @@ void *watchdog(void *_){
return NULL;
}

void do_petition_set(float wait_s, float wait_c, int n_requests, char close){
void do_petition_set_threaded(float wait_s, float wait_c, int nrequests, char close, int nthreads){
ONION_DEBUG("Using %d threads, %d petitions per thread",nthreads,nrequests);
processed=0;

params_t params;
params.wait_s=wait_s;
params.wait_t=wait_c;
params.n_requests=n_requests;
params.n_requests=nrequests;
params.close_at_n=close;

pthread_t thread;
pthread_create(&thread, NULL, (void*)do_requests, &params);
pthread_t *thread=malloc(sizeof(pthread_t*)*nthreads);
int i;
for (i=0;i<nthreads;i++){
pthread_create(&thread[i], NULL, (void*)do_requests, &params);
}
onion_listen(o);
pthread_join(thread, NULL);
for (i=0;i<nthreads;i++){
pthread_join(thread[i], NULL);
}
free(thread);

FAIL_IF_NOT_EQUAL_INT(params.n_requests, processed);
}

void do_petition_set(float wait_s, float wait_c, int n_requests, char close){
do_petition_set_threaded(wait_s, wait_c, n_requests, close, 1);
}

void t01_server_one(){
INIT_LOCAL();

Expand Down Expand Up @@ -137,7 +153,32 @@ void t02_server_epoll(){
onion_set_root_handler(o,onion_handler_new((void*)process_request,NULL,NULL));
do_petition_set(1,0.1,1,1);
onion_free(o);


o=onion_new(O_POLL);
onion_set_root_handler(o,onion_handler_new((void*)process_request,NULL,NULL));
do_petition_set(1,0.001,100,1);
onion_free(o);

o=onion_new(O_POOL);
onion_set_root_handler(o,onion_handler_new((void*)process_request,NULL,NULL));
do_petition_set(1,0.001,100,1);
onion_free(o);

o=onion_new(O_POOL);
onion_set_root_handler(o,onion_handler_new((void*)process_request,NULL,NULL));
do_petition_set_threaded(1,0.001,100,1,10);
onion_free(o);

o=onion_new(O_POOL);
onion_set_root_handler(o,onion_handler_new((void*)process_request,NULL,NULL));
do_petition_set_threaded(1,0.001,100,1,15);
onion_free(o);

o=onion_new(O_POOL);
onion_set_root_handler(o,onion_handler_new((void*)process_request,NULL,NULL));
do_petition_set_threaded(1,0.001,100,1,100);
onion_free(o);

END_LOCAL();
}

Expand All @@ -146,7 +187,7 @@ int main(int argc, char **argv){
pthread_t watchdog_thread;
pthread_create(&watchdog_thread, NULL, (void*)watchdog, NULL);

t01_server_one();
// t01_server_one();
t02_server_epoll();

okexit=1;
Expand Down

0 comments on commit b01b1b1

Please sign in to comment.