/
HttpClientBase.cs
492 lines (444 loc) · 19.7 KB
/
HttpClientBase.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
using Lucene.Net.Diagnostics;
using Lucene.Net.Support;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Runtime.ExceptionServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Lucene.Net.Replicator.Http
{
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/// <summary>
/// Base class for Http clients.
/// </summary>
/// <remarks>
/// @lucene.experimental
/// </remarks>
public abstract class HttpClientBase : IDisposable
{
// LUCENENET specific - DEFAULT_CONNECTION_TIMEOUT is irrelevant in .NET. It is still declared here,
// but only as an [Obsolete] constant; DEFAULT_TIMEOUT is the value to use instead.
[Obsolete("Use DEFAULT_TIMEOUT instead. This extension method will be removed in 4.8.0 release candidate.")]
public const int DEFAULT_CONNECTION_TIMEOUT = 1000;
/// <summary>
/// Default request timeout for this client (100 seconds).
/// See <see cref="Timeout"/>.
/// </summary>
public readonly static TimeSpan DEFAULT_TIMEOUT = TimeSpan.FromSeconds(100); // LUCENENET: This was DEFAULT_SO_TIMEOUT in Lucene, using .NET's default timeout value of 100 instead of 61 seconds
// TODO compression?
/// <summary>
/// The URL to execute requests against. It is assigned once, by the constructor.
/// </summary>
protected string Url { get; private set; }
// Set to true by Dispose(bool); exposed through IsDisposed and checked by EnsureOpen().
private volatile bool isDisposed = false;
// The client used to send every request; disposed by Dispose(bool) when called with disposing == true.
private readonly HttpClient httpc;
/// <summary>
/// Creates a new <see cref="HttpClientBase"/> with the given host, port and path.
/// </summary>
/// <remarks>
/// The host, port and path parameters are normalized to <c>http://{host}:{port}{path}</c>;
/// if <paramref name="path"/> is <c>null</c> or empty it defaults to <c>/</c>.
/// <para/>
/// A <see cref="HttpMessageHandler"/> is taken as an optional parameter. If it is not provided it defaults to <c>null</c>,
/// in which case the internal <see cref="HttpClient"/> is created with a new <see cref="HttpClientHandler"/>.
/// </remarks>
/// <param name="host">The host that the client should retrieve data from.</param>
/// <param name="port">The port to be used to connect on.</param>
/// <param name="path">The path to the replicator on the host.</param>
/// <param name="messageHandler">Optional. The HTTP handler stack to use for sending requests; defaults to <c>null</c>.</param>
protected HttpClientBase(string host, int port, string path, HttpMessageHandler messageHandler = null)
: this(NormalizedUrl(host, port, path), messageHandler)
{
}
/// <summary>
/// Creates a new <see cref="HttpClientBase"/> with the given <paramref name="url"/>.
/// </summary>
/// <remarks>
/// A <see cref="HttpMessageHandler"/> is taken as an optional parameter. If it is not provided it defaults to <c>null</c>,
/// in which case the internal <see cref="HttpClient"/> is created with a new <see cref="HttpClientHandler"/>.
/// The <see cref="HttpClient"/> created here has its timeout set to <see cref="DEFAULT_TIMEOUT"/>.
/// </remarks>
/// <param name="url">The full URL, including host, port and path.</param>
/// <param name="messageHandler">Optional. The HTTP handler stack to use for sending requests.</param>
//Note: LUCENENET Specific
protected HttpClientBase(string url, HttpMessageHandler messageHandler = null)
: this(url, new HttpClient(messageHandler ?? new HttpClientHandler()) { Timeout = DEFAULT_TIMEOUT })
{
}
/// <summary>
/// Creates a new <see cref="HttpClientBase"/> with the given <paramref name="url"/> and <see cref="HttpClient"/>.
/// </summary>
/// <remarks>
/// This allows full control over how the <see cref="HttpClient"/> is created;
/// prefer <see cref="HttpClientBase(string, HttpMessageHandler)"/> over this unless you know you need that control.
/// <para/>
/// The constructor assigns <see cref="DEFAULT_TIMEOUT"/> to the (virtual) <see cref="Timeout"/> property,
/// and the supplied client is disposed when this instance is disposed.
/// </remarks>
/// <param name="url">The full URL, including host, port and path, that requests are executed against.</param>
/// <param name="client">The <see cref="HttpClient"/> used to make remote HTTP calls.</param>
/// <exception cref="ArgumentNullException"><paramref name="client"/> is <c>null</c>.</exception>
//Note: LUCENENET Specific
protected HttpClientBase(string url, HttpClient client)
{
    Url = url;
    // Fail fast with a descriptive exception; previously a null client surfaced as a
    // NullReferenceException from the Timeout setter on the next line.
    httpc = client ?? throw new ArgumentNullException(nameof(client));
    Timeout = DEFAULT_TIMEOUT;
}
/// <summary>
/// Gets or sets the request timeout for this client. The value is read from and written to
/// <see cref="HttpClient.Timeout"/> on the underlying <see cref="HttpClient"/>.
/// </summary>
public virtual TimeSpan Timeout
{
    get { return httpc.Timeout; }
    set { httpc.Timeout = value; }
}
/// <summary>
/// Guards operations that require a live client: does nothing while this client is open and
/// throws once it has been disposed.
/// </summary>
/// <exception cref="ObjectDisposedException">This client is already disposed (the exception instance is produced by <c>AlreadyClosedException.Create</c>).</exception>
protected void EnsureOpen()
{
    if (!IsDisposed)
    {
        return;
    }

    string typeName = this.GetType().FullName;
    throw AlreadyClosedException.Create(typeName, "HttpClient already disposed.");
}
/// <summary>
/// Builds <c>http://{host}:{port}{path}</c> from the given parts, substituting <c>/</c>
/// when <paramref name="path"/> is <c>null</c> or empty.
/// </summary>
private static string NormalizedUrl(string host, int port, string path)
{
    string effectivePath = string.IsNullOrEmpty(path) ? "/" : path;
    return string.Format("http://{0}:{1}{2}", host, port, effectivePath);
}
/// <summary>
/// <b>Internal:</b> Checks the status of <paramref name="response"/>. A successful status is a no-op;
/// otherwise <see cref="ThrowKnownError(HttpResponseMessage)"/> is invoked and the response is
/// released afterwards, whether or not that call throws.
/// </summary>
/// <exception cref="IOException">IO Error happened at the server, check inner exception for details.</exception>
/// <exception cref="HttpRequestException">Unknown error received from the server.</exception>
protected virtual void VerifyStatus(HttpResponseMessage response)
{
    if (response.IsSuccessStatusCode)
    {
        return;
    }

    try
    {
        ThrowKnownError(response);
    }
    finally
    {
        ConsumeQuietly(response);
    }
}
/// <summary>
/// Reads the content of <paramref name="response"/>, deserializes it with a serializer created from
/// <c>ReplicationService.JSON_SERIALIZER_SETTINGS</c>, casts the result to <see cref="Exception"/> and
/// hands it to <c>Util.IOUtils.ReThrow</c>.
/// </summary>
/// <remarks>
/// If the response stream cannot be obtained, or the content cannot be deserialized into an
/// <see cref="Exception"/>, an exception created by <c>RuntimeException.Create</c> is thrown instead.
/// </remarks>
/// <param name="response">The response whose content is expected to hold a serialized exception.</param>
/// <exception cref="IOException">IO Error happened at the server, check inner exception for details.</exception>
/// <exception cref="HttpRequestException">Unknown error received from the server.</exception>
protected virtual void ThrowKnownError(HttpResponseMessage response)
{
Stream input;
try
{
//.NET Note: Bridging from Async to Sync, this is not ideal and we could consider changing the interface to be Async or provide Async overloads
// and have these Sync methods with their caveats.
input = response.Content.ReadAsStreamAsync().ConfigureAwait(false).GetAwaiter().GetResult();
}
catch (Exception t) when (t.IsThrowable())
{
// The response stream could not be obtained, so there is no serialized exception to read - could be an error in servlet.init().
// LUCENENET: Check status code to see if we had an HTTP error
try
{
response.EnsureSuccessStatusCode();
}
catch (HttpRequestException e)
{
// Non-success status: report the HTTP error itself.
throw RuntimeException.Create(e);
}
// Status was successful, so the only thing left to report is the stream failure.
throw RuntimeException.Create("Unknown error: ", t);
}
Exception exception;
try
{
TextReader reader = new StreamReader(input);
JsonSerializer serializer = JsonSerializer.Create(ReplicationService.JSON_SERIALIZER_SETTINGS);
// The untyped Deserialize overload returns object; the cast fails (and is caught below) if the payload is not an Exception.
exception = (Exception)serializer.Deserialize(new JsonTextReader(reader));
}
catch (Exception th) when (th.IsThrowable())
{
//not likely
throw RuntimeException.Create(string.Format("Failed to read exception object: {0} {1}", response.StatusCode, response.ReasonPhrase), th);
}
finally
{
// Runs on both the success and the failure path, after the catch handler above has built its exception.
input.Dispose();
}
Util.IOUtils.ReThrow(exception);
}
/// <summary>
/// <b>Internal:</b> Sends a POST request whose body is <paramref name="entity"/> serialized as JSON
/// and returns the response.
/// The <paramref name="parameters"/> argument is treated as: name1,value1,name2,value2,...
/// </summary>
protected virtual HttpResponseMessage ExecutePost(string request, object entity, params string[] parameters)
{
    EnsureOpen();

    // NOTE: no headers are set on the request itself; only the content declares its type
    // ("application/json", UTF-8).
    var message = new HttpRequestMessage(HttpMethod.Post, QueryString(request, parameters));

    JsonSerializer serializer = JsonSerializer.Create(ReplicationService.JSON_SERIALIZER_SETTINGS);
    string json = JToken.FromObject(entity, serializer).ToString(Formatting.None);
    message.Content = new StringContent(json, Encoding.UTF8, "application/json");

    return Execute(message);
}
/// <summary>
/// <b>Internal:</b> Sends a GET request and returns the response.
/// The <paramref name="parameters"/> argument is treated as: name1,value1,name2,value2,...
/// </summary>
protected virtual HttpResponseMessage ExecuteGet(string request, params string[] parameters)
{
    EnsureOpen();

    // NOTE: the request is sent without any additional headers.
    string requestUri = QueryString(request, parameters);
    return Execute(new HttpRequestMessage(HttpMethod.Get, requestUri));
}
// Sends the request synchronously, returning as soon as the response headers have been read,
// then verifies the status before handing the response back to the caller.
private HttpResponseMessage Execute(HttpRequestMessage request)
{
    //.NET Note: Bridging from Async to Sync, this is not ideal and we could consider changing the interface to be Async
    // or provide Async overloads and have these Sync methods with their caveats.
    Task<HttpResponseMessage> pending = httpc.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
    HttpResponseMessage result = pending.ConfigureAwait(false).GetAwaiter().GetResult();

    VerifyStatus(result);
    return result;
}
// Builds "{Url}/{request}" and, when a parameter array is supplied, appends "?" followed by the
// URL-encoded parameters joined pairwise as "key=value" with "&" between pairs.
private string QueryString(string request, params string[] parameters)
{
    if (parameters is null)
    {
        return string.Format("{0}/{1}", Url, request);
    }

    var pairs = parameters
        .Select(WebUtility.UrlEncode)
        .InPairs((key, val) => string.Format("{0}={1}", key, val));

    return string.Format("{0}/{1}?{2}", Url, request, string.Join("&", pairs));
}
/// <summary>
/// Internal utility: returns the input stream of the provided response without wrapping it
/// in a consuming stream. Forwards to <see cref="GetResponseStream(HttpResponseMessage, bool)"/>.
/// </summary>
/// <exception cref="IOException"></exception>
[Obsolete("Use GetResponseStream(HttpResponseMessage) instead. This extension method will be removed in 4.8.0 release candidate.")]
public virtual Stream ResponseInputStream(HttpResponseMessage response)
    => GetResponseStream(response, false);
// TODO: can we simplify this Consuming !?!?!?
/// <summary>
/// Internal utility: returns the input stream of the provided response, which optionally
/// consumes the response's resources when the input stream is exhausted.
/// Forwards to <see cref="GetResponseStream(HttpResponseMessage, bool)"/>.
/// </summary>
/// <exception cref="IOException"></exception>
[Obsolete("Use GetResponseStream(HttpResponseMessage, bool) instead. This extension method will be removed in 4.8.0 release candidate.")]
public virtual Stream ResponseInputStream(HttpResponseMessage response, bool consume)
    => GetResponseStream(response, consume);
/// <summary>
/// Internal utility: returns the input stream of the provided response without wrapping it
/// in a consuming stream.
/// </summary>
/// <exception cref="IOException"></exception>
public virtual Stream GetResponseStream(HttpResponseMessage response) // LUCENENET: This was ResponseInputStream in Lucene
    => GetResponseStream(response, false);
// TODO: can we simplify this Consuming !?!?!?
/// <summary>
/// Internal utility: returns the input stream of the provided response. When <paramref name="consume"/>
/// is <c>true</c> the stream is wrapped so that the response's resources are consumed once the
/// input stream is exhausted.
/// </summary>
/// <exception cref="IOException"></exception>
public virtual Stream GetResponseStream(HttpResponseMessage response, bool consume) // LUCENENET: This was ResponseInputStream in Lucene
{
    Stream content = response.Content.ReadAsStreamAsync().ConfigureAwait(false).GetAwaiter().GetResult();
    return consume ? new ConsumingStream(content) : content;
}
/// <summary>
/// Gets a value indicating whether <see cref="Dispose(bool)"/> has run on this instance.
/// Note that if you override <see cref="Dispose(bool)"/>, you must call <see cref="Dispose(bool)"/>
/// on the base class in order for this instance to be properly disposed and for this property to
/// become <c>true</c>.
/// </summary>
public bool IsDisposed
{
    get { return isDisposed; }
}
/// <summary>
/// Convenience overload: forwards to <see cref="DoAction{T}(HttpResponseMessage, bool, Func{T})"/>
/// with <c>consume</c> set to <c>true</c>.
/// </summary>
protected virtual T DoAction<T>(HttpResponseMessage response, Func<T> call)
    => DoAction(response, true, call);
/// <summary>
/// Do a specific action and validate after the action that the status is still OK,
/// and if not, attempt to extract the actual server side exception. Optionally
/// release the response at exit, depending on <paramref name="consume"/> parameter.
/// </summary>
/// <remarks>
/// <see cref="VerifyStatus(HttpResponseMessage)"/> runs in a <c>finally</c> block, so it is executed whether
/// <paramref name="call"/> returns or throws. An exception from <paramref name="call"/> that matches the
/// <c>IsThrowable()</c> filter is held until that cleanup has finished and is then passed to
/// <c>Util.IOUtils.ReThrow</c>; an exception thrown by <see cref="VerifyStatus(HttpResponseMessage)"/>
/// propagates out of the <c>finally</c> block instead.
/// </remarks>
/// <typeparam name="T">The type of the value produced by <paramref name="call"/>.</typeparam>
/// <param name="response">The response whose status is verified after the action.</param>
/// <param name="consume"><c>true</c> to release the response through <c>ConsumeQuietly</c> on exit.</param>
/// <param name="call">The action to invoke.</param>
/// <returns>The value returned by <paramref name="call"/>.</returns>
protected virtual T DoAction<T>(HttpResponseMessage response, bool consume, Func<T> call)
{
// Holds the failure of call() (if any) so that it can be rethrown after the cleanup below.
Exception th = null;
try
{
return call();
}
catch (Exception t) when (t.IsThrowable())
{
th = t;
}
finally
{
try
{
VerifyStatus(response);
}
finally
{
if (consume)
{
// NOTE(review): the finally block below is empty, so this inner try/finally does not itself
// suppress anything.
try
{
ConsumeQuietly(response);
}
finally
{
// ignoring on purpose
}
}
}
}
if (Debugging.AssertsEnabled) Debugging.Assert(th != null); // extra safety - if we get here, it means the Func<T> failed
Util.IOUtils.ReThrow(th);
return default; // silly, if we're here, IOUtils.reThrow always throws an exception
}
/// <summary>
/// Disposes this <see cref="HttpClientBase"/>.
/// When called with <code>true</code> on an instance that has not been disposed yet, this disposes
/// the underlying <see cref="HttpClient"/>. In every case the instance is marked as disposed afterwards.
/// </summary>
protected virtual void Dispose(bool disposing)
{
    if (disposing)
    {
        if (!isDisposed)
        {
            httpc.Dispose();
        }
    }

    isDisposed = true;
}
/// <summary>
/// Disposes this <see cref="HttpClientBase"/> by calling <see cref="Dispose(bool)"/> with <c>true</c>,
/// which disposes the underlying <see cref="HttpClient"/>.
/// </summary>
public void Dispose()
{
Dispose(true);
// Requests that the finalizer (if any is declared by a derived type) be skipped for this instance.
GC.SuppressFinalize(this);
}
// Releases the content of the response, if it has any. Exceptions for which IsIOException()
// returns true are deliberately swallowed; anything else propagates.
private static void ConsumeQuietly(HttpResponseMessage response)
{
    try
    {
        HttpContent body = response.Content;
        if (body != null)
        {
            body.Dispose(); // LUCENENET: Force a flush and dispose the underlying stream
        }
    }
    catch (Exception ioe) when (ioe.IsIOException())
    {
        // Ignore
    }
}
/// <summary>
/// Wraps a stream and consumes (flushes) and disposes it automatically the first time a
/// read reports that no more data is available (a result of zero or less).
/// Every other member delegates directly to the wrapped stream.
/// </summary>
private class ConsumingStream : Stream
{
    private readonly Stream input;

    // Becomes true once the wrapped stream has been flushed and disposed by Consume().
    private bool consumed = false;

    /// <summary>
    /// Creates a wrapper around <paramref name="input"/>.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="input"/> is <c>null</c>.</exception>
    public ConsumingStream(Stream input)
    {
        this.input = input ?? throw new ArgumentNullException(nameof(input));
    }

    public override bool CanRead => input.CanRead;
    public override bool CanSeek => input.CanSeek;
    public override bool CanWrite => input.CanWrite;
    public override long Length => input.Length;

    public override long Position
    {
        get => input.Position;
        set => input.Position = value;
    }

    public override void Flush() => input.Flush();

    public override int ReadByte()
    {
        int res = input.ReadByte();
        Consume(res);
        return res;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int res = input.Read(buffer, offset, count);
        Consume(res);
        return res;
    }

#if FEATURE_SPAN
    public override int Read(Span<byte> buffer)
    {
        int res = input.Read(buffer);
        Consume(res);
        return res;
    }
#endif

    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        // ConfigureAwait(false): nothing after the await needs the caller's synchronization
        // context, matching how the rest of this file awaits.
        int res = await input.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
        Consume(res);
        return res;
    }

    // EndRead below forwards to the wrapped stream, so BeginRead has to start the operation on
    // the wrapped stream as well; otherwise the IAsyncResult handed to input.EndRead would have
    // been created by the base Stream implementation rather than by the wrapped stream.
    public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
        => input.BeginRead(buffer, offset, count, callback, state);

    public override int EndRead(IAsyncResult asyncResult)
    {
        int res = input.EndRead(asyncResult);
        Consume(res);
        return res;
    }

    public override long Seek(long offset, SeekOrigin origin) => input.Seek(offset, origin);

    public override void SetLength(long value) => input.SetLength(value);

    // Delegates to the wrapped stream. (This previously called itself, which recursed without end.)
    public override void Write(byte[] buffer, int offset, int count) => input.Write(buffer, offset, count);

    /// <summary>
    /// Flushes and disposes the wrapped stream the first time a read result of zero or less is seen.
    /// Exceptions matching the <c>IsException()</c> filter are ignored.
    /// </summary>
    /// <param name="zeroOrMinusOne">The result of the read that was just performed.</param>
    private void Consume(int zeroOrMinusOne)
    {
        if (!consumed && zeroOrMinusOne <= 0)
        {
            try
            {
                try
                {
                    input.Flush();
                }
                finally
                {
                    // Dispose even when Flush() throws.
                    input.Dispose();
                }
            }
            catch (Exception e) when (e.IsException())
            {
                // ignored on purpose
            }
            consumed = true;
        }
    }
}
}
}