Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Share per-table cursors for get/put/delete operations.

  • Loading branch information...
commit e4b3acbd11cb1e199b9c36860683d8fd1b35019a 1 parent f94336f
@gburd gburd authored
Showing with 27 additions and 6 deletions.
  1. +27 −6 src/riak_kv_wterl_backend.erl
View
33 src/riak_kv_wterl_backend.erl
@@ -53,6 +53,7 @@
-record(state, {conn :: wterl:connection(),
table :: string(),
session :: wterl:session(),
+ cursors :: ets:tid(),
partition :: integer()}).
-type state() :: #state{}.
@@ -145,18 +146,24 @@ start(Partition, Config) ->
%% @doc Stop the wterl backend
-spec stop(state()) -> ok.
-stop(#state{conn=ConnRef, session=SRef}) ->
+stop(#state{conn=ConnRef, session=SRef, cursors=undefined}) ->
ok = wterl:session_close(SRef),
- wterl_conn:close(ConnRef).
+ wterl_conn:close(ConnRef);
+stop(#state{cursors=Cursors}=State) ->
+ ets:foldl(fun({_Table, Cursor}, _) ->
+ ok = wterl:cursor_close(Cursor)
+ end, true, Cursors),
+ ets:delete(Cursors),
+ stop(State#state{cursors=undefined}).
%% @doc Retrieve an object from the wterl backend
-spec get(riak_object:bucket(), riak_object:key(), state()) ->
{ok, any(), state()} |
{ok, not_found, state()} |
{error, term(), state()}.
-get(Bucket, Key, #state{table=Table, session=SRef}=State) ->
+get(Bucket, Key, #state{session=SRef, table=Table}=State) ->
WTKey = to_object_key(Bucket, Key),
- case wterl:session_get(SRef, Table, WTKey) of
+ case wterl:cursor_search(shared_cursor(SRef, Table, State), WTKey) of
{ok, Value} ->
{ok, Value, State};
not_found ->
@@ -175,7 +182,7 @@ get(Bucket, Key, #state{table=Table, session=SRef}=State) ->
{error, term(), state()}.
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=State) ->
WTKey = to_object_key(Bucket, PrimaryKey),
- case wterl:session_put(SRef, Table, WTKey, Val) of
+ case wterl:cursor_insert(shared_cursor(SRef, Table, State), WTKey, Val) of
ok ->
{ok, State};
{error, Reason} ->
@@ -191,7 +198,7 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=Stat
{error, term(), state()}.
delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) ->
WTKey = to_object_key(Bucket, Key),
- case wterl:session_delete(SRef, Table, WTKey) of
+ case wterl:cursor_remove(shared_cursor(SRef, Table, State), WTKey) of
ok ->
{ok, State};
{error, Reason} ->
@@ -340,6 +347,20 @@ callback(_Ref, _Msg, State) ->
%% Internal functions
%% ===================================================================
+shared_cursor(SRef, Table, #state{cursors=undefined}=State) ->
+ Cursors = ets:new(?MODULE, []),
+ shared_cursor(SRef, Table, State#state{cursors=Cursors});
+shared_cursor(SRef, Table, #state{cursors=Cursors}=State) ->
+ case ets:lookup(Cursors, Table) of
+ [{Table, Cursor}] ->
+ {Cursor, State};
+ _ ->
+ Cursor = wterl:cursor_open(SRef, Table),
+ ets:insert(Cursors, {Table, Cursor}),
+ {Cursor, State}
+ end.
+
+
%% @private
%% Return a function to fold over the buckets on this backend
fold_buckets_fun(FoldBucketsFun) ->
Please sign in to comment.
Something went wrong with that request. Please try again.