Permalink
Browse files

Initial work #1 - adding load balancer

  • Loading branch information...
1 parent dac633e commit 41f5ba0dd626133d03be45c41a014b92c08fa312 @lpgauth lpgauth committed Jul 19, 2011
Showing with 297 additions and 219 deletions.
  1. +32 −30 src/lhttpc.erl
  2. +31 −42 src/lhttpc_client.erl
  3. +198 −0 src/lhttpc_lb.erl
  4. +22 −144 src/lhttpc_manager.erl
  5. +14 −3 src/lhttpc_sock.erl
View
@@ -1,7 +1,7 @@
%%% ----------------------------------------------------------------------------
%%% Copyright (c) 2009, Erlang Training and Consulting Ltd.
%%% All rights reserved.
-%%%
+%%%
%%% Redistribution and use in source and binary forms, with or without
%%% modification, are permitted provided that the following conditions are met:
%%% * Redistributions of source code must retain the above copyright
@@ -12,7 +12,7 @@
%%% * Neither the name of Erlang Training and Consulting Ltd. nor the
%%% names of its contributors may be used to endorse or promote products
%%% derived from this software without specific prior written permission.
-%%%
+%%%
%%% THIS SOFTWARE IS PROVIDED BY Erlang Training and Consulting Ltd. ''AS IS''
%%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
%%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
@@ -37,7 +37,7 @@
-export([start/2, stop/1]).
-export([
send_body_part/2,
- send_body_part/3,
+ send_body_part/3,
send_trailers/2,
send_trailers/3
]).
@@ -178,7 +178,7 @@ request(URL, Method, Hdrs, Body, Timeout) ->
%% request(Host, Port, Path, Ssl, Method, Hdrs, Body, Timeout, Options).
%% </pre>
%%
-%% `URL' is expected to be a valid URL:
+%% `URL' is expected to be a valid URL:
%% `scheme://host[:port][/path]'.
%% @end
%% @see request/9
@@ -229,13 +229,13 @@ request(URL, Method, Hdrs, Body, Timeout, Options) ->
%% `Ssl' = `false'<br/>
%% `Path' = `"/foobar"'<br/>
%% `Path' must begin with a forward slash `/'.
-%%
+%%
%% `Method' is either a string, stating the HTTP method exactly as in the
%% protocol, i.e: `"POST"' or `"GET"'. It could also be an atom, which is
%% then coverted to an uppercase (if it isn't already) string.
%%
%% `Hdrs' is a list of headers to send. Mandatory headers such as
-%% `Host', `Content-Length' or `Transfer-Encoding' (for some requests)
+%% `Host', `Content-Length' or `Transfer-Encoding' (for some requests)
%% are added automatically.
%%
%% `Body' is the entity to send in the request. Please don't include entity
@@ -254,7 +254,7 @@ request(URL, Method, Hdrs, Body, Timeout, Options) ->
%% choose to give up earlier than the connect timeout, in which case the
%% client will also give up. The default value is infinity, which means that
%% it will either give up when the TCP stack gives up, or when the overall
-%% request timeout is reached.
+%% request timeout is reached.
%%
%% `{connect_options, Options}' specifies options to pass to the socket at
%% connect time. This makes it possible to specify both SSL options and
@@ -311,7 +311,7 @@ request(URL, Method, Hdrs, Body, Timeout, Options) ->
%% pieces. Note however that the last piece might be smaller than `PartSize'.
%% Size bounded entity bodies are handled the same way as unbounded ones if
%% `PartSize' is `infinity'. If `PartSize' is integer it must be >= 0.
-%% If `{partial_download, PartialDownloadOptions}' is specified the
+%% If `{partial_download, PartialDownloadOptions}' is specified the
%% `ResponseBody' will be a `pid()' unless the response has no body
%% (for example in case of `HEAD' requests). In that case it will be be
%% `undefined'. The functions {@link get_body_part/1} and
@@ -325,18 +325,20 @@ request(Host, Port, Ssl, Path, Method, Hdrs, Body, Timeout, Options) ->
Args = [self(), Host, Port, Ssl, Path, Method, Hdrs, Body, Options],
Pid = spawn_link(lhttpc_client, request, Args),
receive
- {response, Pid, R} ->
- R;
- {exit, Pid, Reason} ->
- % We would rather want to exit here, instead of letting the
- % linked client send us an exit signal, since this can be
- % caught by the caller.
- exit(Reason);
- {'EXIT', Pid, Reason} ->
- % This could happen if the process we're running in taps exits
- % and the client process exits due to some exit signal being
- % sent to it. Very unlikely though
- exit(Reason)
+ X ->
+ X
+ % {response, Pid, R} ->
+ % R;
+ % {exit, Pid, Reason} ->
+ % % We would rather want to exit here, instead of letting the
+ % % linked client send us an exit signal, since this can be
+ % % caught by the caller.
+ % exit(Reason);
+ % {'EXIT', Pid, Reason} ->
+ % % This could happen if the process we're running in taps exits
+ % % and the client process exits due to some exit signal being
+ % % sent to it. Very unlikely though
+ % exit(Reason)
after Timeout ->
kill_client(Pid)
end.
@@ -353,7 +355,7 @@ request(Host, Port, Ssl, Path, Method, Hdrs, Body, Timeout, Options) ->
%% Would be the same as calling
%% `send_body_part(UploadState, BodyPart, infinity)'.
%% @end
--spec send_body_part({pid(), window_size()}, iolist()) ->
+-spec send_body_part({pid(), window_size()}, iolist()) ->
{pid(), window_size()} | result().
send_body_part({Pid, Window}, IoList) ->
send_body_part({Pid, Window}, IoList, infinity).
@@ -371,15 +373,15 @@ send_body_part({Pid, Window}, IoList) ->
%% milliseconds. If there is no acknowledgement received during that time the
%% the request is cancelled and `{error, timeout}' is returned.
%%
-%% As long as the window size is larger than 0 the function will return
+%% As long as the window size is larger than 0 the function will return
%% immediately after sending the body part to the request handling process.
-%%
+%%
%% The `BodyPart' `http_eob' signals an end of the entity body, the request
%% is considered sent and the response will be read from the socket. If
%% there is no response within `Timeout' milliseconds, the request is
%% canceled and `{error, timeout}' is returned.
%% @end
--spec send_body_part({pid(), window_size()}, iolist(), timeout()) ->
+-spec send_body_part({pid(), window_size()}, iolist(), timeout()) ->
{ok, {pid(), window_size()}} | result().
send_body_part({Pid, _Window}, http_eob, Timeout) when is_pid(Pid) ->
Pid ! {body_part, self(), http_eob},
@@ -422,7 +424,7 @@ send_body_part({Pid, Window}, IoList, _Timeout) when Window > 0, is_pid(Pid) ->
%% @doc Sends trailers to an ongoing request when `{partial_upload,
%% WindowSize}' is used and no `Content-Length' was specified. The default
%% timout `infinity' will be used. Plase note that after this the request is
-%% considered complete and the response will be read from the socket.
+%% considered complete and the response will be read from the socket.
%% Would be the same as calling
%% `send_trailers(UploadState, BodyPart, infinity)'.
%% @end
@@ -450,7 +452,7 @@ send_trailers({Pid, Window}, Trailers) ->
%% `Timeout' milliseconds the request is canceled and `{error, timeout}' is
%% returned.
%% @end
--spec send_trailers({pid(), window_size()}, [{string() | string()}],
+-spec send_trailers({pid(), window_size()}, [{string() | string()}],
timeout()) -> result().
send_trailers({Pid, _Window}, Trailers, Timeout)
when is_list(Trailers), is_pid(Pid) ->
@@ -465,7 +467,7 @@ send_trailers({Pid, _Window}, Trailers, Timeout)
%% Value = string() | binary()
%% @doc Reads a body part from an ongoing response when
%% `{partial_download, PartialDownloadOptions}' is used. The default timeout,
-%% `infinity' will be used.
+%% `infinity' will be used.
%% Would be the same as calling
%% `get_body_part(HTTPClient, infinity)'.
%% @end
@@ -482,11 +484,11 @@ get_body_part(Pid) ->
%% Value = string() | binary()
%% @doc Reads a body part from an ongoing response when
%% `{partial_download, PartialDownloadOptions}' is used.
-%% `Timeout' is the timeout for reading the next body part in milliseconds.
+%% `Timeout' is the timeout for reading the next body part in milliseconds.
%% `http_eob' marks the end of the body. If there were Trailers in the
-%% response those are returned with `http_eob' as well.
+%% response those are returned with `http_eob' as well.
%% @end
--spec get_body_part(pid(), timeout()) ->
+-spec get_body_part(pid(), timeout()) ->
{ok, binary()} | {ok, {http_eob, headers()}}.
get_body_part(Pid, Timeout) ->
receive
View
@@ -45,6 +45,7 @@
method :: string(),
request :: iolist(),
request_headers :: headers(),
+ load_balancer:: pid(),
socket,
connect_timeout = infinity :: timeout(),
connect_options = [] :: [any()],
@@ -102,14 +103,12 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
PartialUpload = proplists:is_defined(partial_upload, Options),
PartialDownload = proplists:is_defined(partial_download, Options),
PartialDownloadOptions = proplists:get_value(partial_download, Options, []),
+ ConnectOptions = proplists:get_value(connect_options, Options, []),
NormalizedMethod = lhttpc_lib:normalize_method(Method),
{ChunkedUpload, Request} = lhttpc_lib:format_request(Path, NormalizedMethod,
Hdrs, Host, Port, Body, PartialUpload),
- SocketRequest = {socket, self(), Host, Port, Ssl},
- Socket = case gen_server:call(lhttpc_manager, SocketRequest, infinity) of
- {ok, S} -> S; % Re-using HTTP/1.1 connections
- no_socket -> undefined % Opening a new HTTP/1.1 connection
- end,
+ LbRequest = {lb, Host, Port, Ssl},
+ {ok, Lb} = gen_server:call(lhttpc_manager, LbRequest, infinity),
State = #client_state{
host = Host,
port = Port,
@@ -118,10 +117,10 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
request = Request,
requester = From,
request_headers = Hdrs,
- socket = Socket,
+ load_balancer = Lb,
connect_timeout = proplists:get_value(connect_timeout, Options,
infinity),
- connect_options = proplists:get_value(connect_options, Options, []),
+ connect_options = ConnectOptions,
attempts = 1 + proplists:get_value(send_retry, Options, 1),
partial_upload = PartialUpload,
upload_window = UploadWindowSize,
@@ -136,19 +135,9 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
{R, undefined} ->
{ok, R};
{R, NewSocket} ->
- % The socket we ended up doing the request over is returned
- % here, it might be the same as Socket, but we don't know.
- % I've noticed that we don't want to give send sockets that we
- % can't change the controlling process for to the manager. This
- % really shouldn't fail, but it could do if:
- % * The socket was closed remotely already
- % * Due to an error in this module (returning dead sockets for
- % instance)
- ManagerPid = whereis(lhttpc_manager),
- case lhttpc_sock:controlling_process(NewSocket, ManagerPid, Ssl) of
+ case lhttpc_sock:controlling_process(NewSocket, Lb, Ssl) of
ok ->
- gen_server:cast(lhttpc_manager,
- {done, Host, Port, Ssl, NewSocket});
+ gen_server:cast(Lb, {store, NewSocket});
_ ->
ok
end,
@@ -157,45 +146,37 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
{response, self(), Response}.
send_request(#client_state{attempts = 0}) ->
- % Don't try again if the number of allowed attempts is 0.
throw(connection_closed);
send_request(#client_state{socket = undefined} = State) ->
- Host = State#client_state.host,
- Port = State#client_state.port,
- Ssl = State#client_state.ssl,
- Timeout = State#client_state.connect_timeout,
ConnectOptions = State#client_state.connect_options,
- SocketOptions = [binary, {packet, http}, {active, false} | ConnectOptions],
- case lhttpc_sock:connect(Host, Port, SocketOptions, Timeout, Ssl) of
+ Lb = State#client_state.load_balancer,
+ SocketRequest = {socket, self(), ConnectOptions},
+ case gen_server:call(Lb, SocketRequest, infinity) of
{ok, Socket} ->
send_request(State#client_state{socket = Socket});
- {error, etimedout} ->
- % TCP stack decided to give up
- throw(connect_timeout);
- {error, timeout} ->
- throw(connect_timeout);
{error, Reason} ->
- erlang:error(Reason)
+ throw(Reason)
end;
send_request(State) ->
+ Lb = State#client_state.load_balancer,
Socket = State#client_state.socket,
Ssl = State#client_state.ssl,
Request = State#client_state.request,
case lhttpc_sock:send(Socket, Request, Ssl) of
ok ->
if
- State#client_state.partial_upload -> partial_upload(State);
+ State#client_state.partial_upload -> partial_upload(State);
not State#client_state.partial_upload -> read_response(State)
end;
{error, closed} ->
- lhttpc_sock:close(Socket, Ssl),
+ gen_server:cast(Lb, {remove, Socket}),
NewState = State#client_state{
socket = undefined,
attempts = State#client_state.attempts - 1
},
send_request(NewState);
{error, Reason} ->
- lhttpc_sock:close(Socket, Ssl),
+ gen_server:cast(Lb, {remove, Socket}),
erlang:error(Reason)
end.
@@ -242,15 +223,16 @@ encode_body_part(#client_state{chunked_upload = false}, Data) ->
check_send_result(_State, ok) ->
ok;
-check_send_result(#client_state{socket = Sock, ssl = Ssl}, {error, Reason}) ->
- lhttpc_sock:close(Sock, Ssl),
+check_send_result(#client_state{socket = Socket, load_balancer = Lb}, {error, Reason}) ->
+ gen_server:cast(Lb, {remove, Socket}),
throw(Reason).
read_response(#client_state{socket = Socket, ssl = Ssl} = State) ->
lhttpc_sock:setopts(Socket, [{packet, http}], Ssl),
read_response(State, nil, {nil, nil}, []).
read_response(State, Vsn, {StatusCode, _} = Status, Hdrs) ->
+ Lb = State#client_state.load_balancer,
Socket = State#client_state.socket,
Ssl = State#client_state.ssl,
case lhttpc_sock:recv(Socket, Ssl) of
@@ -281,14 +263,19 @@ read_response(State, Vsn, {StatusCode, _} = Status, Hdrs) ->
% the request on the wire or the server has some issues and is
% closing connections without sending responses.
% If this the first attempt to send the request, we will try again.
- lhttpc_sock:close(Socket, Ssl),
+ gen_server:cast(Lb, {remove, Socket}),
NewState = State#client_state{
socket = undefined,
attempts = State#client_state.attempts - 1
},
send_request(NewState);
- {error, Reason} ->
- erlang:error(Reason)
+ {error, timeout} ->
+ gen_server:cast(Lb, {remove, Socket}),
+ NewState = State#client_state{
+ socket = undefined,
+ attempts = 0
+ },
+ send_request(NewState)
end.
handle_response_body(#client_state{partial_download = false} = State, Vsn,
@@ -408,7 +395,7 @@ read_body_part(#client_state{part_size = infinity} = State, _ContentLength) ->
end;
read_body_part(#client_state{part_size = PartSize} = State, ContentLength)
when PartSize =< ContentLength ->
- Socket = State#client_state.socket,
+ Socket = State#client_state.socket,
Ssl = State#client_state.ssl,
PartSize = State#client_state.part_size,
case lhttpc_sock:recv(Socket, PartSize, Ssl) of
@@ -419,7 +406,7 @@ read_body_part(#client_state{part_size = PartSize} = State, ContentLength)
end;
read_body_part(#client_state{part_size = PartSize} = State, ContentLength)
when PartSize > ContentLength ->
- Socket = State#client_state.socket,
+ Socket = State#client_state.socket,
Ssl = State#client_state.ssl,
case lhttpc_sock:recv(Socket, ContentLength, Ssl) of
{ok, Data} ->
@@ -630,6 +617,7 @@ maybe_close_socket(Socket, Ssl, {1, Minor}, ReqHdrs, RespHdrs) when Minor >= 1->
ServerConnection = ?CONNECTION_HDR(RespHdrs, "keep-alive"),
if
ClientConnection =:= "close"; ServerConnection =:= "close" ->
+ error_logger:error_report("close"),
lhttpc_sock:close(Socket, Ssl),
undefined;
ClientConnection =/= "close", ServerConnection =/= "close" ->
@@ -640,6 +628,7 @@ maybe_close_socket(Socket, Ssl, _, ReqHdrs, RespHdrs) ->
ServerConnection = ?CONNECTION_HDR(RespHdrs, "close"),
if
ClientConnection =:= "close"; ServerConnection =/= "keep-alive" ->
+ error_logger:error_report("close"),
lhttpc_sock:close(Socket, Ssl),
undefined;
ClientConnection =/= "close", ServerConnection =:= "keep-alive" ->
Oops, something went wrong.

0 comments on commit 41f5ba0

Please sign in to comment.