Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Initial commit

  • Loading branch information...
commit d7f820ec7d33e48ae5af14b820edc15c6ddce833 0 parents
@evanmiller evanmiller authored
33 README.md
@@ -0,0 +1,33 @@
+TinyMQ - A diminutive message queue
+--
+
+TinyMQ is a channel-based, in-memory message queue for Erlang. Channels are
+identified by strings (whatever you want) and are automatically created and
+destroyed as needed. Each channel is managed by a gen_server process. In theory
+the channel processes could reside on different nodes in an Erlang cluster, but
+for now they all reside on the node where TinyMQ is started.
+
+Example usage:
+
+ tinymq_sup:start_link([{max_age, 60}]),
+
+ Timestamp = tinymq:now("some-channel"),
+
+ tinymq:push("some-channel", <<"Hello, world!">>),
+ tinymq:push("some-channel", <<"Hello, again!">>),
+
+ {ok, NewTimestamp, Messages} = tinymq:poll("some-channel", Timestamp),
+
+ io:format("Received messages: ~p~n", [Messages])
+
+Besides polling a channel, it is also possible for processes to subscribe to
+a channel and receive any new message sent to it as soon as the message
+arrives:
+
+ tinymq:pull("some-channel", now, self()),
+ receive
+ {_From, Timestamp, Messages} ->
+ io:format("Received messages: ~p~n", [Messages])
+ end
+
+Each channel can have an unlimited number of subscribers.
BIN  rebar
Binary file not shown
3  rebar.config
@@ -0,0 +1,3 @@
+{erl_opts, [debug_info]}.
+{deps, [
+ ]}.
12 src/tinymq.app.src
@@ -0,0 +1,12 @@
+{application, tinymq,
+ [
+ {description, "TinyMQ: a diminutive message queue"},
+ {vsn, "0.1.0"},
+ {registered, [tinymq]},
+ {modules, []},
+ {applications, [
+ kernel,
+ stdlib
+ ]},
+ {env, []}
+ ]}.
27 src/tinymq.erl
@@ -0,0 +1,27 @@
+-module(tinymq).
+
+-export([now/1, poll/2, pull/3, push/2]).
+
+%% @spec pull(Channel::string(), Timestamp::integer() | now | last, Subscriber::pid()) -> {ok, PullTime} | {error, Reason}
+%% @doc Check `Channel' for messages created since `Timestamp' and send
+%% the result to `Subscriber' (see poll/2 for the result format). If no
+%% messages are in the queue, the channel % does not respond until a message
+%% arrives.
+pull(Channel, Timestamp, Subscriber) ->
+ gen_server:call(tinymq, {pull, Channel, Timestamp, Subscriber}).
+
+%% @spec poll(Channel::string(), Timestamp::integer() | now | last) -> {ok, NewTimestamp, [Message]} | {error, Reason}
+%% @doc Check `Channel' for messages created since `Timestamp', returning
+%% the result.
+poll(Channel, Timestamp) ->
+ gen_server:call(tinymq, {poll, Channel, Timestamp}).
+
+%% @spec push(Channel::string(), Message) -> {ok, Timestamp}
+%% @doc Send a `Message' to `Channel'.
+push(Channel, Message) ->
+ gen_server:call(tinymq, {push, Channel, Message}).
+
+%% @spec now(Channel::string()) -> Timestamp
+%% @doc Retrieve the current time for the server managing `Channel'.
+now(Channel) ->
+ gen_server:call(tinymq, {now, Channel}).
115 src/tinymq_channel_controller.erl
@@ -0,0 +1,115 @@
+-module(tinymq_channel_controller).
+
+-behaviour(gen_server).
+
+-export([start_link/0, start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+-record(state, {channel, messages = [], subscribers = [], max_age, last_pull, supervisor}).
+
+start_link() ->
+ start_link([]).
+
+start_link(Args) ->
+ gen_server:start_link(?MODULE, Args, []).
+
+init(Options) ->
+ MaxAgeSeconds = proplists:get_value(max_age, Options, 60),
+ Supervisor = proplists:get_value(supervisor, Options),
+ Channel = proplists:get_value(channel, Options),
+ {ok, #state{max_age = MaxAgeSeconds, supervisor = Supervisor, channel = Channel, last_pull = erlang:now()},
+ MaxAgeSeconds * 1000}.
+
+handle_call(_From, _, State) ->
+ {noreply, State}.
+
+handle_cast({From, pull, 'now', Subscriber}, State) ->
+ NewSubscribers = [{erlang:monitor(process, Subscriber), Subscriber}|State#state.subscribers],
+ gen_server:reply(From, {ok, now_to_micro_seconds(erlang:now())}),
+ {noreply, State#state{subscribers = NewSubscribers}};
+handle_cast({From, pull, 'last', Subscriber}, State) ->
+ {NewSubscribers, LastPull} = pull_messages(State#state.last_pull, Subscriber, State),
+ gen_server:reply(From, {ok, LastPull}),
+ {noreply, State#state{subscribers = NewSubscribers, last_pull = LastPull}};
+handle_cast({From, pull, undefined, Subscriber}, State) ->
+ {NewSubscribers, LastPull} = pull_messages(undefined, Subscriber, State),
+ gen_server:reply(From, {ok, LastPull}),
+ {noreply, State#state{subscribers = NewSubscribers, last_pull = LastPull}};
+handle_cast({From, pull, Timestamp, Subscriber}, State) when is_integer(Timestamp) ->
+ {NewSubscribers, LastPull} = pull_messages(Timestamp, Subscriber, State),
+ gen_server:reply(From, {ok, LastPull}),
+ {noreply, State#state{subscribers = NewSubscribers, last_pull = LastPull}};
+handle_cast({From, poll, undefined}, State) ->
+ Now = now_to_micro_seconds(erlang:now()),
+ gen_server:reply(From, {ok, Now, lists:map(fun({Msg, _Ts}) -> Msg end, State#state.messages)}),
+ {noreply, State#state{ last_pull = Now }};
+handle_cast({From, poll, 'last'}, State) ->
+ ReturnMessages = messages_newer_than_timestamp(State#state.last_pull, State#state.messages),
+ Now = now_to_micro_seconds(erlang:now()),
+ gen_server:reply(From, {ok, Now, lists:map(fun({Msg, _Ts}) -> Msg end, ReturnMessages)}),
+ {noreply, State#state{ last_pull = Now }};
+handle_cast({From, poll, Timestamp}, State) ->
+ ReturnMessages = messages_newer_than_timestamp(Timestamp, State#state.messages),
+ Now = now_to_micro_seconds(erlang:now()),
+ gen_server:reply(From, {ok, Now, lists:map(fun({Msg, _Ts}) -> Msg end, ReturnMessages)}),
+ {noreply, State#state{ last_pull = Now }};
+handle_cast({From, push, Message}, State) ->
+ Now = now_to_micro_seconds(erlang:now()),
+ LastPull = lists:foldr(fun({Ref, Sub}, _) ->
+ Sub ! {self(), Now, [Message]},
+ erlang:demonitor(Ref),
+ Now
+ end, State#state.last_pull, State#state.subscribers),
+ gen_server:reply(From, {ok, Now}),
+ CurrentMessages = messages_newer_than_timestamp(now_to_micro_seconds(erlang:now()) -
+ seconds_to_micro_seconds(State#state.max_age), State#state.messages),
+ NewMessages = [{Message, Now}|CurrentMessages],
+ {noreply, State#state{messages = NewMessages, subscribers = [], last_pull = LastPull}};
+handle_cast({From, now}, State) ->
+ gen_server:reply(From, now_to_micro_seconds(erlang:now())),
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info(timeout, State) ->
+ gen_server:cast(tinymq, {expire, State#state.channel}),
+ exit(State#state.supervisor),
+ {noreply, State};
+handle_info({'DOWN', Ref, process, _Pid, _Reason}, State) ->
+ {noreply, State#state{ subscribers = proplists:delete(Ref, State#state.subscribers) }};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+
+seconds_to_micro_seconds(Seconds) ->
+ Seconds * 1000 * 1000.
+
+now_to_micro_seconds({MegaSecs, Secs, MicroSecs}) ->
+ MegaSecs * 1000 * 1000 * 1000 * 1000 + Secs * 1000 * 1000 + MicroSecs.
+
+messages_newer_than_timestamp(undefined, Messages) ->
+ Messages;
+messages_newer_than_timestamp(Timestamp, Messages) ->
+ messages_newer_than_timestamp(Timestamp, Messages, []).
+
+messages_newer_than_timestamp(_, [], Acc) ->
+ lists:reverse(Acc);
+messages_newer_than_timestamp(Ts1, [{Msg, Ts2}|Rest], Acc) when Ts2 > Ts1 ->
+ messages_newer_than_timestamp(Ts1, Rest, [{Msg, Ts2}|Acc]);
+messages_newer_than_timestamp(_, _, Acc) ->
+ lists:reverse(Acc).
+
+pull_messages(Timestamp, Subscriber, State) ->
+ Now = now_to_micro_seconds(erlang:now()),
+ case messages_newer_than_timestamp(Timestamp, State#state.messages) of
+ ReturnMessages when erlang:length(ReturnMessages) > 0 ->
+ Subscriber ! {self(), Now, lists:map(fun({Msg, _Ts}) -> Msg end, ReturnMessages)},
+ {State#state.subscribers, Now};
+ _ ->
+ {[{erlang:monitor(process, Subscriber), Subscriber}|State#state.subscribers], Now}
+ end.
22 src/tinymq_channel_sup.erl
@@ -0,0 +1,22 @@
+-module(tinymq_channel_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0, start_link/1, start_child/2, init/1]).
+
+start_link() ->
+ supervisor:start_link(?MODULE, []).
+
+start_link(StartArgs) ->
+ supervisor:start_link(?MODULE, StartArgs).
+
+start_child(Supervisor, StartArgs) ->
+ supervisor:start_child(Supervisor,
+ {mq_channel_controller, {tinymq_channel_controller, start_link, [StartArgs]},
+ permanent,
+ 2000,
+ worker,
+ [tinymq_channel_controller]}).
+
+init(_StartArgs) ->
+ {ok, {{one_for_one, 10, 10}, []}}.
75 src/tinymq_controller.erl
@@ -0,0 +1,75 @@
+-module(tinymq_controller).
+
+-behaviour(gen_server).
+
+-export([start_link/0, start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+-record(state, {dict, max_age}).
+
+start_link() ->
+ start_link([]).
+
+start_link(Args) ->
+ gen_server:start_link({local, tinymq}, ?MODULE, Args, []).
+
+init(Options) ->
+ MaxAgeSeconds = proplists:get_value(max_age, Options, 60),
+ {ok, #state{dict = dict:new(), max_age = MaxAgeSeconds}}.
+
+handle_call({pull, Channel, Timestamp, Subscriber}, From, State) ->
+ {ChannelPid, NewState} = find_or_create_channel(Channel, State),
+ gen_server:cast(ChannelPid, {From, pull, Timestamp, Subscriber}),
+ {noreply, NewState};
+
+handle_call({poll, Channel, Timestamp}, From, State) ->
+ {ChannelPid, NewState} = find_or_create_channel(Channel, State),
+ gen_server:cast(ChannelPid, {From, poll, Timestamp}),
+ {noreply, NewState};
+
+handle_call({push, Channel, Message}, From, State) ->
+ {ChannelPid, NewState} = find_or_create_channel(Channel, State),
+ gen_server:cast(ChannelPid, {From, push, Message}),
+ {noreply, NewState};
+
+handle_call({now, Channel}, From, State) ->
+ {ChannelPid, NewState} = find_or_create_channel(Channel, State),
+ gen_server:cast(ChannelPid, {From, now}),
+ {noreply, NewState}.
+
+handle_cast({expire, Channel}, State) ->
+ NewState = State#state{
+ dict = dict:erase(Channel, State#state.dict)},
+ {noreply, NewState};
+
+handle_cast({set_max_age, NewMaxAge}, State) ->
+ {noreply, State#state{max_age = NewMaxAge}};
+
+handle_cast(_, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+
+% internal
+
+find_or_create_channel(Channel, #state{dict = Chan2Pid, max_age = MaxAge} = State) ->
+ case dict:find(Channel, Chan2Pid) of
+ {ok, Pid} ->
+ {Pid, State};
+ _ ->
+ {ok, ChannelSup} = tinymq_channel_sup:start_link(),
+ {ok, ChannelPid} = tinymq_channel_sup:start_child(ChannelSup,
+ [{max_age, MaxAge}, {supervisor, ChannelSup}, {channel, Channel}]),
+ {ChannelPid, State#state{
+ dict = dict:store(Channel, ChannelPid, Chan2Pid)
+ }}
+ end.
22 src/tinymq_sup.erl
@@ -0,0 +1,22 @@
+-module(tinymq_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0, start_link/1]).
+
+-export([init/1]).
+
+start_link() ->
+ supervisor:start_link(?MODULE, []).
+
+start_link(StartArgs) ->
+ supervisor:start_link(?MODULE, StartArgs).
+
+init(StartArgs) ->
+ {ok, {{one_for_one, 10, 10}, [
+ {mq_controller, {tinymq_controller, start_link, [StartArgs]},
+ permanent,
+ 2000,
+ worker,
+ [tinymq_controller]}
+ ]}}.
Please sign in to comment.
Something went wrong with that request. Please try again.