diff --git a/async.h b/async.h index 9abd79a69bf..4e9716e41fd 100644 --- a/async.h +++ b/async.h @@ -36,9 +36,10 @@ * NOTE: all values in this enum must be negative */ enum async_ret_code { - ASYNC_NO_IO = -5, + ASYNC_NO_IO = -6, ASYNC_SYNC, ASYNC_CONTINUE, + ASYNC_CHANGE_FD, ASYNC_DONE_CLOSE_FD, ASYNC_DONE, }; @@ -61,7 +62,7 @@ typedef int (async_start_function) (struct sip_msg *msg, struct action* a , int resume_route); typedef int (async_resume_function) - (int fd, void *param); + (int *fd, void *param); extern async_start_function *async_start_f; extern async_resume_function *async_resume_f; @@ -69,7 +70,7 @@ extern async_resume_function *async_resume_f; int register_async_handlers(async_start_function *f1, async_resume_function *f2); -/* async related functions to be used by the +/* async related functions to be used by the * functions exported by modules */ typedef int (async_resume_module) (int fd, struct sip_msg *msg, void *param); diff --git a/db/db.h b/db/db.h index 16590bbc071..6603fdfa99d 100644 --- a/db/db.h +++ b/db/db.h @@ -282,11 +282,13 @@ typedef int (*db_insert_update_f) (const db_con_t* _h, const db_key_t* _k, * * \param _h structure representing the database handle * \param _s the SQL query + * \param _data data that shall be populated by the engine; + * !!! must be handled by the upper layers * \return * success: Unix FD for polling * failure: negative error code */ -typedef int (*db_async_raw_query_f) (db_con_t *_h, const str *_q); +typedef int (*db_async_raw_query_f) (db_con_t *_h, const str *_q, void **_data); /* * \brief Reads data from the given fd's SQL connection. Populates the query @@ -295,6 +297,8 @@ typedef int (*db_async_raw_query_f) (db_con_t *_h, const str *_q); * \param _h structure representing the database handle * \param fd read file descriptor obtained in starting phase * \param _r structure for the result + * \param _data data that shall be populated by the engine; + * !!! must be handled by the upper layers * \return: * -> 0 on success, negative on failure * -> also populates the global "async_status": ASYNC_CONTINUE / ASYNC_DONE @@ -304,7 +308,8 @@ typedef int (*db_async_raw_query_f) (db_con_t *_h, const str *_q); * backend-specific results have already been freed! * You only need to call db_free_result(_r) when done */ -typedef int (*db_async_raw_resume_f) (db_con_t *_h, int fd, db_res_t **_r); +typedef int (*db_async_raw_resume_f) (db_con_t *_h, int fd, db_res_t **_r, + void *_data); /** * \brief Database module callbacks diff --git a/modules/avpops/avpops_impl.c b/modules/avpops/avpops_impl.c index 0bc57921174..c48f69da7ef 100644 --- a/modules/avpops/avpops_impl.c +++ b/modules/avpops/avpops_impl.c @@ -811,6 +811,8 @@ int ops_async_dbquery(struct sip_msg* msg, async_resume_module **rfunc, query_async_param *param; str qstr; + void *_tmp_db_param; + if (!msg || !query) { LM_ERR("bad parameters\n"); @@ -843,7 +845,7 @@ int ops_async_dbquery(struct sip_msg* msg, async_resume_module **rfunc, return rc == 1 ? -2 : (rc != 0 ? -1 : 1); } - read_fd = url->dbf.async_raw_query(url->hdl, &qstr); + read_fd = url->dbf.async_raw_query(url->hdl, &qstr, &_tmp_db_param); if (read_fd < 0) { *rparam = NULL; @@ -865,6 +867,7 @@ int ops_async_dbquery(struct sip_msg* msg, async_resume_module **rfunc, param->output_avps = dest; param->hdl = url->hdl; param->dbf = &url->dbf; + param->db_param = _tmp_db_param; async_status = read_fd; return 1; @@ -876,8 +879,8 @@ int resume_async_dbquery(int fd, struct sip_msg *msg, void *_param) query_async_param *param = (query_async_param *)_param; int rc; - rc = param->dbf->async_raw_resume(param->hdl, fd, &res); - if (async_status == ASYNC_CONTINUE) { + rc = param->dbf->async_raw_resume(param->hdl, fd, &res, param->db_param); + if (async_status == ASYNC_CONTINUE || async_status == ASYNC_CHANGE_FD) { return rc; } @@ -901,6 +904,8 @@ int resume_async_dbquery(int fd, struct sip_msg *msg, void *_param) return -1; } + async_status=ASYNC_DONE; + db_free_result(res); pkg_free(param); diff --git a/modules/avpops/avpops_impl.h b/modules/avpops/avpops_impl.h index b8b11af7547..fd3c6d9fda4 100644 --- a/modules/avpops/avpops_impl.h +++ b/modules/avpops/avpops_impl.h @@ -121,6 +121,7 @@ typedef struct _query_async_param pvname_list_t *output_avps; db_con_t *hdl; db_func_t *dbf; + void *db_param; } query_async_param; diff --git a/modules/db_mysql/dbase.c b/modules/db_mysql/dbase.c index da2fe26cb1f..0950ad97926 100644 --- a/modules/db_mysql/dbase.c +++ b/modules/db_mysql/dbase.c @@ -1125,7 +1125,7 @@ static inline int db_mysql_get_con_fd(void *con) * success: Unix FD for polling * failure: negative error code */ -int db_mysql_async_raw_query(db_con_t *_h, const str *_s) +int db_mysql_async_raw_query(db_con_t *_h, const str *_s, void** _data) { int *fd_ref; int code, i; @@ -1197,7 +1197,7 @@ int db_mysql_async_raw_query(db_con_t *_h, const str *_s) return -2; } -int db_mysql_async_raw_resume(db_con_t *_h, int fd, db_res_t **_r) +int db_mysql_async_raw_resume(db_con_t *_h, int fd, db_res_t **_r, void* _data) { struct pool_con *con; int rc; diff --git a/modules/db_mysql/dbase.h b/modules/db_mysql/dbase.h index e038b10e12c..25fcca3fa63 100644 --- a/modules/db_mysql/dbase.h +++ b/modules/db_mysql/dbase.h @@ -111,7 +111,7 @@ int db_last_inserted_id(const db_con_t* _h); * success: Unix FD for polling * failure: negative error code */ -int db_mysql_async_raw_query(db_con_t *_h, const str *_s); +int db_mysql_async_raw_query(db_con_t *_h, const str *_s, void** _data); /* * Reads data from the given fd's connection. @@ -125,7 +125,7 @@ int db_mysql_async_raw_query(db_con_t *_h, const str *_s); * backend-specific results have already been freed! * You only need to call db_free_result(_r) when done */ -int db_mysql_async_raw_resume(db_con_t *_h, int fd, db_res_t **_r); +int db_mysql_async_raw_resume(db_con_t *_h, int fd, db_res_t **_r, void* _data); /* * Insert a row into table, update on duplicate key diff --git a/modules/db_virtual/README b/modules/db_virtual/README index 096afbccf93..70bc8bfeaf4 100644 --- a/modules/db_virtual/README +++ b/modules/db_virtual/README @@ -97,6 +97,8 @@ Chapter 1. Admin Guide dbb->replace 1 0 dbb->last_inserted_id 0 0 dbb->insert_update 1 1 + dbb->async_raw_query 0 1 + dbb->async_raw_resume 0 1 Note 1: The capabilities returned are the minimum common denominator of all the dbs in the set. The capabilities are @@ -107,6 +109,10 @@ Chapter 1. Admin Guide but conceptual not allowed operations will be done on a single db. Ex: query will only query one db. + Note 3: Since version 2.2 db_virtual supports async_raw_query + and async_raw_resume functions currently implemented only by + the mysql database engine. + 1.1.3. Failures When an operation from a process on a real db fails: diff --git a/modules/db_virtual/db_virtual.c b/modules/db_virtual/db_virtual.c index 6a49881ecc9..4d36aee7595 100644 --- a/modules/db_virtual/db_virtual.c +++ b/modules/db_virtual/db_virtual.c @@ -473,6 +473,8 @@ int db_virtual_bind_api(const str* mod, db_func_t *dbb) dbb->replace = db_virtual_replace; dbb->last_inserted_id = db_virtual_last_inserted_id; dbb->insert_update = db_virtual_insert_update; + dbb->async_raw_query = db_virtual_async_raw_query; + dbb->async_raw_resume = db_virtual_async_raw_resume; return 0; } diff --git a/modules/db_virtual/db_virtual.h b/modules/db_virtual/db_virtual.h index ec8cbc96a43..92ec5d91187 100644 --- a/modules/db_virtual/db_virtual.h +++ b/modules/db_virtual/db_virtual.h @@ -45,15 +45,15 @@ #define DB_CAP_FAILOVER (0 | DB_CAP_QUERY | DB_CAP_RAW_QUERY | DB_CAP_INSERT | \ DB_CAP_DELETE | DB_CAP_UPDATE | DB_CAP_REPLACE | DB_CAP_FETCH | \ -DB_CAP_LAST_INSERTED_ID | DB_CAP_INSERT_UPDATE) +DB_CAP_LAST_INSERTED_ID | DB_CAP_INSERT_UPDATE | DB_CAP_ASYNC_RAW_QUERY) #define DB_CAP_PARALLEL (0 | DB_CAP_QUERY | DB_CAP_RAW_QUERY | DB_CAP_INSERT | \ DB_CAP_DELETE | DB_CAP_UPDATE | DB_CAP_REPLACE | DB_CAP_FETCH | \ -DB_CAP_LAST_INSERTED_ID | DB_CAP_INSERT_UPDATE) +DB_CAP_LAST_INSERTED_ID | DB_CAP_INSERT_UPDATE | DB_CAP_ASYNC_RAW_QUERY) #define DB_CAP_ROUND (0 | DB_CAP_QUERY | DB_CAP_RAW_QUERY | DB_CAP_INSERT | \ DB_CAP_FETCH | \ -DB_CAP_LAST_INSERTED_ID | DB_CAP_INSERT_UPDATE) +DB_CAP_LAST_INSERTED_ID | DB_CAP_INSERT_UPDATE | DB_CAP_ASYNC_RAW_QUERY) enum DB_MODE {FAILOVER=0, PARALLEL, ROUND}; diff --git a/modules/db_virtual/dbase.c b/modules/db_virtual/dbase.c index 5b7ca39141f..a1c5feb2f44 100644 --- a/modules/db_virtual/dbase.c +++ b/modules/db_virtual/dbase.c @@ -32,6 +32,7 @@ #include "dbase.h" #include "../../timer.h" +#define MAXBUF (1<<14) /* Conceptual allowed operations * parallel round dbb->use_table @@ -512,3 +513,180 @@ int db_virtual_insert_update(const db_con_t* _h, const db_key_t* _k, { db_generic_operation2(insert_update(handle->con, _k, _v, _n),1, 1, 1); } + +#define CURRCON(_ah) (_ah->current_con) + +#define db_generic_async_operation(_h,_ah, _resume_f, FUNC_WITH_ARGS) \ +do { \ + int mode; \ + int rc=0; \ + handle_con_t * handle; \ + db_func_t * f; \ + handle_set_t * p = (handle_set_t*)_h->tail; \ + \ + LM_DBG("f call handle size = %i\n", p->size); \ + \ + get_update_flags(p); \ + try_reconnect(p); \ + \ + mode = global->set_list[p->set_index].set_mode; \ + \ + if (mode == PARALLEL) { \ + LM_WARN("PARALLEL not supported in async! using FAILOVER!\n"); \ + } else if (mode != FAILOVER && mode != ROUND) { \ + LM_ERR("mode %d not supported!\n", mode); \ + return -1; \ + } \ + \ + do { \ + handle = &p->con_list[CURRCON(_ah)]; \ + f = &global->set_list[p->set_index].db_list[CURRCON(_ah)].dbf; \ + \ + if((handle->flags & CAN_USE) && (handle->flags & MAY_USE)){ \ + LM_DBG("flags1 = %i\n", p->con_list[CURRCON(_ah)].flags); \ + \ + rc=f->FUNC_WITH_ARGS; \ + \ + if (rc<0) { \ + /* FIXME quite a complicated case \ + * if the db disconected by any means then \ + * anything shall be ok if continue with other DB \ + * if cannot open new connections to mysql \ + * then things are gonna be messed up if continuing */ \ + LM_ERR("failover call failed rc:%d\n", rc); \ + /* set local can not use flag*/ \ + handle->flags &= NOT_CAN_USE; \ + \ + /* close connection*/ \ + set_update_flags(CURRCON(_ah), p); \ + \ + f->close(handle->con); \ + /* if failed before placing the fd in reactor \ + * we keep on */ \ + if ((--_ah->cons_rem) == 0) { \ + LM_ERR("All databases failed!! No hope for you!\n"); \ + return -1; \ + } \ + \ + /* try next*/ \ + rc = -1; \ + CURRCON(_ah) = (CURRCON(_ah)+1)%p->size; \ + } else { \ + if (_resume_f) \ + async_status = ASYNC_CHANGE_FD; \ + set_update_flags(CURRCON(_ah), p); \ + return rc; \ + } \ + } else { \ + LM_DBG("flags2 = %i\n", p->con_list[CURRCON(_ah)].flags); \ + if ((--_ah->cons_rem) == 0) { \ + LM_ERR("All databases failed!! No hope for you!\n"); \ + return -1; \ + } \ + \ + /* try next*/ \ + rc = -1; \ + CURRCON(_ah) = (CURRCON(_ah)+1)%p->size; \ + } \ + LM_DBG("curent_con = %i\n", CURRCON(_ah)); \ + } while ((_ah)->cons_rem); /* should never exit here */ \ + \ + return rc; \ +}while (0); + + +int db_virtual_async_raw_query(db_con_t *_h, const str *_s, void **_data) +{ + handle_async_t* _ah; + handle_con_t * _handle; + handle_set_t * _p = (handle_set_t*)_h->tail; + + if (_s->len > MAXBUF) { + LM_ERR("query exceeds buffer size(%d)!\n", MAXBUF); + return -1; + } + + if ((_ah=pkg_malloc(sizeof(handle_async_t)+_s->len)) == NULL) { + LM_ERR("no more pkg\n"); + return -1; + } else { + /* automatically jump to next DB destination only for ROUND ROBIN + * else, for failover, will jump only if something goes wrong */ + if (global->set_list[_p->set_index].set_mode == ROUND) + _p->curent_con = (_p->curent_con+1)%_p->size; + + _ah->current_con = _p->curent_con; + _ah->cons_rem = _p->size; + + /* store the query for further calls */ + _ah->query.len = _s->len; + _ah->query.s = (char*)(_ah+1); + memcpy(_ah->query.s, _s->s, _s->len); + + *_data = _ah; + } + + _handle = &_p->con_list[CURRCON(_ah)]; + + db_generic_async_operation(_h, _ah,0, async_raw_query(_handle->con, _s, NULL) ); + return 0; +} + + +int db_virtual_async_raw_resume(db_con_t *_h, int fd, db_res_t **_r, void *_data) +{ + + handle_async_t *_ah; + db_func_t * _f; + handle_con_t * _handle; + handle_set_t * _p = (handle_set_t*)_h->tail; + + if (_data == NULL) { + LM_ERR("Expecting async handle! Nothing received!\n"); + return -1; + } else { + _ah = (handle_async_t*)_data; + } + + _handle = &_p->con_list[CURRCON(_ah)]; + + _f = &global->set_list[_p->set_index].db_list[CURRCON(_ah)].dbf; + + /* call the resume function */ + if (_f->async_raw_resume(_handle->con, fd, _r, NULL) < 0) { + _handle->flags &= NOT_CAN_USE; + /* close connection*/ + _f->close(_handle->con); + + /* we did all we could, but no con worked + * do something to those DBs */ + if ((--_ah->cons_rem) == 0) { + LM_ERR("All databases failed!! No hope for you!\n"); + goto out_err_free; + } + + /* try next DB; no matter RR or FAILOVER */ + CURRCON(_ah) = (CURRCON(_ah) +1)%_p->size; + _handle = &_p->con_list[CURRCON(_ah)]; + + /* try the next database connection */ + db_generic_async_operation(_h, _ah,1, + async_raw_query(_handle->con, &_ah->query, NULL) ); + } + + /* if here means it worked; we set this connection as current connection + * for other messages to come */ + _p->curent_con = CURRCON(_ah); + + async_status = ASYNC_DONE; + + pkg_free(_ah); + return 0; + +out_err_free: + pkg_free(_ah); + return -1; + +} + +#undef CURRCON diff --git a/modules/db_virtual/dbase.h b/modules/db_virtual/dbase.h index eda62abb4c0..25d5835885b 100644 --- a/modules/db_virtual/dbase.h +++ b/modules/db_virtual/dbase.h @@ -119,6 +119,12 @@ typedef struct handle_private { } handle_private_t; +typedef struct handle_async { + int current_con; /* current connection index */ + int cons_rem; /* number of cons to try */ + str query; /* the query for this function call */ +} handle_async_t; + /* * Initialize database connection */ @@ -194,6 +200,15 @@ int db_virtual_last_inserted_id(const db_con_t* _h); int db_virtual_insert_update(const db_con_t* _h, const db_key_t* _k, const db_val_t* _v, const int _n); +/* + * Async raw SQL query + */ +int db_virtual_async_raw_query(db_con_t *_h, const str *_s, void **_data); + +/* + * Async raw SQL query resume function + */ +int db_virtual_async_raw_resume(db_con_t *_h, int fd, db_res_t **_r, void *_data); /* * Store name of table that will be used by diff --git a/modules/db_virtual/doc/db_virtual_admin.xml b/modules/db_virtual/doc/db_virtual_admin.xml index 63973a4a2f8..c095a6593ff 100644 --- a/modules/db_virtual/doc/db_virtual_admin.xml +++ b/modules/db_virtual/doc/db_virtual_admin.xml @@ -1,9 +1,9 @@ - + &adminguide; - +
Overview @@ -14,7 +14,7 @@ This means that a virtual db url translates to many real db urls. This virtual layer also enables us to use the real dbs in multiple ways such as: parallel, failover(hotswap), round-robin. - Therefore: + Therefore: each virtual db url with associated real dbs and a way to use(mode) it's real dbs must be specified. @@ -70,6 +70,8 @@ dbb->replace 1 0 dbb->last_inserted_id 0 0 dbb->insert_update 1 1 + dbb->async_raw_query 0 1 + dbb->async_raw_resume 0 1 @@ -81,6 +83,11 @@ will be done on a single db. Ex: query will only query one db. + + Note 3: Since version 2.2 db_virtual supports async_raw_query and async_raw_resume functions currently + implemented only by the mysql database engine. + +
@@ -254,7 +261,7 @@ modparam("db_virtual", "db_max_consec_retrys", 20) Return information about global state of the real dbs. - Name: + Name: db_get Parameters: diff --git a/modules/sipcapture/sipcapture.c b/modules/sipcapture/sipcapture.c index 298ae06aedd..da538271bcf 100644 --- a/modules/sipcapture/sipcapture.c +++ b/modules/sipcapture/sipcapture.c @@ -127,6 +127,7 @@ struct _sipcapture_object { #define TABLE_LEN 256 #define NR_KEYS 37 +typedef void* sc_async_param_t; db_key_t db_keys[NR_KEYS]; /* module function prototypes */ @@ -1118,6 +1119,8 @@ static int db_async_store(db_val_t* db_vals, int read_fd; str query_str; + sc_async_param_t as_param; + if (!DB_CAPABILITY(db_funcs, DB_CAP_ASYNC_RAW_QUERY)) { LM_WARN("This database module does not have async queries!" "Using sync insert!\n"); @@ -1183,7 +1186,7 @@ static int db_async_store(db_val_t* db_vals, query_str.s = query_buf; query_str.len = query_len; - read_fd = db_funcs.async_raw_query(db_con, &query_str); + read_fd = db_funcs.async_raw_query(db_con, &query_str, &as_param); lock_release(&query_lock); @@ -1192,7 +1195,9 @@ static int db_async_store(db_val_t* db_vals, *resume_f = NULL; return -1; } - *resume_param = (void*)((unsigned long int)read_fd); + + + *resume_param = as_param; *resume_f = resume_async_dbquery; async_status = read_fd; @@ -1214,12 +1219,9 @@ static int db_async_store(db_val_t* db_vals, int resume_async_dbquery(int fd, struct sip_msg *msg, void *_param) { int rc; - unsigned long int param_fd; - - param_fd = (int)((unsigned long int)_param); - rc = db_funcs.async_raw_resume(db_con, (int)param_fd, NULL); - if (async_status == ASYNC_CONTINUE) + rc = db_funcs.async_raw_resume(db_con, fd, NULL, (sc_async_param_t)_param); + if (async_status == ASYNC_CONTINUE || async_status == ASYNC_CHANGE_FD) return rc; if (rc != 0) { diff --git a/modules/tm/async.c b/modules/tm/async.c index 6e02d68458e..239ba293454 100644 --- a/modules/tm/async.c +++ b/modules/tm/async.c @@ -70,7 +70,7 @@ static inline void run_resume_route( int resume_route, struct sip_msg *msg) /* function triggered from reactor in order to continue the processing */ -int t_resume_async(int fd, void *param) +int t_resume_async(int *fd, void *param) { static struct sip_msg faked_req; static struct ua_client uac; @@ -83,7 +83,7 @@ int t_resume_async(int fd, void *param) struct cell *t= ctx->t; int route; - LM_DBG("resuming on fd %d, transaction %p \n",fd, t); + LM_DBG("resuming on fd %d, transaction %p \n",*fd, t); if (current_processing_ctx) { LM_CRIT("BUG - a context already set!\n"); @@ -122,18 +122,47 @@ int t_resume_async(int fd, void *param) async_status = ASYNC_DONE; /* assume default status as done */ /* call the resume function in order to read and handle data */ - return_code = ctx->resume_f( fd, &faked_req, ctx->resume_param ); + return_code = ctx->resume_f( *fd, &faked_req, ctx->resume_param ); if (async_status==ASYNC_CONTINUE) { /* do not run the resume route */ goto restore; + } else if (async_status==ASYNC_CHANGE_FD) { + if (return_code<0) { + LM_ERR("ASYNC_CHANGE_FD: given file descriptor shall be positive!\n"); + goto restore; + } else if (return_code > 0 && return_code == *fd) { + /*trying to add the same fd; shall continue*/ + LM_CRIT("You are trying to replace the old fd with the same fd!" + "Will act as in ASYNC_CONTINUE!\n"); + goto restore; + } + + /* remove the old fd from the reactor */ + reactor_del_reader( *fd, -1, IO_FD_CLOSING); + *fd=return_code; + + /* insert the new fd inside the reactor */ + if (reactor_add_reader( *fd, F_SCRIPT_ASYNC, RCT_PRIO_ASYNC, (void*)ctx)<0 ) { + LM_ERR("failed to add async FD to reactor -> act in sync mode\n"); + do { + return_code = ctx->resume_f( *fd, &faked_req, ctx->resume_param ); + if (async_status == ASYNC_CHANGE_FD) + *fd=return_code; + } while(async_status==ASYNC_CONTINUE||async_status==ASYNC_CHANGE_FD); + goto route; + } + + /* changed fd; now restore old state */ + goto restore; } /* remove from reactor, we are done */ - reactor_del_reader( fd, -1, IO_FD_CLOSING); + reactor_del_reader( *fd, -1, IO_FD_CLOSING); if (async_status == ASYNC_DONE_CLOSE_FD) - close(fd); + close(*fd); +route: /* run the resume_route (some type as the original one) */ swap_route_type(route, ctx->route_type); run_resume_route( ctx->resume_route, &faked_req); @@ -216,6 +245,13 @@ int t_handle_async(struct sip_msg *msg, struct action* a , int resume_route) } else if (async_status==ASYNC_SYNC) { /* IO already done in SYNC'ed way */ goto resume; + } else if (async_status==ASYNC_CHANGE_FD) { + LM_ERR("Incorrect ASYNC_CHANGE_FD status usage!" + "You should use this status only from the" + "resume function in case something went wrong" + "and you have other alternatives!\n"); + /*FIXME should we go to resume or exit?it's quite an invalid scenario */ + goto resume; } else { /* generic error, go for resume route */ goto resume; @@ -274,9 +310,12 @@ int t_handle_async(struct sip_msg *msg, struct action* a , int resume_route) /* run the resume function */ do { return_code = ctx_f( fd, msg, ctx_p ); - } while(async_status==ASYNC_CONTINUE); + if (async_status == ASYNC_CHANGE_FD) + fd = return_code; + } while(async_status==ASYNC_CONTINUE||async_status==ASYNC_CHANGE_FD); /* run the resume route in sync mode */ run_resume_route( resume_route, msg); + /* break original script */ return 0; diff --git a/modules/tm/async.h b/modules/tm/async.h index 32b992c0ad6..6c5fa737976 100644 --- a/modules/tm/async.h +++ b/modules/tm/async.h @@ -34,7 +34,7 @@ */ int t_handle_async(struct sip_msg *msg, struct action* a , int resume_route); -int t_resume_async(int fd, void *param); +int t_resume_async(int *fd, void *param); #endif diff --git a/net/net_tcp_proc.c b/net/net_tcp_proc.c index 90049a99903..b68e9c96fbf 100644 --- a/net/net_tcp_proc.c +++ b/net/net_tcp_proc.c @@ -48,7 +48,7 @@ static void tcpconn_release(struct tcp_connection* c, long state,int writer) LM_DBG(" extra_data %p\n", c->extra_data); /* if we are in a writer context, do not touch the buffer contain read packets per connection - might be in a completely different process + might be in a completely different process even if in our process we shouldn't touch it, since it might currently be in use, when we've read multiple SIP messages in one try*/ if (!writer && c->con_req) { pkg_free(c->con_req); @@ -61,8 +61,8 @@ static void tcpconn_release(struct tcp_connection* c, long state,int writer) /* errno==EINTR, EWOULDBLOCK a.s.o todo */ response[0]=(long)c; response[1]=state; - - if (send_all((state==ASYNC_WRITE)?unix_tcp_sock:tcpmain_sock, response, + + if (send_all((state==ASYNC_WRITE)?unix_tcp_sock:tcpmain_sock, response, sizeof(response))<=0) LM_ERR("send_all failed\n"); } @@ -71,7 +71,7 @@ static void tcpconn_release(struct tcp_connection* c, long state,int writer) /* wrapper around internal tcpconn_release() - to be called by functions which * used tcp_conn_get(), in order to release the connection; * It does the unref and pushes back (if needed) some update to TCP main; - * right now, it used only from the xxx_send() functions + * right now, it used only from the xxx_send() functions */ void tcp_conn_release(struct tcp_connection* c, int pending_data) { @@ -117,7 +117,7 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type) handle_timer_job(); break; case F_SCRIPT_ASYNC: - async_resume_f( fm->fd, fm->data); + async_resume_f( &fm->fd, fm->data); return 0; case F_TCPMAIN: again: @@ -173,7 +173,7 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type) tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev); goto con_error; } - + /* mark that the connection is currently in our process future writes to this con won't have to acquire FD */ con->proc_id = process_no; diff --git a/net/net_udp.c b/net/net_udp.c index 57bc9794c69..3b2e45cf5fc 100644 --- a/net/net_udp.c +++ b/net/net_udp.c @@ -264,7 +264,7 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type) handle_timer_job(); return 0; case F_SCRIPT_ASYNC: - async_resume_f( fm->fd, fm->data); + async_resume_f( &fm->fd, fm->data); return 0; default: LM_CRIT("uknown fd type %d in UDP worker\n", fm->type);