Permalink
Browse files

Add a component to cache the preflist

This was used as a prototype to run some initial benchmarks.  The
benchmarks didn't show a marked improvement and therefore this work
is being put on hold.  In fact, the throughput/latency profiles were
identical.  There are a few reasons I believe this cache doesn't help.

1. We still use mochiglobal for the ring so we avoid contention on a
   single process

2. The GET/PUT FSMs still need to get the ring anyways for bucket properties.

3. The prepare stage of the GET/PUT FSMs isn't the long-pole.  There are other
   areas that could use improvement first.
  • Loading branch information...
1 parent 0e11189 commit 21a20afd546c57522e52ed3698cc92cff8f05f57 @rzezeski rzezeski committed Nov 22, 2011
Showing with 152 additions and 5 deletions.
  1. +1 −0 ebin/riak_core.app
  2. +10 −0 src/chash.erl
  3. +7 −5 src/riak_core_apl.erl
  4. +2 −0 src/riak_core_node_watcher.erl
  5. +131 −0 src/riak_core_pl_cache.erl
  6. +1 −0 src/riak_core_sup.erl
View
@@ -37,6 +37,7 @@
riak_core_node_watcher,
riak_core_node_watcher_events,
riak_core_pb,
+ riak_core_pl_cache,
riak_core_ring,
riak_core_ring_events,
riak_core_ring_handler,
View
@@ -38,6 +38,8 @@
-export([contains_name/2,
fresh/2,
+ index_to_int/1,
+ int_to_index/1,
lookup/2,
key_of/1,
members/1,
@@ -92,6 +94,14 @@ fresh(NumPartitions, SeedNode) ->
{NumPartitions, [{IndexAsInt, SeedNode} ||
IndexAsInt <- lists:seq(0,(?RINGTOP-1),Inc)]}.
+%% @doc Convert to integer index.
+-spec index_to_int(binary()) -> index_as_int().
+index_to_int(<<Int:160/integer>>) -> Int.
+
+%% @doc Convert to binary index.
+-spec int_to_index(index_as_int()) -> binary().
+int_to_index(Int) -> <<Int:160/integer>>.
+
%% @doc Find the Node that owns the partition identified by IndexAsInt.
-spec lookup(IndexAsInt :: index_as_int(), CHash :: chash()) -> chash_node().
lookup(IndexAsInt, CHash) ->
View
@@ -24,7 +24,8 @@
%% -------------------------------------------------------------------
-module(riak_core_apl).
-export([active_owners/1, active_owners/2,
- get_apl/3, get_apl/4, get_apl_ann/4,
+ get_apl/3, get_apl/4,
+ get_apl_ann/3, get_apl_ann/4,
get_primary_apl/3, get_primary_apl/4
]).
@@ -57,8 +58,7 @@ active_owners(Ring, UpNodes) ->
%% Get the active preflist taking account of which nodes are up
-spec get_apl(binary(), n_val(), atom()) -> preflist().
get_apl(DocIdx, N, Service) ->
- {ok, Ring} = riak_core_ring_manager:get_my_ring(),
- get_apl(DocIdx, N, Ring, riak_core_node_watcher:nodes(Service)).
+ riak_core_pl_cache:preflist(p_and_f, DocIdx, N, Service).
%% Get the active preflist taking account of which nodes are up
%% for a given ring/upnodes list
@@ -67,6 +67,9 @@ get_apl(DocIdx, N, Ring, UpNodes) ->
[{Partition, Node} || {{Partition, Node}, _Type} <-
get_apl_ann(DocIdx, N, Ring, UpNodes)].
+get_apl_ann(DocIdx, N, Service) ->
+ riak_core_pl_cache:preflist(ann, DocIdx, N, Service).
+
%% Get the active preflist taking account of which nodes are up
%% for a given ring/upnodes list and annotate each node with type of
%% primary/fallback
@@ -82,8 +85,7 @@ get_apl_ann(DocIdx, N, Ring, UpNodes) ->
%% Same as get_apl, but returns only the primaries.
-spec get_primary_apl(binary(), n_val(), atom()) -> preflist2().
get_primary_apl(DocIdx, N, Service) ->
- {ok, Ring} = riak_core_ring_manager:get_my_ring(),
- get_primary_apl(DocIdx, N, Ring, riak_core_node_watcher:nodes(Service)).
+ riak_core_pl_cache:preflist(primary, DocIdx, N, Service).
%% Same as get_apl, but returns only the primaries.
-spec get_primary_apl(binary(), n_val(), ring(), [node()]) -> preflist2().
@@ -174,9 +174,11 @@ handle_cast({down, Node}, State) ->
handle_info({nodeup, _Node}, State) ->
%% Ignore node up events; nothing to do here...
+ riak_core_pl_cache:invalidate(),
{noreply, State};
handle_info({nodedown, Node}, State) ->
+ riak_core_pl_cache:invalidate(),
node_down(Node, State),
{noreply, update_avsn(State)};
View
@@ -0,0 +1,131 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_core: Core Riak Application
+%%
+%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(riak_core_pl_cache).
+-behavior(gen_server).
+
+%% API
+-export([invalidate/0,
+ preflist/4,
+ start_link/0]).
+
+%% Callbacks
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+
+%% -------------------------------------------------------------------
+%% API
+%% -------------------------------------------------------------------
+
+%% @doc Invalidate the cache.
+-spec invalidate() -> ok.
+invalidate() -> gen_server:cast(?MODULE, invalidate).
+
+%% @doc Attempt to retrieve the preflist from the cache. Otherwise
+%% generate from ring and send async command to write to cache.
+-spec preflist(primary | p_and_f | ann, binary(), pos_integer(), atom()) ->
+ riak_core_apl:preflist() | riak_core_apl:preflist2().
+preflist(Type, Idx, N, Service) ->
+ I = chash:index_to_int(Idx),
+ K = {I, N, Service},
+ try
+ case ets:lookup(?MODULE, K) of
+ [{K,Preflist}] -> Preflist;
+ [] -> calc_and_write(Type, Idx, K)
+ end
+ catch _:Err ->
+ %% The reason for try/catch here is that you don't want
+ %% preflist requestor to fail because the ETS table or
+ %% it's owner don't exist.
+ lager:error("error fetching preflist for ~p with reason ~p ~p",
+ [K, Err, erlang:get_stacktrace()]),
+ calc_and_write(Type, Idx, K)
+ end.
+
+%% @doc Start a new cache server. Meant to be called by the
+%% supervisor.
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% -------------------------------------------------------------------
+%% Callbacks
+%% -------------------------------------------------------------------
+
+init([]) ->
+ ets:new(?MODULE, [protected, named_table, {read_concurrency, true}]),
+ {ok, none}.
+
+handle_call(_Msg, _From, _State) ->
+ {noreply, _State}.
+
+%% TODO pass ring vclock to determine if write really needs to happen.
+handle_cast({cache_entry, _RingVClock, Key, Preflist}, _State) ->
+ ets:insert(?MODULE, {Key, Preflist}),
+ {noreply, _State};
+handle_cast(invalidate, _State) ->
+ %% TODO debug level?
+ lager:info("Preflist cache was invalidated"),
+ ets:delete_all_objects(?MODULE),
+ {noreply, _State}.
+
+handle_info(_Msg, _State) -> {noreply, _State}.
+
+terminate(_Reason, _State) -> ignored.
+
+code_change(_OldVsn, _State, _Extra) -> {ok, _State}.
+
+%% -------------------------------------------------------------------
+%% Private
+%% -------------------------------------------------------------------
+
+%% @private
+%%
+%% @doc Calculate the preflist from the ring and send a request to
+%% file a new cache entry.
+-spec calc_and_write(primary | both, binary(), {non_neg_integer(), pos_integer(), atom()}) ->
+ riak_core_apl:preflist().
+calc_and_write(Type, Idx, {_I, N, Service}=Key) ->
+ Preflist = preflist_from_ring(Type, Idx, N, Service),
+ gen_server:cast(?MODULE, {cache_entry, vclock, Key, Preflist}),
+ Preflist.
+
+%% @private
+%%
+%% @doc Calculate preflist from the ring.
+-spec preflist_from_ring(ann | primary | both, binary(), pos_integer(), atom()) ->
+ riak_core_apl:preflist()
+ | riak_core_apl:preflist2().
+preflist_from_ring(ann, Idx, N, Service) ->
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ Up = riak_core_node_watcher:nodes(Service),
+ riak_core_apl:get_apl_ann(Idx, N, Ring, Up);
+preflist_from_ring(p_and_f, Idx, N, Service) ->
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ Up = riak_core_node_watcher:nodes(Service),
+ riak_core_apl:get_apl(Idx, N, Ring, Up);
+preflist_from_ring(primary, Idx, N, Service) ->
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ Up = riak_core_node_watcher:nodes(Service),
+ riak_core_apl:get_primary_apl(Idx, N, Ring, Up).
View
@@ -67,6 +67,7 @@ init([]) ->
?CHILD(riak_core_ring_manager, worker),
?CHILD(riak_core_node_watcher_events, worker),
?CHILD(riak_core_node_watcher, worker),
+ ?CHILD(riak_core_pl_cache, worker),
?CHILD(riak_core_vnode_manager, worker),
?CHILD(riak_core_gossip, worker),
RiakWebs

0 comments on commit 21a20af

Please sign in to comment.