/
ServiceDiscovery.scala
316 lines (263 loc) · 9.99 KB
/
ServiceDiscovery.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
/*
* Copyright (C) 2017-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.discovery
import java.net.InetAddress
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.concurrent.TimeUnit
import scala.collection.immutable
import scala.compat.java8.OptionConverters._
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import akka.actor.DeadLetterSuppression
import akka.util.HashCode
object ServiceDiscovery {
object Resolved {
def apply(serviceName: String, addresses: immutable.Seq[ResolvedTarget]): Resolved =
new Resolved(serviceName, addresses)
def unapply(resolved: Resolved): Option[(String, immutable.Seq[ResolvedTarget])] =
Some((resolved.serviceName, resolved.addresses))
}
/** Result of a successful resolve request */
final class Resolved(val serviceName: String, val addresses: immutable.Seq[ResolvedTarget])
extends DeadLetterSuppression {
/**
* Java API
*/
def getAddresses: java.util.List[ResolvedTarget] = {
import scala.collection.JavaConverters._
addresses.asJava
}
override def toString: String = s"Resolved($serviceName,$addresses)"
override def equals(obj: Any): Boolean = obj match {
case other: Resolved ⇒ serviceName == other.serviceName && addresses == other.addresses
case _ ⇒ false
}
override def hashCode(): Int = {
var result = HashCode.SEED
result = HashCode.hash(result, serviceName)
result = HashCode.hash(result, addresses)
result
}
}
object ResolvedTarget {
// Simply compare the bytes of the address.
// This may not work in exotic cases such as IPv4 addresses encoded as IPv6 addresses.
private implicit val inetAddressOrdering: Ordering[InetAddress] =
Ordering.by[InetAddress, Iterable[Byte]](_.getAddress)
implicit val addressOrdering: Ordering[ResolvedTarget] = Ordering.by { t ⇒
(t.address, t.host, t.port)
}
/**
* @param host the hostname or the IP address of the target
* @param port optional port number
* @param address IP address of the target. This is used during cluster bootstap when available.
*/
def apply(host: String, port: Option[Int], address: Option[InetAddress]): ResolvedTarget =
new ResolvedTarget(host, port, address)
}
/**
* Resolved target host, with optional port and the IP address.
* @param host the hostname or the IP address of the target
* @param port optional port number
* @param address optional IP address of the target. This is used during cluster bootstap when available.
*/
final class ResolvedTarget(
val host: String,
val port: Option[Int],
val address: Option[InetAddress]
) {
/**
* Java API
*/
def getPort: Optional[Int] =
port.asJava
/**
* Java API
*/
def getAddress: Optional[InetAddress] =
address.asJava
override def toString: String = s"ResolvedTarget($host,$port,$address)"
override def equals(obj: Any): Boolean = obj match {
case other: ResolvedTarget ⇒ host == other.host && port == other.port && address == other.address
case _ ⇒ false
}
override def hashCode(): Int = {
var result = HashCode.SEED
result = HashCode.hash(result, host)
result = HashCode.hash(result, port)
result = HashCode.hash(result, address)
result
}
}
}
/**
* A service lookup. It is up to each method to decide
* what to do with the optional portName and protocol fields.
* For example `portName` could be used to distinguish between
* Akka remoting ports and HTTP ports.
*
* @throws IllegalArgumentException if [[serviceName]] is 'null' or an empty String
*/
final class Lookup(
val serviceName: String,
val portName: Option[String],
val protocol: Option[String]) {
require(serviceName != null, "'serviceName' cannot be null")
require(serviceName.trim.nonEmpty, "'serviceName' cannot be empty")
/**
* Which port for a service e.g. Akka remoting or HTTP.
* Maps to "service" for an SRV records.
*/
def withPortName(value: String): Lookup = copy(portName = Some(value))
/**
* Which protocol e.g. TCP or UDP.
* Maps to "protocol" for SRV records.
*/
def withProtocol(value: String): Lookup = copy(protocol = Some(value))
/**
* Java API
*/
def getPortName: Optional[String] =
portName.asJava
/**
* Java API
*/
def getProtocol: Optional[String] =
protocol.asJava
private def copy(
serviceName: String = serviceName,
portName: Option[String] = portName,
protocol: Option[String] = protocol): Lookup =
new Lookup(serviceName, portName, protocol)
override def toString: String = s"Lookup($serviceName,$portName,$protocol)"
override def equals(obj: Any): Boolean = obj match {
case other: Lookup ⇒ serviceName == other.serviceName && portName == other.portName && protocol == other.protocol
case _ ⇒ false
}
override def hashCode(): Int = {
var result = HashCode.SEED
result = HashCode.hash(result, serviceName)
result = HashCode.hash(result, portName)
result = HashCode.hash(result, protocol)
result
}
}
case object Lookup {
/**
* Create a service Lookup with only a serviceName.
* Use withPortName and withProtocol to provide optional portName
* and protocol
*/
def apply(serviceName: String): Lookup = new Lookup(serviceName, None, None)
/**
* Create a service Lookup with `serviceName`, optional `portName` and optional `protocol`.
*/
def apply(serviceName: String, portName: Option[String], protocol: Option[String]): Lookup =
new Lookup(serviceName, portName, protocol)
/**
* Java API
*
* Create a service Lookup with only a serviceName.
* Use withPortName and withProtocol to provide optional portName
* and protocol
*/
def create(serviceName: String): Lookup = new Lookup(serviceName, None, None)
private val SrvQuery = """^_(.+?)\._(.+?)\.(.+?)$""".r
/**
* Validates domain name:
* - a node name has 1 to 63 chars
* - valid chars for a node name are: a-z, A-Z, 0-9 and -
* - a node name can't start with - character
* - a node name can't end with - character
* - nodes are separated by a . character
*
* Starts with a node:
* Node Pattern: (?!-)[A-Za-z0-9-]{1,63}(?<!-)
* (?!-) => negative look ahead, first char can't be -
* [A-Za-z0-9-]{1,63} => digits and letters, from 1 to 63
* (?<!-) => negative look behind, last char can't be -
*
* A node can be followed by another nodes:
* Pattern: (\.(?!-)[A-Za-z0-9-]{1,63}(?<!-)))*
* . => starts with a . (dot)
* node pattern => (?!-)[A-Za-z0-9-]{1,63}(?<!-)
* * => match zero or more times
*/
private val DomainName = "^((?!-)[A-Za-z0-9-]{1,63}(?<!-))((\\.(?!-)[A-Za-z0-9-]{1,63}(?<!-)))*$".r
/**
* Create a service Lookup from a string with format:
* _portName._protocol.serviceName.
* (as specified by https://www.ietf.org/rfc/rfc2782.txt)
*
* If the passed string conforms with this format, a SRV Lookup is returned.
* The serviceName part must be a valid domain name.
*
* The string is parsed and dismembered to build a Lookup as following:
* Lookup(serviceName).withPortName(portName).withProtocol(protocol)
*
* @throws NullPointerException If the passed string is null
* @throws IllegalArgumentException If the string doesn't not conform with the SRV format
*/
def parseSrv(str: String): Lookup =
str match {
case SrvQuery(portName, protocol, serviceName) if validDomainName(serviceName) ⇒
Lookup(serviceName).withPortName(portName).withProtocol(protocol)
case null ⇒ throw new NullPointerException("Unable to create Lookup from passed SRV string. Passed value is 'null'")
case _ ⇒ throw new IllegalArgumentException(s"Unable to create Lookup from passed SRV string, invalid format: $str")
}
/**
* Returns true if passed string conforms with SRV format. Otherwise returns false.
*/
def isValidSrv(srv: String): Boolean =
srv match {
case SrvQuery(_, _, serviceName) ⇒ validDomainName(serviceName)
case _ ⇒ false
}
private def validDomainName(name: String): Boolean =
DomainName.pattern.asPredicate().test(name)
}
/**
* Implement to provide a service discovery method
*
*/
abstract class ServiceDiscovery {
import ServiceDiscovery._
/**
* Scala API: Perform lookup using underlying discovery implementation.
*
* @param lookup A service discovery lookup.
* @param resolveTimeout Timeout. Up to the discovery-method to adhere to his
*/
def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved]
/**
* Scala API: Perform lookup using underlying discovery implementation.
*
* Convenience for when only a name is required.
*/
def lookup(serviceName: String, resolveTimeout: FiniteDuration): Future[Resolved] =
lookup(Lookup(serviceName), resolveTimeout)
/**
* Java API: Perform basic lookup using underlying discovery implementation.
*
* While the implementation may provide other settings and ways to configure timeouts,
* the passed `resolveTimeout` should never be exceeded, as it signals the application's
* eagerness to wait for a result for this specific lookup.
*
* The returned future SHOULD be failed once resolveTimeout has passed.
*
*/
def lookup(query: Lookup, resolveTimeout: java.time.Duration): CompletionStage[Resolved] = {
import scala.compat.java8.FutureConverters._
lookup(query, FiniteDuration(resolveTimeout.toMillis, TimeUnit.MILLISECONDS)).toJava
}
/**
* Java API
*
* @param serviceName A name, see discovery-method's docs for how this is interpreted
* @param resolveTimeout Timeout. Up to the discovery-methodto adhere to his
*/
def lookup(serviceName: String, resolveTimeout: java.time.Duration): CompletionStage[Resolved] =
lookup(Lookup(serviceName), resolveTimeout)
}