-
Notifications
You must be signed in to change notification settings - Fork 124
/
NettyClientUtils.scala
117 lines (99 loc) · 4.18 KB
/
NettyClientUtils.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.grpc.internal
import java.lang.reflect.Field
import java.util.concurrent.TimeUnit
import akka.Done
import akka.annotation.InternalApi
import akka.event.LoggingAdapter
import akka.grpc.GrpcClientSettings
import io.grpc.CallOptions
import io.grpc.netty.shaded.io.grpc.netty.{ GrpcSslContexts, NegotiationType, NettyChannelBuilder }
import io.grpc.netty.shaded.io.netty.handler.ssl._
import javax.net.ssl.SSLContext
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Promise }
/**
 * Used to indicate that the service discovery returned no target.
 *
 * Can be caught to re-try lookup if it is likely that
 * your service discovery mechanism will resolve to different instances.
 *
 * @param msg message passed on to the `RuntimeException` constructor
 */
final class NoTargetException(msg: String) extends RuntimeException(msg)
/**
* INTERNAL API
*/
@InternalApi
object NettyClientUtils {
/**
 * INTERNAL API
 *
 * Builds a Netty gRPC channel from the given client settings and hands it to
 * `ChannelUtils.monitorChannel` together with a fresh `Promise[Done]`.
 *
 * The channel target is `"//" + settings.serviceName`, resolved through an
 * `AkkaDiscoveryNameResolverProvider` built from the discovery-related settings.
 *
 * Transport security follows `settings.useTls`:
 *  - `false`: the channel uses plaintext;
 *  - `true`: the channel always negotiates TLS. When `settings.sslContext` is defined it is
 *    converted with `nettyHttp2SslContext` and installed on the builder; otherwise no explicit
 *    context is set and the choice of SSL context is left to the Netty channel builder.
 *
 * The optional load balancing policy, authority override and user agent are applied only when
 * defined. `settings.channelBuilderOverrides` is applied last, so it sees (and may replace)
 * everything configured before it.
 *
 * @param settings client settings the channel is configured from
 * @param log      logging adapter passed to `ChannelUtils.monitorChannel`
 * @param ec       execution context; not referenced explicitly in this method
 *                 (NOTE(review): may be picked up implicitly by a callee - confirm)
 * @return an `InternalChannel` holding the built channel and the future of the promise
 *         that was handed to `ChannelUtils.monitorChannel`
 */
@InternalApi
def createChannel(settings: GrpcClientSettings, log: LoggingAdapter)(
    implicit ec: ExecutionContext): InternalChannel = {
  var builder =
    NettyChannelBuilder
      // Not sure why netty wants to be able to shoe-horn the target into a URI... but ok,
      // we follow their lead and encode the service name as the 'authority' of the URI.
      .forTarget("//" + settings.serviceName)
      .flowControlWindow(NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW)
      .nameResolverFactory(
        new AkkaDiscoveryNameResolverProvider(
          settings.serviceDiscovery,
          settings.defaultPort,
          settings.servicePortName,
          settings.serviceProtocol,
          settings.resolveTimeout))

  builder =
    if (!settings.useTls) builder.usePlaintext()
    else {
      // TLS was requested: never fall back to PLAINTEXT just because no custom SSLContext was
      // supplied, as that would silently send traffic in the clear.
      val tlsBuilder = builder.negotiationType(NegotiationType.TLS)
      settings.sslContext match {
        case Some(javaCtx) => tlsBuilder.sslContext(nettyHttp2SslContext(javaCtx))
        case None          => tlsBuilder
      }
    }

  // Optional settings only touch the builder when they are defined.
  builder = settings.grpcLoadBalancingType.map(builder.defaultLoadBalancingPolicy(_)).getOrElse(builder)
  builder = settings.overrideAuthority.map(builder.overrideAuthority(_)).getOrElse(builder)
  builder = settings.userAgent.map(builder.userAgent(_)).getOrElse(builder)

  // User-supplied overrides go last so they can adjust anything configured above.
  builder = settings.channelBuilderOverrides(builder)

  val channel = builder.build()
  val promise = Promise[Done]()
  ChannelUtils.monitorChannel(promise, channel, settings.connectionAttempts, log)
  InternalChannel(channel, promise.future)
}
/**
 * INTERNAL API
 *
 * Given a Java [[SSLContext]], create a Netty [[SslContext]] that can be used to build
 * a Netty HTTP/2 channel.
 *
 * @param javaSslContext the JDK SSL context to place inside the returned Netty context
 * @return a `JdkSslContext` whose private `sslContext` field has been set to `javaSslContext`
 */
@InternalApi
private def nettyHttp2SslContext(javaSslContext: SSLContext): SslContext = {
  // FIXME: Create a JdkSslContext using a normal constructor. Need to work out sensible values for all args first.
  // In the meantime, use a Netty SslContextBuilder to create a JdkSslContext, then use reflection to patch the
  // object's internal SSLContext. It's not pretty, but it gets something working for now.

  // Step 1: let grpc configure a JDK-provider client context, so that the ciphers, protocol
  // settings, etc. are initialized.
  val clientContextBuilder = GrpcSslContexts.configure(GrpcSslContexts.forClient(), SslProvider.JDK)
  val patchedContext: JdkSslContext = clientContextBuilder.build().asInstanceOf[JdkSslContext]

  // Step 2: overwrite the private `sslContext` field with the caller's SSLContext.
  val wrappedContextField: Field = classOf[JdkSslContext].getDeclaredField("sslContext")
  wrappedContextField.setAccessible(true)
  wrappedContextField.set(patchedContext, javaSslContext)

  patchedContext
}
/**
 * INTERNAL API
 *
 * Derives the call options from the client settings: `CallOptions.DEFAULT`, with
 * `settings.callCredentials` attached when they are defined.
 *
 * @param settings client settings whose `callCredentials` are inspected
 * @return `CallOptions.DEFAULT`, or a copy of it carrying the configured call credentials
 */
@InternalApi def callOptions(settings: GrpcClientSettings): CallOptions =
  settings.callCredentials match {
    case Some(credentials) => CallOptions.DEFAULT.withCallCredentials(credentials)
    case None              => CallOptions.DEFAULT
  }
/**
 * INTERNAL API
 *
 * Applies the deadline configured in the client settings to the given call options.
 *
 * @param defaultOptions call options to start from; returned as-is when `settings.deadline`
 *                       is not a `FiniteDuration`
 * @param settings       client settings whose `deadline` is inspected
 * @return the result of `defaultOptions.withDeadlineAfter` with the deadline expressed in
 *         milliseconds when the deadline is finite, `defaultOptions` otherwise
 */
@InternalApi private[akka] def callOptionsWithDeadline(
    defaultOptions: CallOptions,
    settings: GrpcClientSettings): CallOptions =
  Option(settings.deadline)
    .collect {
      // Only a finite deadline can be translated into a gRPC deadline.
      case finite: FiniteDuration =>
        defaultOptions.withDeadlineAfter(finite.toMillis, TimeUnit.MILLISECONDS)
    }
    .getOrElse(defaultOptions)
}