Skip to content

Commit

Permalink
add stream_close.
Browse files Browse the repository at this point in the history
autoclose inactive connections.
  • Loading branch information
textendo committed Oct 8, 2010
1 parent 4f7da8b commit aec26ac
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 21 deletions.
14 changes: 14 additions & 0 deletions src/ibrowse.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
send_req_direct/6,
send_req_direct/7,
stream_next/1,
stream_close/1,
set_max_sessions/3,
set_max_pipeline_size/3,
set_dest/3,
Expand Down Expand Up @@ -524,6 +525,19 @@ stream_next(Req_id) ->
ok
end.

%% @doc Tell ibrowse to close the stream.
%% Should be used in conjunction with the
%% <code>stream_to</code> option
%% @spec stream_close(Req_id :: req_id()) -> ok | {error, unknown_req_id}
stream_close(Req_id) ->
case ets:lookup(ibrowse_stream, {req_id_pid, Req_id}) of
[] ->
{error, unknown_req_id};
[{_, Pid}] ->
catch Pid ! {stream_close, Req_id},
ok
end.

%% @doc Turn tracing on for the ibrowse process
trace_on() ->
ibrowse ! {trace, true}.
Expand Down
60 changes: 43 additions & 17 deletions src/ibrowse_http_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
-include("ibrowse.hrl").

-record(state, {host, port, connect_timeout,
inactivity_timer_ref,
use_proxy = false, proxy_auth_digest,
ssl_options = [], is_ssl = false, socket,
proxy_tunnel_setup = false,
Expand Down Expand Up @@ -192,6 +193,15 @@ handle_info({stream_next, Req_id}, #state{socket = Socket,
handle_info({stream_next, _Req_id}, State) ->
{noreply, State};

handle_info({stream_close, Req_id}, #state{cur_req = #request{req_id = Req_id}} = State) ->
do_trace("Close request. Shutting down connection~n", []),
shutting_down(State),
do_error_reply(State, req_closed),
{stop, normal, State};

handle_info({stream_close, _Req_id}, State) ->
{noreply, State};

handle_info({tcp_closed, _Sock}, State) ->
do_trace("TCP connection closed by peer!~n", []),
handle_sock_closed(State),
Expand Down Expand Up @@ -221,6 +231,7 @@ handle_info({req_timedout, From}, State) ->
end;

handle_info(timeout, State) ->
do_trace("Inactivity timeout triggered. Shutting down connection~n", []),
shutting_down(State),
do_error_reply(State, req_timedout),
{stop, normal, State};
Expand Down Expand Up @@ -273,8 +284,8 @@ handle_sock_data(Data, #state{status = get_header}=State) ->
{stop, normal, State};
State_1 ->
active_once(State_1),
set_inac_timer(State_1),
{noreply, State_1}
State_2 = set_inac_timer(State_1),
{noreply, State_2}
end;

handle_sock_data(Data, #state{status = get_body,
Expand All @@ -293,8 +304,8 @@ handle_sock_data(Data, #state{status = get_body,
{stop, normal, State};
State_1 ->
active_once(State_1),
set_inac_timer(State_1),
{noreply, State_1}
State_2 = set_inac_timer(State_1),
{noreply, State_2}
end;
_ ->
case parse_11_response(Data, State) of
Expand All @@ -314,12 +325,12 @@ handle_sock_data(Data, #state{status = get_body,
active_once(State_1)
end,
State_2 = State_1#state{interim_reply_sent = false},
set_inac_timer(State_2),
{noreply, State_2};
State_3 = set_inac_timer(State_2),
{noreply, State_3};
State_1 ->
active_once(State_1),
set_inac_timer(State_1),
{noreply, State_1}
State_2 = set_inac_timer(State_1),
{noreply, State_2}
end
end.

Expand Down Expand Up @@ -636,8 +647,8 @@ send_req_1(From,
send_timer = Ref,
proxy_tunnel_setup = in_progress,
tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
set_inac_timer(State_1),
{noreply, State_2};
State_3 = set_inac_timer(State_2),
{noreply, State_3};
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
Expand Down Expand Up @@ -732,8 +743,8 @@ send_req_1(From,
_ ->
gen_server:reply(From, {ibrowse_req_id, ReqId})
end,
set_inac_timer(State_1),
{noreply, State_3};
State_4 = set_inac_timer(State_3),
{noreply, State_4};
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
Expand Down Expand Up @@ -1710,17 +1721,32 @@ get_stream_chunk_size(Options) ->
end.

set_inac_timer(State) ->
set_inac_timer(State, get_inac_timeout(State)).

set_inac_timer(_State, Timeout) when is_integer(Timeout) ->
erlang:send_after(Timeout, self(), timeout);
cancel_timer(State#state.inactivity_timer_ref),
set_inac_timer(State#state{inactivity_timer_ref = undefined},
get_inac_timeout(State)).

set_inac_timer(State, Timeout) when is_integer(Timeout) ->
Ref = erlang:send_after(Timeout, self(), timeout),
State#state{inactivity_timer_ref = Ref};
set_inac_timer(State, infinity) ->
State;
set_inac_timer(_, _) ->
undefined.

get_inac_timeout(#state{cur_req = #request{options = Opts}}) ->
get_value(inactivity_timeout, Opts, infinity);
get_inac_timeout(#state{cur_req = undefined}) ->
infinity.
case ibrowse:get_config_value(inactivity_timeout, undefined) of
Val when is_integer(Val) ->
Val;
_ ->
case application:get_env(ibrowse, inactivity_timeout) of
{ok, Val} when is_integer(Val), Val > 0 ->
Val;
_ ->
10000
end
end.

trace_request(Req) ->
case get(my_trace_flag) of
Expand Down
15 changes: 11 additions & 4 deletions src/ibrowse_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,17 @@ parse_url(Url) ->
parse_url([$:, $/, $/ | _], get_protocol, Url, []) ->
{invalid_uri_1, Url};
parse_url([$:, $/, $/ | T], get_protocol, Url, TmpAcc) ->
Prot = list_to_atom(lists:reverse(TmpAcc)),
parse_url(T, get_username,
Url#url{protocol = Prot},
[]);
%% Verify that the Protocol is supported and avoid atom pulution
case lists:member(lists:reverse(TmpAcc), ["http", "https"]) of
true ->
Prot = list_to_atom(lists:reverse(TmpAcc)),
parse_url(T, get_username,
Url#url{protocol = Prot},
[]);
false ->
%% Protocol not supported
{invalid_uri_3, get_protocol, Url, TmpAcc}
end;
parse_url([H | T], get_username, Url, TmpAcc) when H == $/;
H == $? ->
Path = case H of
Expand Down

0 comments on commit aec26ac

Please sign in to comment.