Skip to content

Commit

Permalink
Give the ListenerPid to the protocol on startup
Browse files Browse the repository at this point in the history
Also sends a message 'shoot' that can be received by the protocol
to make sure Cowboy has had enough time to fully initialize the
socket. This message should be received before any socket-related
operations are performed.

WebSocket request connections are now moved from the pool 'default'
to the pool 'websocket', meaning we can have a lot of running
WebSockets despite having a low 'max_connections' setting.
  • Loading branch information
Loïc Hoguin committed Aug 10, 2011
1 parent 56369d5 commit 43d14b5
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 24 deletions.
14 changes: 9 additions & 5 deletions README.md
Expand Up @@ -219,19 +219,23 @@ One of the strengths of Cowboy is of course that you can use it with any
protocol you want. The only downside is that if it's not HTTP, you'll
probably have to write the protocol handler yourself.

The only exported function a protocol handler needs is the start_link/3
function, with arguments Socket, Transport and Opts. Socket is of course
the client socket; Transport is the module name of the chosen transport
The only exported function a protocol handler needs is the start_link/4
function, with arguments ListenerPid, Socket, Transport and Opts. ListenerPid
is the pid to the listener's gen_server, managing the connections. Socket is of
course the client socket; Transport is the module name of the chosen transport
handler and Opts is protocol options defined when starting the listener.
Anything you do past this point is up to you!

After initializing your protocol, it is recommended to wait to receive a message
containing the atom 'shoot', as it will ensure Cowboy has been able to fully
initialize the socket. Anything you do past this point is up to you!

You should definitely look at the cowboy_http_protocol module for a great
example of fast request handling if you need to. Otherwise it's probably
safe to use `{active, once}` mode and handle everything as it comes.

Note that while you technically can run a protocol handler directly as a
gen_server or a gen_fsm, it's probably not a good idea, as the only call
you'll ever receive from Cowboy is the start_link/3 call. On the other
you'll ever receive from Cowboy is the start_link/4 call. On the other
hand, feel free to write a very basic protocol handler which then forwards
requests to a gen_server or gen_fsm. By doing so however you must take
care to supervise their processes as Cowboy only knows about the protocol
Expand Down
3 changes: 2 additions & 1 deletion src/cowboy_acceptor.erl
Expand Up @@ -36,10 +36,11 @@ acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) ->
case Transport:accept(LSocket, 2000) of
{ok, CSocket} ->
{ok, Pid} = supervisor:start_child(ReqsSup,
[CSocket, Transport, Protocol, Opts]),
[ListenerPid, CSocket, Transport, Protocol, Opts]),
Transport:controlling_process(CSocket, Pid),
{ok, NbConns} = cowboy_listener:add_connection(ListenerPid,
default, Pid),
Pid ! shoot,
limit_reqs(ListenerPid, NbConns, MaxConns);
{error, timeout} ->
ignore;
Expand Down
22 changes: 12 additions & 10 deletions src/cowboy_http_protocol.erl
Expand Up @@ -31,12 +31,13 @@
%% @see cowboy_http_handler
-module(cowboy_http_protocol).

-export([start_link/3]). %% API.
-export([init/3, parse_request/1]). %% FSM.
-export([start_link/4]). %% API.
-export([init/4, parse_request/1]). %% FSM.

-include("include/http.hrl").

-record(state, {
listener :: pid(),
socket :: inet:socket(),
transport :: module(),
dispatch :: cowboy_dispatcher:dispatch_rules(),
Expand All @@ -51,20 +52,21 @@
%% API.

%% @doc Start an HTTP protocol process.
-spec start_link(inet:socket(), module(), any()) -> {ok, pid()}.
start_link(Socket, Transport, Opts) ->
Pid = spawn_link(?MODULE, init, [Socket, Transport, Opts]),
-spec start_link(pid(), inet:socket(), module(), any()) -> {ok, pid()}.
start_link(ListenerPid, Socket, Transport, Opts) ->
Pid = spawn_link(?MODULE, init, [ListenerPid, Socket, Transport, Opts]),
{ok, Pid}.

%% FSM.

%% @private
-spec init(inet:socket(), module(), any()) -> ok.
init(Socket, Transport, Opts) ->
-spec init(pid(), inet:socket(), module(), any()) -> ok.
init(ListenerPid, Socket, Transport, Opts) ->
Dispatch = proplists:get_value(dispatch, Opts, []),
MaxEmptyLines = proplists:get_value(max_empty_lines, Opts, 5),
Timeout = proplists:get_value(timeout, Opts, 5000),
wait_request(#state{socket=Socket, transport=Transport,
receive shoot -> ok end,
wait_request(#state{listener=ListenerPid, socket=Socket, transport=Transport,
dispatch=Dispatch, max_empty_lines=MaxEmptyLines, timeout=Timeout}).

%% @private
Expand Down Expand Up @@ -189,14 +191,14 @@ dispatch(Req=#http_req{host=Host, path=Path},
end.

-spec handler_init(#http_req{}, #state{}) -> ok.
handler_init(Req, State=#state{
handler_init(Req, State=#state{listener=ListenerPid,
transport=Transport, handler={Handler, Opts}}) ->
try Handler:init({Transport:name(), http}, Req, Opts) of
{ok, Req2, HandlerState} ->
handler_loop(HandlerState, Req2, State);
%% @todo {upgrade, transport, Module}
{upgrade, protocol, Module} ->
Module:upgrade(Handler, Opts, Req)
Module:upgrade(ListenerPid, Handler, Opts, Req)
catch Class:Reason ->
error_terminate(500, State),
error_logger:error_msg(
Expand Down
7 changes: 4 additions & 3 deletions src/cowboy_http_websocket.erl
Expand Up @@ -23,7 +23,7 @@
%% </ul>
-module(cowboy_http_websocket).

-export([upgrade/3]). %% API.
-export([upgrade/4]). %% API.
-export([handler_loop/4]). %% Internal.

-include("include/http.hrl").
Expand All @@ -45,8 +45,9 @@
%% You do not need to call this function manually. To upgrade to the WebSocket
%% protocol, you simply need to return <em>{upgrade, protocol, {@module}}</em>
%% in your <em>cowboy_http_handler:init/3</em> handler function.
-spec upgrade(module(), any(), #http_req{}) -> ok.
upgrade(Handler, Opts, Req) ->
-spec upgrade(pid(), module(), any(), #http_req{}) -> ok.
upgrade(ListenerPid, Handler, Opts, Req) ->
cowboy_listener:move_connection(ListenerPid, websocket, self()),
EOP = binary:compile_pattern(<< 255 >>),
case catch websocket_upgrade(#state{handler=Handler, opts=Opts, eop=EOP}, Req) of
{ok, State, Req2} -> handler_init(State, Req2);
Expand Down
19 changes: 18 additions & 1 deletion src/cowboy_listener.erl
Expand Up @@ -17,7 +17,7 @@
-behaviour(gen_server).

-export([start_link/0, stop/1,
add_connection/3, remove_connection/2, wait/3]). %% API.
add_connection/3, move_connection/3, remove_connection/2, wait/3]). %% API.
-export([init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3]). %% gen_server.

Expand Down Expand Up @@ -54,6 +54,11 @@ stop(ServerPid) ->
add_connection(ServerPid, Pool, ConnPid) ->
gen_server:call(ServerPid, {add_connection, Pool, ConnPid}).

%% @doc Move a connection from one pool to another.
-spec move_connection(pid(), atom(), pid()) -> ok.
move_connection(ServerPid, DestPool, ConnPid) ->
gen_server:cast(ServerPid, {move_connection, DestPool, ConnPid}).

%% @doc Remove the given connection from its pool.
-spec remove_connection(pid(), pid()) -> ok.
remove_connection(ServerPid, ConnPid) ->
Expand Down Expand Up @@ -107,6 +112,18 @@ handle_call(_Request, _From, State) ->

%% @private
-spec handle_cast(_, State) -> {noreply, State}.
handle_cast({move_connection, DestPool, ConnPid}, State=#state{
req_pools=Pools, reqs_table=ReqsTable}) ->
{MonitorRef, SrcPool} = ets:lookup_element(ReqsTable, ConnPid, 2),
ets:insert(ReqsTable, {ConnPid, {MonitorRef, DestPool}}),
{SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools),
DestNbConns = case lists:keyfind(DestPool, 1, Pools) of
false -> 1;
{DestPool, NbConns} -> NbConns + 1
end,
Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)),
Pools3 = [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2],
{noreply, State#state{req_pools=Pools3}};
handle_cast({remove_connection, ConnPid}, State) ->
State2 = remove_pid(ConnPid, State),
{noreply, State2};
Expand Down
9 changes: 5 additions & 4 deletions src/cowboy_requests_sup.erl
Expand Up @@ -16,7 +16,7 @@
-module(cowboy_requests_sup).
-behaviour(supervisor).

-export([start_link/0, start_request/4]). %% API.
-export([start_link/0, start_request/5]). %% API.
-export([init/1]). %% supervisor.

%% API.
Expand All @@ -25,9 +25,10 @@
start_link() ->
supervisor:start_link(?MODULE, []).

-spec start_request(inet:socket(), module(), module(), any()) -> {ok, pid()}.
start_request(Socket, Transport, Protocol, Opts) ->
Protocol:start_link(Socket, Transport, Opts).
-spec start_request(pid(), inet:socket(), module(), module(), any())
-> {ok, pid()}.
start_request(ListenerPid, Socket, Transport, Protocol, Opts) ->
Protocol:start_link(ListenerPid, Socket, Transport, Opts).

%% supervisor.

Expand Down

0 comments on commit 43d14b5

Please sign in to comment.