Skip to content
This repository
Browse code

Replicator now uses lhttpc instead of ibrowse

Change-Id: I9c920c8b350c8d2234253103696eb4b16b981527
Reviewed-on: http://review.couchbase.org/14577
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Damien Katz <damien@couchbase.com>
Tested-by: Damien Katz <damien@couchbase.com>
  • Loading branch information...
commit 7a63d76fcab3dcb3a8bdd1f67684e86a11380b60 1 parent 5a3fdc9
Filipe Manana authored April 05, 2012 Damienkatz committed April 06, 2012
2  src/couchdb/Makefile.am
@@ -53,7 +53,6 @@ source_files = \
53 53
     couch_external_server.erl \
54 54
     couch_file.erl \
55 55
     couch_db_frontend.erl \
56  
-    couch_httpc_pool.erl \
57 56
     couch_httpd.erl \
58 57
     couch_httpd_db.erl \
59 58
     couch_httpd_auth.erl \
@@ -133,7 +132,6 @@ compiled_files = \
133 132
     couch_file.beam \
134 133
     couch_file_write_guard.beam \
135 134
     couch_db_frontend.beam \
136  
-    couch_httpc_pool.beam \
137 135
     couch_httpd.beam \
138 136
     couch_httpd_db.beam \
139 137
     couch_httpd_auth.beam \
101  src/couchdb/couch_api_wrap.erl
@@ -69,9 +69,9 @@ db_open(#httpdb{} = Db1, _Options, Create) ->
69 69
     false ->
70 70
         ok;
71 71
     true ->
72  
-        send_req(Db, [{method, put}], fun(_, _, _) -> ok end)
  72
+        send_req(Db, [{method, "PUT"}], fun(_, _, _) -> ok end)
73 73
     end,
74  
-    send_req(Db, [{method, head}],
  74
+    send_req(Db, [{method, "HEAD"}],
75 75
         fun(200, _, _) ->
76 76
             {ok, Db};
77 77
         (401, _, _) ->
@@ -100,9 +100,8 @@ db_open(DbName, Options, Create) ->
100 100
         throw({unauthorized, DbName})
101 101
     end.
102 102
 
103  
-db_close(#httpdb{httpc_pool = Pool}) ->
104  
-    unlink(Pool),
105  
-    ok = couch_httpc_pool:stop(Pool);
  103
+db_close(#httpdb{} = HttpDb) ->
  104
+    ok = couch_api_wrap_httpc:tear_down(HttpDb);
106 105
 db_close(DbName) ->
107 106
     catch couch_db:close(DbName).
108 107
 
@@ -122,7 +121,7 @@ get_db_info(#db{name = DbName, user_ctx = UserCtx}) ->
122 121
 ensure_full_commit(#httpdb{} = Db) ->
123 122
     send_req(
124 123
         Db,
125  
-        [{method, post}, {path, "_ensure_full_commit"},
  124
+        [{method, "POST"}, {path, "_ensure_full_commit"},
126 125
             {headers, [{"Content-Type", "application/json"}]}],
127 126
         fun(201, _, {Props}) ->
128 127
             {ok, get_value(<<"instance_start_time">>, Props)};
@@ -137,7 +136,7 @@ get_missing_revs(#httpdb{} = Db, IdRevList) ->
137 136
     JsonBody = {[{Id, couch_doc:rev_to_str(Rev)} || {Id, Rev} <- IdRevList]},
138 137
     send_req(
139 138
         Db,
140  
-        [{method, post}, {path, "_revs_diff"}, {body, ?JSON_ENCODE(JsonBody)}],
  139
+        [{method, "POST"}, {path, "_revs_diff"}, {body, ?JSON_ENCODE(JsonBody)}],
141 140
         fun(200, _, {Props}) ->
142 141
             ConvertToNativeFun = fun({Id, {Result}}) ->
143 142
                 MissingRev = couch_doc:parse_rev(
@@ -202,7 +201,6 @@ update_docs(Db, DocList, Options) ->
202 201
 update_docs(_, [], _, _) ->
203 202
     ok;
204 203
 update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
205  
-    FullCommit = atom_to_list(not lists:member(delay_commit, Options)),
206 204
     Prefix = case UpdateType of
207 205
     replicated_changes ->
208 206
         <<"{\"new_edits\":false,\"docs\":[">>;
@@ -222,31 +220,37 @@ update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
222 220
         end,
223 221
         byte_size(Prefix) + byte_size(Suffix) + length(DocList) - 1,
224 222
         DocList),
225  
-    BodyFun = fun(eof) ->
226  
-            eof;
227  
-        ([]) ->
228  
-            {ok, Suffix, eof};
229  
-        ([prefix | Rest]) ->
230  
-            {ok, Prefix, Rest};
231  
-        ([Doc]) ->
232  
-            {ok, Doc, []};
233  
-        ([Doc | RestDocs]) ->
234  
-            {ok, [Doc, ","], RestDocs}
235  
-    end,
236 223
     Headers = [
237  
-        {"Content-Length", Len},
238  
-        {"Content-Type", "application/json"},
239  
-        {"X-Couch-Full-Commit", FullCommit}
  224
+        {"Content-Length", integer_to_list(Len)},
  225
+        {"Content-Type", "application/json"}
240 226
     ],
241  
-    send_req(
242  
-        HttpDb,
243  
-        [{method, post}, {path, "_bulk_docs"},
244  
-            {body, {BodyFun, [prefix | Docs]}}, {headers, Headers}],
245  
-        fun(201, _, _) ->
246  
-                ok;
247  
-           (_, _, Error) ->
248  
-                {ok, Error}
249  
-        end);
  227
+    ReqOptions = [
  228
+        {method, "POST"},
  229
+        {path, "_bulk_docs"},
  230
+        {headers, maybe_add_delayed_commit(Headers, Options)},
  231
+        {lhttpc_options, [{partial_upload, 3}]}
  232
+    ],
  233
+    SendDocsFun = fun(Data, {SendFun, N}) ->
  234
+        {ok, SendFun2} = case N > 1 of
  235
+        true ->
  236
+            SendFun([Data, <<",">>]);
  237
+        false ->
  238
+            SendFun(Data)
  239
+        end,
  240
+        {SendFun2, N - 1}
  241
+    end,
  242
+    ReqCallback = fun(UploadFun) ->
  243
+        {ok, UploadFun2} = UploadFun(Prefix),
  244
+        {UploadFun3, 0} = lists:foldl(SendDocsFun, {UploadFun2, length(Docs)}, Docs),
  245
+        {ok, UploadFun4} = UploadFun3(Suffix),
  246
+        case UploadFun4(eof) of
  247
+        {ok, 201, _Headers, _Body} ->
  248
+            ok;
  249
+        {ok, _Code, _Headers, Error} ->
  250
+            {ok, Error}
  251
+        end
  252
+    end,
  253
+    send_req(HttpDb, ReqOptions, ReqCallback);
250 254
 update_docs(Db, DocList, Options, replicated_changes) ->
251 255
     ok = couch_db:update_docs(Db, DocList, Options).
252 256
 
@@ -265,25 +269,36 @@ changes_since(#httpdb{headers = Headers1} = HttpDb, Style, StartSeq,
265 269
     {QArgs, Method, Body, Headers} = case DocIds of
266 270
     undefined ->
267 271
         QArgs1 = maybe_add_changes_filter_q_args(BaseQArgs, Options),
268  
-        {QArgs1, get, [], Headers1};
  272
+        {QArgs1, "GET", [], Headers1};
269 273
     _ when is_list(DocIds) ->
270 274
         Headers2 = [{"Content-Type", "application/json"} | Headers1],
271 275
         JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
272  
-        {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
  276
+        {[{"filter", "_doc_ids"} | BaseQArgs], "POST", JsonDocIds, Headers2}
273 277
     end,
  278
+    ReqOptions = [
  279
+        {method, Method},
  280
+        {path, "_changes"},
  281
+        {body, Body},
  282
+        {headers, Headers},
  283
+        {qs, QArgs},
  284
+        {lhttpc_options, [{partial_download, [{window_size, 3}]}]}
  285
+    ],
274 286
     send_req(
275 287
         HttpDb,
276  
-        [{method, Method}, {path, "_changes"}, {qs, QArgs},
277  
-            {headers, Headers}, {body, Body},
278  
-            {ibrowse_options, [{stream_to, {self(), once}}]}],
  288
+        ReqOptions,
279 289
         fun(200, _, DataStreamFun) ->
280 290
                 parse_changes_feed(Options, UserFun, DataStreamFun);
281 291
             (405, _, _) when is_list(DocIds) ->
282 292
                 % CouchDB versions < 1.1.0 don't have the builtin _changes feed
283 293
                 % filter "_doc_ids" neither support POST
284  
-                send_req(HttpDb, [{method, get}, {path, "_changes"},
285  
-                    {qs, BaseQArgs}, {headers, Headers1},
286  
-                    {ibrowse_options, [{stream_to, {self(), once}}]}],
  294
+                Req2Options = [
  295
+                    {method, "GET"},
  296
+                    {path, "_changes"},
  297
+                    {qs, BaseQArgs},
  298
+                    {headers, Headers1},
  299
+                    {lhttpc_options, [{partial_download, [{window_size, 3}]}]}
  300
+                ],
  301
+                send_req(HttpDb, Req2Options,
287 302
                     fun(200, _, DataStreamFun2) ->
288 303
                         UserFun2 = fun(#doc_info{id = Id} = DocInfo) ->
289 304
                             case lists:member(Id, DocIds) of
@@ -427,3 +442,11 @@ json_to_doc_info({Props}) ->
427 442
         deleted = Del
428 443
     }.
429 444
 
  445
+maybe_add_delayed_commit(Headers, Options) ->
  446
+    case lists:member(delay_commit, Options) of
  447
+    true ->
  448
+        [{"X-Couch-Full-Commit", "false"} | Headers];
  449
+    false ->
  450
+        Headers
  451
+    end.
  452
+
2  src/couchdb/couch_api_wrap.hrl
@@ -20,7 +20,7 @@
20 20
         {"User-Agent", "CouchDB/" ++ couch_server:get_version()}
21 21
     ],
22 22
     timeout,            % milliseconds
23  
-    ibrowse_options = [],
  23
+    lhttpc_options = [],
24 24
     retries = 10,
25 25
     wait = 250,         % milliseconds
26 26
     httpc_pool = nil,
333  src/couchdb/couch_api_wrap_httpc.erl
@@ -14,9 +14,9 @@
14 14
 
15 15
 -include("couch_db.hrl").
16 16
 -include("couch_api_wrap.hrl").
17  
--include("../ibrowse/ibrowse.hrl").
  17
+-include("../lhttpc/lhttpc.hrl").
18 18
 
19  
--export([setup/1]).
  19
+-export([setup/1, tear_down/1]).
20 20
 -export([send_req/3]).
21 21
 -export([full_url/2]).
22 22
 
@@ -27,145 +27,122 @@
27 27
 
28 28
 -define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
29 29
 -define(MAX_WAIT, 5 * 60 * 1000).
  30
+-define(NOT_HTTP_ERROR(Code),
  31
+    (Code =:= 200 orelse Code =:= 201 orelse
  32
+        (Code >= 400 andalso Code < 500))).
  33
+-define(IS_HTTP_REDIRECT(Code),
  34
+    (Code =:= 301 orelse Code =:= 302 orelse Code =:= 303)).
30 35
 
31 36
 
32  
-setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
33  
-    {ok, Pid} = couch_httpc_pool:start_link(Url, [{max_connections, MaxConns}]),
  37
+setup(#httpdb{httpc_pool = nil} = Db) ->
  38
+    #httpdb{timeout = Timeout, http_connections = MaxConns} = Db,
  39
+    {ok, Pid} = lhttpc_manager:start_link(
  40
+        [{connection_timeout, Timeout}, {pool_size, MaxConns}]),
34 41
     {ok, Db#httpdb{httpc_pool = Pid}}.
35 42
 
36 43
 
37  
-send_req(HttpDb, Params1, Callback) ->
38  
-    Params2 = ?replace(Params1, qs,
39  
-        [{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
40  
-    Params = ?replace(Params2, ibrowse_options,
41  
-        lists:keysort(1, get_value(ibrowse_options, Params2, []))),
42  
-    {Worker, Response} = send_ibrowse_req(HttpDb, Params),
43  
-    process_response(Response, Worker, HttpDb, Params, Callback).
  44
+tear_down(#httpdb{httpc_pool = Pool}) ->
  45
+    couch_util:shutdown_sync(Pool).
44 46
 
45 47
 
46  
-send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
47  
-    Method = get_value(method, Params, get),
  48
+send_req(HttpDb, Params, Callback) ->
  49
+    Qs1 = get_value(qs, Params, []),
  50
+    Qs2 = [{K, ?b2l(iolist_to_binary(V))} || {K, V} <- Qs1],
  51
+    Params2 = ?replace(Params, qs, Qs2),
  52
+    Response = send_lhttpc_req(HttpDb, Params2),
  53
+    process_response(Response, HttpDb, Params2, Callback).
  54
+
  55
+
  56
+send_lhttpc_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
  57
+    Method = get_value(method, Params, "GET"),
48 58
     UserHeaders = lists:keysort(1, get_value(headers, Params, [])),
49 59
     Headers1 = lists:ukeymerge(1, UserHeaders, BaseHeaders),
50 60
     Headers2 = oauth_header(HttpDb, Params) ++ Headers1,
51 61
     Url = full_url(HttpDb, Params),
52 62
     Body = get_value(body, Params, []),
53  
-    case get_value(path, Params) of
54  
-    "_changes" ->
55  
-        {ok, Worker} = ibrowse:spawn_link_worker_process(Url);
56  
-    _ ->
57  
-        {ok, Worker} = couch_httpc_pool:get_worker(HttpDb#httpdb.httpc_pool)
58  
-    end,
59  
-    IbrowseOptions = [
60  
-        {response_format, binary}, {inactivity_timeout, HttpDb#httpdb.timeout} |
61  
-        lists:ukeymerge(1, get_value(ibrowse_options, Params, []),
62  
-            HttpDb#httpdb.ibrowse_options)
  63
+    Timeout = HttpDb#httpdb.timeout,
  64
+    CallerLhttpcOptions = lists:keysort(1, get_value(lhttpc_options, Params, [])),
  65
+    LhttpcOptions = [
  66
+        {pool, HttpDb#httpdb.httpc_pool},
  67
+        {connect_timeout, Timeout} |
  68
+        lists:ukeymerge(1, CallerLhttpcOptions, HttpDb#httpdb.lhttpc_options)
63 69
     ],
64  
-    Response = ibrowse:send_req_direct(
65  
-        Worker, Url, Headers2, Method, Body, IbrowseOptions, infinity),
66  
-    {Worker, Response}.
67  
-
68  
-
69  
-process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, Callback) ->
70  
-    send_req(HttpDb, Params, Callback);
71  
-
72  
-process_response({error, {'EXIT', {normal, _}}}, _Worker, HttpDb, Params, Cb) ->
73  
-    % ibrowse worker terminated because remote peer closed the socket
74  
-    % -> not an error
75  
-    send_req(HttpDb, Params, Cb);
76  
-
77  
-process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
78  
-    process_stream_response(ReqId, Worker, HttpDb, Params, Callback);
79  
-
80  
-process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
81  
-    release_worker(Worker, HttpDb),
82  
-    case list_to_integer(Code) of
83  
-    Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
84  
-        EJson = case Body of
85  
-        <<>> ->
86  
-            null;
87  
-        Json ->
88  
-            ?JSON_DECODE(Json)
89  
-        end,
90  
-        Callback(Ok, Headers, EJson);
91  
-    R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
92  
-        do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
93  
-    Error ->
94  
-        maybe_retry({code, Error}, Worker, HttpDb, Params, Callback)
95  
-    end;
96  
-
97  
-process_response(Error, Worker, HttpDb, Params, Callback) ->
98  
-    maybe_retry(Error, Worker, HttpDb, Params, Callback).
99  
-
100  
-
101  
-process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
102  
-    receive
103  
-    {ibrowse_async_headers, ReqId, Code, Headers} ->
104  
-        case list_to_integer(Code) of
105  
-        Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
106  
-            StreamDataFun = fun() ->
107  
-                stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
108  
-            end,
109  
-            ibrowse:stream_next(ReqId),
110  
-            try
111  
-                Ret = Callback(Ok, Headers, StreamDataFun),
112  
-                release_worker(Worker, HttpDb),
113  
-                clean_mailbox_req(ReqId),
114  
-                Ret
115  
-            catch throw:{maybe_retry_req, Err} ->
116  
-                clean_mailbox_req(ReqId),
117  
-                maybe_retry(Err, Worker, HttpDb, Params, Callback)
118  
-            end;
119  
-        R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
120  
-            do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
121  
-        Error ->
122  
-            report_error(Worker, HttpDb, Params, {code, Error})
123  
-        end;
124  
-    {ibrowse_async_response, ReqId, {error, _} = Error} ->
125  
-        maybe_retry(Error, Worker, HttpDb, Params, Callback)
126  
-    after HttpDb#httpdb.timeout + 500 ->
127  
-        % Note: ibrowse should always reply with timeouts, but this doesn't
128  
-        % seem to be always true when there's a very high rate of requests
129  
-        % and many open connections.
130  
-        maybe_retry(timeout, Worker, HttpDb, Params, Callback)
  70
+    try
  71
+        lhttpc:request(Url, Method, Headers2, Body, Timeout, LhttpcOptions)
  72
+    catch exit:ExitReason ->
  73
+        {error, ExitReason}
131 74
     end.
132 75
 
133 76
 
134  
-clean_mailbox_req(ReqId) ->
135  
-    receive
136  
-    {ibrowse_async_response, ReqId, _} ->
137  
-        clean_mailbox_req(ReqId);
138  
-    {ibrowse_async_response_end, ReqId} ->
139  
-        clean_mailbox_req(ReqId)
140  
-    after 0 ->
141  
-        ok
142  
-    end.
  77
+process_response({ok, {{Code, _}, Headers, _Body}}, HttpDb, Params, Callback) when
  78
+        ?IS_HTTP_REDIRECT(Code) ->
  79
+    do_redirect(Code, Headers, HttpDb, Params, Callback);
143 80
 
  81
+process_response({ok, {{Code, _}, Headers, Pid}}, HttpDb, Params, Callback) when
  82
+        is_pid(Pid) ->
  83
+    process_stream_response(Code, Headers, Pid, HttpDb, Params, Callback);
144 84
 
145  
-release_worker(Worker, #httpdb{httpc_pool = Pool}) ->
146  
-    ok = couch_httpc_pool:release_worker(Pool, Worker).
  85
+process_response({ok, {{Code, _}, Headers, Body}}, HttpDb, Params, Callback) ->
  86
+    case ?NOT_HTTP_ERROR(Code) of
  87
+    true ->
  88
+        Callback(Code, Headers, decode_body(Body));
  89
+    false ->
  90
+        maybe_retry({code, Code}, HttpDb, Params, Callback)
  91
+    end;
  92
+
  93
+process_response({ok, UploadState0}, HttpDb, Params, Callback) ->
  94
+    UploadFun = make_upload_fun(UploadState0, HttpDb),
  95
+    try
  96
+        Callback(UploadFun)
  97
+    catch
  98
+    throw:{redirect_req, Code, Headers} ->
  99
+        do_redirect(Code, Headers, HttpDb, Params, Callback);
  100
+    throw:{maybe_retry_req, Error} ->
  101
+        maybe_retry(Error, HttpDb, Params, Callback)
  102
+    end;
  103
+
  104
+process_response(Error, HttpDb, Params, Callback) ->
  105
+    maybe_retry(Error, HttpDb, Params, Callback).
  106
+
  107
+
  108
+process_stream_response(Code, Headers, Pid, HttpDb, Params, Callback) ->
  109
+    case ?NOT_HTTP_ERROR(Code) of
  110
+    true ->
  111
+        StreamDataFun = fun() ->
  112
+            stream_data_self(HttpDb, Params, Pid, Callback)
  113
+        end,
  114
+        try
  115
+            RetValue = Callback(Code, Headers, StreamDataFun),
  116
+            receive {http_eob, Pid, _Trailers} -> ok end,
  117
+            RetValue
  118
+        catch throw:{maybe_retry_req, Err} ->
  119
+            maybe_retry(Err, HttpDb, Params, Callback)
  120
+        end;
  121
+    false ->
  122
+        report_error(HttpDb, Params, {code, Code})
  123
+    end.
147 124
 
148 125
 
149  
-maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params, _Cb) ->
150  
-    report_error(Worker, HttpDb, Params, {error, Error});
  126
+maybe_retry(Error, #httpdb{retries = 0} = HttpDb, Params, _Callback) ->
  127
+    report_error(HttpDb, Params, {error, Error});
151 128
 
152  
-maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
153  
-    Params, Cb) ->
154  
-    release_worker(Worker, HttpDb),
155  
-    Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
  129
+maybe_retry(Error, #httpdb{retries = Retries} = HttpDb, Params, Callback) ->
  130
+    Method = get_value(method, Params, "GET"),
156 131
     Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
157 132
     ?LOG_INFO("Retrying ~s request to ~s in ~p seconds due to error ~s",
158  
-        [Method, Url, Wait / 1000, error_cause(Error)]),
159  
-    ok = timer:sleep(Wait),
160  
-    Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
161  
-    send_req(HttpDb#httpdb{retries = Retries - 1, wait = Wait2}, Params, Cb).
  133
+        [Method, Url, HttpDb#httpdb.wait / 1000, error_cause(Error)]),
  134
+    ok = timer:sleep(HttpDb#httpdb.wait),
  135
+    HttpDb2 = HttpDb#httpdb{
  136
+        retries = Retries - 1,
  137
+        wait = erlang:min(HttpDb#httpdb.wait * 2, ?MAX_WAIT)
  138
+    },
  139
+    send_req(HttpDb2, Params, Callback).
162 140
 
163 141
 
164  
-report_error(Worker, HttpDb, Params, Error) ->
165  
-    Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
  142
+report_error(HttpDb, Params, Error) ->
  143
+    Method = get_value(method, Params, "GET"),
166 144
     Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
167 145
     do_report_error(Url, Method, Error),
168  
-    release_worker(Worker, HttpDb),
169 146
     exit({http_request_failed, Method, Url, Error}).
170 147
 
171 148
 
@@ -184,23 +161,48 @@ error_cause(Cause) ->
184 161
     lists:flatten(io_lib:format("~p", [Cause])).
185 162
 
186 163
 
187  
-stream_data_self(#httpdb{timeout = T} = HttpDb, Params, Worker, ReqId, Cb) ->
188  
-    receive
189  
-    {ibrowse_async_response, ReqId, {error, Error}} ->
190  
-        throw({maybe_retry_req, Error});
191  
-    {ibrowse_async_response, ReqId, <<>>} ->
192  
-        ibrowse:stream_next(ReqId),
193  
-        stream_data_self(HttpDb, Params, Worker, ReqId, Cb);
194  
-    {ibrowse_async_response, ReqId, Data} ->
195  
-        ibrowse:stream_next(ReqId),
196  
-        {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
197  
-    {ibrowse_async_response_end, ReqId} ->
198  
-        {<<>>, fun() -> throw({maybe_retry_req, more_data_expected}) end}
199  
-    after T + 500 ->
200  
-        % Note: ibrowse should always reply with timeouts, but this doesn't
201  
-        % seem to be always true when there's a very high rate of requests
202  
-        % and many open connections.
203  
-        throw({maybe_retry_req, timeout})
  164
+stream_data_self(#httpdb{timeout = T} = HttpDb, Params, Pid, Callback) ->
  165
+    try
  166
+        case lhttpc:get_body_part(Pid, T) of
  167
+        {ok, {http_eob, _Trailers}} ->
  168
+            {<<>>, fun() -> throw({maybe_retry_req, more_data_expected}) end};
  169
+        {ok, Data} ->
  170
+            {Data, fun() -> stream_data_self(HttpDb, Params, Pid, Callback) end};
  171
+        Error ->
  172
+            throw({maybe_retry_req, Error})
  173
+        end
  174
+    catch exit:ExitReason ->
  175
+        throw({maybe_retry_req, ExitReason})
  176
+    end.
  177
+
  178
+
  179
+make_upload_fun(UploadState, #httpdb{timeout = Timeout} = HttpDb) ->
  180
+    fun(eof) ->
  181
+        try
  182
+            case lhttpc:send_body_part(UploadState, http_eob, Timeout) of
  183
+            {ok, {{Code, _}, Headers, Body}} when ?NOT_HTTP_ERROR(Code) ->
  184
+                {ok, Code, Headers, decode_body(Body)};
  185
+            {ok, {{Code, _}, Headers, _Body}} when ?IS_HTTP_REDIRECT(Code) ->
  186
+                throw({redirect_req, Code, Headers});
  187
+            {ok, {{Code, _}, _Headers, _Body}} ->
  188
+                throw({maybe_retry_req, {code, Code}});
  189
+            Error ->
  190
+                throw({maybe_retry_req, Error})
  191
+            end
  192
+        catch exit:ExitReason ->
  193
+            throw({maybe_retry_req, ExitReason})
  194
+        end;
  195
+    (BodyPart) ->
  196
+        try
  197
+            case lhttpc:send_body_part(UploadState, BodyPart, Timeout) of
  198
+            {ok, UploadState2} ->
  199
+                {ok, make_upload_fun(UploadState2, HttpDb)};
  200
+            Error ->
  201
+                throw({maybe_retry_req, Error})
  202
+            end
  203
+        catch exit:ExitReason ->
  204
+            throw({maybe_retry_req, ExitReason})
  205
+        end
204 206
     end.
205 207
 
206 208
 
@@ -226,12 +228,7 @@ oauth_header(#httpdb{url = BaseUrl, oauth = OAuth}, ConnParams) ->
226 228
         OAuth#oauth.consumer_secret,
227 229
         OAuth#oauth.signature_method
228 230
     },
229  
-    Method = case get_value(method, ConnParams, get) of
230  
-    get -> "GET";
231  
-    post -> "POST";
232  
-    put -> "PUT";
233  
-    head -> "HEAD"
234  
-    end,
  231
+    Method = get_value(method, ConnParams, "GET"),
235 232
     QSL = get_value(qs, ConnParams, []),
236 233
     OAuthParams = oauth:signed_params(Method,
237 234
         BaseUrl ++ get_value(path, ConnParams, []),
@@ -240,8 +237,7 @@ oauth_header(#httpdb{url = BaseUrl, oauth = OAuth}, ConnParams) ->
240 237
         "OAuth " ++ oauth_uri:params_to_header_string(OAuthParams)}].
241 238
 
242 239
 
243  
-do_redirect(Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) ->
244  
-    release_worker(Worker, HttpDb),
  240
+do_redirect(Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) ->
245 241
     RedirectUrl = redirect_url(Headers, Url),
246 242
     {HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params),
247 243
     send_req(HttpDb2, Params2, Cb).
@@ -250,37 +246,62 @@ do_redirect(Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) ->
250 246
 redirect_url(RespHeaders, OrigUrl) ->
251 247
     MochiHeaders = mochiweb_headers:make(RespHeaders),
252 248
     RedUrl = mochiweb_headers:get_value("Location", MochiHeaders),
253  
-    #url{
  249
+    #lhttpc_url{
254 250
         host = Host,
255  
-        host_type = HostType,
256 251
         port = Port,
257 252
         path = Path,  % includes query string
258  
-        protocol = Proto
259  
-    } = ibrowse_lib:parse_url(RedUrl),
260  
-    #url{
261  
-        username = User,
  253
+        is_ssl = IsSsl
  254
+    } = lhttpc_lib:parse_url(RedUrl),
  255
+    #lhttpc_url{
  256
+        user = User,
262 257
         password = Passwd
263  
-    } = ibrowse_lib:parse_url(OrigUrl),
264  
-    Creds = case is_list(User) andalso is_list(Passwd) of
265  
-    true ->
  258
+    } = lhttpc_lib:parse_url(OrigUrl),
  259
+    Creds = case User of
  260
+    [] ->
  261
+        [];
  262
+    _ when Passwd =/= [] ->
266 263
         User ++ ":" ++ Passwd ++ "@";
267  
-    false ->
268  
-        []
  264
+    _ ->
  265
+        User ++ "@"
269 266
     end,
270  
-    HostPart = case HostType of
271  
-    ipv6_address ->
  267
+    HostPart = case is_ipv6_literal(Host) of
  268
+    true ->
  269
+        % IPv6 address literals are enclosed by square brackets (RFC2732)
272 270
         "[" ++ Host ++ "]";
273  
-    _ ->
  271
+    false ->
274 272
         Host
275 273
     end,
276  
-    atom_to_list(Proto) ++ "://" ++ Creds ++ HostPart ++ ":" ++
277  
-        integer_to_list(Port) ++ Path.
  274
+    Proto = case IsSsl of
  275
+    true ->
  276
+        "https://";
  277
+    false ->
  278
+        "http://"
  279
+    end,
  280
+    Proto ++ Creds ++ HostPart ++ ":" ++ integer_to_list(Port) ++ Path.
  281
+
  282
+is_ipv6_literal(Host) ->
  283
+    case inet_parse:address(Host) of
  284
+    {ok, {_, _, _, _, _, _, _, _}} ->
  285
+        true;
  286
+    _ ->
  287
+        false
  288
+    end.
  289
+
278 290
 
279 291
 after_redirect(RedirectUrl, 303, HttpDb, Params) ->
280  
-    after_redirect(RedirectUrl, HttpDb, ?replace(Params, method, get));
  292
+    after_redirect(RedirectUrl, HttpDb, ?replace(Params, method, "GET"));
281 293
 after_redirect(RedirectUrl, _Code, HttpDb, Params) ->
282 294
     after_redirect(RedirectUrl, HttpDb, Params).
283 295
 
284 296
 after_redirect(RedirectUrl, HttpDb, Params) ->
285 297
     Params2 = lists:keydelete(path, 1, lists:keydelete(qs, 1, Params)),
286 298
     {HttpDb#httpdb{url = RedirectUrl}, Params2}.
  299
+
  300
+
  301
+decode_body(<<>>) ->
  302
+    null;
  303
+decode_body(undefined) ->
  304
+    % HEAD request response body
  305
+    null;
  306
+decode_body(Body) ->
  307
+    ?JSON_DECODE(Body).
138  src/couchdb/couch_httpc_pool.erl
... ...
@@ -1,138 +0,0 @@
1  
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2  
-% use this file except in compliance with the License. You may obtain a copy of
3  
-% the License at
4  
-%
5  
-%   http://www.apache.org/licenses/LICENSE-2.0
6  
-%
7  
-% Unless required by applicable law or agreed to in writing, software
8  
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9  
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10  
-% License for the specific language governing permissions and limitations under
11  
-% the License.
12  
-
13  
--module(couch_httpc_pool).
14  
--behaviour(gen_server).
15  
-
16  
-% public API
17  
--export([start_link/2, stop/1]).
18  
--export([get_worker/1, release_worker/2]).
19  
-
20  
-% gen_server API
21  
--export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
22  
--export([code_change/3, terminate/2]).
23  
-
24  
--include("couch_db.hrl").
25  
-
26  
--import(couch_util, [
27  
-    get_value/2,
28  
-    get_value/3
29  
-]).
30  
-
31  
--record(state, {
32  
-    url,
33  
-    limit,                  % max # of workers allowed
34  
-    free = [],              % free workers (connections)
35  
-    busy = [],              % busy workers (connections)
36  
-    waiting = queue:new()   % blocked clients waiting for a worker
37  
-}).
38  
-
39  
-
40  
-start_link(Url, Options) ->
41  
-    gen_server:start_link(?MODULE, {Url, Options}, []).
42  
-
43  
-
44  
-stop(Pool) ->
45  
-    ok = gen_server:call(Pool, stop, infinity).
46  
-
47  
-
48  
-get_worker(Pool) ->
49  
-    {ok, _Worker} = gen_server:call(Pool, get_worker, infinity).
50  
-
51  
-
52  
-release_worker(Pool, Worker) ->
53  
-    ok = gen_server:cast(Pool, {release_worker, Worker}).
54  
-
55  
-
56  
-init({Url, Options}) ->
57  
-    process_flag(trap_exit, true),
58  
-    State = #state{
59  
-        url = Url,
60  
-        limit = get_value(max_connections, Options)
61  
-    },
62  
-    {ok, State}.
63  
-
64  
-
65  
-handle_call(get_worker, From, #state{waiting = Waiting} = State) ->
66  
-    #state{url = Url, limit = Limit, busy = Busy, free = Free} = State,
67  
-    case length(Busy) >= Limit of
68  
-    true ->
69  
-        {noreply, State#state{waiting = queue:in(From, Waiting)}};
70  
-    false ->
71  
-        case Free of
72  
-        [] ->
73  
-           {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
74  
-           Free2 = Free;
75  
-        [Worker | Free2] ->
76  
-           ok
77  
-        end,
78  
-        NewState = State#state{free = Free2, busy = [Worker | Busy]},
79  
-        {reply, {ok, Worker}, NewState}
80  
-    end;
81  
-
82  
-handle_call(stop, _From, State) ->
83  
-    {stop, normal, ok, State}.
84  
-
85  
-
86  
-handle_cast({release_worker, Worker}, #state{waiting = Waiting} = State) ->
87  
-    case is_process_alive(Worker) andalso
88  
-        lists:member(Worker, State#state.busy) of
89  
-    true ->
90  
-        case queue:out(Waiting) of
91  
-        {empty, Waiting2} ->
92  
-            Busy2 = State#state.busy -- [Worker],
93  
-            Free2 = [Worker | State#state.free];
94  
-        {{value, From}, Waiting2} ->
95  
-            gen_server:reply(From, {ok, Worker}),
96  
-            Busy2 = State#state.busy,
97  
-            Free2 = State#state.free
98  
-        end,
99  
-        NewState = State#state{
100  
-           busy = Busy2,
101  
-           free = Free2,
102  
-           waiting = Waiting2
103  
-        },
104  
-        {noreply, NewState};
105  
-   false ->
106  
-        {noreply, State}
107  
-   end.
108  
-
109  
-
110  
-handle_info({'EXIT', Pid, _Reason}, #state{busy = Busy, free = Free} = State) ->
111  
-    case Free -- [Pid] of
112  
-    Free ->
113  
-        case Busy -- [Pid] of
114  
-        Busy ->
115  
-            {noreply, State};
116  
-        Busy2 ->
117  
-            case queue:out(State#state.waiting) of
118  
-            {empty, _} ->
119  
-                {noreply, State#state{busy = Busy2}};
120  
-            {{value, From}, Waiting2} ->
121  
-                {ok, Worker} = ibrowse:spawn_link_worker_process(State#state.url),
122  
-                gen_server:reply(From, {ok, Worker}),
123  
-                {noreply, State#state{busy = [Worker | Busy2], waiting = Waiting2}}
124  
-            end
125  
-        end;
126  
-    Free2 ->
127  
-        {noreply, State#state{free = Free2}}
128  
-    end.
129  
-
130  
-
131  
-code_change(_OldVsn, State, _Extra) ->
132  
-    {ok, State}.
133  
-
134  
-
135  
-terminate(_Reason, State) ->
136  
-    lists:foreach(fun ibrowse_http_client:stop/1, State#state.free),
137  
-    lists:foreach(fun ibrowse_http_client:stop/1, State#state.busy).
138  
-
46  src/couchdb/couch_replicator_utils.erl
@@ -22,7 +22,7 @@
22 22
 -include("couch_db.hrl").
23 23
 -include("couch_api_wrap.hrl").
24 24
 -include("couch_replicator.hrl").
25  
--include("../ibrowse/ibrowse.hrl").
  25
+-include("../lhttpc/lhttpc.hrl").
26 26
 
27 27
 -import(couch_util, [
28 28
     get_value/2,
@@ -186,14 +186,28 @@ parse_rep_db({Props}, ProxyParams, Options) ->
186 186
                 end
187 187
         }
188 188
     end,
  189
+    SslParams = ssl_params(Url),
  190
+    ProxyParams2 = case SslParams of
  191
+    [] ->
  192
+        ProxyParams;
  193
+    _ when ProxyParams =/= [] ->
  194
+        [{proxy_ssl_options, SslParams} | ProxyParams];
  195
+   _ ->
  196
+        ProxyParams
  197
+    end,
  198
+    ConnectOpts = get_value(socket_options, Options) ++ ProxyParams2 ++ SslParams,
  199
+    Timeout = get_value(connection_timeout, Options),
  200
+    LhttpcOpts = lists:keysort(1, [
  201
+        {connect_options, ConnectOpts},
  202
+        {connect_timeout, Timeout},
  203
+        {send_retry, 0}
  204
+    ]),
189 205
     #httpdb{
190 206
         url = Url,
191 207
         oauth = OAuth,
192 208
         headers = lists:ukeymerge(1, Headers, DefaultHeaders),
193  
-        ibrowse_options = lists:keysort(1,
194  
-            [{socket_options, get_value(socket_options, Options)} |
195  
-                ProxyParams ++ ssl_params(Url)]),
196  
-        timeout = get_value(connection_timeout, Options),
  209
+        lhttpc_options = LhttpcOpts,
  210
+        timeout = Timeout,
197 211
         http_connections = get_value(http_connections, Options),
198 212
         retries = get_value(retries, Options)
199 213
     };
@@ -281,31 +295,19 @@ parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
281 295
 parse_proxy_params([]) ->
282 296
     [];
283 297
 parse_proxy_params(ProxyUrl) ->
284  
-    #url{
285  
-        host = Host,
286  
-        port = Port,
287  
-        username = User,
288  
-        password = Passwd
289  
-    } = ibrowse_lib:parse_url(ProxyUrl),
290  
-    [{proxy_host, Host}, {proxy_port, Port}] ++
291  
-        case is_list(User) andalso is_list(Passwd) of
292  
-        false ->
293  
-            [];
294  
-        true ->
295  
-            [{proxy_user, User}, {proxy_password, Passwd}]
296  
-        end.
  298
+    [{proxy, ProxyUrl}].
297 299
 
298 300
 
299 301
 ssl_params(Url) ->
300  
-    case ibrowse_lib:parse_url(Url) of
301  
-    #url{protocol = https} ->
  302
+    case lhttpc_lib:parse_url(Url) of
  303
+    #lhttpc_url{is_ssl = true} ->
302 304
         Depth = list_to_integer(
303 305
             couch_config:get("replicator", "ssl_certificate_max_depth", "3")
304 306
         ),
305 307
         VerifyCerts = couch_config:get("replicator", "verify_ssl_certificates"),
306 308
         SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
307  
-        [{is_ssl, true}, {ssl_options, SslOpts}];
308  
-    #url{protocol = http} ->
  309
+        [{is_ssl, true}, {ssl_options, SslOpts}, {proxy_ssl_options, SslOpts}];
  310
+    _ ->
309 311
         []
310 312
     end.
311 313
 
252  test/etap/230-httpc-pool.t
... ...
@@ -1,252 +0,0 @@
1  
-#!/usr/bin/env escript
2  
-%% -*- erlang -*-
3  
-%%! -smp enable
4  
-
5  
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
6  
-% use this file except in compliance with the License. You may obtain a copy of
7  
-% the License at
8  
-%
9  
-%   http://www.apache.org/licenses/LICENSE-2.0
10  
-%
11  
-% Unless required by applicable law or agreed to in writing, software
12  
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13  
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14  
-% License for the specific language governing permissions and limitations under
15  
-% the License.
16  
-
17  
-main(_) ->
18  
-    test_util:init_code_path(),
19  
-
20  
-    etap:plan(55),
21  
-    case (catch test()) of
22  
-        ok ->
23  
-            etap:end_tests();
24  
-        Other ->
25  
-            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
26  
-            etap:bail(Other)
27  
-    end,
28  
-    ok.
29  
-
30  
-
31  
-test() ->
32  
-    couch_server_sup:start_link(test_util:config_files()),
33  
-    ibrowse:start(),
34  
-
35  
-    test_pool_full(),
36  
-    test_worker_dead_pool_non_full(),
37  
-    test_worker_dead_pool_full(),
38  
-
39  
-    couch_server_sup:stop(),
40  
-    ok.
41  
-
42  
-
43  
-test_pool_full() ->
44  
-    Pool = spawn_pool(),
45  
-    Client1 = spawn_client(Pool),
46  
-    Client2 = spawn_client(Pool),
47  
-    Client3 = spawn_client(Pool),
48  
-
49  
-    etap:diag("Check that we can spawn the max number of connections."),
50  
-    etap:is(ping_client(Client1), ok, "Client 1 started ok."),
51  
-    etap:is(ping_client(Client2), ok, "Client 2 started ok."),
52  
-    etap:is(ping_client(Client3), ok, "Client 3 started ok."),
53  
-
54  
-    Worker1 = get_client_worker(Client1, "1"),
55  
-    Worker2 = get_client_worker(Client2, "2"),
56  
-    Worker3 = get_client_worker(Client3, "3"),
57  
-    etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
58  
-    etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
59  
-    etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."),
60  
-
61  
-    etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."),
62  
-    etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."),
63  
-    etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."),
64  
-
65  
-    etap:diag("Check that client 4 blocks waiting for a worker."),
66  
-    Client4 = spawn_client(Pool),
67  
-    etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."),
68  
-
69  
-    etap:diag("Check that stopping a client gives up its worker."),
70  
-    etap:is(stop_client(Client1), ok, "First client stopped."),
71  
-
72  
-    etap:diag("And check that our blocked client has been unblocked."),
73  
-    etap:is(ping_client(Client4), ok, "Client 4 was unblocked."),
74  
-
75  
-    Worker4 = get_client_worker(Client4, "4"),
76  
-    etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."),
77  
-    etap:is(Worker4, Worker1, "Client 4 got worker that client 1 got before."),
78  
-
79  
-    lists:foreach(fun(C) -> ok = stop_client(C) end, [Client2, Client3, Client4]),
80  
-    stop_pool(Pool).
81  
-
82  
-
83  
-test_worker_dead_pool_non_full() ->
84  
-    Pool = spawn_pool(),
85  
-    Client1 = spawn_client(Pool),
86  
-
87  
-    etap:is(ping_client(Client1), ok, "Client 1 started ok."),
88  
-    Worker1 = get_client_worker(Client1, "1"),
89  
-    etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
90  
-
91  
-    etap:diag("Kill client's 1 worker."),
92  
-    etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."),
93  
-    etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."),
94  
-
95  
-    etap:is(stop_client(Client1), ok, "First client stopped and released its worker."),
96  
-
97  
-    Client2 = spawn_client(Pool),
98  
-    etap:is(ping_client(Client2), ok, "Client 2 started ok."),
99  
-    Worker2 = get_client_worker(Client2, "2"),
100  
-    etap:isnt(Worker2, Worker1, "Client 2 got a different worker from client 1"),
101  
-    etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
102  
-
103  
-    etap:is(stop_client(Client2), ok, "Second client stopped."),
104  
-    stop_pool(Pool).
105  
-
106  
-
107  
-test_worker_dead_pool_full() ->
108  
-    Pool = spawn_pool(),
109  
-    Client1 = spawn_client(Pool),
110  
-    Client2 = spawn_client(Pool),
111  
-    Client3 = spawn_client(Pool),
112  
-
113  
-    etap:diag("Check that we can spawn the max number of connections."),
114  
-    etap:is(ping_client(Client1), ok, "Client 1 started ok."),
115  
-    etap:is(ping_client(Client2), ok, "Client 2 started ok."),
116  
-    etap:is(ping_client(Client3), ok, "Client 3 started ok."),
117  
-
118  
-    Worker1 = get_client_worker(Client1, "1"),
119  
-    Worker2 = get_client_worker(Client2, "2"),
120  
-    Worker3 = get_client_worker(Client3, "3"),
121  
-    etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
122  
-    etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
123  
-    etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."),
124  
-
125  
-    etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."),
126  
-    etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."),
127  
-    etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."),
128  
-
129  
-    etap:diag("Check that client 4 blocks waiting for a worker."),
130  
-    Client4 = spawn_client(Pool),
131  
-    etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."),
132  
-
133  
-    etap:diag("Kill client's 1 worker."),
134  
-    etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."),
135  
-    etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."),
136  
-
137  
-    etap:diag("Check client 4 got unblocked after first worker's death"),
138  
-    etap:is(ping_client(Client4), ok, "Client 4 not blocked anymore."),
139  
-
140  
-    Worker4 = get_client_worker(Client4, "4"),
141  
-    etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."),
142  
-    etap:isnt(Worker4, Worker1, "Client 4 got a worker different from client 1."),
143  
-    etap:isnt(Worker4, Worker2, "Client 4 got a worker different from client 2."),
144  
-    etap:isnt(Worker4, Worker3, "Client 4 got a worker different from client 3."),
145  
-
146  
-    etap:diag("Check that stopping client 1 is a noop."),
147  
-    etap:is(stop_client(Client1), ok, "First client stopped."),
148  
-
149  
-    etap:is(is_process_alive(Worker2), true, "Client's 2 worker still alive."),
150  
-    etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."),
151  
-    etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."),
152  
-
153  
-    etap:diag("Check that client 5 blocks waiting for a worker."),
154  
-    Client5 = spawn_client(Pool),
155  
-    etap:is(ping_client(Client5), timeout, "Client 5 blocked while waiting."),
156  
-
157  
-    etap:diag("Check that stopping client 2 gives up its worker."),
158  
-    etap:is(stop_client(Client2), ok, "Second client stopped."),
159  
-
160  
-    etap:diag("Now check that client 5 has been unblocked."),
161  
-    etap:is(ping_client(Client5), ok, "Client 5 was unblocked."),
162  
-
163  
-    Worker5 = get_client_worker(Client5, "5"),
164  
-    etap:is(is_process_alive(Worker5), true, "Client's 5 worker is alive."),
165  
-    etap:isnt(Worker5, Worker1, "Client 5 got a worker different from client 1."),
166  
-    etap:is(Worker5, Worker2, "Client 5 got same worker as client 2."),
167  
-    etap:isnt(Worker5, Worker3, "Client 5 got a worker different from client 3."),
168  
-    etap:isnt(Worker5, Worker4, "Client 5 got a worker different from client 4."),
169  
-
170  
-    etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."),
171  
-    etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."),
172  
-    etap:is(is_process_alive(Worker5), true, "Client's 5 worker still alive."),
173  
-
174  
-    lists:foreach(fun(C) -> ok = stop_client(C) end, [Client3, Client4, Client5]),
175  
-    stop_pool(Pool).
176  
-
177  
-
178  
-spawn_client(Pool) ->
179  
-    Parent = self(),
180  
-    Ref = make_ref(),
181  
-    Pid = spawn(fun() ->
182  
-        {ok, Worker} = couch_httpc_pool:get_worker(Pool),
183  
-        loop(Parent, Ref, Worker, Pool)
184  
-    end),
185  
-    {Pid, Ref}.
186  
-
187  
-
188  
-ping_client({Pid, Ref}) ->
189  
-    Pid ! ping,
190  
-    receive
191  
-        {pong, Ref} ->
192  
-            ok
193  
-    after 3000 ->
194  
-        timeout
195  
-    end.
196  
-
197  
-
198  
-get_client_worker({Pid, Ref}, ClientName) ->
199  
-    Pid ! get_worker,
200  
-    receive
201  
-        {worker, Ref, Worker} ->
202  
-            Worker
203  
-    after 3000 ->
204  
-        etap:bail("Timeout getting client " ++ ClientName ++ " worker.")
205  
-    end.
206  
-
207  
-
208  
-stop_client({Pid, Ref}) ->
209  
-    Pid ! stop,
210  
-    receive
211  
-        {stop, Ref} ->
212  
-            ok
213  
-    after 3000 ->
214  
-        timeout
215  
-    end.
216  
-
217  
-
218  
-kill_client_worker({Pid, Ref}) ->
219  
-    Pid ! get_worker,
220  
-    receive
221  
-        {worker, Ref, Worker} ->
222  
-            exit(Worker, kill),
223  
-            ok
224  
-    after 3000 ->
225  
-        timeout
226  
-    end.
227  
-
228  
-
229  
-loop(Parent, Ref, Worker, Pool) ->
230  
-    receive
231  
-        ping ->
232  
-            Parent ! {pong, Ref},
233  
-            loop(Parent, Ref, Worker, Pool);
234  
-        get_worker  ->
235  
-            Parent ! {worker, Ref, Worker},
236  
-            loop(Parent, Ref, Worker, Pool);
237  
-        stop ->
238  
-            couch_httpc_pool:release_worker(Pool, Worker),
239  
-            Parent ! {stop, Ref}
240  
-    end.
241  
-
242  
-
243  
-spawn_pool() ->
244  
-    Host = couch_config:get("httpd", "bind_address", "127.0.0.1"),
245  
-    Port = couch_config:get("httpd", "port", "5984"),
246  
-    {ok, Pool} = couch_httpc_pool:start_link(
247  
-        "http://" ++ Host ++ ":5984", [{max_connections, 3}]),
248  
-    Pool.
249  
-
250  
-
251  
-stop_pool(Pool) ->
252  
-    ok = couch_httpc_pool:stop(Pool).
3  test/etap/Makefile.am
@@ -69,5 +69,4 @@ EXTRA_DIST = \
69 69
     190-json-stream-parse.t \
70 70
     200-view-group-no-db-leaks.t \
71 71
     201-view-group-shutdown.t \
72  
-    220-compaction-daemon.t \
73  
-    230-httpc-pool.t
  72
+    220-compaction-daemon.t

0 notes on commit 7a63d76

Please sign in to comment.
Something went wrong with that request. Please try again.