Skip to content

Commit

Permalink
add ASYNC_CHANGE_FD status in reactor
Browse files Browse the repository at this point in the history
Allows changing the file descriptor in the resume function. Such
functionality is useful for failover purposes, like having
multiple destinations, you find that one is not working only in
the resume function and you want to try another one. Now you
can change the file descriptor, remove the old one and add the
new descriptor and then wait for it to be triggered.
  • Loading branch information
ionutrazvanionita committed Feb 18, 2016
1 parent 0dca32b commit ff09d49
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 17 deletions.
7 changes: 4 additions & 3 deletions async.h
Expand Up @@ -36,9 +36,10 @@
* NOTE: all values in this enum must be negative
*/
enum async_ret_code {
ASYNC_NO_IO = -5,
ASYNC_NO_IO = -6,
ASYNC_SYNC,
ASYNC_CONTINUE,
ASYNC_CHANGE_FD,
ASYNC_DONE_CLOSE_FD,
ASYNC_DONE,
};
Expand All @@ -61,15 +62,15 @@ typedef int (async_start_function)
(struct sip_msg *msg, struct action* a , int resume_route);

typedef int (async_resume_function)
(int fd, void *param);
(int *fd, void *param);

extern async_start_function *async_start_f;
extern async_resume_function *async_resume_f;

int register_async_handlers(async_start_function *f1, async_resume_function *f2);


/* async related functions to be used by the
/* async related functions to be used by the
* functions exported by modules */
typedef int (async_resume_module)
(int fd, struct sip_msg *msg, void *param);
Expand Down
51 changes: 45 additions & 6 deletions modules/tm/async.c
Expand Up @@ -70,7 +70,7 @@ static inline void run_resume_route( int resume_route, struct sip_msg *msg)

/* function triggered from reactor in order to continue the processing
*/
int t_resume_async(int fd, void *param)
int t_resume_async(int *fd, void *param)
{
static struct sip_msg faked_req;
static struct ua_client uac;
Expand All @@ -83,7 +83,7 @@ int t_resume_async(int fd, void *param)
struct cell *t= ctx->t;
int route;

LM_DBG("resuming on fd %d, transaction %p \n",fd, t);
LM_DBG("resuming on fd %d, transaction %p \n",*fd, t);

if (current_processing_ctx) {
LM_CRIT("BUG - a context already set!\n");
Expand Down Expand Up @@ -122,18 +122,47 @@ int t_resume_async(int fd, void *param)

async_status = ASYNC_DONE; /* assume default status as done */
/* call the resume function in order to read and handle data */
return_code = ctx->resume_f( fd, &faked_req, ctx->resume_param );
return_code = ctx->resume_f( *fd, &faked_req, ctx->resume_param );
if (async_status==ASYNC_CONTINUE) {
/* do not run the resume route */
goto restore;
} else if (async_status==ASYNC_CHANGE_FD) {
if (return_code<0) {
LM_ERR("ASYNC_CHANGE_FD: given file descriptor shall be positive!\n");
goto restore;
} else if (return_code > 0 && return_code == *fd) {
/*trying to add the same fd; shall continue*/
LM_CRIT("You are trying to replace the old fd with the same fd!"
"Will act as in ASYNC_CONTINUE!\n");
goto restore;
}

/* remove the old fd from the reactor */
reactor_del_reader( *fd, -1, IO_FD_CLOSING);
*fd=return_code;

/* insert the new fd inside the reactor */
if (reactor_add_reader( *fd, F_SCRIPT_ASYNC, RCT_PRIO_ASYNC, (void*)ctx)<0 ) {
LM_ERR("failed to add async FD to reactor -> act in sync mode\n");
do {
return_code = ctx->resume_f( *fd, &faked_req, ctx->resume_param );
if (async_status == ASYNC_CHANGE_FD)
*fd=return_code;
} while(async_status==ASYNC_CONTINUE||async_status==ASYNC_CHANGE_FD);
goto route;
}

/* changed fd; now restore old state */
goto restore;
}

/* remove from reactor, we are done */
reactor_del_reader( fd, -1, IO_FD_CLOSING);
reactor_del_reader( *fd, -1, IO_FD_CLOSING);

if (async_status == ASYNC_DONE_CLOSE_FD)
close(fd);
close(*fd);

route:
/* run the resume_route (some type as the original one) */
swap_route_type(route, ctx->route_type);
run_resume_route( ctx->resume_route, &faked_req);
Expand Down Expand Up @@ -216,6 +245,13 @@ int t_handle_async(struct sip_msg *msg, struct action* a , int resume_route)
} else if (async_status==ASYNC_SYNC) {
/* IO already done in SYNC'ed way */
goto resume;
} else if (async_status==ASYNC_CHANGE_FD) {
LM_ERR("Incorrect ASYNC_CHANGE_FD status usage!"
"You should use this status only from the"
"resume function in case something went wrong"
"and you have other alternatives!\n");
/*FIXME should we go to resume or exit?it's quite an invalid scenario */
goto resume;
} else {
/* generic error, go for resume route */
goto resume;
Expand Down Expand Up @@ -274,9 +310,12 @@ int t_handle_async(struct sip_msg *msg, struct action* a , int resume_route)
/* run the resume function */
do {
return_code = ctx_f( fd, msg, ctx_p );
} while(async_status==ASYNC_CONTINUE);
if (async_status == ASYNC_CHANGE_FD)
fd = return_code;
} while(async_status==ASYNC_CONTINUE||async_status==ASYNC_CHANGE_FD);
/* run the resume route in sync mode */
run_resume_route( resume_route, msg);

/* break original script */
return 0;

Expand Down
2 changes: 1 addition & 1 deletion modules/tm/async.h
Expand Up @@ -34,7 +34,7 @@
*/
int t_handle_async(struct sip_msg *msg, struct action* a , int resume_route);

int t_resume_async(int fd, void *param);
int t_resume_async(int *fd, void *param);


#endif
Expand Down
12 changes: 6 additions & 6 deletions net/net_tcp_proc.c
Expand Up @@ -48,7 +48,7 @@ static void tcpconn_release(struct tcp_connection* c, long state,int writer)
LM_DBG(" extra_data %p\n", c->extra_data);

/* if we are in a writer context, do not touch the buffer contain read packets per connection
might be in a completely different process
might be in a completely different process
even if in our process we shouldn't touch it, since it might currently be in use, when we've read multiple SIP messages in one try*/
if (!writer && c->con_req) {
pkg_free(c->con_req);
Expand All @@ -61,8 +61,8 @@ static void tcpconn_release(struct tcp_connection* c, long state,int writer)
/* errno==EINTR, EWOULDBLOCK a.s.o todo */
response[0]=(long)c;
response[1]=state;
if (send_all((state==ASYNC_WRITE)?unix_tcp_sock:tcpmain_sock, response,

if (send_all((state==ASYNC_WRITE)?unix_tcp_sock:tcpmain_sock, response,
sizeof(response))<=0)
LM_ERR("send_all failed\n");
}
Expand All @@ -71,7 +71,7 @@ static void tcpconn_release(struct tcp_connection* c, long state,int writer)
/* wrapper around internal tcpconn_release() - to be called by functions which
* used tcp_conn_get(), in order to release the connection;
* It does the unref and pushes back (if needed) some update to TCP main;
* right now, it used only from the xxx_send() functions
* right now, it used only from the xxx_send() functions
*/
void tcp_conn_release(struct tcp_connection* c, int pending_data)
{
Expand Down Expand Up @@ -117,7 +117,7 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type)
handle_timer_job();
break;
case F_SCRIPT_ASYNC:
async_resume_f( fm->fd, fm->data);
async_resume_f( &fm->fd, fm->data);
return 0;
case F_TCPMAIN:
again:
Expand Down Expand Up @@ -173,7 +173,7 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type)
tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
goto con_error;
}

/* mark that the connection is currently in our process
future writes to this con won't have to acquire FD */
con->proc_id = process_no;
Expand Down
2 changes: 1 addition & 1 deletion net/net_udp.c
Expand Up @@ -264,7 +264,7 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type)
handle_timer_job();
return 0;
case F_SCRIPT_ASYNC:
async_resume_f( fm->fd, fm->data);
async_resume_f( &fm->fd, fm->data);
return 0;
default:
LM_CRIT("uknown fd type %d in UDP worker\n", fm->type);
Expand Down

0 comments on commit ff09d49

Please sign in to comment.