/
WebClientComposer.java
114 lines (93 loc) · 4.21 KB
/
WebClientComposer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package com.compcruz;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;
import java.util.List;
import java.util.function.Consumer;
import javax.net.ssl.SSLException;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.http.codec.ClientCodecConfigurer;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.timeout.ReadTimeoutHandler;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import lombok.Singular;
import lombok.extern.slf4j.Slf4j;
/**
 * Composes a reactive {@link WebClient} from a small set of connection, codec and filter settings.
 *
 * <p>Instances are created through the Lombok-generated builder. A {@link ReactorResourceFactory} is
 * mandatory. Consumers of this common library should be mindful of closing resources during JVM
 * shutdown: declare the {@link ReactorResourceFactory} as a bean so that Spring cleans up its
 * resources on shutdown.
 */
@Slf4j
@Builder
@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class WebClientComposer {

    static final boolean DEFAULT_SOCKET_KEEP_ALIVE_ENABLED = true;
    static final boolean DEFAULT_HTTP_PROXY_ENABLED = false;
    static final int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 5000;
    static final int MAX_IN_MEMORY_SIZE_BYTES = 1024 * 1024;

    /** Base URL handed to {@code WebClient.Builder#baseUrl}; empty by default. */
    @Builder.Default
    private final String baseUrl = "";

    /** Connect timeout in milliseconds, applied as {@link ChannelOption#CONNECT_TIMEOUT_MILLIS}. */
    @Builder.Default
    private final int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT_MILLIS;

    /** Read timeout in seconds; a value of zero or less means no read-timeout handler is installed. */
    @Builder.Default
    private final int readTimeoutSecs = 0;

    /** Value applied as {@link ChannelOption#SO_KEEPALIVE}. */
    @Builder.Default
    private final boolean socketKeepAlive = DEFAULT_SOCKET_KEEP_ALIVE_ENABLED;

    /** Limit passed to the default codecs' {@code maxInMemorySize}. */
    @Builder.Default
    private final int maxInMemorySize = MAX_IN_MEMORY_SIZE_BYTES;

    /**
     * Requested maximum number of pooled connections.
     *
     * <p>NOTE(review): nothing in this class reads this value; the connector is built from
     * {@link #reactorResourceFactory}. Confirm whether the pool size is meant to be configured on
     * that factory instead, or whether this option should be wired in here.
     */
    @Builder.Default
    private final int maxPoolConnectionSize = ConnectionProvider.DEFAULT_POOL_MAX_CONNECTIONS;

    /** Filters registered first on the client, in the order they were added to the builder. */
    @Singular
    private final List<ExchangeFilterFunction> requestFilters;

    /** Filters registered after {@link #requestFilters}, in the order they were added to the builder. */
    @Singular
    private final List<ExchangeFilterFunction> responseFilters;

    /** Resource factory used to create the Reactor Netty connector. */
    @NonNull
    private final ReactorResourceFactory reactorResourceFactory;

    /**
     * Client-side SSL context built once per composer. Because the field is final and initialised
     * here, it is not a builder option; no getter is exposed for it.
     */
    @Getter(AccessLevel.NONE)
    private final SslContext sslContext = createDefaultSslContext();

    /**
     * Returns an instance of reactive web client with all needed properties configured.
     *
     * <p>Each call builds a new client and a new connector from the current settings.
     *
     * @return An instance of {@link WebClient}
     */
    public WebClient getWebClient() {
        final Consumer<ClientCodecConfigurer> codec = configurer ->
                configurer.defaultCodecs().maxInMemorySize(this.maxInMemorySize);

        return WebClient.builder()
                .baseUrl(this.baseUrl)
                .codecs(codec)
                // Request filters are registered ahead of response filters, as before.
                .filters(exchangeFilterFunctions -> {
                    exchangeFilterFunctions.addAll(this.requestFilters);
                    exchangeFilterFunctions.addAll(this.responseFilters);
                })
                .clientConnector(getClientHttpConnector())
                .build();
    }

    /**
     * Creates the Reactor Netty connector backed by {@link #reactorResourceFactory}.
     *
     * @return a connector whose TCP layer is configured by {@link #configureTcpClient(TcpClient)}
     */
    private ClientHttpConnector getClientHttpConnector() {
        return new ReactorClientHttpConnector(this.reactorResourceFactory,
                httpClient -> httpClient.tcpConfiguration(this::configureTcpClient));
    }

    /**
     * Applies SSL, keep-alive, connect-timeout and (optionally) read-timeout settings.
     *
     * @param tcpClient the TCP client supplied by Reactor Netty
     * @return the configured TCP client
     */
    private TcpClient configureTcpClient(final TcpClient tcpClient) {
        final TcpClient configured = tcpClient
                .secure(spec -> spec.sslContext(this.sslContext))
                .option(ChannelOption.SO_KEEPALIVE, this.socketKeepAlive)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectionTimeout);

        if (this.readTimeoutSecs <= 0) {
            // No read timeout requested: leave the pipeline untouched.
            return configured;
        }
        return configured.doOnConnected(
                connection -> connection.addHandlerLast(new ReadTimeoutHandler(this.readTimeoutSecs)));
    }

    /**
     * Builds a client SSL context with Netty's default client settings.
     *
     * <p>Static on purpose: it is invoked from a field initialiser, so it must not depend on
     * instance state that may not be assigned yet.
     *
     * @return a new client {@link SslContext}
     * @throws IllegalStateException if the context cannot be created
     */
    private static SslContext createDefaultSslContext() {
        try {
            return SslContextBuilder.forClient().build();
        } catch (final SSLException ex) {
            throw new IllegalStateException("Unable to create SSL context", ex);
        }
    }
}