Skip to content

Commit

Permalink
Cleanup hanging iterator also when next() errored
Browse files Browse the repository at this point in the history
  • Loading branch information
vweevers committed Sep 24, 2021
1 parent 0f88586 commit 7356ba4
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 44 deletions.
86 changes: 42 additions & 44 deletions binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ static napi_status CallFunction (napi_env env,
*
* - DoExecute (abstract, worker pool thread): main work
* - HandleOKCallback (main thread): call JS callback on success
* - HandleErrorCallback (main thread): call JS callback on error
* - DoFinally (main thread): do cleanup regardless of success
*/
struct BaseWorker {
Expand Down Expand Up @@ -310,48 +311,52 @@ struct BaseWorker {
}

virtual void DoExecute () = 0;
virtual void DoFinally (napi_env env) {};

static void Complete (napi_env env, napi_status status, void* data) {
BaseWorker* self = (BaseWorker*)data;

self->DoComplete(env);
self->DoFinally(env);

napi_delete_reference(env, self->callbackRef_);
napi_delete_async_work(env, self->asyncWork_);

delete self;
}

void DoComplete (napi_env env) {
if (status_.ok()) {
return HandleOKCallback(env);
}

napi_value argv = CreateError(env, errMsg_);
napi_value callback;
napi_get_reference_value(env, callbackRef_, &callback);
CallFunction(env, callback, 1, &argv);

if (status_.ok()) {
HandleOKCallback(env, callback);
} else {
HandleErrorCallback(env, callback);
}
}

virtual void HandleOKCallback (napi_env env) {
virtual void HandleOKCallback (napi_env env, napi_value callback) {
napi_value argv;
napi_get_null(env, &argv);
napi_value callback;
napi_get_reference_value(env, callbackRef_, &callback);
CallFunction(env, callback, 1, &argv);
}

virtual void HandleErrorCallback (napi_env env, napi_value callback) {
napi_value argv = CreateError(env, errMsg_);
CallFunction(env, callback, 1, &argv);
}

virtual void DoFinally (napi_env env) {
napi_delete_reference(env, callbackRef_);
napi_delete_async_work(env, asyncWork_);

delete this;
}

void Queue (napi_env env) {
napi_queue_async_work(env, asyncWork_);
}

napi_ref callbackRef_;
napi_async_work asyncWork_;
Database* database_;

private:
napi_ref callbackRef_;
napi_async_work asyncWork_;
leveldb::Status status_;
char *errMsg_;
};
Expand Down Expand Up @@ -491,6 +496,7 @@ struct PriorityWorker : public BaseWorker {

void DoFinally (napi_env env) override {
database_->DecrementPriorityWork(env);
BaseWorker::DoFinally(env);
}
};

Expand All @@ -507,7 +513,6 @@ struct BaseIterator {
int limit,
bool fillCache)
: database_(database),
isEnding_(false),
hasEnded_(false),
didSeek_(false),
reverse_(reverse),
Expand Down Expand Up @@ -669,7 +674,6 @@ struct BaseIterator {
}

Database* database_;
bool isEnding_;
bool hasEnded_;

private:
Expand Down Expand Up @@ -713,6 +717,7 @@ struct Iterator final : public BaseIterator {
highWaterMark_(highWaterMark),
landed_(false),
nexting_(false),
isEnding_(false),
endWorker_(NULL),
ref_(NULL) {
}
Expand All @@ -729,15 +734,6 @@ struct Iterator final : public BaseIterator {
if (ref_ != NULL) napi_delete_reference(env, ref_);
}

void CheckEndCallback (napi_env env) {
nexting_ = false;

if (endWorker_ != NULL) {
endWorker_->Queue(env);
endWorker_ = NULL;
}
}

bool ReadMany (uint32_t size, std::vector<std::pair<std::string, std::string>>& result) {
size_t bytesRead = 0;

Expand Down Expand Up @@ -780,6 +776,7 @@ struct Iterator final : public BaseIterator {
uint32_t highWaterMark_;
bool landed_;
bool nexting_;
bool isEnding_;
BaseWorker* endWorker_;

private:
Expand Down Expand Up @@ -1043,7 +1040,7 @@ struct GetWorker final : public PriorityWorker {
SetStatus(database_->Get(options_, key_, value_));
}

void HandleOKCallback (napi_env env) override {
void HandleOKCallback (napi_env env, napi_value callback) override {
napi_value argv[2];
napi_get_null(env, &argv[0]);

Expand All @@ -1053,8 +1050,6 @@ struct GetWorker final : public PriorityWorker {
napi_create_string_utf8(env, value_.data(), value_.size(), &argv[1]);
}

napi_value callback;
napi_get_reference_value(env, callbackRef_, &callback);
CallFunction(env, callback, 2, argv);
}

Expand Down Expand Up @@ -1233,12 +1228,10 @@ struct ApproximateSizeWorker final : public PriorityWorker {
size_ = database_->ApproximateSize(&range);
}

void HandleOKCallback (napi_env env) override {
void HandleOKCallback (napi_env env, napi_value callback) override {
napi_value argv[2];
napi_get_null(env, &argv[0]);
napi_create_int64(env, (uint64_t)size_, &argv[1]);
napi_value callback;
napi_get_reference_value(env, callbackRef_, &callback);
CallFunction(env, callback, 2, argv);
}

Expand Down Expand Up @@ -1486,10 +1479,9 @@ struct EndWorker final : public BaseWorker {
iterator_->End();
}

void HandleOKCallback (napi_env env) override {
// TODO: would this be safe(r) to do in DoFinally() i.e. after we call the callback?
void DoFinally (napi_env env) override {
iterator_->Detach(env);
BaseWorker::HandleOKCallback(env);
BaseWorker::DoFinally(env);
}

Iterator* iterator_;
Expand Down Expand Up @@ -1551,7 +1543,7 @@ struct NextWorker final : public BaseWorker {
}
}

void HandleOKCallback (napi_env env) override {
void HandleOKCallback (napi_env env, napi_value callback) override {
size_t arraySize = result_.size() * 2;
napi_value jsArray;
napi_create_array_with_length(env, arraySize, &jsArray);
Expand Down Expand Up @@ -1580,19 +1572,25 @@ struct NextWorker final : public BaseWorker {
napi_set_element(env, jsArray, static_cast<int>(arraySize - idx * 2 - 2), returnValue);
}

// clean up & handle the next/end state
// TODO: always do this, even on error
iterator_->CheckEndCallback(env);

napi_value argv[3];
napi_get_null(env, &argv[0]);
argv[1] = jsArray;
napi_get_boolean(env, !ok_, &argv[2]);
napi_value callback;
napi_get_reference_value(env, callbackRef_, &callback);
CallFunction(env, callback, 3, argv);
}

void DoFinally (napi_env env) override {
// clean up & handle the next/end state
iterator_->nexting_ = false;

if (iterator_->endWorker_ != NULL) {
iterator_->endWorker_->Queue(env);
iterator_->endWorker_ = NULL;
}

BaseWorker::DoFinally(env);
}

Iterator* iterator_;
std::vector<std::pair<std::string, std::string> > result_;
bool ok_;
Expand Down
24 changes: 24 additions & 0 deletions test/cleanup-hanging-iterators-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,27 @@ makeTest('test ending iterators', function (db, t, done) {
done()
})
})

makeTest('test recursive next', function (db, t, done) {
// Test that we're able to close when user keeps scheduling work
const it = db.iterator({ highWaterMark: 0 })

it.next(function loop (err, key) {
if (err && err.message !== 'iterator has ended') throw err
if (key !== undefined) it.next(loop)
})

done()
})

makeTest('test recursive next (random)', function (db, t, done) {
// Same as the test above but closing at a random time
const it = db.iterator({ highWaterMark: 0 })

it.next(function loop (err, key) {
if (err && err.message !== 'iterator has ended') throw err
if (key !== undefined) it.next(loop)
})

setTimeout(done, Math.floor(Math.random() * 50))
})

0 comments on commit 7356ba4

Please sign in to comment.