Skip to content
Browse files

Pre-alpha version (already working somehow though)

  • Loading branch information...
0 parents commit 431b76d51c3244f7f12b6359919cc67bdb5fdccb @doubleyou committed Oct 4, 2012
Showing with 322 additions and 0 deletions.
  1. +2 −0 .gitignore
  2. +20 −0 Makefile
  3. +4 −0 priv/nodes.cfg
  4. BIN rebar
  5. +12 −0 src/dps.app.src
  6. +24 −0 src/dps.erl
  7. +18 −0 src/dps_app.erl
  8. +137 −0 src/dps_channel.erl
  9. +55 −0 src/dps_channels_manager.erl
  10. +20 −0 src/dps_channels_sup.erl
  11. +23 −0 src/dps_sup.erl
  12. +7 −0 src/dps_util.erl
2 .gitignore
@@ -0,0 +1,2 @@
+ebin/*
+erl_crash.dump
20 Makefile
@@ -0,0 +1,20 @@
+all:
+ ./rebar compile
+
+clean:
+ ./rebar clean
+
+test:
+ ./rebar eunit
+
+1:
+ erl -pa ebin -s dps -sname node1
+
+2:
+ erl -pa ebin -s dps -sname node2
+
+3:
+ erl -pa ebin -s dps -sname node3
+
+4:
+ erl -pa ebin -s dps -sname node4
4 priv/nodes.cfg
@@ -0,0 +1,4 @@
+node1@twist.
+node2@twist.
+node3@twist.
+node4@twist.
BIN rebar
Binary file not shown.
12 src/dps.app.src
@@ -0,0 +1,12 @@
+{application, dps,
+ [
+ {description, ""},
+ {vsn, "1"},
+ {registered, []},
+ {applications, [
+ kernel,
+ stdlib
+ ]},
+ {mod, { dps_app, []}},
+ {env, []}
+ ]}.
24 src/dps.erl
@@ -0,0 +1,24 @@
+-module(dps).
+
+-export([start/0,
+ new/0,
+ new/1,
+ publish/2,
+ subscribe/1]).
+
+start() ->
+ {ok, Nodes} = file:consult("priv/nodes.cfg"),
+ [net_adm:ping(Node) || Node <- Nodes],
+ application:start(dps).
+
+new() ->
+ new(os:cmd("uuidgen") -- "\n").
+
+new(Tag) ->
+ dps_channel:new(Tag).
+
+publish(Tag, Msg) ->
+ dps_channel:publish(Tag, Msg).
+
+subscribe(Tag) ->
+ dps_channel:subscribe(Tag).
18 src/dps_app.erl
@@ -0,0 +1,18 @@
+-module(dps_app).
+-behaviour(application).
+
+-export([start/2, stop/1]).
+
+%%
+%% Application callbacks
+%%
+
+start(_StartType, _StartArgs) ->
+ Result = dps_sup:start_link(),
+ {TagsLists, _} = rpc:multicall(nodes(), dps_channel, all, []),
+ Tags = sets:to_list(sets:from_list(lists:flatten(TagsLists))),
+ [dps:new(Tag) || Tag <- Tags],
+ Result.
+
+stop(_State) ->
+ ok.
137 src/dps_channel.erl
@@ -0,0 +1,137 @@
+-module(dps_channel).
+-behaviour(gen_server).
+
+-include_lib("stdlib/include/ms_transform.hrl").
+
+-export([start_link/1]).
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3,
+ terminate/2]).
+
+-export([all/0,
+ new/1,
+ new/2,
+ publish/2,
+ publish/4,
+ subscribe/1,
+ subscribe/2,
+ lookup/1,
+ msgs_from_peers/2]).
+
+-record(state, {
+ subscribers = [],
+ messages = []
+}).
+
+%%
+%% External API
+%%
+
+all() ->
+ MS = ets:fun2ms(fun({Tag, _}) -> Tag end),
+ ets:select(dps_channels, MS).
+
+new(Tag) ->
+ new(Tag, global).
+
+new(Tag, Mode) ->
+ Result = case lookup(Tag) of
+ undefined ->
+ supervisor:start_child(dps_channels_sup, [Tag]);
+ _ ->
+ {error, channel_already_exists}
+ end,
+ Mode =:= global andalso rpc:multicall(nodes(), ?MODULE, new, [Tag, local]),
+ Result.
+
+publish(Tag, Msg) ->
+ TS = dps_util:ts(),
+ publish(Tag, Msg, TS, global).
+
+publish(Tag, Msg, TS, Mode) ->
+ Pid = lookup(Tag),
+ gen_server:call(Pid, {publish, Msg, TS}),
+ Mode =:= global andalso
+ rpc:multicall(nodes(), ?MODULE, publish, [Tag, Msg, TS, local]),
+ TS.
+
+subscribe(Tag) ->
+ subscribe(Tag, 0).
+
+subscribe(Tag, TS) ->
+ Pid = lookup(Tag),
+ gen_server:call(Pid, {subscribe, self(), TS}).
+
+lookup(Tag) ->
+ case ets:lookup(dps_channels, Tag) of
+ [{Tag, Pid}] -> Pid;
+ [] -> undefined
+ end.
+
+msgs_from_peers(Tag, CallbackPid) ->
+ Pid = lookup(Tag),
+ Pid ! {give_me_messages, CallbackPid}.
+
+start_link(Tag) ->
+ gen_server:start_link(?MODULE, Tag, []).
+
+%%
+%% gen_server callbacks
+%%
+
+init(Tag) ->
+ dps_channels_manager:register_channel(Tag, self()),
+ rpc:multicall(nodes(), ?MODULE, msgs_from_peers, [Tag, self()]),
+ {ok, #state{}}.
+
+handle_call({publish, Msg, TS}, {Pid, _}, State = #state{messages = Msgs,
+ subscribers = Subscribers}) ->
+ [Sub ! {dps_msg, Msg} || Sub <- Subscribers, Sub =/= Pid],
+ NewState = State#state{messages = lists:sort([{TS, Msg} | Msgs])},
+ {reply, ok, NewState};
+handle_call({subscribe, Pid, TS}, _From, State = #state{messages = Messages,
+ subscribers = Subscribers}) ->
+ erlang:monitor(process, Pid),
+ Msgs = later_than(TS, Messages),
+ [Pid ! {dps_msg, Msg} || Msg <- Msgs],
+ NewState = State#state{subscribers = [Pid | Subscribers]},
+ {reply, length(Msgs), NewState};
+handle_call(_Msg, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({give_me_messages, Pid}, State = #state{messages = Messages}) ->
+ Pid ! {messages, Messages},
+ {noreply, State};
+handle_info({messages, Msgs}, State = #state{messages = Messages,
+ subscribers = Subscribers}) ->
+ [[Sub ! {dps_msg, Msg} || Msg <- Msgs] || Sub <- Subscribers],
+ NewState = State#state{ messages = lists:usort(Messages ++ Msgs) },
+ {noreply, NewState};
+handle_info({'DOWN', _, _, Pid, _}, State = #state{subscribers=Subscribers}) ->
+ {noreply, State#state{subscribers = Subscribers -- [Pid]}};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+%%
+%% Internal functions
+%%
+
+later_than(TS, Messages) ->
+ lists:takewhile(
+ fun({TS_, _}) ->
+ TS_ > TS
+ end,
+ Messages
+ ).
55 src/dps_channels_manager.erl
@@ -0,0 +1,55 @@
+-module(dps_channels_manager).
+-behaviour(gen_server).
+
+-include_lib("stdlib/include/ms_transform.hrl").
+
+-export([start_link/0]).
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3,
+ terminate/2]).
+
+-export([register_channel/2]).
+
+%%
+%% External API
+%%
+
+start_link() ->
+ ets:new(dps_channels, [public, named_table, set]),
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+register_channel(Tag, Pid) ->
+ gen_server:call(?MODULE, {register, Tag, Pid}).
+
+%%
+%% gen_server callbacks
+%%
+
+init(Args) ->
+ {ok, Args}.
+
+handle_call({register, Tag, Pid}, _From, State) ->
+ ets:insert(dps_channels, {Tag, Pid}),
+ erlang:monitor(process, Pid),
+ {reply, ok, State};
+handle_call(_Msg, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', _, _, Pid, _}, State) ->
+ MS = ets:fun2ms(fun({_, Pid_}) -> Pid_ =:= Pid end),
+ ets:select_delete(dps_channels, MS),
+ {noreply, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ ok.
20 src/dps_channels_sup.erl
@@ -0,0 +1,20 @@
+-module(dps_channels_sup).
+-behaviour(supervisor).
+
+-export([start_link/0]).
+-export([init/1]).
+
+%%
+%% External API
+%%
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%%
+%% supervisor callbacks
+%%
+
+init([]) ->
+ Child = {none, {dps_channel, start_link, []}, transient, 5000, worker, [dps_channel]},
+ {ok, { {simple_one_for_one, 5, 10}, [Child]} }.
23 src/dps_sup.erl
@@ -0,0 +1,23 @@
+-module(dps_sup).
+-behaviour(supervisor).
+
+-export([start_link/0]).
+-export([init/1]).
+
+-define(CHILD(M, R), {M, {M, start_link, []}, permanent, 5000, R, [M]}).
+
+%%
+%% External API
+%%
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%%
+%% supervisor callbacks
+%%
+
+init([]) ->
+ ChannelsMgr = ?CHILD(dps_channels_manager, worker),
+ ChannelsSup = ?CHILD(dps_channels_sup, supervisor),
+ {ok, { {one_for_one, 5, 10}, [ChannelsMgr, ChannelsSup]} }.
7 src/dps_util.erl
@@ -0,0 +1,7 @@
+-module(dps_util).
+
+-export([ts/0]).
+
+ts() ->
+ {Megasecs, Secs, Microsecs} = now(),
+ Microsecs + 1000 * (Secs + 1000000 * Megasecs).

0 comments on commit 431b76d

Please sign in to comment.
Something went wrong with that request. Please try again.