Permalink
Browse files

first broadcast works!!

  • Loading branch information...
1 parent 7aa3657 commit 1e4fc61d80dec8826c43e3621a0b34b96f204c23 @bernardpaulus committed May 6, 2012
Showing with 56 additions and 27 deletions.
  1. +8 −2 erlang/project/condition.erl
  2. +1 −1 erlang/project/erb.erl
  3. +20 −15 erlang/project/ld_cons.erl
  4. +21 −2 erlang/project/test.erl
  5. +6 −7 erlang/project/tob.erl
@@ -1,6 +1,6 @@
% @doc a condition server
-module(condition).
--export([start/0, stop/0, upon/2, check/1]).
+-export([start/0, stop/0, upon/2, check/1, clear/0]).
-behaviour(gen_server).
%% gen_server callbacks
@@ -26,6 +26,9 @@ upon(Condition, Action) when
check(State) ->
gen_server:call(condition, {check, State}).
+% clear my existing conditions
+clear() ->
+ gen_server:call(condition, {clear}).
% callbacks
@@ -50,7 +53,10 @@ handle_call({check, S}, {Pid, _Tag}, State) ->
end;
error ->
{reply, {error, Pid, has_no_condition}, State}
- end.
+ end;
+
+handle_call({clear}, {Pid, _Tag}, State) ->
+ {reply, ok, dict:erase(Pid, State)}.
handle_cast(stop, _State) ->
{stop, normal, stopped}.
View
@@ -25,7 +25,7 @@ start([]) -> [].
% @spec (Down :: pid()) -> void
% @doc initializes the erb process
init(_Others, Down) ->
- io:format("Je suis erb ~p~n", [self()]),
+ %io:format("Je suis erb ~p~n", [self()]),
Down ! {subscribe, self()},
erb_loop(#erb_state{down = Down}).
View
@@ -18,7 +18,8 @@
epoch_cons = none,
epoch_chang = none,
my_ups = [],
- peers = []
+ peers = [],
+ round = 0
}).
@@ -30,9 +31,13 @@ start(Epoch_Conss, Epoch_Changes) when
[[Epoch_Chang] || Epoch_Chang <- Epoch_Changes]).
+% @equiv init(Peers, Epoch_Cons, Epoch_Chang, 0)
+init(Peers, Epoch_Cons, Epoch_Chang) ->
+ init(Peers, Epoch_Cons, Epoch_Chang, 0).
+
%% Epoch_Chang : a Monarchical Eventual Leader Detector
%% Peers : peers in the monarch_eld
-init(Peers, Epoch_Cons, Epoch_Chang) ->
+init(Peers, Epoch_Cons, Epoch_Chang, Round) ->
Self = self(),
L0 = monarch_eld:max_rank(Peers, sets:from_list(Peers)),
Epoch_Cons ! {subscribe, self()},
@@ -62,7 +67,8 @@ init(Peers, Epoch_Cons, Epoch_Chang) ->
lead = L0,
newts = 0,
newl = none,
- peers = Peers}).
+ peers = Peers,
+ round = Round}).
%% TODO add restart/reinit
@@ -91,31 +97,29 @@ ldc_loop(State) ->
%% There is a new epoch, abort
{startepoch, New_TS, New_L} ->
- #ldc_state{epoch_cons = EC} = State,
- io:format("~p startepoch ~p ~p", [self(), New_TS, New_L]),
- EC ! {abort, Self, New_TS},
+ #ldc_state{epoch_cons = EC, newts = Old_Ts} = State,
+ EC ! {abort, Self, Old_Ts},
ldc_loop(State#ldc_state{newts = New_TS, newl = New_L});
%% The EpochConsensus has been aborted
{aborted, Abo_State, Abo_TS} when State#ldc_state.ets == Abo_TS ->
#ldc_state{epoch_cons = EC, newts = New_TS, newl = New_L} = State,
%% initialize a new instance of epoch consensus with timestamp ets
%% Epoch_Cons = rw_epoch_cons:start(Beb, Link, New_TS, Abo_State),
- io:format("~p reinit", [self()]),
Epoch_Cons = rw_epoch_cons:reinit(EC, New_TS, Abo_State),
New_State = State#ldc_state{proposed = false, ets = New_TS, lead = New_L, epoch_cons = Epoch_Cons},
condition:check(New_State),
ldc_loop(New_State);
%% Decide on a value
{decide, Val, Ets} when State#ldc_state.ets == Ets ->
- #ldc_state{decided = D, my_ups = My_Ups} = State,
+ #ldc_state{decided = D, my_ups = My_Ups, round = Round} = State,
case D of
false ->
io:format("~p decided ~p ~n", [self(), Val]),
- [Up ! {decide, Val, Ets} || Up <- My_Ups],
+ [Up ! {decide, Val, Round} || Up <- My_Ups],
ldc_loop(State#ldc_state{decided = true});
- true ->
+ true -> % should never be true
ldc_loop(State)
end;
@@ -128,18 +132,19 @@ ldc_loop(State) ->
%% reinitalize the LDC
- {reinit, Pid} = M ->
- #ldc_state{peers = Peers, epoch_cons = Epoch_Cons, epoch_chang = Epoch_Chang, my_ups = My_Ups} = State,
+ {reinit, Pid, New_Round} = M ->
+ #ldc_state{peers = Peers, epoch_cons = Epoch_Cons,
+ epoch_chang = Epoch_Chang, my_ups = My_Ups} = State,
Target = self(),
spawn(fun() -> % re-subscribe my ups
[utils:subscribe(Up, Target) || Up <- My_Ups],
Pid ! {ack, Target, M} % only ack after re-subscription
end),
- init(Peers, Epoch_Cons, Epoch_Chang)
+ init(Peers, Epoch_Cons, Epoch_Chang, New_Round)
end.
-reinit(LDC) ->
- M = LDC ! {reinit, self()},
+reinit(LDC, Round) ->
+ M = LDC ! {reinit, self(), Round},
receive {ack, LDC, M} -> LDC end.
View
@@ -26,8 +26,8 @@ ldc() ->
_LDCs = [A, _B, _C] = ld_cons:start(Epoch_Conss, Epoch_Changes),
%% Debug mode
- %dbg:tracer(),
- %dbg:p(A,m),
+ dbg:tracer(),
+ dbg:p(A,m),
%dbg:p(E1,m),
receive after 100 -> pass end,
@@ -113,3 +113,22 @@ epoch_change() ->
%[E ! {subscribe, O} || {E, O} <- lists:zip(Epoch_Changes, _Dev_Nulls)],
%[dbg:p(O, m) || O <- _Dev_Nulls],
Epoch_Changes.
+
+rwec() ->
+ %% The links
+ Links = link:perfect_link([node(), node(), node()]),
+ receive after 100 -> pass end,
+ %% Best effort broadcasts
+ Bebs = beb:start(Links),
+ receive after 100 -> pass end,
+ %% The epoch change
+ %% The initial states of the epoch consensus
+ E_States = [{0, bottom} || _ <- Bebs],
+ %% The epoch consensus
+ Epoch_Conss = [_E1, _E2, _E3] = rw_epoch_cons:start(Bebs, Links, 0, E_States),
+
+ %% Debug mode
+ dbg:tracer(),
+ dbg:p(_E1,m),
+
+ Epoch_Conss.
View
@@ -44,10 +44,9 @@ init(State) ->
Self = self(),
#tob_state{rb = RB, consensus = Consensus} = State,
utils:subscribe(RB),
- %% RB ! {subscribe, self()},
utils:subscribe(Consensus),
- %% Consensus ! {subscribe, self()},
-
+ %% reinit to enforce an initial round value
+ ld_cons:reinit(Consensus, State#tob_state.round),
condition:start(),
condition:upon(
% Unordered not empty and wait is false
@@ -82,22 +81,22 @@ loop(State) ->
end;
{unordered_not_empty_and_wait_false} ->
- #tob_state{unordered = Unordered, consensus = Cons} = State,
+ #tob_state{unordered = Unordered, consensus = Cons, round = Round}
+ = State,
% initialize a new instance c.round of consensus
- % ld_cons:reinit(Round), % TODO
- ld_cons:reinit(Cons),
+ ld_cons:reinit(Cons, Round),
Cons ! {propose, Unordered},
loop(State#tob_state{wait = true});
{decide, Decided, Round} -> % {decide, V, Round}
#tob_state{delivered = Delivered, unordered = Unordered,
my_ups = My_Ups, round = Round, wait = true} = State,
+ io:format("~p tob deliver ~p~n", [self(), Decided]),
% forall in sort(decided)
[
% trigger deliver to all subscribers
[ Up ! {deliver, From, Msg} || Up <- My_Ups]
|| {_Peer, _Seq, From, Msg} <- lists:sort(sets:to_list(Decided))],
- % TODO reinit consensus %% Why ?
loop(State#tob_state{
delivered = sets:union(Delivered, Decided),
unordered = sets:subtract(Unordered, Decided),

0 comments on commit 1e4fc61

Please sign in to comment.