Browse files

Various changes. See README for details

  • Loading branch information...
1 parent c6a698c commit d756a2b0b6a792639c5f4fad7db1fe461e6d3634 Chandrashekhar Mullaparthi committed Sep 22, 2010
Showing with 319 additions and 109 deletions.
  1. +23 −3 README
  2. +8 −3 doc/ibrowse.html
  3. +66 −13 src/ibrowse.erl
  4. +155 −81 src/ibrowse_http_client.erl
  5. +22 −1 src/ibrowse_lb.erl
  6. +44 −7 src/ibrowse_test.erl
  7. +1 −1 vsn.mk
View
26 README
@@ -18,12 +18,32 @@ ibrowse is available under two different licenses. LGPL or the BSD license.
Comments to : Chandrashekhar.Mullaparthi@gmail.com
-Version : 1.6.2
+Version : 2.0.0
Latest version : git://github.com/cmullaparthi/ibrowse.git
CONTRIBUTIONS & CHANGE HISTORY
==============================
+22-09-2010 - * Added option preserve_chunked_encoding. This allows the caller to get
+ the raw HTTP response when the Transfer-Encoding is Chunked. This feature
+ was requested by Benoit Chesneau who wanted to write a HTTP proxy using
+ ibrowse.
+ * Fixed bug with the {stream_to, {Pid, once}} option. Bug report and lot
+ of help from Filipe David Manana. Thank you Filipe.
+ * The {error, conn_failed} and {error, send_failed} return values are
+ now of the form {error, {conn_failed, Err}} and
+ {error, {send_failed, Err}}. This is so that the specific socket error
+ can be returned to the caller. I think it looks a bit ugly, but that
+ is the best compromise I could come up with.
+ * Added application configuration parameters default_max_sessions and
+ default_max_pipeline_size. These were previously hard coded to 10.
+ * Versioning of ibrowse now follows the Semantic Versioning principles.
+ See http://semver.org. Thanks to Anthony Molinaro for nudging me in
+ this direction.
+ * The connect_timeout option now only applies to the connection setup
+ phase. In previous versions, the time taken to setup the connection
+ was deducted from the specified timeout value for the request.
+
17-07-2010 - * Merged change made by Filipe David Manana to use the base64
module for encoding/decoding.
@@ -153,7 +173,7 @@ CONTRIBUTIONS & CHANGE HISTORY
12-01-2007 - Derek Upham sent in a bug fix. The reset_state function was not
behaving correctly when the transfer encoding was not chunked.
-13-11-2006 - Youns Hafri reported a bug where ibrowse was not returning the
+13-11-2006 - Youns Hafri reported a bug where ibrowse was not returning the
temporary filename when the server was closing the connection
after sending the data (as in HTTP/1.0).
Released ibrowse under the BSD license
@@ -172,7 +192,7 @@ CONTRIBUTIONS & CHANGE HISTORY
22-Nov-2005 - Added ability to generate requests using the Chunked
Transfer-Encoding.
-08-May-2005 - Youns Hafri made a CRUX LINUX port of ibrowse.
+08-May-2005 - Youns Hafri made a CRUX LINUX port of ibrowse.
http://yhafri.club.fr/crux/index.html
Here are some usage examples. Enjoy!
View
11 doc/ibrowse.html
@@ -12,7 +12,7 @@
<ul class="index"><li><a href="#description">Description</a></li><li><a href="#index">Function Index</a></li><li><a href="#functions">Function Details</a></li></ul>The ibrowse application implements an HTTP 1.1 client.
<p>Copyright © 2005-2010 Chandrashekhar Mullaparthi</p>
-<p><b>Version:</b> 1.6.0</p>
+<p><b>Version:</b> 2.0.0</p>
<p><b>Behaviours:</b> <a href="gen_server.html"><tt>gen_server</tt></a>.</p>
<p><b>Authors:</b> Chandrashekhar Mullaparthi (<a href="mailto:chandrashekhar dot mullaparthi at gmail dot com"><tt>chandrashekhar dot mullaparthi at gmail dot com</tt></a>).</p>
@@ -202,7 +202,7 @@ <h3 class="function"><a name="send_req-5">send_req/5</a></h3>
<div class="spec">
<p><tt>send_req(Url::string(), Headers::<a href="#type-headerList">headerList()</a>, Method::<a href="#type-method">method()</a>, Body::<a href="#type-body">body()</a>, Options::<a href="#type-optionList">optionList()</a>) -&gt; <a href="#type-response">response()</a></tt>
<ul class="definitions"><li><tt><a name="type-optionList">optionList()</a> = [<a href="#type-option">option()</a>]</tt></li>
-<li><tt><a name="type-option">option()</a> = {max_sessions, integer()} | {response_format, <a href="#type-response_format">response_format()</a>} | {stream_chunk_size, integer()} | {max_pipeline_size, integer()} | {trace, <a href="#type-boolean">boolean()</a>} | {is_ssl, <a href="#type-boolean">boolean()</a>} | {ssl_options, [SSLOpt]} | {pool_name, atom()} | {proxy_host, string()} | {proxy_port, integer()} | {proxy_user, string()} | {proxy_password, string()} | {use_absolute_uri, <a href="#type-boolean">boolean()</a>} | {basic_auth, {<a href="#type-username">username()</a>, <a href="#type-password">password()</a>}} | {cookie, string()} | {content_length, integer()} | {content_type, string()} | {save_response_to_file, <a href="#type-srtf">srtf()</a>} | {stream_to, <a href="#type-stream_to">stream_to()</a>} | {http_vsn, {MajorVsn, MinorVsn}} | {host_header, string()} | {inactivity_timeout, integer()} | {connect_timeout, integer()} | {socket_options, Sock_opts} | {transfer_encoding, {chunked, ChunkSize}} | {headers_as_is, <a href="#type-boolean">boolean()</a>} | {give_raw_headers, <a href="#type-boolean">boolean()</a>}</tt></li>
+<li><tt><a name="type-option">option()</a> = {max_sessions, integer()} | {response_format, <a href="#type-response_format">response_format()</a>} | {stream_chunk_size, integer()} | {max_pipeline_size, integer()} | {trace, <a href="#type-boolean">boolean()</a>} | {is_ssl, <a href="#type-boolean">boolean()</a>} | {ssl_options, [SSLOpt]} | {pool_name, atom()} | {proxy_host, string()} | {proxy_port, integer()} | {proxy_user, string()} | {proxy_password, string()} | {use_absolute_uri, <a href="#type-boolean">boolean()</a>} | {basic_auth, {<a href="#type-username">username()</a>, <a href="#type-password">password()</a>}} | {cookie, string()} | {content_length, integer()} | {content_type, string()} | {save_response_to_file, <a href="#type-srtf">srtf()</a>} | {stream_to, <a href="#type-stream_to">stream_to()</a>} | {http_vsn, {MajorVsn, MinorVsn}} | {host_header, string()} | {inactivity_timeout, integer()} | {connect_timeout, integer()} | {socket_options, Sock_opts} | {transfer_encoding, {chunked, ChunkSize}} | {headers_as_is, <a href="#type-boolean">boolean()</a>} | {give_raw_headers, <a href="#type-boolean">boolean()</a>} | {preserve_chunked_encoding, <a href="#type-boolean">boolean()</a>}</tt></li>
<li><tt><a name="type-stream_to">stream_to()</a> = <a href="#type-process">process()</a> | {<a href="#type-process">process()</a>, once}</tt></li>
<li><tt><a name="type-process">process()</a> = pid() | atom()</tt></li>
<li><tt><a name="type-username">username()</a> = string()</tt></li>
@@ -284,6 +284,11 @@ <h3 class="function"><a name="send_req-5">send_req/5</a></h3>
caller to get access to the raw status line and raw unparsed
headers. Not quite sure why someone would want this, but one of my
users asked for it, so here it is. </li>
+
+ <li> The <code>preserve_chunked_encoding</code> option enables the caller
+ to receive the raw data stream when the Transfer-Encoding of the server
+ response is Chunked.
+ </li>
</ul>
</p>
@@ -441,6 +446,6 @@ <h3 class="function"><a name="trace_on-2">trace_on/2</a></h3>
<hr>
<div class="navbar"><a name="#navbar_bottom"></a><table width="100%" border="0" cellspacing="0" cellpadding="2" summary="navigation bar"><tr><td><a href="overview-summary.html" target="overviewFrame">Overview</a></td><td><a href="http://www.erlang.org/"><img src="erlang.png" align="right" border="0" alt="erlang logo"></a></td></tr></table></div>
-<p><i>Generated by EDoc, May 17 2010, 23:21:42.</i></p>
+<p><i>Generated by EDoc, Sep 22 2010, 22:56:44.</i></p>
</body>
</html>
View
79 src/ibrowse.erl
@@ -7,7 +7,7 @@
%%%-------------------------------------------------------------------
%% @author Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com>
%% @copyright 2005-2010 Chandrashekhar Mullaparthi
-%% @version 1.6.0
+%% @version 2.0.0
%% @doc The ibrowse application implements an HTTP 1.1 client. This
%% module implements the API of the HTTP client. There is one named
%% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is
@@ -236,6 +236,11 @@ send_req(Url, Headers, Method, Body) ->
%% caller to get access to the raw status line and raw unparsed
%% headers. Not quite sure why someone would want this, but one of my
%% users asked for it, so here it is. </li>
+%%
+%% <li> The <code>preserve_chunked_encoding</code> option enables the caller
+%% to receive the raw data stream when the Transfer-Encoding of the server
+%% response is Chunked.
+%% </li>
%% </ul>
%%
%% @spec send_req(Url::string(), Headers::headerList(), Method::method(), Body::body(), Options::optionList()) -> response()
@@ -266,7 +271,8 @@ send_req(Url, Headers, Method, Body) ->
%% {socket_options, Sock_opts} |
%% {transfer_encoding, {chunked, ChunkSize}} |
%% {headers_as_is, boolean()} |
-%% {give_raw_headers, boolean()}
+%% {give_raw_headers, boolean()} |
+%% {preserve_chunked_encoding,boolean()}
%%
%% stream_to() = process() | {process(), once}
%% process() = pid() | atom()
@@ -302,23 +308,46 @@ send_req(Url, Headers, Method, Body, Options, Timeout) ->
Options_1 = merge_options(Host, Port, Options),
{SSLOptions, IsSSL} =
case (Protocol == https) orelse
- get_value(is_ssl, Options_1, false) of
+ get_value(is_ssl, Options_1, false) of
false -> {[], false};
true -> {get_value(ssl_options, Options_1, []), true}
end,
- case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
+ try_routing_request(Lb_pid, Parsed_url,
+ Max_sessions,
+ Max_pipeline_size,
+ {SSLOptions, IsSSL},
+ Headers, Method, Body, Options_1, Timeout, 0);
+ Err ->
+ {error, {url_parsing_failed, Err}}
+ end.
+
+try_routing_request(Lb_pid, Parsed_url,
+ Max_sessions,
+ Max_pipeline_size,
+ {SSLOptions, IsSSL},
+ Headers, Method, Body, Options_1, Timeout, Try_count) when Try_count < 3 ->
+ case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL}) of
- {ok, Conn_Pid} ->
- do_send_req(Conn_Pid, Parsed_url, Headers,
- Method, Body, Options_1, Timeout);
- Err ->
- Err
+ {ok, Conn_Pid} ->
+ case do_send_req(Conn_Pid, Parsed_url, Headers,
+ Method, Body, Options_1, Timeout) of
+ {error, sel_conn_closed} ->
+ io:format("Selected connection closed. Trying again...~n", []),
+ try_routing_request(Lb_pid, Parsed_url,
+ Max_sessions,
+ Max_pipeline_size,
+ {SSLOptions, IsSSL},
+ Headers, Method, Body, Options_1, Timeout, Try_count + 1);
+ Res ->
+ Res
end;
Err ->
- {error, {url_parsing_failed, Err}}
- end.
+ Err
+ end;
+try_routing_request(_, _, _, _, _, _, _, _, _, _, _) ->
+ {error, retry_later}.
merge_options(Host, Port, Options) ->
Config_options = get_config_value({options, Host, Port}, []),
@@ -337,11 +366,27 @@ get_lb_pid(Url) ->
get_max_sessions(Host, Port, Options) ->
get_value(max_sessions, Options,
- get_config_value({max_sessions, Host, Port}, ?DEF_MAX_SESSIONS)).
+ get_config_value({max_sessions, Host, Port},
+ default_max_sessions())).
get_max_pipeline_size(Host, Port, Options) ->
get_value(max_pipeline_size, Options,
- get_config_value({max_pipeline_size, Host, Port}, ?DEF_MAX_PIPELINE_SIZE)).
+ get_config_value({max_pipeline_size, Host, Port},
+ default_max_pipeline_size())).
+
+default_max_sessions() ->
+ safe_get_env(ibrowse, default_max_sessions, ?DEF_MAX_SESSIONS).
+
+default_max_pipeline_size() ->
+ safe_get_env(ibrowse, default_max_pipeline_size, ?DEF_MAX_PIPELINE_SIZE).
+
+safe_get_env(App, Key, Def_val) ->
+ case application:get_env(App, Key) of
+ undefined ->
+ Def_val;
+ {ok, Val} ->
+ Val
+ end.
%% @doc Deprecated. Use set_max_sessions/3 and set_max_pipeline_size/3
%% for achieving the same effect.
@@ -375,6 +420,10 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) ->
Options, Timeout) of
{'EXIT', {timeout, _}} ->
{error, req_timedout};
+ {'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} ->
+ {error, sel_conn_closed};
+ {error, connection_closed} ->
+ {error, sel_conn_closed};
{'EXIT', Reason} ->
{error, {'EXIT', Reason}};
{ok, St_code, Headers, Body} = Ret when is_binary(Body) ->
@@ -684,6 +733,10 @@ handle_call({get_lb_pid, #url{host = Host, port = Port} = Url}, _From, State) ->
handle_call(stop, _From, State) ->
do_trace("IBROWSE shutting down~n", []),
+ ets:foldl(fun(#lb_pid{pid = Pid}, Acc) ->
+ ibrowse_lb:stop(Pid),
+ Acc
+ end, [], ibrowse_lb),
{stop, normal, ok, State};
handle_call({set_config_value, Key, Val}, _From, State) ->
View
236 src/ibrowse_http_client.erl
@@ -47,7 +47,8 @@
status_line, raw_headers,
is_closing, send_timer, content_length,
deleted_crlf = false, transfer_encoding,
- chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size,
+ chunk_size, chunk_size_buffer = <<>>,
+ recvd_chunk_size, interim_reply_sent = false,
lb_ets_tid, cur_pipeline_size = 0, prev_req_id
}).
@@ -57,7 +58,7 @@
req_id,
stream_chunk_size,
save_response_to_file = false,
- tmp_file_name, tmp_file_fd,
+ tmp_file_name, tmp_file_fd, preserve_chunked_encoding,
response_format}).
-import(ibrowse_lib, [
@@ -82,8 +83,13 @@ start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).
stop(Conn_pid) ->
- catch gen_server:call(Conn_pid, stop),
- ok.
+ case catch gen_server:call(Conn_pid, stop) of
+ {'EXIT', {timeout, _}} ->
+ exit(Conn_pid, kill),
+ ok;
+ _ ->
+ ok
+ end.
send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) ->
gen_server:call(
@@ -171,20 +177,22 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_info({tcp, _Sock, Data}, #state{status = Status} = State) ->
+%% io:format("Recvd data: ~p~n", [Data]),
do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]),
handle_sock_data(Data, State);
handle_info({ssl, _Sock, Data}, State) ->
handle_sock_data(Data, State);
handle_info({stream_next, Req_id}, #state{socket = Socket,
cur_req = #request{req_id = Req_id}} = State) ->
+ %% io:format("Client process set {active, once}~n", []),
do_setopts(Socket, [{active, once}], State),
{noreply, State};
handle_info({stream_next, _Req_id}, State) ->
{noreply, State};
-handle_info({tcp_closed, _Sock}, State) ->
+handle_info({tcp_closed, _Sock}, State) ->
do_trace("TCP connection closed by peer!~n", []),
handle_sock_closed(State),
{stop, normal, State};
@@ -194,11 +202,11 @@ handle_info({ssl_closed, _Sock}, State) ->
{stop, normal, State};
handle_info({tcp_error, _Sock}, State) ->
- io:format("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
+ do_trace("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
handle_sock_closed(State),
{stop, normal, State};
handle_info({ssl_error, _Sock}, State) ->
- io:format("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
+ do_trace("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
handle_sock_closed(State),
{stop, normal, State};
@@ -233,7 +241,8 @@ handle_info(Info, State) ->
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
terminate(_Reason, State) ->
- do_close(State).
+ do_close(State),
+ ok.
%%--------------------------------------------------------------------
%% Func: code_change/3
@@ -269,6 +278,7 @@ handle_sock_data(Data, #state{status = get_header}=State) ->
end;
handle_sock_data(Data, #state{status = get_body,
+ socket = Socket,
content_length = CL,
http_status_code = StatCode,
recvd_headers = Headers,
@@ -293,6 +303,21 @@ handle_sock_data(Data, #state{status = get_body,
fail_pipelined_requests(State,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
+ #state{cur_req = #request{caller_controls_socket = Ccs},
+ interim_reply_sent = Irs} = State_1 ->
+ %% io:format("Irs: ~p~n", [Irs]),
+ case Irs of
+ true ->
+ active_once(State_1);
+ false when Ccs == true ->
+ %% io:format("Setting {active,once}~n", []),
+ do_setopts(Socket, [{active, once}], State);
+ false ->
+ active_once(State_1)
+ end,
+ State_2 = State_1#state{interim_reply_sent = false},
+ set_inac_timer(State_2),
+ {noreply, State_2};
State_1 ->
active_once(State_1),
set_inac_timer(State_1),
@@ -338,33 +363,47 @@ accumulate_response(Data, #state{cur_req = #request{save_response_to_file = Srtf
{error, Reason} ->
{error, {file_write_error, Reason}}
end;
-accumulate_response(<<>>, State) ->
- State;
-accumulate_response(Data, #state{reply_buffer = RepBuf,
- rep_buf_size = RepBufSize,
- streamed_size = Streamed_size,
- cur_req = CurReq}=State) ->
- #request{stream_to=StreamTo, req_id=ReqId,
- stream_chunk_size = Stream_chunk_size,
- response_format = Response_format,
- caller_controls_socket = Caller_controls_socket} = CurReq,
- RepBuf_1 = list_to_binary([RepBuf, Data]),
+%% accumulate_response(<<>>, #state{cur_req = #request{caller_controls_socket = Ccs},
+%% socket = Socket} = State) ->
+%% case Ccs of
+%% true ->
+%% do_setopts(Socket, [{active, once}], State);
+%% false ->
+%% ok
+%% end,
+%% State;
+accumulate_response(Data, #state{reply_buffer = RepBuf,
+ rep_buf_size = RepBufSize,
+ streamed_size = Streamed_size,
+ cur_req = CurReq}=State) ->
+ #request{stream_to = StreamTo,
+ req_id = ReqId,
+ stream_chunk_size = Stream_chunk_size,
+ response_format = Response_format,
+ caller_controls_socket = Caller_controls_socket} = CurReq,
+ RepBuf_1 = <<RepBuf/binary, Data/binary>>,
New_data_size = RepBufSize - Streamed_size,
case StreamTo of
undefined ->
State#state{reply_buffer = RepBuf_1};
_ when Caller_controls_socket == true ->
do_interim_reply(StreamTo, Response_format, ReqId, RepBuf_1),
State#state{reply_buffer = <<>>,
+ interim_reply_sent = true,
streamed_size = Streamed_size + size(RepBuf_1)};
_ when New_data_size >= Stream_chunk_size ->
{Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size),
do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk),
- accumulate_response(
- Rem_data,
- State#state{
- reply_buffer = <<>>,
- streamed_size = Streamed_size + Stream_chunk_size});
+ State_1 = State#state{
+ reply_buffer = <<>>,
+ interim_reply_sent = true,
+ streamed_size = Streamed_size + Stream_chunk_size},
+ case Rem_data of
+ <<>> ->
+ State_1;
+ _ ->
+ accumulate_response(Rem_data, State_1)
+ end;
_ ->
State#state{reply_buffer = RepBuf_1}
end.
@@ -498,9 +537,9 @@ do_close(#state{socket = Sock,
is_ssl = true,
use_proxy = true,
proxy_tunnel_setup = Pts
- }) when Pts /= done -> gen_tcp:close(Sock);
-do_close(#state{socket = Sock, is_ssl = true}) -> ssl:close(Sock);
-do_close(#state{socket = Sock, is_ssl = false}) -> gen_tcp:close(Sock).
+ }) when Pts /= done -> catch gen_tcp:close(Sock);
+do_close(#state{socket = Sock, is_ssl = true}) -> catch ssl:close(Sock);
+do_close(#state{socket = Sock, is_ssl = false}) -> catch gen_tcp:close(Sock).
active_once(#state{cur_req = #request{caller_controls_socket = true}}) ->
ok;
@@ -542,25 +581,17 @@ send_req_1(From,
end,
State_2 = check_ssl_options(Options, State_1),
do_trace("Connecting...~n", []),
- Start_ts = now(),
Conn_timeout = get_value(connect_timeout, Options, Timeout),
case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
{ok, Sock} ->
- do_trace("Connected!~n", []),
- End_ts = now(),
- Timeout_1 = case Timeout of
- infinity ->
- infinity;
- _ ->
- Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000))
- end,
+ do_trace("Connected! Socket: ~1000.p~n", [Sock]),
State_3 = State_2#state{socket = Sock,
connect_timeout = Conn_timeout},
- send_req_1(From, Url, Headers, Method, Body, Options, Timeout_1, State_3);
+ send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State_3);
Err ->
shutting_down(State_2),
do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
- gen_server:reply(From, {error, conn_failed}),
+ gen_server:reply(From, {error, {conn_failed, Err}}),
{stop, normal, State_2}
end;
@@ -580,8 +611,9 @@ send_req_1(From,
use_proxy = true,
is_ssl = true} = State) ->
NewReq = #request{
- method = connect,
- options = Options
+ method = connect,
+ preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false),
+ options = Options
},
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
Pxy_auth_headers = maybe_modify_headers(Url, Method, Options, [], State_1),
@@ -611,13 +643,13 @@ send_req_1(From,
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
+ gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
end;
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
+ gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
end;
@@ -666,7 +698,9 @@ send_req_1(From,
save_response_to_file = SaveResponseToFile,
stream_chunk_size = get_stream_chunk_size(Options),
response_format = Resp_format,
- from = From},
+ from = From,
+ preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false)
+ },
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1),
{Req, Body_1} = make_request(Method,
@@ -705,13 +739,13 @@ send_req_1(From,
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
+ gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
end;
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
+ gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
end.
@@ -768,14 +802,14 @@ http_auth_digest(Username, Password) ->
ibrowse_lib:encode_base64(Username ++ [$: | Password]).
make_request(Method, Headers, AbsPath, RelPath, Body, Options,
- #state{use_proxy = UseProxy}) ->
+ #state{use_proxy = UseProxy, is_ssl = Is_ssl}) ->
HttpVsn = http_vsn_string(get_value(http_vsn, Options, {1,1})),
Headers_1 =
case get_value(content_length, Headers, false) of
false when (Body == []) or
- (Body == <<>>) or
- is_tuple(Body) or
- is_function(Body) ->
+ (Body == <<>>) or
+ is_tuple(Body) or
+ is_function(Body) ->
Headers;
false when is_binary(Body) ->
[{"content-length", integer_to_list(size(Body))} | Headers];
@@ -799,7 +833,12 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options,
Headers_3 = cons_headers(Headers_2),
Uri = case get_value(use_absolute_uri, Options, false) or UseProxy of
true ->
- AbsPath;
+ case Is_ssl of
+ true ->
+ RelPath;
+ false ->
+ AbsPath
+ end;
false ->
RelPath
end,
@@ -1017,7 +1056,7 @@ upgrade_to_ssl(#state{socket = Socket,
send_queued_requests(lists:reverse(Q), State_1);
Err ->
do_trace("Upgrade to SSL socket failed. Reson: ~p~n", [Err]),
- do_error_reply(State, {error, send_failed}),
+ do_error_reply(State, {error, {send_failed, Err}}),
{error, send_failed}
end.
@@ -1029,12 +1068,12 @@ send_queued_requests([{From, Url, Headers, Method, Body, Options, Timeout} | Q],
case send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State) of
{noreply, State_1} ->
send_queued_requests(Q, State_1);
- _ ->
+ Err ->
do_trace("Error sending queued SSL request: ~n"
"URL : ~s~n"
"Method : ~p~n"
"Headers : ~p~n", [Url, Method, Headers]),
- do_error_reply(State, {error, send_failed}),
+ do_error_reply(State, {error, {send_failed, Err}}),
{error, send_failed}
end.
@@ -1046,11 +1085,12 @@ is_connection_closing(_, _) -> false.
%% This clause determines the chunk size when given data from the beginning of the chunk
parse_11_response(DataRecvd,
#state{transfer_encoding = chunked,
- chunk_size = chunk_start,
+ chunk_size = chunk_start,
chunk_size_buffer = Chunk_sz_buf
} = State) ->
case scan_crlf(Chunk_sz_buf, DataRecvd) of
{yes, ChunkHeader, Data_1} ->
+ State_1 = maybe_accumulate_ce_data(State, <<ChunkHeader/binary, $\r, $\n>>),
ChunkSize = parse_chunk_header(ChunkHeader),
%%
%% Do we have to preserve the chunk encoding when
@@ -1061,10 +1101,10 @@ parse_11_response(DataRecvd,
RemLen = size(Data_1),
do_trace("Determined chunk size: ~p. Already recvd: ~p~n",
[ChunkSize, RemLen]),
- parse_11_response(Data_1, State#state{chunk_size_buffer = <<>>,
- deleted_crlf = true,
- recvd_chunk_size = 0,
- chunk_size = ChunkSize});
+ parse_11_response(Data_1, State_1#state{chunk_size_buffer = <<>>,
+ deleted_crlf = true,
+ recvd_chunk_size = 0,
+ chunk_size = ChunkSize});
{no, Data_1} ->
State#state{chunk_size_buffer = Data_1}
end;
@@ -1074,13 +1114,15 @@ parse_11_response(DataRecvd,
parse_11_response(DataRecvd,
#state{transfer_encoding = chunked,
chunk_size = tbd,
- chunk_size_buffer = Buf}=State) ->
+ chunk_size_buffer = Buf
+ } = State) ->
case scan_crlf(Buf, DataRecvd) of
{yes, _, NextChunk} ->
- State_1 = State#state{chunk_size = chunk_start,
- chunk_size_buffer = <<>>,
- deleted_crlf = true},
- parse_11_response(NextChunk, State_1);
+ State_1 = maybe_accumulate_ce_data(State, <<$\r, $\n>>),
+ State_2 = State_1#state{chunk_size = chunk_start,
+ chunk_size_buffer = <<>>,
+ deleted_crlf = true},
+ parse_11_response(NextChunk, State_2);
{no, Data_1} ->
State#state{chunk_size_buffer = Data_1}
end;
@@ -1090,9 +1132,10 @@ parse_11_response(DataRecvd,
%% received is silently discarded.
parse_11_response(DataRecvd,
#state{transfer_encoding = chunked, chunk_size = 0,
- cur_req = CurReq,
- deleted_crlf = DelCrlf,
- chunk_size_buffer = Trailer, reqs = Reqs}=State) ->
+ cur_req = CurReq,
+ deleted_crlf = DelCrlf,
+ chunk_size_buffer = Trailer,
+ reqs = Reqs} = State) ->
do_trace("Detected end of chunked transfer...~n", []),
DataRecvd_1 = case DelCrlf of
false ->
@@ -1101,12 +1144,14 @@ parse_11_response(DataRecvd,
<<$\r, $\n, DataRecvd/binary>>
end,
case scan_header(Trailer, DataRecvd_1) of
- {yes, _TEHeaders, Rem} ->
+ {yes, TEHeaders, Rem} ->
{_, Reqs_1} = queue:out(Reqs),
- State_1 = handle_response(CurReq, State#state{reqs = Reqs_1}),
- parse_response(Rem, reset_state(State_1));
+ State_1 = maybe_accumulate_ce_data(State, <<TEHeaders/binary, $\r, $\n>>),
+ State_2 = handle_response(CurReq,
+ State_1#state{reqs = Reqs_1}),
+ parse_response(Rem, reset_state(State_2));
{no, Rem} ->
- State#state{chunk_size_buffer = Rem, deleted_crlf = false}
+ accumulate_response(<<>>, State#state{chunk_size_buffer = Rem, deleted_crlf = false})
end;
%% This clause extracts a chunk, given the size.
@@ -1121,8 +1166,15 @@ parse_11_response(DataRecvd,
case DataLen >= NeedBytes of
true ->
{RemChunk, RemData} = split_binary(DataRecvd, NeedBytes),
- do_trace("Recvd another chunk...~n", []),
+ do_trace("Recvd another chunk...~p~n", [RemChunk]),
do_trace("RemData -> ~p~n", [RemData]),
+ case RemData of
+ <<>> ->
+ %% io:format("RemData -> ~p~n", [RemData]);
+ ok;
+ _ ->
+ ok
+ end,
case accumulate_response(RemChunk, State) of
{error, Reason} ->
do_trace("Error accumulating response --> ~p~n", [Reason]),
@@ -1155,6 +1207,11 @@ parse_11_response(DataRecvd,
accumulate_response(DataRecvd, State#state{rep_buf_size = (RepBufSz+DataLen)})
end.
+maybe_accumulate_ce_data(#state{cur_req = #request{preserve_chunked_encoding = false}} = State, _) ->
+ State;
+maybe_accumulate_ce_data(State, Data) ->
+ accumulate_response(Data, State).
+
handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
response_format = Resp_format,
save_response_to_file = SaveResponseToFile,
@@ -1177,11 +1234,12 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
_ ->
{file, TmpFilename}
end,
+ {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(RespHeaders, Raw_headers, Options),
Reply = case get_value(give_raw_headers, Options, false) of
true ->
- {ok, Status_line, Raw_headers, ResponseBody};
+ {ok, Status_line, Raw_headers_1, ResponseBody};
false ->
- {ok, SCode, RespHeaders, ResponseBody}
+ {ok, SCode, Resp_headers_1, ResponseBody}
end,
State_2 = do_reply(State_1, From, StreamTo, ReqId, Resp_format, Reply),
cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}),
@@ -1192,16 +1250,17 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
#state{http_status_code = SCode,
status_line = Status_line,
raw_headers = Raw_headers,
- recvd_headers = RespHeaders,
+ recvd_headers = Resp_headers,
reply_buffer = RepBuf,
send_timer = ReqTimer} = State) ->
Body = RepBuf,
%% State_1 = set_cur_request(State),
+ {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options),
Reply = case get_value(give_raw_headers, Options, false) of
true ->
- {ok, Status_line, Raw_headers, Body};
+ {ok, Status_line, Raw_headers_1, Body};
false ->
- {ok, SCode, RespHeaders, Body}
+ {ok, SCode, Resp_headers_1, Body}
end,
State_1 = case get(conn_close) of
"close" ->
@@ -1227,7 +1286,8 @@ reset_state(State) ->
deleted_crlf = false,
http_status_code = undefined,
chunk_size = undefined,
- transfer_encoding = undefined}.
+ transfer_encoding = undefined
+ }.
set_cur_request(#state{reqs = Reqs} = State) ->
case queue:to_list(Reqs) of
@@ -1459,15 +1519,29 @@ send_async_headers(_ReqId, undefined, _, _State) ->
ok;
send_async_headers(ReqId, StreamTo, Give_raw_headers,
#state{status_line = Status_line, raw_headers = Raw_headers,
- recvd_headers = Headers, http_status_code = StatCode
- }) ->
+ recvd_headers = Headers, http_status_code = StatCode,
+ cur_req = #request{options = Opts}
+ }) ->
+ {Headers_1, Raw_headers_1} = maybe_add_custom_headers(Headers, Raw_headers, Opts),
case Give_raw_headers of
false ->
- catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers};
+ catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers_1};
true ->
- catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers}
+ catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers_1}
end.
+maybe_add_custom_headers(Headers, Raw_headers, Opts) ->
+ Custom_headers = get_value(add_custom_headers, Opts, []),
+ Headers_1 = Headers ++ Custom_headers,
+ Raw_headers_1 = case Custom_headers of
+ [_ | _] when is_binary(Raw_headers) ->
+ Custom_headers_bin = list_to_binary(string:join([[X, $:, Y] || {X, Y} <- Custom_headers], "\r\n")),
+ <<Raw_headers/binary, "\r\n", Custom_headers_bin/binary>>;
+ _ ->
+ Raw_headers
+ end,
+ {Headers_1, Raw_headers_1}.
+
format_response_data(Resp_format, Body) ->
case Resp_format of
list when is_list(Body) ->
View
23 src/ibrowse_lb.erl
@@ -16,7 +16,8 @@
%% External exports
-export([
start_link/1,
- spawn_connection/5
+ spawn_connection/5,
+ stop/1
]).
%% gen_server callbacks
@@ -85,6 +86,14 @@ spawn_connection(Lb_pid, Url,
is_integer(Max_sessions) ->
gen_server:call(Lb_pid,
{spawn_connection, Url, Max_sessions, Max_pipeline_size, SSL_options}).
+
+stop(Lb_pid) ->
+ case catch gen_server:call(Lb_pid, stop) of
+ {'EXIT', {timeout, _}} ->
+ exit(Lb_pid, kill);
+ ok ->
+ ok
+ end.
%%--------------------------------------------------------------------
%% Function: handle_call/3
%% Description: Handling call messages
@@ -120,6 +129,18 @@ handle_call({spawn_connection, Url, _Max_sess, _Max_pipe, SSL_options}, _From,
ets:insert(Tid, {{1, Pid}, []}),
{reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1}};
+handle_call(stop, _From, #state{ets_tid = undefined} = State) ->
+ gen_server:reply(_From, ok),
+ {stop, normal, State};
+
+handle_call(stop, _From, #state{ets_tid = Tid} = State) ->
+ ets:foldl(fun({{_, Pid}, _}, Acc) ->
+ ibrowse_http_client:stop(Pid),
+ Acc
+ end, [], Tid),
+ gen_server:reply(_From, ok),
+ {stop, normal, State};
+
handle_call(Request, _From, State) ->
Reply = {unknown_request, Request},
{reply, Reply, State}.
View
51 src/ibrowse_test.erl
@@ -17,6 +17,7 @@
ue_test/1,
verify_chunked_streaming/0,
verify_chunked_streaming/1,
+ test_chunked_streaming_once/0,
i_do_async_req_list/4,
test_stream_once/3,
test_stream_once/4
@@ -260,7 +261,20 @@ verify_chunked_streaming(Options) ->
io:format("Fetching data with streaming as binary...~n", []),
Async_response_bin = do_async_req_list(
Url, get, [{response_format, binary} | Options]),
- compare_responses(Result_without_streaming, Async_response_list, Async_response_bin).
+ io:format("Fetching data with streaming as binary, {active, once}...~n", []),
+ Async_response_bin_once = do_async_req_list(
+ Url, get, [once, {response_format, binary} | Options]),
+ compare_responses(Result_without_streaming, Async_response_list, Async_response_bin),
+ compare_responses(Result_without_streaming, Async_response_list, Async_response_bin_once).
+
+test_chunked_streaming_once() ->
+ test_chunked_streaming_once([]).
+
+test_chunked_streaming_once(Options) ->
+ Url = "http://www.httpwatch.com/httpgallery/chunked/",
+ io:format("URL: ~s~n", [Url]),
+ io:format("Fetching data with streaming as binary, {active, once}...~n", []),
+ do_async_req_list(Url, get, [once, {response_format, binary} | Options]).
compare_responses({ok, St_code, _, Body}, {ok, St_code, _, Body}, {ok, St_code, _, Body}) ->
success;
@@ -313,31 +327,54 @@ wait_for_resp(Pid) ->
Msg ->
io:format("Recvd unknown message: ~p~n", [Msg]),
wait_for_resp(Pid)
- after 10000 ->
+ after 100000 ->
{error, timeout}
end.
i_do_async_req_list(Parent, Url, Method, Options) ->
- Res = ibrowse:send_req(Url, [], Method, [], [{stream_to, self()} | Options]),
+ Options_1 = case lists:member(once, Options) of
+ true ->
+ [{stream_to, {self(), once}} | (Options -- [once])];
+ false ->
+ [{stream_to, self()} | Options]
+ end,
+ Res = ibrowse:send_req(Url, [], Method, [], Options_1),
case Res of
{ibrowse_req_id, Req_id} ->
- Result = wait_for_async_resp(Req_id, undefined, undefined, []),
+ Result = wait_for_async_resp(Req_id, Options, undefined, undefined, []),
Parent ! {async_result, self(), Result};
Err ->
Parent ! {async_result, self(), Err}
end.
-wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body) ->
+wait_for_async_resp(Req_id, Options, Acc_Stat_code, Acc_Headers, Body) ->
receive
{ibrowse_async_headers, Req_id, StatCode, Headers} ->
- wait_for_async_resp(Req_id, StatCode, Headers, Body);
+ %% io:format("Recvd headers...~n", []),
+ maybe_stream_next(Req_id, Options),
+ wait_for_async_resp(Req_id, Options, StatCode, Headers, Body);
{ibrowse_async_response_end, Req_id} ->
+ io:format("Recvd end of response.~n", []),
Body_1 = list_to_binary(lists:reverse(Body)),
{ok, Acc_Stat_code, Acc_Headers, Body_1};
{ibrowse_async_response, Req_id, Data} ->
- wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, [Data | Body]);
+ maybe_stream_next(Req_id, Options),
+ %% io:format("Recvd data...~n", []),
+ wait_for_async_resp(Req_id, Options, Acc_Stat_code, Acc_Headers, [Data | Body]);
+ {ibrowse_async_response, Req_id, {error, _} = Err} ->
+ {ok, Acc_Stat_code, Acc_Headers, Err};
Err ->
{ok, Acc_Stat_code, Acc_Headers, Err}
+ after 10000 ->
+ {timeout, Acc_Stat_code, Acc_Headers, Body}
+ end.
+
+maybe_stream_next(Req_id, Options) ->
+ case lists:member(once, Options) of
+ true ->
+ ibrowse:stream_next(Req_id);
+ false ->
+ ok
end.
execute_req(Url, Method, Options) ->
View
2 vsn.mk
@@ -1,2 +1,2 @@
-IBROWSE_VSN = 1.6.2
+IBROWSE_VSN = 2.0.0

3 comments on commit d756a2b

@fdmanana

Chandru, I see here some uncommented calls to io:format (at least 1). Shouldn't they be do_trace/2 calls?

cheers

@cmullaparthi

Well spotted. I'll fix it later tonight.

@cmullaparthi

Fixed in ibrowse-2.0.1

Please sign in to comment.