From e12da9f1b2542fb1d5f862e1894c16ea834addcb Mon Sep 17 00:00:00 2001 From: Rick Cox Date: Thu, 26 Jan 2017 14:55:23 -0800 Subject: [PATCH 1/2] [FLINK-5669] Change DataStreamUtils to use the loopback address (127.0.0.1) with local environments. Using loopback rather than the "local address" allows tests to run in situations where the local machine's hostname may not be resolvable in DNS (because DNS is unreacable or the hostname is not found) or the hostname does resolve, but not to an IP address that is reachable. --- .../apache/flink/contrib/streaming/DataStreamUtils.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java index d4ef9eec726cd..8b2bdd2b667ae 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.UnknownHostException; import java.util.Iterator; public final class DataStreamUtils { @@ -58,12 +57,7 @@ public static Iterator collect(DataStream stream) throws IOExcep "receive back data from the streaming program.", e); } } else { - try { - clientAddress = InetAddress.getLocalHost(); - } catch (UnknownHostException e) { - throw new IOException("Could not determine this machines own local address to " + - "receive back data from the streaming program.", e); - } + clientAddress = InetAddress.getLoopbackAddress(); } DataStreamSink sink = stream.addSink(new CollectSink(clientAddress, iter.getPort(), serializer)); From b72c6890a6159d4549455e2316a12b2fd47bf003 Mon Sep 17 00:00:00 2001 From: Rick Cox Date: Tue, 31 Jan 2017 11:09:23 -0800 Subject: [PATCH 2/2] [FLINK-5669] Use loopback address only if the environment is a LocalStreamEnvironment. Tested with mini-cluster (online and offline), standalone cluster (after reproducing expected failure with earlier version that used loopback address for that case). --- .../flink/contrib/streaming/DataStreamUtils.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java index 8b2bdd2b667ae..2987597479c3d 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java @@ -21,12 +21,14 @@ import org.apache.flink.runtime.net.ConnectionUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.Iterator; public final class DataStreamUtils { @@ -56,8 +58,15 @@ public static Iterator collect(DataStream stream) throws IOExcep throw new IOException("Could not determine an suitable network address to " + "receive back data from the streaming program.", e); } - } else { + } else if (env instanceof LocalStreamEnvironment) { clientAddress = InetAddress.getLoopbackAddress(); + } else { + try { + clientAddress = InetAddress.getLocalHost(); + } catch (UnknownHostException e) { + throw new IOException("Could not determine this machines own local address to " + + "receive back data from the streaming program.", e); + } } DataStreamSink sink = stream.addSink(new CollectSink(clientAddress, iter.getPort(), serializer));