Skip to content

Commit

Permalink
DB core: add support for asynchronous raw SQL queries
Browse files Browse the repository at this point in the history
The DB core now allows SQL modules to register async raw query
functions. For normal, blocking queries, execution flow remains exactly
the same.

Documentation available in db/db_async.h
  • Loading branch information
liviuchircu committed Jan 22, 2015
1 parent 5081dde commit 92755be
Show file tree
Hide file tree
Showing 13 changed files with 320 additions and 28 deletions.
15 changes: 12 additions & 3 deletions db/db.c
Expand Up @@ -55,6 +55,7 @@

char *db_version_table = VERSION_TABLE;
char *db_default_url = NULL;
int db_max_async_connections = 10;

/** maximal length of a SQL URL */
static unsigned int MAX_URL_LENGTH = 255;
Expand Down Expand Up @@ -255,7 +256,7 @@ int db_bind_mod(const str* mod, db_func_t* mydbf)
db_con_t* db_do_init(const str* url, void* (*new_connection)())
{
struct db_id* id;
void* con;
struct pool_con* con;
db_con_t* res;

int con_size = sizeof(db_con_t) + sizeof(void *) + url->len;
Expand Down Expand Up @@ -297,17 +298,25 @@ db_con_t* db_do_init(const str* url, void* (*new_connection)())
if (!con) {
LM_DBG("connection %p not found in pool\n", id);
/* Not in the pool yet */
con = new_connection(id);
con = (struct pool_con *)new_connection(id);
if (!con) {
LM_ERR("could not add connection to the pool\n");
goto err;
}
pool_insert((struct pool_con*)con);
pool_insert(con);
LM_DBG("connection %p inserted in pool as %p\n", id,con);
} else {
LM_DBG("connection %p found in pool as %p\n", id,con);
}

if (!con->transfers) {
con->transfers = pkg_malloc(db_max_async_connections);
if (!con->transfers) {
LM_ERR("no more pkg\n");
goto err;
}
}

res->tail = (unsigned long)con;
return res;

Expand Down
39 changes: 36 additions & 3 deletions db/db.h
Expand Up @@ -274,6 +274,38 @@ typedef int (*db_last_inserted_id_f) (const db_con_t* _h);
typedef int (*db_insert_update_f) (const db_con_t* _h, const db_key_t* _k,
const db_val_t* _v, const int _n);

/**
* \brief Asynchronous raw SQL query on a separate TCP connection.
* Returns immediately.
*
* If all currently open connections are in use, it will attempt to open a new
* one, up to "db_max_async_connections". If maximum is reached, the query is
* done synchronously!
*
* \param _h structure representing the database handle
* \param _s the SQL query
* \return returns 1 if everything is OK, otherwise returns value < 0
*/
typedef int (*db_async_raw_query_f) (db_con_t *_h, const str *_q);

/*
* \brief Reads data from the given fd's SQL connection. Also fetches all data
* when it resumes fetching data for the last time.
*
* \param _h structure representing the database handle
* \param fd read file descriptor obtained in starting phase
* \param _r structure for the result
* \return:
* -> 1 on success, negative on failure
* -> also populates the global "async_status": ASYNC_CONTINUE / ASYNC_DONE
*
* !!! IMPORTANT:
* if data is fully read (async_status == ASYNC_DONE),
* backend-specific results have already been freed!
* You only need to call db_free_result(_r) when done
*/
typedef enum async_ret_code (*db_async_raw_resume_f) (db_con_t *_h,
int fd, db_res_t **_r);

/**
* \brief Database module callbacks
Expand All @@ -295,9 +327,10 @@ typedef struct db_func {
db_delete_f delete; /* Delete from table */
db_update_f update; /* Update table */
db_replace_f replace; /* Replace row in a table */
db_last_inserted_id_f last_inserted_id; /* Retrieve the last inserted ID
in a table */
db_insert_update_f insert_update; /* Insert into table, update on duplicate key */
db_last_inserted_id_f last_inserted_id; /* Retrieve the last inserted ID in a table */
db_insert_update_f insert_update; /* Insert into table, update on duplicate key */
db_async_raw_query_f async_raw_query; /* Starts an asynchronous raw query */
db_async_raw_resume_f async_raw_resume; /* Called if there is some data to be read */
} db_func_t;


Expand Down
137 changes: 137 additions & 0 deletions db/db_async.c
@@ -0,0 +1,137 @@
/*
* MySQL async connection array management
*
* Copyright (C) 2015 OpenSIPS Solutions
*
* This file is part of opensips, a free SIP server.
*
* opensips is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* opensips is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* History:
* --------
* 2015-01-XX initial version (liviu)
*/

#include <stdlib.h>

#include "db_async.h"
#include "db_pool.h"
#include "../dprint.h"
#include "../error.h"

/*
* aux variable which holds the default connection (used in blocking mode)
* while async database operations are done (separate connections & queries)
*/
static struct pool_con *sync_con;

struct pool_con *db_switch_to_async(db_con_t *_h, get_con_fd_f get_fd, int **fd_ref,
void *(*new_connection)(const struct db_id *))
{
struct pool_con *con = (struct pool_con *)_h->tail;
void *new;

if (con->no_transfers == db_max_async_connections)
return NULL;

/* no idle connections for async queries? open a new one! */
if (!con->async_pool) {
new = new_connection(con->id);
if (!new) {
LM_ERR("failed to open new DB connection on "
"%s://XXXX:XXXX@%s:%d/%s\n", con->id->scheme,
con->id->host, con->id->port, con->id->database);
return NULL;
}
} else {
new = con->async_pool;
con->async_pool = con->async_pool->next;
}

*fd_ref = &con->transfers[con->no_transfers].fd;

con->transfers[con->no_transfers].fd = get_fd(new);
con->transfers[con->no_transfers].con = new;

LM_INFO(" %d/%d transfers: (%d - %p)\n", con->no_transfers + 1,
db_max_async_connections, con->transfers[con->no_transfers].fd,
con->transfers[con->no_transfers].con);

con->no_transfers++;


/* switch to async con */
sync_con = con;
_h->tail = (unsigned long)new;

return new;
}

void db_switch_to_sync(db_con_t *_h)
{
if (!sync_con) {
LM_BUG("sync_con == NULL");
abort();
}

/* switch to sync con */
_h->tail = (unsigned long)sync_con;
}

void db_store_async_con(db_con_t *_h, struct pool_con *con)
{
int i;
struct pool_con *tail = (struct pool_con *)_h->tail;

con->next = tail->async_pool;
tail->async_pool = con;

LM_INFO("XXXXXXXXXXXXXXXXXX RESTORE CONNNN: %p <<<<<<<<<<<<<<<\n", con);

for (i = 0; i < tail->no_transfers; i++) {
if (tail->transfers[i].con == con) {
tail->no_transfers--;
for (; i < tail->no_transfers; i++)
tail->transfers[i] = tail->transfers[i + 1];

return;
}
}

LM_BUG("DB con %p not found", con);
abort();
}

struct pool_con *db_match_async_con(int fd, db_con_t *_h)
{
int i, max;
struct db_transfer *transfers;

LM_INFO("XXX MATCH fd %d\n", fd);

transfers = ((struct pool_con *)_h->tail)->transfers;
max = ((struct pool_con *)_h->tail)->no_transfers;

for (i = 0; i < max; i++)
if (fd == transfers[i].fd) {
/* switch to async con */
sync_con = (struct pool_con *)_h->tail;
_h->tail = (unsigned long)transfers[i].con;

return (struct pool_con *)_h->tail;
}

return NULL;
}
84 changes: 84 additions & 0 deletions db/db_async.h
@@ -0,0 +1,84 @@
/*
* MySQL async connection array management
*
* Copyright (C) 2015 OpenSIPS Solutions
*
* This file is part of opensips, a free SIP server.
*
* opensips is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* opensips is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* History:
* --------
* 2015-01-XX initial version (liviu)
*/

/**
* Basic primitives which enable async DB support on top of existing code
* while keeping the existing logic working exactly as before
*/

#ifndef DB_ASYNC_H
#define DB_ASYNC_H

#include "db_con.h"

typedef int (*get_con_fd_f) (void *con);

/**
* Sets up the DB handle for an asynchronous query. A new connections is
* opened if necessary, otherwise one is grabbed from the idle pool.
*
* params:
* _h - DB handle
* get_fd - function which returns a read file descriptor (used for polling)
* from the backend's connection structure
* fd_ref - if the connection fetched from the pool disconnected
* (requiring a reconnect operation), or if you happen to know its fd
* has changed, make sure you also update the reference passed on here
* new_connection - backend-specific function to allocate and set up a new con
*/
struct pool_con *db_switch_to_async(db_con_t *_h, get_con_fd_f get_fd, int **fd_ref,
void *(*new_connection)(const struct db_id *));

/**
* Restores the DB handle in its normal state (i.e. ready for blocking queries)
*
* MUST be called after initiating async operations and/or if:
* * a previous db_switch_to_async() was done
* * a previous db_match_async_con() was done
*/
void db_switch_to_sync(db_con_t *_h);

/**
* Places the given connection back into the async idle pool.
*
* MUST be called after db_switch_to_sync().
*
* MUST be called if:
* * errors occurred while starting up a new async transfer
* * a transfer is fully completed.
*/
void db_store_async_con(db_con_t *_h, struct pool_con *con);

/**
* Matches the given fd to one of the ongoing async DB transfers.
* Returns the DB connection of the given fd.
*
* Note: the DB handle is switched to async mode.
* Make sure you switch back to sync mode when done.
*/
struct pool_con *db_match_async_con(int fd, db_con_t *_h);

#endif /* DB_ASYNC_H */
10 changes: 6 additions & 4 deletions db/db_con.h
Expand Up @@ -31,17 +31,20 @@

#include "../str.h"
#include "db_ps.h"
#include "db_id.h"

typedef int (*get_con_fd_f) (void *con);

/**
* This structure represents a database connection, pointer to this structure
* are used as a connection handle from modules uses the db API.
*/
typedef struct {
const str* table; /**< Default table that should be used */
const str* table; /**< Default table that should be used */
db_ps_t* curr_ps; /**< Prepared statement to be used for next query */
struct query_list *ins_list; /**< Insert list to be used for the next insert */
unsigned long tail; /**< Hook to implementation-specific database state */
str url; /**< what is the URL that this connection is bound on */
unsigned long tail; /**< Hook to implementation-specific database state */
str url; /**< URL that this connection is bound on */
int flags;
} db_con_t;

Expand All @@ -63,5 +66,4 @@ typedef struct {
*((int *)&(con)->flags) &= ~CON_OR_OPERATOR; \
} while (0)


#endif /* DB_CON_H */
11 changes: 10 additions & 1 deletion db/db_pool.h
Expand Up @@ -32,6 +32,12 @@
#include "db_id.h"
#include "db_con.h"

extern int db_max_async_connections;

struct db_transfer {
int fd;
struct pool_con *con;
};

/**
* This is a stub that contains all attributes
Expand All @@ -44,7 +50,10 @@
struct pool_con {
struct db_id* id; /**< Connection identifier */
unsigned int ref; /**< Reference count */
struct pool_con* next; /**< Next element in the pool */
struct pool_con *async_pool; /**< Subpool of identical database handles */
int no_transfers; /**< Number of async queries to this backend */
struct db_transfer *transfers; /**< Array of ongoing async operations */
struct pool_con *next; /**< Next element in the pool (different db_id) */
};


Expand Down
2 changes: 1 addition & 1 deletion io_wait.h
Expand Up @@ -326,7 +326,7 @@ inline static int io_watch_add( io_wait_h* h,

if (e->flags & flags){
if (e->data != data) {
LM_ERR("[%s] BUG trying to overwrite entry %d"
LM_BUG("[%s] BUG trying to overwrite entry %d"
" in the hash(%d, %d, %p,%d) with (%d, %d, %p,%d)\n",
h->name,fd, e->fd, e->type, e->data,e->flags, fd, type, data,flags);
goto error;
Expand Down

0 comments on commit 92755be

Please sign in to comment.