Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

remove luke usage

riak_pipe powers MapReduce queries now
  • Loading branch information...
commit 28c6bdf08ba75d71b7655b2afa0a1a8da1061f05 1 parent e639737
@beerriot beerriot authored
Showing with 114 additions and 2,439 deletions.
  1. +0 −1  rebar.config
  2. +9 −184 src/riak_client.erl
  3. +0 −3  src/riak_index.erl
  4. +0 −1  src/riak_kv.app.src
  5. +11 −34 src/riak_kv_buckets_fsm.erl
  6. +9 −11 src/riak_kv_encoding_migrate.erl
  7. +15 −46 src/riak_kv_index_fsm.erl
  8. +0 −5 src/riak_kv_js_manager.erl
  9. +18 −48 src/riak_kv_keys_fsm.erl
  10. +21 −53 src/riak_kv_keys_fsm_legacy.erl
  11. +0 −352 src/riak_kv_lru.erl
  12. +0 −263 src/riak_kv_map_master.erl
  13. +0 −301 src/riak_kv_map_phase.erl
  14. +0 −311 src/riak_kv_mapper.erl
  15. +0 −48 src/riak_kv_mapper_sup.erl
  16. +0 −80 src/riak_kv_mapred_cache.erl
  17. +0 −75 src/riak_kv_mapred_planner.erl
  18. +0 −214 src/riak_kv_mapred_query.erl
  19. +1 −1  src/riak_kv_mapred_term.erl
  20. +4 −4 src/riak_kv_mapreduce.erl
  21. +3 −96 src/riak_kv_pb_mapred.erl
  22. +0 −35 src/riak_kv_phase_proto.erl
  23. +0 −112 src/riak_kv_reduce_phase.erl
  24. +1 −13 src/riak_kv_sup.erl
  25. +1 −1  src/riak_kv_test_util.erl
  26. +1 −13 src/riak_kv_util.erl
  27. +0 −2  src/riak_kv_vnode.erl
  28. +11 −16 src/riak_kv_wm_link_walker.erl
  29. +4 −104 src/riak_kv_wm_mapred.erl
  30. +5 −9 test/keys_fsm_eqc.erl
  31. +0 −3  test/mapred_test.erl
View
1  rebar.config
@@ -10,7 +10,6 @@
{deps, [
{riak_core, ".*", {git, "git://github.com/basho/riak_core", "master"}},
- {luke, ".*", {git, "git://github.com/basho/luke", "master"}},
{erlang_js, ".*", {git, "git://github.com/basho/erlang_js", "master"}},
{bitcask, ".*", {git, "git://github.com/basho/bitcask", "master"}},
{merge_index, ".*", {git, "git://github.com/basho/merge_index",
View
193 src/riak_client.erl
@@ -25,12 +25,6 @@
-module(riak_client, [Node,ClientId]).
-author('Justin Sheehy <justin@basho.com>').
--export([mapred/2,mapred/3,mapred/4]).
--export([mapred_stream/2,mapred_stream/3,mapred_stream/4]).
--export([mapred_bucket/2,mapred_bucket/3,mapred_bucket/4]).
--export([mapred_bucket_stream/3,mapred_bucket_stream/4,mapred_bucket_stream/5,
- mapred_bucket_stream/6]).
--export([mapred_dynamic_inputs_stream/3]).
-export([get/2, get/3,get/4]).
-export([put/1, put/2,put/3,put/4,put/5]).
-export([delete/2,delete/3,delete/4]).
@@ -56,158 +50,6 @@
-type riak_client() :: term().
-%% @spec mapred(Inputs :: riak_kv_mapred_term:mapred_inputs(),
-%% Query :: [riak_kv_mapred_query:mapred_queryterm()]) ->
-%% {ok, riak_kv_mapred_query:mapred_result()} |
-%% {error, {bad_qterm, riak_kv_mapred_query:mapred_queryterm()}} |
-%% {error, timeout} |
-%% {error, Err :: term()}
-%% @doc Perform a map/reduce job across the cluster.
-%% See the map/reduce documentation for explanation of behavior.
-%% @equiv mapred(Inputs, Query, default_timeout())
-mapred(Inputs,Query) -> mapred(Inputs,Query,?DEFAULT_TIMEOUT).
-
-%% @spec mapred(Inputs :: riak_kv_mapred_term:mapred_inputs(),
-%% Query :: [riak_kv_mapred_query:mapred_queryterm()],
-%% TimeoutMillisecs :: integer() | 'infinity') ->
-%% {ok, riak_kv_mapred_query:mapred_result()} |
-%% {error, {bad_qterm, riak_kv_mapred_query:mapred_queryterm()}} |
-%% {error, timeout} |
-%% {error, Err :: term()}
-%% @doc Perform a map/reduce job across the cluster.
-%% See the map/reduce documentation for explanation of behavior.
-mapred(Inputs,Query,Timeout) ->
- mapred(Inputs,Query,undefined,Timeout).
-
-%% @spec mapred(Inputs :: riak_kv_mapred_term:mapred_inputs(),
-%% Query :: [riak_kv_mapred_query:mapred_queryterm()],
-%% TimeoutMillisecs :: integer() | 'infinity',
-%% ResultTransformer :: function()) ->
-%% {ok, riak_kv_mapred_query:mapred_result()} |
-%% {error, {bad_qterm, riak_kv_mapred_query:mapred_queryterm()}} |
-%% {error, timeout} |
-%% {error, Err :: term()}
-%% @doc Perform a map/reduce job across the cluster.
-%% See the map/reduce documentation for explanation of behavior.
-mapred(Inputs,Query,ResultTransformer,Timeout) when is_binary(Inputs) orelse
- is_tuple(Inputs) ->
- case is_binary(Inputs) orelse is_key_filter(Inputs) of
- true ->
- mapred_bucket(Inputs, Query, ResultTransformer, Timeout);
- false ->
- Me = self(),
- case mapred_stream(Query,Me,ResultTransformer,Timeout) of
- {ok, {ReqId, FlowPid}} ->
- mapred_dynamic_inputs_stream(FlowPid, Inputs, Timeout),
- luke_flow:finish_inputs(FlowPid),
- luke_flow:collect_output(ReqId, Timeout);
- Error ->
- Error
- end
- end;
-mapred(Inputs,Query,ResultTransformer,Timeout)
- when is_list(Query),
- (is_integer(Timeout) orelse Timeout =:= infinity) ->
- Me = self(),
- case mapred_stream(Query,Me,ResultTransformer,Timeout) of
- {ok, {ReqId, FlowPid}} ->
- case is_list(Inputs) of
- true ->
- add_inputs(FlowPid, Inputs);
- false ->
- mapred_dynamic_inputs_stream(FlowPid, Inputs, Timeout)
- end,
- luke_flow:finish_inputs(FlowPid),
- luke_flow:collect_output(ReqId, Timeout);
- Error ->
- Error
- end.
-
-%% @spec mapred_stream(Query :: [riak_kv_mapred_query:mapred_queryterm()],
-%% ClientPid :: pid()) ->
-%% {ok, {ReqId :: term(), MR_FSM_PID :: pid()}} |
-%% {error, {bad_qterm, riak_kv_mapred_query:mapred_queryterm()}} |
-%% {error, Err :: term()}
-%% @doc Perform a streaming map/reduce job across the cluster.
-%% See the map/reduce documentation for explanation of behavior.
-mapred_stream(Query,ClientPid) ->
- mapred_stream(Query,ClientPid,?DEFAULT_TIMEOUT).
-
-%% @spec mapred_stream(Query :: [riak_kv_mapred_query:mapred_queryterm()],
-%% ClientPid :: pid(),
-%% TimeoutMillisecs :: integer() | 'infinity') ->
-%% {ok, {ReqId :: term(), MR_FSM_PID :: pid()}} |
-%% {error, {bad_qterm, riak_kv_mapred_query:mapred_queryterm()}} |
-%% {error, Err :: term()}
-%% @doc Perform a streaming map/reduce job across the cluster.
-%% See the map/reduce documentation for explanation of behavior.
-mapred_stream(Query, ClientPid, Timeout) ->
- mapred_stream(Query, ClientPid, undefined, Timeout).
-
-%% @spec mapred_stream(Query :: [riak_kv_mapred_query:mapred_queryterm()],
-%% ClientPid :: pid(),
-%% TimeoutMillisecs :: integer() | 'infinity',
-%% ResultTransformer :: function()) ->
-%% {ok, {ReqId :: term(), MR_FSM_PID :: pid()}} |
-%% {error, {bad_qterm, riak_kv_mapred_query:mapred_queryterm()}} |
-%% {error, Err :: term()}
-%% @doc Perform a streaming map/reduce job across the cluster.
-%% See the map/reduce documentation for explanation of behavior.
-mapred_stream(Query,ClientPid,ResultTransformer,Timeout)
- when is_list(Query), is_pid(ClientPid),
- (is_integer(Timeout) orelse Timeout =:= infinity) ->
- ReqId = mk_reqid(),
- case riak_kv_mapred_query:start(Node, ClientPid, ReqId, Query, ResultTransformer, Timeout) of
- {ok, Pid} ->
- {ok, {ReqId, Pid}};
- Error ->
- Error
- end.
-
-mapred_bucket_stream(Bucket, Query, ClientPid) ->
- mapred_bucket_stream(Bucket, Query, ClientPid, ?DEFAULT_TIMEOUT).
-
-mapred_bucket_stream(Bucket, Query, ClientPid, Timeout) ->
- mapred_bucket_stream(Bucket, Query, ClientPid, undefined, Timeout).
-
-mapred_bucket_stream(Bucket, Query, ClientPid, ResultTransformer, Timeout) ->
- {ok,{MR_ReqId,MR_FSM}} = mapred_stream(Query,ClientPid,ResultTransformer,Timeout),
- {ok,_Stream_ReqID} = stream_list_keys(Bucket, Timeout,
- MR_FSM, mapred),
- {ok,MR_ReqId}.
-
-
-%% @deprecated Only in place for backwards compatibility.
-mapred_bucket_stream(Bucket, Query, ClientPid, ResultTransformer, Timeout, _) ->
- mapred_bucket_stream(Bucket, Query, ClientPid, ResultTransformer, Timeout).
-
-mapred_bucket(Bucket, Query) ->
- mapred_bucket(Bucket, Query, ?DEFAULT_TIMEOUT).
-
-mapred_bucket(Bucket, Query, Timeout) ->
- mapred_bucket(Bucket, Query, undefined, Timeout).
-
-mapred_bucket(Bucket, Query, ResultTransformer, Timeout) ->
- Me = self(),
- {ok,MR_ReqId} = mapred_bucket_stream(Bucket, Query, Me,
- ResultTransformer, Timeout),
- luke_flow:collect_output(MR_ReqId, Timeout).
-
--define(PRINT(Var), io:format("DEBUG: ~p:~p - ~p~n~n ~p~n~n", [?MODULE, ?LINE, ??Var, Var])).
-
-%% An InputDef defines a Module and Function to call to generate
-%% inputs for a map/reduce job. Should return {ok,
-%% LukeReqID}. Ideally, we'd combine both the other input types (BKeys
-%% and Bucket) into this approach, but postponing until after a code
-%% review of Map/Reduce.
-mapred_dynamic_inputs_stream(FSMPid, InputDef, Timeout) ->
- case InputDef of
- {modfun, Mod, Fun, Options} ->
- Mod:Fun(FSMPid, Options, Timeout);
- _ ->
- throw({invalid_inputdef, InputDef})
- end.
-
%% @spec get(riak_object:bucket(), riak_object:key()) ->
%% {ok, riak_object:riak_object()} |
%% {error, notfound} |
@@ -469,7 +311,7 @@ list_keys(Bucket, Timeout, ErrorTolerance) when is_integer(Timeout) ->
Me = self(),
ReqId = mk_reqid(),
FSM_Timeout = trunc(Timeout / 8),
- riak_kv_keys_fsm_legacy_sup:start_keys_fsm(Node, [ReqId, Bucket, FSM_Timeout, plain, ErrorTolerance, Me]),
+ riak_kv_keys_fsm_legacy_sup:start_keys_fsm(Node, [ReqId, Bucket, FSM_Timeout, ErrorTolerance, Me]),
wait_for_listkeys(ReqId, Timeout);
%% @spec list_keys(riak_object:bucket(), TimeoutMillisecs :: integer()) ->
%% {ok, [Key :: riak_object:key()]} |
@@ -487,7 +329,7 @@ list_keys(Bucket, Filter, Timeout) ->
_ ->
Me = self(),
ReqId = mk_reqid(),
- riak_kv_keys_fsm_sup:start_keys_fsm(Node, [{raw, ReqId, Me}, [Bucket, Filter, Timeout, plain]]),
+ riak_kv_keys_fsm_sup:start_keys_fsm(Node, [{raw, ReqId, Me}, [Bucket, Filter, Timeout]]),
wait_for_listkeys(ReqId, Timeout)
end.
@@ -505,11 +347,11 @@ stream_list_keys(Bucket, Timeout, _) ->
stream_list_keys(Bucket, Timeout).
%% @deprecated Only in place for backwards compatibility.
-stream_list_keys(Bucket0, Timeout, ErrorTolerance, Client, ClientType) ->
+stream_list_keys(Bucket0, Timeout, ErrorTolerance, Client, _ClientType) ->
ReqId = mk_reqid(),
case build_filter(Bucket0) of
{ok, Filter} ->
- riak_kv_keys_fsm_legacy_sup:start_keys_fsm(Node, [ReqId, Filter, Timeout, ClientType, ErrorTolerance, Client]),
+ riak_kv_keys_fsm_legacy_sup:start_keys_fsm(Node, [ReqId, Filter, Timeout, ErrorTolerance, Client]),
{ok, ReqId};
Error ->
Error
@@ -550,8 +392,7 @@ stream_list_keys(Input, Timeout, Client, ClientType) when is_pid(Client) ->
Client},
[Bucket,
FilterExprs,
- Timeout,
- ClientType]]),
+ Timeout]]),
{ok, ReqId}
end;
Bucket ->
@@ -559,8 +400,7 @@ stream_list_keys(Input, Timeout, Client, ClientType) when is_pid(Client) ->
[{raw, ReqId, Client},
[Bucket,
none,
- Timeout,
- ClientType]]),
+ Timeout]]),
{ok, ReqId}
end
end;
@@ -636,7 +476,7 @@ list_buckets(Filter, Timeout) ->
_ ->
Me = self(),
ReqId = mk_reqid(),
- riak_kv_buckets_fsm_sup:start_buckets_fsm(Node, [{raw, ReqId, Me}, [Filter, Timeout, plain]]),
+ riak_kv_buckets_fsm_sup:start_buckets_fsm(Node, [{raw, ReqId, Me}, [Filter, Timeout]]),
wait_for_listbuckets(ReqId, Timeout)
end.
@@ -676,7 +516,7 @@ get_index(Bucket, Query) ->
get_index(Bucket, Query, Timeout) ->
Me = self(),
ReqId = mk_reqid(),
- riak_kv_index_fsm_sup:start_index_fsm(Node, [{raw, ReqId, Me}, [Bucket, none, Query, Timeout, plain]]),
+ riak_kv_index_fsm_sup:start_index_fsm(Node, [{raw, ReqId, Me}, [Bucket, none, Query, Timeout]]),
wait_for_query_results(ReqId, Timeout).
%% @spec stream_get_index(Bucket :: binary(),
@@ -700,7 +540,7 @@ stream_get_index(Bucket, Query) ->
stream_get_index(Bucket, Query, Timeout) ->
Me = self(),
ReqId = mk_reqid(),
- riak_kv_index_fsm_sup:start_index_fsm(Node, [{raw, ReqId, Me}, [Bucket, none, Query, Timeout, plain]]),
+ riak_kv_index_fsm_sup:start_index_fsm(Node, [{raw, ReqId, Me}, [Bucket, none, Query, Timeout]]),
{ok, ReqId}.
%% @spec set_bucket(riak_object:bucket(), [BucketProp :: {atom(),term()}]) -> ok
@@ -795,21 +635,6 @@ wait_for_query_results(ReqId, Timeout, Acc) ->
{error, timeout}
end.
-add_inputs(_FlowPid, []) ->
- ok;
-add_inputs(FlowPid, Inputs) when length(Inputs) < 100 ->
- luke_flow:add_inputs(FlowPid, Inputs);
- add_inputs(FlowPid, Inputs) ->
- {Current, Next} = lists:split(100, Inputs),
- luke_flow:add_inputs(FlowPid, Current),
- add_inputs(FlowPid, Next).
-
-is_key_filter({Bucket, Filters}) when is_binary(Bucket),
- is_list(Filters) ->
- true;
-is_key_filter(_) ->
- false.
-
%% @deprecated This function is only here to support
%% rolling upgrades and will be removed.
build_filter({Bucket, Exprs}) ->
View
3  src/riak_index.erl
@@ -57,9 +57,6 @@
mapred_index(Dest, Args) ->
mapred_index(Dest, Args, ?TIMEOUT).
-mapred_index(FlowPid, [_Bucket, _Query], _Timeout)
- when is_pid(FlowPid) ->
- throw({not_supported, mapred_index, FlowPid});
mapred_index(_Pipe, [Bucket, Query], Timeout) ->
{ok, C} = riak:local_client(),
{ok, ReqId} = C:stream_get_index(Bucket, Query, Timeout),
View
1  src/riak_kv.app.src
@@ -11,7 +11,6 @@
crypto,
riak_api,
riak_core,
- luke,
erlang_js,
mochiweb,
webmachine,
View
45 src/riak_kv_buckets_fsm.erl
@@ -36,7 +36,6 @@
-type req_id() :: non_neg_integer().
-record(state, {buckets=sets:new() :: [term()],
- client_type :: plain | mapred,
from :: from()}).
-include("riak_kv_dtrace.hrl").
@@ -45,26 +44,20 @@
%% the number of primary preflist vnodes the operation
%% should cover, the service to use to check for available nodes,
%% and the registered name to use to access the vnode master process.
-init(From={_, _, ClientPid}, [ItemFilter, Timeout, ClientType]) ->
+init(From={_, _, ClientPid}, [ItemFilter, Timeout]) ->
ClientNode = atom_to_list(node(ClientPid)),
PidStr = pid_to_list(ClientPid),
FilterX = if ItemFilter == none -> 0;
true -> 1
end,
- case ClientType of
- %% Link to the mapred job so we die if the job dies
- mapred ->
- ?DTRACE(?C_BUCKETS_INIT, [1, FilterX],
- [<<"mapred">>, ClientNode, PidStr]),
- link(ClientPid);
- _ ->
- ?DTRACE(?C_BUCKETS_INIT, [2, FilterX],
- [<<"other">>, ClientNode, PidStr])
- end,
+ %% "other" is a legacy term from when MapReduce used this FSM (in
+ %% which case, the string "mapred" would appear
+ ?DTRACE(?C_BUCKETS_INIT, [2, FilterX],
+ [<<"other">>, ClientNode, PidStr]),
%% Construct the bucket listing request
Req = ?KV_LISTBUCKETS_REQ{item_filter=ItemFilter},
{Req, allup, 1, 1, riak_kv, riak_kv_vnode_master, Timeout,
- #state{client_type=ClientType, from=From}}.
+ #state{from=From}}.
process_results(done, StateData) ->
{done, StateData};
@@ -78,31 +71,15 @@ process_results({error, Reason}, _State) ->
{error, Reason}.
finish({error, Error},
- StateData=#state{client_type=ClientType,
- from={raw, ReqId, ClientPid}}) ->
+ StateData=#state{from={raw, ReqId, ClientPid}}) ->
?DTRACE(?C_BUCKETS_FINISH, [-1], []),
- case ClientType of
- mapred ->
- %% An error occurred or the timeout interval elapsed
- %% so all we can do now is die so that the rest of the
- %% MapReduce processes will also die and be cleaned up.
- exit(Error);
- plain ->
- %% Notify the requesting client that an error
- %% occurred or the timeout has elapsed.
- ClientPid ! {ReqId, Error}
- end,
+ %% Notify the requesting client that an error
+ %% occurred or the timeout has elapsed.
+ ClientPid ! {ReqId, Error},
{stop, normal, StateData};
finish(clean,
StateData=#state{buckets=Buckets,
- client_type=ClientType,
from={raw, ReqId, ClientPid}}) ->
- case ClientType of
- mapred ->
- luke_flow:add_inputs(Buckets),
- luke_flow:finish_inputs(ClientPid);
- plain ->
- ClientPid ! {ReqId, {buckets, sets:to_list(Buckets)}}
- end,
+ ClientPid ! {ReqId, {buckets, sets:to_list(Buckets)}},
?DTRACE(?C_BUCKETS_FINISH, [0], []),
{stop, normal, StateData}.
View
20 src/riak_kv_encoding_migrate.erl
@@ -62,7 +62,6 @@
%% Check if the cluster contains encoded values that need to be migrated
check_cluster() ->
{ok, RC} = riak:local_client(),
- riak_kv_mapred_cache:clear(),
{ok, Buckets} = RC:list_buckets(),
case Buckets of
[] ->
@@ -70,16 +69,16 @@ check_cluster() ->
{empty, [], []};
_ ->
EObjs = get_encoded_keys(RC),
- check_cluster2(RC, EObjs)
+ check_cluster2(EObjs)
end.
-check_cluster2(_, []) ->
+check_cluster2([]) ->
io:format("Cluster does not contain URL encoded values. "
"No migration needed.~n", []),
{not_needed, [], []};
-check_cluster2(RC, EObjs) ->
- case {check_safe(RC, EObjs), check_double_encoding(EObjs)} of
+check_cluster2(EObjs) ->
+ case {check_safe(EObjs), check_double_encoding(EObjs)} of
{{safe, _}, {false, _}} ->
io:format("Cluster contains URL encoded values. "
"Migration needed.~n", []),
@@ -112,7 +111,8 @@ get_encoded_keys() ->
get_encoded_keys(RC) ->
{ok, Buckets} = RC:list_buckets(),
EObjs = [begin
- {ok, Objs} = RC:mapred(Bucket, [reduce_check_encoded()]),
+ {ok, Objs} = riak_kv_mrc_pipe:mapred(
+ Bucket, [reduce_check_encoded()]),
Objs
end || Bucket <- Buckets],
lists:flatten(EObjs).
@@ -146,9 +146,9 @@ check_double_encoding(EObjs) ->
end.
%% Determine if it is safe to perform migration (no bucket/key conflicts).
-check_safe(RC, EObjs) ->
+check_safe(EObjs) ->
EObjs2 = [decode_name(Name) || Name <- EObjs],
- MR = RC:mapred(EObjs2, [map_unsafe()]),
+ MR = riak_kv_mrc_pipe:mapred(EObjs2, [map_unsafe()]),
case MR of
{ok, []} ->
{safe, []};
@@ -167,9 +167,7 @@ map_unsafe(RO, _, _) ->
%% Perform first phase of migration: copying encoded values to
%% unencoded equivalents.
migrate_objects(EObjs) ->
- {ok, RC} = riak:local_client(),
- riak_kv_mapred_cache:clear(),
- MR = RC:mapred(EObjs, [map_rewrite_encoded()]),
+ MR = riak_kv_mrc_pipe:mapred(EObjs, [map_rewrite_encoded()]),
case MR of
{ok, []} ->
io:format("All objects with URL encoded buckets/keys have been "
View
61 src/riak_kv_index_fsm.erl
@@ -48,8 +48,7 @@
-type from() :: {atom(), req_id(), pid()}.
-type req_id() :: non_neg_integer().
--record(state, {client_type :: plain | mapred,
- from :: from()}).
+-record(state, {from :: from()}).
%% @doc Returns `true' if the new ack-based backpressure index
%% protocol should be used. This decision is based on the
@@ -77,73 +76,43 @@ req(Bucket, ItemFilter, Query) ->
%% the number of primary preflist vnodes the operation
%% should cover, the service to use to check for available nodes,
%% and the registered name to use to access the vnode master process.
-init(From={_, _, ClientPid}, [Bucket, ItemFilter, Query, Timeout, ClientType]) ->
- case ClientType of
- %% Link to the mapred job so we die if the job dies
- mapred ->
- link(ClientPid);
- _ ->
- ok
- end,
+init(From={_, _, _}, [Bucket, ItemFilter, Query, Timeout]) ->
%% Get the bucket n_val for use in creating a coverage plan
BucketProps = riak_core_bucket:get_bucket(Bucket),
NVal = proplists:get_value(n_val, BucketProps),
%% Construct the key listing request
Req = req(Bucket, ItemFilter, Query),
{Req, all, NVal, 1, riak_kv, riak_kv_vnode_master, Timeout,
- #state{client_type=ClientType, from=From}}.
+ #state{from=From}}.
process_results({error, Reason}, _State) ->
{error, Reason};
process_results({From, Bucket, Results},
- StateData=#state{client_type=ClientType,
- from={raw, ReqId, ClientPid}}) ->
- process_query_results(ClientType, Bucket, Results, ReqId, ClientPid),
+ StateData=#state{from={raw, ReqId, ClientPid}}) ->
+ process_query_results(Bucket, Results, ReqId, ClientPid),
riak_kv_vnode:ack_keys(From), % tell that vnode we're ready for more
{ok, StateData};
process_results({Bucket, Results},
- StateData=#state{client_type=ClientType,
- from={raw, ReqId, ClientPid}}) ->
- process_query_results(ClientType, Bucket, Results, ReqId, ClientPid),
+ StateData=#state{from={raw, ReqId, ClientPid}}) ->
+ process_query_results(Bucket, Results, ReqId, ClientPid),
{ok, StateData};
process_results(done, StateData) ->
{done, StateData}.
finish({error, Error},
- StateData=#state{from={raw, ReqId, ClientPid},
- client_type=ClientType}) ->
- case ClientType of
- mapred ->
- %% An error occurred or the timeout interval elapsed
- %% so all we can do now is die so that the rest of the
- %% MapReduce processes will also die and be cleaned up.
- exit(Error);
- plain ->
- %% Notify the requesting client that an error
- %% occurred or the timeout has elapsed.
- ClientPid ! {ReqId, {error, Error}}
- end,
+ StateData=#state{from={raw, ReqId, ClientPid}}) ->
+ %% Notify the requesting client that an error
+ %% occurred or the timeout has elapsed.
+ ClientPid ! {ReqId, {error, Error}},
{stop, normal, StateData};
finish(clean,
- StateData=#state{from={raw, ReqId, ClientPid},
- client_type=ClientType}) ->
- case ClientType of
- mapred ->
- luke_flow:finish_inputs(ClientPid);
- plain ->
- ClientPid ! {ReqId, done}
- end,
+ StateData=#state{from={raw, ReqId, ClientPid}}) ->
+ ClientPid ! {ReqId, done},
{stop, normal, StateData}.
%% ===================================================================
%% Internal functions
%% ===================================================================
-process_query_results(plain, _Bucket, Results, ReqId, ClientPid) ->
- ClientPid ! {ReqId, {results, Results}};
-process_query_results(mapred, Bucket, Results, _ReqId, ClientPid) ->
- try
- luke_flow:add_inputs(ClientPid, [{Bucket, Result} || Result <- Results])
- catch _:_ ->
- exit(self(), normal)
- end.
+process_query_results(_Bucket, Results, ReqId, ClientPid) ->
+ ClientPid ! {ReqId, {results, Results}}.
View
5 src/riak_kv_js_manager.erl
@@ -121,11 +121,6 @@ handle_call({mark_idle, VM}, _From, #state{master=Master,
handle_call(reload_vms, _From, #state{master=Master, idle=Idle}=State) ->
reload_idle_vms(Idle),
mark_pending_reloads(Master, Idle),
- if State#state.name == ?JSPOOL_MAP ->
- riak_kv_mapred_cache:clear();
- true ->
- ok
- end,
{reply, ok, State};
handle_call({reserve_batch_vm, Owner}, _From, State) ->
View
66 src/riak_kv_keys_fsm.erl
@@ -49,8 +49,7 @@
-type from() :: {atom(), req_id(), pid()}.
-type req_id() :: non_neg_integer().
--record(state, {client_type :: plain | mapred,
- from :: from()}).
+-record(state, {from :: from()}).
-include("riak_kv_dtrace.hrl").
@@ -78,44 +77,36 @@ req(Bucket, ItemFilter) ->
%% the number of primary preflist vnodes the operation
%% should cover, the service to use to check for available nodes,
%% and the registered name to use to access the vnode master process.
-init(From={_, _, ClientPid}, [Bucket, ItemFilter, Timeout, ClientType]) ->
+init(From={_, _, ClientPid}, [Bucket, ItemFilter, Timeout]) ->
riak_core_dtrace:put_tag(io_lib:format("~p", [Bucket])),
ClientNode = atom_to_list(node(ClientPid)),
PidStr = pid_to_list(ClientPid),
FilterX = if ItemFilter == none -> 0;
true -> 1
end,
- case ClientType of
- %% Link to the mapred job so we die if the job dies
- mapred ->
- ?DTRACE(?C_KEYS_INIT, [1, FilterX],
- [<<"mapred">>, ClientNode, PidStr]),
- link(ClientPid);
- _ ->
- ?DTRACE(?C_KEYS_INIT, [2, FilterX],
- [<<"other">>, ClientNode, PidStr])
- end,
+ %% "other" is a legacy term from when MapReduce used this FSM (in
+ %% which case, the string "mapred" would appear
+ ?DTRACE(?C_KEYS_INIT, [2, FilterX],
+ [<<"other">>, ClientNode, PidStr]),
%% Get the bucket n_val for use in creating a coverage plan
BucketProps = riak_core_bucket:get_bucket(Bucket),
NVal = proplists:get_value(n_val, BucketProps),
%% Construct the key listing request
Req = req(Bucket, ItemFilter),
{Req, all, NVal, 1, riak_kv, riak_kv_vnode_master, Timeout,
- #state{client_type=ClientType, from=From}}.
+ #state{from=From}}.
process_results({From, Bucket, Keys},
- StateData=#state{client_type=ClientType,
- from={raw, ReqId, ClientPid}}) ->
+ StateData=#state{from={raw, ReqId, ClientPid}}) ->
%% TODO: have caller give us the Idx number.
?DTRACE(?C_KEYS_PROCESS_RESULTS, [length(Keys)], []),
- process_keys(ClientType, Bucket, Keys, ReqId, ClientPid),
+ process_keys(Bucket, Keys, ReqId, ClientPid),
riak_kv_vnode:ack_keys(From), % tell that vnode we're ready for more
{ok, StateData};
process_results({Bucket, Keys},
- StateData=#state{client_type=ClientType,
- from={raw, ReqId, ClientPid}}) ->
+ StateData=#state{from={raw, ReqId, ClientPid}}) ->
?DTRACE(?C_KEYS_PROCESS_RESULTS, [length(Keys)], []),
- process_keys(ClientType, Bucket, Keys, ReqId, ClientPid),
+ process_keys(Bucket, Keys, ReqId, ClientPid),
{ok, StateData};
process_results(done, StateData) ->
{done, StateData};
@@ -124,30 +115,15 @@ process_results({error, Reason}, _State) ->
{error, Reason}.
finish({error, Error},
- StateData=#state{from={raw, ReqId, ClientPid},
- client_type=ClientType}) ->
+ StateData=#state{from={raw, ReqId, ClientPid}}) ->
?DTRACE(?C_KEYS_FINISH, [-1], []),
- case ClientType of
- mapred ->
- %% An error occurred or the timeout interval elapsed
- %% so all we can do now is die so that the rest of the
- %% MapReduce processes will also die and be cleaned up.
- exit(Error);
- plain ->
- %% Notify the requesting client that an error
- %% occurred or the timeout has elapsed.
- ClientPid ! {ReqId, Error}
- end,
+ %% Notify the requesting client that an error
+ %% occurred or the timeout has elapsed.
+ ClientPid ! {ReqId, Error},
{stop, normal, StateData};
finish(clean,
- StateData=#state{from={raw, ReqId, ClientPid},
- client_type=ClientType}) ->
- case ClientType of
- mapred ->
- luke_flow:finish_inputs(ClientPid);
- plain ->
- ClientPid ! {ReqId, done}
- end,
+ StateData=#state{from={raw, ReqId, ClientPid}}) ->
+ ClientPid ! {ReqId, done},
?DTRACE(?C_KEYS_FINISH, [0], []),
{stop, normal, StateData}.
@@ -163,7 +139,7 @@ finish(clean,
ack_keys({Pid, Ref}) ->
Pid ! {Ref, ok}.
-process_keys(plain, _Bucket, Keys, ReqId, ClientPid) ->
+process_keys(_Bucket, Keys, ReqId, ClientPid) ->
case use_ack_backpressure() of
true ->
Monitor = erlang:monitor(process, ClientPid),
@@ -176,10 +152,4 @@ process_keys(plain, _Bucket, Keys, ReqId, ClientPid) ->
end;
false ->
ClientPid ! {ReqId, {keys, Keys}}
- end;
-process_keys(mapred, Bucket, Keys, _ReqId, ClientPid) ->
- try
- luke_flow:add_inputs(ClientPid, [{Bucket, Key} || Key <- Keys])
- catch _:_ ->
- exit(self(), normal)
end.
View
74 src/riak_kv_keys_fsm_legacy.erl
@@ -25,13 +25,12 @@
-module(riak_kv_keys_fsm_legacy).
-behaviour(gen_fsm).
-include_lib("riak_kv_vnode.hrl").
--export([start_link/6]).
+-export([start_link/5]).
-export([init/1, handle_event/3, handle_sync_event/4,
handle_info/3, terminate/3, code_change/4]).
-export([initialize/2,waiting_kl/2]).
-record(state, {client :: pid(),
- client_type :: atom(),
bloom :: term(),
pls :: [list()],
wait_pls :: [term()],
@@ -45,12 +44,12 @@
listers :: [{atom(), pid()}]
}).
-start_link(ReqId,Bucket,Timeout,ClientType,ErrorTolerance,From) ->
+start_link(ReqId,Bucket,Timeout,ErrorTolerance,From) ->
gen_fsm:start_link(?MODULE,
- [ReqId,Bucket,Timeout,ClientType,ErrorTolerance,From], []).
+ [ReqId,Bucket,Timeout,ErrorTolerance,From], []).
%% @private
-init([ReqId,Input,Timeout,ClientType,ErrorTolerance,Client]) ->
+init([ReqId,Input,Timeout,ErrorTolerance,Client]) ->
process_flag(trap_exit, true),
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
{ok, Bloom} = ebloom:new(10000000,ErrorTolerance,ReqId),
@@ -60,15 +59,8 @@ init([ReqId,Input,Timeout,ClientType,ErrorTolerance,Client]) ->
_ ->
Input
end,
- StateData = #state{client=Client, client_type=ClientType, timeout=Timeout,
+ StateData = #state{client=Client, timeout=Timeout,
bloom=Bloom, req_id=ReqId, input=Input, bucket=Bucket, ring=Ring},
- case ClientType of
- %% Link to the mapred job so we die if the job dies
- mapred ->
- link(Client);
- _ ->
- ok
- end,
{ok,initialize,StateData,0}.
%% @private
@@ -98,9 +90,8 @@ initialize(timeout, StateData0=#state{input=Input, bucket=Bucket, ring=Ring, req
waiting_kl({ReqId, {kl, _Idx, Keys}},
StateData=#state{bloom=Bloom,
- req_id=ReqId,client=Client,timeout=Timeout,
- bucket=Bucket,client_type=ClientType}) ->
- process_keys(Keys,Bucket,ClientType,Bloom,ReqId,Client),
+ req_id=ReqId,client=Client,timeout=Timeout}) ->
+ process_keys(Keys,Bloom,ReqId,Client),
{next_state, waiting_kl, StateData, Timeout};
waiting_kl({ReqId, Idx, done}, StateData0=#state{wait_pls=WPL0,vns=VNS0,pls=PLS,
@@ -127,27 +118,14 @@ waiting_kl(timeout, StateData=#state{pls=PLS,wait_pls=WPL}) ->
NewPLS = lists:append(PLS, [W_PL || {_W_Idx,_W_Node,W_PL} <- WPL]),
reduce_pls(StateData#state{pls=NewPLS,wait_pls=[]}).
-finish(StateData=#state{req_id=ReqId,client=Client,client_type=ClientType, listers=[]}) ->
- case ClientType of
- mapred ->
- %% No nodes are available for key listing so all
- %% we can do now is die so that the rest of the
- %% MapReduce processes will also die and be cleaned up.
- exit(all_nodes_unavailable);
- plain ->
- %%Notify the requesting client that the key
- %% listing is complete or that no nodes are
- %% available to fulfil the request.
- Client ! {ReqId, all_nodes_unavailable}
- end,
+finish(StateData=#state{req_id=ReqId,client=Client,listers=[]}) ->
+ %%Notify the requesting client that the key
+ %% listing is complete or that no nodes are
+ %% available to fulfil the request.
+ Client ! {ReqId, all_nodes_unavailable},
{stop,normal,StateData};
-finish(StateData=#state{req_id=ReqId,client=Client,client_type=ClientType}) ->
- case ClientType of
- mapred ->
- luke_flow:finish_inputs(Client);
- plain ->
- Client ! {ReqId, done}
- end,
+finish(StateData=#state{req_id=ReqId,client=Client}) ->
+ Client ! {ReqId, done},
{stop,normal,StateData}.
reduce_pls(StateData0=#state{timeout=Timeout, wait_pls=WPL,
@@ -215,29 +193,19 @@ check_pl(PL,VNS,WPL) ->
end.
%% @private
-process_keys(Keys,Bucket,ClientType,Bloom,ReqId,Client) ->
- process_keys(Keys,Bucket,ClientType,Bloom,ReqId,Client,[]).
+process_keys(Keys,Bloom,ReqId,Client) ->
+ process_keys(Keys,Bloom,ReqId,Client,[]).
%% @private
-process_keys([],Bucket,ClientType,_Bloom,ReqId,Client,Acc) ->
- case ClientType of
- mapred ->
- try
- luke_flow:add_inputs(Client, [{Bucket,K} || K <- Acc])
- catch _:_ ->
- exit(self(), normal)
- end;
- plain -> Client ! {ReqId, {keys, Acc}}
- end,
+process_keys([],_Bloom,ReqId,Client,Acc) ->
+ Client ! {ReqId, {keys, Acc}},
ok;
-process_keys([K|Rest],Bucket,ClientType,Bloom,ReqId,Client,Acc) ->
+process_keys([K|Rest],Bloom,ReqId,Client,Acc) ->
case ebloom:contains(Bloom,K) of
true ->
- process_keys(Rest,Bucket,ClientType,
- Bloom,ReqId,Client,Acc);
+ process_keys(Rest,Bloom,ReqId,Client,Acc);
false ->
ebloom:insert(Bloom,K),
- process_keys(Rest,Bucket,ClientType,
- Bloom,ReqId,Client,[K|Acc])
+ process_keys(Rest,Bloom,ReqId,Client,[K|Acc])
end.
%% @private
View
352 src/riak_kv_lru.erl
@@ -1,352 +0,0 @@
-%% -------------------------------------------------------------------
-%%
-%% riak_kv_lru: ETS-based LRU cache
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License. You may obtain
-%% a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied. See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-%%
-%% -------------------------------------------------------------------
--module(riak_kv_lru).
-
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
--endif.
-
--export([new/1,
- put/4,
- remove/3,
- fetch/3,
- size/1,
- max_size/1,
- clear/1,
- clear_bkey/2,
- destroy/1,
- table_sizes/1]).
--export([init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- terminate/2]).
-
--record(kv_lru, {max_size,
- bucket_idx,
- age_idx,
- cache}).
-
--record(kv_lru_entry, {key,
- value,
- ts}).
-
-new(0) ->
- nocache;
-new(Size) ->
- {ok, Pid} = gen_server:start_link(?MODULE, [Size], []),
- Pid.
-
-put(nocache, _BKey, _Key, _Value) ->
- ok;
-put(Pid, BKey, Key, Value) ->
- gen_server:cast(Pid, {put, BKey, Key, Value}).
-
-fetch(nocache, _BKey, _Key) ->
- notfound;
-fetch(Pid, BKey, Key) ->
- gen_server:call(Pid, {fetch, BKey, Key}).
-
-remove(nocache, _BKey, _Key) ->
- ok;
-remove(Pid, BKey, Key) ->
- gen_server:cast(Pid, {remove, BKey, Key}).
-
-size(nocache) ->
- 0;
-size(Pid) ->
- gen_server:call(Pid, size).
-
-max_size(nocache) ->
- 0;
-max_size(Pid) ->
- gen_server:call(Pid, max_size).
-
-clear(nocache) ->
- ok;
-clear(Pid) ->
- gen_server:cast(Pid, clear).
-
-clear_bkey(nocache, _BKey) ->
- ok;
-clear_bkey(Pid, BKey) ->
- gen_server:cast(Pid, {clear_bkey, BKey}).
-
-destroy(nocache) ->
- ok;
-destroy(Pid) ->
- gen_server:call(Pid, destroy).
-
-%% for test usage
-table_sizes(Pid) ->
- gen_server:call(Pid, table_sizes).
-
-init([Size]) ->
- IdxName = pid_to_list(self()) ++ "_cache_age_idx",
- BucketIdxName = pid_to_list(self()) ++ "_bucket_idx",
- CacheName = pid_to_list(self()) ++ "_cache",
- Idx = ets:new(list_to_atom(IdxName), [ordered_set, private]),
- BucketIdx = ets:new(list_to_atom(BucketIdxName), [bag, private]),
- Cache = ets:new(list_to_atom(CacheName), [private, {keypos, 2}]),
- {ok, #kv_lru{max_size=Size, age_idx=Idx, bucket_idx=BucketIdx, cache=Cache}}.
-
-handle_call({fetch, BKey, Key}, _From, State) ->
- Reply = fetch_internal(State, BKey, Key),
- {reply, Reply, State};
-handle_call(size, _From, State) ->
- Reply = size_internal(State),
- {reply, Reply, State};
-handle_call(max_size, _From, State) ->
- Reply = max_size_internal(State),
- {reply, Reply, State};
-handle_call(destroy, _From, State) ->
- {Reply, NewState} = destroy_internal(State),
- {stop, normal, Reply, NewState};
-handle_call(table_sizes, _From, State) ->
- Reply = table_sizes_internal(State),
- {reply, Reply, State};
-handle_call(_Request, _From, State) ->
- {reply, ok, State}.
-
-handle_cast({put, BKey, Key, Value}, State) ->
- put_internal(State, BKey, Key, Value),
- {noreply, State};
-handle_cast({remove, BKey, Key}, State) ->
- remove_internal(State, BKey, Key),
- {noreply, State};
-handle_cast(clear, State) ->
- clear_internal(State),
- {noreply, State};
-handle_cast({clear_bkey, BKey}, State) ->
- clear_bkey_internal(State, BKey),
- {noreply, State};
-handle_cast(_Request, State) ->
- {noreply, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, State) ->
- destroy_internal(State),
- ok.
-
-put_internal(#kv_lru{max_size=MaxSize, age_idx=Idx,
- bucket_idx=BucketIdx, cache=Cache},
- BKey, Key, Value) ->
- remove_existing(Idx, BucketIdx, Cache, BKey, Key),
- insert_value(Idx, BucketIdx, Cache, BKey, Key, Value),
- prune_oldest_if_needed(MaxSize, Idx, BucketIdx, Cache).
-
-fetch_internal(#kv_lru{cache=Cache}=LRU, BKey, Key) ->
- case fetch_value(Cache, BKey, Key) of
- notfound ->
- notfound;
- Value ->
- %% Do a put to update the timestamp in the cache
- put_internal(LRU, BKey, Key, Value),
- Value
- end.
-
-remove_internal(#kv_lru{age_idx=Idx, bucket_idx=BucketIdx, cache=Cache},
- BKey, Key) ->
- remove_existing(Idx, BucketIdx, Cache, BKey, Key),
- ok.
-
-size_internal(#kv_lru{age_idx=Idx}) ->
- ets:info(Idx, size).
-
-max_size_internal(#kv_lru{max_size=MaxSize}) ->
- MaxSize.
-
-clear_internal(#kv_lru{age_idx=Idx, cache=Cache}) ->
- ets:delete_all_objects(Idx),
- ets:delete_all_objects(Cache),
- ok.
-
-clear_bkey_internal(#kv_lru{bucket_idx=BucketIdx}=LRU, BKey) ->
- case ets:lookup(BucketIdx, BKey) of
- [] ->
- ok;
- BK_Ks ->
- [remove_internal(LRU, BKey, Key) || {_BKey, Key} <- BK_Ks],
- ok
- end.
-
-destroy_internal(#kv_lru{age_idx=undefined, bucket_idx=undefined, cache=undefined}=State) ->
- {ok, State};
-destroy_internal(#kv_lru{age_idx=Idx, bucket_idx=BucketIdx, cache=Cache}) ->
- ets:delete(Idx),
- ets:delete(BucketIdx),
- ets:delete(Cache),
- {ok, #kv_lru{age_idx=undefined, bucket_idx=undefined, cache=undefined}}.
-
-table_sizes_internal(#kv_lru{age_idx=Idx, bucket_idx=BucketIdx, cache=Cache}) ->
- [{age_idx, ets:info(Idx, size)},
- {bucket_idx, ets:info(BucketIdx, size)},
- {cache, ets:info(Cache, size)}].
-
-%% Internal functions
-remove_existing(Idx, BucketIdx, Cache, BKey, Key) ->
- CacheKey = {BKey, Key},
- case ets:lookup(Cache, CacheKey) of
- [Entry] ->
- ets:delete(Idx, Entry#kv_lru_entry.ts),
- ets:delete_object(BucketIdx, CacheKey),
- ets:delete(Cache, CacheKey),
- ok;
- [] ->
- ok
- end.
-
-insert_value(Idx, BucketIdx, Cache, BKey, Key, Value) ->
- CacheKey = {BKey, Key},
- TS = erlang:now(),
- Entry = #kv_lru_entry{key=CacheKey, value=Value, ts=TS},
- ets:insert_new(Cache, Entry),
- ets:insert_new(Idx, {TS, CacheKey}),
- ets:insert(BucketIdx, CacheKey).
-
-prune_oldest_if_needed(MaxSize, Idx, BucketIdx, Cache) ->
- OverSize = MaxSize + 1,
- case ets:info(Idx, size) of
- OverSize ->
- TS = ets:first(Idx),
- [{TS, {BKey, Key}}] = ets:lookup(Idx, TS),
- remove_existing(Idx, BucketIdx, Cache, BKey, Key),
- ok;
- _ ->
- ok
- end.
-
-fetch_value(Cache, BKey, Key) ->
- CacheKey = {BKey, Key},
- case ets:lookup(Cache, CacheKey) of
- [] ->
- notfound;
- [Entry] ->
- Entry#kv_lru_entry.value
- end.
-
--ifdef(TEST).
-put_fetch_test() ->
- BKey = {<<"test">>, <<"foo">>},
- C = riak_kv_lru:new(5),
- riak_kv_lru:put(C, BKey, <<"hello">>, <<"world">>),
- <<"world">> = riak_kv_lru:fetch(C, BKey, <<"hello">>),
- riak_kv_lru:destroy(C).
-
-delete_test() ->
- BKey = {<<"test">>, <<"foo">>},
- C = riak_kv_lru:new(5),
- riak_kv_lru:put(C, BKey, "hello", "world"),
- riak_kv_lru:remove(C, BKey, "hello"),
- notfound = riak_kv_lru:fetch(C, BKey, "hello"),
- riak_kv_lru:destroy(C).
-
-size_test() ->
- BKey = {<<"test">>, <<"foo">>},
- C = riak_kv_lru:new(5),
- [riak_kv_lru:put(C, BKey, X, X) || X <- lists:seq(1, 6)],
- notfound = riak_kv_lru:fetch(C, BKey, 1),
- 5 = riak_kv_lru:size(C),
- 5 = riak_kv_lru:max_size(C),
- 2 = riak_kv_lru:fetch(C, BKey, 2),
- 6 = riak_kv_lru:fetch(C, BKey, 6),
- riak_kv_lru:destroy(C).
-
-age_test() ->
- BKey = {<<"test">>, <<"foo">>},
- C = riak_kv_lru:new(3),
- [riak_kv_lru:put(C, BKey, X, X) || X <- lists:seq(1, 3)],
- timer:sleep(500),
- 2 = riak_kv_lru:fetch(C, BKey, 2),
- riak_kv_lru:put(C, BKey, 4, 4),
- 2 = riak_kv_lru:fetch(C, BKey, 2),
- 4 = riak_kv_lru:fetch(C, BKey, 4),
- notfound = riak_kv_lru:fetch(C, BKey, 1),
- riak_kv_lru:destroy(C).
-
-clear_bkey_test() ->
- BKey1 = {<<"test">>, <<"foo">>},
- BKey2 = {<<"test">>, <<"bar">>},
- C = riak_kv_lru:new(10),
- F = fun(X) ->
- riak_kv_lru:put(C, BKey1, X, X),
- riak_kv_lru:put(C, BKey2, X, X) end,
- [F(X) || X <- lists:seq(1, 5)],
- riak_kv_lru:clear_bkey(C, BKey2),
- notfound = riak_kv_lru:fetch(C, BKey2, 3),
- 3 = riak_kv_lru:fetch(C, BKey1, 3),
- riak_kv_lru:destroy(C).
-
-zero_size_test() ->
- BKey = {<<"test">>, <<"foo">>},
- C = riak_kv_lru:new(0),
- ok = riak_kv_lru:put(C, BKey, 1, 1),
- notfound = riak_kv_lru:fetch(C, BKey, 1),
- 0 = riak_kv_lru:size(C),
- riak_kv_lru:destroy(C).
-
-consistency_test() ->
- BKey = {<<"test">>, <<"foo">>},
- C = riak_kv_lru:new(3),
- F = fun(X) ->
- riak_kv_lru:put(C, BKey, X, X)
- end,
- [F(X) || X <- lists:seq(1,10)],
- consistency_check(C).
-
-%% Make sure that riak_kv_lru is correct under concurrent modification
-%% by spawning 10 processes that each do 1000 puts on the same LRU, then
-%% checking that the size limit of the cache has been respected
-%% (added to check https://issues.basho.com/show_bug.cgi?id=969)
-concurrency_test() ->
- Size = 10,
- C = riak_kv_lru:new(Size),
- Pids = [ spawn_link(concurrent_incrementer(C, K, self()))
- || K <- lists:seq(10000, 10010) ],
- wait_for_incrementers(Pids),
- consistency_check(C),
- ?assertEqual(Size, riak_kv_lru:size(C)).
-
-concurrent_incrementer(C, K, Test) ->
- fun() ->
- [ riak_kv_lru:put(C, N+K, N+K, N+K)
- || N <- lists:seq(1, 1000) ],
- Test ! {increment_done, self()}
- end.
-
-wait_for_incrementers([]) -> ok;
-wait_for_incrementers(Pids) ->
- receive {increment_done, Pid} ->
- wait_for_incrementers(lists:delete(Pid, Pids))
- after 5000 ->
- throw(incrementer_timeout)
- end.
-
-consistency_check(LRU) ->
- Ts = table_sizes(LRU),
- %% make sure all tables report same size
- UniqueSizes = lists:usort([ Size || {_Name, Size} <- Ts]),
- ?assertEqual(1, length(UniqueSizes)).
-
--endif.
View
263 src/riak_kv_map_master.erl
@@ -1,263 +0,0 @@
-%% -------------------------------------------------------------------
-%%
-%% riak_map_master: spins up batched map tasks on behalf of map phases
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License. You may obtain
-%% a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied. See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-%%
-%% -------------------------------------------------------------------
-
--module(riak_kv_map_master).
--include_lib("riak_kv_js_pools.hrl").
-
--behaviour(riak_core_gen_server).
-
-%% API
--export([start_link/0,
- queue_depth/0,
- new_mapper/4]).
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--define(SERVER, ?MODULE).
-
--record(mapper, {vnode,
- qterm,
- inputs,
- phase}).
-
--record(state, {datadir,
- store,
- highest,
- next}).
-
-new_mapper({_, Node}=VNode, QTerm, MapInputs, PhasePid) ->
- riak_core_gen_server:pcall({?SERVER, Node}, 5,
- {new_mapper, VNode, QTerm, MapInputs, PhasePid},
- infinity).
-
-queue_depth() ->
- Nodes = [node()|nodes()],
- [{Node, riak_core_gen_server:pcall({?SERVER, Node}, 0, queue_depth,
- infinity)} || Node <- Nodes].
-
-
-start_link() ->
- riak_core_gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-init([]) ->
- process_flag(trap_exit, true),
- DataDir = init_data_dir(),
- Store = bitcask:open(DataDir, [read_write]),
- {ok, NextCounter} = file:open(filename:join(DataDir, "_next_"), [read, write, raw, binary]),
- {ok, HighestCounter} = file:open(filename:join(DataDir, "_highest_"), [read, write, raw, binary]),
- State = #state{datadir=DataDir, store=Store, highest=HighestCounter,
- next=NextCounter},
- reset_counters(State),
- timer:send_interval(60000, merge_storage),
- {ok, State}.
-
-handle_call({new_mapper, VNode, {erlang, _}=QTerm, MapInputs, PhasePid}, _From, State) ->
- Id = make_id(),
- case riak_kv_mapper_sup:new_mapper(VNode, Id, QTerm, MapInputs, PhasePid) of
- {ok, _Pid} ->
- {reply, {ok, Id}, State};
- {error, Reason} ->
- {reply, {error, Reason}, State}
- end;
-
-handle_call({new_mapper, VNode, {javascript, _}=QTerm, MapInputs, PhasePid}, _From, State) ->
- case riak_kv_js_manager:pool_size(?JSPOOL_MAP) > 0 of
- true ->
- Id = make_id(),
- case riak_kv_mapper_sup:new_mapper(VNode, Id, QTerm, MapInputs, PhasePid) of
- {ok, Pid} ->
- erlang:monitor(process, Pid),
- {reply, {ok, Id}, State};
- {error, Reason} ->
- {reply, {error, Reason}, State}
- end;
- false ->
- Id = defer_mapper(VNode, QTerm, MapInputs, PhasePid, State),
- {reply, {ok, {Id, node()}}, State}
- end;
-
-handle_call(queue_depth, _From, #state{highest=Highest, next=Next}=State) ->
- H = read_counter(Highest),
- N = read_counter(Next),
- Reply = H - N,
- {reply, Reply, State};
-
-handle_call(_Request, _From, State) ->
- {reply, ignore, State}.
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-%% Dequeue a deferred mapper when a mapper exits
-handle_info({'DOWN', _A, _B, _Mapper, _C}, State) ->
- dequeue_mapper(State),
- {noreply, State};
-
-handle_info(merge_storage, #state{store=Store, datadir=DataDir}=State) ->
- case bitcask:needs_merge(Store) of
- {true, Files} ->
- bitcask_merge_worker:merge(DataDir, [], Files);
- false ->
- ok
- end,
- {noreply, State};
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, #state{store=Store, highest=Highest, next=Next}) ->
- file:close(Highest),
- file:close(Next),
- bitcask:close(Store).
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%% Internal functions
-make_id() ->
- ID = erlang:phash2({self(), os:timestamp()}),
- {ID, node()}.
-
-dequeue_mapper(State) ->
- case are_mappers_waiting(State) of
- false ->
- ok;
- true ->
- Id = read(State#state.next),
- Mapper = read_entry(Id, State#state.store),
- case is_mapper_runnable(Mapper) of
- false ->
- incr(State#state.next),
- delete_entry(Id, State#state.store),
- dequeue_mapper(State);
- true ->
- #mapper{vnode=VNode, qterm=QTerm,
- inputs=MapInputs, phase=Phase} = Mapper,
- case riak_kv_js_manager:pool_size(?JSPOOL_MAP) > 0 of
- true ->
- {ok, Pid} = riak_kv_mapper_sup:new_mapper(VNode, {Id, node()}, QTerm,
- MapInputs, Phase),
- erlang:monitor(process, Pid),
- incr(State#state.next),
- delete_entry(Id, State#state.store),
- dequeue_mapper(State);
- false ->
- ok
- end
- end
- end.
-
-defer_mapper(VNode, QTerm, MapInputs, PhasePid, State) ->
- Mapper = #mapper{vnode=VNode, qterm=QTerm, inputs=MapInputs, phase=PhasePid},
- Id = read_incr(State#state.highest),
- write_entry(Id, Mapper, State#state.store).
-
-reset_counters(State) ->
- case are_mappers_waiting(State) of
- false ->
- file:pwrite(State#state.highest, 0, <<0:64>>),
- file:sync(State#state.highest),
- file:pwrite(State#state.next, 0, <<0:64>>),
- file:sync(State#state.next);
- true ->
- dequeue_mapper(State)
- end.
-
-read(CounterFile) ->
- Counter = read_counter(CounterFile),
- list_to_binary(integer_to_list(Counter)).
-
-incr(CounterFile) ->
- Counter = read_counter(CounterFile),
- NewCounter = Counter + 1,
- ok = file:pwrite(CounterFile, 0, <<NewCounter:64>>),
- file:sync(CounterFile).
-
-read_incr(CounterFile) ->
- Counter = read_counter(CounterFile),
- NewCounter = Counter + 1,
- ok = file:pwrite(CounterFile, 0, <<NewCounter:64>>),
- file:sync(CounterFile),
- list_to_binary(integer_to_list(Counter)).
-
-read_counter(Counter) ->
- case file:pread(Counter, 0, 8) of
- eof ->
- 0;
- {ok, Data} ->
- <<V:64/integer>> = Data,
- V;
- Error ->
- throw(Error)
- end.
-
-are_mappers_waiting(State) ->
- Highest = read_counter(State#state.highest),
- Next = read_counter(State#state.next),
- Next < Highest.
-
-is_mapper_runnable({error,_}) -> false;
-is_mapper_runnable(not_found) -> false;
-is_mapper_runnable(#mapper{phase=Phase}) ->
- Node = node(Phase),
- ClusterNodes = riak_core_node_watcher:nodes(riak_kv),
- lists:member(Node, ClusterNodes) andalso rpc:call(Node, erlang, is_process_alive,
- [Phase]).
-
-write_entry(Id, Mapper, Store) ->
- ok = bitcask:put(Store, Id, term_to_binary(Mapper, [compressed])),
- Id.
-
-read_entry(Id, Store) ->
- case bitcask:get(Store, Id) of
- {ok, D} -> binary_to_term(D);
- Err -> Err
- end.
-
-delete_entry(Id, Store) ->
- bitcask:delete(Store, Id).
-
-ensure_dir(Dir) ->
- filelib:ensure_dir(filename:join(Dir, ".empty")).
-
-init_data_dir() ->
- %% There are some upgrade situations where the mapred_queue_dir, is not
- %% specified and as such we'll wind up using the mr_queue dir,
- %% relative to platform_data_dir.
- %% We fallback to creating the mr_queue in /tmp.
- P_DataDir = app_helper:get_env(riak_core, platform_data_dir),
- DataDir0 = app_helper:get_env(riak_kv, mapred_queue_dir,
- filename:join(P_DataDir, "mr_queue")),
- case ensure_dir(DataDir0) of
- ok ->
- DataDir0;
- {error, Reason} ->
- TmpDir = "/tmp/mr_queue",
- lager:warning("Failed to create ~p for mapred_queue_dir "
- "defaulting to %s: ~p",
- [DataDir0, TmpDir, Reason]),
- ok = ensure_dir(TmpDir),
- TmpDir
- end.
-
View
301 src/riak_kv_map_phase.erl
@@ -1,301 +0,0 @@
-%% -------------------------------------------------------------------
-%%
-%% riak_map_phase: manage the mechanics of a map phase of a MR job
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License. You may obtain
-%% a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied. See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-%%
-%% -------------------------------------------------------------------
-
--module(riak_kv_map_phase).
--author('Kevin Smith <kevin@basho.com>').
--author('John Muellerleile <johnm@basho.com>').
-
--include("riak_kv_map_phase.hrl").
-
--behaviour(luke_phase).
-
--export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
- handle_sync_event/3, handle_info/2, handle_timeout/1, terminate/2]).
-
--record(state, {done=false, qterm, fsms=dict:new(), mapper_data=[], pending=[]}).
-
-init([QTerm]) ->
- process_flag(trap_exit, true),
- {ok, #state{qterm=QTerm}}.
-
-handle_input(Inputs0, #state{fsms=FSMs0, qterm=QTerm, mapper_data=MapperData}=State, _Timeout) ->
- {ok, Ring} = riak_core_ring_manager:get_my_ring(),
- Inputs1 = [build_input(I, Ring) || I <- Inputs0],
- case Inputs1 of
- [] ->
- {no_output, State};
- _ ->
- ClaimLists = riak_kv_mapred_planner:plan_map(Inputs1),
- case schedule_input(Inputs1, ClaimLists, QTerm, FSMs0, State) of
- {NewFSMs, _ClaimLists1, FsmKeys} ->
- MapperData1 = MapperData ++ FsmKeys,
- {no_output, State#state{fsms=NewFSMs, mapper_data=MapperData1}};
- {error, exhausted_preflist} ->
- {stop, {error, {no_candidate_nodes, exhausted_prefist, erlang:get_stacktrace(), MapperData}}, State}
- end
- end.
-
-handle_input_done(State) ->
- maybe_done(State#state{done=true}).
-
-handle_event({register_mapper, Id, MapperPid}, #state{mapper_data=MapperData}=State) ->
- MapperData0 = case lists:keyfind(Id, 1, MapperData) of
- {Id, MapperProps} -> lists:keyreplace(Id, 1, MapperData, {Id, [{pid, MapperPid}|MapperProps]});
- false -> MapperData
- end,
- MapperData1 = MapperData0 ++ [{MapperPid, Id}],
- {no_output, State#state{mapper_data=MapperData1}};
-
-handle_event({mapexec_reply, VNode, BKey, Reply, Executor}, #state{fsms=FSMs, mapper_data=MapperData,
- pending=Pending}=State) ->
- case dict:is_key(Executor, FSMs) of
- false ->
- %% node retry case will produce dictionary miss
- maybe_done(State);
- true ->
- case Reply of
- [{not_found, _, _}] ->
- handle_not_found_reply(VNode, BKey, Executor, State, Reply);
- [{error, notfound}] ->
- handle_not_found_reply(VNode, BKey, Executor, State, Reply);
- _ ->
- Pending1 = Pending ++ Reply,
- FSMs1 = update_counter(Executor, FSMs),
- MapperData1 = update_inputs(Executor, VNode, BKey, MapperData),
- maybe_done(State#state{fsms=FSMs1, mapper_data=MapperData1, pending=Pending1})
- end
-
- end;
-
-handle_event({mapexec_error, _Executor, Reply}, State) ->
- %{no_output, State};
- {stop, Reply, State#state{fsms=[]}};
-handle_event(_Event, State) ->
- {no_output, State}.
-
-
-handle_info({'EXIT', Pid, _Reason}, #state{mapper_data=MapperData, fsms=FSMs, qterm=QTerm}=State) ->
- case lists:keyfind(Pid, 1, MapperData) of
- {Pid, Id} ->
- case lists:keyfind(Id, 1, MapperData) of
- {Id, MapperProps} ->
- {keys, {VNode, Keys}} = lists:keyfind(keys, 1, MapperProps),
- case length(Keys) of
- 0 ->
- MapperData1 = lists:keydelete(Id, 1, lists:keydelete(Pid, 1, MapperData)),
- maybe_done(State#state{mapper_data=MapperData1});
- _C ->
- try
- {_Partition, BadNode} = VNode,
- NewKeys = prune_input_nodes(Keys, BadNode),
- ClaimLists = riak_kv_mapred_planner:plan_map(NewKeys),
- case schedule_input(NewKeys, ClaimLists, QTerm, FSMs, State) of
- {NewFSMs, _ClaimLists1, FsmKeys} ->
- MapperData1 = lists:keydelete(Id, 1, lists:keydelete(Pid, 1, MapperData ++ FsmKeys)),
- maybe_done(State#state{mapper_data=MapperData1, fsms=NewFSMs});
- {error, exhausted_preflist} ->
- MapperData1 = lists:keydelete(Id, 1, lists:keydelete(Pid, 1, MapperData)),
- maybe_done(State#state{mapper_data=MapperData1, fsms=FSMs})
- end
- catch
- _:Error ->
- {stop, {error, {no_candidate_nodes, Error, erlang:get_stacktrace(), MapperData}}, State}
- end
- end;
- false ->
- MapperData1 = lists:keydelete(Pid, 1, MapperData),
- maybe_done(State#state{mapper_data=MapperData1})
- end;
- false ->
- {stop, {error, {dead_mapper, erlang:get_stacktrace(), MapperData}}, State}
- end;
-
-handle_info(_Info, State) ->
- {no_output, State}.
-
-handle_sync_event(_Event, _From, State) ->
- {reply, ignored, State}.
-
-handle_timeout(State) ->
- {no_output, State}.
-
-terminate(_Reason, _State) ->
- _Reason.
-
-%% Internal functions
-
-schedule_input(Inputs1, ClaimLists, QTerm, FSMs0, State) ->
- try
- {FSMs1, FsmKeys} = start_mappers(ClaimLists, QTerm, FSMs0, []),
- {FSMs1, ClaimLists, FsmKeys}
- catch
- exit:{{nodedown, Node}, _} ->
- Inputs2 = prune_input_nodes(Inputs1, Node),
- try riak_kv_mapred_planner:plan_map(Inputs2) of
- ClaimLists2 ->
- schedule_input(Inputs2, ClaimLists2, QTerm, FSMs0, State)
- catch exit:exhausted_preflist ->
- {error, exhausted_preflist}
- end;
- Error ->
- throw(Error)
- end.
-
-prune_input_nodes(Inputs, BadNode) ->
- prune_input_nodes(Inputs, BadNode, []).
-prune_input_nodes([], _BadNode, NewInputs) ->
- NewInputs;
-prune_input_nodes([Input|T], BadNode, NewInputs) ->
- #riak_kv_map_input{preflist=Targets} = Input,
- Targets2 = lists:keydelete(BadNode, 2, Targets),
- prune_input_nodes(T, BadNode, [Input#riak_kv_map_input{preflist=Targets2}|NewInputs]).
-
-prune_input_partitions(Inputs, BadNode) ->
- prune_input_partitions(Inputs, BadNode, []).
-prune_input_partitions([], _BadNode, NewInputs) ->
- NewInputs;
-prune_input_partitions([Input|T], BadPartition, NewInputs) ->
- #riak_kv_map_input{preflist=Targets} = Input,
- Targets2 = lists:keydelete(BadPartition, 1, Targets),
- prune_input_partitions(T, BadPartition, [Input#riak_kv_map_input{preflist=Targets2}|NewInputs]).
-
-
-build_input(I, Ring) ->
- {{Bucket, Key}, KD} = convert_input(I),
- Props = riak_core_bucket:get_bucket(Bucket, Ring),
- {value, {_, NVal}} = lists:keysearch(n_val, 1, Props),
- Idx = riak_core_util:chash_key({Bucket, Key}),
- PL = riak_core_ring:preflist(Idx, Ring),
- {Targets, _} = lists:split(NVal, PL),
- #riak_kv_map_input{bkey={Bucket, Key},
- bprops=Props,
- kd=KD,
- preflist=Targets}.
-
-convert_input(I={{_B,_K},_D})
- when is_binary(_B) andalso (is_list(_K) orelse is_binary(_K)) -> I;
-convert_input(I={_B,_K})
- when is_binary(_B) andalso (is_list(_K) orelse is_binary(_K)) -> {I,undefined};
-convert_input([B,K]) when is_binary(B), is_binary(K) -> {{B,K},undefined};
-convert_input([B,K,D]) when is_binary(B), is_binary(K) -> {{B,K},D};
-convert_input({struct, [{<<"not_found">>,
- {struct, [{<<"bucket">>, Bucket},
- {<<"key">>, Key}]}}]}) ->
- {{Bucket, Key}, undefined};
-convert_input({not_found, {Bucket, Key}, KD}) ->
- {{Bucket, Key}, KD};
-convert_input(I) -> I.
-
-start_mappers([], _QTerm, Accum, FsmKeys) ->
- {Accum, FsmKeys};
-start_mappers([{Partition, Inputs}|T], QTerm, Accum, FsmKeys) ->
- case riak_kv_map_master:new_mapper(Partition, QTerm, Inputs, self()) of
- {ok, FSM} ->
- Accum1 = dict:store(FSM, length(Inputs), Accum),
- start_mappers(T, QTerm, Accum1, FsmKeys ++ [{FSM, [{keys, {Partition, Inputs}}]}]);
- Error ->
- throw(Error)
- end.
-
-update_counter(Executor, FSMs) ->
- case dict:find(Executor, FSMs) of
- {ok, 1} ->
- dict:erase(Executor, FSMs);
- {ok, _C} ->
- dict:update_counter(Executor, -1, FSMs)
- end.
-
-maybe_done(#state{done=Done, fsms=FSMs, mapper_data=MapperData, pending=Pending}=State) ->
- case Done =:= true andalso dict:size(FSMs) == 0 andalso MapperData == [] of
- true ->
- luke_phase:complete(),
- case Pending of
- [] ->
- {no_output, State};
- _ ->
- {output, Pending, State#state{pending=[]}}
- end;
- false ->
- BatchSize = app_helper:get_env(riak_kv, mapper_batch_size, 5),
- case length(Pending) >= BatchSize of
- true ->
- {output, Pending, State#state{pending=[]}};
- false ->
- {no_output, State}
- end
- end.
-
-update_inputs(Id, VNode, BKey, MapperData) ->
- case lists:keyfind(Id, 1, MapperData) of
- {Id, MapperProps} ->
- case lists:keyfind(keys, 1, MapperProps) of
- {keys, {VNode, Keys}} ->
- MapperProps1 = lists:keyreplace(keys, 1, MapperProps,
- {keys, {VNode, lists:keydelete(BKey, 2, Keys)}}),
- lists:keyreplace(Id, 1, MapperData, {Id, MapperProps1});
- false -> throw(bad_mapper_props_no_keys);
- _ -> MapperData
- end;
- false -> throw(bad_mapper_props_no_id)
- end.
-
-handle_not_found_reply(VNode, BKey, Executor, #state{fsms=FSMs, mapper_data=MapperData, qterm=QTerm, pending=Pending}=State, Reply) ->
- %% If the reply is not_found, then check if there are other
- %% preflist entries that can be tried before giving up.
-
- %% Look up the properties for the replying mapper
- {_Id, MapperProps} = lists:keyfind(Executor, 1, MapperData),
- %% Extract the vnode data and the list of inputs
- {keys, {VNode, Keys}} = lists:keyfind(keys, 1, MapperProps),
-
- %% Remove the current partition from
- %% the list of potential inputs.
- {BadPartition, _Node} = VNode,
- NewKeys = prune_input_partitions(Keys, BadPartition),
-
- %% Create a new map plan using a different preflist entry.
- %% The call to plan_map will call exit with reason
- %% exhausted_preflist if all the preference list
- %% entries have been checked.
- try riak_kv_mapred_planner:plan_map(NewKeys) of
- ClaimLists ->
- FSMs1 = dict:erase(Executor, FSMs),
- case schedule_input(NewKeys, ClaimLists, QTerm, FSMs1, State) of
- {NewFSMs, _ClaimLists1, FsmKeys} ->
- MapperData1 = lists:keydelete(Executor, 1, MapperData ++ FsmKeys),
- maybe_done(State#state{mapper_data=MapperData1, fsms=NewFSMs});
- {error, exhausted_preflist} ->
- MapperData1 = lists:keydelete(Executor, 1, MapperData),
- maybe_done(State#state{fsms=FSMs1, mapper_data=MapperData1, pending=Pending++Reply})
- end
- catch
- exit:exhausted_preflist ->
- %% At this point the preflist has been exhausted
- Pending1 = Pending ++ Reply,
- FSMs2 = update_counter(Executor, FSMs),
- MapperData2 = update_inputs(Executor, VNode, BKey, MapperData),
- maybe_done(State#state{fsms=FSMs2, mapper_data=MapperData2, pending=Pending1})
- end.
-
-
-
-
View
311 src/riak_kv_mapper.erl
@@ -1,311 +0,0 @@
-%% -------------------------------------------------------------------
-%%
-%% riak_kv_mapper: Executes map functions on input batches
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License. You may obtain
-%% a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied. See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-%%
-%% -------------------------------------------------------------------
--module(riak_kv_mapper).
-
--behaviour(gen_fsm).
-
--include_lib("riak_kv_map_phase.hrl").
--include_lib("riak_kv_js_pools.hrl").
-
--define(READ_TIMEOUT, 30000).
-
-%% API
--export([start_link/5]).
-
-%% States
--export([prepare/2,
- recv_data/2,
- do_map/2]).
-
-%% gen_fsm callbacks
--export([init/1,
- handle_event/3,
- handle_sync_event/4,
- handle_info/3,
- terminate/3,
- code_change/4]).
-
--record(state, {id,
- cache_ref,
- cache_key_base,
- vnode,
- vm,
- qterm,
- pending,
- reqid,
- data=[],
- inputs,
- phase}).
-
-start_link(VNode, Id, QTerm, MapInputs, PhasePid) ->
- gen_fsm:start_link(?MODULE, [VNode, Id, QTerm, MapInputs, PhasePid], []).
-
-init([VNode, Id, QTerm0, MapInputs, PhasePid]) ->
- erlang:link(PhasePid),
- gen_fsm:send_event(PhasePid, {register_mapper, Id, self()}),
- QTermFun = xform_link_walk(QTerm0),
- ReqId = erlang:phash2({self(), os:timestamp()}),
- riak_kv_stat:update(mapper_start),
- {ok, CacheRef} = riak_kv_mapred_cache:cache_ref(),
- CacheKeyBase = generate_cache_key_base(QTermFun(undefined)),
- {ok, VM} = reserve_jsvm(QTermFun(undefined)),
- %% we need some way to reclaim the JS VM if it is busy doing something
- %% when the rest of the MapReduce phase exists (e.g. on timeout)
- %% easiest method is simply to link, such that the VM is also killed,
- %% which will cause the supervisor to spin up a fresh one
- if is_pid(VM) -> erlang:link(VM);
- true -> ok %% erlang phases do not use VMs
- end,
- {ok, prepare, #state{id=Id, vnode=VNode, qterm=QTermFun, inputs=MapInputs,
- cache_key_base=CacheKeyBase, reqid=ReqId, phase=PhasePid,
- cache_ref=CacheRef, vm=VM}, 0}.
-
-prepare(timeout, State) ->
- case fetch_cached_results(State) of
- done ->
- {stop, normal, State};
- State1 ->
- case fetch_data(State1) of
- done ->
- {stop, normal, State1};
- NewState ->
- {next_state, recv_data, NewState}
- end
- end.
-
-recv_data({r, Result, _Idx, ReqId}, #state{reqid=ReqId, data=Data0,
- pending=Pending}=State) ->
- Data = [Result|Data0],
-
- %% When we receive all data for the keys we sent out
- %% switch to "map mode" and evaluate the map function
- case length(Data) == length(Pending) of
- false ->
- {next_state, recv_data, State#state{data=Data}, ?READ_TIMEOUT};
- true ->
- {next_state, do_map, State#state{data=Data}, 0}
- end;
-recv_data(timeout, #state{phase=Phase, id=Id}=State) ->
- riak_kv_phase_proto:mapexec_error(Phase, {error, read_timeout}, Id),
- {stop, normal, State#state{data=[], pending=[], inputs=[]}}.
-
-do_map(timeout, #state{data=Data, vm=VM, qterm=QTermFun, pending=Pending,
- phase=Phase, id=Id, cache_key_base=CacheKeyBase, vnode=VNode, cache_ref=CacheRef}=State) ->
- lists:foldl(fun(Obj, WorkingSet) ->
- case find_input(obj_bkey(Obj), WorkingSet) of
- {none, WorkingSet} ->
- WorkingSet;
- {Input, WorkingSet1} ->
- case QTermFun(Input) of
- {error, Error} ->
- riak_kv_phase_proto:mapexec_error(Phase, {error, Error}, Id);
- QTerm ->
- CacheKey = generate_final_cachekey(CacheKeyBase,
- Input#riak_kv_map_input.kd),
- run_map(VNode, Id, QTerm,
- Input#riak_kv_map_input.kd,
- Obj, Phase, VM, CacheKey, CacheRef)
- end,
- WorkingSet1
- end end, Pending, Data),
- case fetch_data(State) of
- done ->
- {stop, normal, State};
- NewState ->
- {next_state, recv_data, NewState#state{data=[]}}
- end.
-
-handle_event(_Event, StateName, State) ->
- {next_state, StateName, State}.
-
-handle_sync_event(_Event, _From, StateName, State) ->
- {reply, ignored, StateName, State}.
-
-handle_info(_Info, StateName, State) ->
- {next_state, StateName, State}.
-
-terminate(_Reason, _StateName, #state{vm=VM, id=_Id, vnode=_VNode, reqid=_ReqId, phase=_PhasePid}=_State) ->
- release_jsvm(VM),
- riak_kv_stat:update(mapper_end),
- _Reason.
-
-code_change(_OldVsn, StateName, State, _Extra) ->
- {ok, StateName, State}.
-
-%% Internal functions
-fetch_data(#state{inputs=[]}) ->
- done;
-fetch_data(#state{inputs=Inputs, reqid=ReqId,
- vnode=VNode}=State) ->
- {Current, Next} = split(Inputs),
- BKeys = [Input#riak_kv_map_input.bkey || Input <- Current],
- riak_kv_vnode:mget(VNode, BKeys, ReqId),
- State#state{inputs=Next, pending=Current}.
-
-reserve_jsvm({erlang, _}) ->
- {ok, undefined};
-reserve_jsvm({javascript, _}) ->
- riak_kv_js_manager:reserve_batch_vm(?JSPOOL_MAP, 10).
-
-release_jsvm(undefined) ->
- ok;
-release_jsvm(VM) when is_pid(VM) ->
- riak_kv_js_vm:finish_batch(VM).
-
-obj_bkey({{error, notfound},Bkey}) ->
- Bkey;
-obj_bkey(Obj) ->
- {riak_object:bucket(Obj), riak_object:key(Obj)}.
-
-find_input(BKey, WorkingSet) ->
- find_input(BKey, WorkingSet, WorkingSet).
-
-find_input(_BKey, [], CompleteSet) ->
- {none, CompleteSet};
-find_input(BKey, [#riak_kv_map_input{bkey=BKey}=H|_], CompleteSet) ->
- {H, lists:delete(H, CompleteSet)};
-find_input(BKey, [_|T], CompleteSet) ->
- find_input(BKey, T, CompleteSet).
-
-run_map(VNode, Id, {erlang, {map, FunTerm, Arg, _}}, KD, Obj0, Phase, _VM, CacheKey, CacheRef) ->
- Obj = case Obj0 of
- {{error,notfound},_} ->
- {error, notfound};
- _ ->
- Obj0
- end,
- BKey = obj_bkey(Obj0),
- Result = try
- case FunTerm of
- {qfun, F} ->
- {ok, (F)(Obj, KD, Arg)};
- {modfun, M, F} ->
- {ok, M:F(Obj, KD, Arg)}
- end
- catch C:R ->
- Reason = {C, R, erlang:get_stacktrace()},
- {error, Reason}
- end,
- case Result of
- {ok, Value} ->
- riak_kv_phase_proto:mapexec_result(Phase, VNode, obj_bkey(Obj0), Value, Id),
- if
- is_list(Value) ->
- case CacheKey of
- not_cached ->
- ok;
- _ ->
- riak_kv_lru:put(CacheRef, BKey, CacheKey, Value)
- end;
- true ->
- ok
- end;
- {error, _} ->
- riak_kv_phase_proto:mapexec_error(Phase, Result, Id)
- end;
-
-run_map(VNode, Id, {javascript, {map, _FunTerm, _Arg, _}}, KD, {{error, notfound},_}=Obj, Phase, _VM, _CacheKey, _CacheRef) ->
- BKey = obj_bkey(Obj),
- riak_kv_phase_proto:mapexec_result(
- Phase, VNode, BKey, [{not_found, BKey, KD}], Id);
-run_map(VNode, Id, {javascript, {map, FunTerm, Arg, _}}, KD, Obj, Phase, VM, CacheKey, CacheRef) ->
- BKey = {riak_object:bucket(Obj), riak_object:key(Obj)},
- JSArgs = [riak_object:to_json(Obj), KD, Arg],
- JSCall = {map, FunTerm, JSArgs},
- case riak_kv_js_vm:batch_blocking_dispatch(VM, JSCall) of
- {ok, Result} ->
- riak_kv_phase_proto:mapexec_result(Phase, VNode, obj_bkey(Obj), Result, Id),
- if
- is_list(Result) ->
- case CacheKey of
- not_cached ->
- ok;
- _ ->
- riak_kv_lru:put(CacheRef, BKey, CacheKey, Result)
- end;
- true ->
- ok
- end;
- Error ->
- riak_kv_phase_proto:mapexec_error(Phase, Error, Id)
- end.
-
-split(L) when length(L) =< 5 ->
- {L, []};
-split(L) ->
- lists:split(5, L).
-
-generate_cache_key_base({erlang, {map, {modfun, Mod, Fun}, Arg, _}}) ->
- term_to_binary([Mod, Fun, Arg], [compressed]);
-generate_cache_key_base({erlang, _}) ->
- not_cached;
-generate_cache_key_base({javascript, {map, {jsanon, Source}, Arg, _}}) ->
- term_to_binary([Source, Arg], [compressed]);
-generate_cache_key_base({javascript, {map, {jsfun, Name}, Arg, _}}) ->
- term_to_binary([Name, Arg]).
-
-generate_final_cachekey(not_cached, _KD) ->
- not_cached;
-generate_final_cachekey(CacheKey, KD) ->
- CacheKey1 = list_to_binary([CacheKey, term_to_binary(KD)]),
- mochihex:to_hex(crypto:sha(CacheKey1)).
-
-fetch_cached_results(#state{cache_key_base=not_cached}=State) ->
- State;
-fetch_cached_results(#state{vnode=VNode, id=Id, phase=Phase, cache_ref=CacheRef,
- cache_key_base=CacheKeyBase, inputs=Inputs}=State) ->
- case fetch_cached_results(VNode, Id, Phase, CacheRef, CacheKeyBase, Inputs, []) of
- done ->
- done;
- Remainder ->
- State#state{inputs=Remainder}
- end.
-
-fetch_cached_results(_VNode, _Id, _Phase, _CacheRef, _CacheKeyBase, [], []) ->
- done;
-fetch_cached_results(_VNode, _Id, _Phase, _CacheRef, _CacheKeyBase, [], Accum) ->
- Accum;
-fetch_cached_results(VNode, Id, Phase, CacheRef, CacheKeyBase, [#riak_kv_map_input{bkey=BKey, kd=KD}=H|T], Accum) ->
- CacheKey = generate_final_cachekey(CacheKeyBase, KD),
- case riak_kv_lru:fetch(CacheRef, BKey, CacheKey) of
- notfound ->
- fetch_cached_results(VNode, Id, Phase, CacheRef, CacheKey, T, [H|Accum]);
- Result ->
- riak_kv_phase_proto:mapexec_result(Phase, VNode, BKey, Result, Id),
- fetch_cached_results(VNode, Id, Phase, CacheRef, CacheKey, T, Accum)
- end.
-
-xform_link_walk({erlang, {link, LB, LT, LAcc}}=QTerm) ->
- fun(Input) ->
- case Input of
- undefined ->
- QTerm;
- _ ->
- case proplists:get_value(linkfun, Input#riak_kv_map_input.bprops) of
- undefined ->
- {error, missing_linkfun};
- LinkFun ->
- {erlang, {map, LinkFun, {LB, LT}, LAcc}}
- end
- end end;
-xform_link_walk(QTerm) ->
- fun(_) -> QTerm end.