Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Fixed problems when upgrading long-polling to websockets transport

  • Loading branch information...
commit 9ae332b189453c5b3a476be92975665bc21b9956 1 parent 02bc841
@antoniogarrote authored
Showing with 150 additions and 94 deletions.
  1. +57 −49 erlfaye/src/erlfaye_api.erl
  2. +93 −45 erlfaye/src/erlfaye_request.erl
View
106 erlfaye/src/erlfaye_api.erl
@@ -38,15 +38,20 @@ cache_loop(Events, TransportPid) ->
error_logger:warning_msg(">>>> CACHE EVENT ~p TransportPid ~p ~n",[Event, TransportPid]),
if
TransportPid =/= 0 ->
- error_logger:warning_msg(">>>> CACHE HINTING TRANSPORT ~p ~n",[TransportPid]),
- TransportPid ! hint;
+ ProcessAlive = is_process_alive(TransportPid),
+ if ProcessAlive == true ->
+ error_logger:warning_msg(">>>> CACHE HINTING TRANSPORT ~p ~n",[TransportPid]),
+ TransportPid ! {hint,self()};
+ ProcessAlive == false ->
+ error_logger:warning_msg(">>>> CACHE NOT HINTING TRANSPORT (DEAD) ~p ~n",[TransportPid])
+ end;
true ->
- error_logger:warning_msg(">>>> CACHE NOT HINTING TRANSPORT ~p ~n",[TransportPid])
+ error_logger:warning_msg(">>>> CACHE NOT HINTING TRANSPORT (0) ~p ~n",[TransportPid])
end,
error_logger:warning_msg(">>>> CACHE LOOPING IN EVENT ~n",[]),
cache_loop([Event | Events], TransportPid);
flush ->
- error_logger:warning_msg(">>>> CACHE FLUSH ~n",[]),
+ error_logger:warning_msg(">>>> CACHE FLUSH ~p ~n",[TransportPid]),
TransportPid ! {cachedEvents, lists:reverse(Events)},
cache_loop([], TransportPid);
{setpid, Pid} ->
@@ -117,29 +122,27 @@ replace_connection(ClientId, Pid, NewState) ->
replace_connection(ClientId, Pid, 0, NewState).
replace_connection(ClientId, Pid, CachePid, NewState) ->
E = #connection{client_id=ClientId, pid=Pid, state=NewState, websocket_pid=0},
- F1 = fun() -> mnesia:read({connection, ClientId}) end,
- {Status, F2} = case mnesia:transaction(F1) of
- {atomic, EA} ->
- error_logger:warning_msg("READ FROM MNESIA ~p~n",[EA]),
- case EA of
- [] ->
- ECPid = E#connection{cache_pid=CachePid},
- {new, fun() -> mnesia:write(ECPid) end};
- [#connection{state=State, websocket_pid=WebsocketPid, cache_pid=ActualCachePid}] ->
- ECPid = E#connection{cache_pid=ActualCachePid, websocket_pid=WebsocketPid},
- case State of
- handshake ->
- {replaced_hs, fun() -> mnesia:write(ECPid) end};
- _ ->
- {replaced, fun() -> mnesia:write(ECPid) end}
- end
- end;
- _ ->
- {new, fun() -> mnesia:write(E) end}
- end,
-
- case mnesia:transaction(F2) of
- {atomic, ok} -> {ok, Status};
+ F1 = fun() -> case mnesia:read({connection, ClientId}) of
+ [] ->
+ ECPid = E#connection{cache_pid=CachePid},
+ error_logger:warning_msg(" -- REPLACING CONNECTION AS IS ~p ~n",[ECPid]),
+ {new, mnesia:write(ECPid)};
+ [#connection{state=State, websocket_pid=WebsocketPid, cache_pid=ActualCachePid}] ->
+ ECPid = E#connection{cache_pid=ActualCachePid, websocket_pid=WebsocketPid},
+ error_logger:warning_msg(" -- REPLACING CONNECTION ~p ~n",[ECPid]),
+ case State of
+ handshake ->
+ {replaced_hs, mnesia:write(ECPid)};
+ _ ->
+ {replaced, mnesia:write(ECPid)}
+ end;
+ _ ->
+ error_logger:warning_msg(" -- REPLACING CONNECTION AS IS (ERROR???) ~p ~n",[E]),
+ {new, mnesia:write(E)}
+ end
+ end,
+ case mnesia:transaction(F1) of
+ {atomic, {Status, _} } -> {ok, Status};
_ -> error
end.
@@ -147,28 +150,31 @@ replace_connection_ws(ClientId, Pid, NewState) ->
replace_connection_ws(ClientId, Pid, 0, NewState).
replace_connection_ws(ClientId, Pid, CachePid, NewState) ->
E = #connection{client_id=ClientId, pid=Pid, state=NewState, websocket_pid=Pid},
- F1 = fun() -> mnesia:read({connection, ClientId}) end,
- {Status, F2} = case mnesia:transaction(F1) of
- {atomic, EA} ->
- case EA of
- [] ->
- ECPid = E#connection{cache_pid=CachePid},
- {new, fun() -> mnesia:write(ECPid) end};
- [#connection{state=State, cache_pid=ActualCachePid}] ->
- ECPid = E#connection{cache_pid=ActualCachePid},
- case State of
- handshake ->
- {replaced_hs, fun() -> mnesia:write(ECPid) end};
- _ ->
- {replaced, fun() -> mnesia:write(ECPid) end}
- end
- end;
- _ ->
- {new, fun() -> mnesia:write(E) end}
- end,
-
- case mnesia:transaction(F2) of
- {atomic, ok} -> {ok, Status};
+ F1 = fun() ->
+ case mnesia:read({connection, ClientId}) of
+ [] ->
+ ECPid = E#connection{cache_pid=CachePid},
+ error_logger:warning_msg(" -- WS REPLACING CONNECTION AS IS ~p ~n",[ECPid]),
+ {new, mnesia:write(ECPid)};
+ [#connection{state=State, cache_pid=ActualCachePid}] ->
+ ECPid = E#connection{cache_pid=ActualCachePid},
+ error_logger:warning_msg(" -- WS REPLACING CONNECTION ~p ~n",[ECPid]),
+ Result = case State of
+ handshake ->
+ {replaced_hs, mnesia:write(ECPid)};
+ _ ->
+ {replaced, mnesia:write(ECPid)}
+ end,
+ error_logger:warning_msg("WS (~p) UPDATING CACHE WHEN RECURRING AFTER CONNECT ~p ~n",[self(), ActualCachePid]),
+ ActualCachePid ! {setpid, Pid},
+ Result;
+ _ ->
+ error_logger:warning_msg(" -- WS REPLACING CONNECTION AS IS (ERROR??) ~p ~n",[E]),
+ {new, mnesia:write(E)}
+ end
+ end,
+ case mnesia:transaction(F1) of
+ {atomic, {Status,_}} -> {ok, Status};
_ -> error
end.
@@ -231,6 +237,7 @@ connection_event_receiver_pid(ClientId) ->
error_logger:warning_msg("EVENT RECEIVER IS WEBSOCKET: ~p ~n",[WebsocketPid]),
WebsocketPid;
undefined ->
+ error_logger:warning_msg("EVENT RECEIVER IS NIL: ~n",[]),
undefined
end.
@@ -404,6 +411,7 @@ deliver_to_single_channel(Channel, Data) ->
error_logger:warning_msg("NOT HANDLERS FOUND FOR CHANNEL: ~p ~n",[Channel]),
ok;
{atomic, [{channel, Channel, Ids}] } ->
+ error_logger:warning_msg("DELIVERING TO: ~p CLIENTS ~n",[length(Ids)]),
[send_event(connection_event_receiver_pid(ClientId), Event) || ClientId <- Ids],
ok;
_ ->
View
138 erlfaye/src/erlfaye_request.erl
@@ -38,6 +38,7 @@ handle(Req, 'GET') ->
case erlfaye_websockets_api:try_to_upgrade(Req) of
false -> done;
WebSocket ->
+ error_logger:warning_msg("WS (~p) UPDGRADING OK! ~n",[self()]),
websocket_loop(WebSocket),
done
end;
@@ -71,13 +72,14 @@ shouldQueue([{struct, Fields}]) ->
case lists:keyfind(channel,1,Fields) of
false ->
false;
- _Tuple ->
- true
+ {channel, <<"/meta/connect">>} ->
+ true;
+ _ -> false
end;
shouldQueue([_H|_T]) ->
false.
-loop(Resp, #state{events=Events, id=Id, callback=Callback} = State, CachePid) ->
+loop(Resp, #state{events=Events, id=Id} = State, CachePid, Callback) ->
% set up the connection between the cache and the client handler
CachePid ! {setpid, self()},
@@ -86,26 +88,29 @@ loop(Resp, #state{events=Events, id=Id, callback=Callback} = State, CachePid) ->
receive
{cachedEvents, CachedEvents} ->
+ error_logger:warning_msg("********** CACHED EVENTS: ~p~n~p~n~n",[Id, CachedEvents]),
AllEvents = CachedEvents++lists:reverse(Events),
ShouldQueue = shouldQueue(AllEvents),
+ error_logger:warning_msg("********** should queue?: ~p ~n -> ~n ~p~n~n",[AllEvents, ShouldQueue]),
if
ShouldQueue == false ->
error_logger:warning_msg("********** SENDING: ~p~n~p~n~n",[Id,AllEvents]),
- send(Resp, AllEvents, Callback);
+ send(Resp, AllEvents, Callback),
+ exit(normal);
ShouldQueue == true ->
receive
- hint ->
+ {hint,_} ->
error_logger:warning_msg("********** RECEIVED HINT: ~p~n",[Id]),
- loop(Resp, State#state{events=AllEvents}, CachePid);
+ loop(Resp, State#state{events=AllEvents}, CachePid,Callback);
stop ->
error_logger:warning_msg("********** RECEIVED STOP: ~p~n",[Id]),
CachePid ! disconnect,
- disconnect(Resp, Id, undefined, State);
+ disconnect(Resp, Id, undefined, Callback);
{stop, MessageId} ->
error_logger:warning_msg("********** RECEIVED STOP: ~p~n",[Id]),
CachePid ! disconnect,
- disconnect(Resp, Id, MessageId, State)
+ disconnect(Resp, Id, MessageId, Callback)
after State#state.timeout ->
error_logger:warning_msg("********** DISCONECTING: ~p~n",[Id]),
CachePid ! disconnect,
@@ -125,7 +130,7 @@ disconnect(Resp, Id, MessageId, Callback) ->
L = {struct, [{channel, <<"/meta/disconnect">>}, {successful, true}, {clientId, Id}]},
Msg = case MessageId of
undefined -> L ;
- true -> [{id, MessageId} | L]
+ _ -> [{id, MessageId} | L]
end,
Chunk = callback_wrapper(mochijson2:encode(Msg), Callback),
Resp:write_chunk(Chunk),
@@ -137,41 +142,69 @@ disconnect(Resp, Id, MessageId, Callback) ->
%%====================================================================
websocket_loop(WebSocket) ->
- websocket_loop(WebSocket,[]).
-websocket_loop(WebSocket, Cache) ->
+ websocket_loop(WebSocket,0,[],0).
+websocket_loop(WebSocket, MaybeCachePid, Cache,ClientId) ->
+ if
+ MaybeCachePid =/= 0 ->
+ error_logger:warning_msg("WS (~p) FLUSHING CACHE ~p ~n",[self(), MaybeCachePid]),
+ MaybeCachePid ! flush;
+ true ->
+ error_logger:warning_msg("WS (~p) NOT FLUSHING CACHE ~p ~n",[self(), MaybeCachePid]),
+ default
+ end,
receive
+ {cachedEvents, CachedEvents} ->
+ AllEvents = Cache ++ CachedEvents,
+ error_logger:warning_msg("WS (~p) SENDING RECEIVED CACHED EVENTS ~p ~n",[self(),AllEvents]),
+ [erlfaye_websockets_api:send_data(WebSocket,mochijson2:encode(Event)) || Event <- AllEvents],
+ websocket_loop(WebSocket,MaybeCachePid,[],ClientId);
+ {hint, CachePid} ->
+ error_logger:warning_msg("WS (~p) SENDING RECEIVED HINT FROM ~p ~n",[self(), CachePid]),
+ CachePid ! flush,
+ case MaybeCachePid of
+ 0 ->
+ error_logger:warning_msg("WS (~p) UPDATING CACHE PID ~p ~n",[self(), CachePid]),
+ websocket_loop(WebSocket,CachePid,Cache,ClientId);
+ _ ->
+ websocket_loop(WebSocket,MaybeCachePid,Cache,ClientId)
+ end;
{event, Event} ->
+ error_logger:warning_msg("WS (~p) SENDING RECEIVED EVENT ~n",[self(), Event]),
erlfaye_websockets_api:send_data(WebSocket, mochijson2:encode(Event)),
- websocket_loop(WebSocket,Cache);
+ websocket_loop(WebSocket,MaybeCachePid,Cache,ClientId);
{tcp_closed,_} ->
- error_logger:warning_msg("STOPPING WEBSOCKET PROCESS DUE TO REMOTE CLOSE EVENT ~n",[]),
+ error_logger:warning_msg("WS (~p) STOPPING WEBSOCKET PROCESS DUE TO REMOTE CLOSE EVENT ~n",[self()]),
clean_websocket(self()),
done;
{tcp, _, Data} ->
case Data of
undefined ->
+ error_logger:warning_msg("WS (~p) GOT TCP DATA UNDEFINED ~n",[self()]),
receive
after 500 ->
- websocket_loop(WebSocket,Cache)
+ websocket_loop(WebSocket,MaybeCachePid,Cache,ClientId)
end;
_ ->
JsonObj = lists:flatten(lists:map(fun (X) -> mochijson2:decode(X) end, erlfaye_websockets_api:unframe(binary_to_list(Data)))),
- {Results,CacheP} = case JsonObj of
- Array when is_list(Array) ->
- Channels = [ get_json_map_val(<<"channel">>, X) || X <- Array ],
- case Channels of
- [<<"/meta/connect">>] ->
- % we have to cache the connect messages to avoid
- % clients overflowing the server with connect requests
- {[continue], [lists:nth(1,Array)|Cache]};
- _ ->
- {[process_ws_cmd(WebSocket, get_json_map_val(<<"channel">>, X), X) || X <- (lists:reverse(Cache)++Array) ], []}
- end
- end,
+ error_logger:warning_msg("WS (~p) GOT TCP DATA ~p ~n",[self(),JsonObj]),
+ {Results,CacheP,ClientIdPP} = case JsonObj of
+ Array when is_list(Array) ->
+ AllMessages = (lists:reverse(Cache)++Array),
+ error_logger:warning_msg("WS (~p) TRYING TO UPDATE CLIENTS... ~n",[self()]),
+ ClientIdP = update_connection_client_ws(ClientId,AllMessages),
+ Channels = [ get_json_map_val(<<"channel">>, X) || X <- AllMessages ],
+ error_logger:warning_msg("WS (~p) ALL CHANNELS: ~p ~n",[self(),Channels]),
+ case Channels of
+ [<<"/meta/connect">>] ->
+ {[continue], [lists:nth(1,Array)|Cache], ClientIdP};
+ _ ->
+ {[process_ws_cmd(WebSocket, get_json_map_val(<<"channel">>, X), X) || X <- (lists:reverse(Cache)++Array)], [], ClientIdP}
+ end
+ end,
case should_continue(Results) of
true ->
- websocket_loop(WebSocket, CacheP);
+ websocket_loop(WebSocket,MaybeCachePid,CacheP,ClientIdPP);
false ->
clean_websocket(self()),
done
@@ -179,6 +212,21 @@ websocket_loop(WebSocket, Cache) ->
end
end.
+update_connection_client_ws(0,[]) ->
+ 0;
+update_connection_client_ws(0,[Struct|T]) ->
+ error_logger:warning_msg("WS (~p) TRYING TO UPDATE CLIENTS SEARCHING IN ~p --> ~p ~n",[self(), Struct,get_json_map_val(<<"clientId">>, Struct)]),
+ case get_json_map_val(<<"clientId">>, Struct) of
+ undefined ->
+ update_connection_client_ws(0,T);
+ ClientId ->
+ error_logger:warning_msg("WS (~p) TRYING TO UPDATE CLIENTS FOUND! replacing: ~n",[self()]),
+ erlfaye_api:replace_connection_ws(ClientId, self(), connected),
+ ClientId
+ end;
+update_connection_client_ws(ClientPid,_) ->
+ ClientPid.
+
clean_websocket(Pid) ->
case erlfaye_api:connection_by_websocket_pid(Pid) of
undefined ->
@@ -207,11 +255,11 @@ process_msg(Req, Struct, Callback) ->
%% WEBSOCKETS COMMANDS
process_ws_cmd(WebSocket, <<"/meta/connect">> = Channel, Struct) ->
- error_logger:warning_msg("WS CONNECT ~p~n",[Struct]),
+ error_logger:warning_msg("WS CONNECT (~p) ~p ~n",[self(),Struct]),
ClientId = get_json_map_val(<<"clientId">>, Struct),
MessageId = get_json_map_val(<<"id">>, Struct),
- erlfaye_api:replace_connection_ws(ClientId, self(), connected),
+ %erlfaye_api:replace_connection_ws(ClientId, self(), connected),
JsonRespFields = if
MessageId == undefined ->
@@ -224,19 +272,19 @@ process_ws_cmd(WebSocket, <<"/meta/connect">> = Channel, Struct) ->
continue;
process_ws_cmd(WebSocket,<<"/meta/subscribe">> = Channel, Struct) ->
- error_logger:warning_msg("WS SUBSCRIBE channel ~p ~p~n",[Channel, Struct]),
+ error_logger:warning_msg("WS SUBSCRIBE (~p) channel ~p ~p~n",[self(),Channel, Struct]),
JsonResp = process_subscribe(Channel,Struct),
erlfaye_websockets_api:send_data(WebSocket, mochijson2:encode([JsonResp])),
continue;
process_ws_cmd(WebSocket, <<"/meta/unsubscribe">> = Channel, Struct) ->
- error_logger:warning_msg("WS UNSUBSCRIBE channel ~p ~p~n",[Channel, Struct]),
+ error_logger:warning_msg("WS UNSUBSCRIBE (~p) channel ~p ~p~n",[self(), Channel, Struct]),
JsonResp = process_unsubscribe(Channel,Struct),
erlfaye_websockets_api:send_data(WebSocket, mochijson2:encode([JsonResp])),
continue;
process_ws_cmd(WebSocket, <<"/meta/disconnect">> = Channel, Struct) ->
- error_logger:warning_msg("WS UNSUBSCRIBE channel ~p ~p~n",[Channel, Struct]),
+ error_logger:warning_msg("WS DISCONNECT (~p) channel ~p ~p~n",[self(), Channel, Struct]),
ClientId = get_json_map_val(<<"clientId">>, Struct),
MessageId = get_json_map_val(<<"id">>, Struct),
JsonResp = if
@@ -252,14 +300,14 @@ process_ws_cmd(WebSocket, <<"/meta/disconnect">> = Channel, Struct) ->
% we must stop the websocket loop
stop;
process_ws_cmd(WebSocket, Channel, Struct) ->
- error_logger:warning_msg("WS PUBLISH channel ~p ~p~n",[Channel, Struct]),
+ error_logger:warning_msg("WS PUBLISH (~p) channel ~p ~p~n",[self(), Channel, Struct]),
JsonResp = process_event_channel(Channel,Struct),
erlfaye_websockets_api:send_data(WebSocket, mochijson2:encode([JsonResp])),
continue.
%% LONG POLLING COMMANDS
-process_cmd(Req, <<"/meta/handshake">> = Channel, Struct, _) ->
+process_cmd(Req, <<"/meta/handshake">> = Channel, Struct, Callback) ->
error_logger:warning_msg("HANDSHAKE: ~p~n",[Struct]),
%% extract info from the request
Id = erlfaye_api:generate_id(),
@@ -270,23 +318,23 @@ process_cmd(Req, <<"/meta/handshake">> = Channel, Struct, _) ->
erlfaye_api:replace_connection(Id, 0, CachePid, handshake),
- % Advice = {struct, [{reconnect, "retry"},
- % {interval, 5000}]},
+ Advice = {struct, [{reconnect, "retry"},
+ {interval, 2000}]},
% build response
JsonRespFields = [{channel, Channel},
{version, 1.0},
{supportedConnectionTypes, [<<"websocket">>, <<"long-polling">>,<<"callback-polling">>]},
{clientId, Id},
- {successful, true}],
- %{advice, Advice}],
+ {successful, true},
+ {advice, Advice}],
JsonResp = if
MessageId == undefined ->
JsonRespFields ;
true ->
[{id, MessageId} | JsonRespFields]
end,
- Req:respond({200, [{"ContentType","application/json"}], mochijson2:encode([{struct, JsonResp}])}),
+ Req:respond({200, [{"ContentType","application/json"}], callback_wrapper(mochijson2:encode([{struct, JsonResp}]), Callback)}),
done;
process_cmd(Req, <<"/meta/connect">> = Channel, Struct, Callback) ->
@@ -312,9 +360,9 @@ process_cmd(Req, <<"/meta/connect">> = Channel, Struct, Callback) ->
Resp = Req:respond({200, [], chunked}),
loop(Resp, #state{id = ClientId,
connection_type = ConnectionType,
- events = [Msg],
- callback = Callback},
- CachePid);
+ events = [Msg]},
+ CachePid,
+ Callback);
% don't reply immediately to new connect message.
% instead wait. when new message is received, reply to connect and
% include the new message. This is acceptable given bayeux spec. see section 4.2.2
@@ -326,9 +374,9 @@ process_cmd(Req, <<"/meta/connect">> = Channel, Struct, Callback) ->
Resp = Req:respond({200, [], chunked}),
loop(Resp, #state{id = ClientId,
connection_type = ConnectionType,
- events = [Msg],
- callback = Callback},
- CachePid);
+ events = [Msg]},
+ CachePid,
+ Callback);
_ ->
{struct, [{successful, false} | L]}
end;
Please sign in to comment.
Something went wrong with that request. Please try again.