Skip to content

Commit

Permalink
db_mysql: implement async_raw_query and async_raw_resume
Browse files Browse the repository at this point in the history
The db_mysql module is now fully capable of running asynchronous raw
queries.
  • Loading branch information
liviuchircu committed Jan 23, 2015
1 parent d9411e9 commit 6c4f266
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 0 deletions.
2 changes: 2 additions & 0 deletions modules/db_mysql/db_mysql.c
Expand Up @@ -117,6 +117,8 @@ int db_mysql_bind_api(const str* mod, db_func_t *dbb)
dbb->replace = db_mysql_replace;
dbb->last_inserted_id = db_last_inserted_id;
dbb->insert_update = db_insert_update;
dbb->async_raw_query = db_mysql_async_raw_query;
dbb->async_raw_resume = db_mysql_async_raw_resume;

dbb->cap |= DB_CAP_MULTIPLE_INSERT;
return 0;
Expand Down
147 changes: 147 additions & 0 deletions modules/db_mysql/dbase.c
Expand Up @@ -41,6 +41,7 @@
#include "../../mem/mem.h"
#include "../../dprint.h"
#include "../../db/db_query.h"
#include "../../db/db_async.h"
#include "../../db/db_ut.h"
#include "../../db/db_insertq.h"
#include "val.h"
Expand Down Expand Up @@ -192,6 +193,32 @@ static inline int wrapper_single_mysql_real_query(const db_con_t *conn,
}
}

static inline int wrapper_single_mysql_send_query(const db_con_t *conn,
const str *query)
{
int code, error;
if (CON_DISCON(conn))
return -1;

code = mysql_send_query(CON_CONNECTION(conn), query->s, query->len);
if (code == 0)
return 0;

error = mysql_errno(CON_CONNECTION(conn));
switch (error) {
case CR_SERVER_GONE_ERROR:
case CR_SERVER_LOST:
case CR_COMMANDS_OUT_OF_SYNC:
return -1; /* reconnection error -> <0 */
default:
LM_CRIT("driver error (%i): %s\n", error,
mysql_error(CON_CONNECTION(conn)));
/* do not rely on libmysqlclient implementation
* specification says non-zero code on error, not positive code */
return 1;
}
}


static inline int connect_with_retry(const db_con_t *conn, const int max_tries)
{
Expand Down Expand Up @@ -1074,6 +1101,126 @@ int db_mysql_raw_query(const db_con_t* _h, const str* _s, db_res_t** _r)
db_mysql_store_result);
}

static inline int db_mysql_get_con_fd(void *con)
{
return ((struct my_con *)con)->con->net.fd;
}

/**
* Begins execution of a raw SQL query. Returns immediately.
*
* \param _h handle for the database
* \param _s raw query string
* \return
* success: Unix FD for polling
* failure: negative error code
*/
int db_mysql_async_raw_query(db_con_t *_h, const str *_s)
{
int *fd_ref;
int code, i;
struct timeval start;
struct my_con *con;

if (!_h || !_s || !_s->s) {
LM_ERR("invalid parameter value\n");
return -1;
}

con = (struct my_con *)db_switch_to_async(_h, db_mysql_get_con_fd, &fd_ref,
(void *)db_mysql_new_connection);
if (!con)
LM_INFO("Failed to open new connection (current: 1 + %d). Running "
"in sync mode!\n", ((struct pool_con *)_h->tail)->no_transfers);

/* no prepared statements support */
CON_RESET_CURR_PS(_h);

for (i = 0; i < 2; i++) {
start_expire_timer(start, db_mysql_exec_query_threshold);

/* async mode */
if (con) {
code = wrapper_single_mysql_send_query(_h, _s);
/* sync mode */
} else {
code = wrapper_single_mysql_real_query(_h, _s);
}
stop_expire_timer(start, db_mysql_exec_query_threshold,
"mysql async query", _s->s, _s->len, 0);
if (code < 0) {
/* got disconnected during call */
switch_state_to_disconnected(_h);
if (connect_with_retry(_h, 3) != 0) {
/* mysql reconnection problem */
LM_ERR("failed to reconnect before trying "
"mysql_stmt_prepare()\n");
break;
}
/* if reconnected, run the loop again */
} else if (code > 0) {
/* other problems - error already logged by the wrapper */
goto out;
} else {
/* success */
mysql_raise_event(_h);

if (!con)
return -1;

i = db_mysql_get_con_fd(con);
*fd_ref = i;
db_switch_to_sync(_h);
async_status = i;
return 1;
}
}

mysql_raise_event(_h);
LM_CRIT("too many mysql server reconnection failures\n");

out:
if (!con)
return -1;

db_switch_to_sync(_h);
db_store_async_con(_h, (struct pool_con *)con);

return -2;
}

enum async_ret_code db_mysql_async_raw_resume(db_con_t *_h, int fd, db_res_t **_r)
{
struct pool_con *con;

con = db_match_async_con(fd, _h);
if (!con) {
LM_BUG("no conn match for fd %d", fd);
abort();
}

if (mysql_read_query_result(CON_CONNECTION(_h)) == 0) {
if (_r) {
if (db_mysql_store_result(_h, _r) != 0) {
LM_ERR("failed to store result\n");
return -1;
}
}

mysql_free_result(CON_RESULT(_h));
CON_RESULT(_h) = NULL;

db_switch_to_sync(_h);
db_store_async_con(_h, con);
return 1;
}

db_switch_to_sync(_h);
db_store_async_con(_h, con);
async_status = ASYNC_CONTINUE;
return 1;
}


/**
* Insert a row into a specified table.
Expand Down
20 changes: 20 additions & 0 deletions modules/db_mysql/dbase.h
Expand Up @@ -105,6 +105,26 @@ int db_mysql_replace(const db_con_t* handle, const db_key_t* keys, const db_val_
*/
int db_last_inserted_id(const db_con_t* _h);

/*
* Begins execution of a raw MySQL query. Possibly opens new TCP connections up
* to "db_max_async_connections". Returns immediately.
*
* \return
* success: Unix FD for polling
* failure: negative error code
*/
int db_mysql_async_raw_query(db_con_t *_h, const str *_s);

/*
* Reads data from the given fd's connection.
*
* !!! IMPORTANT:
* if data is fully read (async_status == ASYNC_DONE),
* backend-specific results have already been freed!
* You only need to call db_free_result(_r) when done
*/
enum async_ret_code db_mysql_async_raw_resume(db_con_t *_h, int fd, db_res_t **_r);

/*
* Insert a row into table, update on duplicate key
*/
Expand Down

0 comments on commit 6c4f266

Please sign in to comment.