Skip to content

Commit

Permalink
LevelDB, feat: add firstKey and lastKey methods, enhancements to for…
Browse files Browse the repository at this point in the history
…Each.
  • Loading branch information
xicilion committed Jan 16, 2024
1 parent 14d029d commit 845e4d5
Show file tree
Hide file tree
Showing 6 changed files with 637 additions and 74 deletions.
29 changes: 19 additions & 10 deletions fibjs/include/LevelDB.h
Expand Up @@ -34,8 +34,14 @@ class LevelDB : public LevelDB_base {
virtual result_t mset(v8::Local<v8::Object> map);
virtual result_t mremove(v8::Local<v8::Array> keys);
virtual result_t remove(Buffer_base* key, AsyncEvent* ac);
virtual result_t firstKey(obj_ptr<Buffer_base>& retVal, AsyncEvent* ac);
virtual result_t lastKey(obj_ptr<Buffer_base>& retVal, AsyncEvent* ac);
virtual result_t forEach(v8::Local<v8::Function> func);
virtual result_t between(Buffer_base* from, Buffer_base* to, v8::Local<v8::Function> func);
virtual result_t forEach(Buffer_base* from, v8::Local<v8::Function> func);
virtual result_t forEach(Buffer_base* from, Buffer_base* to, v8::Local<v8::Function> func);
virtual result_t forEach(v8::Local<v8::Object> opt, v8::Local<v8::Function> func);
virtual result_t forEach(Buffer_base* from, v8::Local<v8::Object> opt, v8::Local<v8::Function> func);
virtual result_t forEach(Buffer_base* from, Buffer_base* to, v8::Local<v8::Object> opt, v8::Local<v8::Function> func);
virtual result_t begin(obj_ptr<LevelDB_base>& retVal);
virtual result_t commit();
virtual result_t close(AsyncEvent* ac);
Expand Down Expand Up @@ -109,9 +115,6 @@ class LevelDB : public LevelDB_base {
class Iter : public object_base {
public:
Iter(leveldb::DB* db)
: m_count(0)
, m_first(true)
, m_end(false)
{
m_it = db->NewIterator(leveldb::ReadOptions());
}
Expand All @@ -126,20 +129,26 @@ class LevelDB : public LevelDB_base {

result_t iter(Isolate* isolate, v8::Local<v8::Function> func);

result_t getValue(Buffer_base* from, Buffer_base* to)
result_t getValue(Buffer_base* from, Buffer_base* to = NULL)
{
from->toString(m_from);
to->toString(m_to);
if (from)
from->toString(m_from);
if (to)
to->toString(m_to);
return 0;
}

public:
obj_ptr<Buffer_base> m_kvs[ITER_BLOCK_SIZE * 2];
leveldb::Iterator* m_it;
int32_t m_count;
bool m_first;
bool m_end;
bool m_reverse = false;
int32_t m_skip = 0;
int32_t m_limit = -1;
exlib::string m_from, m_to;

int32_t m_count = 0;
bool m_first = true;
bool m_end = false;
};

private:
Expand Down
89 changes: 79 additions & 10 deletions fibjs/include/ifs/LevelDB.h
Expand Up @@ -29,8 +29,14 @@ class LevelDB_base : public object_base {
virtual result_t mset(v8::Local<v8::Object> map) = 0;
virtual result_t mremove(v8::Local<v8::Array> keys) = 0;
virtual result_t remove(Buffer_base* key, AsyncEvent* ac) = 0;
virtual result_t firstKey(obj_ptr<Buffer_base>& retVal, AsyncEvent* ac) = 0;
virtual result_t lastKey(obj_ptr<Buffer_base>& retVal, AsyncEvent* ac) = 0;
virtual result_t forEach(v8::Local<v8::Function> func) = 0;
virtual result_t between(Buffer_base* from, Buffer_base* to, v8::Local<v8::Function> func) = 0;
virtual result_t forEach(Buffer_base* from, v8::Local<v8::Function> func) = 0;
virtual result_t forEach(Buffer_base* from, Buffer_base* to, v8::Local<v8::Function> func) = 0;
virtual result_t forEach(v8::Local<v8::Object> opt, v8::Local<v8::Function> func) = 0;
virtual result_t forEach(Buffer_base* from, v8::Local<v8::Object> opt, v8::Local<v8::Function> func) = 0;
virtual result_t forEach(Buffer_base* from, Buffer_base* to, v8::Local<v8::Object> opt, v8::Local<v8::Function> func) = 0;
virtual result_t begin(obj_ptr<LevelDB_base>& retVal) = 0;
virtual result_t commit() = 0;
virtual result_t close(AsyncEvent* ac) = 0;
Expand All @@ -52,8 +58,9 @@ class LevelDB_base : public object_base {
static void s_mset(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_mremove(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_remove(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_firstKey(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_lastKey(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_forEach(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_between(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_begin(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_commit(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_close(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand All @@ -63,6 +70,8 @@ class LevelDB_base : public object_base {
ASYNC_MEMBERVALUE2(LevelDB_base, get, Buffer_base*, obj_ptr<Buffer_base>);
ASYNC_MEMBER2(LevelDB_base, set, Buffer_base*, Buffer_base*);
ASYNC_MEMBER1(LevelDB_base, remove, Buffer_base*);
ASYNC_MEMBERVALUE1(LevelDB_base, firstKey, obj_ptr<Buffer_base>);
ASYNC_MEMBERVALUE1(LevelDB_base, lastKey, obj_ptr<Buffer_base>);
ASYNC_MEMBER0(LevelDB_base, close);
};
}
Expand All @@ -84,8 +93,11 @@ inline ClassInfo& LevelDB_base::class_info()
{ "mremove", s_mremove, false, false },
{ "remove", s_remove, false, true },
{ "removeSync", s_remove, false, false },
{ "firstKey", s_firstKey, false, true },
{ "firstKeySync", s_firstKey, false, false },
{ "lastKey", s_lastKey, false, true },
{ "lastKeySync", s_lastKey, false, false },
{ "forEach", s_forEach, false, false },
{ "between", s_between, false, false },
{ "begin", s_begin, false, false },
{ "commit", s_commit, false, false },
{ "close", s_close, false, true },
Expand Down Expand Up @@ -220,6 +232,40 @@ inline void LevelDB_base::s_remove(const v8::FunctionCallbackInfo<v8::Value>& ar
METHOD_VOID();
}

inline void LevelDB_base::s_firstKey(const v8::FunctionCallbackInfo<v8::Value>& args)
{
obj_ptr<Buffer_base> vr;

ASYNC_METHOD_INSTANCE(LevelDB_base);
METHOD_ENTER();

ASYNC_METHOD_OVER(0, 0);

if (!cb.IsEmpty())
hr = pInst->acb_firstKey(cb, args);
else
hr = pInst->ac_firstKey(vr);

METHOD_RETURN();
}

inline void LevelDB_base::s_lastKey(const v8::FunctionCallbackInfo<v8::Value>& args)
{
obj_ptr<Buffer_base> vr;

ASYNC_METHOD_INSTANCE(LevelDB_base);
METHOD_ENTER();

ASYNC_METHOD_OVER(0, 0);

if (!cb.IsEmpty())
hr = pInst->acb_lastKey(cb, args);
else
hr = pInst->ac_lastKey(vr);

METHOD_RETURN();
}

inline void LevelDB_base::s_forEach(const v8::FunctionCallbackInfo<v8::Value>& args)
{
METHOD_INSTANCE(LevelDB_base);
Expand All @@ -231,21 +277,44 @@ inline void LevelDB_base::s_forEach(const v8::FunctionCallbackInfo<v8::Value>& a

hr = pInst->forEach(v0);

METHOD_VOID();
}
METHOD_OVER(2, 2);

inline void LevelDB_base::s_between(const v8::FunctionCallbackInfo<v8::Value>& args)
{
METHOD_INSTANCE(LevelDB_base);
METHOD_ENTER();
ARG(obj_ptr<Buffer_base>, 0);
ARG(v8::Local<v8::Function>, 1);

hr = pInst->forEach(v0, v1);

METHOD_OVER(3, 3);

ARG(obj_ptr<Buffer_base>, 0);
ARG(obj_ptr<Buffer_base>, 1);
ARG(v8::Local<v8::Function>, 2);

hr = pInst->between(v0, v1, v2);
hr = pInst->forEach(v0, v1, v2);

METHOD_OVER(2, 2);

ARG(v8::Local<v8::Object>, 0);
ARG(v8::Local<v8::Function>, 1);

hr = pInst->forEach(v0, v1);

METHOD_OVER(3, 3);

ARG(obj_ptr<Buffer_base>, 0);
ARG(v8::Local<v8::Object>, 1);
ARG(v8::Local<v8::Function>, 2);

hr = pInst->forEach(v0, v1, v2);

METHOD_OVER(4, 4);

ARG(obj_ptr<Buffer_base>, 0);
ARG(obj_ptr<Buffer_base>, 1);
ARG(v8::Local<v8::Object>, 2);
ARG(v8::Local<v8::Function>, 3);

hr = pInst->forEach(v0, v1, v2, v3);

METHOD_VOID();
}
Expand Down
143 changes: 117 additions & 26 deletions fibjs/src/db/LevelDB.cpp
Expand Up @@ -298,33 +298,76 @@ result_t LevelDB::remove(Buffer_base* key, AsyncEvent* ac)
return 0;
}

result_t LevelDB::firstKey(obj_ptr<Buffer_base>& retVal, AsyncEvent* ac)
{
if (!db())
return CHECK_ERROR(CALL_E_INVALID_CALL);

if (ac->isSync())
return CHECK_ERROR(CALL_E_NOSYNC);

leveldb::Iterator* it = db()->NewIterator(leveldb::ReadOptions());
it->SeekToFirst();
if (!it->Valid()) {
delete it;
return CALL_RETURN_NULL;
}

retVal = new Buffer(it->key().data(), it->key().size());
delete it;

return 0;
}

result_t LevelDB::lastKey(obj_ptr<Buffer_base>& retVal, AsyncEvent* ac)
{
if (!db())
return CHECK_ERROR(CALL_E_INVALID_CALL);

if (ac->isSync())
return CHECK_ERROR(CALL_E_NOSYNC);

leveldb::Iterator* it = db()->NewIterator(leveldb::ReadOptions());
it->SeekToLast();
if (!it->Valid()) {
delete it;
return CALL_RETURN_NULL;
}

retVal = new Buffer(it->key().data(), it->key().size());
delete it;

return 0;
}

result_t LevelDB::Iter::_iter(AsyncEvent* ac)
{
m_count = 0;

if (m_first) {
if (m_from.empty())
m_it->SeekToFirst();
else
m_it->Seek(leveldb::Slice(m_from.c_str(), m_from.length()));
if (m_reverse) {
if (m_from.empty())
m_it->SeekToLast();
else {
m_it->Seek(leveldb::Slice(m_from.c_str(), m_from.length()));

leveldb::Slice key = m_it->key();
if (key.compare(leveldb::Slice(m_from.c_str(), m_from.length())) != 0)
m_it->Prev();
}
} else {
if (m_from.empty())
m_it->SeekToFirst();
else
m_it->Seek(leveldb::Slice(m_from.c_str(), m_from.length()));
}

m_first = false;

if (!m_it->Valid()) {
m_end = true;
return 0;
}

if (!m_from.empty()) {
leveldb::Slice key = m_it->key();
if (key.compare(leveldb::Slice(m_from.c_str(), m_from.length())) == 0) {
m_it->Next();
if (!m_it->Valid()) {
m_end = true;
return 0;
}
}
}
}

while (m_count < ITER_BLOCK_SIZE) {
Expand All @@ -334,13 +377,28 @@ result_t LevelDB::Iter::_iter(AsyncEvent* ac)
break;
}

m_kvs[m_count * 2] = new Buffer(key.data(), key.size());
leveldb::Slice value = m_it->value();
m_kvs[m_count * 2 + 1] = new Buffer(value.data(), value.size());
if (m_skip == 0) {
m_kvs[m_count * 2] = new Buffer(key.data(), key.size());
leveldb::Slice value = m_it->value();
m_kvs[m_count * 2 + 1] = new Buffer(value.data(), value.size());

m_count++;

if (m_limit > 0) {
m_limit--;
if (m_limit == 0) {
m_end = true;
break;
}
}
} else
m_skip--;

m_count++;
if (m_reverse)
m_it->Prev();
else
m_it->Next();

m_it->Next();
if (!m_it->Valid()) {
m_end = true;
break;
Expand Down Expand Up @@ -392,16 +450,49 @@ result_t LevelDB::forEach(v8::Local<v8::Function> func)
return Iter(db()).iter(holder(), func);
}

result_t LevelDB::between(Buffer_base* from, Buffer_base* to, v8::Local<v8::Function> func)
result_t LevelDB::forEach(Buffer_base* from, v8::Local<v8::Function> func)
{
if (!db())
return CHECK_ERROR(CALL_E_INVALID_CALL);
return forEach(from, NULL, v8::Local<v8::Object>(), func);
}

result_t LevelDB::forEach(Buffer_base* from, Buffer_base* to, v8::Local<v8::Function> func)
{
return forEach(from, to, v8::Local<v8::Object>(), func);
}

result_t LevelDB::forEach(v8::Local<v8::Object> opt, v8::Local<v8::Function> func)
{
return forEach(NULL, NULL, opt, func);
}

result_t LevelDB::forEach(Buffer_base* from, v8::Local<v8::Object> opt, v8::Local<v8::Function> func)
{
return forEach(from, NULL, opt, func);
}

result_t LevelDB::forEach(Buffer_base* from, Buffer_base* to, v8::Local<v8::Object> opt, v8::Local<v8::Function> func)
{
obj_ptr<Iter> it = new Iter(db());

result_t hr = it->getValue(from, to);
if (hr < 0)
return hr;
it->getValue(from, to);
if (!opt.IsEmpty()) {
Isolate* isolate = holder();
result_t hr;

hr = GetConfigValue(isolate, opt, "skip", it->m_skip, true);
if (hr < 0 && hr != CALL_E_PARAMNOTOPTIONAL)
return hr;

hr = GetConfigValue(isolate, opt, "limit", it->m_limit, true);
if (hr < 0 && hr != CALL_E_PARAMNOTOPTIONAL)
return hr;
if (it->m_limit == 0)
return Runtime::setError("limit must be greater than 0");

hr = GetConfigValue(isolate, opt, "reverse", it->m_reverse, true);
if (hr < 0 && hr != CALL_E_PARAMNOTOPTIONAL)
return hr;
}

return it->iter(holder(), func);
}
Expand Down

0 comments on commit 845e4d5

Please sign in to comment.