diff --git a/modules/db_mysql/db_mysql.c b/modules/db_mysql/db_mysql.c index 20690fa3e50..e5d924c3b16 100644 --- a/modules/db_mysql/db_mysql.c +++ b/modules/db_mysql/db_mysql.c @@ -117,6 +117,8 @@ int db_mysql_bind_api(const str* mod, db_func_t *dbb) dbb->replace = db_mysql_replace; dbb->last_inserted_id = db_last_inserted_id; dbb->insert_update = db_insert_update; + dbb->async_raw_query = db_mysql_async_raw_query; + dbb->async_raw_resume = db_mysql_async_raw_resume; dbb->cap |= DB_CAP_MULTIPLE_INSERT; return 0; diff --git a/modules/db_mysql/dbase.c b/modules/db_mysql/dbase.c index e011564a0a3..c3be4814aba 100644 --- a/modules/db_mysql/dbase.c +++ b/modules/db_mysql/dbase.c @@ -41,6 +41,7 @@ #include "../../mem/mem.h" #include "../../dprint.h" #include "../../db/db_query.h" +#include "../../db/db_async.h" #include "../../db/db_ut.h" #include "../../db/db_insertq.h" #include "val.h" @@ -192,6 +193,32 @@ static inline int wrapper_single_mysql_real_query(const db_con_t *conn, } } +static inline int wrapper_single_mysql_send_query(const db_con_t *conn, + const str *query) +{ + int code, error; + if (CON_DISCON(conn)) + return -1; + + code = mysql_send_query(CON_CONNECTION(conn), query->s, query->len); + if (code == 0) + return 0; + + error = mysql_errno(CON_CONNECTION(conn)); + switch (error) { + case CR_SERVER_GONE_ERROR: + case CR_SERVER_LOST: + case CR_COMMANDS_OUT_OF_SYNC: + return -1; /* reconnection error -> <0 */ + default: + LM_CRIT("driver error (%i): %s\n", error, + mysql_error(CON_CONNECTION(conn))); + /* do not rely on libmysqlclient implementation + * specification says non-zero code on error, not positive code */ + return 1; + } +} + static inline int connect_with_retry(const db_con_t *conn, const int max_tries) { @@ -1074,6 +1101,126 @@ int db_mysql_raw_query(const db_con_t* _h, const str* _s, db_res_t** _r) db_mysql_store_result); } +static inline int db_mysql_get_con_fd(void *con) +{ + return ((struct my_con *)con)->con->net.fd; +} + +/** + * Begins execution of a raw SQL query. Returns immediately. + * + * \param _h handle for the database + * \param _s raw query string + * \return + * success: Unix FD for polling + * failure: negative error code + */ +int db_mysql_async_raw_query(db_con_t *_h, const str *_s) +{ + int *fd_ref; + int code, i; + struct timeval start; + struct my_con *con; + + if (!_h || !_s || !_s->s) { + LM_ERR("invalid parameter value\n"); + return -1; + } + + con = (struct my_con *)db_switch_to_async(_h, db_mysql_get_con_fd, &fd_ref, + (void *)db_mysql_new_connection); + if (!con) + LM_INFO("Failed to open new connection (current: 1 + %d). Running " + "in sync mode!\n", ((struct pool_con *)_h->tail)->no_transfers); + + /* no prepared statements support */ + CON_RESET_CURR_PS(_h); + + for (i = 0; i < 2; i++) { + start_expire_timer(start, db_mysql_exec_query_threshold); + + /* async mode */ + if (con) { + code = wrapper_single_mysql_send_query(_h, _s); + /* sync mode */ + } else { + code = wrapper_single_mysql_real_query(_h, _s); + } + stop_expire_timer(start, db_mysql_exec_query_threshold, + "mysql async query", _s->s, _s->len, 0); + if (code < 0) { + /* got disconnected during call */ + switch_state_to_disconnected(_h); + if (connect_with_retry(_h, 3) != 0) { + /* mysql reconnection problem */ + LM_ERR("failed to reconnect before trying " + "mysql_stmt_prepare()\n"); + break; + } + /* if reconnected, run the loop again */ + } else if (code > 0) { + /* other problems - error already logged by the wrapper */ + goto out; + } else { + /* success */ + mysql_raise_event(_h); + + if (!con) + return -1; + + i = db_mysql_get_con_fd(con); + *fd_ref = i; + db_switch_to_sync(_h); + async_status = i; + return 1; + } + } + + mysql_raise_event(_h); + LM_CRIT("too many mysql server reconnection failures\n"); + +out: + if (!con) + return -1; + + db_switch_to_sync(_h); + db_store_async_con(_h, (struct pool_con *)con); + + return -2; +} + +enum async_ret_code db_mysql_async_raw_resume(db_con_t *_h, int fd, db_res_t **_r) +{ + struct pool_con *con; + + con = db_match_async_con(fd, _h); + if (!con) { + LM_BUG("no conn match for fd %d", fd); + abort(); + } + + if (mysql_read_query_result(CON_CONNECTION(_h)) == 0) { + if (_r) { + if (db_mysql_store_result(_h, _r) != 0) { + LM_ERR("failed to store result\n"); + return -1; + } + } + + mysql_free_result(CON_RESULT(_h)); + CON_RESULT(_h) = NULL; + + db_switch_to_sync(_h); + db_store_async_con(_h, con); + return 1; + } + + db_switch_to_sync(_h); + db_store_async_con(_h, con); + async_status = ASYNC_CONTINUE; + return 1; +} + /** * Insert a row into a specified table. diff --git a/modules/db_mysql/dbase.h b/modules/db_mysql/dbase.h index fe627e0dc5c..6b494084ebe 100644 --- a/modules/db_mysql/dbase.h +++ b/modules/db_mysql/dbase.h @@ -105,6 +105,26 @@ int db_mysql_replace(const db_con_t* handle, const db_key_t* keys, const db_val_ */ int db_last_inserted_id(const db_con_t* _h); +/* + * Begins execution of a raw MySQL query. Possibly opens new TCP connections up + * to "db_max_async_connections". Returns immediately. + * + * \return + * success: Unix FD for polling + * failure: negative error code + */ +int db_mysql_async_raw_query(db_con_t *_h, const str *_s); + +/* + * Reads data from the given fd's connection. + * + * !!! IMPORTANT: + * if data is fully read (async_status == ASYNC_DONE), + * backend-specific results have already been freed! + * You only need to call db_free_result(_r) when done + */ +enum async_ret_code db_mysql_async_raw_resume(db_con_t *_h, int fd, db_res_t **_r); + /* * Insert a row into table, update on duplicate key */