Permalink
Browse files

make async querying work.

From mysql 5.1 documentation:

http://dev.mysql.com/doc/refman/5.1/en/threaded-clients.html

* Two threads can't send a query to the MySQL server at the same time on the same connection. In particular, you have to ensure that between calls to mysql_query() and mysql_store_result() no other thread is using the same connection.
* Many threads can access different result sets that are retrieved with mysql_store_result().

* If you use mysql_use_result(), you must ensure that no other thread is using the same connection until the result set is closed. However, it really is best for threaded clients that share the same connection to use mysql_store_result()

So, create a pthread mutex around  calls to mysql_real_query. Only support mysql_store_result for async mode.
  • Loading branch information...
1 parent 0d1e9a5 commit 02c35e0fdb387e9a86b398c975722120c2628f51 @ssinghi ssinghi committed with Aug 17, 2010
Showing with 21 additions and 36 deletions.
  1. +21 −36 src/mysql_bindings_connection.cc
@@ -136,10 +136,12 @@ MysqlConn::MysqlConnInfo MysqlConn::GetInfo() {
MysqlConn::MysqlConn(): EventEmitter() {
_conn = NULL;
multi_query = false;
+ pthread_mutex_init(&query_lock, NULL);
}
MysqlConn::~MysqlConn() {
this->Close();
+ pthread_mutex_destroy(&query_lock);
}
Handle<Value> MysqlConn::New(const Arguments& args) {
@@ -826,6 +828,7 @@ int MysqlConn::EIO_After_Query(eio_req *req) {
int argc = 0;
Local<Value> argv[2];
+ HandleScope scope;
if (req->result) {
argv[0] = Exception::Error(String::New("Error on query execution"));
@@ -837,7 +840,7 @@ int MysqlConn::EIO_After_Query(eio_req *req) {
Persistent<Object> js_result(MysqlResult::constructor_template->
GetFunction()->NewInstance(2, argv));
- argv[0] = Local<Value>::New(js_result);
+ argv[0] = Local<Value>::New(scope.Close(js_result));
argc = 1;
} else {
/* no result set - not a SELECT, SHOW, DESCRIBE or EXPLAIN */
@@ -873,44 +876,32 @@ int MysqlConn::EIO_Query(eio_req *req) {
MYSQLSYNC_DISABLE_MQ;
+ pthread_mutex_lock(&conn->query_lock);
int r = mysql_real_query(
conn->_conn,
query_req->query,
query_req->query_length);
-
if (r) {
req->result = 1;
- return 0;
}
-
- req->int1 = 1;
- query_req->field_count = mysql_field_count(conn->_conn);
- if (!query_req->field_count) {
- /* no result set - not a SELECT, SHOW, DESCRIBE or EXPLAIN */
- req->int1 = 0;
+ else {
+ req->int1 = 1;
req->result = 0;
- return 0;
- }
-
- MYSQL_RES *my_result;
-
- switch (query_req->result_mode) {
- case MYSQLSYNC_STORE_RESULT:
- my_result = mysql_store_result(conn->_conn);
- break;
- case MYSQLSYNC_USE_RESULT:
- my_result = mysql_use_result(conn->_conn);
- break;
- }
-
- if (!my_result) {
- req->result = 1;
- return 0;
+ query_req->field_count = mysql_field_count(conn->_conn);
+ if (!query_req->field_count) { /* no result set - not a SELECT, SHOW, DESCRIBE or EXPLAIN */
+ req->int1 = 0;
+ }
+ else {
+ MYSQL_RES *my_result = mysql_store_result(conn->_conn);
+ if (my_result) {
+ query_req->my_result = my_result;
+ }
+ else {
+ req->result = 1;
+ }
+ }
}
-
- query_req->my_result = my_result;
- req->result = 0;
-
+ pthread_mutex_unlock(&conn->query_lock);
return 0;
}
@@ -934,12 +925,6 @@ Handle<Value> MysqlConn::QueryAsync(const Arguments& args) {
return THREXC("Could not allocate enough memory");
}
- query_req->result_mode = MYSQLSYNC_STORE_RESULT;
-
- if (args.Length() == 2) {
- query_req->result_mode = MYSQLSYNC_USE_RESULT;
- }
-
query_req->query_length = query.length();
query_req->query =
reinterpret_cast<char *>(calloc(query_req->query_length + 1,

0 comments on commit 02c35e0

Please sign in to comment.