Permalink
Browse files

Add tests, improve performance

  • Loading branch information...
1 parent 568bc7c commit dc4a0fb9270e0f6205a00cb86c164ae0e3b34b3a @evanmiller evanmiller committed Sep 1, 2012
Showing with 122 additions and 52 deletions.
  1. +1 −0 Emakefile
  2. +17 −0 Makefile
  3. +17 −12 README.md
  4. +1 −0 rebar.config
  5. +44 −40 src/tinymq_channel_controller.erl
  6. +42 −0 tests/tinymq_test.erl
View
@@ -0,0 +1 @@
+{"tests/*", [debug_info, {outdir, "ebintest"}]}.
View
@@ -0,0 +1,17 @@
+ERL=erl
+REBAR=./rebar
+
+
+all: compile
+
+compile:
+ @$(REBAR) compile
+
+compile_test:
+ -mkdir -p ebintest
+ $(ERL) -make
+
+test: compile compile_test
+ $(ERL) -noshell -pa ebin -pa ebintest -pa deps/tiny_pq/ebin \
+ -s tinymq_test run_tests \
+ -s init stop
View
@@ -44,17 +44,22 @@ removed from the channel as soon as the first message is delivered, so
to keep a subscription active you need to keep re-subscribing using the
returned Timestamp as the input to the next call.
-Limitations
+Performance
==
-Internally, messages are stored in a list. This is not terribly efficient.
-As a result, push/2 is O(N + K) where N is the number of non-expired messages
-and K is the number of subscribers. Other operations are either constant-time
-or proportional to the number of returned messages.
-
-If a channel does not receive any new messages, but continues receiving other
-requests, the old messages may linger in memory indefinitely. Old messages are
-purged with every "push" operation on a channel, but not with other operations.
-Note that the channel itself is eliminated if max_age passes without any
-channel activity, so this should only be a concern if there are "polls" and
-"subscribes" on a channel without any new "pushes".
+Internally, messages are stored in a priority queue. Purging old messages
+occurs after any channel activity (but no more than once per second), and has
+an overhead of O(log(M) + E), where M is the total number of messages on a
+channel and E is the number of messages on a channel that expired since the
+last purge. With a better data structure this might be improved to O(log(M)),
+but note that the garbage collector will have to perform O(E) operations anyway,
+so the extra overhead is probably not worth losing sleep over.
+
+Channels are destroyed after max_age seconds of inactivity. Because old messages
+are only purged when there is channel activity, some messages may linger in memory
+for up to 2 * max_age seconds (i.e. if the last channel activity occurs \epsilon
+seconds before a message is set to expire).
+
+New messages on a channel are sent to all channel subscribers serially. With proper
+parallelism the running time might be improved to O(S/K), where S is the number
+of subscribers and K is the number of cores. But this will take some work.
View
@@ -1,3 +1,4 @@
{erl_opts, [debug_info]}.
{deps, [
+ {tiny_pq, ".*", {git, "git://github.com/evanmiller/tiny_pq", {tag, "HEAD"}}}
]}.
@@ -6,7 +6,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
--record(state, {channel, messages = [], subscribers = [], max_age, last_pull, supervisor}).
+-record(state, {channel, messages = [], subscribers = [], max_age,
+ last_pull, last_purge, supervisor}).
start_link(MaxAge, ChannelSup, Channel) ->
gen_server:start_link(?MODULE, [MaxAge, ChannelSup, Channel], []).
@@ -15,6 +16,7 @@ init([MaxAge, ChannelSup, Channel]) ->
{ok, #state{max_age = MaxAge,
supervisor = ChannelSup,
channel = Channel,
+ messages = gb_trees:empty(),
last_pull = erlang:now()},
MaxAge * 1000}.
@@ -24,33 +26,30 @@ handle_call(_From, _, State) ->
handle_cast({From, subscribe, 'now', Subscriber}, State) ->
NewSubscribers = [{erlang:monitor(process, Subscriber), Subscriber}|State#state.subscribers],
gen_server:reply(From, {ok, now_to_micro_seconds(erlang:now())}),
- {noreply, State#state{subscribers = NewSubscribers}};
-handle_cast({From, subscribe, 'last', Subscriber}, State) ->
- {NewSubscribers, LastPull} = pull_messages(State#state.last_pull, Subscriber, State),
+ {noreply, purge_old_messages(State#state{ subscribers = NewSubscribers })};
+
+handle_cast({From, subscribe, Timestamp, Subscriber}, State) ->
+ ActualTimestamp = case Timestamp of
+ last -> State#state.last_pull;
+ undefined -> 0;
+ _ -> Timestamp
+ end,
+ {NewSubscribers, LastPull} = pull_messages(ActualTimestamp, Subscriber, State),
gen_server:reply(From, {ok, LastPull}),
- {noreply, State#state{subscribers = NewSubscribers, last_pull = LastPull}};
-handle_cast({From, subscribe, undefined, Subscriber}, State) ->
- {NewSubscribers, LastPull} = pull_messages(undefined, Subscriber, State),
- gen_server:reply(From, {ok, LastPull}),
- {noreply, State#state{subscribers = NewSubscribers, last_pull = LastPull}};
-handle_cast({From, subscribe, Timestamp, Subscriber}, State) when is_integer(Timestamp) ->
- {NewSubscribers, LastPull} = pull_messages(Timestamp, Subscriber, State),
- gen_server:reply(From, {ok, LastPull}),
- {noreply, State#state{subscribers = NewSubscribers, last_pull = LastPull}};
-handle_cast({From, poll, undefined}, State) ->
- Now = now_to_micro_seconds(erlang:now()),
- gen_server:reply(From, {ok, Now, lists:map(fun({Msg, _Ts}) -> Msg end, State#state.messages)}),
- {noreply, State#state{ last_pull = Now }};
-handle_cast({From, poll, 'last'}, State) ->
- ReturnMessages = messages_newer_than_timestamp(State#state.last_pull, State#state.messages),
- Now = now_to_micro_seconds(erlang:now()),
- gen_server:reply(From, {ok, Now, lists:map(fun({Msg, _Ts}) -> Msg end, ReturnMessages)}),
- {noreply, State#state{ last_pull = Now }};
+ {noreply, purge_old_messages(State#state{ subscribers = NewSubscribers,
+ last_pull = LastPull})};
+
handle_cast({From, poll, Timestamp}, State) ->
- ReturnMessages = messages_newer_than_timestamp(Timestamp, State#state.messages),
+ ActualTimestamp = case Timestamp of
+ undefined -> 0;
+ last -> State#state.last_pull;
+ _ -> Timestamp
+ end,
+ ReturnMessages = messages_newer_than_timestamp(ActualTimestamp, State#state.messages),
Now = now_to_micro_seconds(erlang:now()),
- gen_server:reply(From, {ok, Now, lists:map(fun({Msg, _Ts}) -> Msg end, ReturnMessages)}),
- {noreply, State#state{ last_pull = Now }};
+ gen_server:reply(From, {ok, Now, ReturnMessages}),
+ {noreply, purge_old_messages(State#state{ last_pull = Now })};
+
handle_cast({From, push, Message}, State) ->
Now = now_to_micro_seconds(erlang:now()),
LastPull = lists:foldr(fun({Ref, Sub}, _) ->
@@ -59,13 +58,13 @@ handle_cast({From, push, Message}, State) ->
Now
end, State#state.last_pull, State#state.subscribers),
gen_server:reply(From, {ok, Now}),
- CurrentMessages = messages_newer_than_timestamp(now_to_micro_seconds(erlang:now()) -
- seconds_to_micro_seconds(State#state.max_age), State#state.messages),
- NewMessages = [{Message, Now}|CurrentMessages],
- {noreply, State#state{messages = NewMessages, subscribers = [], last_pull = LastPull}};
+ State2 = purge_old_messages(State),
+ NewMessages = tiny_pq:insert_value(Now, Message, State2#state.messages),
+ {noreply, State2#state{messages = NewMessages, subscribers = [], last_pull = LastPull}};
+
handle_cast({From, now}, State) ->
gen_server:reply(From, now_to_micro_seconds(erlang:now())),
- {noreply, State}.
+ {noreply, purge_old_messages(State)}.
terminate(_Reason, _State) ->
ok.
@@ -89,23 +88,28 @@ seconds_to_micro_seconds(Seconds) ->
now_to_micro_seconds({MegaSecs, Secs, MicroSecs}) ->
MegaSecs * 1000 * 1000 * 1000 * 1000 + Secs * 1000 * 1000 + MicroSecs.
-messages_newer_than_timestamp(undefined, Messages) ->
- Messages;
messages_newer_than_timestamp(Timestamp, Messages) ->
- messages_newer_than_timestamp(Timestamp, Messages, []).
+ tiny_pq:foldr_new(fun(V, Acc) -> [V|Acc] end, [], Messages, Timestamp).
-messages_newer_than_timestamp(_, [], Acc) ->
- lists:reverse(Acc);
-messages_newer_than_timestamp(Ts1, [{Msg, Ts2}|Rest], Acc) when Ts2 > Ts1 ->
- messages_newer_than_timestamp(Ts1, Rest, [{Msg, Ts2}|Acc]);
-messages_newer_than_timestamp(_, _, Acc) ->
- lists:reverse(Acc).
+purge_old_messages(State) ->
+ Now = now_to_micro_seconds(erlang:now()),
+ LastPurge = State#state.last_purge,
+ Duration = seconds_to_micro_seconds(1),
+ if
+ Now - LastPurge > Duration ->
+ State#state{
+ messages = tiny_pq:prune_old(State#state.messages,
+ Now - seconds_to_micro_seconds(State#state.max_age)),
+ last_purge = Now };
+ true ->
+ State
+ end.
pull_messages(Timestamp, Subscriber, State) ->
Now = now_to_micro_seconds(erlang:now()),
case messages_newer_than_timestamp(Timestamp, State#state.messages) of
ReturnMessages when erlang:length(ReturnMessages) > 0 ->
- Subscriber ! {self(), Now, lists:map(fun({Msg, _Ts}) -> Msg end, ReturnMessages)},
+ Subscriber ! {self(), Now, ReturnMessages},
{State#state.subscribers, Now};
_ ->
{[{erlang:monitor(process, Subscriber), Subscriber}|State#state.subscribers], Now}
View
@@ -0,0 +1,42 @@
+-module(tinymq_test).
+
+-export([run_tests/0]).
+
+run_tests() ->
+ application:start(tinymq),
+ Channel1 = "channel1",
+ Ts1 = tinymq:now(Channel1),
+ tinymq:push(Channel1, "Hello!"),
+ Ts2 = tinymq:now(Channel1),
+
+ {ok, _, ["Hello!"]} = tinymq:poll(Channel1, Ts1),
+ {ok, _, []} = tinymq:poll(Channel1, Ts2),
+
+ tinymq:push(Channel1, "Goodbye!"),
+
+ {ok, _, ["Goodbye!"]} = tinymq:poll(Channel1, Ts2),
+ {ok, _, ["Hello!", "Goodbye!"]} = tinymq:poll(Channel1, Ts1),
+
+ {ok, _} = tinymq:subscribe(Channel1, 'now', self()),
+
+ tinymq:push(Channel1, "Greetings!"),
+
+ ok = receive
+ {_, _, ["Greetings!"]} ->
+ ok
+ after
+ 1000 ->
+ not_ok
+ end,
+
+ {ok, _} = tinymq:subscribe(Channel1, Ts2, self()),
+
+ ok = receive
+ {_, _, ["Goodbye!", "Greetings!"]} ->
+ ok
+ after
+ 1000 ->
+ not_ok
+ end,
+
+ io:format("Passed all tests~n", []).

0 comments on commit dc4a0fb

Please sign in to comment.