Skip to content

Commit

Permalink
reengineering of the listener process, using active instead of passiv…
Browse files Browse the repository at this point in the history
…e mode in request parsing, except for BODY where passive is still used.

added better support for request timeout
  • Loading branch information
ostinelli committed Sep 24, 2009
1 parent 3106877 commit 39493dc
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 72 deletions.
8 changes: 7 additions & 1 deletion README.txt
Expand Up @@ -63,11 +63,17 @@ API Documentation is available online on the Misultin's wiki: http://code.google
CHANGELOG
==========================================================================================================

0.3: - reengineering of the listener process, using active instead of passive mode in request parsing,
except for BODY where passive is still used [thanks to lev walkin]
- added better support for request timeout

0.2.2: - added .app file [thanks to Essien Ita Essien]
- simplified get_options [thanks to Essien Ita Essien]
- added ip address option [thanks to Essien Ita Essien]
- added ipv6 support
- bug correction on requests peer address and port being reset on open connections
- added recv_timeout option
- bug correction: requests peer address and port are now not reset on open connections multiple
requests

0.2.1: - added support for Content-Type that specifies charset in POST data [thanks to Tuncer Ayaz]
- added support for iolist in misultin_req:ok/1,2 and misultin_req:respond/2,3
Expand Down
4 changes: 2 additions & 2 deletions examples/misultin_get_variable.erl
Expand Up @@ -45,9 +45,9 @@ handle_http(Req) ->
Value = proplists:get_value("value", Args),
case Value of
undefined ->
Req:ok([{"Content-Type", "text/xml"}], "<http_test><error>no value specified</error></http_test>");
Req:ok([{"Content-Type", "text/xml"}], "<misultin_test><error>no value specified</error></misultin_test>");
_ ->
Req:ok([{"Content-Type", "text/xml"}], "<http_test><value>~s</value></http_test>", [Value])
Req:ok([{"Content-Type", "text/xml"}], "<misultin_test><value>~s</value></misultin_test>", [Value])
end.


Expand Down
1 change: 1 addition & 0 deletions include/misultin.hrl
Expand Up @@ -31,6 +31,7 @@
-define(FORBIDDEN_403, "HTTP/1.1 403 Forbidden\r\n\r\n").
-define(NOT_FOUND_404, "HTTP/1.1 404 Not Found\r\n\r\n").
-define(CONTENT_LENGTH_REQUIRED_411, "HTTP/1.1 411 Length Required\r\n\r\n").
-define(REQUEST_TIMEOUT_408, "HTTP/1.1 408 Request Timeout\r\n\r\n").

% define debug
-ifdef(debug).
Expand Down
2 changes: 1 addition & 1 deletion src/misultin.app
@@ -1,7 +1,7 @@
{application, misultin,
[
{description, "Lightweight HTTP Server Library"},
{vsn, '0.2.2'},
{vsn, '0.3'},
{modules, [misultin_socket, misultin_req, misultin]},
{registered, [misultin]},
{env, []},
Expand Down
32 changes: 9 additions & 23 deletions src/misultin.erl
Expand Up @@ -32,7 +32,7 @@
% ==========================================================================================================
-module(misultin).
-behaviour(gen_server).
-vsn('0.2.2').
-vsn('0.3').

% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
Expand All @@ -47,9 +47,9 @@
-record(state, {
listen_socket,
port,
acceptor,
loop,
recv_timeout
recv_timeout,
acceptor
}).

% includes
Expand Down Expand Up @@ -113,15 +113,11 @@ init([Options]) ->
inet6
end,
% ok, no error found in options -> create listening socket.
% {backlog, 30} specifies the length of the OS accept queue
% {packet, http} puts the socket into http mode. This makes the socket wait for a HTTP Request line,
% and if this is received to immediately switch to receiving HTTP header lines. The socket stays in header
% mode until the end of header marker is received (CR,NL,CR,NL), at which time it goes back to wait for a
% following HTTP Request line.
case gen_tcp:listen(Port, [binary, {packet, http}, InetOpt, {ip, Ip}, {reuseaddr, true}, {active, false}, {backlog, Backlog}]) of
{ok, ListenSocket} ->
% create first acceptor process
?DEBUG(debug, "creating first acceptor process", []),
% start listening
?DEBUG(debug, "starting listener loop", []),
% create acceptor
AcceptorPid = misultin_socket:start_link(ListenSocket, Port, Loop, RecvTimeout),
{ok, #state{listen_socket = ListenSocket, port = Port, loop = Loop, acceptor = AcceptorPid, recv_timeout = RecvTimeout}};
{error, Reason} ->
Expand Down Expand Up @@ -171,22 +167,12 @@ handle_cast(_Msg, State) ->
% Description: Handling all non call/cast messages.
% ----------------------------------------------------------------------------------------------------------

% The current acceptor has died normally, ignore
handle_info({'EXIT', Pid, normal}, #state{acceptor = Pid} = State) ->
?DEBUG(debug, "current acceptor has died normally", []),
{noreply, State};

% The current acceptor has died abnormally, wait a little and try again
handle_info({'EXIT', Pid, _Abnormal}, #state{listen_socket = ListenSocket, port = Port, loop = Loop, acceptor = Pid, recv_timeout = RecvTimeout} = State) ->
?DEBUG(warning, "current acceptor has died with reason: ~p, respawning", [_Abnormal]),
% The current acceptor has died, respawn
handle_info({'EXIT', Pid, _Reason}, #state{listen_socket = ListenSocket, port = Port, loop = Loop, acceptor = Pid, recv_timeout = RecvTimeout} = State) ->
?DEBUG(warning, "acceptor has died with reason: ~p, respawning", [_Abnormal]),
AcceptorPid = misultin_socket:start_link(ListenSocket, Port, Loop, RecvTimeout),
{noreply, State#state{acceptor = AcceptorPid}};

% An acceptor has died, ignore
handle_info({'EXIT', _Pid, _Reason}, State) ->
?DEBUG(debug, "the acceptor has died with reason: ~p", [_Reason]),
{noreply, State};

% handle_info generic fallback (ignore)
handle_info(_Info, State) ->
?DEBUG(warning, "received unknown info message: ~p", [_Info]),
Expand Down
2 changes: 1 addition & 1 deletion src/misultin_req.erl
Expand Up @@ -32,7 +32,7 @@
% POSSIBILITY OF SUCH DAMAGE.
% ==========================================================================================================
-module(misultin_req, [Req, SocketPid]).
-vsn('0.2.2').
-vsn('0.3').

% macros
-define(PERCENT, 37). % $\%
Expand Down
111 changes: 67 additions & 44 deletions src/misultin_socket.erl
Expand Up @@ -31,13 +31,13 @@
% POSSIBILITY OF SUCH DAMAGE.
% ==========================================================================================================
-module(misultin_socket).
-vsn('0.2.2').
-vsn('0.3').

% API
-export([start_link/4]).

% callbacks
-export([init/4]).
-export([listener/4]).

% internale
-export([socket_loop/1]).
Expand All @@ -62,23 +62,37 @@
% Function: {ok,Pid} | ignore | {error, Error}
% Description: Starts the socket.
start_link(ListenSocket, ListenPort, Loop, RecvTimeout) ->
proc_lib:spawn_link(?MODULE, init, [ListenSocket, ListenPort, Loop, RecvTimeout]).
proc_lib:spawn_link(?MODULE, listener, [ListenSocket, ListenPort, Loop, RecvTimeout]).

% Description: Initiates the socket.
init(ListenSocket, ListenPort, Loop, RecvTimeout) ->
% Function: {ok,Pid} | ignore | {error, Error}
% Description: Starts the socket.
listener(ListenSocket, ListenPort, Loop, RecvTimeout) ->
case catch gen_tcp:accept(ListenSocket) of
{ok, Socket} ->
?DEBUG(debug, "accepted an incoming TCP connection", []),
% Send the cast message to the listener process to create a new acceptor
misultin:create_acceptor(),
{ok, {Addr, Port}} = inet:peername(Socket),
C = #c{sock = Socket, port = ListenPort, loop = Loop, recv_timeout = RecvTimeout},
% jump to state 'request'
?DEBUG(debug, "jump to state request", []),
request(C, #req{peer_addr = Addr, peer_port = Port});
_Else ->
?DEBUG(error, "accept failed error: ~p", [_Else]),
exit({error, accept_failed})
{ok, Sock} ->
?DEBUG(debug, "accepted an incoming TCP connection, spawning controlling process", []),
Pid = spawn(fun () ->
receive
set ->
inet:setopts(Sock, [{active, true}]),
?DEBUG(debug, "activated controlling process", [])
after 60000 ->
exit({error, controlling_failed})
end,
% build connection record
{ok, {Addr, Port}} = inet:peername(Sock),
C = #c{sock = Sock, port = ListenPort, loop = Loop, recv_timeout = RecvTimeout},
% jump to state 'request'
?DEBUG(debug, "jump to state request", []),
request(C, #req{peer_addr = Addr, peer_port = Port})
end),
% set controlling process
gen_tcp:controlling_process(Sock, Pid),
Pid ! set,
% get back to accept loop
listener(ListenSocket, ListenPort, Loop, RecvTimeout);
_Else ->
?DEBUG(error, "accept failed error: ~p", [_Else]),
exit({error, accept_failed})
end.

% ============================ /\ API ======================================================================
Expand All @@ -87,46 +101,51 @@ init(ListenSocket, ListenPort, Loop, RecvTimeout) ->
% ============================ \/ INTERNAL FUNCTIONS =======================================================

% REQUEST: wait for a HTTP Request line. Transition to state headers if one is received.
request(#c{recv_timeout = RecvTimeout} = C, Req) ->
case gen_tcp:recv(C#c.sock, 0, RecvTimeout) of
{ok, {http_request, Method, Path, Version}} ->
request(#c{sock = Sock, recv_timeout = RecvTimeout} = C, Req) ->
receive
{http, Sock, {http_request, Method, Path, Version}} ->
?DEBUG(debug, "received full headers of a new HTTP packet", []),
headers(C, Req#req{vsn = Version, method = Method, uri = Path, connection = default_connection(Version)}, []);
{error, {http_error, "\r\n"}} ->
{http, Sock, {http_error, "\r\n"}} ->
request(C, Req);
{error, {http_error, "\n"}} ->
{http, Sock, {http_error, "\n"}} ->
request(C, Req);
_Other ->
?DEBUG(debug, "tcp recv normal error: ~p", [_Other]),
exit(normal)
{http, Sock, _Other} ->
?DEBUG(debug, "tcp normal error treating request: ~p", [_Other]),
exit(normal)
after RecvTimeout ->
?DEBUG(debug, "request timeout, sending error", []),
send(Sock, ?REQUEST_TIMEOUT_408)
end.

% HEADERS: collect HTTP headers. After the end of header marker transition to body state.
headers(C, Req, H) ->
headers(C, Req, H, 0).
headers(#c{recv_timeout = RecvTimeout} = C, Req, H, HeaderCount) when HeaderCount =< ?MAX_HEADERS_COUNT ->
case gen_tcp:recv(C#c.sock, 0, RecvTimeout) of
{ok, {http_header, _, 'Content-Length', _, Val}} ->
headers(#c{sock = Sock, recv_timeout = RecvTimeout} = C, Req, H, HeaderCount) when HeaderCount =< ?MAX_HEADERS_COUNT ->
receive
{http, Sock, {http_header, _, 'Content-Length', _, Val}} ->
headers(C, Req#req{content_length = Val}, [{'Content-Length', Val}|H], HeaderCount + 1);
{ok, {http_header, _, 'Connection', _, Val}} ->
{http, Sock, {http_header, _, 'Connection', _, Val}} ->
KeepAlive = keep_alive(Req#req.vsn, Val),
headers(C, Req#req{connection = KeepAlive}, [{'Connection', Val}|H], HeaderCount + 1);
{ok, {http_header, _, Header, _, Val}} ->
{http, Sock, {http_header, _, Header, _, Val}} ->
headers(C, Req, [{Header, Val}|H], HeaderCount + 1);
{error, {http_error, "\r\n"}} ->
headers(C, Req, H, HeaderCount + 1);
{error, {http_error, "\n"}} ->
headers(C, Req, H, HeaderCount + 1);
{ok, http_eoh} ->
{http, Sock, {http_error, "\r\n"}} ->
headers(C, Req, H, HeaderCount);
{http, Sock, {http_error, "\n"}} ->
headers(C, Req, H, HeaderCount);
{http, Sock, http_eoh} ->
body(C, Req#req{headers = lists:reverse(H)});
_Other ->
?DEBUG(debug, "tcp recv normal error: ~p", [_Other]),
{http, Sock, _Other} ->
?DEBUG(debug, "tcp normal error treating headers: ~p", [_Other]),
exit(normal)
after RecvTimeout ->
?DEBUG(debug, "headers timeout, sending error", []),
send(Sock, ?REQUEST_TIMEOUT_408)
end;
headers(C, Req, H, _HeaderCount) ->
body(C, Req#req{headers = lists:reverse(H)}).


% default connection
default_connection({1,1}) -> keep_alive;
default_connection(_) -> close.
Expand Down Expand Up @@ -159,19 +178,20 @@ body(#c{sock = Sock, recv_timeout = RecvTimeout} = C, Req) ->
close ->
gen_tcp:close(Sock);
keep_alive ->
inet:setopts(Sock, [{packet, http}]),
% TODO: REMOVE inet:setopts(Sock, [{packet, http}]),
% inet:setopts(Sock, [{active, false}, {packet, 0}]),
request(C, #req{peer_addr = Req#req.peer_addr, peer_port = Req#req.peer_port})
end;
'POST' ->
case catch list_to_integer(Req#req.content_length) of
{'EXIT', _} ->
% TODO: provide a fallback when content length is not or wrongly specified
?DEBUG(debug, "specified content length is not a valid integer number: ~p", [Req#req.content_length]),
send(C#c.sock, ?CONTENT_LENGTH_REQUIRED_411),
send(Sock, ?CONTENT_LENGTH_REQUIRED_411),
exit(normal);
Len ->
inet:setopts(Sock, [{packet, raw}]),
case gen_tcp:recv(Sock, Len, 2*RecvTimeout) of
inet:setopts(Sock, [{packet, raw}, {active, false}]),
case gen_tcp:recv(Sock, Len, RecvTimeout) of
{ok, Bin} ->
Close = handle_post(C, Req#req{body = Bin}),
case Close of
Expand All @@ -181,14 +201,17 @@ body(#c{sock = Sock, recv_timeout = RecvTimeout} = C, Req) ->
inet:setopts(Sock, [{packet, http}]),
request(C, #req{peer_addr = Req#req.peer_addr, peer_port = Req#req.peer_port})
end;
{error, timeout} ->
?DEBUG(debug, "request timeout, sending error", []),
send(Sock, ?REQUEST_TIMEOUT_408);
_Other ->
?DEBUG(debug, "tcp recv normal error: ~p", [_Other]),
?DEBUG(debug, "tcp normal error treating post: ~p", [_Other]),
exit(normal)
end
end;
_Other ->
?DEBUG(debug, "method not implemented: ~p", [_Other]),
send(C#c.sock, ?NOT_IMPLEMENTED_501),
send(Sock, ?NOT_IMPLEMENTED_501),
exit(normal)
end.

Expand Down

0 comments on commit 39493dc

Please sign in to comment.