Skip to content

Commit

Permalink
Add connection pool for db.
Browse files Browse the repository at this point in the history
  • Loading branch information
killing committed Aug 20, 2016
1 parent 6a5e080 commit f52ffdf
Show file tree
Hide file tree
Showing 19 changed files with 145 additions and 91 deletions.
99 changes: 89 additions & 10 deletions common/db-wrapper/db-wrapper.c
Expand Up @@ -9,6 +9,7 @@ typedef struct DBOperations {
void (*db_conn_pool_free) (DBConnPool *);
DBConnection* (*get_db_connection) (DBConnPool *, GError **);
void (*db_connection_close) (DBConnection *);
gboolean (*db_connection_ping) (DBConnection *);
gboolean (*db_connection_execute) (DBConnection *, const char *, GError **);
ResultSet* (*db_connection_execute_query) (DBConnection *, const char *, GError **);
gboolean (*result_set_next) (ResultSet *, GError **);
Expand All @@ -31,6 +32,14 @@ static DBOperations db_ops;

/* DB Connection Pool. */

static void
init_conn_pool_common (DBConnPool *pool, int max_connections)
{
pool->connections = g_ptr_array_sized_new (max_connections);
pthread_mutex_init (&pool->lock, NULL);
pool->max_connections = max_connections;
}

DBConnPool *
db_conn_pool_new_mysql (const char *host,
const char *user,
Expand All @@ -45,6 +54,7 @@ db_conn_pool_new_mysql (const char *host,
db_ops.db_conn_pool_free = mysql_db_conn_pool_free;
db_ops.get_db_connection = mysql_get_db_connection;
db_ops.db_connection_close = mysql_db_connection_close;
db_ops.db_connection_ping = mysql_db_connection_ping;
db_ops.db_connection_execute = mysql_db_connection_execute;
db_ops.db_connection_execute_query = mysql_execute_query;
db_ops.result_set_next = mysql_result_set_next;
Expand All @@ -62,20 +72,27 @@ db_conn_pool_new_mysql (const char *host,
db_ops.db_connection_commit = mysql_db_commit;
db_ops.db_connection_rollback = mysql_db_rollback;

return mysql_db_conn_pool_new (host, user, password, port, db_name, unix_socket,
use_ssl, charset, max_connections);
DBConnPool *pool;

pool = mysql_db_conn_pool_new (host, user, password, port, db_name, unix_socket,
use_ssl, charset);
init_conn_pool_common (pool, max_connections);

return pool;
}

DBConnPool *
db_conn_pool_new_pgsql (const char *host,
const char *user,
const char *password,
const char *db_name,
const char *unix_socket)
const char *unix_socket,
int max_connections)
{
db_ops.db_conn_pool_free = pgsql_db_conn_pool_free;
db_ops.get_db_connection = pgsql_get_db_connection;
db_ops.db_connection_close = pgsql_db_connection_close;
db_ops.db_connection_ping = pgsql_db_connection_ping;
db_ops.db_connection_execute = pgsql_db_connection_execute;
db_ops.db_connection_execute_query = pgsql_execute_query;
db_ops.result_set_next = pgsql_result_set_next;
Expand All @@ -93,7 +110,12 @@ db_conn_pool_new_pgsql (const char *host,
db_ops.db_connection_commit = pgsql_db_commit;
db_ops.db_connection_rollback = pgsql_db_rollback;

return pgsql_db_conn_pool_new (host, user, password, db_name, unix_socket);
DBConnPool *pool;

pool = pgsql_db_conn_pool_new (host, user, password, db_name, unix_socket);
init_conn_pool_common (pool, max_connections);

return pool;
}

DBConnPool *
Expand All @@ -102,6 +124,7 @@ db_conn_pool_new_sqlite (const char *db_path, int max_connections)
db_ops.db_conn_pool_free = sqlite_db_conn_pool_free;
db_ops.get_db_connection = sqlite_get_db_connection;
db_ops.db_connection_close = sqlite_db_connection_close;
db_ops.db_connection_ping = sqlite_db_connection_ping;
db_ops.db_connection_execute = sqlite_db_connection_execute;
db_ops.db_connection_execute_query = sqlite_execute_query;
db_ops.result_set_next = sqlite_result_set_next;
Expand All @@ -119,12 +142,20 @@ db_conn_pool_new_sqlite (const char *db_path, int max_connections)
db_ops.db_connection_commit = sqlite_db_commit;
db_ops.db_connection_rollback = sqlite_db_rollback;

return sqlite_db_conn_pool_new (db_path, max_connections);
DBConnPool *pool;

pool = sqlite_db_conn_pool_new (db_path);
init_conn_pool_common (pool, max_connections);

return pool;
}

void
db_conn_pool_free (DBConnPool *pool)
{
g_ptr_array_free (pool->connections, TRUE);
pthread_mutex_destroy (&pool->lock);

return db_ops.db_conn_pool_free (pool);
}

Expand All @@ -133,15 +164,40 @@ db_conn_pool_free (DBConnPool *pool)
DBConnection *
db_conn_pool_get_connection (DBConnPool *pool, GError **error)
{
return db_ops.get_db_connection (pool, error);
DBConnection *conn = NULL;

pthread_mutex_lock (&pool->lock);

guint i, size = pool->connections->len;
for (i = 0; i < size; ++i) {
conn = g_ptr_array_index (pool->connections, i);
if (conn->is_available && db_connection_ping (conn)) {
conn->is_available = FALSE;
goto out;
}
}
conn = NULL;
if (size < pool->max_connections) {
conn = db_ops.get_db_connection (pool, error);
if (conn) {
conn->is_available = TRUE;
conn->pool = pool;
g_ptr_array_add (pool->connections, conn);
}
}

out:
pthread_mutex_unlock (&pool->lock);
return conn;
}

static void
db_connection_clear (DBConnection *conn)
{
result_set_free (conn->result_set);

db_stmt_free (conn->stmt);
conn->result_set = NULL;
conn->stmt = NULL;
}

void
Expand All @@ -150,9 +206,14 @@ db_connection_close (DBConnection *conn)
if (!conn)
return;

if (conn->in_transaction)
db_connection_rollback (conn, NULL);

db_connection_clear (conn);

db_ops.db_connection_close (conn);
pthread_mutex_lock (&conn->pool->lock);
conn->is_available = TRUE;
pthread_mutex_unlock (&conn->pool->lock);
}

gboolean
Expand All @@ -161,6 +222,12 @@ db_connection_execute (DBConnection *conn, const char *sql, GError **error)
return db_ops.db_connection_execute (conn, sql, error);
}

gboolean
db_connection_ping (DBConnection *conn)
{
return db_ops.db_connection_ping (conn);
}

/* Result Sets. */

void
Expand Down Expand Up @@ -341,19 +408,31 @@ db_stmt_free (DBStmt *stmt)
gboolean
db_connection_begin_transaction (DBConnection *conn, GError **error)
{
return db_ops.db_connection_begin_transaction (conn, error);
gboolean ret;

ret = db_ops.db_connection_begin_transaction (conn, error);
if (ret)
conn->in_transaction++;

return ret;
}

gboolean
db_connection_commit (DBConnection *conn, GError **error)
{
if (conn->in_transaction)
conn->in_transaction = 0;

return db_ops.db_connection_commit (conn, error);
}

gboolean
db_connection_rollback (DBConnection *conn, GError **error)
{
db_connection_clear (conn);
if (conn->in_transaction) {
db_connection_clear (conn);
conn->in_transaction = 0;
}

return db_ops.db_connection_rollback (conn, error);
}
11 changes: 10 additions & 1 deletion common/db-wrapper/db-wrapper.h
Expand Up @@ -2,13 +2,16 @@
#define DB_WARPPER_H

#include <glib.h>
#include <pthread.h>

#define SEAF_DB_ERROR_DOMAIN g_quark_from_string("SEAF_DB")
#define SEAF_DB_ERROR_CODE 0

/* DB Connection Pool. */

struct DBConnPool {
GPtrArray *connections;
pthread_mutex_t lock;
int max_connections;
};
typedef struct DBConnPool DBConnPool;
Expand All @@ -29,7 +32,8 @@ db_conn_pool_new_pgsql (const char *host,
const char *user,
const char *password,
const char *db_name,
const char *unix_socket);
const char *unix_socket,
int max_connections);

DBConnPool *
db_conn_pool_new_sqlite (const char *db_path, int max_connections);
Expand All @@ -46,6 +50,8 @@ struct DBStmt;
typedef struct DBStmt DBStmt;

struct DBConnection {
gboolean is_available;
int in_transaction;
DBConnPool *pool;
ResultSet *result_set;
DBStmt *stmt;
Expand All @@ -58,6 +64,9 @@ db_conn_pool_get_connection (DBConnPool *pool, GError **error);
void
db_connection_close (DBConnection *conn);

gboolean
db_connection_ping (DBConnection *conn);

gboolean
db_connection_execute (DBConnection *conn, const char *sql, GError **error);

Expand Down
13 changes: 9 additions & 4 deletions common/db-wrapper/mysql-db-ops.c
Expand Up @@ -27,12 +27,10 @@ mysql_db_conn_pool_new (const char *host,
const char *db_name,
const char *unix_socket,
gboolean use_ssl,
const char *charset,
int max_connections)
const char *charset)
{
MySQLDBConnPool *pool = g_new0 (MySQLDBConnPool, 1);

pool->parent.max_connections = max_connections;
pool->host = g_strdup (host);
pool->user = g_strdup (user);
pool->password = g_strdup (password);
Expand Down Expand Up @@ -117,7 +115,6 @@ mysql_get_db_connection (DBConnPool *vpool, GError **error)
return NULL;
conn = g_new0 (MySQLDBConnection, 1);
conn->db = db;
conn->parent.pool = vpool;
return (DBConnection *)conn;
}

Expand All @@ -134,6 +131,14 @@ mysql_db_connection_close (DBConnection *vconn)
g_free (conn);
}

gboolean
mysql_db_connection_ping (DBConnection *vconn)
{
MySQLDBConnection *conn = (MySQLDBConnection *)vconn;

return (mysql_ping (conn->db) == 0);
}

gboolean
mysql_db_connection_execute (DBConnection *vconn, const char *sql, GError **error)
{
Expand Down
6 changes: 4 additions & 2 deletions common/db-wrapper/mysql-db-ops.h
Expand Up @@ -9,8 +9,7 @@ mysql_db_conn_pool_new (const char *host,
const char *db_name,
const char *unix_socket,
gboolean use_ssl,
const char *charset,
int max_connections);
const char *charset);

void
mysql_db_conn_pool_free (DBConnPool *vpool);
Expand All @@ -21,6 +20,9 @@ mysql_get_db_connection (DBConnPool *vpool, GError **error);
void
mysql_db_connection_close (DBConnection *vconn);

gboolean
mysql_db_connection_ping (DBConnection *vconn);

gboolean
mysql_db_connection_execute (DBConnection *vconn, const char *sql, GError **error);

Expand Down
9 changes: 8 additions & 1 deletion common/db-wrapper/pgsql-db-ops.c
Expand Up @@ -114,7 +114,6 @@ pgsql_get_db_connection (DBConnPool *vpool, GError **error)

conn = g_new0 (PGDBConnection, 1);
conn->db = db;
conn->parent.pool = vpool;

return (DBConnection *)conn;
}
Expand All @@ -132,6 +131,14 @@ pgsql_db_connection_close (DBConnection *vconn)
g_free (conn);
}

gboolean
pgsql_db_connection_ping (DBConnection *vconn)
{
PGDBConnection *conn = (PGDBConnection *)vconn;

return (PQstatus(conn->db) == CONNECTION_OK);
}

gboolean
pgsql_db_connection_execute (DBConnection *vconn, const char *sql, GError **error)
{
Expand Down
3 changes: 3 additions & 0 deletions common/db-wrapper/pgsql-db-ops.h
Expand Up @@ -17,6 +17,9 @@ pgsql_get_db_connection (DBConnPool *vpool, GError **error);
void
pgsql_db_connection_close (DBConnection *vconn);

gboolean
pgsql_db_connection_ping (DBConnection *vconn);

gboolean
pgsql_db_connection_execute (DBConnection *vconn, const char *sql, GError **error);

Expand Down
10 changes: 7 additions & 3 deletions common/db-wrapper/sqlite-db-ops.c
Expand Up @@ -91,15 +91,13 @@ sqlite3_blocking_exec(sqlite3 *db, const char *sql, int (*callback)(void *, int,
typedef struct SQLiteDBConnPool {
DBConnPool parent;
char *db_path;
int max_connections;
} SQLiteDBConnPool;

DBConnPool *
sqlite_db_conn_pool_new (const char *db_path, int max_connections)
sqlite_db_conn_pool_new (const char *db_path)
{
SQLiteDBConnPool *pool = g_new0 (SQLiteDBConnPool, 1);
pool->db_path = g_strdup(db_path);
pool->max_connections = max_connections;

return (DBConnPool *)pool;
}
Expand Down Expand Up @@ -158,6 +156,12 @@ sqlite_db_connection_close (DBConnection *vconn)
g_free (conn);
}

gboolean
sqlite_db_connection_ping (DBConnection *vconn)
{
return TRUE;
}

gboolean
sqlite_db_connection_execute (DBConnection *vconn, const char *sql, GError **error)
{
Expand Down
5 changes: 4 additions & 1 deletion common/db-wrapper/sqlite-db-ops.h
Expand Up @@ -2,7 +2,7 @@
#define SQLITE_DB_OPS_H

DBConnPool *
sqlite_db_conn_pool_new (const char *db_path, int max_connections);
sqlite_db_conn_pool_new (const char *db_path);

void
sqlite_db_conn_pool_free (DBConnPool *vpool);
Expand All @@ -13,6 +13,9 @@ sqlite_get_db_connection (DBConnPool *vpool, GError **error);
void
sqlite_db_connection_close (DBConnection *vconn);

gboolean
sqlite_db_connection_ping (DBConnection *vconn);

gboolean
sqlite_db_connection_execute (DBConnection *vconn, const char *sql, GError **error);

Expand Down

0 comments on commit f52ffdf

Please sign in to comment.