diff --git a/db/db.c b/db/db.c index 64068dd7989..34f650dff88 100644 --- a/db/db.c +++ b/db/db.c @@ -55,6 +55,7 @@ char *db_version_table = VERSION_TABLE; char *db_default_url = NULL; +int db_max_async_connections = 10; /** maximal length of a SQL URL */ static unsigned int MAX_URL_LENGTH = 255; @@ -255,7 +256,7 @@ int db_bind_mod(const str* mod, db_func_t* mydbf) db_con_t* db_do_init(const str* url, void* (*new_connection)()) { struct db_id* id; - void* con; + struct pool_con* con; db_con_t* res; int con_size = sizeof(db_con_t) + sizeof(void *) + url->len; @@ -297,17 +298,25 @@ db_con_t* db_do_init(const str* url, void* (*new_connection)()) if (!con) { LM_DBG("connection %p not found in pool\n", id); /* Not in the pool yet */ - con = new_connection(id); + con = (struct pool_con *)new_connection(id); if (!con) { LM_ERR("could not add connection to the pool\n"); goto err; } - pool_insert((struct pool_con*)con); + pool_insert(con); LM_DBG("connection %p inserted in pool as %p\n", id,con); } else { LM_DBG("connection %p found in pool as %p\n", id,con); } + if (!con->transfers) { + con->transfers = pkg_malloc(db_max_async_connections); + if (!con->transfers) { + LM_ERR("no more pkg\n"); + goto err; + } + } + res->tail = (unsigned long)con; return res; diff --git a/db/db.h b/db/db.h index a893bdd5514..c91bdb2836d 100644 --- a/db/db.h +++ b/db/db.h @@ -274,6 +274,38 @@ typedef int (*db_last_inserted_id_f) (const db_con_t* _h); typedef int (*db_insert_update_f) (const db_con_t* _h, const db_key_t* _k, const db_val_t* _v, const int _n); +/** + * \brief Asynchronous raw SQL query on a separate TCP connection. + * Returns immediately. + * + * If all currently open connections are in use, it will attempt to open a new + * one, up to "db_max_async_connections". If maximum is reached, the query is + * done synchronously! + * + * \param _h structure representing the database handle + * \param _s the SQL query + * \return returns 1 if everything is OK, otherwise returns value < 0 + */ +typedef int (*db_async_raw_query_f) (db_con_t *_h, const str *_q); + +/* + * \brief Reads data from the given fd's SQL connection. Also fetches all data + * when it resumes fetching data for the last time. + * + * \param _h structure representing the database handle + * \param fd read file descriptor obtained in starting phase + * \param _r structure for the result + * \return: + * -> 1 on success, negative on failure + * -> also populates the global "async_status": ASYNC_CONTINUE / ASYNC_DONE + * + * !!! IMPORTANT: + * if data is fully read (async_status == ASYNC_DONE), + * backend-specific results have already been freed! + * You only need to call db_free_result(_r) when done + */ +typedef enum async_ret_code (*db_async_raw_resume_f) (db_con_t *_h, + int fd, db_res_t **_r); /** * \brief Database module callbacks @@ -295,9 +327,10 @@ typedef struct db_func { db_delete_f delete; /* Delete from table */ db_update_f update; /* Update table */ db_replace_f replace; /* Replace row in a table */ - db_last_inserted_id_f last_inserted_id; /* Retrieve the last inserted ID - in a table */ - db_insert_update_f insert_update; /* Insert into table, update on duplicate key */ + db_last_inserted_id_f last_inserted_id; /* Retrieve the last inserted ID in a table */ + db_insert_update_f insert_update; /* Insert into table, update on duplicate key */ + db_async_raw_query_f async_raw_query; /* Starts an asynchronous raw query */ + db_async_raw_resume_f async_raw_resume; /* Called if there is some data to be read */ } db_func_t; diff --git a/db/db_async.c b/db/db_async.c new file mode 100644 index 00000000000..6a890b130dd --- /dev/null +++ b/db/db_async.c @@ -0,0 +1,137 @@ +/* + * MySQL async connection array management + * + * Copyright (C) 2015 OpenSIPS Solutions + * + * This file is part of opensips, a free SIP server. + * + * opensips is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * opensips is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * History: + * -------- + * 2015-01-XX initial version (liviu) + */ + +#include + +#include "db_async.h" +#include "db_pool.h" +#include "../dprint.h" +#include "../error.h" + +/* + * aux variable which holds the default connection (used in blocking mode) + * while async database operations are done (separate connections & queries) + */ +static struct pool_con *sync_con; + +struct pool_con *db_switch_to_async(db_con_t *_h, get_con_fd_f get_fd, int **fd_ref, + void *(*new_connection)(const struct db_id *)) +{ + struct pool_con *con = (struct pool_con *)_h->tail; + void *new; + + if (con->no_transfers == db_max_async_connections) + return NULL; + + /* no idle connections for async queries? open a new one! */ + if (!con->async_pool) { + new = new_connection(con->id); + if (!new) { + LM_ERR("failed to open new DB connection on " + "%s://XXXX:XXXX@%s:%d/%s\n", con->id->scheme, + con->id->host, con->id->port, con->id->database); + return NULL; + } + } else { + new = con->async_pool; + con->async_pool = con->async_pool->next; + } + + *fd_ref = &con->transfers[con->no_transfers].fd; + + con->transfers[con->no_transfers].fd = get_fd(new); + con->transfers[con->no_transfers].con = new; + + LM_INFO(" %d/%d transfers: (%d - %p)\n", con->no_transfers + 1, + db_max_async_connections, con->transfers[con->no_transfers].fd, + con->transfers[con->no_transfers].con); + + con->no_transfers++; + + + /* switch to async con */ + sync_con = con; + _h->tail = (unsigned long)new; + + return new; +} + +void db_switch_to_sync(db_con_t *_h) +{ + if (!sync_con) { + LM_BUG("sync_con == NULL"); + abort(); + } + + /* switch to sync con */ + _h->tail = (unsigned long)sync_con; +} + +void db_store_async_con(db_con_t *_h, struct pool_con *con) +{ + int i; + struct pool_con *tail = (struct pool_con *)_h->tail; + + con->next = tail->async_pool; + tail->async_pool = con; + + LM_INFO("XXXXXXXXXXXXXXXXXX RESTORE CONNNN: %p <<<<<<<<<<<<<<<\n", con); + + for (i = 0; i < tail->no_transfers; i++) { + if (tail->transfers[i].con == con) { + tail->no_transfers--; + for (; i < tail->no_transfers; i++) + tail->transfers[i] = tail->transfers[i + 1]; + + return; + } + } + + LM_BUG("DB con %p not found", con); + abort(); +} + +struct pool_con *db_match_async_con(int fd, db_con_t *_h) +{ + int i, max; + struct db_transfer *transfers; + + LM_INFO("XXX MATCH fd %d\n", fd); + + transfers = ((struct pool_con *)_h->tail)->transfers; + max = ((struct pool_con *)_h->tail)->no_transfers; + + for (i = 0; i < max; i++) + if (fd == transfers[i].fd) { + /* switch to async con */ + sync_con = (struct pool_con *)_h->tail; + _h->tail = (unsigned long)transfers[i].con; + + return (struct pool_con *)_h->tail; + } + + return NULL; +} diff --git a/db/db_async.h b/db/db_async.h new file mode 100644 index 00000000000..c0d073bd71a --- /dev/null +++ b/db/db_async.h @@ -0,0 +1,84 @@ +/* + * MySQL async connection array management + * + * Copyright (C) 2015 OpenSIPS Solutions + * + * This file is part of opensips, a free SIP server. + * + * opensips is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * opensips is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * History: + * -------- + * 2015-01-XX initial version (liviu) + */ + +/** + * Basic primitives which enable async DB support on top of existing code + * while keeping the existing logic working exactly as before + */ + +#ifndef DB_ASYNC_H +#define DB_ASYNC_H + +#include "db_con.h" + +typedef int (*get_con_fd_f) (void *con); + +/** + * Sets up the DB handle for an asynchronous query. A new connections is + * opened if necessary, otherwise one is grabbed from the idle pool. + * + * params: + * _h - DB handle + * get_fd - function which returns a read file descriptor (used for polling) + * from the backend's connection structure + * fd_ref - if the connection fetched from the pool disconnected + * (requiring a reconnect operation), or if you happen to know its fd + * has changed, make sure you also update the reference passed on here + * new_connection - backend-specific function to allocate and set up a new con + */ +struct pool_con *db_switch_to_async(db_con_t *_h, get_con_fd_f get_fd, int **fd_ref, + void *(*new_connection)(const struct db_id *)); + +/** + * Restores the DB handle in its normal state (i.e. ready for blocking queries) + * + * MUST be called after initiating async operations and/or if: + * * a previous db_switch_to_async() was done + * * a previous db_match_async_con() was done + */ +void db_switch_to_sync(db_con_t *_h); + +/** + * Places the given connection back into the async idle pool. + * + * MUST be called after db_switch_to_sync(). + * + * MUST be called if: + * * errors occurred while starting up a new async transfer + * * a transfer is fully completed. + */ +void db_store_async_con(db_con_t *_h, struct pool_con *con); + +/** + * Matches the given fd to one of the ongoing async DB transfers. + * Returns the DB connection of the given fd. + * + * Note: the DB handle is switched to async mode. + * Make sure you switch back to sync mode when done. + */ +struct pool_con *db_match_async_con(int fd, db_con_t *_h); + +#endif /* DB_ASYNC_H */ diff --git a/db/db_con.h b/db/db_con.h index bd22b84b9b2..cca06a3028f 100644 --- a/db/db_con.h +++ b/db/db_con.h @@ -31,17 +31,20 @@ #include "../str.h" #include "db_ps.h" +#include "db_id.h" + +typedef int (*get_con_fd_f) (void *con); /** * This structure represents a database connection, pointer to this structure * are used as a connection handle from modules uses the db API. */ typedef struct { - const str* table; /**< Default table that should be used */ + const str* table; /**< Default table that should be used */ db_ps_t* curr_ps; /**< Prepared statement to be used for next query */ struct query_list *ins_list; /**< Insert list to be used for the next insert */ - unsigned long tail; /**< Hook to implementation-specific database state */ - str url; /**< what is the URL that this connection is bound on */ + unsigned long tail; /**< Hook to implementation-specific database state */ + str url; /**< URL that this connection is bound on */ int flags; } db_con_t; @@ -63,5 +66,4 @@ typedef struct { *((int *)&(con)->flags) &= ~CON_OR_OPERATOR; \ } while (0) - #endif /* DB_CON_H */ diff --git a/db/db_pool.h b/db/db_pool.h index d7f96db3569..cc685bbea26 100644 --- a/db/db_pool.h +++ b/db/db_pool.h @@ -32,6 +32,12 @@ #include "db_id.h" #include "db_con.h" +extern int db_max_async_connections; + +struct db_transfer { + int fd; + struct pool_con *con; +}; /** * This is a stub that contains all attributes @@ -44,7 +50,10 @@ struct pool_con { struct db_id* id; /**< Connection identifier */ unsigned int ref; /**< Reference count */ - struct pool_con* next; /**< Next element in the pool */ + struct pool_con *async_pool; /**< Subpool of identical database handles */ + int no_transfers; /**< Number of async queries to this backend */ + struct db_transfer *transfers; /**< Array of ongoing async operations */ + struct pool_con *next; /**< Next element in the pool (different db_id) */ }; diff --git a/io_wait.h b/io_wait.h index 4eabf9b4dd5..674e562c69e 100644 --- a/io_wait.h +++ b/io_wait.h @@ -326,7 +326,7 @@ inline static int io_watch_add( io_wait_h* h, if (e->flags & flags){ if (e->data != data) { - LM_ERR("[%s] BUG trying to overwrite entry %d" + LM_BUG("[%s] BUG trying to overwrite entry %d" " in the hash(%d, %d, %p,%d) with (%d, %d, %p,%d)\n", h->name,fd, e->fd, e->type, e->data,e->flags, fd, type, data,flags); goto error; diff --git a/modules/db_cachedb/dbase.h b/modules/db_cachedb/dbase.h index 929d2542c18..825f03cc26f 100644 --- a/modules/db_cachedb/dbase.h +++ b/modules/db_cachedb/dbase.h @@ -32,9 +32,12 @@ #include "../../str.h" struct db_cachedb_con { - struct db_id* id; /* Connection identifier */ - unsigned int ref; /* Reference count */ - struct pool_con* next; /* Next connection in the pool */ + struct db_id* id; /**< Connection identifier */ + unsigned int ref; /**< Reference count */ + struct pool_con *async_pool; /**< Subpool of identical database handles */ + int no_transfers; /**< Number of async queries to this backend */ + struct db_transfer *transfers; /**< Array of ongoing async operations */ + struct pool_con *next; /**< Next element in the pool (different db_id) */ cachedb_funcs cdbf; /* pointers to the NoSQL specific functions */ cachedb_con *cdbc; /* connection to actual NoSQL back-end */ diff --git a/modules/db_mysql/my_con.h b/modules/db_mysql/my_con.h index 0268f6428ce..164a33806e0 100644 --- a/modules/db_mysql/my_con.h +++ b/modules/db_mysql/my_con.h @@ -70,9 +70,12 @@ struct prep_stmt { struct my_con { - struct db_id* id; /* Connection identifier */ - unsigned int ref; /* Reference count */ - struct pool_con* next; /* Next connection in the pool */ + struct db_id* id; /**< Connection identifier */ + unsigned int ref; /**< Reference count */ + struct pool_con *async_pool; /**< Subpool of identical database handles */ + int no_transfers; /**< Number of async queries to this backend */ + struct db_transfer *transfers; /**< Array of ongoing async operations */ + struct pool_con *next; /**< Next element in the pool (different db_id) */ MYSQL_RES* res; /* Actual result */ MYSQL* con; /* Connection representation */ diff --git a/modules/db_oracle/ora_con.c b/modules/db_oracle/ora_con.c index 73ced27d2db..37911ab0d60 100644 --- a/modules/db_oracle/ora_con.c +++ b/modules/db_oracle/ora_con.c @@ -68,8 +68,8 @@ ora_con_t* db_oracle_new_connection(const struct db_id* id) } memset(con, 0, sizeof(*con)); - con->hdr.ref = 1; - con->hdr.id = (struct db_id*)id; /* set here - freed on error */ + con->ref = 1; + con->id = (struct db_id*)id; /* set here - freed on error */ con->uri_len = uri_len; memcpy(con->uri, buf, uri_len+1); @@ -153,7 +153,7 @@ void db_oracle_free_connection(ora_con_t* con) OCIHandleFree(con->errhp, OCI_HTYPE_ERROR); if (con->envhp) OCIHandleFree(con->envhp, OCI_HTYPE_ENV); - free_db_id(con->hdr.id); + free_db_id(con->id); pkg_free(con); } diff --git a/modules/db_oracle/ora_con.h b/modules/db_oracle/ora_con.h index 6635ea0a926..27d2a11b706 100644 --- a/modules/db_oracle/ora_con.h +++ b/modules/db_oracle/ora_con.h @@ -40,7 +40,12 @@ typedef struct query_data query_data_t; struct ora_con { - struct pool_con hdr; /* Standard fields */ + struct db_id* id; /**< Connection identifier */ + unsigned int ref; /**< Reference count */ + struct pool_con *async_pool; /**< Subpool of identical database handles */ + int no_transfers; /**< Number of async queries to this backend */ + struct db_transfer *transfers; /**< Array of ongoing async operations */ + struct pool_con *next; /**< Next element in the pool (different db_id) */ OCIError *errhp; /* Error */ OCISvcCtx *svchp; /* Server Context */ diff --git a/modules/db_postgres/pg_con.h b/modules/db_postgres/pg_con.h index 36bcf7b1287..1643d1197df 100644 --- a/modules/db_postgres/pg_con.h +++ b/modules/db_postgres/pg_con.h @@ -44,9 +44,12 @@ * Postgres specific connection data */ struct pg_con { - struct db_id* id; /* Connection identifier */ - unsigned int ref; /* Reference count */ - struct pool_con* next; /* Next connection in the pool */ + struct db_id* id; /**< Connection identifier */ + unsigned int ref; /**< Reference count */ + struct pool_con *async_pool; /**< Subpool of identical database handles */ + int no_transfers; /**< Number of async queries to this backend */ + struct db_transfer *transfers; /**< Array of ongoing async operations */ + struct pool_con *next; /**< Next element in the pool (different db_id) */ int connected; char *sqlurl; /* the url we are connected to, all connection memory parents from this */ diff --git a/modules/db_unixodbc/con.h b/modules/db_unixodbc/con.h index dcc93311cf9..dfd276d98f7 100644 --- a/modules/db_unixodbc/con.h +++ b/modules/db_unixodbc/con.h @@ -53,9 +53,13 @@ typedef struct strn struct my_con { - struct db_id* id; /* Connection identifier */ - unsigned int ref; /* Reference count */ - struct pool_con* next; /* Next connection in the pool */ + struct db_id* id; /**< Connection identifier */ + unsigned int ref; /**< Reference count */ + struct pool_con *async_pool; /**< Subpool of identical database handles */ + int no_transfers; /**< Number of async queries to this backend */ + struct db_transfer *transfers; /**< Array of ongoing async operations */ + struct pool_con *next; /**< Next element in the pool (different db_id) */ + SQLHENV env; SQLHSTMT stmt_handle; /* Actual result */ SQLHDBC dbc; /* Connection representation */