Skip to content
Browse files

comet and websocket stuff

  • Loading branch information...
1 parent 49e5511 commit 6228815139024fdb4b002e2b8f80ca98ad30facd @RJ committed
View
2 deps/mochiweb/src/mochiweb_util.erl
@@ -370,6 +370,8 @@ guess_mime(File) ->
"application/octet-stream";
".csv" ->
"text/csv";
+ ".xspf" ->
+ "application/xml";
_ ->
"text/plain"
end.
View
27 deps/mochiweb/src/websocket_request.erl
@@ -0,0 +1,27 @@
+%%
+%% This is a wrapper for the Socket connection
+%% @author Dave Bryson [http://weblog.miceda.org]
+%%
+-module(websocket_request,[Socket, Path]).
+
+-export([get/1, get_data/0, send/1]).
+
+get(path) -> Path;
+get(socket) -> Socket.
+
+%% Return the data from the Socket. Parse it from the WebSocket format
+get_data() ->
+ case gen_tcp:recv(Socket, 0) of
+ {ok,Data} ->
+ unframe(binary_to_list(Data));
+ _Other ->
+ exit(normal)
+ end.
+
+%% Send the data to the client. Format it in the WebSocket format
+send(Data) -> gen_tcp:send(Socket, [0] ++ Data ++ [255]).
+
+%% Borrowed from Joe Armstrong's example
+unframe([0|T]) -> unframe1(T).
+unframe1([255]) -> [];
+unframe1([H|T]) -> [H|unframe1(T)].
View
59 src/playdar_http_api.erl
@@ -42,7 +42,23 @@ http_req(Req, DocRoot) ->
]},
respond(Req, R)
end;
-
+
+ % /comet?id=XX pushes results from queries sent with ?comet=XX
+ % doesn't need auth, IDs are opaque.
+ "comet" ->
+ case proplists:get_value("id", Qs) of
+ undefined ->
+ Req:not_found();
+ Cid ->
+ playdar_resolver:register_comet(Cid, self()),
+ Jsonp = proplists:get_value("jsonp", Qs),
+ %MType = content_type(Jsonp /= undefined),
+ MType = "multipart/x-mixed-replace",
+ Response = Req:ok({MType, [], chunked}),
+ Response:write_chunk("<html><head></head><body>"),
+ comet_loop(Response, Jsonp, Cid)
+ end;
+
_ ->
% for all methods other than stat, require auth if jsonp= is used.
case Authenticated of
@@ -79,6 +95,15 @@ http_req_authed(Req, _DocRoot, Method, Qs, _Auth) ->
{<<"album">>, list_to_binary(Album)},
{<<"track">>, list_to_binary(Track)}
]},
+ CometPids = case proplists:get_value("comet", Qs) of
+ undefined -> [];
+ Cid ->
+ case playdar_resolver:get_comet_pid(Cid) of
+ P when is_pid(P) -> [ P ];
+ _ -> []
+ end
+ end,
+
Q = case proplists:get_value("mimetypes", Qs) of
undefined -> Q0;
Strlist ->
@@ -87,7 +112,7 @@ http_req_authed(Req, _DocRoot, Method, Qs, _Auth) ->
{struct, [{<<"mimetypes">>, MT} | L]}
end,
Qry = #qry{ qid = Qid, obj = Q, local = true },
- Qid = playdar_resolver:dispatch(Qry),
+ Qid = playdar_resolver:dispatch(Qry, [], CometPids),
R = {struct,[
{"qid", Qid}
]},
@@ -101,13 +126,25 @@ http_req_authed(Req, _DocRoot, Method, Qs, _Auth) ->
% will return all results found so far in those 4 secs,
% and return immediately on solved
"get_results_long" ->
-
get_results_long_poll(Req, Qs);
_ ->
Req:not_found()
end.
+comet_loop(Resp, Jsonp, Cid) ->
+ receive
+ {results, Qid, Results} ->
+ R = {struct, [{<<"method">>, <<"results">>},
+ {<<"qid">>, Qid},
+ {<<"results">>, Results}]},
+ ?LOG(info, "Writing COMET response for ~s", [Qid]),
+ Body = encode_response( R, Jsonp ),
+ Resp:write_chunk( io_lib:format("<script type=\"text/javascript\">~s</script>~n",
+ [Body]) ),
+ comet_loop(Resp, Jsonp, Cid)
+ end.
+
get_results(Req, Qs) ->
Qid = list_to_binary(proplists:get_value("qid", Qs)),
case playdar_resolver:results(Qid) of
@@ -178,12 +215,20 @@ long_poll_loop(Qid, Timeleft, Results) ->
% responds with json
respond(Req, R) ->
Qs = Req:parse_qs(),
- case proplists:get_value("jsonp", Qs) of
+ Jsonp = proplists:get_value("jsonp", Qs),
+ Ctype = content_type( Jsonp /= undefined ),
+ Body = encode_response(R, Jsonp),
+ Req:ok({Ctype, [], Body}).
+
+content_type(true) -> "text/javascript; charset=utf-8";
+content_type(false)-> "appplication/json; charset=utf-8".
+
+encode_response(R, Jsonp) ->
+ case Jsonp of
undefined ->
- Req:ok({"appplication/json; charset=utf-8", [], mochijson2:encode(R)});
+ mochijson2:encode(R);
F ->
- Req:ok({"text/javascript; charset=utf-8", [],
- F++"("++mochijson2:encode(R)++")\n"})
+ F++"("++mochijson2:encode(R)++");\n"
end.
View
1 src/playdar_http_registry.erl
@@ -33,6 +33,7 @@ init([]) ->
P = ets:new(db, []),
% hardcoded handlers that ship by default:
register_handler("api", fun playdar_http_api:http_req/2, "Handles core Playdar API"),
+ register_handler("ws:api", fun playdar_websocket_api:req/2, "Websocket Playdar API"),
register_handler("logger", fun playdar_logger:http_req/2, "View log output in realtime", "/logger"),
{ok, #state{db=P}}.
View
45 src/playdar_resolver.erl
@@ -12,10 +12,11 @@
-define(MIN_SCORE, 0.6).
%% API
--export([start_link/0, dispatch/1, dispatch/2, sid2qid/1,
+-export([start_link/0, dispatch/1, dispatch/2, dispatch/3, sid2qid/1,
resolvers/0, register_sid/2, add_resolver/2, resolver_pid/1,
queries/0, add_results/2, results/1, result/1,
- solved/1, register_query_observer/2, gc/1]).
+ solved/1, register_query_observer/2, gc/1,
+ register_comet/2, get_comet_pid/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -40,8 +41,12 @@
start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
% dispatch a Query, returns the query pid
-dispatch(Qry) -> dispatch(Qry, []).
-dispatch(Qry, Callbacks)-> gen_server:call(?MODULE, {dispatch, Qry, Callbacks}).
+dispatch(Qry) ->
+ dispatch(Qry, [], []).
+dispatch(Qry, Callbacks) ->
+ gen_server:call(?MODULE, {dispatch, Qry, Callbacks, []}).
+dispatch(Qry, Callbacks, Observers) ->
+ gen_server:call(?MODULE, {dispatch, Qry, Callbacks, Observers}).
% associate a source id with a query id:
register_sid(Sid, Qid) -> gen_server:cast(?MODULE, {register_sid, Sid, Qid}).
@@ -82,6 +87,9 @@ solved(Qid) -> gen_server:call(?MODULE, {solved, Qid}).
register_query_observer(Qid, Pid) ->
gen_server:call(?MODULE, {register_query_observer, Qid, Pid}).
+register_comet(Cid, Pid) -> gen_server:cast(?MODULE, {register_comet, Cid, Pid}).
+get_comet_pid(Cid) -> gen_server:call(?MODULE, {get_comet_pid, Cid}).
+
gc(Age) -> gen_server:cast(?MODULE, {gc, Age}).
%% gen_server callbacks
@@ -114,6 +122,9 @@ init([]) ->
sources=Tid2,
resolvers=[]}}.
+handle_call({get_comet_pid, Cid}, _From, State) ->
+ {reply, get({comet, Cid}), State};
+
handle_call(queries, _From, State) ->
{reply, [Qid||{Qid,_RQ} <- ets:tab2list(State#state.queries)], State};
@@ -144,7 +155,7 @@ handle_call({sid2qid, Sid}, _From, State) ->
{reply, undefined, State}
end;
-handle_call({dispatch, Qry, Callbacks}, _From, State) ->
+handle_call({dispatch, Qry, Callbacks, Observers}, _From, State) when is_list(Callbacks), is_list(Observers) ->
Qid = Qry#qry.qid,
% First of all, do nothing if a query with this Qid already exists:
case ets:lookup(State#state.queries, Qid) of
@@ -155,7 +166,7 @@ handle_call({dispatch, Qry, Callbacks}, _From, State) ->
P = start_resolver_pipeline(Qry, State#state.resolvers),
RQ = #rq{qry=Qry, solved=false, ctime=erlang:localtime(),
callbacks=Callbacks, results=[], pipelinepid=P,
- observers=[]},
+ observers=Observers},
ets:insert(State#state.queries, {Qid, RQ}),
{reply, Qid, State}
end;
@@ -201,6 +212,12 @@ handle_call({register_query_observer, Qid, Pid}, _From, State) ->
end.
+handle_cast({register_comet, Cid, Pid}, State) ->
+ put({comet, Cid}, Pid),
+ put({comet_pid, Pid}, Cid),
+ link(Pid),
+ {noreply, State};
+
handle_cast({gc, Age}, State) ->
Now = calendar:datetime_to_gregorian_seconds(erlang:localtime()),
F = fun({_Qid, El}, Acc) ->
@@ -296,7 +313,7 @@ handle_cast({add_results, Qid, Results}, State) ->
lists:foreach(fun(R)->
Cb(R)
end, Results2)
- end, RQ1#rq.callbacks ),
+ end, RQ1#rq.callbacks ),
{noreply, State}
end;
[] ->
@@ -305,10 +322,16 @@ handle_cast({add_results, Qid, Results}, State) ->
end.
handle_info({'EXIT', Pid, _Reason}, State) ->
- % remove this crashed resolver from our list,
- % the supervisor will restart it as necessary:
- R = lists:filter( fun(X)-> X#resolver.pid /= Pid end, State#state.resolvers),
- {noreply, State#state{resolvers=R}};
+ case erase({comet_pid, Pid}) of
+ undefined ->
+ % remove this crashed resolver from our list,
+ % the supervisor will restart it as necessary:
+ R = lists:filter( fun(X)-> X#resolver.pid /= Pid end, State#state.resolvers),
+ {noreply, State#state{resolvers=R}};
+ Cid ->
+ erase(Cid),
+ {noreply, State}
+ end;
handle_info(_Info, State) ->
{noreply, State}.
View
20 src/playdar_web.erl
@@ -4,15 +4,16 @@
%% @doc Web server for playdar.
-module(playdar_web).
--export([start/1, stop/0, loop/2, render/3]).
+-export([start/1, stop/0, loop/2, wsloop/1, render/3]).
-include("playdar.hrl").
%% External API
start(Options) ->
{DocRoot, Options1} = get_option(docroot, Options),
- Loop = fun(Req) -> ?MODULE:loop(Req, DocRoot) end,
- Opts = [ {loop, Loop}, {name, ?MODULE} | Options1 ],
+ Loop = fun(Req) -> ?MODULE:loop(Req, DocRoot) end,
+ WSLoop = fun(Req) -> ?MODULE:wsloop(Req) end,
+ Opts = [ {loop, Loop}, {wsloop, WSLoop}, {name, ?MODULE} | Options1 ],
mochiweb_http:start(Opts).
@@ -214,6 +215,19 @@ loop1(Req, DocRoot) ->
end
end.
+wsloop(WSReq) ->
+ %["/"++Path0] = WSReq:get(path),
+ %Path = "/ws:"++Path0,
+ Path = "/ws:api",
+ case playdar_http_registry:get_handler(Path) of
+ undefined ->
+ ?LOG(warning, "No ws: handler for ~s", [Path]),
+ fail; % closes socket, since we don't tailcall TODO how to send error code, ws: style?
+ Handler ->
+ Handler(WSReq, undefined)
+ end.
+
+
%% Internal API
View
48 src/playdar_websocket_api.erl
@@ -0,0 +1,48 @@
+-module(playdar_websocket_api).
+-include("playdar.hrl").
+
+-export([req/2]).
+
+req(WSReq, _Docroot) ->
+ ?LOG(info, "ws api handler fired for ~s", [WSReq:get(path)]),
+ Sender = spawn_link(fun()->sender(WSReq)end),
+ Data = WSReq:get_data(),
+
+ case (catch mochijson2:decode(Data)) of
+ {struct, L} ->
+ case proplists:get_value(<<"method">>, L) of
+ <<"resolve">> -> resolve(WSReq, L, Sender);
+ Err ->
+ ?LOG(warning, "Unhandled method in ws call: ~s", [Err]),
+ WSReq:send(mochijson2:encode({struct,[{<<"method">>,<<"error">>}, {<<"msg">>,<<"unknown method">>}]})),
+ req(WSReq, undefined)
+ end;
+ _ ->
+ ?LOG(warning, "Failed to decode websocket data: ~s", [Data])
+ end.
+
+sender(WSReq) ->
+ % we'll construct a standard (polling based) results array of all results
+ % just so i don't have to change any JS interfaces for now:
+ receive
+ Qid ->
+ %WSReq:send(M),
+ {R, _Qry, _Solved} = playdar_resolver:results(Qid),
+ Results = {struct, [{<<"qid">>, Qid}, {<<"method">>,<<"results">>}, {<<"results">>, R}]},
+ WSReq:send( mochijson2:encode(Results) ),
+ sender(WSReq)
+ end.
+
+
+resolve(WSReq, L, Sender) ->
+ Qid = proplists:get_value(<<"qid">>, L),
+ Qry = #qry{ qid = Qid, obj = {struct, L}, local = true },
+ Cbfun = fun(_StrR) ->
+ %Bin = mochijson2:encode({struct,[{<<"method">>,<<"result">>},{<<"qid">>,Qid}|R]}),
+ %?LOG(info, "ws: results callback fired for: '~s'", [Bin]),
+ Sender ! Qid
+ end,
+ ?LOG(info, "ws: resolve",[]),
+ Qid = playdar_resolver:dispatch(Qry, [Cbfun]),
+ req(WSReq, undefined).
+

0 comments on commit 6228815

Please sign in to comment.
Something went wrong with that request. Please try again.