From 3bc377b6050fb6f4489d8e4a161afeae12e1a6c9 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Tue, 1 Aug 2017 18:11:27 +0200 Subject: [PATCH 1/4] [FLINK-7343][utils] Add network proxy utility to simulate network failures --- .../networking/NetworkFailureHandler.java | 178 ++++++++++++++++++ .../networking/NetworkFailuresProxy.java | 125 ++++++++++++ .../apache/flink/networking/EchoServer.java | 113 +++++++++++ .../networking/NetworkFailuresProxyTest.java | 124 ++++++++++++ 4 files changed, 540 insertions(+) create mode 100644 flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java create mode 100644 flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java create mode 100644 flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java create mode 100644 flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java new file mode 100644 index 0000000000000..0ce0b12559058 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.networking; + +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +/** + * Handler that is forwarding inbound traffic from the source channel to the target channel on remoteHost:remotePort + * and the responses in the opposite direction. All of the network traffic can be blocked at any time using blocked + * flag. 
+ */
+class NetworkFailureHandler extends SimpleChannelUpstreamHandler {
+	private static final Logger LOG = LoggerFactory.getLogger(NetworkFailureHandler.class);
+	private static final String TARGET_CHANNEL_HANDLER_NAME = "target_channel_handler";
+
+	// mapping between source and target channels, used for finding the correct target channel for a given source.
+	private final Map<Channel, Channel> sourceToTargetChannels = new ConcurrentHashMap<>();
+	private final Consumer<NetworkFailureHandler> onClose;
+	private final ClientSocketChannelFactory channelFactory;
+	private final String remoteHost;
+	private final int remotePort;
+
+	private final AtomicBoolean blocked;
+
+	public NetworkFailureHandler(
+			AtomicBoolean blocked,
+			Consumer<NetworkFailureHandler> onClose,
+			ClientSocketChannelFactory channelFactory,
+			String remoteHost,
+			int remotePort) {
+		this.blocked = blocked;
+		this.onClose = onClose;
+		this.channelFactory = channelFactory;
+		this.remoteHost = remoteHost;
+		this.remotePort = remotePort;
+	}
+
+	/**
+	 * Closes the specified channel after all queued write requests are flushed.
+	 */
+	static void closeOnFlush(Channel channel) {
+		if (channel.isConnected()) {
+			channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+		}
+	}
+
+	public void closeConnections() {
+		for (Map.Entry<Channel, Channel> entry : sourceToTargetChannels.entrySet()) {
+			// the target channel is closed on the source channel's channelClosed event
+			entry.getKey().close();
+		}
+	}
+
+	@Override
+	public void channelOpen(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
+		// Suspend incoming traffic until connected to the remote host.
+		final Channel sourceChannel = event.getChannel();
+		sourceChannel.setReadable(false);
+
+		if (blocked.get()) {
+			sourceChannel.close();
+			return;
+		}
+
+		// Start the connection attempt.
+		ClientBootstrap targetConnectionBootstrap = new ClientBootstrap(channelFactory);
+		targetConnectionBootstrap.getPipeline().addLast(TARGET_CHANNEL_HANDLER_NAME, new TargetChannelHandler(event.getChannel(), blocked));
+		ChannelFuture connectFuture = targetConnectionBootstrap.connect(new InetSocketAddress(remoteHost, remotePort));
+		sourceToTargetChannels.put(sourceChannel, connectFuture.getChannel());
+
+		connectFuture.addListener(future -> {
+			if (future.isSuccess()) {
+				// Connection attempt succeeded:
+				// Begin to accept incoming traffic.
+				sourceChannel.setReadable(true);
+			} else {
+				// Close the connection if the connection attempt has failed.
+ sourceChannel.close(); + } + }); + } + + @Override + public void messageReceived(ChannelHandlerContext context, MessageEvent event) throws Exception { + if (blocked.get()) { + return; + } + + ChannelBuffer msg = (ChannelBuffer) event.getMessage(); + Channel targetChannel = sourceToTargetChannels.get(event.getChannel()); + if (targetChannel == null) { + throw new IllegalStateException("Could not find a target channel for the source channel"); + } + targetChannel.write(msg); + } + + @Override + public void channelClosed(ChannelHandlerContext context, ChannelStateEvent event) throws Exception { + Channel targetChannel = sourceToTargetChannels.get(event.getChannel()); + if (targetChannel == null) { + return; + } + closeOnFlush(targetChannel); + sourceToTargetChannels.remove(event.getChannel()); + onClose.accept(this); + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) throws Exception { + LOG.error("Closing communication channel because of an exception", event.getCause()); + closeOnFlush(event.getChannel()); + } + + private static class TargetChannelHandler extends SimpleChannelUpstreamHandler { + private final Channel sourceChannel; + private final AtomicBoolean blocked; + + TargetChannelHandler(Channel sourceChannel, AtomicBoolean blocked) { + this.sourceChannel = sourceChannel; + this.blocked = blocked; + } + + @Override + public void messageReceived(ChannelHandlerContext context, MessageEvent event) throws Exception { + if (blocked.get()) { + return; + } + ChannelBuffer msg = (ChannelBuffer) event.getMessage(); + sourceChannel.write(msg); + } + + @Override + public void channelClosed(ChannelHandlerContext context, ChannelStateEvent event) throws Exception { + closeOnFlush(sourceChannel); + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) throws Exception { + LOG.error("Closing communication channel because of an exception", event.getCause()); + closeOnFlush(event.getChannel()); + } + } +} diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java new file mode 100644 index 0000000000000..70300494a7e0e --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.networking;
+
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class acts as a network proxy - it listens on a local port and forwards all of the network traffic to the
+ * remote host/port. It allows simulating network failures in the communication.
+ */
+public class NetworkFailuresProxy implements AutoCloseable {
+	private static final Logger LOG = LoggerFactory.getLogger(NetworkFailuresProxy.class);
+	private static final String NETWORK_FAILURE_HANDLER_NAME = "network_failure_handler";
+
+	private final Executor executor = Executors.newCachedThreadPool();
+	private final ServerBootstrap serverBootstrap;
+	private final Channel channel;
+	private final AtomicBoolean blocked = new AtomicBoolean();
+	// collection of networkFailureHandlers so that we can call {@link NetworkFailureHandler.closeConnections} on them.
+	private final Set<NetworkFailureHandler> networkFailureHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+	public NetworkFailuresProxy(int localPort, String remoteHost, int remotePort) {
+		LOG.info("Proxying [*:{}] to [{}:{}]", localPort, remoteHost, remotePort);
+
+		// Configure the bootstrap.
+		serverBootstrap = new ServerBootstrap(
+			new NioServerSocketChannelFactory(executor, executor));
+
+		// Set up the event pipeline factory.
+		ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(executor, executor);
+		serverBootstrap.setOption("child.tcpNoDelay", true);
+		serverBootstrap.setOption("child.keepAlive", true);
+		serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+			public ChannelPipeline getPipeline() throws Exception {
+				ChannelPipeline pipeline = Channels.pipeline();
+
+				// synchronized for a race between blocking and creating new handlers
+				synchronized (networkFailureHandlers) {
+					NetworkFailureHandler failureHandler = new NetworkFailureHandler(
+						blocked,
+						networkFailureHandler -> networkFailureHandlers.remove(networkFailureHandler),
+						channelFactory,
+						remoteHost,
+						remotePort);
+					networkFailureHandlers.add(failureHandler);
+					pipeline.addLast(NETWORK_FAILURE_HANDLER_NAME, failureHandler);
+				}
+				return pipeline;
+			}
+		});
+		channel = serverBootstrap.bind(new InetSocketAddress(localPort));
+
+	}
+
+	/**
+	 * @return local port on which {@link NetworkFailuresProxy} is listening.
+	 */
+	public int getLocalPort() {
+		return ((InetSocketAddress) channel.getLocalAddress()).getPort();
+	}
+
+	/**
+	 * Blocks all ongoing traffic, closes all ongoing connections and closes any new incoming connections.
+	 */
+	public void blockTraffic() {
+		setTrafficBlocked(true);
+	}
+
+	/**
+	 * Resumes normal communication.
+ */ + public void unblockTraffic() { + setTrafficBlocked(false); + } + + @Override + public void close() throws Exception { + channel.close(); + } + + private void setTrafficBlocked(boolean blocked) { + this.blocked.set(blocked); + if (blocked) { + // synchronized for a race between blocking and creating new handlers + synchronized (networkFailureHandlers) { + for (NetworkFailureHandler failureHandler : networkFailureHandlers) { + failureHandler.closeConnections(); + } + } + } + } +} diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java new file mode 100644 index 0000000000000..06e77ea58d4c0 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.networking; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * TCP EchoServer for test purposes. 
+ */ +public class EchoServer extends Thread implements AutoCloseable { + private final ServerSocket serverSocket = new ServerSocket(0); + private final int socketTimeout; + private final List workerThreads = Collections.synchronizedList(new ArrayList<>()); + + private volatile boolean close = false; + private Exception threadException; + + public EchoServer(int socketTimeout) throws IOException { + serverSocket.setSoTimeout(socketTimeout); + this.socketTimeout = socketTimeout; + } + + public int getLocalPort() { + return serverSocket.getLocalPort(); + } + + @Override + public void run() { + while (!close) { + try { + EchoWorkerThread thread = new EchoWorkerThread(serverSocket.accept(), socketTimeout); + thread.start(); + } catch (IOException e) { + threadException = e; + } + } + } + + @Override + public void close() throws Exception { + for (EchoWorkerThread thread : workerThreads) { + thread.close(); + thread.join(); + } + close = true; + if (threadException != null) { + throw threadException; + } + serverSocket.close(); + this.join(); + } + + private static class EchoWorkerThread extends Thread implements AutoCloseable { + private final PrintWriter output; + private final BufferedReader input; + + private volatile boolean close; + private Exception threadException; + + public EchoWorkerThread(Socket clientSocket, int socketTimeout) throws IOException { + output = new PrintWriter(clientSocket.getOutputStream(), true); + input = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); + clientSocket.setSoTimeout(socketTimeout); + } + + @Override + public void run() { + try { + String inputLine; + while (!close && (inputLine = input.readLine()) != null) { + output.println(inputLine); + } + } catch (IOException e) { + threadException = e; + } + } + + @Override + public void close() throws Exception { + close = true; + if (threadException != null) { + throw threadException; + } + input.close(); + output.close(); + this.join(); + } + } +} diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java new file mode 100644 index 0000000000000..00468687a2d78 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.networking; + +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.Socket; +import java.net.SocketException; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for NetworkFailuresProxy. + */ +public class NetworkFailuresProxyTest { + public static final int SOCKET_TIMEOUT = 500_000; + + @Test + public void testProxy() throws Exception { + try ( + EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT); + NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort()); + EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) { + echoServer.start(); + + assertEquals("42", echoClient.write("42")); + assertEquals("Ala ma kota!", echoClient.write("Ala ma kota!")); + } + } + + @Test + public void testMultipleConnections() throws Exception { + try ( + EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT); + NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort()); + EchoClient echoClient1 = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT); + EchoClient echoClient2 = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) { + echoServer.start(); + + assertEquals("42", echoClient1.write("42")); + assertEquals("Ala ma kota!", echoClient2.write("Ala ma kota!")); + assertEquals("Ala hat eine Katze!", echoClient1.write("Ala hat eine Katze!")); + } + } + + @Test + public void testBlockTraffic() throws Exception { + try ( + EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT); + NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort())) { + echoServer.start(); + + try (EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) { + assertEquals("42", echoClient.write("42")); + proxy.blockTraffic(); + try { + echoClient.write("Ala ma kota!"); + } catch (SocketException ex) { + assertEquals("Connection reset", ex.getMessage()); + } + } + + try (EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) { + assertEquals(null, echoClient.write("42")); + } catch (SocketException ex) { + assertEquals("Connection reset", ex.getMessage()); + } + + proxy.unblockTraffic(); + try (EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) { + assertEquals("42", echoClient.write("42")); + assertEquals("Ala ma kota!", echoClient.write("Ala ma kota!")); + } + } + } + + /** + * Simple echo client that sends a message over the network and waits for the answer. 
+ */ + public static class EchoClient implements AutoCloseable { + private final Socket socket; + private final PrintWriter output; + private final BufferedReader input; + + public EchoClient(String hostName, int portNumber, int socketTimeout) throws IOException { + socket = new Socket(hostName, portNumber); + socket.setSoTimeout(socketTimeout); + output = new PrintWriter(socket.getOutputStream(), true); + input = new BufferedReader(new InputStreamReader(socket.getInputStream())); + } + + public String write(String message) throws IOException { + output.println(message); + return input.readLine(); + } + + @Override + public void close() throws Exception { + input.close(); + output.close(); + socket.close(); + } + } +} From efb2cef28f2e356631bce52fc3e26114d9f5b62f Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Mon, 7 Aug 2017 15:53:35 +0200 Subject: [PATCH 2/4] [hotfix][Kafka] Refactor properties for KafkaTestEnvironment setup --- .../kafka/KafkaTestEnvironmentImpl.java | 33 ++++++------- .../kafka/KafkaTestEnvironmentImpl.java | 19 +++---- .../kafka/KafkaTestEnvironmentImpl.java | 35 ++++++------- .../kafka/KafkaShortRetentionTestBase.java | 2 +- .../connectors/kafka/KafkaTestBase.java | 2 +- .../kafka/KafkaTestEnvironment.java | 49 +++++++++++++++++-- 6 files changed, 88 insertions(+), 52 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index d3b45a9312541..9f1d37989c306 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -79,8 +79,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { private String zookeeperConnectionString; private String brokerConnectionString = ""; private Properties standardProps; - private Properties additionalServerProperties; - private boolean secureMode = false; + private Config config; // 6 seconds is default. Seems to be too small for travis. 30 seconds private int zkTimeout = 30000; @@ -96,7 +95,7 @@ public Properties getStandardProperties() { @Override public Properties getSecureProperties() { Properties prop = new Properties(); - if (secureMode) { + if (config.isSecureMode()) { prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT"); prop.put("security.protocol", "SASL_PLAINTEXT"); prop.put("sasl.kerberos.service.name", "kafka"); @@ -215,26 +214,24 @@ public boolean isSecureRunSupported() { } @Override - public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) { + public void prepare(Config config) { //increase the timeout since in Travis ZK connection takes long time for secure connection. 
- if (secureMode) { + if (config.isSecureMode()) { //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout - numKafkaServers = 1; + config.setKafkaServersNumber(1); zkTimeout = zkTimeout * 15; } + this.config = config; - this.additionalServerProperties = additionalServerProperties; - this.secureMode = secureMode; File tempDir = new File(System.getProperty("java.io.tmpdir")); - tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs()); tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString())); assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs()); - tmpKafkaDirs = new ArrayList<>(numKafkaServers); - for (int i = 0; i < numKafkaServers; i++) { + tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber()); + for (int i = 0; i < config.getKafkaServersNumber(); i++) { File tmpDir = new File(tmpKafkaParent, "server-" + i); assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); tmpKafkaDirs.add(tmpDir); @@ -249,12 +246,12 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties, LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString); LOG.info("Starting KafkaServer"); - brokers = new ArrayList<>(numKafkaServers); + brokers = new ArrayList<>(config.getKafkaServersNumber()); - for (int i = 0; i < numKafkaServers; i++) { + for (int i = 0; i < config.getKafkaServersNumber(); i++) { brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); - if (secureMode) { + if (config.isSecureMode()) { brokerConnectionString += hostAndPortToUrlString( KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort( @@ -347,7 +344,7 @@ public void createTestTopic(String topic, int numberOfPartitions, int replicatio final long deadline = System.nanoTime() + 30_000_000_000L; do { try { - if (secureMode) { + if (config.isSecureMode()) { //increase wait time since in Travis ZK timeout occurs frequently int wait = zkTimeout / 100; LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic); @@ -407,8 +404,8 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except // for CI stability, increase zookeeper session timeout kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout); kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout); - if (additionalServerProperties != null) { - kafkaProperties.putAll(additionalServerProperties); + if (config.getKafkaServerProperties() != null) { + kafkaProperties.putAll(config.getKafkaServerProperties()); } final int numTries = 5; @@ -418,7 +415,7 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except kafkaProperties.put("port", Integer.toString(kafkaPort)); //to support secure kafka cluster - if (secureMode) { + if (config.isSecureMode()) { LOG.info("Adding Kafka secure configurations"); kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index ab976e117b40d..af5ad67c30d14 100644 --- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -84,7 +84,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { private String zookeeperConnectionString; private String brokerConnectionString = ""; private Properties standardProps; - private Properties additionalServerProperties; + + private Config config; public String getBrokerConnectionString() { return brokerConnectionString; @@ -206,8 +207,8 @@ public boolean isSecureRunSupported() { } @Override - public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) { - this.additionalServerProperties = additionalServerProperties; + public void prepare(Config config) { + this.config = config; File tempDir = new File(System.getProperty("java.io.tmpdir")); tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); @@ -224,8 +225,8 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties, fail("cannot create kafka temp dir: " + e.getMessage()); } - tmpKafkaDirs = new ArrayList<>(numKafkaServers); - for (int i = 0; i < numKafkaServers; i++) { + tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber()); + for (int i = 0; i < config.getKafkaServersNumber(); i++) { File tmpDir = new File(tmpKafkaParent, "server-" + i); assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); tmpKafkaDirs.add(tmpDir); @@ -240,9 +241,9 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties, zookeeperConnectionString = zookeeper.getConnectString(); LOG.info("Starting KafkaServer"); - brokers = new ArrayList<>(numKafkaServers); + brokers = new ArrayList<>(config.getKafkaServersNumber()); - for (int i = 0; i < numKafkaServers; i++) { + for (int i = 0; i < config.getKafkaServersNumber(); i++) { brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); SocketServer socketServer = brokers.get(i).socketServer(); @@ -391,8 +392,8 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except // for CI stability, increase zookeeper session timeout kafkaProperties.put("zookeeper.session.timeout.ms", "30000"); kafkaProperties.put("zookeeper.connection.timeout.ms", "30000"); - if (additionalServerProperties != null) { - kafkaProperties.putAll(additionalServerProperties); + if (config.getKafkaServerProperties() != null) { + kafkaProperties.putAll(config.getKafkaServerProperties()); } final int numTries = 5; diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index df954203995f5..517f0969b4c51 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -78,11 +78,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { private String zookeeperConnectionString; private String brokerConnectionString = ""; private Properties standardProps; - private Properties additionalServerProperties; - private boolean secureMode = false; // 6 seconds is 
default. Seems to be too small for travis. 30 seconds private String zkTimeout = "30000"; + private Config config; + public String getBrokerConnectionString() { return brokerConnectionString; } @@ -200,27 +200,24 @@ public boolean isSecureRunSupported() { } @Override - public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) { - + public void prepare(Config config) { //increase the timeout since in Travis ZK connection takes long time for secure connection. - if (secureMode) { + if (config.isSecureMode()) { //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout - numKafkaServers = 1; + config.setKafkaServersNumber(1); zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15); } + this.config = config; - this.additionalServerProperties = additionalServerProperties; - this.secureMode = secureMode; File tempDir = new File(System.getProperty("java.io.tmpdir")); - tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs()); tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString())); assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs()); - tmpKafkaDirs = new ArrayList<>(numKafkaServers); - for (int i = 0; i < numKafkaServers; i++) { + tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber()); + for (int i = 0; i < config.getKafkaServersNumber(); i++) { File tmpDir = new File(tmpKafkaParent, "server-" + i); assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); tmpKafkaDirs.add(tmpDir); @@ -236,13 +233,13 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties, LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString); LOG.info("Starting KafkaServer"); - brokers = new ArrayList<>(numKafkaServers); + brokers = new ArrayList<>(config.getKafkaServersNumber()); - for (int i = 0; i < numKafkaServers; i++) { + for (int i = 0; i < config.getKafkaServersNumber(); i++) { brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); SocketServer socketServer = brokers.get(i).socketServer(); - if (secureMode) { + if (this.config.isSecureMode()) { brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ","; } else { brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ","; @@ -335,7 +332,7 @@ public void createTestTopic(String topic, int numberOfPartitions, int replicatio final long deadline = System.nanoTime() + Integer.parseInt(zkTimeout) * 1_000_000L; do { try { - if (secureMode) { + if (config.isSecureMode()) { //increase wait time since in Travis ZK timeout occurs frequently int wait = Integer.parseInt(zkTimeout) / 100; LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic); @@ -400,8 +397,8 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except // for CI stability, increase zookeeper session timeout kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout); kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout); - if (additionalServerProperties != null) { - kafkaProperties.putAll(additionalServerProperties); + if (config.getKafkaServerProperties() != null) { + kafkaProperties.putAll(config.getKafkaServerProperties()); } final int numTries = 5; @@ -411,7 +408,7 @@ 
protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except kafkaProperties.put("port", Integer.toString(kafkaPort)); //to support secure kafka cluster - if (secureMode) { + if (config.isSecureMode()) { LOG.info("Adding Kafka secure configurations"); kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); @@ -442,7 +439,7 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except public Properties getSecureProperties() { Properties prop = new Properties(); - if (secureMode) { + if (config.isSecureMode()) { prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT"); prop.put("security.protocol", "SASL_PLAINTEXT"); prop.put("sasl.kerberos.service.name", "kafka"); diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java index d5c9276244e8a..3163f52d1847a 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java @@ -98,7 +98,7 @@ public static void prepare() throws IOException, ClassNotFoundException { specificProperties.setProperty("log.retention.minutes", "0"); specificProperties.setProperty("log.retention.ms", "250"); specificProperties.setProperty("log.retention.check.interval.ms", "100"); - kafkaServer.prepare(1, specificProperties, false); + kafkaServer.prepare(kafkaServer.createConfig().setKafkaServerProperties(specificProperties)); standardProps = kafkaServer.getStandardProperties(); diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index c484a4beefbba..8eb0351ce91e9 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -135,7 +135,7 @@ protected static void startClusters(boolean secureMode) throws ClassNotFoundExce LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion()); - kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode); + kafkaServer.prepare(kafkaServer.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS).setSecureMode(secureMode)); standardProps = kafkaServer.getStandardProperties(); diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java index 50eff23fe652d..ea292a9a48692 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -38,15 +38,56 @@ * Abstract class providing a Kafka 
test environment.
  */
 public abstract class KafkaTestEnvironment {
+	/**
+	 * Configuration class for {@link KafkaTestEnvironment}.
+	 */
+	public static class Config {
+		private int kafkaServersNumber = 1;
+		private Properties kafkaServerProperties = null;
+		private boolean secureMode = false;
+
+		/**
+		 * Please use the {@link KafkaTestEnvironment#createConfig()} method.
+		 */
+		private Config() {
+		}
+
+		public int getKafkaServersNumber() {
+			return kafkaServersNumber;
+		}
+
+		public Config setKafkaServersNumber(int kafkaServersNumber) {
+			this.kafkaServersNumber = kafkaServersNumber;
+			return this;
+		}
+
+		public Properties getKafkaServerProperties() {
+			return kafkaServerProperties;
+		}
+
+		public Config setKafkaServerProperties(Properties kafkaServerProperties) {
+			this.kafkaServerProperties = kafkaServerProperties;
+			return this;
+		}
+
+		public boolean isSecureMode() {
+			return secureMode;
+		}
+
+		public Config setSecureMode(boolean secureMode) {
+			this.secureMode = secureMode;
+			return this;
+		}
+	}
 
 	protected static final String KAFKA_HOST = "localhost";
 
-	public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode);
-
-	public void prepare(int numberOfKafkaServers, boolean secureMode) {
-		this.prepare(numberOfKafkaServers, null, secureMode);
+	public static Config createConfig() {
+		return new Config();
 	}
 
+	public abstract void prepare(Config config);
+
 	public abstract void shutdown();
 
 	public abstract void deleteTestTopic(String topic);

From 22c1cae2e1d05c535fef9752de2fcbdf159e534e Mon Sep 17 00:00:00 2001
From: Piotr Nowojski
Date: Thu, 3 Aug 2017 11:27:12 +0200
Subject: [PATCH 3/4] [FLINK-7343][Kafka] Use NetworkFailureProxy in kafka tests

We shouldn't fail KafkaServers directly, because they might not be able to
flush the data. Since we don't want to test how well Kafka implements the
at-least-once/exactly-once semantics, we just simulate network failures
between Flink and Kafka in our at-least-once tests.
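
Roughly, the intended flow in the at-least-once producer test looks as follows
(only the method names introduced by this change are real; the surrounding test
wiring is abbreviated):

    // instead of picking and shutting down the leader broker, cut the proxied network:
    BrokerRestartingMapper.resetState(kafkaServer::blockProxyTraffic);
    // ... run the pipeline with new BrokerRestartingMapper<>(failAfterElements) and expect
    // a JobExecutionException once the traffic gets blocked ...
    kafkaServer.unblockProxyTraffic();
    // ... then assert that every element seen before the snapshot was flushed to the topic ...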
--- .../kafka/KafkaTestEnvironmentImpl.java | 6 +++ .../connectors/kafka/Kafka08ITCase.java | 7 +++ .../kafka/KafkaTestEnvironmentImpl.java | 8 ++- .../kafka/Kafka09SecuredRunITCase.java | 4 +- .../kafka/KafkaTestEnvironmentImpl.java | 6 +++ .../kafka/KafkaProducerTestBase.java | 49 +++++-------------- .../kafka/KafkaShortRetentionTestBase.java | 2 +- .../connectors/kafka/KafkaTestBase.java | 16 ++++-- .../kafka/KafkaTestEnvironment.java | 37 +++++++++++++- 9 files changed, 89 insertions(+), 46 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 9f1d37989c306..5be802f0dce06 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.networking.NetworkFailuresProxy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.StreamSink; @@ -414,6 +415,11 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except int kafkaPort = NetUtils.getAvailablePort(); kafkaProperties.put("port", Integer.toString(kafkaPort)); + if (config.isHideKafkaBehindProxy()) { + NetworkFailuresProxy proxy = createProxy(KAFKA_HOST, kafkaPort); + kafkaProperties.put("advertised.port", proxy.getLocalPort()); + } + //to support secure kafka cluster if (config.isSecureMode()) { LOG.info("Adding Kafka secure configurations"); diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java index 91dc9298eb3fb..b3afa579aab6e 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; import org.apache.curator.framework.CuratorFramework; +import org.junit.BeforeClass; import org.junit.Test; import java.util.Properties; @@ -36,6 +37,12 @@ */ public class Kafka08ITCase extends KafkaConsumerTestBase { + @BeforeClass + public static void prepare() throws ClassNotFoundException { + // Somehow KafkaConsumer 0.8 doesn't handle broker failures if they are behind a proxy + prepare(false); + } + // ------------------------------------------------------------------------ // Suite of Tests // ------------------------------------------------------------------------ diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index af5ad67c30d14..eb1f57e149294 100644 --- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.networking.NetworkFailuresProxy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.StreamSink; @@ -84,7 +85,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { private String zookeeperConnectionString; private String brokerConnectionString = ""; private Properties standardProps; - private Config config; public String getBrokerConnectionString() { @@ -401,6 +401,12 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except for (int i = 1; i <= numTries; i++) { int kafkaPort = NetUtils.getAvailablePort(); kafkaProperties.put("port", Integer.toString(kafkaPort)); + + if (config.isHideKafkaBehindProxy()) { + NetworkFailuresProxy proxy = createProxy(KAFKA_HOST, kafkaPort); + kafkaProperties.put("advertised.port", Integer.toString(proxy.getLocalPort())); + } + KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); try { diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java index d41cd91a65375..b4002c7a5f40c 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java @@ -41,11 +41,11 @@ public static void prepare() throws ClassNotFoundException { SecureTestEnvironment.prepare(tempFolder); SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration()); - startClusters(true); + startClusters(true, false); } @AfterClass - public static void shutDownServices() { + public static void shutDownServices() throws Exception { shutdownClusters(); SecureTestEnvironment.cleanup(); } diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 517f0969b4c51..676e5884060a5 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.networking.NetworkFailuresProxy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.StreamSink; @@ -407,6 +408,11 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except int kafkaPort = NetUtils.getAvailablePort(); kafkaProperties.put("port", Integer.toString(kafkaPort)); + if (config.isHideKafkaBehindProxy()) { + 
NetworkFailuresProxy proxy = createProxy(KAFKA_HOST, kafkaPort); + kafkaProperties.put("advertised.port", proxy.getLocalPort()); + } + //to support secure kafka cluster if (config.isSecureMode()) { LOG.info("Adding Kafka secure configurations"); diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java index 1af9ca8c09942..4a611039af4bf 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java @@ -46,7 +46,6 @@ import org.apache.flink.util.Preconditions; import com.google.common.collect.ImmutableSet; -import kafka.server.KafkaServer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Test; @@ -214,7 +213,8 @@ public void testOneToOneAtLeastOnceCustomOperator() throws Exception { /** * This test sets KafkaProducer so that it will not automatically flush the data and - * and fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState. + * simulate network failure between Flink and Kafka to check whether FlinkKafkaProducer + * flushed records manually on snapshotState. */ protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception { final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator"; @@ -243,13 +243,12 @@ protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception { properties.setProperty("batch.size", "10240000"); properties.setProperty("linger.ms", "10000"); - int leaderId = kafkaServer.getLeaderToShutDown(topic); - BrokerRestartingMapper.resetState(); + BrokerRestartingMapper.resetState(kafkaServer::blockProxyTraffic); // process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application DataStream inputStream = env .fromCollection(getIntegersSequence(numElements)) - .map(new BrokerRestartingMapper(leaderId, failAfterElements)); + .map(new BrokerRestartingMapper<>(failAfterElements)); StreamSink kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner() { @Override @@ -276,10 +275,10 @@ public int partition(Integer record, byte[] key, byte[] value, String targetTopi fail("Job should fail!"); } catch (JobExecutionException ex) { - assertEquals("Broker was shutdown!", ex.getCause().getMessage()); + // ignore error, it can be one of many errors so it would be hard to check the exception message/cause } - kafkaServer.restartBroker(leaderId); + kafkaServer.unblockProxyTraffic(); // assert that before failure we successfully snapshot/flushed all expected elements assertAtLeastOnceForTopic( @@ -438,22 +437,22 @@ private static class BrokerRestartingMapper extends RichMapFunction public static volatile boolean restartedLeaderBefore; public static volatile boolean hasBeenCheckpointedBeforeFailure; public static volatile int numElementsBeforeSnapshot; + public static volatile Runnable shutdownAction; - private final int shutdownBrokerId; private final int failCount; private int numElementsTotal; private boolean failer; private boolean hasBeenCheckpointed; - public static void resetState() { + public static void resetState(Runnable shutdownAction) { 
restartedLeaderBefore = false; hasBeenCheckpointedBeforeFailure = false; numElementsBeforeSnapshot = 0; + BrokerRestartingMapper.shutdownAction = shutdownAction; } - public BrokerRestartingMapper(int shutdownBrokerId, int failCount) { - this.shutdownBrokerId = shutdownBrokerId; + public BrokerRestartingMapper(int failCount) { this.failCount = failCount; } @@ -471,31 +470,9 @@ public T map(T value) throws Exception { if (failer && numElementsTotal >= failCount) { // shut down a Kafka broker - KafkaServer toShutDown = null; - for (KafkaServer server : kafkaServer.getBrokers()) { - - if (kafkaServer.getBrokerId(server) == shutdownBrokerId) { - toShutDown = server; - break; - } - } - - if (toShutDown == null) { - StringBuilder listOfBrokers = new StringBuilder(); - for (KafkaServer server : kafkaServer.getBrokers()) { - listOfBrokers.append(kafkaServer.getBrokerId(server)); - listOfBrokers.append(" ; "); - } - - throw new Exception("Cannot find broker to shut down: " + shutdownBrokerId - + " ; available brokers: " + listOfBrokers.toString()); - } else { - hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed; - restartedLeaderBefore = true; - toShutDown.shutdown(); - toShutDown.awaitShutdown(); - throw new Exception("Broker was shutdown!"); - } + hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed; + restartedLeaderBefore = true; + shutdownAction.run(); } } return value; diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java index 3163f52d1847a..fbf902f11e27a 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java @@ -115,7 +115,7 @@ public static void prepare() throws IOException, ClassNotFoundException { } @AfterClass - public static void shutDownServices() { + public static void shutDownServices() throws Exception { TestStreamEnvironment.unsetAsContext(); if (flink != null) { diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index 8eb0351ce91e9..19f38e2e4a620 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -90,18 +90,21 @@ public abstract class KafkaTestBase extends TestLogger { @BeforeClass public static void prepare() throws ClassNotFoundException { + prepare(true); + } + public static void prepare(boolean hideKafkaBehindProxy) throws ClassNotFoundException { LOG.info("-------------------------------------------------------------------------"); LOG.info(" Starting KafkaTestBase "); LOG.info("-------------------------------------------------------------------------"); - startClusters(false); + startClusters(false, hideKafkaBehindProxy); TestStreamEnvironment.setAsContext(flink, PARALLELISM); } @AfterClass - public static void shutDownServices() { + public static void shutDownServices() throws Exception { 
LOG.info("-------------------------------------------------------------------------"); LOG.info(" Shut down KafkaTestBase "); @@ -127,7 +130,7 @@ protected static Configuration getFlinkConfiguration() { return flinkConfig; } - protected static void startClusters(boolean secureMode) throws ClassNotFoundException { + protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) throws ClassNotFoundException { // dynamically load the implementation for the test Class clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl"); @@ -135,7 +138,10 @@ protected static void startClusters(boolean secureMode) throws ClassNotFoundExce LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion()); - kafkaServer.prepare(kafkaServer.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS).setSecureMode(secureMode)); + kafkaServer.prepare(kafkaServer.createConfig() + .setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS) + .setSecureMode(secureMode) + .setHideKafkaBehindProxy(hideKafkaBehindProxy)); standardProps = kafkaServer.getStandardProperties(); @@ -154,7 +160,7 @@ protected static void startClusters(boolean secureMode) throws ClassNotFoundExce flink.start(); } - protected static void shutdownClusters() { + protected static void shutdownClusters() throws Exception { if (flink != null) { flink.shutdown(); diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java index ea292a9a48692..21171f881e370 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.networking.NetworkFailuresProxy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.StreamSink; @@ -29,6 +30,7 @@ import kafka.server.KafkaServer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -45,6 +47,7 @@ public static class Config { private int kafkaServersNumber = 1; private Properties kafkaServerProperties = null; private boolean secureMode = false; + private boolean hideKafkaBehindProxy = false; /** * Please use {@link KafkaTestEnvironment#createConfig()} method. 
@@ -78,17 +81,32 @@ public Config setSecureMode(boolean secureMode) { this.secureMode = secureMode; return this; } + + public boolean isHideKafkaBehindProxy() { + return hideKafkaBehindProxy; + } + + public Config setHideKafkaBehindProxy(boolean hideKafkaBehindProxy) { + this.hideKafkaBehindProxy = hideKafkaBehindProxy; + return this; + } } protected static final String KAFKA_HOST = "localhost"; + protected final List networkFailuresProxies = new ArrayList<>(); + public static Config createConfig() { return new Config(); } public abstract void prepare(Config config); - public abstract void shutdown(); + public void shutdown() throws Exception { + for (NetworkFailuresProxy proxy : networkFailuresProxies) { + proxy.close(); + } + } public abstract void deleteTestTopic(String topic); @@ -168,4 +186,21 @@ public interface KafkaOffsetHandler { public abstract boolean isSecureRunSupported(); + public void blockProxyTraffic() { + for (NetworkFailuresProxy proxy : networkFailuresProxies) { + proxy.blockTraffic(); + } + } + + public void unblockProxyTraffic() { + for (NetworkFailuresProxy proxy : networkFailuresProxies) { + proxy.unblockTraffic(); + } + } + + protected NetworkFailuresProxy createProxy(String remoteHost, int remotePort) { + NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, remoteHost, remotePort); + networkFailuresProxies.add(proxy); + return proxy; + } } From 7121a5d6e08fb8d7a081c4686fa4791796b4981b Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Thu, 3 Aug 2017 11:35:26 +0200 Subject: [PATCH 4/4] [hotfix][Kafka] Clean up getKafkaServer method --- .../kafka/KafkaTestEnvironmentImpl.java | 17 ++++------------- .../kafka/KafkaTestEnvironmentImpl.java | 14 +++++--------- 2 files changed, 9 insertions(+), 22 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 5be802f0dce06..5a5caadebb452 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -249,20 +249,11 @@ public void prepare(Config config) { LOG.info("Starting KafkaServer"); brokers = new ArrayList<>(config.getKafkaServersNumber()); + ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? 
SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT); for (int i = 0; i < config.getKafkaServersNumber(); i++) { - brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); - - if (config.isSecureMode()) { - brokerConnectionString += hostAndPortToUrlString( - KafkaTestEnvironment.KAFKA_HOST, - brokers.get(i).socketServer().boundPort( - ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT))); - } else { - brokerConnectionString += hostAndPortToUrlString( - KafkaTestEnvironment.KAFKA_HOST, - brokers.get(i).socketServer().boundPort( - ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))); - } + KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i)); + brokers.add(kafkaServer); + brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName)); brokerConnectionString += ","; } diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 676e5884060a5..26b41e672ec67 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -30,7 +30,6 @@ import kafka.admin.AdminUtils; import kafka.api.PartitionMetadata; import kafka.common.KafkaException; -import kafka.network.SocketServer; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import kafka.utils.SystemTime$; @@ -236,15 +235,12 @@ public void prepare(Config config) { LOG.info("Starting KafkaServer"); brokers = new ArrayList<>(config.getKafkaServersNumber()); + SecurityProtocol securityProtocol = config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT; for (int i = 0; i < config.getKafkaServersNumber(); i++) { - brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); - - SocketServer socketServer = brokers.get(i).socketServer(); - if (this.config.isSecureMode()) { - brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ","; - } else { - brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ","; - } + KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i)); + brokers.add(kafkaServer); + brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(securityProtocol)); + brokerConnectionString += ","; } LOG.info("ZK and KafkaServer started.");
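
For reference, a minimal usage sketch of the utility added in the first patch of this
series (only NetworkFailuresProxy's own API is taken from the patch; the serverPort
variable is illustrative):

    // listen on a free local port (0) and forward all traffic to the real server
    try (NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", serverPort)) {
        // clients should connect to proxy.getLocalPort() instead of serverPort
        proxy.blockTraffic();   // simulate a network failure: closes open connections and any new incoming ones
        proxy.unblockTraffic(); // resume normal communication
    }

In the Kafka patches this is wired up through KafkaTestEnvironment#createProxy(KAFKA_HOST, kafkaPort),
which registers one proxy per broker and advertises the proxy's port, so that
blockProxyTraffic()/unblockProxyTraffic() can cut and restore all Flink <-> Kafka connections at once.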