Add functionality to respect DNS TTLs #55

base: master
29 changes: 29 additions & 0 deletions src/couch_replicator_dns.erl
@@ -0,0 +1,29 @@

Why did you put this in its own module?



-spec lookup(iolist()) -> {ok, {term(), pos_integer()}} | {error, term()}.

lookup(Url) ->
case ibrowse_lib:parse_url(Url) of
#url{host_type = hostname, host = Host} ->
case inet_res:resolve(Host, any, any) of
{error, {Error, _}} ->
{error, Error};
{ok, #dns_rec{anlist=[Answer|_As]}} ->

Did you inspect that and found dns_rec()? Per docs the return is dns_msg() and opaque, to be accessed using inet_dns: functions, msg() and rr() in this case.

#dns_rr{data=Data, ttl=TTL} = Answer,
{Data, TTL};
Else ->
twig:log(warn, "Unknown DNS response: ~p", [Else]),
{error, Else}
#url{} ->
{error, not_a_hostname};

Maybe should be no_hostname.

Else ->
{error, Else}
242 changes: 147 additions & 95 deletions src/couch_replicator_httpc_pool.erl
Expand Up @@ -12,7 +12,7 @@


% public API
-export([start_link/2, stop/1]).
Expand All @@ -34,7 +34,10 @@
free = [], % free workers (connections)
busy = [], % busy workers (connections)
waiting = queue:new(), % blocked clients waiting for a worker
callers = [] % clients who've been given a worker
callers = [], % clients who've been given a worker
closing = [], % workers pending closing due to DNS refresh
dns_data, % The resolved IP for the target URL
dns_ttl % The DNS time-to-live

Expand All @@ -55,114 +58,113 @@ release_worker(Pool, Worker) ->

init({Url, Options}) ->
process_flag(trap_exit, true),
State = #state{
State0 = case couch_replicator_dns:lookup(Url) of

Don't know if relevant but if the lookup hangs, could that be for long? And would it matter that it blocks the gen_server start up sequence possibly for that long? Not sure if that could lead to the server seeming to hang during start up.

{ok, {Data, TTL}} ->
timer:send_after(TTL * 1000, refresh_dns),

This seems the right thing to do but is the TTL ever actually relevant for us? Do we have a different, distinct pattern maybe when we would expect DNS updates? Just wondering.

dns_data = Data,
dns_ttl = TTL
{error, Error} ->
"couch_replicator_httpc_pool failed DNS lookup: ~p",

Is there a defined consequence if this happens? Like it is, the gen_server continues just with dns_data and dns_ttl not set?

State = State0#state{
url = Url,
limit = get_value(max_connections, Options)
{ok, State}.

handle_call(get_worker, From, State) ->
waiting = Waiting,
callers = Callers,
url = Url,
limit = Limit,
busy = Busy,
free = Free
} = State,
case length(Busy) >= Limit of
true ->
{noreply, State#state{waiting = queue:in(From, Waiting)}};
false ->
case Free of
[] ->
{ok, Worker} = ibrowse:spawn_link_worker_process(Url),
Free2 = Free;
[Worker | Free2] ->
NewState = State#state{
free = Free2,
busy = [Worker | Busy],
callers = monitor_client(Callers, Worker, From)
{reply, {ok, Worker}, NewState}
handle_call(get_worker, From, State0) ->

Since this never returns a worker anymore it might be renamed queue_for_worker. I am guessing there must be source that calls get_worker somewhere that has a clause for processing a reply that is now obsolete? However, I don't know how that works with code_change.

I find it surprising to have gen_server:reply buried in the maybe_supply_worker/1 code. I see it was so in the original. But the {noreply, State2} below is quite misleading through this. I think you could return {reply | noreply, StateN} from maybe_supply_worker/1.

State1 = State0#state{
waiting = queue:in(From, State0#state.waiting)
State2 = maybe_supply_worker(State1),
{noreply, State2};

handle_call(stop, _From, State) ->
{stop, normal, ok, State}.

handle_cast({release_worker, Worker}, State) ->
#state{waiting = Waiting, callers = Callers} = State,
NewCallers0 = demonitor_client(Callers, Worker),
case is_process_alive(Worker) andalso
lists:member(Worker, State#state.busy) of
true ->
case queue:out(Waiting) of
{empty, Waiting2} ->
NewCallers1 = NewCallers0,
Busy2 = State#state.busy -- [Worker],
Free2 = [Worker |];
{{value, From}, Waiting2} ->
NewCallers1 = monitor_client(NewCallers0, Worker, From),
gen_server:reply(From, {ok, Worker}),
Busy2 = State#state.busy,
Free2 =
NewState = State#state{
busy = Busy2,
free = Free2,
waiting = Waiting2,
callers = NewCallers1
{noreply, NewState};
false ->
{noreply, State#state{callers = NewCallers0}}

handle_info({'EXIT', Pid, _Reason}, State) ->
url = Url,
busy = Busy,
free = Free,
waiting = Waiting,
callers = Callers
} = State,
NewCallers0 = demonitor_client(Callers, Pid),
case Free -- [Pid] of
Free ->
case Busy -- [Pid] of
Busy ->
{noreply, State#state{callers = NewCallers0}};
Busy2 ->
case queue:out(Waiting) of
{empty, _} ->
{noreply, State#state{busy = Busy2, callers = NewCallers0}};
{{value, From}, Waiting2} ->
{ok, Worker} = ibrowse:spawn_link_worker_process(Url),
NewCallers1 = monitor_client(NewCallers0, Worker, From),
gen_server:reply(From, {ok, Worker}),
NewState = State#state{
busy = [Worker | Busy2],
waiting = Waiting2,
callers = NewCallers1
{noreply, NewState}
Free2 ->
{noreply, State#state{free = Free2, callers = NewCallers0}}
handle_cast({release_worker, Worker}, State0) ->
State1 = release_worker_int(Worker, State0),
State2 = maybe_supply_worker(State1),
{noreply, State2}.

handle_info({'DOWN', Ref, process, _, _}, #state{callers = Callers} = State) ->

handle_info({'EXIT', Worker, _Reason}, State0) ->
State1 = release_worker_int(Worker, State0),
State2 = maybe_supply_worker(State1),
{noreply, State2};

handle_info({'DOWN', Ref, process, _, _}, #state{callers = Callers} = State0) ->
case lists:keysearch(Ref, 2, Callers) of
{value, {Worker, Ref}} ->
handle_cast({release_worker, Worker}, State);
State1 = release_worker_int(Worker, State0),
State2 = maybe_supply_worker(State1),
{noreply, State2};
false ->
{noreply, State}
{noreply, State0}

handle_info(refresh_dns, State0) ->
#state{url=Url, dns_data=Data0}=State0,
State1 = case couch_replicator_dns:lookup(Url) of
{ok, {Data0, TTL}} ->
{ok, {Data1, TTL}} ->

You are just closing all connections down, without logging or re-opening them? That happens automatically?

#state{free=Free, busy=Busy, closing=Closing} = State0,
lists:foreach(fun ibrowse_http_client:stop/1, Free),
closing=Busy ++ Closing,
{error, Error} ->
"couch_replicator_httpc_pool failed DNS lookup: ~p",

Maybe for State0 in this case not keep the old TTL value, which now is dubious, but a special one that governs DNS retry intervals after failure.

timer:send_after(State1#state.dns_ttl * 1000, refresh_dns),

After an error, this serves as DNS lookup retry. There is no implicit timed retry like this, when the first DNS lookup fails.

When refresh_dns is ever called directly, not through the time call in the source, dns_ttl in the line above can be undefined, in the case that the initial DNS lookup failed. That seems likely to be the exact scenario where one would try again and trigger this clause. I haven't seen that the case of initial failure would stop the server.

{noreply, State1}.

code_change(1, OldState, _Extra) ->
{state, Url, Limit, Free, Busy, Waiting, Callers} = OldState,
State0 = case couch_replicator_dns:lookup(Url) of

This might hang, for a while, in a code change.

This piece is way too similar to handle_info(refresh_dns, State) clause. It also quite similar to the piece of logic in init/1.
Would it be possible to abstract this functionality out into refresh_dns function?

I wondered, too, but maybe it's on the edge; in the end, special casing just making it harder to read.

{ok, {Data, TTL}} ->
timer:send_after(TTL * 1000, refresh_dns),
dns_data = Data,
dns_ttl = TTL
{error, Error} ->
"couch_replicator_httpc_pool failed DNS lookup: ~p",
State1 = State0#state{
url = Url,
limit = Limit,
free = Free,
busy = Busy,
waiting = Waiting,
callers = Callers
{ok, State1};

code_change(_OldVsn, #state{}=State, _Extra) ->
{ok, State}.
Expand All @@ -172,6 +174,56 @@ terminate(_Reason, State) ->
lists:foreach(fun ibrowse_http_client:stop/1,,
lists:foreach(fun ibrowse_http_client:stop/1, State#state.busy).

release_worker_int(Worker, State0) ->
callers = Callers,
busy = Busy,
closing = Closing
} = State0,
callers = demonitor_client(Callers, Worker),
closing = Closing -- [Worker],
busy = Busy -- [Worker]

All three above fail silently. Since with a queue like this, vicious races are a possibility, one would want to add a logging assertion here, for when called from non-exiting places, e.g. that callers is now 1 shorter and closing, or busy, too


maybe_supply_worker(State) ->
url = Url,
waiting = Waiting0,
callers = Callers,
busy = Busy,
free = Free0,
closing = Closing,
limit = Limit
} = State,
case queue:out(Waiting0) of
{empty, Waiting0} ->
{{value, From}, Waiting1} ->
case length(Busy) + length(Closing) >= Limit of
true ->
false ->
{Worker, Free2} = case Free0 of
[] ->
{ok, W} = ibrowse:spawn_link_worker_process(Url),
{W, Free0};

I propose {W, []} for readability.

[W|Free1] ->
{W, Free1}
gen_server:reply(From, {ok, Worker}),
callers = monitor_client(Callers, Worker, From),
waiting = Waiting1,
busy = [Worker|Busy],
free = Free2

monitor_client(Callers, Worker, {ClientPid, _}) ->

While you're at it, monitor_client is badly named or made, the Callers parameter and return value of a list of callers with only the first now monitored (the others already are) and added to the list, that seems kludgy to me. I think it would be better to have callers = [monitor_client(Worker, From) | Callers], with monitor client -> {Worker, erlang:monitor(process, ClientPid)}. I do understand its a pendant to demonitor_client().

[{Worker, erlang:monitor(process, ClientPid)} | Callers].

Expand Down