Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

update ibrowse to 4.0.1

  • Loading branch information...
commit fd4b78671c4367f4f6469156e428176648c07a76 1 parent a4eb1b3
@janl janl authored
View
2  NOTICE
@@ -36,7 +36,7 @@ This product also includes the following third-party components:
* ibrowse (http://github.com/cmullaparthi/ibrowse/tree/master)
- Copyright 2009, Chandrashekhar Mullaparthi
+ Copyright 2005-2012, Chandrashekhar Mullaparthi
* Erlang OAuth (http://github.com/tim/erlang-oauth)
View
14 src/ibrowse/ibrowse.app.in
@@ -1,13 +1,7 @@
{application, ibrowse,
- [{description, "HTTP client application"},
- {vsn, "2.2.0"},
- {modules, [ ibrowse,
- ibrowse_http_client,
- ibrowse_app,
- ibrowse_sup,
- ibrowse_lib,
- ibrowse_lb ]},
- {registered, []},
- {applications, [kernel,stdlib,sasl]},
+ [{description, "Erlang HTTP client application"},
+ {vsn, "4.0.1"},
+ {registered, [ibrowse_sup, ibrowse]},
+ {applications, [kernel,stdlib]},
{env, []},
{mod, {ibrowse_app, []}}]}.
View
250 src/ibrowse/ibrowse.erl
@@ -6,8 +6,7 @@
%%% Created : 11 Oct 2003 by Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
%%%-------------------------------------------------------------------
%% @author Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com>
-%% @copyright 2005-2011 Chandrashekhar Mullaparthi
-%% @version 2.1.3
+%% @copyright 2005-2012 Chandrashekhar Mullaparthi
%% @doc The ibrowse application implements an HTTP 1.1 client in erlang. This
%% module implements the API of the HTTP client. There is one named
%% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is
@@ -71,6 +70,7 @@
-export([
rescan_config/0,
rescan_config/1,
+ add_config/1,
get_config_value/1,
get_config_value/2,
spawn_worker_process/1,
@@ -97,7 +97,10 @@
trace_off/2,
all_trace_off/0,
show_dest_status/0,
- show_dest_status/2
+ show_dest_status/1,
+ show_dest_status/2,
+ get_metrics/0,
+ get_metrics/2
]).
-ifdef(debug).
@@ -136,7 +139,12 @@ start() ->
%% @doc Stop the ibrowse process. Useful when testing using the shell.
stop() ->
- catch gen_server:call(ibrowse, stop).
+ case catch gen_server:call(ibrowse, stop) of
+ {'EXIT',{noproc,_}} ->
+ ok;
+ Res ->
+ Res
+ end.
%% @doc This is the basic function to send a HTTP request.
%% The Status return value indicates the HTTP status code returned by the webserver
@@ -277,7 +285,8 @@ send_req(Url, Headers, Method, Body) ->
%% {transfer_encoding, {chunked, ChunkSize}} |
%% {headers_as_is, boolean()} |
%% {give_raw_headers, boolean()} |
-%% {preserve_chunked_encoding,boolean()}
+%% {preserve_chunked_encoding,boolean()} |
+%% {workaround, head_response_with_body}
%%
%% stream_to() = process() | {process(), once}
%% process() = pid() | atom()
@@ -287,7 +296,7 @@ send_req(Url, Headers, Method, Body) ->
%% Sock_opts = [Sock_opt]
%% Sock_opt = term()
%% ChunkSize = integer()
-%% srtf() = boolean() | filename()
+%% srtf() = boolean() | filename() | {append, filename()}
%% filename() = string()
%% response_format() = list | binary
send_req(Url, Headers, Method, Body, Options) ->
@@ -354,15 +363,16 @@ try_routing_request(_, _, _, _, _, _, _, _, _, _, _) ->
{error, retry_later}.
merge_options(Host, Port, Options) ->
- Config_options = get_config_value({options, Host, Port}, []),
+ Config_options = get_config_value({options, Host, Port}, []) ++
+ get_config_value({options, global}, []),
lists:foldl(
fun({Key, Val}, Acc) ->
- case lists:keysearch(Key, 1, Options) of
- false ->
- [{Key, Val} | Acc];
- _ ->
- Acc
- end
+ case lists:keysearch(Key, 1, Options) of
+ false ->
+ [{Key, Val} | Acc];
+ _ ->
+ Acc
+ end
end, Options, Config_options).
get_lb_pid(Url) ->
@@ -426,6 +436,8 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) ->
{error, req_timedout};
{'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} ->
{error, sel_conn_closed};
+ {'EXIT', {normal, _}} ->
+ {error, req_timedout};
{error, connection_closed} ->
{error, sel_conn_closed};
{'EXIT', Reason} ->
@@ -581,74 +593,98 @@ all_trace_off() ->
%% about workers spawned using spawn_worker_process/2 or
%% spawn_link_worker_process/2 is not included.
show_dest_status() ->
- Dests = lists:filter(fun({lb_pid, {Host, Port}, _}) when is_list(Host),
- is_integer(Port) ->
- true;
- (_) ->
- false
- end, ets:tab2list(ibrowse_lb)),
- All_ets = ets:all(),
io:format("~-40.40s | ~-5.5s | ~-10.10s | ~s~n",
["Server:port", "ETS", "Num conns", "LB Pid"]),
io:format("~80.80.=s~n", [""]),
- lists:foreach(fun({lb_pid, {Host, Port}, Lb_pid}) ->
- case lists:dropwhile(
- fun(Tid) ->
- ets:info(Tid, owner) /= Lb_pid
- end, All_ets) of
- [] ->
- io:format("~40.40s | ~-5.5s | ~-5.5s | ~s~n",
- [Host ++ ":" ++ integer_to_list(Port),
- "",
- "",
- io_lib:format("~p", [Lb_pid])]
- );
- [Tid | _] ->
- catch (
- begin
- Size = ets:info(Tid, size),
- io:format("~40.40s | ~-5.5s | ~-5.5s | ~s~n",
- [Host ++ ":" ++ integer_to_list(Port),
- io_lib:format("~p", [Tid]),
- integer_to_list(Size),
- io_lib:format("~p", [Lb_pid])]
- )
- end
- )
- end
- end, Dests).
-
+ Metrics = get_metrics(),
+ lists:foreach(
+ fun({Host, Port, Lb_pid, Tid, Size}) ->
+ io:format("~40.40s | ~-5.5s | ~-5.5s | ~p~n",
+ [Host ++ ":" ++ integer_to_list(Port),
+ integer_to_list(Tid),
+ integer_to_list(Size),
+ Lb_pid])
+ end, Metrics).
+
+show_dest_status(Url) ->
+ #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url),
+ show_dest_status(Host, Port).
+
%% @doc Shows some internal information about load balancing to a
%% specified Host:Port. Info about workers spawned using
%% spawn_worker_process/2 or spawn_link_worker_process/2 is not
%% included.
show_dest_status(Host, Port) ->
+ case get_metrics(Host, Port) of
+ {Lb_pid, MsgQueueSize, Tid, Size,
+ {{First_p_sz, First_speculative_sz},
+ {Last_p_sz, Last_speculative_sz}}} ->
+ io:format("Load Balancer Pid : ~p~n"
+ "LB process msg q size : ~p~n"
+ "LB ETS table id : ~p~n"
+ "Num Connections : ~p~n"
+ "Smallest pipeline : ~p:~p~n"
+ "Largest pipeline : ~p:~p~n",
+ [Lb_pid, MsgQueueSize, Tid, Size,
+ First_p_sz, First_speculative_sz,
+ Last_p_sz, Last_speculative_sz]);
+ _Err ->
+ io:format("Metrics not available~n", [])
+ end.
+
+get_metrics() ->
+ Dests = lists:filter(fun({lb_pid, {Host, Port}, _}) when is_list(Host),
+ is_integer(Port) ->
+ true;
+ (_) ->
+ false
+ end, ets:tab2list(ibrowse_lb)),
+ All_ets = ets:all(),
+ lists:map(fun({lb_pid, {Host, Port}, Lb_pid}) ->
+ case lists:dropwhile(
+ fun(Tid) ->
+ ets:info(Tid, owner) /= Lb_pid
+ end, All_ets) of
+ [] ->
+ {Host, Port, Lb_pid, unknown, 0};
+ [Tid | _] ->
+ Size = case catch (ets:info(Tid, size)) of
+ N when is_integer(N) -> N;
+ _ -> 0
+ end,
+ {Host, Port, Lb_pid, Tid, Size}
+ end
+ end, Dests).
+
+get_metrics(Host, Port) ->
case ets:lookup(ibrowse_lb, {Host, Port}) of
[] ->
no_active_processes;
[#lb_pid{pid = Lb_pid}] ->
- io:format("Load Balancer Pid : ~p~n", [Lb_pid]),
- io:format("LB process msg q size : ~p~n", [(catch process_info(Lb_pid, message_queue_len))]),
+ MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)),
+ %% {Lb_pid, MsgQueueSize,
case lists:dropwhile(
fun(Tid) ->
ets:info(Tid, owner) /= Lb_pid
end, ets:all()) of
[] ->
- io:format("Couldn't locate ETS table for ~p~n", [Lb_pid]);
+ {Lb_pid, MsgQueueSize, unknown, 0, unknown};
[Tid | _] ->
- First = ets:first(Tid),
- Last = ets:last(Tid),
- Size = ets:info(Tid, size),
- io:format("LB ETS table id : ~p~n", [Tid]),
- io:format("Num Connections : ~p~n", [Size]),
- case Size of
- 0 ->
- ok;
- _ ->
- {First_p_sz, _} = First,
- {Last_p_sz, _} = Last,
- io:format("Smallest pipeline : ~1000.p~n", [First_p_sz]),
- io:format("Largest pipeline : ~1000.p~n", [Last_p_sz])
+ try
+ Size = ets:info(Tid, size),
+ case Size of
+ 0 ->
+ ok;
+ _ ->
+ First = ets:first(Tid),
+ Last = ets:last(Tid),
+ [{_, First_p_sz, First_speculative_sz}] = ets:lookup(Tid, First),
+ [{_, Last_p_sz, Last_speculative_sz}] = ets:lookup(Tid, Last),
+ {Lb_pid, MsgQueueSize, Tid, Size,
+ {{First_p_sz, First_speculative_sz}, {Last_p_sz, Last_speculative_sz}}}
+ end
+ catch _:_ ->
+ not_available
end
end
end.
@@ -663,9 +699,15 @@ rescan_config() ->
%% Clear current configuration for ibrowse and load from the specified
%% file. Current configuration is cleared only if the specified
%% file is readable using file:consult/1
+rescan_config([{_,_}|_]=Terms) ->
+ gen_server:call(?MODULE, {rescan_config_terms, Terms});
rescan_config(File) when is_list(File) ->
gen_server:call(?MODULE, {rescan_config, File}).
+%% @doc Add additional configuration elements at runtime.
+add_config([{_,_}|_]=Terms) ->
+ gen_server:call(?MODULE, {add_config_terms, Terms}).
+
%%====================================================================
%% Server functions
%%====================================================================
@@ -701,44 +743,60 @@ import_config() ->
import_config(Filename) ->
case file:consult(Filename) of
{ok, Terms} ->
- ets:delete_all_objects(ibrowse_conf),
- Fun = fun({dest, Host, Port, MaxSess, MaxPipe, Options})
- when is_list(Host), is_integer(Port),
- is_integer(MaxSess), MaxSess > 0,
- is_integer(MaxPipe), MaxPipe > 0, is_list(Options) ->
- I = [{{max_sessions, Host, Port}, MaxSess},
- {{max_pipeline_size, Host, Port}, MaxPipe},
- {{options, Host, Port}, Options}],
- lists:foreach(
- fun({X, Y}) ->
- ets:insert(ibrowse_conf,
- #ibrowse_conf{key = X,
- value = Y})
- end, I);
- ({K, V}) ->
- ets:insert(ibrowse_conf,
- #ibrowse_conf{key = K,
- value = V});
- (X) ->
- io:format("Skipping unrecognised term: ~p~n", [X])
- end,
- lists:foreach(Fun, Terms);
+ apply_config(Terms);
_Err ->
ok
end.
+apply_config(Terms) ->
+ ets:delete_all_objects(ibrowse_conf),
+ insert_config(Terms).
+
+insert_config(Terms) ->
+ Fun = fun({dest, Host, Port, MaxSess, MaxPipe, Options})
+ when is_list(Host), is_integer(Port),
+ is_integer(MaxSess), MaxSess > 0,
+ is_integer(MaxPipe), MaxPipe > 0, is_list(Options) ->
+ I = [{{max_sessions, Host, Port}, MaxSess},
+ {{max_pipeline_size, Host, Port}, MaxPipe},
+ {{options, Host, Port}, Options}],
+ lists:foreach(
+ fun({X, Y}) ->
+ ets:insert(ibrowse_conf,
+ #ibrowse_conf{key = X,
+ value = Y})
+ end, I);
+ ({K, V}) ->
+ ets:insert(ibrowse_conf,
+ #ibrowse_conf{key = K,
+ value = V});
+ (X) ->
+ io:format("Skipping unrecognised term: ~p~n", [X])
+ end,
+ lists:foreach(Fun, Terms).
+
%% @doc Internal export
get_config_value(Key) ->
- [#ibrowse_conf{value = V}] = ets:lookup(ibrowse_conf, Key),
- V.
+ try
+ [#ibrowse_conf{value = V}] = ets:lookup(ibrowse_conf, Key),
+ V
+ catch
+ error:badarg ->
+ throw({error, ibrowse_not_running})
+ end.
%% @doc Internal export
get_config_value(Key, DefVal) ->
- case ets:lookup(ibrowse_conf, Key) of
- [] ->
- DefVal;
- [#ibrowse_conf{value = V}] ->
- V
+ try
+ case ets:lookup(ibrowse_conf, Key) of
+ [] ->
+ DefVal;
+ [#ibrowse_conf{value = V}] ->
+ V
+ end
+ catch
+ error:badarg ->
+ throw({error, ibrowse_not_running})
end.
set_config_value(Key, Val) ->
@@ -777,6 +835,14 @@ handle_call({rescan_config, File}, _From, State) ->
Ret = (catch import_config(File)),
{reply, Ret, State};
+handle_call({rescan_config_terms, Terms}, _From, State) ->
+ Ret = (catch apply_config(Terms)),
+ {reply, Ret, State};
+
+handle_call({add_config_terms, Terms}, _From, State) ->
+ Ret = (catch insert_config(Terms)),
+ {reply, Ret, State};
+
handle_call(Request, _From, State) ->
Reply = {unknown_request, Request},
{reply, Reply, State}.
View
268 src/ibrowse/ibrowse_http_client.erl
@@ -47,7 +47,7 @@
reply_buffer = <<>>, rep_buf_size=0, streamed_size = 0,
recvd_headers=[],
status_line, raw_headers,
- is_closing, send_timer, content_length,
+ is_closing, content_length,
deleted_crlf = false, transfer_encoding,
chunk_size, chunk_size_buffer = <<>>,
recvd_chunk_size, interim_reply_sent = false,
@@ -61,7 +61,7 @@
stream_chunk_size,
save_response_to_file = false,
tmp_file_name, tmp_file_fd, preserve_chunked_encoding,
- response_format}).
+ response_format, timer_ref}).
-import(ibrowse_lib, [
get_value/2,
@@ -118,7 +118,7 @@ init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) ->
lb_ets_tid = Lb_Tid},
put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
- {ok, State};
+ {ok, set_inac_timer(State)};
init(Url) when is_list(Url) ->
case catch ibrowse_lib:parse_url(Url) of
#url{protocol = Protocol} = Url_rec ->
@@ -131,7 +131,7 @@ init({Host, Port}) ->
port = Port},
put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
- {ok, State}.
+ {ok, set_inac_timer(State)}.
%%--------------------------------------------------------------------
%% Function: handle_call/3
@@ -179,7 +179,6 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_info({tcp, _Sock, Data}, #state{status = Status} = State) ->
-%% io:format("Recvd data: ~p~n", [Data]),
do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]),
handle_sock_data(Data, State);
handle_info({ssl, _Sock, Data}, State) ->
@@ -187,7 +186,6 @@ handle_info({ssl, _Sock, Data}, State) ->
handle_info({stream_next, Req_id}, #state{socket = Socket,
cur_req = #request{req_id = Req_id}} = State) ->
- %% io:format("Client process set {active, once}~n", []),
do_setopts(Socket, [{active, once}], State),
{noreply, set_inac_timer(State)};
@@ -198,8 +196,6 @@ handle_info({stream_next, _Req_id}, State) ->
_ ->
undefined
end,
-%% io:format("Ignoring stream_next as ~1000.p is not cur req (~1000.p)~n",
-%% [_Req_id, _Cur_req_id]),
{noreply, State};
handle_info({stream_close, _Req_id}, State) ->
@@ -234,7 +230,7 @@ handle_info({req_timedout, From}, State) ->
{noreply, State};
true ->
shutting_down(State),
- do_error_reply(State, req_timedout),
+%% do_error_reply(State, req_timedout),
{stop, normal, State}
end;
@@ -357,7 +353,8 @@ accumulate_response(Data,
tmp_file_fd = undefined} = CurReq,
http_status_code=[$2 | _]}=State) when Srtf /= false ->
TmpFilename = make_tmp_filename(Srtf),
- case file:open(TmpFilename, [write, delayed_write, raw]) of
+ Mode = file_mode(Srtf),
+ case file:open(TmpFilename, [Mode, delayed_write, raw]) of
{ok, Fd} ->
accumulate_response(Data, State#state{
cur_req = CurReq#request{
@@ -434,8 +431,13 @@ make_tmp_filename(true) ->
integer_to_list(B) ++
integer_to_list(C)]);
make_tmp_filename(File) when is_list(File) ->
+ File;
+make_tmp_filename({append, File}) when is_list(File) ->
File.
+file_mode({append, _File}) -> append;
+file_mode(_Srtf) -> write.
+
%%--------------------------------------------------------------------
%% Handles the case when the server closes the socket
@@ -560,9 +562,13 @@ do_send_body(Body, State, _TE) ->
do_send_body1(Source, Resp, State, TE) ->
case Resp of
+ {ok, Data} when Data == []; Data == <<>> ->
+ do_send_body({Source}, State, TE);
{ok, Data} ->
do_send(maybe_chunked_encode(Data, TE), State),
do_send_body({Source}, State, TE);
+ {ok, Data, New_source_state} when Data == []; Data == <<>> ->
+ do_send_body({Source, New_source_state}, State, TE);
{ok, Data, New_source_state} ->
do_send(maybe_chunked_encode(Data, TE), State),
do_send_body({Source, New_source_state}, State, TE);
@@ -658,10 +664,17 @@ send_req_1(From,
proxy_tunnel_setup = false,
use_proxy = true,
is_ssl = true} = State) ->
+ Ref = case Timeout of
+ infinity ->
+ undefined;
+ _ ->
+ erlang:send_after(Timeout, self(), {req_timedout, From})
+ end,
NewReq = #request{
method = connect,
preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false),
- options = Options
+ options = Options,
+ timer_ref = Ref
},
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
Pxy_auth_headers = maybe_modify_headers(Url, Method, Options, [], State_1),
@@ -677,17 +690,11 @@ send_req_1(From,
ok ->
trace_request_body(Body_1),
active_once(State_1),
- Ref = case Timeout of
- infinity ->
- undefined;
- _ ->
- erlang:send_after(Timeout, self(), {req_timedout, From})
- end,
- State_2 = State_1#state{status = get_header,
- cur_req = NewReq,
- send_timer = Ref,
- proxy_tunnel_setup = in_progress,
- tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
+ State_1_1 = inc_pipeline_counter(State_1),
+ State_2 = State_1_1#state{status = get_header,
+ cur_req = NewReq,
+ proxy_tunnel_setup = in_progress,
+ tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
State_3 = set_inac_timer(State_2),
{noreply, State_3};
Err ->
@@ -738,6 +745,12 @@ send_req_1(From,
exit({invalid_option, {stream_to, Stream_to_inv}})
end,
SaveResponseToFile = get_value(save_response_to_file, Options, false),
+ Ref = case Timeout of
+ infinity ->
+ undefined;
+ _ ->
+ erlang:send_after(Timeout, self(), {req_timedout, From})
+ end,
NewReq = #request{url = Url,
method = Method,
stream_to = StreamTo,
@@ -749,7 +762,8 @@ send_req_1(From,
stream_chunk_size = get_stream_chunk_size(Options),
response_format = Resp_format,
from = From,
- preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false)
+ preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false),
+ timer_ref = Ref
},
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1),
@@ -767,19 +781,12 @@ send_req_1(From,
trace_request_body(Body_1),
State_2 = inc_pipeline_counter(State_1),
active_once(State_2),
- Ref = case Timeout of
- infinity ->
- undefined;
- _ ->
- erlang:send_after(Timeout, self(), {req_timedout, From})
- end,
State_3 = case Status of
idle ->
State_2#state{status = get_header,
- cur_req = NewReq,
- send_timer = Ref};
+ cur_req = NewReq};
_ ->
- State_2#state{send_timer = Ref}
+ State_2
end,
case StreamTo of
undefined ->
@@ -987,13 +994,17 @@ chunk_request_body(Body, _ChunkSize, Acc) when is_list(Body) ->
lists:reverse(["\r\n", LastChunk, Chunk | Acc]).
-parse_response(_Data, #state{cur_req = undefined}=State) ->
+parse_response(<<>>, #state{cur_req = undefined}=State) ->
State#state{status = idle};
+parse_response(Data, #state{cur_req = undefined}) ->
+ do_trace("Data left to process when no pending request. ~1000.p~n", [Data]),
+ {error, data_in_status_idle};
+
parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs,
cur_req = CurReq} = State) ->
#request{from=From, stream_to=StreamTo, req_id=ReqId,
method=Method, response_format = Resp_format,
- options = Options
+ options = Options, timer_ref = T_ref
} = CurReq,
MaxHeaderSize = ibrowse:get_config_value(max_headers_size, infinity),
case scan_header(Acc, Data) of
@@ -1005,47 +1016,55 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs,
LCHeaders = [{to_lower(X), Y} || {X,Y} <- Headers_1],
ConnClose = to_lower(get_value("connection", LCHeaders, "false")),
IsClosing = is_connection_closing(HttpVsn, ConnClose),
- case IsClosing of
- true ->
- shutting_down(State);
- false ->
- ok
- end,
+ State_0 = case IsClosing of
+ true ->
+ shutting_down(State),
+ State#state{is_closing = IsClosing};
+ false ->
+ State
+ end,
Give_raw_headers = get_value(give_raw_headers, Options, false),
State_1 = case Give_raw_headers of
true ->
- State#state{recvd_headers=Headers_1, status=get_body,
- reply_buffer = <<>>,
- status_line = Status_line,
- raw_headers = Raw_headers,
- http_status_code=StatCode, is_closing=IsClosing};
+ State_0#state{recvd_headers=Headers_1, status=get_body,
+ reply_buffer = <<>>,
+ status_line = Status_line,
+ raw_headers = Raw_headers,
+ http_status_code=StatCode};
false ->
- State#state{recvd_headers=Headers_1, status=get_body,
- reply_buffer = <<>>,
- http_status_code=StatCode, is_closing=IsClosing}
+ State_0#state{recvd_headers=Headers_1, status=get_body,
+ reply_buffer = <<>>,
+ http_status_code=StatCode}
end,
put(conn_close, ConnClose),
TransferEncoding = to_lower(get_value("transfer-encoding", LCHeaders, "false")),
+ Head_response_with_body = lists:member({workaround, head_response_with_body}, Options),
case get_value("content-length", LCHeaders, undefined) of
_ when Method == connect,
hd(StatCode) == $2 ->
- cancel_timer(State#state.send_timer),
{_, Reqs_1} = queue:out(Reqs),
- upgrade_to_ssl(set_cur_request(State#state{reqs = Reqs_1,
- recvd_headers = [],
- status = idle
- }));
+ cancel_timer(T_ref),
+ upgrade_to_ssl(set_cur_request(State_0#state{reqs = Reqs_1,
+ recvd_headers = [],
+ status = idle
+ }));
_ when Method == connect ->
{_, Reqs_1} = queue:out(Reqs),
do_error_reply(State#state{reqs = Reqs_1},
{error, proxy_tunnel_failed}),
{error, proxy_tunnel_failed};
- _ when Method == head ->
+ _ when Method =:= head,
+ Head_response_with_body =:= false ->
+ %% This (HEAD response with body) is not supposed
+ %% to happen, but it does. An Apache server was
+ %% observed to send an "empty" body, but in a
+ %% Chunked-Transfer-Encoding way, which meant
+ %% there was still a body. Issue #67 on Github
{_, Reqs_1} = queue:out(Reqs),
send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1),
State_1_1 = do_reply(State_1, From, StreamTo, ReqId, Resp_format,
{ok, StatCode, Headers_1, []}),
- cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}),
+ cancel_timer(T_ref, {eat_message, {req_timedout, From}}),
State_2 = reset_state(State_1_1),
State_3 = set_cur_request(State_2#state{reqs = Reqs_1}),
parse_response(Data_1, State_3);
@@ -1065,7 +1084,7 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs,
send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1),
State_1_1 = do_reply(State_1, From, StreamTo, ReqId, Resp_format,
{ok, StatCode, Headers_1, []}),
- cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}),
+ cancel_timer(T_ref, {eat_message, {req_timedout, From}}),
State_2 = reset_state(State_1_1),
State_3 = set_cur_request(State_2#state{reqs = Reqs_1}),
parse_response(Data_1, State_3);
@@ -1084,7 +1103,7 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs,
State_2
end;
undefined when HttpVsn =:= "HTTP/1.0";
- ConnClose =:= "close" ->
+ ConnClose =:= "close" ->
send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1),
State_1#state{reply_buffer = Data_1};
undefined ->
@@ -1291,12 +1310,12 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
save_response_to_file = SaveResponseToFile,
tmp_file_name = TmpFilename,
tmp_file_fd = Fd,
- options = Options
+ options = Options,
+ timer_ref = ReqTimer
},
#state{http_status_code = SCode,
status_line = Status_line,
raw_headers = Raw_headers,
- send_timer = ReqTimer,
reply_buffer = RepBuf,
recvd_headers = RespHeaders}=State) when SaveResponseToFile /= false ->
Body = RepBuf,
@@ -1324,13 +1343,13 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
set_cur_request(State_1);
handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
response_format = Resp_format,
- options = Options},
+ options = Options, timer_ref = ReqTimer},
#state{http_status_code = SCode,
status_line = Status_line,
raw_headers = Raw_headers,
recvd_headers = Resp_headers,
- reply_buffer = RepBuf,
- send_timer = ReqTimer} = State) ->
+ reply_buffer = RepBuf
+ } = State) ->
Body = RepBuf,
{Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options),
Reply = case get_value(give_raw_headers, Options, false) of
@@ -1360,10 +1379,10 @@ reset_state(State) ->
}.
set_cur_request(#state{reqs = Reqs, socket = Socket} = State) ->
- case queue:to_list(Reqs) of
- [] ->
+ case queue:peek(Reqs) of
+ empty ->
State#state{cur_req = undefined};
- [#request{caller_controls_socket = Ccs} = NextReq | _] ->
+ {value, #request{caller_controls_socket = Ccs} = NextReq} ->
case Ccs of
true ->
do_setopts(Socket, [{active, once}], State);
@@ -1410,6 +1429,11 @@ parse_headers_1([$\n, H |T], [$\r | L], Acc) when H =:= 32;
parse_headers_1(lists:dropwhile(fun(X) ->
is_whitespace(X)
end, T), [32 | L], Acc);
+parse_headers_1([$\n, H |T], L, Acc) when H =:= 32;
+ H =:= $\t ->
+ parse_headers_1(lists:dropwhile(fun(X) ->
+ is_whitespace(X)
+ end, T), [32 | L], Acc);
parse_headers_1([$\n|T], [$\r | L], Acc) ->
case parse_header(lists:reverse(L)) of
invalid ->
@@ -1417,6 +1441,13 @@ parse_headers_1([$\n|T], [$\r | L], Acc) ->
NewHeader ->
parse_headers_1(T, [], [NewHeader | Acc])
end;
+parse_headers_1([$\n|T], L, Acc) ->
+ case parse_header(lists:reverse(L)) of
+ invalid ->
+ parse_headers_1(T, [], Acc);
+ NewHeader ->
+ parse_headers_1(T, [], [NewHeader | Acc])
+ end;
parse_headers_1([H|T], L, Acc) ->
parse_headers_1(T, [H|L], Acc);
parse_headers_1([], [], Acc) ->
@@ -1458,10 +1489,13 @@ parse_header([], _) ->
invalid.
scan_header(Bin) ->
- case get_crlf_crlf_pos(Bin) of
+ case get_crlf_crlf_pos(Bin, 0) of
{yes, Pos} ->
{Headers, <<_:4/binary, Body/binary>>} = split_binary(Bin, Pos),
{yes, Headers, Body};
+ {yes_dodgy, Pos} ->
+ {Headers, <<_:2/binary, Body/binary>>} = split_binary(Bin, Pos),
+ {yes, Headers, Body};
no ->
{no, Bin}
end.
@@ -1474,29 +1508,26 @@ scan_header(Bin1, Bin2) ->
Bin1_already_scanned_size = size(Bin1) - 4,
<<Headers_prefix:Bin1_already_scanned_size/binary, Rest/binary>> = Bin1,
Bin_to_scan = <<Rest/binary, Bin2/binary>>,
- case get_crlf_crlf_pos(Bin_to_scan) of
+ case get_crlf_crlf_pos(Bin_to_scan, 0) of
{yes, Pos} ->
{Headers_suffix, <<_:4/binary, Body/binary>>} = split_binary(Bin_to_scan, Pos),
{yes, <<Headers_prefix/binary, Headers_suffix/binary>>, Body};
+ {yes_dodgy, Pos} ->
+ {Headers_suffix, <<_:2/binary, Body/binary>>} = split_binary(Bin_to_scan, Pos),
+ {yes, <<Headers_prefix/binary, Headers_suffix/binary>>, Body};
no ->
{no, <<Bin1/binary, Bin2/binary>>}
end.
-get_crlf_crlf_pos(Data) ->
- binary_bif_match(Data, <<$\r, $\n, $\r, $\n>>).
-
-binary_bif_match(Data, Binary) ->
- case binary:match(Data, Binary) of
- {Pos, _Len} ->
- {yes, Pos};
- _ -> no
- end.
-
+get_crlf_crlf_pos(<<$\r, $\n, $\r, $\n, _/binary>>, Pos) -> {yes, Pos};
+get_crlf_crlf_pos(<<$\n, $\n, _/binary>>, Pos) -> {yes_dodgy, Pos};
+get_crlf_crlf_pos(<<_, Rest/binary>>, Pos) -> get_crlf_crlf_pos(Rest, Pos + 1);
+get_crlf_crlf_pos(<<>>, _) -> no.
scan_crlf(Bin) ->
case get_crlf_pos(Bin) of
- {yes, Pos} ->
- {Prefix, <<_, _, Suffix/binary>>} = split_binary(Bin, Pos),
+ {yes, Offset, Pos} ->
+ {Prefix, <<_:Offset/binary, Suffix/binary>>} = split_binary(Bin, Pos),
{yes, Prefix, Suffix};
no ->
{no, Bin}
@@ -1513,16 +1544,20 @@ scan_crlf_1(Bin1_head_size, Bin1, Bin2) ->
<<Bin1_head:Bin1_head_size/binary, Bin1_tail/binary>> = Bin1,
Bin3 = <<Bin1_tail/binary, Bin2/binary>>,
case get_crlf_pos(Bin3) of
- {yes, Pos} ->
- {Prefix, <<_, _, Suffix/binary>>} = split_binary(Bin3, Pos),
+ {yes, Offset, Pos} ->
+ {Prefix, <<_:Offset/binary, Suffix/binary>>} = split_binary(Bin3, Pos),
{yes, list_to_binary([Bin1_head, Prefix]), Suffix};
no ->
{no, list_to_binary([Bin1, Bin2])}
end.
-get_crlf_pos(Data) ->
- binary_bif_match(Data, <<$\r, $\n>>).
+get_crlf_pos(Bin) ->
+ get_crlf_pos(Bin, 0).
+get_crlf_pos(<<$\r, $\n, _/binary>>, Pos) -> {yes, 2, Pos};
+get_crlf_pos(<<$\n, _/binary>>, Pos) -> {yes, 1, Pos};
+get_crlf_pos(<<_, Rest/binary>>, Pos) -> get_crlf_pos(Rest, Pos + 1);
+get_crlf_pos(<<>>, _) -> no.
fmt_val(L) when is_list(L) -> L;
fmt_val(I) when is_integer(I) -> integer_to_list(I);
@@ -1531,21 +1566,36 @@ fmt_val(Term) -> io_lib:format("~p", [Term]).
crnl() -> "\r\n".
-method(get) -> "GET";
-method(post) -> "POST";
-method(head) -> "HEAD";
-method(options) -> "OPTIONS";
-method(put) -> "PUT";
-method(delete) -> "DELETE";
-method(trace) -> "TRACE";
-method(mkcol) -> "MKCOL";
-method(propfind) -> "PROPFIND";
-method(proppatch) -> "PROPPATCH";
-method(lock) -> "LOCK";
-method(unlock) -> "UNLOCK";
-method(move) -> "MOVE";
-method(copy) -> "COPY";
-method(connect) -> "CONNECT".
+method(connect) -> "CONNECT";
+method(delete) -> "DELETE";
+method(get) -> "GET";
+method(head) -> "HEAD";
+method(options) -> "OPTIONS";
+method(post) -> "POST";
+method(put) -> "PUT";
+method(trace) -> "TRACE";
+%% webdav
+method(copy) -> "COPY";
+method(lock) -> "LOCK";
+method(mkcol) -> "MKCOL";
+method(move) -> "MOVE";
+method(propfind) -> "PROPFIND";
+method(proppatch) -> "PROPPATCH";
+method(search) -> "SEARCH";
+method(unlock) -> "UNLOCK";
+%% subversion %%
+method(report) -> "REPORT";
+method(mkactivity) -> "MKACTIVITY";
+method(checkout) -> "CHECKOUT";
+method(merge) -> "MERGE";
+%% upnp
+method(msearch) -> "MSEARCH";
+method(notify) -> "NOTIFY";
+method(subscribe) -> "SUBSCRIBE";
+method(unsubscribe) -> "UNSUBSCRIBE";
+%% rfc-5789
+method(patch) -> "PATCH";
+method(purge) -> "PURGE".
%% From RFC 2616
%%
@@ -1768,22 +1818,34 @@ to_lower([], Acc) ->
shutting_down(#state{lb_ets_tid = undefined}) ->
ok;
shutting_down(#state{lb_ets_tid = Tid,
- cur_pipeline_size = Sz}) ->
- catch ets:delete(Tid, {Sz, self()}).
+ cur_pipeline_size = _Sz}) ->
+ catch ets:delete(Tid, self()).
inc_pipeline_counter(#state{is_closing = true} = State) ->
State;
-inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz} = State) ->
+inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
+ State;
+inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz,
+ lb_ets_tid = Tid} = State) ->
+ update_counter(Tid, self(), {2,1,99999,9999}),
State#state{cur_pipeline_size = Pipe_sz + 1}.
+update_counter(Tid, Key, Args) ->
+ ets:update_counter(Tid, Key, Args).
+
dec_pipeline_counter(#state{is_closing = true} = State) ->
State;
dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
State;
dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz,
lb_ets_tid = Tid} = State) ->
- ets:delete(Tid, {Pipe_sz, self()}),
- ets:insert(Tid, {{Pipe_sz - 1, self()}, []}),
+ try
+ update_counter(Tid, self(), {2,-1,0,0}),
+ update_counter(Tid, self(), {3,-1,0,0})
+ catch
+ _:_ ->
+ ok
+ end,
State#state{cur_pipeline_size = Pipe_sz - 1}.
flatten([H | _] = L) when is_integer(H) ->
View
91 src/ibrowse/ibrowse_lb.erl
@@ -36,7 +36,9 @@
port,
max_sessions,
max_pipeline_size,
- num_cur_sessions = 0}).
+ num_cur_sessions = 0,
+ proc_state
+ }).
-include("ibrowse.hrl").
@@ -104,14 +106,21 @@ stop(Lb_pid) ->
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
-% handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _}, _From,
-% #state{max_sessions = Max_sess,
-% ets_tid = Tid,
-% max_pipeline_size = Max_pipe_sz,
-% num_cur_sessions = Num} = State)
-% when Num >= Max ->
-% Reply = find_best_connection(Tid),
-% {reply, sorry_dude_reuse, State};
+
+handle_call(stop, _From, #state{ets_tid = undefined} = State) ->
+ gen_server:reply(_From, ok),
+ {stop, normal, State};
+
+handle_call(stop, _From, #state{ets_tid = Tid} = State) ->
+ ets:foldl(fun({Pid, _, _}, Acc) ->
+ ibrowse_http_client:stop(Pid),
+ Acc
+ end, [], Tid),
+ gen_server:reply(_From, ok),
+ {stop, normal, State};
+
+handle_call(_, _From, #state{proc_state = shutting_down} = State) ->
+ {reply, {error, shutting_down}, State};
%% Update max_sessions in #state with supplied value
handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _}, _From,
@@ -119,27 +128,18 @@ handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _}, _From,
when Num >= Max_sess ->
State_1 = maybe_create_ets(State),
Reply = find_best_connection(State_1#state.ets_tid, Max_pipe),
- {reply, Reply, State_1#state{max_sessions = Max_sess}};
+ {reply, Reply, State_1#state{max_sessions = Max_sess,
+ max_pipeline_size = Max_pipe}};
-handle_call({spawn_connection, Url, _Max_sess, _Max_pipe, SSL_options}, _From,
+handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options}, _From,
#state{num_cur_sessions = Cur} = State) ->
State_1 = maybe_create_ets(State),
Tid = State_1#state.ets_tid,
{ok, Pid} = ibrowse_http_client:start_link({Tid, Url, SSL_options}),
- ets:insert(Tid, {{1, Pid}, []}),
- {reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1}};
-
-handle_call(stop, _From, #state{ets_tid = undefined} = State) ->
- gen_server:reply(_From, ok),
- {stop, normal, State};
-
-handle_call(stop, _From, #state{ets_tid = Tid} = State) ->
- ets:foldl(fun({{_, Pid}, _}, Acc) ->
- ibrowse_http_client:stop(Pid),
- Acc
- end, [], Tid),
- gen_server:reply(_From, ok),
- {stop, normal, State};
+ ets:insert(Tid, {Pid, 0, 0}),
+ {reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1,
+ max_sessions = Max_sess,
+ max_pipeline_size = Max_pipe}};
handle_call(Request, _From, State) ->
Reply = {unknown_request, Request},
@@ -173,14 +173,13 @@ handle_info({'EXIT', Pid, _Reason},
ets_tid = Tid} = State) ->
ets:match_delete(Tid, {{'_', Pid}, '_'}),
Cur_1 = Cur - 1,
- State_1 = case Cur_1 of
+ case Cur_1 of
0 ->
ets:delete(Tid),
- State#state{ets_tid = undefined};
+ {noreply, State#state{ets_tid = undefined, num_cur_sessions = 0}, 10000};
_ ->
- State
- end,
- {noreply, State_1#state{num_cur_sessions = Cur_1}};
+ {noreply, State#state{num_cur_sessions = Cur_1}}
+ end;
handle_info({trace, Bool}, #state{ets_tid = undefined} = State) ->
put(my_trace_flag, Bool),
@@ -196,6 +195,18 @@ handle_info({trace, Bool}, #state{ets_tid = Tid} = State) ->
put(my_trace_flag, Bool),
{noreply, State};
+handle_info(timeout, State) ->
+ %% We can't shutdown the process immediately because a request
+ %% might be in flight. So we first remove the entry from the
+ %% ibrowse_lb ets table, and then shutdown a couple of seconds
+ %% later
+ ets:delete(ibrowse_lb, {State#state.host, State#state.port}),
+ erlang:send_after(2000, self(), shutdown),
+ {noreply, State#state{proc_state = shutting_down}};
+
+handle_info(shutdown, State) ->
+ {stop, normal, State};
+
handle_info(_Info, State) ->
{noreply, State}.
@@ -219,13 +230,19 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%--------------------------------------------------------------------
find_best_connection(Tid, Max_pipe) ->
- case ets:first(Tid) of
- {Cur_sz, Pid} when Cur_sz < Max_pipe ->
- ets:delete(Tid, {Cur_sz, Pid}),
- ets:insert(Tid, {{Cur_sz + 1, Pid}, []}),
- {ok, Pid};
- _ ->
- {error, retry_later}
+ Res = find_best_connection(ets:first(Tid), Tid, Max_pipe),
+ Res.
+
+find_best_connection('$end_of_table', _, _) ->
+ {error, retry_later};
+find_best_connection(Pid, Tid, Max_pipe) ->
+ case ets:lookup(Tid, Pid) of
+ [{Pid, Cur_sz, Speculative_sz}] when Cur_sz < Max_pipe,
+ Speculative_sz < Max_pipe ->
+ ets:update_counter(Tid, Pid, {3, 1, 9999999, 9999999}),
+ {ok, Pid};
+ _ ->
+ find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe)
end.
maybe_create_ets(#state{ets_tid = undefined} = State) ->
View
74 src/ibrowse/ibrowse_lib.erl
@@ -12,6 +12,10 @@
-include("ibrowse.hrl").
+-ifdef(EUNIT).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
-export([
get_trace_status/2,
do_trace/2,
@@ -180,18 +184,24 @@ get_value(Tag, TVL) ->
V.
parse_url(Url) ->
- case parse_url(Url, get_protocol, #url{abspath=Url}, []) of
- #url{host_type = undefined, host = Host} = UrlRec ->
- case inet_parse:address(Host) of
- {ok, {_, _, _, _, _, _, _, _}} ->
- UrlRec#url{host_type = ipv6_address};
- {ok, {_, _, _, _}} ->
- UrlRec#url{host_type = ipv4_address};
- _ ->
- UrlRec#url{host_type = hostname}
- end;
- Else ->
- Else
+ try
+ case parse_url(Url, get_protocol, #url{abspath=Url}, []) of
+ #url{host_type = undefined, host = Host} = UrlRec ->
+ case inet_parse:address(Host) of
+ {ok, {_, _, _, _, _, _, _, _}} ->
+ UrlRec#url{host_type = ipv6_address};
+ {ok, {_, _, _, _}} ->
+ UrlRec#url{host_type = ipv4_address};
+ _ ->
+ UrlRec#url{host_type = hostname}
+ end;
+ #url{} = UrlRec ->
+ UrlRec;
+ _ ->
+ {error, invalid_uri}
+ end
+ catch _:_ ->
+ {error, invalid_uri}
end.
parse_url([$:, $/, $/ | _], get_protocol, Url, []) ->
@@ -389,3 +399,43 @@ do_trace(true, Fmt, Args) ->
do_trace(_, _, _) ->
ok.
-endif.
+
+-ifdef(EUNIT).
+
+parse_url_test() ->
+ Urls = [{"http://[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]:80/index.html",
+ #url{abspath = "http://[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]:80/index.html",
+ host = "FEDC:BA98:7654:3210:FEDC:BA98:7654:3210",
+ port = 80, protocol = http, path = "/index.html",
+ host_type = ipv6_address}},
+ {"http://[1080:0:0:0:8:800:200C:417A]/index.html",
+ #url{abspath = "http://[1080:0:0:0:8:800:200C:417A]/index.html",
+ host_type = ipv6_address, port = 80, protocol = http,
+ host = "1080:0:0:0:8:800:200C:417A", path = "/index.html"}},
+ {"http://[3ffe:2a00:100:7031::1]",
+ #url{abspath = "http://[3ffe:2a00:100:7031::1]",
+ host_type = ipv6_address, port = 80, protocol = http,
+ host = "3ffe:2a00:100:7031::1", path = "/"}},
+ {"http://[1080::8:800:200C:417A]/foo",
+ #url{abspath = "http://[1080::8:800:200C:417A]/foo",
+ host_type = ipv6_address, port = 80, protocol = http,
+ host = "1080::8:800:200C:417A", path = "/foo"}},
+ {"http://[::192.9.5.5]/ipng",
+ #url{abspath = "http://[::192.9.5.5]/ipng",
+ host_type = ipv6_address, port = 80, protocol = http,
+ host = "::192.9.5.5", path = "/ipng"}},
+ {"http://[::FFFF:129.144.52.38]:80/index.html",
+ #url{abspath = "http://[::FFFF:129.144.52.38]:80/index.html",
+ host_type = ipv6_address, port = 80, protocol = http,
+ host = "::FFFF:129.144.52.38", path = "/index.html"}},
+ {"http://[2010:836B:4179::836B:4179]",
+ #url{abspath = "http://[2010:836B:4179::836B:4179]",
+ host_type = ipv6_address, port = 80, protocol = http,
+ host = "2010:836B:4179::836B:4179", path = "/"}}
+ ],
+ lists:foreach(
+ fun({Url, Expected_result}) ->
+ ?assertMatch(Expected_result, parse_url(Url))
+ end, Urls).
+
+-endif.
View
132 src/ibrowse/ibrowse_test.erl
@@ -20,7 +20,14 @@
test_stream_once/3,
test_stream_once/4,
test_20122010/0,
- test_20122010/1
+ test_20122010/1,
+ test_pipeline_head_timeout/0,
+ test_pipeline_head_timeout/1,
+ do_test_pipeline_head_timeout/4,
+ test_head_transfer_encoding/0,
+ test_head_transfer_encoding/1,
+ test_head_response_with_body/0,
+ test_head_response_with_body/1
]).
test_stream_once(Url, Method, Options) ->
@@ -81,7 +88,7 @@ send_reqs_1(Url, NumWorkers, NumReqsPerWorker) ->
log_msg("Starting spawning of workers...~n", []),
spawn_workers(Url, NumWorkers, NumReqsPerWorker),
log_msg("Finished spawning workers...~n", []),
- do_wait(),
+ do_wait(Url),
End_time = now(),
log_msg("All workers are done...~n", []),
log_msg("ibrowse_test_results table: ~n~p~n", [ets:tab2list(ibrowse_test_results)]),
@@ -111,24 +118,28 @@ spawn_workers(Url, NumWorkers, NumReqsPerWorker) ->
ets:insert(pid_table, {Pid, []}),
spawn_workers(Url, NumWorkers - 1, NumReqsPerWorker).
-do_wait() ->
+do_wait(Url) ->
receive
{'EXIT', _, normal} ->
- do_wait();
+ catch ibrowse:show_dest_status(Url),
+ catch ibrowse:show_dest_status(),
+ do_wait(Url);
{'EXIT', Pid, Reason} ->
ets:delete(pid_table, Pid),
ets:insert(ibrowse_errors, {Pid, Reason}),
ets:update_counter(ibrowse_test_results, crash, 1),
- do_wait();
+ do_wait(Url);
Msg ->
io:format("Recvd unknown message...~p~n", [Msg]),
- do_wait()
+ do_wait(Url)
after 1000 ->
case ets:info(pid_table, size) of
0 ->
done;
_ ->
- do_wait()
+ catch ibrowse:show_dest_status(Url),
+ catch ibrowse:show_dest_status(),
+ do_wait(Url)
end
end.
@@ -219,7 +230,10 @@ dump_errors(Key, Iod) ->
{"http://jigsaw.w3.org/HTTP/CL/", get},
{"http://www.httpwatch.com/httpgallery/chunked/", get},
{"https://github.com", get, [{ssl_options, [{depth, 2}]}]},
- {local_test_fun, test_20122010, []}
+ {local_test_fun, test_20122010, []},
+ {local_test_fun, test_pipeline_head_timeout, []},
+ {local_test_fun, test_head_transfer_encoding, []},
+ {local_test_fun, test_head_response_with_body, []}
]).
unit_tests() ->
@@ -232,16 +246,19 @@ unit_tests(Options) ->
(catch ibrowse_test_server:start_server(8181, tcp)),
ibrowse:start(),
Options_1 = Options ++ [{connect_timeout, 5000}],
+ Test_timeout = proplists:get_value(test_timeout, Options, 60000),
{Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1]),
receive
{done, Pid} ->
ok;
{'DOWN', Ref, _, _, Info} ->
io:format("Test process crashed: ~p~n", [Info])
- after 60000 ->
+ after Test_timeout ->
exit(Pid, kill),
io:format("Timed out waiting for tests to complete~n", [])
- end.
+ end,
+ catch ibrowse_test_server:stop_server(8181),
+ ok.
unit_tests_1(Parent, Options) ->
lists:foreach(fun({local_test_fun, Fun_name, Args}) ->
@@ -426,6 +443,101 @@ log_msg(Fmt, Args) ->
[ibrowse_lib:printable_date() | Args]).
%%------------------------------------------------------------------------------
+%% Test what happens when the response to a HEAD request is a
+%% Chunked-Encoding response with a non-empty body. Issue #67 on
+%% Github
+%% ------------------------------------------------------------------------------
+test_head_transfer_encoding() ->
+ clear_msg_q(),
+ test_head_transfer_encoding("http://localhost:8181/ibrowse_head_test").
+
+test_head_transfer_encoding(Url) ->
+ case ibrowse:send_req(Url, [], head) of
+ {ok, "200", _, _} ->
+ success;
+ Res ->
+ {test_failed, Res}
+ end.
+
+%%------------------------------------------------------------------------------
+%% Test what happens when the response to a HEAD request is a
+%% Chunked-Encoding response with a non-empty body. Issue #67 on
+%% Github
+%% ------------------------------------------------------------------------------
+test_head_response_with_body() ->
+ clear_msg_q(),
+ test_head_response_with_body("http://localhost:8181/ibrowse_head_transfer_enc").
+
+test_head_response_with_body(Url) ->
+ case ibrowse:send_req(Url, [], head, [], [{workaround, head_response_with_body}]) of
+ {ok, "400", _, _} ->
+ success;
+ Res ->
+ {test_failed, Res}
+ end.
+
+%%------------------------------------------------------------------------------
+%% Test what happens when the request at the head of a pipeline times out
+%%------------------------------------------------------------------------------
+test_pipeline_head_timeout() ->
+ clear_msg_q(),
+ test_pipeline_head_timeout("http://localhost:8181/ibrowse_inac_timeout_test").
+
+test_pipeline_head_timeout(Url) ->
+ {ok, Pid} = ibrowse:spawn_worker_process(Url),
+ Test_parent = self(),
+ Fun = fun({fixed, Timeout}) ->
+ spawn(fun() ->
+ do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout)
+ end);
+ (Timeout_mult) ->
+ spawn(fun() ->
+ Timeout = 1000 + Timeout_mult*1000,
+ do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout)
+ end)
+ end,
+ Pids = [Fun(X) || X <- [{fixed, 32000} | lists:seq(1,10)]],
+ Result = accumulate_worker_resp(Pids),
+ case lists:all(fun({_, X_res}) ->
+ X_res == {error,req_timedout}
+ end, Result) of
+ true ->
+ success;
+ false ->
+ {test_failed, Result}
+ end.
+
+do_test_pipeline_head_timeout(Url, Pid, Test_parent, Req_timeout) ->
+ Resp = ibrowse:send_req_direct(
+ Pid,
+ Url,
+ [], get, [],
+ [{socket_options,[{keepalive,true}]},
+ {inactivity_timeout,180000},
+ {connect_timeout,180000}], Req_timeout),
+ Test_parent ! {self(), Resp}.
+
+accumulate_worker_resp(Pids) ->
+ accumulate_worker_resp(Pids, []).
+
+accumulate_worker_resp([_ | _] = Pids, Acc) ->
+ receive
+ {Pid, Res} when is_pid(Pid) ->
+ accumulate_worker_resp(Pids -- [Pid], [{Pid, Res} | Acc]);
+ Err ->
+ io:format("Received unexpected: ~p~n", [Err])
+ end;
+accumulate_worker_resp([], Acc) ->
+ lists:reverse(Acc).
+
+clear_msg_q() ->
+ receive
+ _ ->
+ clear_msg_q()
+ after 0 ->
+ ok
+ end.
+%%------------------------------------------------------------------------------
%%
%%------------------------------------------------------------------------------
Please sign in to comment.
Something went wrong with that request. Please try again.