Skip to content

Commit

Permalink
Undo commit 7fc6253 (Merge branch 'slf-core-ets-match-replacement' in…
Browse files Browse the repository at this point in the history
…to 1.0)
  • Loading branch information
slfritchie committed Jan 5, 2012
1 parent 7fc6253 commit 8b40428
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 53 deletions.
49 changes: 9 additions & 40 deletions src/riak_core_node_watcher.erl
Expand Up @@ -65,13 +65,13 @@ node_down() ->
gen_server:call(?MODULE, {node_status, down}, infinity).

services() ->
gen_server:call(?MODULE, services, infinity).
ordsets:from_list([Service || [Service] <- ets:match(?MODULE, {{'_', '$1'}, '_'})]).

services(Node) ->
pubtab_get_services(Node).
[Service || [Service] <- ets:match(?MODULE, {{Node, '$1'}, '_'})].

nodes(Service) ->
pubtab_get_nodes(Service).
[Node || [Node] <- ets:match(?MODULE, {{'$1', Service}, '_'})].


%% ===================================================================
Expand Down Expand Up @@ -151,11 +151,7 @@ handle_call({node_status, Status}, _From, State) ->
{Status, Status} -> %% noop
State
end,
{reply, ok, update_avsn(S2)};
handle_call(services, _From, State) ->
Res = [Service || {{by_service, Service}, Nds} <- ets:tab2list(?MODULE),
Nds /= []],
{reply, lists:sort(Res), State}.
{reply, ok, update_avsn(S2)}.


handle_cast({ring_update, R}, State) ->
Expand Down Expand Up @@ -317,8 +313,8 @@ node_down(Node, State) ->


node_delete(Node) ->
Services = pubtab_get_services(Node),
[pubtab_delete(Node, Service) || Service <- Services],
Services = services(Node),
ets:match_delete(?MODULE, {{Node, '_'}, '_'}),
ets:delete(?MODULE, Node),
Services.

Expand All @@ -331,11 +327,12 @@ node_update(Node, Services) ->

Added = ordsets:subtract(NewStatus, OldStatus),
Deleted = ordsets:subtract(OldStatus, NewStatus),
Unchanged = ordsets:intersection(NewStatus, OldStatus),

%% Update ets table with changes; make sure to touch unchanged
%% service with latest timestamp
[pubtab_delete(Node, Ss) || Ss <- Deleted],
[pubtab_insert(Node, Ss) || Ss <- Added],
[ets:delete(?MODULE, {Node, Ss}) || Ss <- Deleted],
ets:insert(?MODULE, [{{Node, Ss}, Now} || Ss <- Added ++ Unchanged]),

%% Keep track of the last time we recv'd data from a node
ets:insert(?MODULE, {Node, Now}),
Expand Down Expand Up @@ -395,31 +392,3 @@ peers_update(NewPeers, State) ->
%% Broadcast our current status to new peers
broadcast(Added, State#state { peers = NewPeers }).

pubtab_delete(Node, Service) ->
Svcs = pubtab_get_services(Node),
ets:insert(?MODULE, {{by_node, Node}, Svcs -- [Service]}),
Nds = pubtab_get_nodes(Service),
ets:insert(?MODULE, {{by_service, Service}, Nds -- [Node]}).

pubtab_insert(Node, Service) ->
%% Remove Service & node before adding: avoid accidental duplicates
Svcs = pubtab_get_services(Node) -- [Service],
ets:insert(?MODULE, {{by_node, Node}, [Service|Svcs]}),
Nds = pubtab_get_nodes(Service) -- [Node],
ets:insert(?MODULE, {{by_service, Service}, [Node|Nds]}).

pubtab_get_services(Node) ->
case ets:lookup(?MODULE, {by_node, Node}) of
[{{by_node, Node}, Ss}] ->
Ss;
[] ->
[]
end.

pubtab_get_nodes(Service) ->
case ets:lookup(?MODULE, {by_service, Service}) of
[{{by_service, Service}, Ns}] ->
Ns;
[] ->
[]
end.
18 changes: 5 additions & 13 deletions src/riak_core_vnode_master.erl
Expand Up @@ -33,9 +33,7 @@
sync_spawn_command/3, make_request/3,
make_coverage_request/4,
unregister_vnode/2, unregister_vnode/3,
all_nodes/1, reg_name/1, all_index_pid/1,
%% field debugging
get_tab/1]).
all_nodes/1, reg_name/1, all_index_pid/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(idxrec, {idx, pid, monref}).
Expand Down Expand Up @@ -154,18 +152,14 @@ all_index_pid(VNodeMod) ->
RegName = reg_name(VNodeMod),
gen_server:call(RegName, all_index_pid, infinity).

get_tab(VNodeMod) ->
RegName = reg_name(VNodeMod),
gen_server:call(RegName, get_tab, infinity).

%% @private
init([VNodeMod, LegacyMod, RegName]) ->
%% Get the current list of vnodes running in the supervisor. We use this
%% to rebuild our ETS table for routing messages to the appropriate
%% vnode.
VnodePids = [Pid || {_, Pid, worker, _}
<- supervisor:which_children(riak_core_vnode_sup)],
IdxTable = ets:new(RegName, [{keypos, 2}, private]),
IdxTable = ets:new(RegName, [{keypos, 2}]),

%% In case this the vnode master is being restarted, scan the existing
%% vnode children and work out which module and index they are responsible
Expand Down Expand Up @@ -235,8 +229,6 @@ handle_call(all_index_pid, _From, State) ->
Reply = [list_to_tuple(L)
|| L <- ets:match(State#state.idxtab, {idxrec, '$1', '$2', '_'})],
{reply, Reply, State};
handle_call(get_tab, _From, State) ->
{reply, ets:tab2list(State#state.idxtab), State};
handle_call({Partition, get_vnode}, _From, State) ->
Pid = get_vnode(Partition, State),
{reply, {ok, Pid}, State};
Expand All @@ -261,9 +253,9 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}.

%% @private
idx2vnode(Idx, _State=#state{idxtab=T}) ->
case ets:lookup(T, Idx) of
[I] -> I#idxrec.pid;
[] -> no_match
case ets:match(T, {idxrec, Idx, '$1', '_'}) of
[[VNodePid]] -> VNodePid;
[] -> no_match
end.

%% @private
Expand Down

0 comments on commit 8b40428

Please sign in to comment.