Skip to content

Commit

Permalink
Merge pull request #158 from anton-kotenko/master
Browse files Browse the repository at this point in the history
LOAD DATA LOCAL INFILE implemetation
  • Loading branch information
Sannis committed Sep 10, 2012
2 parents 4e8a266 + 04ea286 commit e6e0762
Show file tree
Hide file tree
Showing 8 changed files with 442 additions and 29 deletions.
5 changes: 3 additions & 2 deletions Makefile
Expand Up @@ -31,15 +31,16 @@ clean-all: clean

test: npm-install
./node_modules/.bin/nodeunit --reporter=minimal tests/low-level-sync tests/low-level-async \
tests/high-level tests/complex tests/issues
tests/high-level tests/complex tests/issues \
tests/load-data-infile

test-slow: npm-install
./node_modules/.bin/nodeunit --reporter=minimal tests/slow

test-all: npm-install
./node_modules/.bin/nodeunit --reporter=minimal tests/low-level-sync tests/low-level-async \
tests/high-level tests/complex tests/issues \
tests/slow
tests/slow tests/load-data-infile

test-profile: npm-install
rm -f v8.log
Expand Down
2 changes: 1 addition & 1 deletion binding.gyp
Expand Up @@ -32,4 +32,4 @@
]
}
]
}
}
3 changes: 1 addition & 2 deletions src/mysql_bindings.cc
Expand Up @@ -88,8 +88,7 @@ void InitMysqlLibmysqlclient(Handle<Object> target) {
NODE_DEFINE_CONSTANT(target, MYSQL_OPT_CONNECT_TIMEOUT);
// Unused, embedded
// NODE_DEFINE_CONSTANT(MYSQL_OPT_GUESS_CONNECTION);
// Not yet implemented
// NODE_DEFINE_CONSTANT(target, MYSQL_OPT_LOCAL_INFILE);
NODE_DEFINE_CONSTANT(target, MYSQL_OPT_LOCAL_INFILE);
// Unused, windows
// NODE_DEFINE_CONSTANT(target, MYSQL_OPT_NAMED_PIPE);
NODE_DEFINE_CONSTANT(target, MYSQL_OPT_PROTOCOL);
Expand Down
11 changes: 10 additions & 1 deletion src/mysql_bindings.h
Expand Up @@ -9,7 +9,7 @@
#define SRC_MYSQL_BINDINGS_H_

#include <v8.h>

#include<node_buffer.h>
/*!
* Use this header file to conditionally invoke different libev/libeio/libuv functions
* depending on the node version that the module is being compiled for.
Expand Down Expand Up @@ -88,6 +88,15 @@ if (args.Length() > (I) && args[I]->IsFunction()) {\
} else { \
VAR = Null(); \
}
#define OPTIONAL_BUFFER_ARG(I, VAR) \
Handle<Value> VAR;\
if (args.Length() > (I) &&\
args[I]->IsObject() &&\
node::Buffer::HasInstance(args[I])) {\
VAR = args[I]->ToObject();\
} else {\
VAR = Null();\
}

#ifdef DEBUG
#define DEBUG_PRINTF(...) fprintf(stdout, __VA_ARGS__)
Expand Down
121 changes: 101 additions & 20 deletions src/mysql_bindings_connection.cc
Expand Up @@ -11,7 +11,7 @@
#include "./mysql_bindings_connection.h"
#include "./mysql_bindings_result.h"
#include "./mysql_bindings_statement.h"

#include<stdio.h>
/*!
* Init V8 structures for MysqlConnection class
*/
Expand Down Expand Up @@ -91,7 +91,6 @@ bool MysqlConnection::Connect(const char* hostname,
uint32_t port,
const char* socket,
uint64_t flags) {

if (this->_conn) {
return false;
}
Expand Down Expand Up @@ -363,21 +362,19 @@ Handle<Value> MysqlConnection::CommitSync(const Arguments& args) {
*/
void MysqlConnection::EIO_After_Connect(uv_work_t *req) {
HandleScope scope;

struct connect_request *conn_req = (struct connect_request *)(req->data);

const int argc = 1;
Local<Value> argv[argc];

if (!conn_req->ok) {

unsigned int error_string_length = strlen(conn_req->conn->connect_error) + 25;
char* error_string = new char[error_string_length];
snprintf(
error_string, error_string_length,
"Connection error #%d: %s",
conn_req->conn->connect_errno, conn_req->conn->connect_error
);
conn_req->conn->connect_errno, conn_req->conn->connect_error);

argv[0] = V8EXC(error_string);
delete[] error_string;
Expand All @@ -388,9 +385,9 @@ void MysqlConnection::EIO_After_Connect(uv_work_t *req) {
node::MakeCallback(Context::GetCurrent()->Global(), conn_req->callback, argc, argv);

conn_req->callback.Dispose();

conn_req->conn->Unref();

delete conn_req;

delete req;
Expand Down Expand Up @@ -496,7 +493,6 @@ Handle<Value> MysqlConnection::ConnectSync(const Arguments& args) {
HandleScope scope;

MysqlConnection *conn = OBJUNWRAP<MysqlConnection>(args.Holder());

if (conn->_conn) {
return THREXC("Already initialized. "
"Use conn.realConnectSync() after conn.initSync()");
Expand All @@ -518,7 +514,7 @@ Handle<Value> MysqlConnection::ConnectSync(const Arguments& args) {
args[4]->IsUint32() ? port : 0,
args[5]->IsString() ? *socket : NULL,
args[6]->IsUint32() ? flags : 0
);
);

if (!r) {
return scope.Close(False());
Expand Down Expand Up @@ -855,6 +851,8 @@ Handle<Value> MysqlConnection::InitStatementSync(const Arguments& args) {

MysqlConnection *conn = OBJUNWRAP<MysqlConnection>(args.Holder());

MYSQLCONN_MUSTBE_CONNECTED;

MYSQL_STMT *my_statement = mysql_stmt_init(conn->_conn);

if (!my_statement) {
Expand Down Expand Up @@ -1082,7 +1080,14 @@ void MysqlConnection::EIO_Query(uv_work_t *req) {

MYSQLCONN_DISABLE_MQ;

// we are protected with mutex, so set CURRENT request data
// in connection (common object for ALL queries)
SetCorrectLocalInfileHandlers(query_req->infile_data, conn->_conn);
int r = mysql_real_query(conn->_conn, query_req->query, query_req->query_len);

// clean after ourselves
RestoreLocalInfileHandlers(query_req->infile_data, conn->_conn);

int errno = mysql_errno(conn->_conn);
if (r != 0 || errno != 0) {
// Query error
Expand Down Expand Up @@ -1132,19 +1137,27 @@ Handle<Value> MysqlConnection::Query(const Arguments& args) {
HandleScope scope;

REQ_STR_ARG(0, query);
OPTIONAL_FUN_ARG(1, callback);
OPTIONAL_BUFFER_ARG(1, local_infile_buffer);

Handle<Value> callback;
if (local_infile_buffer->IsNull()) {
OPTIONAL_FUN_ARG(1, possibly_callback);
callback = possibly_callback;
} else {
OPTIONAL_FUN_ARG(2, possibly_callback);
callback = possibly_callback;
}

MysqlConnection *conn = OBJUNWRAP<MysqlConnection>(args.Holder());

MYSQLCONN_MUSTBE_CONNECTED;

query_request *query_req = new query_request;

unsigned int query_len = static_cast<unsigned int>(query.length());

query_req->query = new char[query_len + 1];
query_req->query_len = query_len;

query_req->infile_data = MysqlConnection::PrepareLocalInfileData(local_infile_buffer);
// Copy query from V8 value to buffer
memcpy(query_req->query, *query, query_len);
query_req->query[query_len] = '\0';
Expand Down Expand Up @@ -1293,27 +1306,28 @@ Handle<Value> MysqlConnection::QuerySync(const Arguments& args) {
MysqlConnection *conn = OBJUNWRAP<MysqlConnection>(args.Holder());

REQ_STR_ARG(0, query)
OPTIONAL_BUFFER_ARG(1, local_infile_buffer);

MYSQLCONN_MUSTBE_CONNECTED;

MYSQLCONN_DISABLE_MQ;

MYSQL_RES *my_result = NULL;
unsigned int field_count;

unsigned int query_len = static_cast<unsigned int>(query.length());

unsigned int query_len = static_cast<unsigned int>(query.length());
local_infile_data * infile_data = PrepareLocalInfileData(local_infile_buffer);
// Only one query can be executed on a connection at a time
pthread_mutex_lock(&conn->query_lock);

SetCorrectLocalInfileHandlers(infile_data, conn->_conn);
int r = mysql_real_query(conn->_conn, *query, query_len);
RestoreLocalInfileHandlers(infile_data, conn->_conn);
if (r == 0) {
my_result = mysql_store_result(conn->_conn);
field_count = mysql_field_count(conn->_conn);
}

pthread_mutex_unlock(&conn->query_lock);

if (r != 0) {
// Query error
return scope.Close(False());
Expand Down Expand Up @@ -1384,7 +1398,6 @@ Handle<Value> MysqlConnection::RealConnectSync(const Arguments& args) {
uint32_t port = args[4]->Uint32Value();
String::Utf8Value socket(args[5]->ToString());
uint64_t flags = args[6]->Uint32Value();

bool r = conn->RealConnect(args[0]->IsString() ? *hostname : NULL,
args[1]->IsString() ? *user : NULL,
args[2]->IsString() ? *password : NULL,
Expand Down Expand Up @@ -1529,7 +1542,7 @@ Handle<Value> MysqlConnection::SetOptionSync(const Arguments& args) {
}
break;
case MYSQL_OPT_LOCAL_INFILE:
return THREXC("This option isn't implemented yet");
r = mysql_options(conn->_conn, option_key, NULL);
break;
case MYSQL_OPT_NAMED_PIPE:
case MYSQL_SHARED_MEMORY_BASE_NAME:
Expand Down Expand Up @@ -1727,3 +1740,71 @@ Handle<Value> MysqlConnection::WarningCountSync(const Arguments& args) {

return scope.Close(Integer::NewFromUnsigned(warning_count));
}
int MysqlConnection::CustomLocalInfileInit(void ** ptr, const char * filename, void * userdata) {
*ptr = userdata;
return 0;
}
int MysqlConnection::CustomLocalInfileRead(void * ptr, char * buf, unsigned int buf_len) {
local_infile_data * infile_data = static_cast<local_infile_data *>(ptr);
if (!infile_data->buffer || !infile_data->length) {
return 0;
}
if (infile_data->position >= infile_data->length) {
return 0;
}
size_t copy_len = infile_data->length - infile_data->position;
copy_len = copy_len < buf_len ? copy_len : buf_len;
memcpy(buf, infile_data->buffer + infile_data->position, copy_len);
infile_data->position += copy_len;
return copy_len;
}
void MysqlConnection::CustomLocalInfileEnd(void * ptr) {
}
int MysqlConnection::CustomLocalInfileError(void * ptr,
char *error_msg,
unsigned int error_msg_len) {
return 0;
}

MysqlConnection::local_infile_data * MysqlConnection::PrepareLocalInfileData(Handle<Value> buffer) {
local_infile_data * infile_data;
if (buffer->IsNull()) {
return NULL;
}
infile_data = static_cast<local_infile_data*>(malloc(sizeof(*infile_data)));
infile_data->length = node::Buffer::Length(buffer->ToObject());
infile_data->position = 0;
infile_data->buffer = static_cast<char *>(malloc(infile_data->length));
memcpy(infile_data->buffer, node::Buffer::Data(buffer->ToObject()), infile_data->length);
return infile_data;
}
void MysqlConnection::SetCorrectLocalInfileHandlers(local_infile_data * infile_data,
MYSQL * conn) {
if (infile_data) {
mysql_set_local_infile_handler(conn,
MysqlConnection::CustomLocalInfileInit,
MysqlConnection::CustomLocalInfileRead,
MysqlConnection::CustomLocalInfileEnd,
MysqlConnection::CustomLocalInfileError,
infile_data);
} else {
// default infile handlers uses thread local storage,
// so to be shure everything is ok, init thread specific data.
// it's cheap enough, it initializes everything needed only
// first time, all subsequent calls do nothing
mysql_thread_init();
mysql_set_local_infile_default(conn);
}
}
void MysqlConnection::RestoreLocalInfileHandlers(local_infile_data * infile_data,
MYSQL * conn) {
if (infile_data) {
mysql_set_local_infile_default(conn);
if (infile_data->buffer) {
free(infile_data->buffer);
}
free(infile_data);
} else {
mysql_thread_end();
}
}
28 changes: 25 additions & 3 deletions src/mysql_bindings_connection.h
Expand Up @@ -113,7 +113,7 @@ class MysqlConnection : public node::ObjectWrap {

struct connect_request {
bool ok;

Persistent<Function> callback;
MysqlConnection *conn;

Expand Down Expand Up @@ -172,26 +172,48 @@ class MysqlConnection : public node::ObjectWrap {
static Handle<Value> MultiRealQuerySync(const Arguments& args);

static Handle<Value> PingSync(const Arguments& args);
struct local_infile_data {
char * buffer;
size_t length;
size_t position;
};

struct query_request {
bool ok;
bool connection_closed;
bool have_result_set;

Persistent<Value> callback;
MysqlConnection *conn;

char *query;
unsigned int query_len;

MYSQL_RES *my_result;
uint32_t field_count;
my_ulonglong affected_rows;
my_ulonglong insert_id;

unsigned int errno;
const char *error;

local_infile_data * infile_data;
};
static int CustomLocalInfileInit(void ** ptr,
const char * filename,
void * userdata);
static int CustomLocalInfileRead(void * ptr,
char * buf,
unsigned int buf_len);
static void CustomLocalInfileEnd(void * ptr);
static int CustomLocalInfileError(void * ptr,
char * error_msg,
unsigned int error_msg_len);
static void SetCorrectLocalInfileHandlers(local_infile_data * infile_data,
MYSQL * conn);
static void RestoreLocalInfileHandlers(local_infile_data * infile_data,
MYSQL * conn);
static local_infile_data * PrepareLocalInfileData(Handle<Value> buffer);
static void EIO_After_Query(uv_work_t *req);
static void EIO_Query(uv_work_t *req);
static Handle<Value> Query(const Arguments& args);
Expand Down

0 comments on commit e6e0762

Please sign in to comment.