Skip to content

Commit

Permalink
Merge pull request #69 from basho/mv-iterator-prev
Browse files Browse the repository at this point in the history
mv-iterator-prev branch
  • Loading branch information
Matthew Von-Maszewski committed Sep 6, 2013
2 parents 4df4947 + 7f17774 commit 66aae23
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 28 deletions.
52 changes: 34 additions & 18 deletions c_src/eleveldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ ERL_NIF_TERM ATOM_FIRST;
ERL_NIF_TERM ATOM_LAST;
ERL_NIF_TERM ATOM_NEXT;
ERL_NIF_TERM ATOM_PREV;
ERL_NIF_TERM ATOM_PREFETCH;
ERL_NIF_TERM ATOM_INVALID_ITERATOR;
ERL_NIF_TERM ATOM_CACHE_SIZE;
ERL_NIF_TERM ATOM_PARANOID_CHECKS;
Expand Down Expand Up @@ -539,9 +540,7 @@ async_iterator_move(

itr_ptr.assign(ItrObject::RetrieveItrObject(env, itr_handle_ref));

// prefetch logic broke PREV and not fixing for Riak
if(NULL==itr_ptr.get()
|| (enif_is_atom(env, action_or_target) && ATOM_PREV==action_or_target))
if(NULL==itr_ptr.get())
return enif_make_badarg(env);

// Reuse ref from iterator creation
Expand All @@ -558,26 +557,22 @@ async_iterator_move(
if(ATOM_LAST == action_or_target) action = eleveldb::MoveTask::LAST;
if(ATOM_NEXT == action_or_target) action = eleveldb::MoveTask::NEXT;
if(ATOM_PREV == action_or_target) action = eleveldb::MoveTask::PREV;
if(ATOM_PREFETCH == action_or_target) action = eleveldb::MoveTask::PREFETCH;
} // if


//
// Three situations:
// - not a NEXT call
// - NEXT call and no prefetch waiting
// - NEXT call and prefetch is waiting
if (eleveldb::MoveTask::NEXT != action)
{
// is a prefetch potentially in play (cut it loose and its Iterator)
if (itr_ptr->ReleaseReuseMove())
{
leveldb::Iterator * iterator;
// #1 not a PREFETCH next call
// #2 PREFETCH call and no prefetch waiting
// #3 PREFETCH call and prefetch is waiting

// NewIterator is fast, background not needed
iterator = itr_ptr->m_DbPtr->m_Db->NewIterator(*itr_ptr->m_ReadOptions);
itr_ptr->m_Iter.assign(new LevelIteratorWrapper(itr_ptr->m_DbPtr.get(), itr_ptr->m_Snapshot.get(),
iterator, itr_ptr->keys_only));
} // if
// case #1
if (eleveldb::MoveTask::PREFETCH != action)
{
// current move object could still be in later stages of
// worker thread completion ... race condition ...don't reuse
itr_ptr->ReleaseReuseMove();

submit_new_request=true;
ret_term = enif_make_copy(env, itr_ptr->m_Snapshot->itr_ref);
Expand All @@ -586,15 +581,33 @@ async_iterator_move(
itr_ptr->m_Iter->m_HandoffAtomic=1;
} // if

// case #2
// before we launch a background job for "next iteration", see if there is a
// prefetch waiting for us
else if (eleveldb::compare_and_swap(&itr_ptr->m_Iter->m_HandoffAtomic, 0, 1))
{
// nope, no prefetch ... await a message to erlang queue
ret_term = enif_make_copy(env, itr_ptr->m_Snapshot->itr_ref);

submit_new_request=false;
// is this truly a wait for prefetch ... or actually the first prefetch request
if (!itr_ptr->m_Iter->m_PrefetchStarted)
{
submit_new_request=true;
itr_ptr->m_Iter->m_PrefetchStarted=true;
itr_ptr->ReleaseReuseMove();

// first must return via message
itr_ptr->m_Iter->m_HandoffAtomic=1;
} // if

else
{
// await message that is already in the making
submit_new_request=false;
} // else
} // else if

// case #3
else
{
// why yes there is. copy the key/value info into a return tuple before
Expand Down Expand Up @@ -717,6 +730,8 @@ eleveldb_iterator_close(

if (NULL!=itr_ptr)
{
itr_ptr->ReleaseReuseMove();

// set closing flag ... atomic likely unnecessary (but safer)
eleveldb::ErlRefObject::InitiateCloseRequest(itr_ptr);

Expand Down Expand Up @@ -981,6 +996,7 @@ try
ATOM(eleveldb::ATOM_LAST, "last");
ATOM(eleveldb::ATOM_NEXT, "next");
ATOM(eleveldb::ATOM_PREV, "prev");
ATOM(eleveldb::ATOM_PREFETCH, "prefetch");
ATOM(eleveldb::ATOM_INVALID_ITERATOR, "invalid_iterator");
ATOM(eleveldb::ATOM_CACHE_SIZE, "cache_size");
ATOM(eleveldb::ATOM_PARANOID_CHECKS, "paranoid_checks");
Expand Down
3 changes: 2 additions & 1 deletion c_src/refobjects.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,12 @@ class LevelIteratorWrapper : public RefObject
leveldb::Iterator * m_Iterator;
volatile uint32_t m_HandoffAtomic; //!< matthew's atomic foreground/background prefetch flag.
bool m_KeysOnly; //!< only return key values
bool m_PrefetchStarted; //!< true after first prefetch command

LevelIteratorWrapper(DbObject * DbPtr, LevelSnapshotWrapper * Snapshot,
leveldb::Iterator * Iterator, bool KeysOnly)
: m_DbPtr(DbPtr), m_Snap(Snapshot), m_Iterator(Iterator),
m_HandoffAtomic(0), m_KeysOnly(KeysOnly)
m_HandoffAtomic(0), m_KeysOnly(KeysOnly), m_PrefetchStarted(false)
{
};

Expand Down
7 changes: 2 additions & 5 deletions c_src/workitems.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ MoveTask::operator()()

case LAST: itr->SeekToLast(); break;

case PREFETCH:
case NEXT: if(itr->Valid()) itr->Next(); break;

case PREV: if(itr->Valid()) itr->Prev(); break;
Expand All @@ -216,7 +217,6 @@ MoveTask::operator()()


// who got back first, us or the erlang loop
// if (eleveldb::detail::compare_and_swap(&m_ItrPtr->m_handoff_atomic, 0, 1))
if (compare_and_swap(&m_ItrWrap->m_HandoffAtomic, 0, 1))
{
// this is prefetch of next iteration. It returned faster than actual
Expand All @@ -230,11 +230,8 @@ MoveTask::operator()()

if(itr->Valid())
{
if (NEXT==action || SEEK==action || FIRST==action)
{
if (PREFETCH==action)
prepare_recycle();
action=NEXT;
} // if

// erlang is waiting, send message
if(m_ItrWrap->m_KeysOnly)
Expand Down
2 changes: 1 addition & 1 deletion c_src/workitems.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ class IterTask : public WorkTask
class MoveTask : public WorkTask
{
public:
typedef enum { FIRST, LAST, NEXT, PREV, SEEK } action_t;
typedef enum { FIRST, LAST, NEXT, PREV, SEEK, PREFETCH } action_t;

protected:
ReferencePtr<LevelIteratorWrapper> m_ItrWrap; //!< access to database, and holds reference
Expand Down
6 changes: 3 additions & 3 deletions src/eleveldb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ init() ->
{delete, Key::binary()} |
clear].

-type iterator_action() :: first | last | next | prev | binary().
-type iterator_action() :: first | last | next | prev | prefetch | binary().

-opaque db_ref() :: binary().

Expand Down Expand Up @@ -308,10 +308,10 @@ fold_loop({error, invalid_iterator}, _Itr, _Fun, Acc0) ->
Acc0;
fold_loop({ok, K}, Itr, Fun, Acc0) ->
Acc = Fun(K, Acc0),
fold_loop(iterator_move(Itr, next), Itr, Fun, Acc);
fold_loop(iterator_move(Itr, prefetch), Itr, Fun, Acc);
fold_loop({ok, K, V}, Itr, Fun, Acc0) ->
Acc = Fun({K, V}, Acc0),
fold_loop(iterator_move(Itr, next), Itr, Fun, Acc).
fold_loop(iterator_move(Itr, prefetch), Itr, Fun, Acc).

validate_type({_Key, bool}, true) -> true;
validate_type({_Key, bool}, false) -> true;
Expand Down
40 changes: 40 additions & 0 deletions test/iterators.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
%% -------------------------------------------------------------------
%%
%% eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/)
%%
%% Copyright (c) 2010-2013 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(iterators).

-compile(export_all).

-include_lib("eunit/include/eunit.hrl").

prev_test() ->
os:cmd("rm -rf ltest"), % NOTE
{ok, Ref} = eleveldb:open("ltest", [{create_if_missing, true}]),
try
eleveldb:put(Ref, <<"a">>, <<"x">>, []),
eleveldb:put(Ref, <<"b">>, <<"y">>, []),
{ok, I} = eleveldb:iterator(Ref, []),
?assertEqual({ok, <<"a">>, <<"x">>},eleveldb:iterator_move(I, <<>>)),
?assertEqual({ok, <<"b">>, <<"y">>},eleveldb:iterator_move(I, next)),
?assertEqual({ok, <<"a">>, <<"x">>},eleveldb:iterator_move(I, prev))
after
eleveldb:close(Ref)
end.

0 comments on commit 66aae23

Please sign in to comment.