/
PeerClientFactory.kt
117 lines (106 loc) · 3.81 KB
/
PeerClientFactory.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package misk.client
import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import com.google.inject.Provides
import misk.clustering.Cluster
import misk.config.AppName
import misk.inject.KAbstractModule
import misk.security.cert.X500Name
import misk.web.WebConfig
import misk.web.jetty.JettyService
import okhttp3.OkHttpClient
import java.security.cert.X509Certificate
import java.util.concurrent.TimeUnit
import javax.inject.Singleton
import javax.net.ssl.HostnameVerifier
import javax.net.ssl.SSLSession
/**
 * Installs a [PeerClientFactory] whose clients reach peers on the HTTPS port of this process's
 * own server, taken from the SSL port in the [WebConfig].
 */
class PeerClientModule : KAbstractModule() {
  /**
   * Provides the singleton [PeerClientFactory].
   *
   * @throws IllegalStateException if [webConfig] has no SSL section or its port is not positive.
   */
  @Provides @Singleton
  fun peerClientFactory(
    @AppName appName: String,
    httpClientsConfig: HttpClientsConfig,
    httpClientFactory: HttpClientFactory,
    webConfig: WebConfig
  ): PeerClientFactory {
    // A missing SSL section is treated as port 0, so both cases fail the same check below.
    val staticHttpsPort = webConfig.ssl?.port ?: 0
    check(staticHttpsPort > 0) { "server must have static HTTPS port" }

    return PeerClientFactory(
      appName = appName,
      httpClientsConfig = httpClientsConfig,
      httpClientFactory = httpClientFactory,
      httpsPort = staticHttpsPort
    )
  }
}
/**
 * For testing.
 *
 * Binds a [PeerClientFactory] that calls peers on the HTTPS port of this process's server,
 * as determined by the Jetty server's port.
 */
class JettyPortPeerClientModule : KAbstractModule() {
  /**
   * Provides the singleton [PeerClientFactory], using the port of [JettyService.httpsServerUrl].
   *
   * @throws IllegalStateException if [jetty] reports no HTTPS server URL.
   */
  @Provides @Singleton
  fun peerClientFactory(
    @AppName appName: String,
    httpClientsConfig: HttpClientsConfig,
    httpClientFactory: HttpClientFactory,
    jetty: JettyService
  ): PeerClientFactory {
    // Fail with a descriptive message instead of a bare NullPointerException.
    // NOTE(review): presumably the URL is null when Jetty has not started or has no SSL
    // connector; confirm against JettyService.
    val httpsUrl = checkNotNull(jetty.httpsServerUrl) {
      "Jetty server has no HTTPS URL; cannot determine the HTTPS port for peer clients"
    }

    return PeerClientFactory(
      appName = appName,
      httpClientsConfig = httpClientsConfig,
      httpClientFactory = httpClientFactory,
      httpsPort = httpsUrl.port
    )
  }
}
/**
 * Produces [OkHttpClient]s for talking to another instance of the same application running in
 * the same cluster.
 *
 * One [OkHttpClient] is cached per peer; a cache entry expires five minutes after it was last
 * accessed.
 */
class PeerClientFactory(
  private val appName: String,
  private val httpClientsConfig: HttpClientsConfig,
  private val httpClientFactory: HttpClientFactory,
  private val httpsPort: Int
) {
  // Per-peer client cache; entries are built on demand by newPeerClient.
  private val cache = CacheBuilder.newBuilder()
    .expireAfterAccess(5, TimeUnit.MINUTES)
    .build<Cluster.Member, OkHttpClient>(object : CacheLoader<Cluster.Member, OkHttpClient>() {
      override fun load(peer: Cluster.Member): OkHttpClient = newPeerClient(peer)
    })

  init {
    require(httpsPort > 0) { "port must be a positive integer " }
    // A client config entry for this app must exist (the lookup throws when it is missing).
    // Its URL and Envoy settings are overridden for every peer client that gets built.
    httpClientsConfig[appName]
  }

  /** Returns the base URL used to call the given peer cluster member. */
  fun baseUrl(peer: Cluster.Member): String = "https://${peer.ipAddress}:$httpsPort"

  /**
   * Returns a client for calling the given peer cluster member.
   * The client fails when it reaches a different service, which is decided by comparing the OU
   * in the certificate presented by the called service against this application's name.
   */
  fun client(peer: Cluster.Member): OkHttpClient = cache[peer]

  /**
   * Builds a client for [peer] from this app's client config, pointed at the peer's base URL
   * with Envoy disabled, and with a hostname verifier that checks the certificate OU.
   */
  private fun newPeerClient(peer: Cluster.Member): OkHttpClient {
    val peerConfig = httpClientsConfig[appName].copy(
      url = baseUrl(peer),
      envoy = null
    )

    // The hostname itself is not inspected: a peer is accepted only when the organizational
    // unit of its first certificate equals this application's name.
    val ouVerifier = HostnameVerifier { _, session ->
      val leafCert = session?.peerCertificates?.firstOrNull() as? X509Certificate
      val presentedOu = leafCert?.let { cert ->
        X500Name.parse(cert.subjectX500Principal.name).organizationalUnit
      }
      appName == presentedOu
    }

    return httpClientFactory.create(peerConfig).newBuilder()
      .hostnameVerifier(ouVerifier)
      .build()
  }
}