Skip to content

Commit 0638e9f

Browse files
russcamMpdreamz
authored andcommitted
Quick sweep over all the TODOs that were quick to (#2488)
(cherry-picked from commit 43865b2)
1 parent c98b2df commit 0638e9f

File tree

14 files changed

+204
-232
lines changed

14 files changed

+204
-232
lines changed

build/scripts/scripts.fsproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
<None Include="Releasing.fsx" />
6666
<None Include="Versioning.fsx" />
6767
<None Include="Profiling.fsx" />
68-
<None Include="InheritDoc.fsx" />
68+
<None Include="XmlDocPatcher.fsx" />
6969
<None Include="Building.fsx" />
7070
<None Include="Signing.fsx" />
7171
<None Include="Testing.fsx" />
Lines changed: 191 additions & 195 deletions
Original file line numberDiff line numberDiff line change
@@ -1,199 +1,195 @@
1-
using System.Collections.Generic;
2-
using System.Threading.Tasks;
3-
using System.Threading;
4-
using System;
5-
6-
namespace Elasticsearch.Net
7-
{
8-
public class Transport<TConnectionSettings> : ITransport<TConnectionSettings>
9-
where TConnectionSettings : IConnectionConfigurationValues
10-
{
1+
using System.Collections.Generic;
2+
using System.Threading.Tasks;
3+
using System.Threading;
4+
using System;
5+
6+
namespace Elasticsearch.Net
7+
{
8+
public class Transport<TConnectionSettings> : ITransport<TConnectionSettings>
9+
where TConnectionSettings : IConnectionConfigurationValues
10+
{
1111
//TODO should all of these be public?
12-
public TConnectionSettings Settings { get; }
12+
public TConnectionSettings Settings { get; }
1313
public IDateTimeProvider DateTimeProvider { get; }
1414
public IMemoryStreamFactory MemoryStreamFactory { get; }
1515
public IRequestPipelineFactory PipelineProvider { get; }
16-
17-
/// <summary>
18-
/// Transport coordinates the client requests over the connection pool nodes and is in charge of falling over on different nodes
19-
/// </summary>
20-
/// <param name="configurationValues">The connectionsettings to use for this transport</param>
21-
public Transport(TConnectionSettings configurationValues)
22-
: this(configurationValues, null, null, null)
23-
{ }
24-
25-
/// <summary>
26-
/// Transport coordinates the client requests over the connection pool nodes and is in charge of falling over on different nodes
27-
/// </summary>
28-
/// <param name="configurationValues">The connectionsettings to use for this transport</param>
29-
/// <param name="pipelineProvider">In charge of create a new pipeline, safe to pass null to use the default</param>
30-
/// <param name="dateTimeProvider">The date time proved to use, safe to pass null to use the default</param>
31-
/// <param name="memoryStreamFactory">The memory stream provider to use, safe to pass null to use the default</param>
32-
public Transport(
33-
TConnectionSettings configurationValues,
34-
IRequestPipelineFactory pipelineProvider,
35-
IDateTimeProvider dateTimeProvider,
36-
IMemoryStreamFactory memoryStreamFactory
37-
)
38-
{
39-
configurationValues.ThrowIfNull(nameof(configurationValues));
40-
configurationValues.ConnectionPool.ThrowIfNull(nameof(configurationValues.ConnectionPool));
41-
configurationValues.Connection.ThrowIfNull(nameof(configurationValues.Connection));
42-
configurationValues.Serializer.ThrowIfNull(nameof(configurationValues.Serializer));
43-
44-
this.Settings = configurationValues;
45-
this.PipelineProvider = pipelineProvider ?? new RequestPipelineFactory();
46-
this.DateTimeProvider = dateTimeProvider ?? Net.DateTimeProvider.Default;
47-
this.MemoryStreamFactory = memoryStreamFactory ?? new MemoryStreamFactory();
48-
}
49-
50-
public ElasticsearchResponse<TReturn> Request<TReturn>(HttpMethod method, string path, PostData<object> data = null, IRequestParameters requestParameters = null)
51-
where TReturn : class
52-
{
53-
using (var pipeline = this.PipelineProvider.Create(this.Settings, this.DateTimeProvider, this.MemoryStreamFactory, requestParameters))
54-
{
55-
pipeline.FirstPoolUsage(this.Settings.BootstrapLock);
56-
57-
var requestData = new RequestData(method, path, data, this.Settings, requestParameters, this.MemoryStreamFactory);
58-
this.Settings.OnRequestDataCreated?.Invoke(requestData);
59-
ElasticsearchResponse<TReturn> response = null;
60-
61-
var seenExceptions = new List<PipelineException>();
62-
foreach (var node in pipeline.NextNode())
63-
{
64-
requestData.Node = node;
65-
try
66-
{
67-
pipeline.SniffOnStaleCluster();
68-
Ping(pipeline, node);
69-
response = pipeline.CallElasticsearch<TReturn>(requestData);
70-
if (!response.SuccessOrKnownError)
71-
{
72-
pipeline.MarkDead(node);
73-
pipeline.SniffOnConnectionFailure();
74-
}
75-
}
76-
catch (PipelineException pipelineException) when (!pipelineException.Recoverable)
77-
{
78-
pipeline.MarkDead(node);
79-
seenExceptions.Add(pipelineException);
80-
break;
81-
}
82-
catch (PipelineException pipelineException)
83-
{
84-
pipeline.MarkDead(node);
85-
seenExceptions.Add(pipelineException);
86-
}
87-
catch (Exception killerException)
88-
{
89-
throw new UnexpectedElasticsearchClientException(killerException, seenExceptions)
90-
{
91-
Request = requestData,
92-
Response = response,
93-
AuditTrail = pipeline?.AuditTrail
94-
};
95-
}
96-
if (response != null && response.SuccessOrKnownError)
97-
{
98-
pipeline.MarkAlive(node);
99-
break;
100-
}
101-
}
102-
if (response == null || !response.Success)
103-
pipeline.BadResponse(ref response, requestData, seenExceptions);
104-
105-
this.Settings.OnRequestCompleted?.Invoke(response);
106-
107-
return response;
108-
}
109-
}
110-
111-
public async Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(HttpMethod method, string path, PostData<object> data = null, IRequestParameters requestParameters = null)
112-
where TReturn : class
113-
{
114-
using (var pipeline = this.PipelineProvider.Create(this.Settings, this.DateTimeProvider, this.MemoryStreamFactory, requestParameters))
115-
{
116-
await pipeline.FirstPoolUsageAsync(this.Settings.BootstrapLock).ConfigureAwait(false);
117-
118-
var requestData = new RequestData(method, path, data, this.Settings, requestParameters, this.MemoryStreamFactory);
119-
this.Settings.OnRequestDataCreated?.Invoke(requestData);
120-
ElasticsearchResponse<TReturn> response = null;
121-
122-
var seenExceptions = new List<PipelineException>();
123-
foreach (var node in pipeline.NextNode())
124-
{
125-
requestData.Node = node;
126-
try
127-
{
128-
await pipeline.SniffOnStaleClusterAsync().ConfigureAwait(false);
129-
await PingAsync(pipeline, node).ConfigureAwait(false);
130-
response = await pipeline.CallElasticsearchAsync<TReturn>(requestData).ConfigureAwait(false);
131-
if (!response.SuccessOrKnownError)
132-
{
133-
pipeline.MarkDead(node);
134-
await pipeline.SniffOnConnectionFailureAsync().ConfigureAwait(false);
135-
}
136-
}
137-
catch (PipelineException pipelineException) when (!pipelineException.Recoverable)
138-
{
139-
pipeline.MarkDead(node);
140-
seenExceptions.Add(pipelineException);
141-
break;
142-
}
143-
catch (PipelineException pipelineException)
144-
{
145-
pipeline.MarkDead(node);
146-
seenExceptions.Add(pipelineException);
147-
}
148-
catch (Exception killerException)
149-
{
150-
throw new UnexpectedElasticsearchClientException(killerException, seenExceptions)
151-
{
152-
Request = requestData,
153-
Response = response,
154-
AuditTrail = pipeline.AuditTrail
155-
};
156-
}
157-
if (response != null && response.SuccessOrKnownError)
158-
{
159-
pipeline.MarkAlive(node);
160-
break;
161-
}
162-
}
163-
if (response == null || !response.Success)
164-
pipeline.BadResponse(ref response, requestData, seenExceptions);
165-
166-
this.Settings.OnRequestCompleted?.Invoke(response);
167-
168-
return response;
169-
}
170-
}
171-
172-
private static void Ping(IRequestPipeline pipeline, Node node)
173-
{
174-
try
175-
{
176-
pipeline.Ping(node);
177-
}
178-
catch (PipelineException e) when (e.Recoverable)
179-
{
180-
pipeline.SniffOnConnectionFailure();
181-
throw;
182-
}
183-
}
184-
185-
private static async Task PingAsync(IRequestPipeline pipeline, Node node)
186-
{
187-
try
188-
{
189-
await pipeline.PingAsync(node).ConfigureAwait(false);
190-
}
191-
catch (PipelineException e) when (e.Recoverable)
192-
{
193-
await pipeline.SniffOnConnectionFailureAsync().ConfigureAwait(false);
194-
throw;
195-
}
196-
}
197-
198-
}
199-
}
16+
17+
/// <summary>
18+
/// Transport coordinates the client requests over the connection pool nodes and is in charge of falling over on different nodes
19+
/// </summary>
20+
/// <param name="configurationValues">The connectionsettings to use for this transport</param>
21+
public Transport(TConnectionSettings configurationValues)
22+
: this(configurationValues, null, null, null)
23+
{ }
24+
25+
/// <summary>
26+
/// Transport coordinates the client requests over the connection pool nodes and is in charge of falling over on different nodes
27+
/// </summary>
28+
/// <param name="configurationValues">The connectionsettings to use for this transport</param>
29+
/// <param name="pipelineProvider">In charge of create a new pipeline, safe to pass null to use the default</param>
30+
/// <param name="dateTimeProvider">The date time proved to use, safe to pass null to use the default</param>
31+
/// <param name="memoryStreamFactory">The memory stream provider to use, safe to pass null to use the default</param>
32+
public Transport(
33+
TConnectionSettings configurationValues,
34+
IRequestPipelineFactory pipelineProvider,
35+
IDateTimeProvider dateTimeProvider,
36+
IMemoryStreamFactory memoryStreamFactory
37+
)
38+
{
39+
configurationValues.ThrowIfNull(nameof(configurationValues));
40+
configurationValues.ConnectionPool.ThrowIfNull(nameof(configurationValues.ConnectionPool));
41+
configurationValues.Connection.ThrowIfNull(nameof(configurationValues.Connection));
42+
configurationValues.Serializer.ThrowIfNull(nameof(configurationValues.Serializer));
43+
44+
this.Settings = configurationValues;
45+
this.PipelineProvider = pipelineProvider ?? new RequestPipelineFactory();
46+
this.DateTimeProvider = dateTimeProvider ?? Elasticsearch.Net.DateTimeProvider.Default;
47+
this.MemoryStreamFactory = memoryStreamFactory ?? new MemoryStreamFactory();
48+
}
49+
50+
public ElasticsearchResponse<TReturn> Request<TReturn>(HttpMethod method, string path, PostData<object> data = null, IRequestParameters requestParameters = null)
51+
where TReturn : class
52+
{
53+
using (var pipeline = this.PipelineProvider.Create(this.Settings, this.DateTimeProvider, this.MemoryStreamFactory, requestParameters))
54+
{
55+
pipeline.FirstPoolUsage(this.Settings.BootstrapLock);
56+
57+
var requestData = new RequestData(method, path, data, this.Settings, requestParameters, this.MemoryStreamFactory);
58+
this.Settings.OnRequestDataCreated?.Invoke(requestData);
59+
ElasticsearchResponse<TReturn> response = null;
60+
61+
var seenExceptions = new List<PipelineException>();
62+
foreach (var node in pipeline.NextNode())
63+
{
64+
requestData.Node = node;
65+
try
66+
{
67+
pipeline.SniffOnStaleCluster();
68+
Ping(pipeline, node);
69+
response = pipeline.CallElasticsearch<TReturn>(requestData);
70+
if (!response.SuccessOrKnownError)
71+
{
72+
pipeline.MarkDead(node);
73+
pipeline.SniffOnConnectionFailure();
74+
}
75+
}
76+
catch (PipelineException pipelineException) when (!pipelineException.Recoverable)
77+
{
78+
pipeline.MarkDead(node);
79+
seenExceptions.Add(pipelineException);
80+
break;
81+
}
82+
catch (PipelineException pipelineException)
83+
{
84+
pipeline.MarkDead(node);
85+
seenExceptions.Add(pipelineException);
86+
}
87+
catch (Exception killerException)
88+
{
89+
throw new UnexpectedElasticsearchClientException(killerException, seenExceptions)
90+
{
91+
Request = requestData,
92+
Response = response,
93+
AuditTrail = pipeline?.AuditTrail
94+
};
95+
}
96+
if (response == null || !response.SuccessOrKnownError) continue;
97+
pipeline.MarkAlive(node);
98+
break;
99+
}
100+
if (response == null || !response.Success)
101+
pipeline.BadResponse(ref response, requestData, seenExceptions);
102+
103+
this.Settings.OnRequestCompleted?.Invoke(response);
104+
105+
return response;
106+
}
107+
}
108+
109+
public async Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(HttpMethod method, string path, PostData<object> data = null, IRequestParameters requestParameters = null)
110+
where TReturn : class
111+
{
112+
using (var pipeline = this.PipelineProvider.Create(this.Settings, this.DateTimeProvider, this.MemoryStreamFactory, requestParameters))
113+
{
114+
await pipeline.FirstPoolUsageAsync(this.Settings.BootstrapLock).ConfigureAwait(false);
115+
116+
var requestData = new RequestData(method, path, data, this.Settings, requestParameters, this.MemoryStreamFactory);
117+
this.Settings.OnRequestDataCreated?.Invoke(requestData);
118+
ElasticsearchResponse<TReturn> response = null;
119+
120+
var seenExceptions = new List<PipelineException>();
121+
foreach (var node in pipeline.NextNode())
122+
{
123+
requestData.Node = node;
124+
try
125+
{
126+
await pipeline.SniffOnStaleClusterAsync().ConfigureAwait(false);
127+
await PingAsync(pipeline, node).ConfigureAwait(false);
128+
response = await pipeline.CallElasticsearchAsync<TReturn>(requestData).ConfigureAwait(false);
129+
if (!response.SuccessOrKnownError)
130+
{
131+
pipeline.MarkDead(node);
132+
await pipeline.SniffOnConnectionFailureAsync().ConfigureAwait(false);
133+
}
134+
}
135+
catch (PipelineException pipelineException) when (!pipelineException.Recoverable)
136+
{
137+
pipeline.MarkDead(node);
138+
seenExceptions.Add(pipelineException);
139+
break;
140+
}
141+
catch (PipelineException pipelineException)
142+
{
143+
pipeline.MarkDead(node);
144+
seenExceptions.Add(pipelineException);
145+
}
146+
catch (Exception killerException)
147+
{
148+
throw new UnexpectedElasticsearchClientException(killerException, seenExceptions)
149+
{
150+
Request = requestData,
151+
Response = response,
152+
AuditTrail = pipeline.AuditTrail
153+
};
154+
}
155+
if (response == null || !response.SuccessOrKnownError) continue;
156+
pipeline.MarkAlive(node);
157+
break;
158+
}
159+
if (response == null || !response.Success)
160+
pipeline.BadResponse(ref response, requestData, seenExceptions);
161+
162+
this.Settings.OnRequestCompleted?.Invoke(response);
163+
164+
return response;
165+
}
166+
}
167+
168+
private static void Ping(IRequestPipeline pipeline, Node node)
169+
{
170+
try
171+
{
172+
pipeline.Ping(node);
173+
}
174+
catch (PipelineException e) when (e.Recoverable)
175+
{
176+
pipeline.SniffOnConnectionFailure();
177+
throw;
178+
}
179+
}
180+
181+
private static async Task PingAsync(IRequestPipeline pipeline, Node node)
182+
{
183+
try
184+
{
185+
await pipeline.PingAsync(node).ConfigureAwait(false);
186+
}
187+
catch (PipelineException e) when (e.Recoverable)
188+
{
189+
await pipeline.SniffOnConnectionFailureAsync().ConfigureAwait(false);
190+
throw;
191+
}
192+
}
193+
194+
}
195+
}

0 commit comments

Comments
 (0)