diff --git a/lib/sqlite3.js b/lib/sqlite3.js index cc7461a7a..36ee0438a 100644 --- a/lib/sqlite3.js +++ b/lib/sqlite3.js @@ -24,21 +24,21 @@ Database.prototype.prepare = function(sql) { // Database#run(sql, [bind1, bind2, ...], [callback]) Database.prototype.run = function(sql) { var statement = new Statement(this, sql); - statement.run.apply(statement, Array.prototype.slice.call(arguments, 1)); + statement.run.apply(statement, Array.prototype.slice.call(arguments, 1)).finalize(); return this; } // Database#get(sql, [bind1, bind2, ...], [callback]) Database.prototype.get = function(sql) { var statement = new Statement(this, sql); - statement.get.apply(statement, Array.prototype.slice.call(arguments, 1)); + statement.get.apply(statement, Array.prototype.slice.call(arguments, 1)).finalize(); return this; } // Database#all(sql, [bind1, bind2, ...], [callback]) Database.prototype.all = function(sql) { var statement = new Statement(this, sql); - statement.all.apply(statement, Array.prototype.slice.call(arguments, 1)); + statement.all.apply(statement, Array.prototype.slice.call(arguments, 1)).finalize(); return this; } diff --git a/src/database.cc b/src/database.cc index 1ce00d3c1..3c70ce28d 100644 --- a/src/database.cc +++ b/src/database.cc @@ -12,7 +12,7 @@ using namespace node_sqlite3; Persistent Database::constructor_template; -void Database::Init(v8::Handle target) { +void Database::Init(Handle target) { HandleScope scope; Local t = FunctionTemplate::New(New); @@ -23,9 +23,11 @@ void Database::Init(v8::Handle target) { constructor_template->SetClassName(String::NewSymbol("Database")); NODE_SET_PROTOTYPE_METHOD(constructor_template, "close", Close); + NODE_SET_PROTOTYPE_METHOD(constructor_template, "serialize", Serialize); + NODE_SET_PROTOTYPE_METHOD(constructor_template, "parallelize", Parallelize); - target->Set(v8::String::NewSymbol("Database"), - constructor_template->GetFunction()); + target->Set(String::NewSymbol("Database"), + constructor_template->GetFunction()); } void Database::Process() { @@ -57,20 +59,22 @@ void Database::Process() { return; } - while (open && !locked && !queue.empty()) { + + while (open && (!locked || pending == 0) && !queue.empty()) { Call* call = queue.front(); if (call->exclusive && pending > 0) { break; } + locked = call->exclusive; call->callback(call->baton); queue.pop(); delete call; } } -void Database::Schedule(EIO_Callback callback, Baton* baton, bool exclusive = false) { +void Database::Schedule(EIO_Callback callback, Baton* baton, bool exclusive) { if (!open && locked) { EXCEPTION(String::New("Database is closed"), SQLITE_MISUSE, exception); if (!baton->callback.IsEmpty() && baton->callback->IsFunction()) { @@ -84,10 +88,11 @@ void Database::Schedule(EIO_Callback callback, Baton* baton, bool exclusive = fa return; } - if (!open || locked || (exclusive && pending > 0)) { - queue.push(new Call(callback, baton, exclusive)); + if (!open || ((locked || exclusive || serialize) && pending > 0)) { + queue.push(new Call(callback, baton, exclusive || serialize)); } else { + locked = exclusive; callback(baton); } } @@ -198,9 +203,7 @@ Handle Database::Close(const Arguments& args) { void Database::EIO_BeginClose(Baton* baton) { assert(baton->db->open); - assert(!baton->db->locked); assert(baton->db->pending == 0); - baton->db->locked = true; eio_custom(EIO_Close, EIO_PRI_DEFAULT, EIO_AfterClose, baton); } @@ -255,6 +258,42 @@ int Database::EIO_AfterClose(eio_req *req) { return 0; } +Handle Database::Serialize(const Arguments& args) { + HandleScope scope; + Database* db = ObjectWrap::Unwrap(args.This()); + OPTIONAL_ARGUMENT_FUNCTION(0, callback); + + bool before = db->serialize; + db->serialize = true; + + if (!callback.IsEmpty() && callback->IsFunction()) { + TRY_CATCH_CALL(args.This(), callback, 0, NULL); + db->serialize = before; + } + + db->Process(); + + return args.This(); +} + +Handle Database::Parallelize(const Arguments& args) { + HandleScope scope; + Database* db = ObjectWrap::Unwrap(args.This()); + OPTIONAL_ARGUMENT_FUNCTION(0, callback); + + bool before = db->serialize; + db->serialize = false; + + if (!callback.IsEmpty() && callback->IsFunction()) { + TRY_CATCH_CALL(args.This(), callback, 0, NULL); + db->serialize = before; + } + + db->Process(); + + return args.This(); +} + /** * Override this so that we can properly close the database when this object * gets garbage collected. diff --git a/src/database.h b/src/database.h index 0d58d2357..a41ad8d62 100644 --- a/src/database.h +++ b/src/database.h @@ -21,7 +21,7 @@ class Database; class Database : public EventEmitter { public: static Persistent constructor_template; - static void Init(v8::Handle target); + static void Init(Handle target); static inline bool HasInstance(Handle val) { if (!val->IsObject()) return false; @@ -71,7 +71,8 @@ class Database : public EventEmitter { handle(NULL), open(false), locked(false), - pending(0) { + pending(0), + serialize(false) { } @@ -84,7 +85,7 @@ class Database : public EventEmitter { static int EIO_Open(eio_req *req); static int EIO_AfterOpen(eio_req *req); - void Schedule(EIO_Callback callback, Baton* baton, bool exclusive); + void Schedule(EIO_Callback callback, Baton* baton, bool exclusive = false); void Process(); static Handle Close(const Arguments& args); @@ -92,6 +93,9 @@ class Database : public EventEmitter { static int EIO_Close(eio_req *req); static int EIO_AfterClose(eio_req *req); + static Handle Serialize(const Arguments& args); + static Handle Parallelize(const Arguments& args); + void Wrap (Handle handle); inline void MakeWeak(); virtual void Unref(); @@ -106,6 +110,8 @@ class Database : public EventEmitter { bool locked; unsigned int pending; + bool serialize; + std::queue queue; }; diff --git a/src/macros.h b/src/macros.h index f5b947f81..997ca37a6 100644 --- a/src/macros.h +++ b/src/macros.h @@ -135,13 +135,23 @@ const char* sqlite_code_string(int code); static int EIO_##name(eio_req *req); \ static int EIO_After##name(eio_req *req); +#define STATEMENT_BEGIN(type) \ + assert(!baton->stmt->locked); \ + assert(!baton->stmt->finalized); \ + assert(baton->stmt->prepared); \ + baton->stmt->locked = true; \ + baton->stmt->db->pending++; \ + eio_custom(EIO_##type, EIO_PRI_DEFAULT, EIO_After##type, baton); + #define STATEMENT_INIT(type) \ type* baton = static_cast(req->data); \ Statement* stmt = baton->stmt; #define STATEMENT_END() \ stmt->locked = false; \ + stmt->db->pending--; \ stmt->Process(); \ + stmt->db->Process(); \ delete baton; #endif diff --git a/src/statement.cc b/src/statement.cc index ea9e5056f..b02fa4076 100644 --- a/src/statement.cc +++ b/src/statement.cc @@ -12,7 +12,7 @@ using namespace node_sqlite3; Persistent Statement::constructor_template; -void Statement::Init(v8::Handle target) { +void Statement::Init(Handle target) { HandleScope scope; Local t = FunctionTemplate::New(New); @@ -29,8 +29,8 @@ void Statement::Init(v8::Handle target) { NODE_SET_PROTOTYPE_METHOD(constructor_template, "reset", Reset); NODE_SET_PROTOTYPE_METHOD(constructor_template, "finalize", Finalize); - target->Set(v8::String::NewSymbol("Statement"), - constructor_template->GetFunction()); + target->Set(String::NewSymbol("Statement"), + constructor_template->GetFunction()); } void Statement::Process() { @@ -102,7 +102,7 @@ Handle Statement::New(const Arguments& args) { stmt->Wrap(args.This()); PrepareBaton* baton = new PrepareBaton(db, Local::Cast(args[2]), stmt); baton->sql = std::string(*String::Utf8Value(sql)); - db->Schedule(EIO_BeginPrepare, baton, false); + db->Schedule(EIO_BeginPrepare, baton); return args.This(); } @@ -110,7 +110,7 @@ Handle Statement::New(const Arguments& args) { void Statement::EIO_BeginPrepare(Database::Baton* baton) { assert(baton->db->open); - assert(!baton->db->locked); + baton->db->pending++; eio_custom(EIO_Prepare, EIO_PRI_DEFAULT, EIO_AfterPrepare, baton); } @@ -157,7 +157,6 @@ int Statement::EIO_AfterPrepare(eio_req *req) { } STATEMENT_END(); - baton->db->Process(); return 0; } @@ -263,11 +262,7 @@ Handle Statement::Bind(const Arguments& args) { } void Statement::EIO_BeginBind(Baton* baton) { - assert(!baton->stmt->locked); - assert(!baton->stmt->finalized); - assert(baton->stmt->prepared); - baton->stmt->locked = true; - eio_custom(EIO_Bind, EIO_PRI_DEFAULT, EIO_AfterBind, baton); + STATEMENT_BEGIN(Bind); } int Statement::EIO_Bind(eio_req *req) { @@ -313,11 +308,7 @@ Handle Statement::Get(const Arguments& args) { } void Statement::EIO_BeginGet(Baton* baton) { - assert(!baton->stmt->locked); - assert(!baton->stmt->finalized); - assert(baton->stmt->prepared); - baton->stmt->locked = true; - eio_custom(EIO_Get, EIO_PRI_DEFAULT, EIO_AfterGet, baton); + STATEMENT_BEGIN(Get); } int Statement::EIO_Get(eio_req *req) { @@ -383,11 +374,7 @@ Handle Statement::Run(const Arguments& args) { } void Statement::EIO_BeginRun(Baton* baton) { - assert(!baton->stmt->locked); - assert(!baton->stmt->finalized); - assert(baton->stmt->prepared); - baton->stmt->locked = true; - eio_custom(EIO_Run, EIO_PRI_DEFAULT, EIO_AfterRun, baton); + STATEMENT_BEGIN(Run); } int Statement::EIO_Run(eio_req *req) { @@ -444,11 +431,7 @@ Handle Statement::All(const Arguments& args) { } void Statement::EIO_BeginAll(Baton* baton) { - assert(!baton->stmt->locked); - assert(!baton->stmt->finalized); - assert(baton->stmt->prepared); - baton->stmt->locked = true; - eio_custom(EIO_All, EIO_PRI_DEFAULT, EIO_AfterAll, baton); + STATEMENT_BEGIN(All); } int Statement::EIO_All(eio_req *req) { @@ -529,11 +512,7 @@ Handle Statement::Reset(const Arguments& args) { } void Statement::EIO_BeginReset(Baton* baton) { - assert(!baton->stmt->locked); - assert(!baton->stmt->finalized); - assert(baton->stmt->prepared); - baton->stmt->locked = true; - eio_custom(EIO_Reset, EIO_PRI_DEFAULT, EIO_AfterReset, baton); + STATEMENT_BEGIN(Reset); } int Statement::EIO_Reset(eio_req *req) { @@ -649,8 +628,6 @@ void Statement::Finalize() { // error events in case those failed. sqlite3_finalize(handle); handle = NULL; - db->pending--; - db->Process(); db->Unref(); } diff --git a/src/statement.h b/src/statement.h index 78cb2a876..c54c51c48 100644 --- a/src/statement.h +++ b/src/statement.h @@ -124,7 +124,6 @@ class Statement : public EventEmitter { prepared(false), locked(false), finalized(false) { - db->pending++; db->Ref(); } diff --git a/test/serialization.test.js b/test/serialization.test.js new file mode 100644 index 000000000..e68aff5a6 --- /dev/null +++ b/test/serialization.test.js @@ -0,0 +1,85 @@ +var sqlite3 = require('sqlite3'); +var assert = require('assert'); + +exports['test serialize() and parallelize()'] = function(beforeExit) { + var db = new sqlite3.Database(':memory:'); + + var inserted1 = 0; + var inserted2 = 0; + var retrieved = 0; + + var count = 1000; + + db.serialize(); + db.run("CREATE TABLE foo (txt text, num int, flt float, blb blob)"); + db.parallelize(); + + var stmt1 = db.prepare("INSERT INTO foo VALUES(?, ?, ?, ?)"); + var stmt2 = db.prepare("INSERT INTO foo VALUES(?, ?, ?, ?)"); + for (var i = 0; i < count; i++) { + stmt1.run('String ' + i, i, i * Math.PI, function(err) { + if (err) throw err; + inserted1++; + // Might sometimes fail, but should work fine most of the time. + assert.ok(inserted2 >= Math.floor(0.95 * inserted1)); + }); + i++; + stmt2.run('String ' + i, i, i * Math.PI, function(err) { + if (err) throw err; + inserted2++; + assert.ok(inserted1 >= Math.floor(0.95 * inserted2)); + }); + } + db.serialize(); + db.all("SELECT txt, num, flt, blb FROM foo ORDER BY num", function(err, rows) { + if (err) throw err; + for (var i = 0; i < rows.length; i++) { + assert.equal(rows[i][0], 'String ' + i); + assert.equal(rows[i][1], i); + assert.equal(rows[i][2], i * Math.PI); + assert.equal(rows[i][3], null); + retrieved++; + } + }); + + beforeExit(function() { + assert.equal(count, inserted1 + inserted2, "Didn't insert all rows"); + assert.equal(count, retrieved, "Didn't retrieve all rows"); + }); +} + +exports['test serialize(fn)'] = function(beforeExit) { + var db = new sqlite3.Database(':memory:'); + + var inserted = 0; + var retrieved = 0; + + var count = 1000; + db.serialize(function() { + db.run("CREATE TABLE foo (txt text, num int, flt float, blb blob)"); + + var stmt = db.prepare("INSERT INTO foo VALUES(?, ?, ?, ?)"); + for (var i = 0; i < count; i++) { + stmt.run('String ' + i, i, i * Math.PI, function(err) { + if (err) throw err; + inserted++; + }); + } + + db.all("SELECT txt, num, flt, blb FROM foo ORDER BY num", function(err, rows) { + if (err) throw err; + for (var i = 0; i < rows.length; i++) { + assert.equal(rows[i][0], 'String ' + i); + assert.equal(rows[i][1], i); + assert.equal(rows[i][2], i * Math.PI); + assert.equal(rows[i][3], null); + retrieved++; + } + }); + }); + + beforeExit(function() { + assert.equal(count, inserted, "Didn't insert all rows"); + assert.equal(count, retrieved, "Didn't retrieve all rows"); + }); +}