diff --git a/src/couch_replicator_dns.erl b/src/couch_replicator_dns.erl new file mode 100644 index 0000000..c2664ff --- /dev/null +++ b/src/couch_replicator_dns.erl @@ -0,0 +1,29 @@ +-module(couch_replicator_dns). + +-include_lib("kernel/src/inet_dns.hrl"). +-include_lib("ibrowse/include/ibrowse.hrl"). + +-export([ + lookup/1 +]). + +-spec lookup(iolist()) -> {ok, {term(), pos_integer()}} | {error, term()}. + +lookup(Url) -> + case ibrowse_lib:parse_url(Url) of + #url{host_type = hostname, host = Host} -> + case inet_res:resolve(Host, any, any) of + {error, {Error, _}} -> + {error, Error}; + {ok, #dns_rec{anlist=[Answer|_As]}} -> + #dns_rr{data=Data, ttl=TTL} = Answer, + {Data, TTL}; + Else -> + twig:log(warn, "Unknown DNS response: ~p", [Else]), + {error, Else} + end; + #url{} -> + {error, not_a_hostname}; + Else -> + {error, Else} + end. diff --git a/src/couch_replicator_httpc_pool.erl b/src/couch_replicator_httpc_pool.erl index c895048..affbcd3 100644 --- a/src/couch_replicator_httpc_pool.erl +++ b/src/couch_replicator_httpc_pool.erl @@ -12,7 +12,7 @@ -module(couch_replicator_httpc_pool). -behaviour(gen_server). --vsn(1). +-vsn(2). % public API -export([start_link/2, stop/1]). @@ -34,7 +34,10 @@ free = [], % free workers (connections) busy = [], % busy workers (connections) waiting = queue:new(), % blocked clients waiting for a worker - callers = [] % clients who've been given a worker + callers = [], % clients who've been given a worker + closing = [], % workers pending closing due to DNS refresh + dns_data, % The resolved IP for the target URL + dns_ttl % The DNS time-to-live }). @@ -55,114 +58,113 @@ release_worker(Pool, Worker) -> init({Url, Options}) -> process_flag(trap_exit, true), - State = #state{ + State0 = case couch_replicator_dns:lookup(Url) of + {ok, {Data, TTL}} -> + timer:send_after(TTL * 1000, refresh_dns), + #state{ + dns_data = Data, + dns_ttl = TTL + }; + {error, Error} -> + twig:log( + warn, + "couch_replicator_httpc_pool failed DNS lookup: ~p", + [Error] + ), + #state{} + end, + State = State0#state{ url = Url, limit = get_value(max_connections, Options) }, {ok, State}. -handle_call(get_worker, From, State) -> - #state{ - waiting = Waiting, - callers = Callers, - url = Url, - limit = Limit, - busy = Busy, - free = Free - } = State, - case length(Busy) >= Limit of - true -> - {noreply, State#state{waiting = queue:in(From, Waiting)}}; - false -> - case Free of - [] -> - {ok, Worker} = ibrowse:spawn_link_worker_process(Url), - Free2 = Free; - [Worker | Free2] -> - ok - end, - NewState = State#state{ - free = Free2, - busy = [Worker | Busy], - callers = monitor_client(Callers, Worker, From) - }, - {reply, {ok, Worker}, NewState} - end; +handle_call(get_worker, From, State0) -> + State1 = State0#state{ + waiting = queue:in(From, State0#state.waiting) + }, + State2 = maybe_supply_worker(State1), + {noreply, State2}; handle_call(stop, _From, State) -> {stop, normal, ok, State}. -handle_cast({release_worker, Worker}, State) -> - #state{waiting = Waiting, callers = Callers} = State, - NewCallers0 = demonitor_client(Callers, Worker), - case is_process_alive(Worker) andalso - lists:member(Worker, State#state.busy) of - true -> - case queue:out(Waiting) of - {empty, Waiting2} -> - NewCallers1 = NewCallers0, - Busy2 = State#state.busy -- [Worker], - Free2 = [Worker | State#state.free]; - {{value, From}, Waiting2} -> - NewCallers1 = monitor_client(NewCallers0, Worker, From), - gen_server:reply(From, {ok, Worker}), - Busy2 = State#state.busy, - Free2 = State#state.free - end, - NewState = State#state{ - busy = Busy2, - free = Free2, - waiting = Waiting2, - callers = NewCallers1 - }, - {noreply, NewState}; - false -> - {noreply, State#state{callers = NewCallers0}} - end. - -handle_info({'EXIT', Pid, _Reason}, State) -> - #state{ - url = Url, - busy = Busy, - free = Free, - waiting = Waiting, - callers = Callers - } = State, - NewCallers0 = demonitor_client(Callers, Pid), - case Free -- [Pid] of - Free -> - case Busy -- [Pid] of - Busy -> - {noreply, State#state{callers = NewCallers0}}; - Busy2 -> - case queue:out(Waiting) of - {empty, _} -> - {noreply, State#state{busy = Busy2, callers = NewCallers0}}; - {{value, From}, Waiting2} -> - {ok, Worker} = ibrowse:spawn_link_worker_process(Url), - NewCallers1 = monitor_client(NewCallers0, Worker, From), - gen_server:reply(From, {ok, Worker}), - NewState = State#state{ - busy = [Worker | Busy2], - waiting = Waiting2, - callers = NewCallers1 - }, - {noreply, NewState} - end - end; - Free2 -> - {noreply, State#state{free = Free2, callers = NewCallers0}} - end; +handle_cast({release_worker, Worker}, State0) -> + State1 = release_worker_int(Worker, State0), + State2 = maybe_supply_worker(State1), + {noreply, State2}. -handle_info({'DOWN', Ref, process, _, _}, #state{callers = Callers} = State) -> + +handle_info({'EXIT', Worker, _Reason}, State0) -> + State1 = release_worker_int(Worker, State0), + State2 = maybe_supply_worker(State1), + {noreply, State2}; + +handle_info({'DOWN', Ref, process, _, _}, #state{callers = Callers} = State0) -> case lists:keysearch(Ref, 2, Callers) of {value, {Worker, Ref}} -> - handle_cast({release_worker, Worker}, State); + State1 = release_worker_int(Worker, State0), + State2 = maybe_supply_worker(State1), + {noreply, State2}; false -> - {noreply, State} - end. + {noreply, State0} + end; + +handle_info(refresh_dns, State0) -> + #state{url=Url, dns_data=Data0}=State0, + State1 = case couch_replicator_dns:lookup(Url) of + {ok, {Data0, TTL}} -> + State0#state{dns_ttl=TTL}; + {ok, {Data1, TTL}} -> + #state{free=Free, busy=Busy, closing=Closing} = State0, + lists:foreach(fun ibrowse_http_client:stop/1, Free), + State0#state{ + dns_data=Data1, + dns_ttl=TTL, + closing=Busy ++ Closing, + free=[], + busy=[] + }; + {error, Error} -> + twig:log( + warn, + "couch_replicator_httpc_pool failed DNS lookup: ~p", + [Error] + ), + State0 + end, + timer:send_after(State1#state.dns_ttl * 1000, refresh_dns), + {noreply, State1}. + + +code_change(1, OldState, _Extra) -> + {state, Url, Limit, Free, Busy, Waiting, Callers} = OldState, + State0 = case couch_replicator_dns:lookup(Url) of + {ok, {Data, TTL}} -> + timer:send_after(TTL * 1000, refresh_dns), + #state{ + dns_data = Data, + dns_ttl = TTL + }; + {error, Error} -> + twig:log( + warn, + "couch_replicator_httpc_pool failed DNS lookup: ~p", + [Error] + ), + #state{} + end, + State1 = State0#state{ + url = Url, + limit = Limit, + free = Free, + busy = Busy, + waiting = Waiting, + callers = Callers + }, + {ok, State1}; code_change(_OldVsn, #state{}=State, _Extra) -> {ok, State}. @@ -172,6 +174,56 @@ terminate(_Reason, State) -> lists:foreach(fun ibrowse_http_client:stop/1, State#state.free), lists:foreach(fun ibrowse_http_client:stop/1, State#state.busy). + +release_worker_int(Worker, State0) -> + #state{ + callers = Callers, + busy = Busy, + closing = Closing + } = State0, + State0#state{ + callers = demonitor_client(Callers, Worker), + closing = Closing -- [Worker], + busy = Busy -- [Worker] + }. + + +maybe_supply_worker(State) -> + #state{ + url = Url, + waiting = Waiting0, + callers = Callers, + busy = Busy, + free = Free0, + closing = Closing, + limit = Limit + } = State, + case queue:out(Waiting0) of + {empty, Waiting0} -> + State; + {{value, From}, Waiting1} -> + case length(Busy) + length(Closing) >= Limit of + true -> + State; + false -> + {Worker, Free2} = case Free0 of + [] -> + {ok, W} = ibrowse:spawn_link_worker_process(Url), + {W, Free0}; + [W|Free1] -> + {W, Free1} + end, + gen_server:reply(From, {ok, Worker}), + State#state{ + callers = monitor_client(Callers, Worker, From), + waiting = Waiting1, + busy = [Worker|Busy], + free = Free2 + } + end + end. + + monitor_client(Callers, Worker, {ClientPid, _}) -> [{Worker, erlang:monitor(process, ClientPid)} | Callers].