From 35b4cd87ad14fe4022bac2df1200af47eb0ace77 Mon Sep 17 00:00:00 2001 From: Benjamin Anderson Date: Mon, 24 Nov 2014 14:46:57 -0800 Subject: [PATCH 1/3] Refactor couch_replicator_httpc_pool release/supply functionality BugzID: 41419 --- src/couch_replicator_httpc_pool.erl | 159 ++++++++++++---------------- 1 file changed, 68 insertions(+), 91 deletions(-) diff --git a/src/couch_replicator_httpc_pool.erl b/src/couch_replicator_httpc_pool.erl index c895048..a7bcd96 100644 --- a/src/couch_replicator_httpc_pool.erl +++ b/src/couch_replicator_httpc_pool.erl @@ -62,106 +62,36 @@ init({Url, Options}) -> {ok, State}. -handle_call(get_worker, From, State) -> - #state{ - waiting = Waiting, - callers = Callers, - url = Url, - limit = Limit, - busy = Busy, - free = Free - } = State, - case length(Busy) >= Limit of - true -> - {noreply, State#state{waiting = queue:in(From, Waiting)}}; - false -> - case Free of - [] -> - {ok, Worker} = ibrowse:spawn_link_worker_process(Url), - Free2 = Free; - [Worker | Free2] -> - ok - end, - NewState = State#state{ - free = Free2, - busy = [Worker | Busy], - callers = monitor_client(Callers, Worker, From) - }, - {reply, {ok, Worker}, NewState} - end; +handle_call(get_worker, From, State0) -> + State1 = State0#state{ + waiting = queue:in(From, State0#state.waiting) + }, + State2 = maybe_supply_worker(State1), + {noreply, State2}; handle_call(stop, _From, State) -> {stop, normal, ok, State}. -handle_cast({release_worker, Worker}, State) -> - #state{waiting = Waiting, callers = Callers} = State, - NewCallers0 = demonitor_client(Callers, Worker), - case is_process_alive(Worker) andalso - lists:member(Worker, State#state.busy) of - true -> - case queue:out(Waiting) of - {empty, Waiting2} -> - NewCallers1 = NewCallers0, - Busy2 = State#state.busy -- [Worker], - Free2 = [Worker | State#state.free]; - {{value, From}, Waiting2} -> - NewCallers1 = monitor_client(NewCallers0, Worker, From), - gen_server:reply(From, {ok, Worker}), - Busy2 = State#state.busy, - Free2 = State#state.free - end, - NewState = State#state{ - busy = Busy2, - free = Free2, - waiting = Waiting2, - callers = NewCallers1 - }, - {noreply, NewState}; - false -> - {noreply, State#state{callers = NewCallers0}} - end. - -handle_info({'EXIT', Pid, _Reason}, State) -> - #state{ - url = Url, - busy = Busy, - free = Free, - waiting = Waiting, - callers = Callers - } = State, - NewCallers0 = demonitor_client(Callers, Pid), - case Free -- [Pid] of - Free -> - case Busy -- [Pid] of - Busy -> - {noreply, State#state{callers = NewCallers0}}; - Busy2 -> - case queue:out(Waiting) of - {empty, _} -> - {noreply, State#state{busy = Busy2, callers = NewCallers0}}; - {{value, From}, Waiting2} -> - {ok, Worker} = ibrowse:spawn_link_worker_process(Url), - NewCallers1 = monitor_client(NewCallers0, Worker, From), - gen_server:reply(From, {ok, Worker}), - NewState = State#state{ - busy = [Worker | Busy2], - waiting = Waiting2, - callers = NewCallers1 - }, - {noreply, NewState} - end - end; - Free2 -> - {noreply, State#state{free = Free2, callers = NewCallers0}} - end; +handle_cast({release_worker, Worker}, State0) -> + State1 = release_worker_int(Worker, State0), + State2 = maybe_supply_worker(State1), + {noreply, State2}. -handle_info({'DOWN', Ref, process, _, _}, #state{callers = Callers} = State) -> + +handle_info({'EXIT', Worker, _Reason}, State0) -> + State1 = release_worker_int(Worker, State0), + State2 = maybe_supply_worker(State1), + {noreply, State2}; + +handle_info({'DOWN', Ref, process, _, _}, #state{callers = Callers} = State0) -> case lists:keysearch(Ref, 2, Callers) of {value, {Worker, Ref}} -> - handle_cast({release_worker, Worker}, State); + State1 = release_worker_int(Worker, State0), + State2 = maybe_supply_worker(State1), + {noreply, State2}; false -> - {noreply, State} + {noreply, State0} end. code_change(_OldVsn, #state{}=State, _Extra) -> @@ -172,6 +102,53 @@ terminate(_Reason, State) -> lists:foreach(fun ibrowse_http_client:stop/1, State#state.free), lists:foreach(fun ibrowse_http_client:stop/1, State#state.busy). + +release_worker_int(Worker, State0) -> + #state{ + callers = Callers, + busy = Busy + } = State0, + State0#state{ + callers = demonitor_client(Callers, Worker), + busy = Busy -- [Worker] + }. + + +maybe_supply_worker(State) -> + #state{ + url = Url, + waiting = Waiting0, + callers = Callers, + busy = Busy, + free = Free0, + limit = Limit + } = State, + case queue:out(Waiting0) of + {empty, Waiting0} -> + State; + {{value, From}, Waiting1} -> + case length(Busy) >= Limit of + true -> + State; + false -> + {Worker, Free2} = case Free0 of + [] -> + {ok, W} = ibrowse:spawn_link_worker_process(Url), + {W, Free0}; + [W|Free1] -> + {W, Free1} + end, + gen_server:reply(From, {ok, Worker}), + State#state{ + callers = monitor_client(Callers, Worker, From), + waiting = Waiting1, + busy = [Worker|Busy], + free = Free2 + } + end + end. + + monitor_client(Callers, Worker, {ClientPid, _}) -> [{Worker, erlang:monitor(process, ClientPid)} | Callers]. From f5afe617fce26e8ca7e5b1fef5bc685aa74fab7f Mon Sep 17 00:00:00 2001 From: Benjamin Anderson Date: Mon, 24 Nov 2014 15:38:04 -0800 Subject: [PATCH 2/3] Add DNS TTL functionality to couch_replicator_httpc_pool This feature periodically checks the DNS entry for the target URL. If DNS returns data that's different from the previous check, all open connections are gracefully closed in favor of new ones, which will perform a new DNS lookup upon connection establishment. BugzID: 41419 --- src/couch_replicator_dns.erl | 29 +++++++++++++++ src/couch_replicator_httpc_pool.erl | 58 ++++++++++++++++++++++++++--- 2 files changed, 82 insertions(+), 5 deletions(-) create mode 100644 src/couch_replicator_dns.erl diff --git a/src/couch_replicator_dns.erl b/src/couch_replicator_dns.erl new file mode 100644 index 0000000..c2664ff --- /dev/null +++ b/src/couch_replicator_dns.erl @@ -0,0 +1,29 @@ +-module(couch_replicator_dns). + +-include_lib("kernel/src/inet_dns.hrl"). +-include_lib("ibrowse/include/ibrowse.hrl"). + +-export([ + lookup/1 +]). + +-spec lookup(iolist()) -> {ok, {term(), pos_integer()}} | {error, term()}. + +lookup(Url) -> + case ibrowse_lib:parse_url(Url) of + #url{host_type = hostname, host = Host} -> + case inet_res:resolve(Host, any, any) of + {error, {Error, _}} -> + {error, Error}; + {ok, #dns_rec{anlist=[Answer|_As]}} -> + #dns_rr{data=Data, ttl=TTL} = Answer, + {Data, TTL}; + Else -> + twig:log(warn, "Unknown DNS response: ~p", [Else]), + {error, Else} + end; + #url{} -> + {error, not_a_hostname}; + Else -> + {error, Else} + end. diff --git a/src/couch_replicator_httpc_pool.erl b/src/couch_replicator_httpc_pool.erl index a7bcd96..94d6496 100644 --- a/src/couch_replicator_httpc_pool.erl +++ b/src/couch_replicator_httpc_pool.erl @@ -34,7 +34,10 @@ free = [], % free workers (connections) busy = [], % busy workers (connections) waiting = queue:new(), % blocked clients waiting for a worker - callers = [] % clients who've been given a worker + callers = [], % clients who've been given a worker + closing = [], % workers pending closing due to DNS refresh + dns_data, % The resolved IP for the target URL + dns_ttl % The DNS time-to-live }). @@ -55,7 +58,22 @@ release_worker(Pool, Worker) -> init({Url, Options}) -> process_flag(trap_exit, true), - State = #state{ + State0 = case couch_replicator_dns:lookup(Url) of + {ok, {Data, TTL}} -> + timer:send_after(TTL * 1000, refresh_dns), + #state{ + dns_data = Data, + dns_ttl = TTL + }; + {error, Error} -> + twig:log( + warn, + "couch_replicator_httpc_pool failed DNS lookup: ~p", + [Error] + ), + #state{} + end, + State = State0#state{ url = Url, limit = get_value(max_connections, Options) }, @@ -92,7 +110,34 @@ handle_info({'DOWN', Ref, process, _, _}, #state{callers = Callers} = State0) -> {noreply, State2}; false -> {noreply, State0} - end. + end; + +handle_info(refresh_dns, State0) -> + #state{url=Url, dns_data=Data0}=State0, + State1 = case couch_replicator_dns:lookup(Url) of + {ok, {Data0, TTL}} -> + State0#state{dns_ttl=TTL}; + {ok, {Data1, TTL}} -> + #state{free=Free, busy=Busy, closing=Closing} = State0, + lists:foreach(fun ibrowse_http_client:stop/1, Free), + State0#state{ + dns_data=Data1, + dns_ttl=TTL, + closing=Busy ++ Closing, + free=[], + busy=[] + }; + {error, Error} -> + twig:log( + warn, + "couch_replicator_httpc_pool failed DNS lookup: ~p", + [Error] + ), + State0 + end, + timer:send_after(State1#state.dns_ttl * 1000, refresh_dns), + {noreply, State1}. + code_change(_OldVsn, #state{}=State, _Extra) -> {ok, State}. @@ -106,10 +151,12 @@ terminate(_Reason, State) -> release_worker_int(Worker, State0) -> #state{ callers = Callers, - busy = Busy + busy = Busy, + closing = Closing } = State0, State0#state{ callers = demonitor_client(Callers, Worker), + closing = Closing -- [Worker], busy = Busy -- [Worker] }. @@ -121,13 +168,14 @@ maybe_supply_worker(State) -> callers = Callers, busy = Busy, free = Free0, + closing = Closing, limit = Limit } = State, case queue:out(Waiting0) of {empty, Waiting0} -> State; {{value, From}, Waiting1} -> - case length(Busy) >= Limit of + case length(Busy) + length(Closing) >= Limit of true -> State; false -> From 65cfb1f45a8156f62ad279a1ee18e35b2dff8564 Mon Sep 17 00:00:00 2001 From: Benjamin Anderson Date: Mon, 24 Nov 2014 15:42:16 -0800 Subject: [PATCH 3/3] Add a code_change clause for DNS TTL additions BugzID: 41419 --- src/couch_replicator_httpc_pool.erl | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/src/couch_replicator_httpc_pool.erl b/src/couch_replicator_httpc_pool.erl index 94d6496..affbcd3 100644 --- a/src/couch_replicator_httpc_pool.erl +++ b/src/couch_replicator_httpc_pool.erl @@ -12,7 +12,7 @@ -module(couch_replicator_httpc_pool). -behaviour(gen_server). --vsn(1). +-vsn(2). % public API -export([start_link/2, stop/1]). @@ -139,6 +139,33 @@ handle_info(refresh_dns, State0) -> {noreply, State1}. +code_change(1, OldState, _Extra) -> + {state, Url, Limit, Free, Busy, Waiting, Callers} = OldState, + State0 = case couch_replicator_dns:lookup(Url) of + {ok, {Data, TTL}} -> + timer:send_after(TTL * 1000, refresh_dns), + #state{ + dns_data = Data, + dns_ttl = TTL + }; + {error, Error} -> + twig:log( + warn, + "couch_replicator_httpc_pool failed DNS lookup: ~p", + [Error] + ), + #state{} + end, + State1 = State0#state{ + url = Url, + limit = Limit, + free = Free, + busy = Busy, + waiting = Waiting, + callers = Callers + }, + {ok, State1}; + code_change(_OldVsn, #state{}=State, _Extra) -> {ok, State}.