Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
more work on mem3 init, handling different types of joins, requiring …
…more human-intervention, reworking startargs to strip out most everything
  • Loading branch information
Brad Anderson committed May 10, 2010
1 parent e522c64 commit 8429ee374325a6a2c9779c5001c143ce4b35b1c6
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 79 deletions.
@@ -18,6 +18,7 @@
dynomite_prof,
dynomite_sup,
lib_misc,
mem3,
mem_utils,
membership2,
node,
@@ -44,4 +44,7 @@
-record(mem, {header=3,
node,
nodes,
clock}).
clock,
ets,
test=false
}).
@@ -40,17 +40,9 @@

%% @doc start required apps, join cluster, start dynomite supervisor
start(_Type, _StartArgs) ->
% get process_dict hack for startargs (i.e. not from .app file)
PdStartArgs = case erase(startargs) of
undefined ->
[];
Args ->
Args
end,

% start dynomite supervisor
ok = start_node(),
case dynomite_sup:start_link(PdStartArgs) of
case dynomite_sup:start_link() of
{ok, Supervisor} ->
{ok, Supervisor};
Error ->
@@ -1,20 +1,10 @@
%%%-------------------------------------------------------------------
%%% File: dynomite_sup.erl
%%% @author Cliff Moon <cliff@powerset.com> []
%%% @copyright 2008 Cliff Moon
%%% @doc
%%%
%%% @end
%%%
%%% @since 2008-06-27 by Cliff Moon
%%%-------------------------------------------------------------------
-module(dynomite_sup).
-author('cliff@powerset.com').
-author('brad@cloudant.com').

-behaviour(supervisor).

%% API
-export([start_link/1]).
-export([start_link/0]).

%% Supervisor callbacks
-export([init/1]).
@@ -31,8 +21,8 @@
%% @doc Starts the supervisor
%% @end
%%--------------------------------------------------------------------
start_link(Hints) ->
supervisor:start_link(?MODULE, [Hints]).
start_link() ->
supervisor:start_link(?MODULE, []).

%%====================================================================
%% Supervisor callbacks
@@ -47,11 +37,9 @@ start_link(Hints) ->
%% specifications.
%% @end
%%--------------------------------------------------------------------
init(Args) ->
Node = node(),
Nodes = running_nodes() ++ [node()],
init(_Args) ->
Membership = {membership,
{mem3, start_link, [Node, Nodes, Args]},
{mem3, start_link, []},
permanent,
1000,
worker,
@@ -68,18 +56,3 @@ init(Args) ->
%%====================================================================
%% Internal functions
%%====================================================================

%% @doc get a list of running nodes visible to this local node
running_nodes() ->
[Node || Node <- nodes([this,visible]), running(Node)].

%% @doc monitor the membership server on Node from here
running(Node) ->
Ref = erlang:monitor(process, {membership, Node}),
R = receive
{'DOWN', Ref, _, _, _} -> false
after 1 ->
true
end,
erlang:demonitor(Ref),
R.
@@ -5,8 +5,8 @@
-behaviour(gen_server).

%% API
-export([start_link/2, start_link/3, stop/0, stop/1]).
-export([clock/0, state/0]).
-export([start_link/0, start_link/1, stop/0, stop/1]).
-export([join/2, clock/0, state/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -21,12 +21,12 @@
%% API
%%====================================================================

start_link(Node, ErlNodes) ->
start_link(Node, ErlNodes, []).
start_link() ->
start_link([]).


start_link(Node, ErlNodes, Args) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Node, ErlNodes, Args], []).
start_link(Args) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, Args, []).


stop() ->
@@ -37,6 +37,10 @@ stop(Server) ->
gen_server:cast(Server, stop).


join(JoinType, Nodes) ->
gen_server:call(?MODULE, {join, JoinType, Nodes}).


clock() ->
gen_server:call(?MODULE, clock).

@@ -50,20 +54,23 @@ state() ->
%%====================================================================

%% start up membership server
init([Node, Nodes, Args]) ->
init(Args) ->
process_flag(trap_exit,true),
showroom_log:message(info, "membership: membership server starting...", []),
net_kernel:monitor_nodes(true),
Options = lists:flatten(Args),
Config = configuration:get_config(),
OldState = read_latest_state_file(Config),
State = handle_init(Node, Nodes, Options, OldState, Config),
{ok, State}.
OldState = case Args of
test -> nil;
_ -> read_latest_state_file(Config)
end,
State = handle_init(OldState),
{ok, State#mem{test=(Args == test)}}.



%% new node joining to this node
handle_call({join, _JoiningNode, _Options}, _From, State) ->
{reply, ok, State};
handle_call({join, JoinType, ExtNodes}, _From, State) ->
Config = configuration:get_config(),
Reply = handle_join(JoinType, ExtNodes, State, Config),
{reply, Reply, State};

%% clock
handle_call(clock, _From, State = #mem{clock=Clock}) ->
@@ -114,43 +121,58 @@ terminate(_Reason, _State) ->

% ignored code change
code_change(OldVsn, State, _Extra) ->
io:format("Unknown Old Version!~nOldVsn: ~p~nState : ~p~n", [OldVsn, State]),
io:format("Unknown Old Version~nOldVsn: ~p~nState : ~p~n", [OldVsn, State]),
{ok, State}.


%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------

% we could be:
% 1. starting fresh node into a fresh cluster (we're one of first nodes)
% 2. starting fresh node into an existing cluster (need to join)
% 3. rejoining a cluster after some downtime
% we could be automatically:
% 1. rejoining a cluster after some downtime
%
% we could be manually:
% 2. beginning a cluster with only this node
% 3. joining a cluster as a new node
% 4. replacing a node in an existing cluster

handle_init(Node, [], nil, Options, Config) ->
% no other erlang nodes, no old state
Hints = proplists:get_value(hints, Options),
Map = create_map(Config, [{Node, Hints}]),
?debugFmt("~nmap: ~p~n", [Map]);

handle_init(_Node, [], _OldState, _Options, _Config) ->
% no other erlang nodes, old state
% network partition?
handle_init(nil) ->
showroom_log:message(info, "membership: membership server starting...", []),
net_kernel:monitor_nodes(true),
Table = init_ets_table(),
Node = node(),
Nodes = [{Node, []}],
Clock = vector_clock:create(Node),
#mem{node=Node, nodes=Nodes, clock=Clock, ets=Table};

handle_init(_OldState) ->
?debugHere,
% there's an old state, let's try to rejoin automatically
% TODO implement me
Table = init_ets_table(),
#mem{ets=Table}.


%% handle join activities
handle_join(first, ExtNodes, State, Config) ->
Map = create_map(Config, ExtNodes),
?debugFmt("~nmap: ~p~n", [Map]),
State#mem{};

handle_join(new, _ExtNodes, _State, _Config) ->
ok;

handle_init(_Node, _ErlNodes, nil, _Options, _Config) ->
% other erlang nodes, no old state
handle_join(replace, [_OldNode | _], _State, _Config) ->
ok;

handle_init(_Node, _ErlNodes, _OldState, _Options, _Config) ->
% other erlang nodes, old state
% network partition?
ok.
handle_join(JoinType, _, _, _) ->
showroom_log:message(info, "membership: unknown join type: ~p", [JoinType]),
{error, {unknown_join_type, JoinType}}.


%% @doc find the latest state file on disk
find_latest_state_filename(Config) ->
?debugFmt("~nConfig: ~p~n", [Config]),
Dir = Config#config.directory,
case file:list_dir(Dir) of
{ok, Filenames} ->
@@ -202,3 +224,9 @@ make_fullmap(PMap) ->
[{Node, Part, primary} | PartnerList]
end, PMap),
NodeParts.


init_ets_table() ->
Table = list_to_atom(lists:concat(["mem_", atom_to_list(node())])),
ets:new(Table, [public, set, named_table]),
Table.
@@ -0,0 +1,38 @@
-module(mem3_test).

-include("../include/common.hrl").
-include_lib("eunit/include/eunit.hrl").

%% TEST SETUP

all_tests_test_() ->
{"membership3 tests",
[
{setup,
fun test_setup/0,
fun test_teardown/1,
fun(Pid) ->
{with, Pid,
[
fun init/1
]}
end}
]
}.


test_setup() ->
{ok, Pid} = mem3:start_link(test),
Pid.


test_teardown(Pid) ->
exit(Pid, shutdown).


%% TESTS

init(_Pid) ->
State = #mem{test=Test} = mem3:state(),
?debugFmt("~nState: ~p~n", [State]),
?assertEqual(true, Test).

0 comments on commit 8429ee3

Please sign in to comment.