Skip to content

Commit

Permalink
rewrite local infile related code to avoid use undocumented
Browse files Browse the repository at this point in the history
features from mysqlclient library
  • Loading branch information
anton committed Sep 10, 2012
1 parent 169c7da commit 78cac61
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 91 deletions.
127 changes: 48 additions & 79 deletions src/mysql_bindings_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ bool MysqlConnection::Connect(const char* hostname,
return false;
}

this->InitLocalInfileCallbacks();
bool unsuccessful = !mysql_real_connect(this->_conn,
hostname,
user,
Expand Down Expand Up @@ -151,7 +150,6 @@ bool MysqlConnection::RealConnect(const char* hostname,
socket,
flags);

this->InitLocalInfileCallbacks();
if (unsuccessful) {
this->connect_errno = mysql_errno(this->_conn);
this->connect_error = mysql_error(this->_conn);
Expand Down Expand Up @@ -1082,15 +1080,13 @@ void MysqlConnection::EIO_Query(uv_work_t *req) {

MYSQLCONN_DISABLE_MQ;

pthread_mutex_lock(&conn->query_lock);
// we are protected with mutex, so set CURRENT request data
// in connection (common object for ALL queries)
conn->infile_data.query_req = query_req;

SetCorrectLocalInfileHandlers(query_req->infile_data, conn->_conn);
int r = mysql_real_query(conn->_conn, query_req->query, query_req->query_len);

// clean after ourselves
conn->infile_data.query_req = NULL;
RestoreLocalInfileHandlers(query_req->infile_data, conn->_conn);

int errno = mysql_errno(conn->_conn);
if (r != 0 || errno != 0) {
Expand Down Expand Up @@ -1157,21 +1153,11 @@ Handle<Value> MysqlConnection::Query(const Arguments& args) {
MYSQLCONN_MUSTBE_CONNECTED;

query_request *query_req = new query_request;
query_req->local_infile_buffer = NULL;
query_req->local_infile_buffer_length = 0;
if (!local_infile_buffer->IsNull()) {
query_req->local_infile_buffer_length = node::Buffer::Length(local_infile_buffer->ToObject());
DEBUG("buffer length %d", query_req->local_infile_buffer_length);
query_req->local_infile_buffer = static_cast<char*>(malloc(query_req->local_infile_buffer_length));
memcpy(query_req->local_infile_buffer,
node::Buffer::Data(local_infile_buffer->ToObject()),
query_req->local_infile_buffer_length);
}
unsigned int query_len = static_cast<unsigned int>(query.length());

query_req->query = new char[query_len + 1];
query_req->query_len = query_len;

query_req->infile_data = MysqlConnection::PrepareLocalInfileData(local_infile_buffer);
// Copy query from V8 value to buffer
memcpy(query_req->query, *query, query_len);
query_req->query[query_len] = '\0';
Expand Down Expand Up @@ -1330,32 +1316,19 @@ Handle<Value> MysqlConnection::QuerySync(const Arguments& args) {
unsigned int field_count;

unsigned int query_len = static_cast<unsigned int>(query.length());
query_request *query_req = new query_request;
query_req->local_infile_buffer = NULL;
query_req->local_infile_buffer_length = 0;
if (!local_infile_buffer->IsNull()) {
query_req->local_infile_buffer_length = node::Buffer::Length(local_infile_buffer->ToObject());
DEBUG("buffer length %d", query_req->local_infile_buffer_length);
query_req->local_infile_buffer = static_cast<char*>(malloc(query_req->local_infile_buffer_length));
memcpy(query_req->local_infile_buffer,
node::Buffer::Data(local_infile_buffer->ToObject()),
query_req->local_infile_buffer_length);
}

local_infile_data * infile_data = PrepareLocalInfileData(local_infile_buffer);
// Only one query can be executed on a connection at a time
pthread_mutex_lock(&conn->query_lock);
DEBUG("running query sync \n");
SetCorrectLocalInfileHandlers(infile_data, conn->_conn);
int r = mysql_real_query(conn->_conn, *query, query_len);
RestoreLocalInfileHandlers(infile_data, conn->_conn);
if (r == 0) {
DEBUG("running query sync ok\n");
my_result = mysql_store_result(conn->_conn);
field_count = mysql_field_count(conn->_conn);
}

pthread_mutex_unlock(&conn->query_lock);
delete query_req;
if (r != 0) {
DEBUG("query error %s\n", mysql_error(conn->_conn));
// Query error
return scope.Close(False());
}
Expand Down Expand Up @@ -1769,72 +1742,68 @@ Handle<Value> MysqlConnection::WarningCountSync(const Arguments& args) {
}
int MysqlConnection::CustomLocalInfileInit(void ** ptr, const char * filename, void * userdata) {
DEBUG("init start\n");
local_infile_data * data = static_cast<local_infile_data *>(userdata);
if (!data->query_req) {
DEBUG("init default\n");
return data->default_local_infile_init(ptr, filename, NULL);
}
data->query_req->local_infile_buffer_position = 0;
*ptr = data;
*ptr = userdata;
DEBUG("init custom\n");
return 0;
}
int MysqlConnection::CustomLocalInfileRead(void * ptr, char * buf, unsigned int buf_len) {
DEBUG("read start\n");
local_infile_data * data = static_cast<local_infile_data *>(ptr);
if (!data->query_req) {
DEBUG("read default\n");
return data->default_local_infile_read(ptr, buf, buf_len);
}
if (!data->query_req->local_infile_buffer || !data->query_req->local_infile_buffer_length) {
local_infile_data * infile_data = static_cast<local_infile_data *>(ptr);
if (!infile_data->buffer || !infile_data->length) {
DEBUG("read empty\n");
return 0;
}
if (data->query_req->local_infile_buffer_position >= data->query_req->local_infile_buffer_length) {
if (infile_data->position >= infile_data->length) {
DEBUG("read done\n");
return 0;
}
size_t copy_len = data->query_req->local_infile_buffer_length - data->query_req->local_infile_buffer_position;
size_t copy_len = infile_data->length - infile_data->position;
copy_len = copy_len < buf_len ? copy_len : buf_len;
memcpy(buf, data->query_req->local_infile_buffer + data->query_req->local_infile_buffer_position, copy_len);
data->query_req->local_infile_buffer_position += copy_len;
memcpy(buf, infile_data->buffer + infile_data->position, copy_len);
infile_data->position += copy_len;
DEBUG("read copied\n");
return copy_len;
}
void MysqlConnection::CustomLocalInfileEnd(void * ptr) {
DEBUG("end\n");
local_infile_data * data = static_cast<local_infile_data *>(ptr);
if (!data->query_req) {
DEBUG("default end\n");
return data->default_local_infile_end(ptr);
}
if (data->query_req->local_infile_buffer) {
DEBUG("end free\n");
free(data->query_req->local_infile_buffer);
}
data->query_req->local_infile_buffer = NULL;
data->query_req->local_infile_buffer_length = 0;
data->query_req->local_infile_buffer_position = 0;
data->query_req = NULL;
DEBUG("end done\n");
}
int MysqlConnection::CustomLocalInfileError(void *ptr, char *error_msg, unsigned int error_msg_len) {
DEBUG("errror\n");
local_infile_data * data = static_cast<local_infile_data *>(ptr);
DEBUG("default error\n");
return data->default_local_infile_error(ptr, error_msg, error_msg_len);
}
void MysqlConnection::InitLocalInfileCallbacks() {
this->infile_data.query_req = NULL;
this->infile_data.default_local_infile_init = this->_conn->options.local_infile_init;
this->infile_data.default_local_infile_read = this->_conn->options.local_infile_read;
this->infile_data.default_local_infile_end = this->_conn->options.local_infile_end;
this->infile_data.default_local_infile_error = this->_conn->options.local_infile_error;
mysql_options(this->_conn, MYSQL_OPT_LOCAL_INFILE, NULL);
mysql_set_local_infile_handler(this->_conn,
MysqlConnection::CustomLocalInfileInit,
MysqlConnection::CustomLocalInfileRead,
MysqlConnection::CustomLocalInfileEnd,
MysqlConnection::CustomLocalInfileError,
&(this->infile_data));
return 0;
}
//mysql_options(this->_conn, MYSQL_OPT_LOCAL_INFILE, NULL);

MysqlConnection::local_infile_data * MysqlConnection::PrepareLocalInfileData(Handle<Value> buffer) {
local_infile_data * infile_data;
if (buffer->IsNull()) {
return NULL;
}
infile_data = static_cast<local_infile_data*>(malloc(sizeof(*infile_data)));
infile_data->length = node::Buffer::Length(buffer->ToObject());
infile_data->position = 0;
infile_data->buffer = static_cast<char *>(malloc(infile_data->length));
memcpy(infile_data->buffer, node::Buffer::Data(buffer->ToObject()), infile_data->length);
DEBUG("buffer length %d", infile_data->length);
return infile_data;
}
void MysqlConnection::SetCorrectLocalInfileHandlers(local_infile_data * infile_data, MYSQL * conn) {
if (infile_data) {
mysql_set_local_infile_handler(conn,
MysqlConnection::CustomLocalInfileInit,
MysqlConnection::CustomLocalInfileRead,
MysqlConnection::CustomLocalInfileEnd,
MysqlConnection::CustomLocalInfileError,
infile_data);
}
}
void MysqlConnection::RestoreLocalInfileHandlers(local_infile_data * infile_data, MYSQL * conn) {
if(infile_data) {
mysql_set_local_infile_default(conn);
if(infile_data->buffer) {
free(infile_data->buffer);
}
free(infile_data);
}
}
21 changes: 9 additions & 12 deletions src/mysql_bindings_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ class MysqlConnection : public node::ObjectWrap {
static Handle<Value> MultiRealQuerySync(const Arguments& args);

static Handle<Value> PingSync(const Arguments& args);
struct local_infile_data {
char * buffer;
size_t length;
size_t position;
};

struct query_request {
bool ok;
Expand All @@ -192,23 +197,15 @@ class MysqlConnection : public node::ObjectWrap {
unsigned int errno;
const char *error;

char * local_infile_buffer;
size_t local_infile_buffer_length;
size_t local_infile_buffer_position;
};
struct local_infile_data {
int (*default_local_infile_init)(void **, const char *, void *);
int (*default_local_infile_read)(void *, char *, unsigned int);
void (*default_local_infile_end)(void *);
int (*default_local_infile_error)(void *, char*, unsigned int);
query_request * query_req;
local_infile_data * infile_data;
};
static int CustomLocalInfileInit(void ** ptr, const char * filename, void * userdata);
static int CustomLocalInfileRead(void * ptr, char * buf, unsigned int buf_len);
static void CustomLocalInfileEnd(void * ptr);
static int CustomLocalInfileError(void *ptr, char *error_msg, unsigned int error_msg_len);
local_infile_data infile_data;
void InitLocalInfileCallbacks();
static void SetCorrectLocalInfileHandlers(local_infile_data * infile_data, MYSQL * conn);
static void RestoreLocalInfileHandlers(local_infile_data * infile_data, MYSQL * conn);
static local_infile_data * PrepareLocalInfileData(Handle<Value> buffer);
static void EIO_After_Query(uv_work_t *req);
static void EIO_Query(uv_work_t *req);
static Handle<Value> Query(const Arguments& args);
Expand Down

0 comments on commit 78cac61

Please sign in to comment.