/
HttpServerBuilder.java
432 lines (401 loc) · 21.6 KB
/
HttpServerBuilder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
/*
* Copyright © 2018-2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.api;
import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.HttpApiConversions.ServiceAdapterHolder;
import io.servicetalk.logging.api.LogLevel;
import io.servicetalk.transport.api.ConnectionAcceptor;
import io.servicetalk.transport.api.ConnectionAcceptorFactory;
import io.servicetalk.transport.api.IoExecutor;
import io.servicetalk.transport.api.ServerContext;
import io.servicetalk.transport.api.ServiceTalkSocketOptions;
import io.servicetalk.transport.api.TransportObserver;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import static io.servicetalk.http.api.BlockingUtils.blockingInvocation;
import static io.servicetalk.http.api.HttpApiConversions.toStreamingHttpService;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.HttpExecutionStrategyInfluencer.defaultStreamingInfluencer;
import static io.servicetalk.http.api.StrategyInfluencerAwareConversions.toConditionalServiceFilterFactory;
import static io.servicetalk.transport.api.ConnectionAcceptor.ACCEPT_ALL;
/**
* A builder for building HTTP Servers.
*/
public abstract class HttpServerBuilder {
@Nullable
private ConnectionAcceptorFactory connectionAcceptorFactory;
@Nullable
private StreamingHttpServiceFilterFactory serviceFilter;
private HttpExecutionStrategy strategy = defaultStrategy();
private final StrategyInfluencerChainBuilder influencerChainBuilder = new StrategyInfluencerChainBuilder();
private boolean drainRequestPayloadBody = true;
/**
* Configurations of various HTTP protocol versions.
* <p>
* <b>Note:</b> the order of specified protocols will reflect on priorities for ALPN in case the connections are
* {@link #secure() secured}.
*
* @param protocols {@link HttpProtocolConfig} for each protocol that should be supported.
* @return {@code this}.
*/
public abstract HttpServerBuilder protocols(HttpProtocolConfig... protocols);
/**
* Sets the maximum queue length for incoming connection indications (a request to connect) is set to the backlog
* parameter. If a connection indication arrives when the queue is full, the connection may time out.
*
* @param backlog the backlog to use when accepting connections.
* @return {@code this}.
*/
public abstract HttpServerBuilder backlog(int backlog);
/**
* Initiates security configuration for this server. Calling any {@code commit} method on the returned
* {@link HttpServerSecurityConfigurator} will commit the configuration.
* <p>
* Additionally use {@link #secure(String...)} to define configurations for specific
* <a href="https://tools.ietf.org/html/rfc6066#section-3">SNI</a> hostnames. If such configuration is additionally
* defined then configuration using this method is used as default if the hostname does not match any of the
* specified hostnames.
*
* @return {@link HttpServerSecurityConfigurator} to configure security for this server. It is
* mandatory to call any one of the {@code commit} methods after all configuration is done.
*/
public abstract HttpServerSecurityConfigurator secure();
/**
* Initiates security configuration for this server for the passed {@code sniHostnames}.
* Calling any {@code commit} method on the returned {@link HttpServerSecurityConfigurator} will commit the
* configuration.
* <p>
* When using this method, it is mandatory to also define the default configuration using {@link #secure()} which
* is used when the hostname does not match any of the specified {@code sniHostnames}.
*
* @param sniHostnames <a href="https://tools.ietf.org/html/rfc6066#section-3">SNI</a> hostnames for which this
* config is being defined.
* @return {@link HttpServerSecurityConfigurator} to configure security for this server. It is
* mandatory to call any one of the {@code commit} methods after all configuration is done.
*/
public abstract HttpServerSecurityConfigurator secure(String... sniHostnames);
/**
* Adds a {@link SocketOption} that is applied.
*
* @param <T> the type of the value.
* @param option the option to apply.
* @param value the value.
* @return this.
* @see StandardSocketOptions
* @see ServiceTalkSocketOptions
*/
public abstract <T> HttpServerBuilder socketOption(SocketOption<T> option, T value);
/**
* Enables wire-logging for this server.
* <p>
* @deprecated Use {@link #enableWireLogging(String, LogLevel, BooleanSupplier)} instead.
* @param loggerName The name of the logger to log wire events.
* @return {@code this}.
*/
@Deprecated
public abstract HttpServerBuilder enableWireLogging(String loggerName);
/**
* Enables wire-logging for this server.
*
* @param loggerName The name of the logger to log wire events.
* @param logLevel The level to log at.
* @param logUserData {@code true} to include user data (e.g. data, headers, etc.). {@code false} to exclude user
* data and log only network events.
* @return {@code this}.
*/
public abstract HttpServerBuilder enableWireLogging(String loggerName, LogLevel logLevel,
BooleanSupplier logUserData);
/**
* Sets a {@link TransportObserver} that provides visibility into transport events.
*
* @param transportObserver A {@link TransportObserver} that provides visibility into transport events.
* @return {@code this}.
*/
public abstract HttpServerBuilder transportObserver(TransportObserver transportObserver);
/**
* Disables automatic consumption of request {@link StreamingHttpRequest#payloadBody() payload body} when it is not
* consumed by the service.
* <p>
* For <a href="https://tools.ietf.org/html/rfc7230#section-6.3">persistent HTTP connections</a> it is required to
* eventually consume the entire request payload to enable reading of the next request. This is required because
* requests are pipelined for HTTP/1.1, so if the previous request is not completely read, next request can not be
* read from the socket. For cases when there is a possibility that user may forget to consume request payload,
* ServiceTalk automatically consumes request payload body. This automatic consumption behavior may create some
* overhead and can be disabled using this method when it is guaranteed that all request paths consumes all request
* payloads eventually. An example of guaranteed consumption are {@link HttpRequest non-streaming APIs}.
*
* @return {@code this}.
*/
public final HttpServerBuilder disableDrainingRequestPayloadBody() {
this.drainRequestPayloadBody = false;
return this;
}
/**
* Appends the filter to the chain of filters used to decorate the {@link ConnectionAcceptor} used by this builder.
* <p>
* The order of execution of these filters are in order of append. If 3 filters are added as follows:
* <pre>
* builder.appendConnectionAcceptorFilter(filter1).appendConnectionAcceptorFilter(filter2).
* appendConnectionAcceptorFilter(filter3)
* </pre>
* accepting a connection by a filter wrapped by this filter chain, the order of invocation of these filters will
* be:
* <pre>
* filter1 => filter2 => filter3
* </pre>
*
* @param factory {@link ConnectionAcceptorFactory} to append. Lifetime of this
* {@link ConnectionAcceptorFactory} is managed by this builder and the server started thereof.
* @return {@code this}
*/
public final HttpServerBuilder appendConnectionAcceptorFilter(final ConnectionAcceptorFactory factory) {
if (connectionAcceptorFactory == null) {
connectionAcceptorFactory = factory;
} else {
connectionAcceptorFactory = connectionAcceptorFactory.append(factory);
}
return this;
}
/**
* Appends the filter to the chain of filters used to decorate the {@link StreamingHttpService} used by this
* builder.
* <p>
* Note this method will be used to decorate the {@link StreamingHttpService} passed to
* {@link #listenStreaming(StreamingHttpService)} before it is used by the server.
* <p>
* The order of execution of these filters are in order of append. If 3 filters are added as follows:
* <pre>
* builder.append(filter1).append(filter2).append(filter3)
* </pre>
* accepting a request by a service wrapped by this filter chain, the order of invocation of these filters will be:
* <pre>
* filter1 => filter2 => filter3 => service
* </pre>
*
* @param factory {@link StreamingHttpServiceFilterFactory} to append.
* @return {@code this}
*/
public final HttpServerBuilder appendServiceFilter(final StreamingHttpServiceFilterFactory factory) {
if (serviceFilter == null) {
serviceFilter = factory;
} else {
serviceFilter = serviceFilter.append(factory);
}
if (!influencerChainBuilder.appendIfInfluencer(factory)) {
influencerChainBuilder.append(defaultStreamingInfluencer());
}
return this;
}
/**
* Appends the filter to the chain of filters used to decorate the {@link StreamingHttpService} used by this
* builder, for every request that passes the provided {@link Predicate}.
* <p>
* Note this method will be used to decorate the {@link StreamingHttpService} passed to
* {@link #listenStreaming(StreamingHttpService)} before it is used by the server.
* <p>
* The order of execution of these filters are in order of append. If 3 filters are added as follows:
* <pre>
* builder.append(filter1).append(filter2).append(filter3)
* </pre>
* accepting a request by a service wrapped by this filter chain, the order of invocation of these filters will be:
* <pre>
* filter1 => filter2 => filter3 => service
* </pre>
*
* @param predicate the {@link Predicate} to test if the filter must be applied.
* @param factory {@link StreamingHttpServiceFilterFactory} to append.
* @return {@code this}
*/
public final HttpServerBuilder appendServiceFilter(final Predicate<StreamingHttpRequest> predicate,
final StreamingHttpServiceFilterFactory factory) {
appendServiceFilter(toConditionalServiceFilterFactory(predicate, factory));
return this;
}
/**
* Sets the {@link IoExecutor} to be used by this server.
*
* @param ioExecutor {@link IoExecutor} to use.
* @return {@code this}.
*/
public abstract HttpServerBuilder ioExecutor(IoExecutor ioExecutor);
/**
* Sets the {@link BufferAllocator} to be used by this server.
*
* @param allocator {@link BufferAllocator} to use.
* @return {@code this}.
*/
public abstract HttpServerBuilder bufferAllocator(BufferAllocator allocator);
/**
* Sets the {@link HttpExecutionStrategy} to be used by this server.
*
* @param strategy {@link HttpExecutionStrategy} to use by this server.
* @return {@code this}.
*/
public final HttpServerBuilder executionStrategy(HttpExecutionStrategy strategy) {
this.strategy = strategy;
return this;
}
/**
* Starts this server and returns the {@link ServerContext} after the server has been successfully started.
* <p>
* If the underlying protocol (eg. TCP) supports it this will result in a socket bind/listen on {@code address}.
*
* @param service Service invoked for every request received by this server. The returned {@link ServerContext}
* manages the lifecycle of the {@code service}, ensuring it is closed when the {@link ServerContext} is closed.
* @return A {@link ServerContext} by blocking the calling thread until the server is successfully started or
* throws an {@link Exception} if the server could not be started.
* @throws Exception if the server could not be started.
*/
public final ServerContext listenAndAwait(final HttpService service) throws Exception {
return blockingInvocation(listen(service));
}
/**
* Starts this server and returns the {@link ServerContext} after the server has been successfully started.
* <p>
* If the underlying protocol (eg. TCP) supports it this will result in a socket bind/listen on {@code address}.
*
* @param handler Service invoked for every request received by this server. The returned {@link ServerContext}
* manages the lifecycle of the {@code service}, ensuring it is closed when the {@link ServerContext} is closed.
* @return A {@link ServerContext} by blocking the calling thread until the server is successfully started or
* throws an {@link Exception} if the server could not be started.
* @throws Exception if the server could not be started.
*/
public final ServerContext listenStreamingAndAwait(final StreamingHttpService handler) throws Exception {
return blockingInvocation(listenStreaming(handler));
}
/**
* Starts this server and returns the {@link ServerContext} after the server has been successfully started.
* <p>
* If the underlying protocol (eg. TCP) supports it this will result in a socket bind/listen on {@code address}.
*
* @param service Service invoked for every request received by this server. The returned {@link ServerContext}
* manages the lifecycle of the {@code service}, ensuring it is closed when the {@link ServerContext} is closed.
* @return A {@link ServerContext} by blocking the calling thread until the server is successfully started or
* throws an {@link Exception} if the server could not be started.
* @throws Exception if the server could not be started.
*/
public final ServerContext listenBlockingAndAwait(final BlockingHttpService service) throws Exception {
return blockingInvocation(listenBlocking(service));
}
/**
* Starts this server and returns the {@link ServerContext} after the server has been successfully started.
* <p>
* If the underlying protocol (eg. TCP) supports it this will result in a socket bind/listen on {@code address}.
*
* @param handler Service invoked for every request received by this server. The returned {@link ServerContext}
* manages the lifecycle of the {@code service}, ensuring it is closed when the {@link ServerContext} is closed.
* @return A {@link ServerContext} by blocking the calling thread until the server is successfully started or
* throws an {@link Exception} if the server could not be started.
* @throws Exception if the server could not be started.
*/
public final ServerContext listenBlockingStreamingAndAwait(
final BlockingStreamingHttpService handler) throws Exception {
return blockingInvocation(listenBlockingStreaming(handler));
}
/**
* Starts this server and returns the {@link ServerContext} after the server has been successfully started.
* <p>
* If the underlying protocol (eg. TCP) supports it this will result in a socket bind/listen on {@code address}.
*
* @param service Service invoked for every request received by this server. The returned {@link ServerContext}
* manages the lifecycle of the {@code service}, ensuring it is closed when the {@link ServerContext} is closed.
* @return A {@link Single} that completes when the server is successfully started or terminates with an error if
* the server could not be started.
*/
public final Single<ServerContext> listen(final HttpService service) {
influencerChainBuilder.prependIfInfluencer(service);
return listenForAdapter(toStreamingHttpService(service, influencerChainBuilder.build(strategy)));
}
/**
* Starts this server and returns the {@link ServerContext} after the server has been successfully started.
* <p>
* If the underlying protocol (eg. TCP) supports it this will result in a socket bind/listen on {@code address}.
*
* @param service Service invoked for every request received by this server. The returned {@link ServerContext}
* manages the lifecycle of the {@code service}, ensuring it is closed when the {@link ServerContext} is closed.
* @return A {@link Single} that completes when the server is successfully started or terminates with an error if
* the server could not be started.
*/
public final Single<ServerContext> listenStreaming(final StreamingHttpService service) {
return listenForService(service, strategy);
}
/**
* Starts this server and returns the {@link ServerContext} after the server has been successfully started.
* <p>
* If the underlying protocol (eg. TCP) supports it this will result in a socket bind/listen on {@code address}.
*
* @param service Service invoked for every request received by this server. The returned {@link ServerContext}
* manages the lifecycle of the {@code service}, ensuring it is closed when the {@link ServerContext} is closed.
* @return A {@link Single} that completes when the server is successfully started or terminates with an error if
* the server could not be started.
*/
public final Single<ServerContext> listenBlocking(final BlockingHttpService service) {
influencerChainBuilder.prependIfInfluencer(service);
return listenForAdapter(toStreamingHttpService(service, influencerChainBuilder.build(strategy)));
}
/**
* Starts this server and returns the {@link ServerContext} after the server has been successfully started.
* <p>
* If the underlying protocol (eg. TCP) supports it this will result in a socket bind/listen on {@code address}.
*
* @param service Service invoked for every request received by this server. The returned {@link ServerContext}
* manages the lifecycle of the {@code service}, ensuring it is closed when the {@link ServerContext} is closed.
* @return A {@link Single} that completes when the server is successfully started or terminates with an error if
* the server could not be started.
*/
public final Single<ServerContext> listenBlockingStreaming(final BlockingStreamingHttpService service) {
influencerChainBuilder.prependIfInfluencer(service);
return listenForAdapter(toStreamingHttpService(service, influencerChainBuilder.build(strategy)));
}
/**
* Starts this server and returns the {@link ServerContext} after the server has been successfully started.
* <p>
* If the underlying protocol (eg. TCP) supports it this should result in a socket bind/listen on {@code address}.
*
* @param connectionAcceptor {@link ConnectionAcceptor} to use for the server.
* @param service {@link StreamingHttpService} to use for the server.
* @param strategy the {@link HttpExecutionStrategy} to use for the service.
* @param drainRequestPayloadBody if {@code true} the server implementation should automatically subscribe and
* ignore the {@link StreamingHttpRequest#payloadBody() payload body} of incoming requests.
* @return A {@link Single} that completes when the server is successfully started or terminates with an error if
* the server could not be started.
*/
protected abstract Single<ServerContext> doListen(@Nullable ConnectionAcceptor connectionAcceptor,
StreamingHttpService service,
HttpExecutionStrategy strategy,
boolean drainRequestPayloadBody);
private Single<ServerContext> listenForAdapter(ServiceAdapterHolder adapterHolder) {
return listenForService(adapterHolder.adaptor(), adapterHolder.serviceInvocationStrategy());
}
private Single<ServerContext> listenForService(StreamingHttpService rawService, HttpExecutionStrategy strategy) {
ConnectionAcceptor connectionAcceptor = connectionAcceptorFactory == null ? null :
connectionAcceptorFactory.create(ACCEPT_ALL);
StreamingHttpServiceFilterFactory currServiceFilter = serviceFilter;
if (!AsyncContext.isDisabled()) {
StreamingHttpServiceFilterFactory asyncContextFilter = new AsyncContextAwareHttpServiceFilter();
currServiceFilter = currServiceFilter == null ?
asyncContextFilter : asyncContextFilter.append(currServiceFilter);
}
StreamingHttpService filteredService = currServiceFilter != null ?
currServiceFilter.create(rawService) : rawService;
return doListen(connectionAcceptor, filteredService, strategy, drainRequestPayloadBody);
}
}