diff --git a/c_src/eleveldb.cc b/c_src/eleveldb.cc index 89013bdd..3e3a2b9b 100644 --- a/c_src/eleveldb.cc +++ b/c_src/eleveldb.cc @@ -109,6 +109,7 @@ ERL_NIF_TERM ATOM_FIRST; ERL_NIF_TERM ATOM_LAST; ERL_NIF_TERM ATOM_NEXT; ERL_NIF_TERM ATOM_PREV; +ERL_NIF_TERM ATOM_PREFETCH; ERL_NIF_TERM ATOM_INVALID_ITERATOR; ERL_NIF_TERM ATOM_CACHE_SIZE; ERL_NIF_TERM ATOM_PARANOID_CHECKS; @@ -539,9 +540,7 @@ async_iterator_move( itr_ptr.assign(ItrObject::RetrieveItrObject(env, itr_handle_ref)); - // prefetch logic broke PREV and not fixing for Riak - if(NULL==itr_ptr.get() - || (enif_is_atom(env, action_or_target) && ATOM_PREV==action_or_target)) + if(NULL==itr_ptr.get()) return enif_make_badarg(env); // Reuse ref from iterator creation @@ -558,26 +557,22 @@ async_iterator_move( if(ATOM_LAST == action_or_target) action = eleveldb::MoveTask::LAST; if(ATOM_NEXT == action_or_target) action = eleveldb::MoveTask::NEXT; if(ATOM_PREV == action_or_target) action = eleveldb::MoveTask::PREV; + if(ATOM_PREFETCH == action_or_target) action = eleveldb::MoveTask::PREFETCH; } // if // // Three situations: - // - not a NEXT call - // - NEXT call and no prefetch waiting - // - NEXT call and prefetch is waiting - if (eleveldb::MoveTask::NEXT != action) - { - // is a prefetch potentially in play (cut it loose and its Iterator) - if (itr_ptr->ReleaseReuseMove()) - { - leveldb::Iterator * iterator; + // #1 not a PREFETCH next call + // #2 PREFETCH call and no prefetch waiting + // #3 PREFETCH call and prefetch is waiting - // NewIterator is fast, background not needed - iterator = itr_ptr->m_DbPtr->m_Db->NewIterator(*itr_ptr->m_ReadOptions); - itr_ptr->m_Iter.assign(new LevelIteratorWrapper(itr_ptr->m_DbPtr.get(), itr_ptr->m_Snapshot.get(), - iterator, itr_ptr->keys_only)); - } // if + // case #1 + if (eleveldb::MoveTask::PREFETCH != action) + { + // current move object could still be in later stages of + // worker thread completion ... 
race condition ...don't reuse + itr_ptr->ReleaseReuseMove(); submit_new_request=true; ret_term = enif_make_copy(env, itr_ptr->m_Snapshot->itr_ref); @@ -586,6 +581,7 @@ async_iterator_move( itr_ptr->m_Iter->m_HandoffAtomic=1; } // if + // case #2 // before we launch a background job for "next iteration", see if there is a // prefetch waiting for us else if (eleveldb::compare_and_swap(&itr_ptr->m_Iter->m_HandoffAtomic, 0, 1)) @@ -593,8 +589,25 @@ async_iterator_move( // nope, no prefetch ... await a message to erlang queue ret_term = enif_make_copy(env, itr_ptr->m_Snapshot->itr_ref); - submit_new_request=false; + // is this truly a wait for prefetch ... or actually the first prefetch request + if (!itr_ptr->m_Iter->m_PrefetchStarted) + { + submit_new_request=true; + itr_ptr->m_Iter->m_PrefetchStarted=true; + itr_ptr->ReleaseReuseMove(); + + // first must return via message + itr_ptr->m_Iter->m_HandoffAtomic=1; + } // if + + else + { + // await message that is already in the making + submit_new_request=false; + } // else } // else if + + // case #3 else { // why yes there is. copy the key/value info into a return tuple before @@ -717,6 +730,8 @@ eleveldb_iterator_close( if (NULL!=itr_ptr) { + itr_ptr->ReleaseReuseMove(); + // set closing flag ... 
atomic likely unnecessary (but safer) eleveldb::ErlRefObject::InitiateCloseRequest(itr_ptr); @@ -981,6 +996,7 @@ try ATOM(eleveldb::ATOM_LAST, "last"); ATOM(eleveldb::ATOM_NEXT, "next"); ATOM(eleveldb::ATOM_PREV, "prev"); + ATOM(eleveldb::ATOM_PREFETCH, "prefetch"); ATOM(eleveldb::ATOM_INVALID_ITERATOR, "invalid_iterator"); ATOM(eleveldb::ATOM_CACHE_SIZE, "cache_size"); ATOM(eleveldb::ATOM_PARANOID_CHECKS, "paranoid_checks"); diff --git a/c_src/refobjects.h b/c_src/refobjects.h index 2309afd2..ec2112d5 100644 --- a/c_src/refobjects.h +++ b/c_src/refobjects.h @@ -273,11 +273,12 @@ class LevelIteratorWrapper : public RefObject leveldb::Iterator * m_Iterator; volatile uint32_t m_HandoffAtomic; //!< matthew's atomic foreground/background prefetch flag. bool m_KeysOnly; //!< only return key values + bool m_PrefetchStarted; //!< true after first prefetch command LevelIteratorWrapper(DbObject * DbPtr, LevelSnapshotWrapper * Snapshot, leveldb::Iterator * Iterator, bool KeysOnly) : m_DbPtr(DbPtr), m_Snap(Snapshot), m_Iterator(Iterator), - m_HandoffAtomic(0), m_KeysOnly(KeysOnly) + m_HandoffAtomic(0), m_KeysOnly(KeysOnly), m_PrefetchStarted(false) { }; diff --git a/c_src/workitems.cc b/c_src/workitems.cc index 79cf7ff4..03f5373a 100644 --- a/c_src/workitems.cc +++ b/c_src/workitems.cc @@ -193,6 +193,7 @@ MoveTask::operator()() case LAST: itr->SeekToLast(); break; + case PREFETCH: case NEXT: if(itr->Valid()) itr->Next(); break; case PREV: if(itr->Valid()) itr->Prev(); break; @@ -216,7 +217,6 @@ MoveTask::operator()() // who got back first, us or the erlang loop -// if (eleveldb::detail::compare_and_swap(&m_ItrPtr->m_handoff_atomic, 0, 1)) if (compare_and_swap(&m_ItrWrap->m_HandoffAtomic, 0, 1)) { // this is prefetch of next iteration. 
It returned faster than actual @@ -230,11 +230,8 @@ MoveTask::operator()() if(itr->Valid()) { - if (NEXT==action || SEEK==action || FIRST==action) - { + if (PREFETCH==action) prepare_recycle(); - action=NEXT; - } // if // erlang is waiting, send message if(m_ItrWrap->m_KeysOnly) diff --git a/c_src/workitems.h b/c_src/workitems.h index 807a4fc2..cd837152 100644 --- a/c_src/workitems.h +++ b/c_src/workitems.h @@ -308,7 +308,7 @@ class IterTask : public WorkTask class MoveTask : public WorkTask { public: - typedef enum { FIRST, LAST, NEXT, PREV, SEEK } action_t; + typedef enum { FIRST, LAST, NEXT, PREV, SEEK, PREFETCH } action_t; protected: ReferencePtr m_ItrWrap; //!< access to database, and holds reference diff --git a/src/eleveldb.erl b/src/eleveldb.erl index 60bb2f88..5e416529 100644 --- a/src/eleveldb.erl +++ b/src/eleveldb.erl @@ -106,7 +106,7 @@ init() -> {delete, Key::binary()} | clear]. --type iterator_action() :: first | last | next | prev | binary(). +-type iterator_action() :: first | last | next | prev | prefetch | binary(). -opaque db_ref() :: binary(). @@ -308,10 +308,10 @@ fold_loop({error, invalid_iterator}, _Itr, _Fun, Acc0) -> Acc0; fold_loop({ok, K}, Itr, Fun, Acc0) -> Acc = Fun(K, Acc0), - fold_loop(iterator_move(Itr, next), Itr, Fun, Acc); + fold_loop(iterator_move(Itr, prefetch), Itr, Fun, Acc); fold_loop({ok, K, V}, Itr, Fun, Acc0) -> Acc = Fun({K, V}, Acc0), - fold_loop(iterator_move(Itr, next), Itr, Fun, Acc). + fold_loop(iterator_move(Itr, prefetch), Itr, Fun, Acc). validate_type({_Key, bool}, true) -> true; validate_type({_Key, bool}, false) -> true; diff --git a/test/iterators.erl b/test/iterators.erl new file mode 100644 index 00000000..d3ca6c9a --- /dev/null +++ b/test/iterators.erl @@ -0,0 +1,40 @@ +%% ------------------------------------------------------------------- +%% +%% eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) +%% +%% Copyright (c) 2010-2013 Basho Technologies, Inc. All Rights Reserved. 
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(iterators).
+
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+prev_test() ->  % regression: `prev` issued after `next` must step back instead of returning badarg
+    os:cmd("rm -rf ltest"),  % start from an empty database directory so only the two keys below exist
+    {ok, Ref} = eleveldb:open("ltest", [{create_if_missing, true}]),
+    try
+        ok = eleveldb:put(Ref, <<"a">>, <<"x">>, []),  % crash here, not in a later assert, if a write is rejected
+        ok = eleveldb:put(Ref, <<"b">>, <<"y">>, []),
+        {ok, I} = eleveldb:iterator(Ref, []),
+        ?assertEqual({ok, <<"a">>, <<"x">>}, eleveldb:iterator_move(I, <<>>)),  % seeking to <<>> is expected to land on the first key
+        ?assertEqual({ok, <<"b">>, <<"y">>}, eleveldb:iterator_move(I, next)),
+        ?assertEqual({ok, <<"a">>, <<"x">>}, eleveldb:iterator_move(I, prev))   % the move this patch re-enables
+    after
+        eleveldb:close(Ref)  % runs even when an assertion above fails
+    end.