Skip to content
Browse files

Merge pull request #314 from basho/t27-2i-memory-backend

Add 2I to Memory backend
  • Loading branch information...
2 parents cada9c7 + 4e0269d commit 49be5a6a141c604987c9171533b3c054507cec33 @seancribbs seancribbs committed Apr 12, 2012
Showing with 237 additions and 82 deletions.
  1. +234 −78 src/riak_kv_memory_backend.erl
  2. +3 −4 src/riak_kv_vnode.erl
View
312 src/riak_kv_memory_backend.erl
@@ -32,6 +32,7 @@
%% <ul>
%% <li>`ttl' - The time in seconds that an object should live before being expired.</li>
%% <li>`max_memory' - The amount of memory in megabytes to limit the backend to.</li>
+%% <li>`test' - When true, allow public access to ETS tables so they can be cleared efficiently.</li>
%% </ul>
%%
@@ -55,15 +56,28 @@
status/1,
callback/3]).
+%% "Testing" backend API
+-export([reset/0]).
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-define(API_VERSION, 1).
--define(CAPABILITIES, [async_fold]).
+-define(CAPABILITIES, [async_fold, indexes]).
+
+%% Macros for working with indexes
+-define(DELETE_PTN(B,K), {{B,'_','_',K},'_'}).
+
+%% ETS table name macros so we can break encapsulation for testing
+%% mode
+-define(DNAME(P), list_to_atom("riak_kv_"++integer_to_list(P))).
+-define(INAME(P), list_to_atom("riak_kv_"++integer_to_list(P)++"_i")).
+-define(TNAME(P), list_to_atom("riak_kv_"++integer_to_list(P)++"_t")).
--record(state, {data_ref :: integer() | atom(),
- time_ref :: integer() | atom(),
+-record(state, {data_ref :: ets:tid(),
+ index_ref :: ets:tid(),
+ time_ref :: ets:tid(),
max_memory :: undefined | integer(),
used_memory=0 :: integer(),
ttl :: integer()}).
@@ -98,16 +112,24 @@ capabilities(_, _) ->
start(Partition, Config) ->
TTL = app_helper:get_prop_or_env(ttl, Config, memory_backend),
MemoryMB = app_helper:get_prop_or_env(max_memory, Config, memory_backend),
+ TableOpts = case app_helper:get_prop_or_env(test, Config, memory_backend) of
+ true ->
+ [ordered_set, public, named_table];
+ _ ->
+ [ordered_set]
+ end,
case MemoryMB of
undefined ->
MaxMemory = undefined,
TimeRef = undefined;
_ ->
MaxMemory = MemoryMB * 1024 * 1024,
- TimeRef = ets:new(list_to_atom(integer_to_list(Partition)), [ordered_set])
+ TimeRef = ets:new(?TNAME(Partition), TableOpts)
end,
- DataRef = ets:new(list_to_atom(integer_to_list(Partition)), []),
+ IndexRef = ets:new(?INAME(Partition), TableOpts),
+ DataRef = ets:new(?DNAME(Partition), TableOpts),
{ok, #state{data_ref=DataRef,
+ index_ref=IndexRef,
max_memory=MaxMemory,
time_ref=TimeRef,
ttl=TTL}}.
@@ -132,14 +154,27 @@ stop(#state{data_ref=DataRef,
{ok, not_found, state()} |
{error, term(), state()}.
get(Bucket, Key, State=#state{data_ref=DataRef,
+ index_ref=IndexRef,
+ used_memory=UsedMemory,
+ max_memory=MaxMemory,
ttl=TTL}) ->
case ets:lookup(DataRef, {Bucket, Key}) of
[] -> {error, not_found, State};
- [{{Bucket, Key}, {{ts, Timestamp}, Val}}] ->
+ [{{Bucket, Key}, {{ts, Timestamp}, Val}}=Object] ->
case exceeds_ttl(Timestamp, TTL) of
true ->
- delete(Bucket, Key, undefined, State),
- {error, not_found, State};
+ %% Because we do not have the IndexSpecs, we must
+ %% delete the object directly and all index
+ %% entries blindly using match_delete.
+ ets:delete(DataRef, {Bucket, Key}),
+ ets:match_delete(IndexRef, ?DELETE_PTN(Bucket, Key)),
+ case MaxMemory of
+ undefined ->
+ UsedMemory1 = UsedMemory;
+ _ ->
+ UsedMemory1 = UsedMemory - object_size(Object)
+ end,
+ {error, not_found, State#state{used_memory=UsedMemory1}};
false ->
{ok, Val, State}
end;
@@ -150,55 +185,45 @@ get(Bucket, Key, State=#state{data_ref=DataRef,
end.
%% @doc Insert an object into the memory backend.
-%% NOTE: The memory backend does not currently
-%% support secondary indexing and the _IndexSpecs
-%% parameter is ignored.
-type index_spec() :: {add, Index, SecondaryKey} | {remove, Index, SecondaryKey}.
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
- {ok, state()} |
- {error, term(), state()}.
-put(Bucket, PrimaryKey, _IndexSpecs, Val, State=#state{data_ref=DataRef,
- max_memory=MaxMemory,
- time_ref=TimeRef,
- ttl=TTL,
- used_memory=UsedMemory}) ->
+ {ok, state()}.
+put(Bucket, PrimaryKey, IndexSpecs, Val, State=#state{data_ref=DataRef,
+ index_ref=IndexRef,
+ max_memory=MaxMemory,
+ time_ref=TimeRef,
+ ttl=TTL,
+ used_memory=UsedMemory}) ->
Now = now(),
case TTL of
undefined ->
Val1 = Val;
_ ->
Val1 = {{ts, Now}, Val}
end,
- case do_put(Bucket, PrimaryKey, Val1, DataRef) of
- {ok, Size} ->
- %% If the memory is capped update timestamp table
- %% and check if the memory usage is over the cap.
- case MaxMemory of
- undefined ->
- UsedMemory1 = UsedMemory;
- _ ->
- time_entry(Bucket, PrimaryKey, Now, TimeRef),
- Freed = trim_data_table(MaxMemory,
- UsedMemory + Size,
- DataRef,
- TimeRef,
- 0),
- UsedMemory1 = UsedMemory + Size - Freed
- end,
- {ok, State#state{used_memory=UsedMemory1}};
- {error, Reason} ->
- {error, Reason, State}
- end.
+ {ok, Size} = do_put(Bucket, PrimaryKey, Val1, IndexSpecs, DataRef, IndexRef),
+ case MaxMemory of
+ undefined ->
+ UsedMemory1 = UsedMemory;
+ _ ->
+ time_entry(Bucket, PrimaryKey, Now, TimeRef),
+ Freed = trim_data_table(MaxMemory,
+ UsedMemory + Size,
+ DataRef,
+ TimeRef,
+ IndexRef,
+ 0),
+ UsedMemory1 = UsedMemory + Size - Freed
+ end,
+ {ok, State#state{used_memory=UsedMemory1}}.
%% @doc Delete an object from the memory backend
-%% NOTE: The memory backend does not currently
-%% support secondary indexing and the _IndexSpecs
-%% parameter is ignored.
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
{ok, state()}.
-delete(Bucket, Key, _IndexSpecs, State=#state{data_ref=DataRef,
- time_ref=TimeRef,
- used_memory=UsedMemory}) ->
+delete(Bucket, Key, IndexSpecs, State=#state{data_ref=DataRef,
+ index_ref=IndexRef,
+ time_ref=TimeRef,
+ used_memory=UsedMemory}) ->
case TimeRef of
undefined ->
UsedMemory1 = UsedMemory;
@@ -215,6 +240,7 @@ delete(Bucket, Key, _IndexSpecs, State=#state{data_ref=DataRef,
UsedMemory1 = UsedMemory
end
end,
+ update_indexes(Bucket, Key, IndexSpecs, IndexRef),
ets:delete(DataRef, {Bucket, Key}),
{ok, State#state{used_memory=UsedMemory1}}.
@@ -243,15 +269,33 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{data_ref=DataRef}) ->
any(),
[{atom(), term()}],
state()) -> {ok, term()} | {async, fun()}.
-fold_keys(FoldKeysFun, Acc, Opts, #state{data_ref=DataRef}) ->
- Bucket = proplists:get_value(bucket, Opts),
- FoldFun = fold_keys_fun(FoldKeysFun, Bucket),
+fold_keys(FoldKeysFun, Acc, Opts, #state{data_ref=DataRef,
+ index_ref=IndexRef}) ->
+
+ %% Figure out how we should limit the fold: by bucket, by
+ %% secondary index, or neither (fold across everything.)
+ Bucket = lists:keyfind(bucket, 1, Opts),
+ Index = lists:keyfind(index, 1, Opts),
+
+ %% Multiple limiters may exist. Take the most specific limiter,
+ %% get an appropriate folder function.
+ Folder = if
+ Index /= false ->
+ FoldFun = fold_keys_fun(FoldKeysFun, Index),
+ get_index_folder(FoldFun, Acc, Index, DataRef, IndexRef);
+ Bucket /= false ->
+ FoldFun = fold_keys_fun(FoldKeysFun, Bucket),
+ get_folder(FoldFun, Acc, DataRef);
+ true ->
+ FoldFun = fold_keys_fun(FoldKeysFun, undefined),
+ get_folder(FoldFun, Acc, DataRef)
+ end,
+
case lists:member(async_fold, Opts) of
true ->
- {async, get_folder(FoldFun, Acc, DataRef)};
+ {async, Folder};
false ->
- Acc0 = ets:foldl(FoldFun, Acc, DataRef),
- {ok, Acc0}
+ {ok, Folder()}
end.
%% @doc Fold over all the objects for one or all buckets.
@@ -273,8 +317,10 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{data_ref=DataRef}) ->
%% @doc Delete all objects from this memory backend
-spec drop(state()) -> {ok, state()}.
drop(State=#state{data_ref=DataRef,
+ index_ref=IndexRef,
time_ref=TimeRef}) ->
ets:delete_all_objects(DataRef),
+ ets:delete_all_objects(IndexRef),
case TimeRef of
undefined ->
ok;
@@ -292,14 +338,18 @@ is_empty(#state{data_ref=DataRef}) ->
%% @doc Get the status information for this memory backend
-spec status(state()) -> [{atom(), term()}].
status(#state{data_ref=DataRef,
+ index_ref=IndexRef,
time_ref=TimeRef}) ->
DataStatus = ets:info(DataRef),
+ IndexStatus = ets:info(IndexRef),
case TimeRef of
undefined ->
- [{data_table_status, DataStatus}];
+ [{data_table_status, DataStatus},
+ {index_table_status, IndexStatus}];
_ ->
TimeStatus = ets:info(TimeRef),
[{data_table_status, DataStatus},
+ {index_table_status, IndexStatus},
{time_table_status, TimeStatus}]
end.
@@ -308,6 +358,24 @@ status(#state{data_ref=DataRef,
callback(_Ref, _Msg, State) ->
{ok, State}.
+%% @doc Resets state of all running memory backends on the local node by emptying the data, index and time ETS tables (addressed by the ?DNAME/?INAME/?TNAME names) of every partition in riak_core_ring:my_indices/1.
+%% The reset only runs when the `test' setting looked up with app_helper:get_env(memory_backend, test, ...) (default: the `riak_kv' `test' setting) is `true' and the `riak_kv' `storage_backend' setting is this module;
+%% in every other case `{error, reset_disabled}' is returned. NOTE(review): addressing tables by name presumably requires start/2 to have created them with the `test' option (named, public tables) -- confirm.
+-spec reset() -> ok | {error, reset_disabled}.
+reset() ->
+ reset(app_helper:get_env(memory_backend, test, app_helper:get_env(riak_kv, test)), app_helper:get_env(riak_kv, storage_backend)). %% arguments: test flag, configured storage backend module
+
+reset(true, ?MODULE) -> %% test flag is true and this module is the configured storage backend
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ [ begin %% comprehension is used only for its side effects; the resulting list is discarded
+ catch ets:delete_all_objects(?DNAME(I)), %% `catch' discards any error from a single table so the remaining tables are still processed
+ catch ets:delete_all_objects(?INAME(I)),
+ catch ets:delete_all_objects(?TNAME(I)) %% start/2 only creates the time table when max_memory is configured, so this one may not exist
+ end || I <- riak_core_ring:my_indices(Ring) ],
+ ok;
+reset(_, _) -> %% any other combination of test flag / backend: refuse to reset
+ {error, reset_disabled}.
+
%% ===================================================================
%% Internal functions
%% ===================================================================
@@ -333,32 +401,43 @@ fold_buckets_fun(FoldBucketsFun) ->
%% Return a function to fold over keys on this backend
fold_keys_fun(FoldKeysFun, undefined) ->
fun({{Bucket, Key}, _}, Acc) ->
- FoldKeysFun(Bucket, Key, Acc)
+ FoldKeysFun(Bucket, Key, Acc);
+ (_, Acc) ->
+ Acc
end;
-fold_keys_fun(FoldKeysFun, Bucket) ->
- fun({{B, Key}, _}, Acc) ->
- case B =:= Bucket of
- true ->
- FoldKeysFun(Bucket, Key, Acc);
- false ->
- Acc
- end
+fold_keys_fun(FoldKeysFun, {bucket, FilterBucket}) ->
+ fun({{Bucket, Key}, _}, Acc) when Bucket == FilterBucket ->
+ FoldKeysFun(Bucket, Key, Acc);
+ (_, Acc) ->
+ Acc
+ end;
+fold_keys_fun(FoldKeysFun, {index, FilterBucket, {eq, <<"$bucket">>, _}}) ->
+ %% 2I exact match query on special $bucket field...
+ fold_keys_fun(FoldKeysFun, {bucket, FilterBucket});
+fold_keys_fun(FoldKeysFun, {index, FilterBucket, {range, <<"$key">>, _, _}}) ->
+ %% 2I range query on special $key field...
+ fold_keys_fun(FoldKeysFun, {bucket, FilterBucket});
+fold_keys_fun(FoldKeysFun, {index, _FilterBucket, _Query}) ->
+ fun({{Bucket, _FilterField, _FilterTerm, Key}, _}, Acc) ->
+ FoldKeysFun(Bucket, Key, Acc);
+ (_, Acc) ->
+ Acc
end.
+
%% @private
%% Return a function to fold over keys on this backend
fold_objects_fun(FoldObjectsFun, undefined) ->
fun({{Bucket, Key}, Value}, Acc) ->
- FoldObjectsFun(Bucket, Key, Value, Acc)
+ FoldObjectsFun(Bucket, Key, Value, Acc);
+ (_, Acc) ->
+ Acc
end;
-fold_objects_fun(FoldObjectsFun, Bucket) ->
- fun({{B, Key}, Value}, Acc) ->
- case B =:= Bucket of
- true ->
- FoldObjectsFun(Bucket, Key, Value, Acc);
- false ->
- Acc
- end
+fold_objects_fun(FoldObjectsFun, FilterBucket) ->
+ fun({{Bucket, Key}, Value}, Acc) when Bucket == FilterBucket->
+ FoldObjectsFun(Bucket, Key, Value, Acc);
+ (_, Acc) ->
+ Acc
end.
%% @private
@@ -368,37 +447,112 @@ get_folder(FoldFun, Acc, DataRef) ->
end.
%% @private
-do_put(Bucket, Key, Val, Ref) ->
+get_index_folder(Folder, Acc0, {index, Bucket, {eq, <<"$bucket">>, _}}, DataRef, _) -> %% Every clause returns a zero-arity fun that performs the fold when called; the query shape selects which table is walked
+ %% For the special $bucket index, turn it into a fold over the
+ %% data table.
+ fun() ->
+ key_range_folder(Folder, Acc0, DataRef, {Bucket, <<>>}, Bucket) %% start at {Bucket, <<>>}: the empty binary sorts before every non-empty binary key (assumes keys are binaries -- TODO confirm)
+ end;
+get_index_folder(Folder, Acc0, {index, Bucket, {range, <<"$key">>, Min, Max}}, DataRef, _) ->
+ %% For the special range lookup on the $key index, turn it into a
+ %% fold on the data table
+ fun() ->
+ key_range_folder(Folder, Acc0, DataRef, {Bucket, Min}, {Bucket, Min, Max}) %% start at the lower bound; key_range_folder stops once a key falls outside Min..Max
+ end;
+get_index_folder(Folder, Acc0, {index, Bucket, {eq, Field, Term}}, _, IndexRef) -> %% exact match on a regular index field: a range fold over the index table with Min and Max both set to Term
+ fun() ->
+ index_range_folder(Folder, Acc0, IndexRef, {Bucket, Field, Term, undefined}, {Bucket, Field, Term, Term}) %% `undefined' in the key position is a synthetic start key; as an atom it sorts before any binary key (assumes keys are binaries -- TODO confirm)
+ end;
+get_index_folder(Folder, Acc0, {index, Bucket, {range, Field, Min, Max}}, _, IndexRef) -> %% range query on a regular index field: fold over the index table from Min to Max
+ fun() ->
+ index_range_folder(Folder, Acc0, IndexRef, {Bucket, Field, Min, undefined}, {Bucket, Field, Min, Max}) %% same synthetic start key, positioned at the lower bound Min
+ end.
+
+
+%% Iterates over a range of keys, for the special $key and $bucket indexes: walks the data table in key order starting at DataKey and applies Folder(Object, Acc) to every object found.
+%% The last argument is either a bucket (continue while the key's bucket equals it) or {Bucket, Min, Max} (continue while the key is in that bucket and Min =< Key =< Max). The accumulator is returned as soon as the current key matches neither form, which includes the '$end_of_table' marker from ets:next/2.
+%% @private
+-spec key_range_folder(function(), term(), ets:tid(), {riak_object:bucket(), riak_object:key()}, binary() | {riak_object:bucket(), term(), term()}) -> term().
+key_range_folder(Folder, Acc0, DataRef, {B,_}=DataKey, B) -> %% whole-bucket scan: B appears in both patterns, so this clause only matches while the current key is still in bucket B
+ case ets:lookup(DataRef, DataKey) of
+ [] -> %% nothing stored under DataKey (e.g. the synthetic start key); just advance
+ key_range_folder(Folder, Acc0, DataRef, ets:next(DataRef, DataKey), B); %% NOTE(review): ets:next/2 on an absent key relies on the table being an ordered_set, as created in start/2
+ [Object] ->
+ Acc = Folder(Object, Acc0),
+ key_range_folder(Folder, Acc, DataRef, ets:next(DataRef, DataKey), B)
+ end;
+key_range_folder(Folder, Acc0, DataRef, {B,K}=DataKey, {B, Min, Max}=Query) when K >= Min, K =< Max -> %% bounded scan: same bucket and key within the inclusive range Min..Max
+ case ets:lookup(DataRef, DataKey) of
+ [] -> %% nothing stored under DataKey (e.g. Min itself is not a stored key); just advance
+ key_range_folder(Folder, Acc0, DataRef, ets:next(DataRef, DataKey), Query);
+ [Object] ->
+ Acc = Folder(Object, Acc0),
+ key_range_folder(Folder, Acc, DataRef, ets:next(DataRef, DataKey), Query)
+ end;
+key_range_folder(_Folder, Acc, _DataRef, _DataKey, _Query) -> %% current key is outside the bucket/range or is not a {Bucket, Key} tuple: the fold is finished
+ Acc.
+
+%% Iterates over a range of index postings: walks the index table in key order starting at IndexKey and applies Folder(Posting, Acc) to each posting while its bucket and field equal the query's and Min =< Value =< Max; returns the accumulator at the first key that falls outside that range (including the '$end_of_table' marker from ets:next/2).
+index_range_folder(Folder, Acc0, IndexRef, {B, I, V, _K}=IndexKey, {B, I, Min, Max}=Query) when V >= Min, V =< Max -> %% B and I appear in both patterns, so bucket and field must match the query; the guard enforces the inclusive value range
+ case ets:lookup(IndexRef, IndexKey) of
+ [] ->
+ %% This will happen on the first iteration, where the key
+ %% does not exist. In all other cases, ETS will give us a
+ %% real key from next/2.
+ index_range_folder(Folder, Acc0, IndexRef, ets:next(IndexRef, IndexKey), Query);
+ [Posting] ->
+ Acc = Folder(Posting, Acc0), %% Posting is the stored {{Bucket, Field, Value, Key}, _} tuple
+ index_range_folder(Folder, Acc, IndexRef, ets:next(IndexRef, IndexKey), Query)
+ end;
+index_range_folder(_Folder, Acc, _IndexRef, _IndexKey, _Query) -> %% key is out of range, belongs to another bucket/field, or is not a 4-tuple key: the fold is finished
+ Acc.
+
+
+%% @private
+do_put(Bucket, Key, Val, IndexSpecs, DataRef, IndexRef) ->
Object = {{Bucket, Key}, Val},
- true = ets:insert(Ref, Object),
+ true = ets:insert(DataRef, Object),
+ update_indexes(Bucket, Key, IndexSpecs, IndexRef),
{ok, object_size(Object)}.
%% Report whether more than TTL seconds have passed since Timestamp.
%% Timestamp is a now()-style tuple as accepted by timer:now_diff/2;
%% the elapsed time is converted from microseconds to (float) seconds
%% before being compared against TTL.
exceeds_ttl(Timestamp, TTL) ->
    ElapsedMicros = timer:now_diff(now(), Timestamp),
    ElapsedSeconds = ElapsedMicros / 1000 / 1000,
    ElapsedSeconds > TTL.
+update_indexes(_Bucket, _Key, undefined, _IndexRef) -> %% Applies a list of {add | remove, Field, Value} index specs for {Bucket, Key} to the index table; `undefined' specs mean there is nothing to do
+ ok;
+update_indexes(_Bucket, _Key, [], _IndexRef) -> %% all specs processed
+ ok;
+update_indexes(Bucket, Key, [{remove, Field, Value}|Rest], IndexRef) ->
+ true = ets:delete(IndexRef, {Bucket, Field, Value, Key}), %% drop the posting keyed by {Bucket, Field, Value, Key}
+ update_indexes(Bucket, Key, Rest, IndexRef);
+update_indexes(Bucket, Key, [{add, Field, Value}|Rest], IndexRef) ->
+ true = ets:insert(IndexRef, {{Bucket, Field, Value, Key}, <<>>}), %% the posting carries all its information in the key; the value is an empty binary placeholder
+ update_indexes(Bucket, Key, Rest, IndexRef).
+
%% @private
%% Insert a {Now, {Bucket, Key}} row into the time table so the write
%% time of the object can later be looked up by timestamp.
time_entry(Bucket, Key, Now, TimeRef) ->
    Entry = {Now, {Bucket, Key}},
    ets:insert(TimeRef, Entry).
%% @private
%% @doc Dump some entries if the max memory size has
%% been breached.
-trim_data_table(MaxMemory, UsedMemory, _, _, Freed) when
+trim_data_table(MaxMemory, UsedMemory, _, _, _, Freed) when
(UsedMemory - Freed) =< MaxMemory ->
Freed;
-trim_data_table(MaxMemory, UsedMemory, DataRef, TimeRef, Freed) ->
+trim_data_table(MaxMemory, UsedMemory, DataRef, TimeRef, IndexRef, Freed) ->
%% Delete the oldest object
- OldestSize = delete_oldest(DataRef, TimeRef),
+ OldestSize = delete_oldest(DataRef, TimeRef, IndexRef),
trim_data_table(MaxMemory,
UsedMemory,
DataRef,
TimeRef,
+ IndexRef,
Freed + OldestSize).
%% @private
-delete_oldest(DataRef, TimeRef) ->
+delete_oldest(DataRef, TimeRef, IndexRef) ->
OldestTime = ets:first(TimeRef),
case OldestTime of
'$end_of_table' ->
@@ -408,8 +562,10 @@ delete_oldest(DataRef, TimeRef) ->
ets:delete(TimeRef, OldestTime),
case ets:lookup(DataRef, OldestKey) of
[] ->
- delete_oldest(DataRef, TimeRef);
+ delete_oldest(DataRef, TimeRef, IndexRef);
[Object] ->
+ {Bucket, Key} = OldestKey,
+ ets:match_delete(IndexRef, ?DELETE_PTN(Bucket, Key)),
ets:delete(DataRef, OldestKey),
object_size(Object)
end
@@ -493,8 +649,8 @@ eqc_test_() ->
[
{timeout, 60000,
[?_assertEqual(true,
- backend_eqc:test(?MODULE, true))]}
- ]}]}]}.
+ backend_eqc:test(?MODULE, true))]}
+ ]}]}]}.
setup() ->
application:load(sasl),
View
7 src/riak_kv_vnode.erl
@@ -210,8 +210,7 @@ init([Index]) ->
{ok, VId} = get_vnodeid(Index),
DeleteMode = app_helper:get_env(riak_kv, delete_mode, 3000),
AsyncFolding = app_helper:get_env(riak_kv, async_folds, true) == true,
- case catch Mod:start(Index, [{async_folds, AsyncFolding},
- Configuration]) of
+ case catch Mod:start(Index, [{async_folds, AsyncFolding}|Configuration]) of
{ok, ModState} ->
%% Get the backend capabilities
State = #state{idx=Index,
@@ -616,7 +615,7 @@ do_backend_delete(BKey, RObj, State = #state{mod = Mod, modstate = ModState}) ->
%% object is a tombstone or all siblings are tombstones
riak_kv_mapred_cache:eject(BKey),
- %% Calculate the index specs to remove...
+ %% Calculate the index specs to remove...
%% JDM: This should just be a tombstone by this point, but better
%% safe than sorry.
IndexSpecs = riak_object:diff_index_specs(undefined, RObj),
@@ -969,7 +968,7 @@ do_delete(BKey, ReqId, State) ->
UpdState = do_backend_delete(BKey, RObj, State),
{reply, {del, Idx, ReqId}, UpdState};
Delay when is_integer(Delay) ->
- erlang:send_after(Delay, self(),
+ erlang:send_after(Delay, self(),
{final_delete, BKey,
delete_hash(RObj)}),
%% Nothing checks these messages - will just reply

0 comments on commit 49be5a6

Please sign in to comment.
Something went wrong with that request. Please try again.