-
Notifications
You must be signed in to change notification settings - Fork 13k
/
NetUtils.java
431 lines (392 loc) · 14.3 KB
/
NetUtils.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.IllegalConfigurationException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.net.util.IPAddressUtil;

import java.io.IOException;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Locale;
import java.util.NoSuchElementException;
/**
* Utility for various network related tasks (such as finding free ports).
*/
@Internal
public class NetUtils {
/** Logger used by the static helpers of this class. */
private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);
/**
 * The wildcard address to listen on all interfaces (either 0.0.0.0 or ::).
 * It is read once, at class initialization, from an {@code InetSocketAddress}
 * that was created with only a port number.
 */
private static final String WILDCARD_ADDRESS = new InetSocketAddress(0).getAddress().getHostAddress();
/**
 * Extracts the hostname from a fully qualified domain name (fqdn).
 *
 * <p>The hostname is everything in front of the first period ('.'). An fqdn
 * without any period is returned unchanged.
 *
 * @param fqdn The fully qualified domain name.
 * @return The hostname.
 * @throws IllegalArgumentException Thrown, if the fqdn is null.
 */
public static String getHostnameFromFQDN(String fqdn) {
    if (fqdn == null) {
        throw new IllegalArgumentException("fqdn is null");
    }

    final int firstDot = fqdn.indexOf('.');
    return firstDot < 0 ? fqdn : fqdn.substring(0, firstDot);
}
/**
 * Method to validate if the given String represents a hostname:port.
 *
 * <p>Works also for ipv6.
 *
 * <p>The string is parsed as the authority part of an {@code http://} URL, so that the
 * URL parser takes care of splitting host and port (including bracketed IPv6 literals).
 *
 * <p>See: http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
 *
 * @param hostPort The "host:port" string to validate.
 * @return URL object for accessing host and Port
 * @throws IllegalArgumentException Thrown, if the string cannot be parsed, or if it
 *         contains no host or no port.
 */
private static URL validateHostPort(String hostPort) {
    try {
        URL u = new URL("http://" + hostPort);
        // URL#getHost() yields an empty string (not null) when the host part is missing,
        // e.g. for ":8080", so both cases have to be rejected
        final String host = u.getHost();
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("The given host:port ('" + hostPort + "') doesn't contain a valid host");
        }
        if (u.getPort() == -1) {
            throw new IllegalArgumentException("The given host:port ('" + hostPort + "') doesn't contain a valid port");
        }
        return u;
    } catch (MalformedURLException e) {
        throw new IllegalArgumentException("The given host:port ('" + hostPort + "') is invalid", e);
    }
}
/**
 * Validates the "host:port" string and returns a url.
 *
 * <p>This is the public entry point to {@code validateHostPort}; the given string is
 * passed on unchanged and the validated URL is returned as is.
 *
 * @param hostPort The "host:port" string to validate.
 * @return URL object for accessing host and Port
 * @throws IllegalArgumentException Thrown by the validation, if the string is not a
 *         valid "host:port" pair.
 */
public static URL getCorrectHostnamePort(String hostPort) {
return validateHostPort(hostPort);
}
/**
 * Validates the "hostname:port" string and converts it into a socket address.
 *
 * @param hostPort The "hostname:port" string to parse.
 * @return InetSocketAddress object for accessing host and Port
 * @throws IllegalArgumentException Thrown by the validation, if the string is not a
 *         valid "hostname:port" pair.
 */
public static InetSocketAddress parseHostPortAddress(String hostPort) {
    final URL validated = validateHostPort(hostPort);
    final String hostName = validated.getHost();
    final int portNumber = validated.getPort();
    return new InetSocketAddress(hostName, portNumber);
}
// ------------------------------------------------------------------------
// Lookup of free ports
// ------------------------------------------------------------------------
/**
 * Find a non-occupied port.
 *
 * <p>The port is obtained by binding a {@link ServerSocket} to port 0 and reading the port
 * that was actually assigned. The socket is closed again before this method returns, so the
 * returned port is free at the time of the lookup only. Up to 50 attempts are made.
 *
 * @return A non-occupied port.
 * @throws RuntimeException Thrown, if no attempt produced a port. The exception of the
 *         last failed attempt (if any) is attached as the cause.
 */
public static int getAvailablePort() {
    // remember the most recent failure so that it can be reported instead of being lost
    IOException lastFailure = null;

    for (int i = 0; i < 50; i++) {
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            int port = serverSocket.getLocalPort();
            if (port != 0) {
                return port;
            }
        } catch (IOException e) {
            // best effort: a failed attempt is simply retried
            lastFailure = e;
        }
    }

    throw new RuntimeException("Could not find a free permitted port on the machine.", lastFailure);
}
// ------------------------------------------------------------------------
// Encoding of IP addresses for URLs
// ------------------------------------------------------------------------
/**
 * Returns an address in a normalized format for Akka.
 * When an IPv6 address is specified, it normalizes the IPv6 address to avoid
 * complications with the exact URL match policy of Akka.
 *
 * <p>A {@code null} host is replaced by the loopback address. Any other host is trimmed and
 * lower-cased; an IPv6 literal may be passed with or without the enclosing square brackets.
 *
 * @param host The hostname, IPv4 or IPv6 address
 * @return host which will be normalized if it is an IPv6 address
 * @throws IllegalConfigurationException Thrown, if the host is neither an IP address literal
 *         nor an acceptable hostname (it starts or ends with '.', or contains ':').
 */
public static String unresolvedHostToNormalizedString(String host) {
    // Return loopback interface address if host is null
    // This represents the behavior of {@code InetAddress.getByName } and RFC 3330
    if (host == null) {
        host = InetAddress.getLoopbackAddress().getHostAddress();
    } else {
        // Locale.ROOT keeps the result independent of the JVM's default locale
        // (for example, a Turkish default locale lower-cases 'I' to a dotless 'i')
        host = host.trim().toLowerCase(Locale.ROOT);
        if (host.startsWith("[") && host.endsWith("]")) {
            // strip the brackets only if they really enclose an IPv6 literal
            String address = host.substring(1, host.length() - 1);
            if (IPAddressUtil.isIPv6LiteralAddress(address)) {
                host = address;
            }
        }
    }

    // normalize and valid address
    if (IPAddressUtil.isIPv6LiteralAddress(host)) {
        // NOTE(review): the formatter below reads 16 bytes; textToNumericFormatV6 may hand
        // back a shorter array for IPv4-mapped literals (e.g. ::ffff:1.2.3.4) - confirm
        byte[] ipV6Address = IPAddressUtil.textToNumericFormatV6(host);
        host = getIPv6UrlRepresentation(ipV6Address);
    } else if (!IPAddressUtil.isIPv4LiteralAddress(host)) {
        try {
            // We don't allow these in hostnames
            Preconditions.checkArgument(!host.startsWith("."));
            Preconditions.checkArgument(!host.endsWith("."));
            Preconditions.checkArgument(!host.contains(":"));
        } catch (Exception e) {
            throw new IllegalConfigurationException("The configured hostname is not valid", e);
        }
    }

    return host;
}
/**
 * Returns a valid address for Akka. It returns a String of format 'host:port'.
 * When an IPv6 address is specified, it normalizes the IPv6 address to avoid
 * complications with the exact URL match policy of Akka.
 *
 * <p>The port is checked before the host is normalized.
 *
 * @param host The hostname, IPv4 or IPv6 address
 * @param port The port, which must lie between 0 and 65535 (both inclusive)
 * @return host:port where host will be normalized if it is an IPv6 address
 */
public static String unresolvedHostAndPortToNormalizedString(String host, int port) {
    final boolean portInRange = 0 <= port && port <= 65535;
    Preconditions.checkArgument(portInRange, "Port is not within the valid range,");

    final String normalizedHost = unresolvedHostToNormalizedString(host);
    return normalizedHost + ":" + port;
}
/**
 * Encodes an IP address properly as a URL string. This method makes sure that IPv6 addresses
 * have the proper formatting to be included in URLs.
 *
 * <p>IPv4 addresses are returned in the form given by {@link InetAddress#getHostAddress()},
 * IPv6 addresses are converted to the compressed, bracketed URL form.
 *
 * @param address The IP address to encode.
 * @return The proper URL string encoded IP address.
 * @throws NullPointerException Thrown, if the address is null.
 * @throws IllegalArgumentException Thrown, if the address is neither an IPv4 nor an IPv6 address.
 */
public static String ipAddressToUrlString(InetAddress address) {
    if (address == null) {
        throw new NullPointerException("address is null");
    }

    if (address instanceof Inet4Address) {
        return address.getHostAddress();
    }

    if (address instanceof Inet6Address) {
        final Inet6Address ipV6 = (Inet6Address) address;
        return getIPv6UrlRepresentation(ipV6);
    }

    throw new IllegalArgumentException("Unrecognized type of InetAddress: " + address);
}
/**
 * Encodes an IP address and port to be included in a URL. In particular, this method makes
 * sure that IPv6 addresses have the proper formatting to be included in URLs.
 *
 * <p>The result is the URL encoding of the address as produced by
 * {@code ipAddressToUrlString}, followed by a colon and the port. The port is appended
 * as given; this method does not check it.
 *
 * @param address The address to be included in the URL.
 * @param port The port for the URL address.
 * @return The proper URL string encoded IP address and port.
 */
public static String ipAddressAndPortToUrlString(InetAddress address, int port) {
return ipAddressToUrlString(address) + ':' + port;
}
/**
 * Encodes the IP address and port of a socket address to be included in a URL. In particular,
 * this method makes sure that IPv6 addresses have the proper formatting to be included in URLs.
 *
 * @param address The socket address with the IP address and port.
 * @return The proper URL string encoded IP address and port.
 * @throws IllegalArgumentException Thrown, if the socket address is unresolved.
 */
public static String socketAddressToUrlString(InetSocketAddress address) {
    if (!address.isUnresolved()) {
        final InetAddress ip = address.getAddress();
        final int port = address.getPort();
        return ipAddressAndPortToUrlString(ip, port);
    }

    throw new IllegalArgumentException("Address cannot be resolved: " + address.getHostString());
}
/**
 * Normalizes and encodes a hostname and port to be included in URL.
 * In particular, this method makes sure that IPv6 address literals have the proper
 * formatting to be included in URLs.
 *
 * <p>The host is first turned into an address via {@link InetAddress#getByName(String)};
 * that address is then encoded together with the port.
 *
 * @param host The address to be included in the URL.
 * @param port The port for the URL address.
 * @return The proper URL string encoded IP address and port.
 * @throws java.net.UnknownHostException Thrown, if the hostname cannot be translated into a URL.
 */
public static String hostAndPortToUrlString(String host, int port) throws UnknownHostException {
    final InetAddress resolved = InetAddress.getByName(host);
    return ipAddressAndPortToUrlString(resolved, port);
}
/**
 * Creates a compressed URL style representation of an Inet6Address.
 *
 * <p>Only the raw address bytes ({@code address.getAddress()}) are passed on to the
 * byte-array variant of this method, which does the actual formatting.
 *
 * <p>This method copies and adopts code from Google's Guava library.
 * We re-implement this here in order to reduce dependency on Guava.
 * The Guava library has frequently caused dependency conflicts in the past.
 *
 * @param address The IPv6 address to format.
 * @return The representation produced by the byte-array variant for the raw address bytes.
 */
private static String getIPv6UrlRepresentation(Inet6Address address) {
return getIPv6UrlRepresentation(address.getAddress());
}
/**
 * Creates a compressed URL style representation of an IPv6 address given as raw bytes.
 *
 * <p>The bytes are read pairwise as eight 16 bit groups. The longest run of at least two
 * zero groups (the first one, if several runs are equally long) is abbreviated as "::".
 * All other groups are printed as lower-case hex without leading zeros, and the result is
 * enclosed in square brackets.
 *
 * <p>This method copies and adopts code from Google's Guava library.
 * We re-implement this here in order to reduce dependency on Guava.
 * The Guava library has frequently caused dependency conflicts in the past.
 *
 * @param addressBytes The address bytes; the first 16 bytes are read.
 * @return The bracketed, compressed text form of the address.
 */
private static String getIPv6UrlRepresentation(byte[] addressBytes) {
    // step 1: merge each pair of bytes into one 16 bit group
    final int[] groups = new int[8];
    for (int g = 0; g < groups.length; g++) {
        final int high = addressBytes[2 * g] & 0xFF;
        final int low = addressBytes[2 * g + 1] & 0xFF;
        groups[g] = (high << 8) | low;
    }

    // step 2: locate the longest run of zero groups; the extra iteration past the
    // end of the array closes a run that reaches the last group
    int longestStart = -1;
    int longestLength = -1;
    int currentStart = -1;
    for (int g = 0; g <= groups.length; g++) {
        final boolean zeroGroup = g < groups.length && groups[g] == 0;
        if (zeroGroup) {
            if (currentStart < 0) {
                currentStart = g;
            }
            continue;
        }
        if (currentStart >= 0) {
            final int currentLength = g - currentStart;
            if (currentLength > longestLength) {
                longestStart = currentStart;
                longestLength = currentLength;
            }
            currentStart = -1;
        }
    }

    // a single zero group is not worth compressing; mark longer runs with -1
    if (longestLength >= 2) {
        Arrays.fill(groups, longestStart, longestStart + longestLength, -1);
    }

    // step 3: render the groups, emitting "::" once at the start of the marked run
    final StringBuilder text = new StringBuilder(40);
    text.append('[');
    boolean previousWasGroup = false;
    for (int g = 0; g < groups.length; g++) {
        final int value = groups[g];
        if (value >= 0) {
            if (previousWasGroup) {
                text.append(':');
            }
            text.append(Integer.toHexString(value));
            previousWasGroup = true;
        } else {
            if (g == 0 || previousWasGroup) {
                text.append("::");
            }
            previousWasGroup = false;
        }
    }
    text.append(']');
    return text.toString();
}
// ------------------------------------------------------------------------
// Port range parsing
// ------------------------------------------------------------------------
/**
 * Returns an iterator over available ports defined by the range definition.
 *
 * <p>The definition is a comma separated list whose entries are either a single port
 * ("50100") or an inclusive range of ports ("50100-50200"). Whitespace around the entries
 * and around the individual port numbers is ignored. The ports are returned in the order
 * in which they are defined. A range whose start is greater than its end contributes no ports.
 *
 * @param rangeDefinition String describing a single port, a range of ports or multiple ranges.
 * @return Set of ports from the range definition
 * @throws NumberFormatException If an invalid string is passed.
 * @throws IllegalConfigurationException If a port is not between 0 and 65535.
 */
public static Iterator<Integer> getPortRangeFromString(String rangeDefinition) throws NumberFormatException {
    final String[] ranges = rangeDefinition.trim().split(",");

    UnionIterator<Integer> iterators = new UnionIterator<>();

    for (String rawRange : ranges) {
        Iterator<Integer> rangeIterator;
        String range = rawRange.trim();
        int dashIdx = range.indexOf('-');
        if (dashIdx == -1) {
            // only one port in range:
            final Integer port = parseAndValidatePort(range);
            rangeIterator = Collections.singleton(port).iterator();
        } else {
            // evaluate range
            final int start = parseAndValidatePort(range.substring(0, dashIdx));
            final int end = parseAndValidatePort(range.substring(dashIdx + 1));
            rangeIterator = new Iterator<Integer>() {
                int i = start;

                @Override
                public boolean hasNext() {
                    return i <= end;
                }

                @Override
                public Integer next() {
                    // honor the Iterator contract instead of running past the end of the range
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return i++;
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException("Remove not supported");
                }
            };
        }
        iterators.add(rangeIterator);
    }

    return iterators;
}

/**
 * Parses a single port number (surrounding whitespace is ignored) and checks that it lies
 * between 0 and 65535.
 */
private static int parseAndValidatePort(String portText) {
    final int port = Integer.parseInt(portText.trim());
    if (port < 0 || port > 65535) {
        throw new IllegalConfigurationException("Invalid port configuration. Port must be between 0 " +
            "and 65535, but was " + port + ".");
    }
    return port;
}
/**
 * Tries to allocate a socket from the given sets of ports.
 *
 * <p>The ports are tried in iteration order and the first socket that the factory creates
 * is returned. A port for which the factory fails with an {@link IOException} or an
 * {@link IllegalArgumentException} is logged and skipped.
 *
 * @param portsIterator A set of ports to choose from.
 * @param factory A factory for creating the SocketServer
 * @return null if no port was available or an allocated socket.
 */
public static ServerSocket createSocketFromPorts(Iterator<Integer> portsIterator, SocketFactory factory) {
    while (portsIterator.hasNext()) {
        final int candidate = portsIterator.next();
        LOG.debug("Trying to open socket on port {}", candidate);

        try {
            return factory.createSocket(candidate);
        } catch (IOException | IllegalArgumentException failure) {
            // full stack trace on debug level only, a one-line summary otherwise
            if (!LOG.isDebugEnabled()) {
                LOG.info("Unable to allocate on port {}, due to error: {}", candidate, failure.getMessage());
            } else {
                LOG.debug("Unable to allocate socket on port", failure);
            }
        }
    }

    // every port was tried without success
    return null;
}
/**
 * Returns the wildcard address to listen on all interfaces.
 *
 * <p>The value is the constant that was determined when this class was initialized.
 *
 * @return Either 0.0.0.0 or :: depending on the IP setup.
 */
public static String getWildcardIPAddress() {
return WILDCARD_ADDRESS;
}
/**
 * A factory for a local socket from port number.
 *
 * <p>Used by {@code createSocketFromPorts}, which calls the factory once for each port
 * it tries.
 */
@FunctionalInterface
public interface SocketFactory {
/**
 * Creates a server socket for the given port.
 *
 * @param port The port to create the socket for.
 * @return The created server socket.
 * @throws IOException Declared so that implementations can signal a port for which no
 *         socket could be created.
 */
ServerSocket createSocket(int port) throws IOException;
}
}