Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LOAD DATA LOCAL INFILE implemetation #158

Merged
merged 7 commits into from Sep 10, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions Makefile
Expand Up @@ -31,15 +31,16 @@ clean-all: clean

test: npm-install
./node_modules/.bin/nodeunit --reporter=minimal tests/low-level-sync tests/low-level-async \
tests/high-level tests/complex tests/issues
tests/high-level tests/complex tests/issues \
tests/load-data-infile

test-slow: npm-install
./node_modules/.bin/nodeunit --reporter=minimal tests/slow

test-all: npm-install
./node_modules/.bin/nodeunit --reporter=minimal tests/low-level-sync tests/low-level-async \
tests/high-level tests/complex tests/issues \
tests/slow
tests/slow tests/load-data-infile

test-profile: npm-install
rm -f v8.log
Expand Down
2 changes: 1 addition & 1 deletion binding.gyp
Expand Up @@ -32,4 +32,4 @@
]
}
]
}
}
3 changes: 1 addition & 2 deletions src/mysql_bindings.cc
Expand Up @@ -88,8 +88,7 @@ void InitMysqlLibmysqlclient(Handle<Object> target) {
NODE_DEFINE_CONSTANT(target, MYSQL_OPT_CONNECT_TIMEOUT);
// Unused, embedded
// NODE_DEFINE_CONSTANT(MYSQL_OPT_GUESS_CONNECTION);
// Not yet implemented
// NODE_DEFINE_CONSTANT(target, MYSQL_OPT_LOCAL_INFILE);
NODE_DEFINE_CONSTANT(target, MYSQL_OPT_LOCAL_INFILE);
// Unused, windows
// NODE_DEFINE_CONSTANT(target, MYSQL_OPT_NAMED_PIPE);
NODE_DEFINE_CONSTANT(target, MYSQL_OPT_PROTOCOL);
Expand Down
11 changes: 10 additions & 1 deletion src/mysql_bindings.h
Expand Up @@ -9,7 +9,7 @@
#define SRC_MYSQL_BINDINGS_H_

#include <v8.h>

#include<node_buffer.h>
/*!
* Use this header file to conditionally invoke different libev/libeio/libuv functions
* depending on the node version that the module is being compiled for.
Expand Down Expand Up @@ -88,6 +88,15 @@ if (args.Length() > (I) && args[I]->IsFunction()) {\
} else { \
VAR = Null(); \
}
#define OPTIONAL_BUFFER_ARG(I, VAR) \
Handle<Value> VAR;\
if (args.Length() > (I) &&\
args[I]->IsObject() &&\
node::Buffer::HasInstance(args[I])) {\
VAR = args[I]->ToObject();\
} else {\
VAR = Null();\
}

#ifdef DEBUG
#define DEBUG_PRINTF(...) fprintf(stdout, __VA_ARGS__)
Expand Down
121 changes: 101 additions & 20 deletions src/mysql_bindings_connection.cc
Expand Up @@ -11,7 +11,7 @@
#include "./mysql_bindings_connection.h"
#include "./mysql_bindings_result.h"
#include "./mysql_bindings_statement.h"

#include<stdio.h>
/*!
* Init V8 structures for MysqlConnection class
*/
Expand Down Expand Up @@ -91,7 +91,6 @@ bool MysqlConnection::Connect(const char* hostname,
uint32_t port,
const char* socket,
uint64_t flags) {

if (this->_conn) {
return false;
}
Expand Down Expand Up @@ -363,21 +362,19 @@ Handle<Value> MysqlConnection::CommitSync(const Arguments& args) {
*/
void MysqlConnection::EIO_After_Connect(uv_work_t *req) {
HandleScope scope;

struct connect_request *conn_req = (struct connect_request *)(req->data);

const int argc = 1;
Local<Value> argv[argc];

if (!conn_req->ok) {

unsigned int error_string_length = strlen(conn_req->conn->connect_error) + 25;
char* error_string = new char[error_string_length];
snprintf(
error_string, error_string_length,
"Connection error #%d: %s",
conn_req->conn->connect_errno, conn_req->conn->connect_error
);
conn_req->conn->connect_errno, conn_req->conn->connect_error);

argv[0] = V8EXC(error_string);
delete[] error_string;
Expand All @@ -388,9 +385,9 @@ void MysqlConnection::EIO_After_Connect(uv_work_t *req) {
node::MakeCallback(Context::GetCurrent()->Global(), conn_req->callback, argc, argv);

conn_req->callback.Dispose();

conn_req->conn->Unref();

delete conn_req;

delete req;
Expand Down Expand Up @@ -496,7 +493,6 @@ Handle<Value> MysqlConnection::ConnectSync(const Arguments& args) {
HandleScope scope;

MysqlConnection *conn = OBJUNWRAP<MysqlConnection>(args.Holder());

if (conn->_conn) {
return THREXC("Already initialized. "
"Use conn.realConnectSync() after conn.initSync()");
Expand All @@ -518,7 +514,7 @@ Handle<Value> MysqlConnection::ConnectSync(const Arguments& args) {
args[4]->IsUint32() ? port : 0,
args[5]->IsString() ? *socket : NULL,
args[6]->IsUint32() ? flags : 0
);
);

if (!r) {
return scope.Close(False());
Expand Down Expand Up @@ -855,6 +851,8 @@ Handle<Value> MysqlConnection::InitStatementSync(const Arguments& args) {

MysqlConnection *conn = OBJUNWRAP<MysqlConnection>(args.Holder());

MYSQLCONN_MUSTBE_CONNECTED;

MYSQL_STMT *my_statement = mysql_stmt_init(conn->_conn);

if (!my_statement) {
Expand Down Expand Up @@ -1082,7 +1080,14 @@ void MysqlConnection::EIO_Query(uv_work_t *req) {

MYSQLCONN_DISABLE_MQ;

// we are protected with mutex, so set CURRENT request data
// in connection (common object for ALL queries)
SetCorrectLocalInfileHandlers(query_req->infile_data, conn->_conn);
int r = mysql_real_query(conn->_conn, query_req->query, query_req->query_len);

// clean after ourselves
RestoreLocalInfileHandlers(query_req->infile_data, conn->_conn);

int errno = mysql_errno(conn->_conn);
if (r != 0 || errno != 0) {
// Query error
Expand Down Expand Up @@ -1132,19 +1137,27 @@ Handle<Value> MysqlConnection::Query(const Arguments& args) {
HandleScope scope;

REQ_STR_ARG(0, query);
OPTIONAL_FUN_ARG(1, callback);
OPTIONAL_BUFFER_ARG(1, local_infile_buffer);

Handle<Value> callback;
if (local_infile_buffer->IsNull()) {
OPTIONAL_FUN_ARG(1, possibly_callback);
callback = possibly_callback;
} else {
OPTIONAL_FUN_ARG(2, possibly_callback);
callback = possibly_callback;
}

MysqlConnection *conn = OBJUNWRAP<MysqlConnection>(args.Holder());

MYSQLCONN_MUSTBE_CONNECTED;

query_request *query_req = new query_request;

unsigned int query_len = static_cast<unsigned int>(query.length());

query_req->query = new char[query_len + 1];
query_req->query_len = query_len;

query_req->infile_data = MysqlConnection::PrepareLocalInfileData(local_infile_buffer);
// Copy query from V8 value to buffer
memcpy(query_req->query, *query, query_len);
query_req->query[query_len] = '\0';
Expand Down Expand Up @@ -1293,27 +1306,28 @@ Handle<Value> MysqlConnection::QuerySync(const Arguments& args) {
MysqlConnection *conn = OBJUNWRAP<MysqlConnection>(args.Holder());

REQ_STR_ARG(0, query)
OPTIONAL_BUFFER_ARG(1, local_infile_buffer);

MYSQLCONN_MUSTBE_CONNECTED;

MYSQLCONN_DISABLE_MQ;

MYSQL_RES *my_result = NULL;
unsigned int field_count;

unsigned int query_len = static_cast<unsigned int>(query.length());

unsigned int query_len = static_cast<unsigned int>(query.length());
local_infile_data * infile_data = PrepareLocalInfileData(local_infile_buffer);
// Only one query can be executed on a connection at a time
pthread_mutex_lock(&conn->query_lock);

SetCorrectLocalInfileHandlers(infile_data, conn->_conn);
int r = mysql_real_query(conn->_conn, *query, query_len);
RestoreLocalInfileHandlers(infile_data, conn->_conn);
if (r == 0) {
my_result = mysql_store_result(conn->_conn);
field_count = mysql_field_count(conn->_conn);
}

pthread_mutex_unlock(&conn->query_lock);

if (r != 0) {
// Query error
return scope.Close(False());
Expand Down Expand Up @@ -1384,7 +1398,6 @@ Handle<Value> MysqlConnection::RealConnectSync(const Arguments& args) {
uint32_t port = args[4]->Uint32Value();
String::Utf8Value socket(args[5]->ToString());
uint64_t flags = args[6]->Uint32Value();

bool r = conn->RealConnect(args[0]->IsString() ? *hostname : NULL,
args[1]->IsString() ? *user : NULL,
args[2]->IsString() ? *password : NULL,
Expand Down Expand Up @@ -1529,7 +1542,7 @@ Handle<Value> MysqlConnection::SetOptionSync(const Arguments& args) {
}
break;
case MYSQL_OPT_LOCAL_INFILE:
return THREXC("This option isn't implemented yet");
r = mysql_options(conn->_conn, option_key, NULL);
break;
case MYSQL_OPT_NAMED_PIPE:
case MYSQL_SHARED_MEMORY_BASE_NAME:
Expand Down Expand Up @@ -1727,3 +1740,71 @@ Handle<Value> MysqlConnection::WarningCountSync(const Arguments& args) {

return scope.Close(Integer::NewFromUnsigned(warning_count));
}
int MysqlConnection::CustomLocalInfileInit(void ** ptr, const char * filename, void * userdata) {
*ptr = userdata;
return 0;
}
int MysqlConnection::CustomLocalInfileRead(void * ptr, char * buf, unsigned int buf_len) {
local_infile_data * infile_data = static_cast<local_infile_data *>(ptr);
if (!infile_data->buffer || !infile_data->length) {
return 0;
}
if (infile_data->position >= infile_data->length) {
return 0;
}
size_t copy_len = infile_data->length - infile_data->position;
copy_len = copy_len < buf_len ? copy_len : buf_len;
memcpy(buf, infile_data->buffer + infile_data->position, copy_len);
infile_data->position += copy_len;
return copy_len;
}
void MysqlConnection::CustomLocalInfileEnd(void * ptr) {
}
int MysqlConnection::CustomLocalInfileError(void * ptr,
char *error_msg,
unsigned int error_msg_len) {
return 0;
}

MysqlConnection::local_infile_data * MysqlConnection::PrepareLocalInfileData(Handle<Value> buffer) {
local_infile_data * infile_data;
if (buffer->IsNull()) {
return NULL;
}
infile_data = static_cast<local_infile_data*>(malloc(sizeof(*infile_data)));
infile_data->length = node::Buffer::Length(buffer->ToObject());
infile_data->position = 0;
infile_data->buffer = static_cast<char *>(malloc(infile_data->length));
memcpy(infile_data->buffer, node::Buffer::Data(buffer->ToObject()), infile_data->length);
return infile_data;
}
void MysqlConnection::SetCorrectLocalInfileHandlers(local_infile_data * infile_data,
MYSQL * conn) {
if (infile_data) {
mysql_set_local_infile_handler(conn,
MysqlConnection::CustomLocalInfileInit,
MysqlConnection::CustomLocalInfileRead,
MysqlConnection::CustomLocalInfileEnd,
MysqlConnection::CustomLocalInfileError,
infile_data);
} else {
// default infile handlers uses thread local storage,
// so to be shure everything is ok, init thread specific data.
// it's cheap enough, it initializes everything needed only
// first time, all subsequent calls do nothing
mysql_thread_init();
mysql_set_local_infile_default(conn);
}
}
void MysqlConnection::RestoreLocalInfileHandlers(local_infile_data * infile_data,
MYSQL * conn) {
if (infile_data) {
mysql_set_local_infile_default(conn);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Мы не должны вызвать mysql_thread_init(); перед этой строкой?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Хм, всё равно это восстановление и если дальше будет использоваться стандартный обработчик - в его инициализации функция всё равно вызовется. Но всё равно странный порядок вызовов:

mysql_set_local_infile_default(conn);
...
mysql_thread_init();
mysql_set_local_infile_default(conn);

Так что пусть будет.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

В случае стандартного load infile (без буфера), последовательность становиться

mysql_thread_init();
mysql_set_local_infile_default(conn);
mysql_thread_end(); //чтобы не было memory leak  если в libuv в пуле потоков  какой то закроется

в случае наличия буфера последовательность

mysql_set_local_infile_handler(...) 
// не зависит от mysql_thread_init
mysql_set_local_infile_default(conn); 
// в принципе не очень хорошо, так как стандартная реализация зависит от
// mysql_thread_init(), но не очень и плохо, так как в следующем запросе, 
// в случае надобности, будет вызван mysql_thread_init

if (infile_data->buffer) {
free(infile_data->buffer);
}
free(infile_data);
} else {
mysql_thread_end();
}
}
28 changes: 25 additions & 3 deletions src/mysql_bindings_connection.h
Expand Up @@ -113,7 +113,7 @@ class MysqlConnection : public node::ObjectWrap {

struct connect_request {
bool ok;

Persistent<Function> callback;
MysqlConnection *conn;

Expand Down Expand Up @@ -172,26 +172,48 @@ class MysqlConnection : public node::ObjectWrap {
static Handle<Value> MultiRealQuerySync(const Arguments& args);

static Handle<Value> PingSync(const Arguments& args);
struct local_infile_data {
char * buffer;
size_t length;
size_t position;
};

struct query_request {
bool ok;
bool connection_closed;
bool have_result_set;

Persistent<Value> callback;
MysqlConnection *conn;

char *query;
unsigned int query_len;

MYSQL_RES *my_result;
uint32_t field_count;
my_ulonglong affected_rows;
my_ulonglong insert_id;

unsigned int errno;
const char *error;

local_infile_data * infile_data;
};
static int CustomLocalInfileInit(void ** ptr,
const char * filename,
void * userdata);
static int CustomLocalInfileRead(void * ptr,
char * buf,
unsigned int buf_len);
static void CustomLocalInfileEnd(void * ptr);
static int CustomLocalInfileError(void * ptr,
char * error_msg,
unsigned int error_msg_len);
static void SetCorrectLocalInfileHandlers(local_infile_data * infile_data,
MYSQL * conn);
static void RestoreLocalInfileHandlers(local_infile_data * infile_data,
MYSQL * conn);
static local_infile_data * PrepareLocalInfileData(Handle<Value> buffer);
static void EIO_After_Query(uv_work_t *req);
static void EIO_Query(uv_work_t *req);
static Handle<Value> Query(const Arguments& args);
Expand Down