Skip to content

Commit

Permalink
Updated to use detatched and also to have deletion of queues
Browse files Browse the repository at this point in the history
  • Loading branch information
IanCal committed Nov 1, 2011
1 parent a800ce5 commit 86101fa
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 9 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -1,5 +1,5 @@

PREFIX:=../
PROJECT:=semq
PREFIX:=dist/
DEST:=$(PREFIX)$(PROJECT)

REBAR=./rebar
Expand Down
5 changes: 4 additions & 1 deletion src/frontend.erl
@@ -1,6 +1,6 @@
-module(frontend).
-import(routing).
-export([getrequest/1, postrequest/2]).
-export([getrequest/1, postrequest/2, deleterequest/1]).

postrequest(QueueName, Message) ->
{ok, Pid} = routing:getqueue(QueueName),
Expand All @@ -10,6 +10,9 @@ getrequest(QueueName) ->
{ok, Pid} = routing:getqueue(QueueName),
request(Pid).

deleterequest(QueueName) ->
routing:deletequeue(QueueName).

request(Pid) ->
Pid ! {get, self()},
receive
Expand Down
10 changes: 7 additions & 3 deletions src/messagequeue.erl
Expand Up @@ -11,7 +11,7 @@ queue([]) ->
returnnextadd(PidReturn);
{add, Message} ->
queue([Message]);
message_received ->
_ ->
queue([])
after ?TIMEOUT ->
exit(self())
Expand All @@ -27,7 +27,9 @@ queue(Messages) ->
[_ | Remaining] = Messages,
queue(Remaining);
{add, Message} ->
queue(Messages ++ [Message])
queue(Messages ++ [Message]);
empty_queue ->
queue([])
after ?TIMEOUT ->
exit(self())
end.
Expand All @@ -40,7 +42,9 @@ returnnextadd(Pid) ->
message_received ->
queue([]);
{get, PidReturn} ->
returnnextadd(PidReturn)
returnnextadd(PidReturn);
empty_queue ->
queue([])
after ?TIMEOUT ->
exit(self())
end.
13 changes: 12 additions & 1 deletion src/routing.erl
Expand Up @@ -4,7 +4,7 @@
-behaviour(gen_server).
-export([start_link/0, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-export([getqueue/1]).
-export([getqueue/1, deletequeue/1]).

-import(messagequeue).

Expand All @@ -18,6 +18,8 @@ start_link() ->
getqueue(QueueName) ->
gen_server:call(?SERVER, {getqueue, QueueName}).

deletequeue(QueueName) ->
gen_server:call(?SERVER, {deletequeue, QueueName}).

init([]) ->
process_flag(trap_exit, true),
Expand All @@ -39,6 +41,15 @@ handle_call({getqueue, QueueName}, _From, State) ->
{reply, {ok, QueuePid}, State}
end;

handle_call({deletequeue, QueueName}, _From, State) ->
case ets:lookup(State#state.id2pid, QueueName) of
[] ->
{reply, ok, State};
[{_QueueName, QueuePid}] ->
QueuePid ! empty_queue,
{reply, ok, State}
end;

handle_call({close, Pid}, _From, State) when is_pid(Pid) ->
unlink(Pid),
[{Pid, Id}] = ets:lookup(State#state.pid2id, Pid),
Expand Down
13 changes: 12 additions & 1 deletion src/semq_web.erl
Expand Up @@ -23,6 +23,13 @@ start(Options) ->
stop() ->
mochiweb_http:stop(?MODULE).

head_request(Req) ->
Req:respond({200, headers(), ""}).

delete_request(Req, Queue) ->
frontend:deleterequest(Queue),
Req:respond({200, headers(), ""}).

get_request(Req, Queue) ->
case frontend:getrequest(Queue) of
{ok, Message} ->
Expand All @@ -40,11 +47,15 @@ loop(Req, _DocRoot) ->
"/" ++ Path = Req:get(path),
try
case Req:get(method) of
Method when Method =:= 'GET'; Method =:= 'HEAD' ->
'GET' ->
get_request(Req, Path);
'HEAD' ->
head_request(Req);
'POST' ->
Message = Req:recv_body(),
post_request(Req, Path, Message);
'DELETE' ->
delete_request(Req, Path);
_ ->
Req:respond({501, [], []})
end
Expand Down
2 changes: 1 addition & 1 deletion start-dev.sh
Expand Up @@ -3,4 +3,4 @@
exec erl -pa ebin edit deps/*/ebin -boot start_sasl \
-sname semq_dev \
-s semq \
-s reloader
-s reloader -detached

0 comments on commit 86101fa

Please sign in to comment.