Add 2I support to TestServer backend #35

Merged
merged 8 commits into from

3 participants

@seancribbs
Owner

You know you want this. :smile:

Most of the Erlang code is identical to basho/riak_kv#314, which is nearing the end of code review. This PR also fixes a few things with node configuration and generation that were discovered while debugging, and adds integration specs for the #get_index backend API.
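The two query shapes the #get_index backend API supports (equality and range) can be sketched in plain Ruby. This is a stand-in model, not the real client backend class; `FakeBackend`, `time_to_index_int`-style helpers, and the storage layout are illustrative assumptions only:

```ruby
# Stand-in for a 2I-capable backend: index entries are stored as
# {index_name => {term => [keys]}}. This only models query shapes;
# it is NOT the real riak-client backend.
class FakeBackend
  def initialize
    @indexes = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = [] } }
  end

  # Record that `key` has `term` under secondary index `name`.
  def index(name, term, key)
    @indexes[name][term] << key
  end

  # Equality query: a single term. Range query: a Ruby Range of terms.
  def get_index(_bucket, index, query)
    if query.is_a?(Range)
      query.flat_map { |term| @indexes[index][term] }.sort
    else
      @indexes[index][query]
    end
  end
end

backend = FakeBackend.new
50.times { |i| backend.index("index_int", i, i.to_s) }

backend.get_index("test", "index_int", 20)     # => ["20"]
backend.get_index("test", "index_int", 19..21) # => ["19", "20", "21"]
```

The integration specs added in this PR exercise the real backend with exactly these two call shapes.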

seancribbs added some commits
@seancribbs seancribbs WIP Add 2I to test backend. [ci skip] 7ee184a
@seancribbs seancribbs Merge branch 'master' into 2i-test-backend 799d00b
@seancribbs seancribbs Apply dialyzer fixes from riak_kv version. 334c8d8
@seancribbs seancribbs Re-enable the lager console log. 4500c88
@seancribbs seancribbs Remove SIGWINCH handling since this only comes from a real TTY. 459392b
@seancribbs seancribbs Fix 2I support for test backend.
* Resolved catch bug in riak_search_test_backend.
* Set the `test` properties on both riak_kv and memory_backend.
* Workaround a bug in Riak 1.0-1.1 where Config is not a correct
  proplist.
* Use a regular fold for list-keys, the key_range_folder doesn't seem
  to work.
* Simplify the reset/0,2 function.
3add2ad
@seancribbs seancribbs Add unified backend spec for get_index. 90c2915
@seancribbs
Owner

@jcoyne This PR is required for me to merge seancribbs/ripple#276. Care to try it out?

@randysecrist randysecrist commented on the diff
erl_src/riak_kv_test_backend.erl
@@ -324,6 +366,24 @@ status(#state{data_ref=DataRef,
callback(_Ref, _Msg, State) ->
{ok, State}.
+%% @doc Resets state of all running memory backends on the local
+%% node. The `riak_kv' environment variable `memory_backend' must
+%% contain the `test' property, set to `true' for this to work.
+-spec reset() -> ok | {error, reset_disabled}.
+reset() ->
+ reset(app_helper:get_env(memory_backend, test, app_helper:get_env(riak_kv, test)), app_helper:get_env(riak_kv, storage_backend)).
+
+reset(true, ?MODULE) ->
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ [ begin
+ catch ets:delete_all_objects(?DNAME(I)),
+ catch ets:delete_all_objects(?INAME(I)),
+ catch ets:delete_all_objects(?TNAME(I))
+ end || I <- riak_core_ring:my_indices(Ring) ],
@randysecrist Owner

OK, so the actual cleanup only occurs when the memory backend is in use and the test property is true.

Does the test server need to do anything to the vnodes to make them aware of the backend clear? See the reference to this more invasive reset method that recently had to be updated for riak 1.1 ... https://github.com/randysecrist/ripple-contrib/blob/master/test/support/ripple_test_server.rb#L119

@seancribbs Owner

Good point. I actually tried the "vnode kill" method with the memory backend, but somehow it didn't reliably clear the data. Note that ets:delete_all_objects/1 is guaranteed to be atomic and isolated, so assuming the test is doing nothing at the moment and no requests are in-flight (a reasonable, though not fool-proof, assumption in a testing setup), ets:delete_all_objects/1 should leave nothing behind.

@saarons

Are range queries working? I'm running into a few problems with them; namely, I keep getting an empty array as a result. It could be something in my code, but I just want to check here first.

@seancribbs
Owner

@saarons I am seeing a few problems as well. It is probably time to break out EQC on it.

@seancribbs
Owner

@saarons Do you have a sample failing test? I'm writing some tests to supplement the existing ones in Erlang, but if there's a simple failing case it would save a lot of time.

@saarons

@seancribbs The problem I was having before was actually on my end. I didn't account for DateTime values needing to be indexed with `(utc.to_f * 1000).round`. Other than that, I'm not getting any errors and the indexes have been working great so far.
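The conversion mentioned above can be shown concretely. `time_to_index_int` is a hypothetical helper name, not part of Ripple's API; the `(utc.to_f * 1000).round` expression is the one quoted in the comment:

```ruby
require 'time'

# Hypothetical helper: convert a UTC Time to the integer
# millisecond-epoch form used when storing it in an integer
# secondary index, per the expression quoted above.
def time_to_index_int(utc_time)
  (utc_time.to_f * 1000).round
end

t = Time.utc(2012, 4, 12)
time_to_index_int(t) # => 1334188800000
```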

@seancribbs
Owner

@saarons Excellent. I'm almost finished with the due diligence on the Erlang side, so we can merge this soon.

By the way, you can choose what type of index is used; for Time-like objects it will use ISO8601 format if you declare it as a String index (in Ripple, that is).
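The reason ISO8601 works for a String index is that its lexicographic order matches chronological order, so range queries still behave correctly. A minimal sketch (the helper name `time_to_index_bin` is hypothetical, not Ripple API):

```ruby
require 'time'

# Hypothetical helper: store a UTC Time in a String ("binary")
# secondary index using ISO8601 format.
def time_to_index_bin(utc_time)
  utc_time.iso8601
end

a = time_to_index_bin(Time.utc(2012, 4, 11))
b = time_to_index_bin(Time.utc(2012, 4, 12))
a < b # => true: string order agrees with time order
```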

@seancribbs
Owner

Verified with QuickCheck. Merging!

@seancribbs seancribbs merged commit 3e680c2 into from
Commits on Apr 11, 2012: 1 commit by @seancribbs.
Commits on Apr 12, 2012: 7 commits by @seancribbs (including "Fix 2I support for test backend.", listed above).
BIN  erl_src/riak_kv_test_backend.beam
Binary file not shown
375 erl_src/riak_kv_test_backend.erl
@@ -1,6 +1,6 @@
%% -------------------------------------------------------------------
%%
-%% riak_memory_backend: storage engine using ETS tables
+%% riak_kv_test_backend: storage engine using ETS tables, for use in testing.
%%
%% Copyright (c) 2007-2011 Basho Technologies, Inc. All Rights Reserved.
%%
@@ -32,6 +32,7 @@
%% <ul>
%% <li>`ttl' - The time in seconds that an object should live before being expired.</li>
%% <li>`max_memory' - The amount of memory in megabytes to limit the backend to.</li>
+%% <li>`test' - When `true', exposes the internal ETS tables so that they can be efficiently cleared using {@link reset/3}.</li>
%% </ul>
%%
@@ -40,6 +41,8 @@
%% KV Backend API
-export([api_version/0,
+ capabilities/1,
+ capabilities/2,
start/2,
stop/1,
get/3,
@@ -51,20 +54,30 @@
fold_objects/4,
is_empty/1,
status/1,
- callback/3,
- reset/0,
- capabilities/1,
- capabilities/2]).
+ callback/3]).
+
+%% "Testing" backend API
+-export([reset/0]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-define(API_VERSION, 1).
--define(CAPABILITIES, [async_fold]).
+-define(CAPABILITIES, [async_fold, indexes]).
+
+%% Macros for working with indexes
+-define(DELETE_PTN(B,K), {{B,'_','_',K},'_'}).
--record(state, {data_ref :: integer() | atom(),
- time_ref :: integer() | atom(),
+%% ETS table name macros so we can break encapsulation for testing
+%% mode
+-define(DNAME(P), list_to_atom("riak_kv_"++integer_to_list(P))).
+-define(INAME(P), list_to_atom("riak_kv_"++integer_to_list(P)++"_i")).
+-define(TNAME(P), list_to_atom("riak_kv_"++integer_to_list(P)++"_t")).
+
+-record(state, {data_ref :: ets:tid(),
+ index_ref :: ets:tid(),
+ time_ref :: ets:tid(),
max_memory :: undefined | integer(),
used_memory=0 :: integer(),
ttl :: integer()}).
@@ -76,20 +89,11 @@
%% Public API
%% ===================================================================
-%% TestServer reset
-
--spec reset() -> ok | {error, timeout}.
-reset() ->
- {ok, Ring} = riak_core_ring_manager:get_my_ring(),
- [ catch ets:delete_all_objects(list_to_atom("kv" ++ integer_to_list(P))) ||
- P <- riak_core_ring:my_indices(Ring) ],
- ok.
-
%% KV Backend API
%% @doc Return the major version of the
-%% current API and a capabilities list.
--spec api_version() -> {ok, integer()} | {integer(), [atom()]}.
+%% current API.
+-spec api_version() -> {ok, integer()}.
api_version() ->
case lists:member({capabilities, 1}, riak_kv_backend:behaviour_info(callbacks)) of
true -> % Using 1.1 API or later
@@ -110,23 +114,33 @@ capabilities(_, _) ->
%% @doc Start the memory backend
-spec start(integer(), config()) -> {ok, state()}.
-start(Partition, _Config) ->
- %% TTL = config_value(ttl, Config),
- %% MemoryMB = config_value(max_memory, Config),
- %% case MemoryMB of
- %% undefined ->
- %% MaxMemory = undefined,
- %% TimeRef = undefined;
- %% _ ->
- %% MaxMemory = MemoryMB * 1024 * 1024,
- %% TimeRef = ets:new(list_to_atom(integer_to_list(Partition)), [ordered_set])
- %% end,
- DataRef = ets:new(list_to_atom("kv" ++ integer_to_list(Partition)), [named_table, public]),
- {ok, #state{data_ref=DataRef
- %% max_memory=MaxMemory,
- %% time_ref=TimeRef,
- %% ttl=TTL
- }}.
+%% Bug in riak_kv_vnode in 1.0
+start(Partition, [{async_folds,_}=AFolds, Rest]) when is_list(Rest) ->
+ start(Partition, [AFolds|Rest]);
+start(Partition, Config) ->
+ TTL = app_helper:get_prop_or_env(ttl, Config, memory_backend),
+ MemoryMB = app_helper:get_prop_or_env(max_memory, Config, memory_backend),
+ TableOpts = case app_helper:get_prop_or_env(test, Config, memory_backend) of
+ true ->
+ [ordered_set, public, named_table];
+ _ ->
+ [ordered_set]
+ end,
+ case MemoryMB of
+ undefined ->
+ MaxMemory = undefined,
+ TimeRef = undefined;
+ _ ->
+ MaxMemory = MemoryMB * 1024 * 1024,
+ TimeRef = ets:new(?TNAME(Partition), TableOpts)
+ end,
+ IndexRef = ets:new(?INAME(Partition), TableOpts),
+ DataRef = ets:new(?DNAME(Partition), TableOpts),
+ {ok, #state{data_ref=DataRef,
+ index_ref=IndexRef,
+ max_memory=MaxMemory,
+ time_ref=TimeRef,
+ ttl=TTL}}.
%% @doc Stop the memory backend
-spec stop(state()) -> ok.
@@ -148,14 +162,27 @@ stop(#state{data_ref=DataRef,
{ok, not_found, state()} |
{error, term(), state()}.
get(Bucket, Key, State=#state{data_ref=DataRef,
+ index_ref=IndexRef,
+ used_memory=UsedMemory,
+ max_memory=MaxMemory,
ttl=TTL}) ->
case ets:lookup(DataRef, {Bucket, Key}) of
[] -> {error, not_found, State};
- [{{Bucket, Key}, {{ts, Timestamp}, Val}}] ->
+ [{{Bucket, Key}, {{ts, Timestamp}, Val}}=Object] ->
case exceeds_ttl(Timestamp, TTL) of
true ->
- delete(Bucket, Key, undefined, State),
- {error, not_found, State};
+ %% Because we do not have the IndexSpecs, we must
+ %% delete the object directly and all index
+ %% entries blindly using match_delete.
+ ets:delete(DataRef, {Bucket, Key}),
+ ets:match_delete(IndexRef, ?DELETE_PTN(Bucket, Key)),
+ case MaxMemory of
+ undefined ->
+ UsedMemory1 = UsedMemory;
+ _ ->
+ UsedMemory1 = UsedMemory - object_size(Object)
+ end,
+ {error, not_found, State#state{used_memory=UsedMemory1}};
false ->
{ok, Val, State}
end;
@@ -166,18 +193,15 @@ get(Bucket, Key, State=#state{data_ref=DataRef,
end.
%% @doc Insert an object into the memory backend.
-%% NOTE: The memory backend does not currently
-%% support secondary indexing and the _IndexSpecs
-%% parameter is ignored.
-type index_spec() :: {add, Index, SecondaryKey} | {remove, Index, SecondaryKey}.
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
- {ok, state()} |
- {error, term(), state()}.
-put(Bucket, PrimaryKey, _IndexSpecs, Val, State=#state{data_ref=DataRef,
- max_memory=MaxMemory,
- time_ref=TimeRef,
- ttl=TTL,
- used_memory=UsedMemory}) ->
+ {ok, state()}.
+put(Bucket, PrimaryKey, IndexSpecs, Val, State=#state{data_ref=DataRef,
+ index_ref=IndexRef,
+ max_memory=MaxMemory,
+ time_ref=TimeRef,
+ ttl=TTL,
+ used_memory=UsedMemory}) ->
Now = now(),
case TTL of
undefined ->
@@ -185,36 +209,29 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, State=#state{data_ref=DataRef,
_ ->
Val1 = {{ts, Now}, Val}
end,
- case do_put(Bucket, PrimaryKey, Val1, DataRef) of
- {ok, Size} ->
- %% If the memory is capped update timestamp table
- %% and check if the memory usage is over the cap.
- case MaxMemory of
- undefined ->
- UsedMemory1 = UsedMemory;
- _ ->
- time_entry(Bucket, PrimaryKey, Now, TimeRef),
- Freed = trim_data_table(MaxMemory,
- UsedMemory + Size,
- DataRef,
- TimeRef,
- 0),
- UsedMemory1 = UsedMemory + Size - Freed
- end,
- {ok, State#state{used_memory=UsedMemory1}};
- {error, Reason} ->
- {error, Reason, State}
- end.
+ {ok, Size} = do_put(Bucket, PrimaryKey, Val1, IndexSpecs, DataRef, IndexRef),
+ case MaxMemory of
+ undefined ->
+ UsedMemory1 = UsedMemory;
+ _ ->
+ time_entry(Bucket, PrimaryKey, Now, TimeRef),
+ Freed = trim_data_table(MaxMemory,
+ UsedMemory + Size,
+ DataRef,
+ TimeRef,
+ IndexRef,
+ 0),
+ UsedMemory1 = UsedMemory + Size - Freed
+ end,
+ {ok, State#state{used_memory=UsedMemory1}}.
%% @doc Delete an object from the memory backend
-%% NOTE: The memory backend does not currently
-%% support secondary indexing and the _IndexSpecs
-%% parameter is ignored.
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
{ok, state()}.
-delete(Bucket, Key, _IndexSpecs, State=#state{data_ref=DataRef,
- time_ref=TimeRef,
- used_memory=UsedMemory}) ->
+delete(Bucket, Key, IndexSpecs, State=#state{data_ref=DataRef,
+ index_ref=IndexRef,
+ time_ref=TimeRef,
+ used_memory=UsedMemory}) ->
case TimeRef of
undefined ->
UsedMemory1 = UsedMemory;
@@ -231,6 +248,7 @@ delete(Bucket, Key, _IndexSpecs, State=#state{data_ref=DataRef,
UsedMemory1 = UsedMemory
end
end,
+ update_indexes(Bucket, Key, IndexSpecs, IndexRef),
ets:delete(DataRef, {Bucket, Key}),
{ok, State#state{used_memory=UsedMemory1}}.
@@ -259,15 +277,33 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{data_ref=DataRef}) ->
any(),
[{atom(), term()}],
state()) -> {ok, term()} | {async, fun()}.
-fold_keys(FoldKeysFun, Acc, Opts, #state{data_ref=DataRef}) ->
- Bucket = proplists:get_value(bucket, Opts),
- FoldFun = fold_keys_fun(FoldKeysFun, Bucket),
+fold_keys(FoldKeysFun, Acc, Opts, #state{data_ref=DataRef,
+ index_ref=IndexRef}) ->
+
+ %% Figure out how we should limit the fold: by bucket, by
+ %% secondary index, or neither (fold across everything.)
+ Bucket = lists:keyfind(bucket, 1, Opts),
+ Index = lists:keyfind(index, 1, Opts),
+
+ %% Multiple limiters may exist. Take the most specific limiter,
+ %% get an appropriate folder function.
+ Folder = if
+ Index /= false ->
+ FoldFun = fold_keys_fun(FoldKeysFun, Index),
+ get_index_folder(FoldFun, Acc, Index, DataRef, IndexRef);
+ Bucket /= false ->
+ FoldFun = fold_keys_fun(FoldKeysFun, Bucket),
+ get_folder(FoldFun, Acc, DataRef);
+ true ->
+ FoldFun = fold_keys_fun(FoldKeysFun, undefined),
+ get_folder(FoldFun, Acc, DataRef)
+ end,
+
case lists:member(async_fold, Opts) of
true ->
- {async, get_folder(FoldFun, Acc, DataRef)};
+ {async, Folder};
false ->
- Acc0 = ets:foldl(FoldFun, Acc, DataRef),
- {ok, Acc0}
+ {ok, Folder()}
end.
%% @doc Fold over all the objects for one or all buckets.
@@ -289,8 +325,10 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{data_ref=DataRef}) ->
%% @doc Delete all objects from this memory backend
-spec drop(state()) -> {ok, state()}.
drop(State=#state{data_ref=DataRef,
+ index_ref=IndexRef,
time_ref=TimeRef}) ->
ets:delete_all_objects(DataRef),
+ ets:delete_all_objects(IndexRef),
case TimeRef of
undefined ->
ok;
@@ -308,14 +346,18 @@ is_empty(#state{data_ref=DataRef}) ->
%% @doc Get the status information for this memory backend
-spec status(state()) -> [{atom(), term()}].
status(#state{data_ref=DataRef,
+ index_ref=IndexRef,
time_ref=TimeRef}) ->
DataStatus = ets:info(DataRef),
+ IndexStatus = ets:info(IndexRef),
case TimeRef of
undefined ->
- [{data_table_status, DataStatus}];
+ [{data_table_status, DataStatus},
+ {index_table_status, IndexStatus}];
_ ->
TimeStatus = ets:info(TimeRef),
[{data_table_status, DataStatus},
+ {index_table_status, IndexStatus},
{time_table_status, TimeStatus}]
end.
@@ -324,6 +366,24 @@ status(#state{data_ref=DataRef,
callback(_Ref, _Msg, State) ->
{ok, State}.
+%% @doc Resets state of all running memory backends on the local
+%% node. The `riak_kv' environment variable `memory_backend' must
+%% contain the `test' property, set to `true' for this to work.
+-spec reset() -> ok | {error, reset_disabled}.
+reset() ->
+ reset(app_helper:get_env(memory_backend, test, app_helper:get_env(riak_kv, test)), app_helper:get_env(riak_kv, storage_backend)).
+
+reset(true, ?MODULE) ->
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ [ begin
+ catch ets:delete_all_objects(?DNAME(I)),
+ catch ets:delete_all_objects(?INAME(I)),
+ catch ets:delete_all_objects(?TNAME(I))
+ end || I <- riak_core_ring:my_indices(Ring) ],
+ ok;
+reset(_, _) ->
+ {error, reset_disabled}.
+
%% ===================================================================
%% Internal functions
%% ===================================================================
@@ -349,32 +409,43 @@ fold_buckets_fun(FoldBucketsFun) ->
%% Return a function to fold over keys on this backend
fold_keys_fun(FoldKeysFun, undefined) ->
fun({{Bucket, Key}, _}, Acc) ->
- FoldKeysFun(Bucket, Key, Acc)
+ FoldKeysFun(Bucket, Key, Acc);
+ (_, Acc) ->
+ Acc
end;
-fold_keys_fun(FoldKeysFun, Bucket) ->
- fun({{B, Key}, _}, Acc) ->
- case B =:= Bucket of
- true ->
- FoldKeysFun(Bucket, Key, Acc);
- false ->
- Acc
- end
+fold_keys_fun(FoldKeysFun, {bucket, FilterBucket}) ->
+ fun({{Bucket, Key}, _}, Acc) when Bucket == FilterBucket ->
+ FoldKeysFun(Bucket, Key, Acc);
+ (_, Acc) ->
+ Acc
+ end;
+fold_keys_fun(FoldKeysFun, {index, FilterBucket, {eq, <<"$bucket">>, _}}) ->
+ %% 2I exact match query on special $bucket field...
+ fold_keys_fun(FoldKeysFun, {bucket, FilterBucket});
+fold_keys_fun(FoldKeysFun, {index, FilterBucket, {range, <<"$key">>, _, _}}) ->
+ %% 2I range query on special $key field...
+ fold_keys_fun(FoldKeysFun, {bucket, FilterBucket});
+fold_keys_fun(FoldKeysFun, {index, _FilterBucket, _Query}) ->
+ fun({{Bucket, _FilterField, _FilterTerm, Key}, _}, Acc) ->
+ FoldKeysFun(Bucket, Key, Acc);
+ (_, Acc) ->
+ Acc
end.
+
%% @private
%% Return a function to fold over keys on this backend
fold_objects_fun(FoldObjectsFun, undefined) ->
fun({{Bucket, Key}, Value}, Acc) ->
- FoldObjectsFun(Bucket, Key, Value, Acc)
+ FoldObjectsFun(Bucket, Key, Value, Acc);
+ (_, Acc) ->
+ Acc
end;
-fold_objects_fun(FoldObjectsFun, Bucket) ->
- fun({{B, Key}, Value}, Acc) ->
- case B =:= Bucket of
- true ->
- FoldObjectsFun(Bucket, Key, Value, Acc);
- false ->
- Acc
- end
+fold_objects_fun(FoldObjectsFun, FilterBucket) ->
+ fun({{Bucket, Key}, Value}, Acc) when Bucket == FilterBucket->
+ FoldObjectsFun(Bucket, Key, Value, Acc);
+ (_, Acc) ->
+ Acc
end.
%% @private
@@ -384,29 +455,90 @@ get_folder(FoldFun, Acc, DataRef) ->
end.
%% @private
-do_put(Bucket, Key, Val, Ref) ->
- Object = {{Bucket, Key}, Val},
- true = ets:insert(Ref, Object),
- {ok, object_size(Object)}.
+get_index_folder(Folder, Acc0, {index, Bucket, {eq, <<"$bucket">>, _}}, DataRef, _) ->
+ %% For the special $bucket index, turn it into a fold over the
+ %% data table.
+ fun() ->
+ key_range_folder(Folder, Acc0, DataRef, {Bucket, <<>>}, Bucket)
+ end;
+get_index_folder(Folder, Acc0, {index, Bucket, {range, <<"$key">>, Min, Max}}, DataRef, _) ->
+ %% For the special range lookup on the $key index, turn it into a
+ %% fold on the data table
+ fun() ->
+ key_range_folder(Folder, Acc0, DataRef, {Bucket, Min}, {Bucket, Min, Max})
+ end;
+get_index_folder(Folder, Acc0, {index, Bucket, {eq, Field, Term}}, _, IndexRef) ->
+ fun() ->
+ index_range_folder(Folder, Acc0, IndexRef, {Bucket, Field, Term, undefined}, {Bucket, Field, Term, Term})
+ end;
+get_index_folder(Folder, Acc0, {index, Bucket, {range, Field, Min, Max}}, _, IndexRef) ->
+ fun() ->
+ index_range_folder(Folder, Acc0, IndexRef, {Bucket, Field, Min, undefined}, {Bucket, Field, Min, Max})
+ end.
+
+%% Iterates over a range of keys, for the special $key and $bucket
+%% indexes.
%% @private
-config_value(Key, Config) ->
- config_value(Key, Config, undefined).
+-spec key_range_folder(function(), term(), ets:tid(), {riak_object:bucket(), riak_object:key()}, binary() | {riak_object:bucket(), term(), term()}) -> term().
+key_range_folder(Folder, Acc0, DataRef, {B,_}=DataKey, B) ->
+ case ets:lookup(DataRef, DataKey) of
+ [] ->
+ key_range_folder(Folder, Acc0, DataRef, ets:next(DataRef, DataKey), B);
+ [Object] ->
+ Acc = Folder(Object, Acc0),
+ key_range_folder(Folder, Acc, DataRef, ets:next(DataRef, DataKey), B)
+ end;
+key_range_folder(Folder, Acc0, DataRef, {B,K}=DataKey, {B, Min, Max}=Query) when K >= Min, K =< Max ->
+ case ets:lookup(DataRef, DataKey) of
+ [] ->
+ key_range_folder(Folder, Acc0, DataRef, ets:next(DataRef, DataKey), Query);
+ [Object] ->
+ Acc = Folder(Object, Acc0),
+ key_range_folder(Folder, Acc, DataRef, ets:next(DataRef, DataKey), Query)
+ end;
+key_range_folder(_Folder, Acc, _DataRef, _DataKey, _Query) ->
+ Acc.
+
+%% Iterates over a range of index postings
+index_range_folder(Folder, Acc0, IndexRef, {B, I, V, _K}=IndexKey, {B, I, Min, Max}=Query) when V >= Min, V =< Max ->
+ case ets:lookup(IndexRef, IndexKey) of
+ [] ->
+ %% This will happen on the first iteration, where the key
+ %% does not exist. In all other cases, ETS will give us a
+ %% real key from next/2.
+ index_range_folder(Folder, Acc0, IndexRef, ets:next(IndexRef, IndexKey), Query);
+ [Posting] ->
+ Acc = Folder(Posting, Acc0),
+ index_range_folder(Folder, Acc, IndexRef, ets:next(IndexRef, IndexKey), Query)
+ end;
+index_range_folder(_Folder, Acc, _IndexRef, _IndexKey, _Query) ->
+ Acc.
+
%% @private
-config_value(Key, Config, Default) ->
- case proplists:get_value(Key, Config) of
- undefined ->
- app_helper:get_env(memory_backend, Key, Default);
- Value ->
- Value
- end.
+do_put(Bucket, Key, Val, IndexSpecs, DataRef, IndexRef) ->
+ Object = {{Bucket, Key}, Val},
+ true = ets:insert(DataRef, Object),
+ update_indexes(Bucket, Key, IndexSpecs, IndexRef),
+ {ok, object_size(Object)}.
%% Check if this timestamp is past the ttl setting.
exceeds_ttl(Timestamp, TTL) ->
Diff = (timer:now_diff(now(), Timestamp) / 1000 / 1000),
Diff > TTL.
+update_indexes(_Bucket, _Key, undefined, _IndexRef) ->
+ ok;
+update_indexes(_Bucket, _Key, [], _IndexRef) ->
+ ok;
+update_indexes(Bucket, Key, [{remove, Field, Value}|Rest], IndexRef) ->
+ true = ets:delete(IndexRef, {Bucket, Field, Value, Key}),
+ update_indexes(Bucket, Key, Rest, IndexRef);
+update_indexes(Bucket, Key, [{add, Field, Value}|Rest], IndexRef) ->
+ true = ets:insert(IndexRef, {{Bucket, Field, Value, Key}, <<>>}),
+ update_indexes(Bucket, Key, Rest, IndexRef).
+
%% @private
time_entry(Bucket, Key, Now, TimeRef) ->
ets:insert(TimeRef, {Now, {Bucket, Key}}).
@@ -414,20 +546,21 @@ time_entry(Bucket, Key, Now, TimeRef) ->
%% @private
%% @doc Dump some entries if the max memory size has
%% been breached.
-trim_data_table(MaxMemory, UsedMemory, _, _, Freed) when
+trim_data_table(MaxMemory, UsedMemory, _, _, _, Freed) when
(UsedMemory - Freed) =< MaxMemory ->
Freed;
-trim_data_table(MaxMemory, UsedMemory, DataRef, TimeRef, Freed) ->
+trim_data_table(MaxMemory, UsedMemory, DataRef, TimeRef, IndexRef, Freed) ->
%% Delete the oldest object
- OldestSize = delete_oldest(DataRef, TimeRef),
+ OldestSize = delete_oldest(DataRef, TimeRef, IndexRef),
trim_data_table(MaxMemory,
UsedMemory,
DataRef,
TimeRef,
+ IndexRef,
Freed + OldestSize).
%% @private
-delete_oldest(DataRef, TimeRef) ->
+delete_oldest(DataRef, TimeRef, IndexRef) ->
OldestTime = ets:first(TimeRef),
case OldestTime of
'$end_of_table' ->
@@ -437,8 +570,10 @@ delete_oldest(DataRef, TimeRef) ->
ets:delete(TimeRef, OldestTime),
case ets:lookup(DataRef, OldestKey) of
[] ->
- delete_oldest(DataRef, TimeRef);
+ delete_oldest(DataRef, TimeRef, IndexRef);
[Object] ->
+ {Bucket, Key} = OldestKey,
+ ets:match_delete(IndexRef, ?DELETE_PTN(Bucket, Key)),
ets:delete(DataRef, OldestKey),
object_size(Object)
end
@@ -522,8 +657,8 @@ eqc_test_() ->
[
{timeout, 60000,
[?_assertEqual(true,
- backend_eqc:test(?MODULE, true))]}
- ]}]}]}.
+ backend_eqc:test(?MODULE, true))]}
+ ]}]}]}.
setup() ->
application:load(sasl),
BIN  erl_src/riak_search_test_backend.beam
Binary file not shown
6 erl_src/riak_search_test_backend.erl
@@ -25,18 +25,18 @@
]).
-include_lib("riak_search/include/riak_search.hrl").
-
+-define(T(P), list_to_atom("rs" ++ integer_to_list(P))).
-record(state, {partition, table}).
reset() ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
- [ ets:delete_all_objects(list_to_atom("rs" ++ integer_to_list(P))) ||
+ [ catch ets:delete_all_objects(?T(P)) ||
P <- riak_core_ring:my_indices(Ring) ],
riak_search_config:clear(),
ok.
start(Partition, _Config) ->
- Table = ets:new(list_to_atom("rs" ++ integer_to_list(Partition)),
+ Table = ets:new(?T(Partition),
[named_table, public, ordered_set]),
{ok, #state{partition=Partition, table=Table}}.
1  lib/riak/node/configuration.rb
@@ -167,6 +167,7 @@ def configure_data
def configure_logging
if env[:lager]
env[:lager][:handlers] = {
+ :lager_console_backend => :info,
:lager_file_backend => [
Tuple[(log+"error.log").expand_path.to_s, :error],
Tuple[(log+"console.log").expand_path.to_s, :info]
7 lib/riak/node/console.rb
@@ -30,7 +30,6 @@ def self.open(node)
def initialize(pipedir, nodename)
@nodename = nodename
@mutex = Mutex.new
- @winch = Signal.trap("WINCH", &method(:handle_winch))
@prompt = /\(#{Regexp.escape(nodename)}\)\d+>\s*/
pipedir = Pathname(pipedir)
pipedir.children.each do |path|
@@ -108,12 +107,6 @@ def close
end
protected
- # Handles the "window change" signal by faking it.
- def handle_winch
- debug "WINCHED!"
- @w.print "\033_winsize=80,26\033\\"
- Signal.trap("WINCH", &method(:handle_winch))
- end
def debug(msg)
$stderr.puts msg if ENV["DEBUG_RIAK_CONSOLE"]
8 lib/riak/test_server.rb
@@ -17,6 +17,9 @@ def initialize(configuration = {})
configuration[:env] ||= {}
configuration[:env][:riak_kv] ||= {}
(configuration[:env][:riak_kv][:add_paths] ||= []) << File.expand_path("../../../erl_src", __FILE__)
+ configuration[:env][:riak_kv][:test] = true
+ configuration[:env][:memory_backend] ||={}
+ configuration[:env][:memory_backend][:test] = true
configuration[:env][:riak_search] ||= {}
configuration[:env][:riak_search][:search_backend] = :riak_search_test_backend
super configuration
@@ -75,8 +78,11 @@ def configure_data
super
if version < "1.0.0"
env[:riak_kv][:storage_backend] = :riak_kv_test014_backend
- else
+ elsif version =~ /^1\.[01]\.\d+$/ # 1.0 and 1.1 series
env[:riak_kv][:storage_backend] = :riak_kv_test_backend
+ else
+ # TODO: change this when 1.2+ is released, if it includes riak_kv#314
+ env[:riak_kv][:storage_backend] = :riak_kv_memory_backend
end
end
end
21 spec/support/unified_backend_examples.rb
@@ -241,6 +241,27 @@
end
end
+ # get_index
+ context "querying secondary indexes" do
+ before do
+ 50.times do |i|
+ @client.bucket('test').new(i.to_s).tap do |obj|
+ obj.indexes["index_int"] << i
+ obj.data = [i]
+ @backend.store_object(obj)
+ end
+ end
+ end
+
+ it "should find keys for an equality query" do
+ @backend.get_index('test', 'index_int', 20).should == ["20"]
+ end
+
+ it "should find keys for a range query" do
+ @backend.get_index('test', 'index_int', 19..21).should =~ ["19","20", "21"]
+ end
+ end
+
# mapred
context "performing MapReduce" do
before do