From 3c4d8452021ea3e84d5f78eeb1b24dcc04adf865 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 9 Jan 2018 20:37:08 +0100 Subject: [PATCH 1/5] [FLINK-8393] [flip6] Reconnect to last known JobMaster when connection is lost In case of a heartbeat timeout or a disconnect call, the TaskExecutor tries to reconnect to the last known JobMaster location. This closes #5267. --- .../registration/RegisteredRpcConnection.java | 101 ++++++++++++++---- .../taskexecutor/JobLeaderService.java | 45 +++++++- .../runtime/taskexecutor/TaskExecutor.java | 27 ++++- .../taskexecutor/slot/TaskSlotTable.java | 13 ++- .../RegisteredRpcConnectionTest.java | 79 +++++++++++--- 5 files changed, 219 insertions(+), 46 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java index c76bcf8f5fa01..7d2c35a7a4b90 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java @@ -27,6 +27,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -46,6 +47,11 @@ */ public abstract class RegisteredRpcConnection { + private static final AtomicReferenceFieldUpdater REGISTRATION_UPDATER = AtomicReferenceFieldUpdater.newUpdater( + RegisteredRpcConnection.class, + RetryingRegistration.class, + "pendingRegistration"); + /** The logger for all log messages of this class. */ protected final Logger log; @@ -59,7 +65,7 @@ public abstract class RegisteredRpcConnection pendingRegistration; + private volatile RetryingRegistration pendingRegistration; /** The gateway to register, it's null until the registration is completed. */ private volatile G targetGateway; @@ -85,27 +91,47 @@ public void start() { checkState(!closed, "The RPC connection is already closed"); checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started"); - pendingRegistration = checkNotNull(generateRegistration()); - pendingRegistration.startRegistration(); + final RetryingRegistration newRegistration = createNewRegistration(); - CompletableFuture> future = pendingRegistration.getFuture(); + if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) { + newRegistration.startRegistration(); + } else { + // concurrent start operation + newRegistration.cancel(); + } + } - future.whenCompleteAsync( - (Tuple2 result, Throwable failure) -> { - if (failure != null) { - if (failure instanceof CancellationException) { - // we ignore cancellation exceptions because they originate from cancelling - // the RetryingRegistration - log.debug("Retrying registration towards {} was cancelled.", targetAddress); - } else { - // this future should only ever fail if there is a bug, not if the registration is declined - onRegistrationFailure(failure); - } - } else { - targetGateway = result.f0; - onRegistrationSuccess(result.f1); - } - }, executor); + public boolean tryReconnect() { + checkState(isConnected(), "Cannot reconnect to an unknown destination."); + + if (closed) { + return false; + } else { + final RetryingRegistration currentPendingRegistration = pendingRegistration; + + if (currentPendingRegistration != null) { + currentPendingRegistration.cancel(); + } + + final RetryingRegistration newRegistration = createNewRegistration(); + + if (REGISTRATION_UPDATER.compareAndSet(this, currentPendingRegistration, newRegistration)) { + newRegistration.startRegistration(); + } else { + // concurrent modification + newRegistration.cancel(); + return false; + } + + // double check for concurrent close operations + if (closed) { + newRegistration.cancel(); + + return false; + } else { + return true; + } + } } /** @@ -175,13 +201,42 @@ public String toString() { } if (isClosed()) { - connectionInfo = connectionInfo + " is closed"; + connectionInfo += " is closed"; } else if (isConnected()){ - connectionInfo = connectionInfo + " is established"; + connectionInfo += " is established"; } else { - connectionInfo = connectionInfo + " is connecting"; + connectionInfo += " is connecting"; } return connectionInfo; } + + // ------------------------------------------------------------------------ + // Internal methods + // ------------------------------------------------------------------------ + + private RetryingRegistration createNewRegistration() { + RetryingRegistration newRegistration = checkNotNull(generateRegistration()); + + CompletableFuture> future = newRegistration.getFuture(); + + future.whenCompleteAsync( + (Tuple2 result, Throwable failure) -> { + if (failure != null) { + if (failure instanceof CancellationException) { + // we ignore cancellation exceptions because they originate from cancelling + // the RetryingRegistration + log.debug("Retrying registration towards {} was cancelled.", targetAddress); + } else { + // this future should only ever fail if there is a bug, not if the registration is declined + onRegistrationFailure(failure); + } + } else { + targetGateway = result.f0; + onRegistrationSuccess(result.f1); + } + }, executor); + + return newRegistration; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java index 77737e19ba208..3b4da4ef5ce7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java @@ -204,6 +204,23 @@ public void addJob(final JobID jobId, final String defaultTargetAddress) throws jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, jobManagerLeaderListener)); } + /** + * Triggers reconnection to the last known leader of the given job. + * + * @param jobId specifying the job for which to trigger reconnection + */ + public void reconnect(final JobID jobId) { + Preconditions.checkNotNull(jobId, "JobID must not be null."); + + final Tuple2 jobLeaderService = jobLeaderServices.get(jobId); + + if (jobLeaderService != null) { + jobLeaderService.f1.reconnect(); + } else { + LOG.info("Cannot reconnect to job {} because it is not registered.", jobId); + } + } + /** * Leader listener which tries to establish a connection to a newly detected job leader. */ @@ -213,7 +230,7 @@ private final class JobManagerLeaderListener implements LeaderRetrievalListener private final JobID jobId; /** Rpc connection to the job leader. */ - private RegisteredRpcConnection rpcConnection; + private volatile RegisteredRpcConnection rpcConnection; /** State of the listener. */ private volatile boolean stopped; @@ -237,6 +254,32 @@ public void stop() { } } + public void reconnect() { + if (stopped) { + LOG.debug("Cannot reconnect because the JobManagerLeaderListener has already been stopped."); + } else { + final RegisteredRpcConnection currentRpcConnection = rpcConnection; + + if (currentRpcConnection != null) { + if (currentRpcConnection.isConnected()) { + + if (currentRpcConnection.tryReconnect()) { + // double check for concurrent stop operation + if (stopped) { + currentRpcConnection.close(); + } + } else { + LOG.debug("Could not reconnect to the JobMaster {}.", currentRpcConnection.getTargetAddress()); + } + } else { + LOG.debug("Ongoing registration to JobMaster {}.", currentRpcConnection.getTargetAddress()); + } + } else { + LOG.debug("Cannot reconnect to an unknown JobMaster."); + } + } + } + @Override public void notifyLeaderAddress(final String leaderAddress, final UUID leaderId) { if (stopped) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 55774720b1b50..3c7d1cb2e155a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -687,6 +687,7 @@ public CompletableFuture freeSlot(AllocationID allocationId, Throwa @Override public void disconnectJobManager(JobID jobId, Exception cause) { closeJobManagerConnection(jobId, cause); + jobLeaderService.reconnect(jobId); } @Override @@ -1079,16 +1080,34 @@ private void freeSlotInternal(AllocationID allocationId, Throwable cause) { Preconditions.checkNotNull(allocationId); try { - int freedSlotIndex = taskSlotTable.freeSlot(allocationId, cause); + TaskSlot taskSlot = taskSlotTable.freeSlot(allocationId, cause); - if (freedSlotIndex != -1 && isConnectedToResourceManager()) { + if (taskSlot != null && isConnectedToResourceManager()) { // the slot was freed. Tell the RM about it ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); resourceManagerGateway.notifySlotAvailable( resourceManagerConnection.getRegistrationId(), - new SlotID(getResourceID(), freedSlotIndex), + new SlotID(getResourceID(), taskSlot.getIndex()), allocationId); + + // check whether we still have allocated slots for the same job + final JobID jobId = taskSlot.getJobId(); + final Iterator tasks = taskSlotTable.getTasks(jobId); + + if (!tasks.hasNext()) { + // we can remove the job from the job leader service + try { + jobLeaderService.removeJob(jobId); + } catch (Exception e) { + log.info("Could not remove job {} from JobLeaderService.", jobId, e); + } + + closeJobManagerConnection( + jobId, + new FlinkException("TaskExecutor " + getAddress() + + " has no more allocated slots for job " + jobId + '.')); + } } } catch (SlotNotFoundException e) { log.debug("Could not free slot for allocation id {}.", allocationId, e); @@ -1295,6 +1314,8 @@ public void notifyHeartbeatTimeout(final ResourceID resourceID) { closeJobManagerConnection( jobManagerConnection.getJobID(), new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out.")); + + jobLeaderService.reconnect(jobManagerConnection.getJobID()); } } }); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java index 62101e7318a9f..ab62a86f89c96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -33,6 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -266,7 +268,7 @@ public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) thr * @throws SlotNotFoundException if there is not task slot for the given allocation id * @return Index of the freed slot if the slot could be freed; otherwise -1 */ - public int freeSlot(AllocationID allocationId) throws SlotNotFoundException { + public TaskSlot freeSlot(AllocationID allocationId) throws SlotNotFoundException { return freeSlot(allocationId, new Exception("The task slot of this task is being freed.")); } @@ -278,9 +280,10 @@ public int freeSlot(AllocationID allocationId) throws SlotNotFoundException { * @param allocationId identifying the task slot to be freed * @param cause to fail the tasks with if slot is not empty * @throws SlotNotFoundException if there is not task slot for the given allocation id - * @return Index of the freed slot if the slot could be freed; otherwise -1 + * @return The freed TaskSlot. If the TaskSlot cannot be freed then null. */ - public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException { + @Nullable + public TaskSlot freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException { checkInit(); TaskSlot taskSlot = getTaskSlot(allocationId); @@ -314,7 +317,7 @@ public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFo slotsPerJob.remove(jobId); } - return taskSlot.getIndex(); + return taskSlot; } else { // we couldn't free the task slot because it still contains task, fail the tasks // and set the slot state to releasing so that it gets eventually freed @@ -326,7 +329,7 @@ public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFo taskIterator.next().failExternally(cause); } - return -1; + return null; } } else { throw new SlotNotFoundException(allocationId); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java index 19a57563c3815..650a0f2112b9f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java @@ -27,12 +27,15 @@ import org.slf4j.LoggerFactory; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; @@ -60,14 +63,14 @@ public void testSuccessfulRpcConnection() throws Exception { connection.start(); //wait for connection established - Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT); + final String actualConnectionId = connection.getConnectionFuture().get(); // validate correct invocation and result assertTrue(connection.isConnected()); assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress()); assertEquals(leaderId, connection.getTargetLeaderId()); assertEquals(testGateway, connection.getTargetGateway()); - assertEquals(connectionID, connection.getConnectionId()); + assertEquals(connectionID, actualConnectionId); } finally { testGateway.stop(); @@ -86,8 +89,9 @@ public void testRpcConnectionFailures() throws Exception { try { // gateway that upon calls Throw an exception TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); + final RuntimeException registrationException = new RuntimeException(connectionFailureMessage); when(testGateway.registrationCall(any(UUID.class), anyLong())).thenThrow( - new RuntimeException(connectionFailureMessage)); + registrationException); rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); @@ -95,14 +99,18 @@ public void testRpcConnectionFailures() throws Exception { connection.start(); //wait for connection failure - Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT); + try { + connection.getConnectionFuture().get(); + fail("expected failure."); + } catch (ExecutionException ee) { + assertEquals(registrationException, ee.getCause()); + } // validate correct invocation and result assertFalse(connection.isConnected()); assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress()); assertEquals(leaderId, connection.getTargetLeaderId()); assertNull(connection.getTargetGateway()); - assertEquals(connectionFailureMessage, connection.getFailareMessage()); } finally { rpcService.stopService(); @@ -137,21 +145,53 @@ public void testRpcConnectionClose() throws Exception { } } + @Test + public void testReconnect() throws Exception { + final String connectionId1 = "Test RPC Connection ID 1"; + final String connectionId2 = "Test RPC Connection ID 2"; + final TestingRpcService rpcService = new TestingRpcService(); + final String testRpcConnectionEndpointAddress = ""; + final UUID leaderId = UUID.randomUUID(); + final TestRegistrationGateway testGateway = new TestRegistrationGateway( + new RetryingRegistrationTest.TestRegistrationSuccess(connectionId1), + new RetryingRegistrationTest.TestRegistrationSuccess(connectionId2)); + + try { + rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); + + TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService); + connection.start(); + + final String actualConnectionId1 = connection.getConnectionFuture().get(); + + assertEquals(actualConnectionId1, connectionId1); + + assertTrue(connection.tryReconnect()); + + final String actualConnectionId2 = connection.getConnectionFuture().get(); + + assertEquals(actualConnectionId2, connectionId2); + } finally { + rpcService.stopService(); + } + } + // ------------------------------------------------------------------------ // test RegisteredRpcConnection // ------------------------------------------------------------------------ private static class TestRpcConnection extends RegisteredRpcConnection { - private final RpcService rpcService; + private final Object lock = new Object(); - private String connectionId; + private final RpcService rpcService; - private String failureMessage; + private CompletableFuture connectionFuture; public TestRpcConnection(String targetAddress, UUID targetLeaderId, Executor executor, RpcService rpcService) { super(LoggerFactory.getLogger(RegisteredRpcConnectionTest.class), targetAddress, targetLeaderId, executor); this.rpcService = rpcService; + this.connectionFuture = new CompletableFuture<>(); } @Override @@ -161,20 +201,31 @@ protected RetryingRegistration(); + } + return super.tryReconnect(); } - public String getFailareMessage() { - return failureMessage; + public CompletableFuture getConnectionFuture() { + synchronized (lock) { + return connectionFuture; + } } } } From 55ced94f9a25e9572b2206297d44849dc5afb42b Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 23 Oct 2017 17:02:58 +0200 Subject: [PATCH 2/5] [FLINK-7918] Run AbstractTestBase tests on Flip-6 MiniCluster This closes #5095. --- .../connectors/fs/RollingSinkITCase.java | 2 + .../minicluster/JobExecutorService.java | 37 ++++++ .../runtime/minicluster/MiniCluster.java | 17 ++- .../minicluster/FlinkMiniCluster.scala | 14 +- .../flink/test/util/AbstractTestBase.java | 3 + .../flink/test/util/MiniClusterResource.java | 120 ++++++++++++++---- pom.xml | 3 + 7 files changed, 169 insertions(+), 27 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java index 78f643f717eda..e1124e4b9a77d 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java @@ -121,6 +121,8 @@ public static void setup() throws Exception { new org.apache.flink.configuration.Configuration(), 1, 4)); + + miniClusterResource.before(); } @AfterClass diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java new file mode 100644 index 0000000000000..03d2447d02afe --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.minicluster; + +import java.util.concurrent.CompletableFuture; + +/** + * Interface to control {@link JobExecutor}. + */ +public interface JobExecutorService extends JobExecutor { + + /** + * Terminate the given JobExecutorService. + * + *

This method can be implemented asynchronously. Therefore it returns a future + * which is completed once the termination has been done. + * + * @return Termination future which can also contain an exception if the termination went wrong + */ + CompletableFuture terminate(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 2598a6031f1f9..531c1a1a726f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; @@ -58,7 +59,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; -public class MiniCluster implements JobExecutor { +public class MiniCluster implements JobExecutorService { private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class); @@ -458,6 +459,10 @@ public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionEx dispatcher = this.jobDispatcher; } + // we have to allow queued scheduling in Flip-6 mode because we need to request slots + // from the ResourceManager + job.setAllowQueuedScheduling(true); + return dispatcher.runJobBlocking(job); } @@ -593,6 +598,16 @@ private static Throwable shutDownRpcs(RpcService[] rpcServices, Throwable priorE return priorException; } + @Override + public CompletableFuture terminate() { + try { + shutdown(); + return CompletableFuture.completedFuture(null); + } catch (Exception e) { + return FutureUtils.completedExceptionally(e); + } + } + private class TerminatingFatalErrorHandler implements FatalErrorHandler { private final int index; diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index cc8ae5f520517..44e3a677a2823 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -20,7 +20,7 @@ package org.apache.flink.runtime.minicluster import java.net.{URL, URLClassLoader} import java.util.UUID -import java.util.concurrent.{Executors, TimeUnit} +import java.util.concurrent.{CompletableFuture, Executors, TimeUnit} import akka.pattern.Patterns.gracefulStop import akka.pattern.ask @@ -65,7 +65,7 @@ abstract class FlinkMiniCluster( val highAvailabilityServices: HighAvailabilityServices, val useSingleActorSystem: Boolean) extends LeaderRetrievalListener - with JobExecutor { + with JobExecutorService { protected val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster]) @@ -713,4 +713,14 @@ abstract class FlinkMiniCluster( override def executeJobBlocking(jobGraph: JobGraph) = { submitJobAndWait(jobGraph, false) } + + override def terminate() = { + try { + stop() + CompletableFuture.completedFuture(null) + } catch { + case e: Exception => + FutureUtils.completedExceptionally(e) + } + } } diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java index 65b351daa696e..d73f6246b3cad 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java @@ -19,9 +19,11 @@ package org.apache.flink.test.util; import org.apache.flink.configuration.Configuration; +import org.apache.flink.testutils.category.OldAndFlip6; import org.apache.flink.util.FileUtils; import org.junit.ClassRule; +import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; import java.io.File; @@ -54,6 +56,7 @@ * * */ +@Category(OldAndFlip6.class) public abstract class AbstractTestBase extends TestBaseUtils { private static final int DEFAULT_PARALLELISM = 4; diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java index 20dbebb029148..69070c6c7b038 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java @@ -22,15 +22,23 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.JobExecutorService; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.junit.rules.ExternalResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + /** * Starts a Flink mini cluster as a resource and registers the respective * ExecutionEnvironment and StreamExecutionEnvironment. @@ -39,16 +47,31 @@ public class MiniClusterResource extends ExternalResource { private static final Logger LOG = LoggerFactory.getLogger(MiniClusterResource.class); + private static final String CODEBASE_KEY = "codebase"; + + private static final String FLIP6_CODEBASE = "flip6"; + private final MiniClusterResourceConfiguration miniClusterResourceConfiguration; - private LocalFlinkMiniCluster localFlinkMiniCluster; + private final MiniClusterType miniClusterType; + + private JobExecutorService jobExecutorService; private int numberSlots = -1; private TestEnvironment executionEnvironment; public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) { + this( + miniClusterResourceConfiguration, + Objects.equals(FLIP6_CODEBASE, System.getProperty(CODEBASE_KEY)) ? MiniClusterType.FLIP6 : MiniClusterType.OLD); + } + + public MiniClusterResource( + final MiniClusterResourceConfiguration miniClusterResourceConfiguration, + final MiniClusterType miniClusterType) { this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration); + this.miniClusterType = Preconditions.checkNotNull(miniClusterType); } public int getNumberSlots() { @@ -61,37 +84,74 @@ public TestEnvironment getTestEnvironment() { @Override public void before() throws Exception { - final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration()); - configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers()); - configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, miniClusterResourceConfiguration.numberSlotsPerTaskManager); - - localFlinkMiniCluster = TestBaseUtils.startCluster( - configuration, - true); + jobExecutorService = startJobExecutorService(miniClusterType); numberSlots = miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * miniClusterResourceConfiguration.getNumberTaskManagers(); - executionEnvironment = new TestEnvironment(localFlinkMiniCluster, numberSlots, false); + executionEnvironment = new TestEnvironment(jobExecutorService, numberSlots, false); executionEnvironment.setAsContext(); - TestStreamEnvironment.setAsContext(localFlinkMiniCluster, numberSlots); + TestStreamEnvironment.setAsContext(jobExecutorService, numberSlots); } @Override public void after() { - if (localFlinkMiniCluster != null) { - try { - TestBaseUtils.stopCluster( - localFlinkMiniCluster, - FutureUtils.toFiniteDuration(miniClusterResourceConfiguration.getShutdownTimeout())); - } catch (Exception e) { - LOG.warn("Could not properly shut down the Flink mini cluster.", e); - } - - TestStreamEnvironment.unsetAsContext(); - TestEnvironment.unsetAsContext(); - localFlinkMiniCluster = null; + + TestStreamEnvironment.unsetAsContext(); + TestEnvironment.unsetAsContext(); + + final CompletableFuture terminationFuture = jobExecutorService.terminate(); + + try { + terminationFuture.get( + miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds(), + TimeUnit.MILLISECONDS); + } catch (Exception e) { + LOG.warn("Could not properly shut down the MiniClusterResource.", e); } + + jobExecutorService = null; + } + + private JobExecutorService startJobExecutorService(MiniClusterType miniClusterType) throws Exception { + final JobExecutorService jobExecutorService; + switch (miniClusterType) { + case OLD: + jobExecutorService = startOldMiniCluster(); + break; + case FLIP6: + jobExecutorService = startFlip6MiniCluster(); + break; + default: + throw new FlinkRuntimeException("Unknown MiniClusterType " + miniClusterType + '.'); + } + + return jobExecutorService; + } + + private JobExecutorService startOldMiniCluster() throws Exception { + final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration()); + configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers()); + configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager()); + + return TestBaseUtils.startCluster( + configuration, + true); + } + + @Nonnull + private JobExecutorService startFlip6MiniCluster() throws Exception { + final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() + .setConfiguration(miniClusterResourceConfiguration.getConfiguration()) + .setNumTaskManagers(miniClusterResourceConfiguration.getNumberTaskManagers()) + .setNumSlotsPerTaskManager(miniClusterResourceConfiguration.getNumberSlotsPerTaskManager()) + .build(); + + final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration); + + miniCluster.start(); + + return miniCluster; } /** @@ -144,4 +204,16 @@ public Time getShutdownTimeout() { return shutdownTimeout; } } + + // --------------------------------------------- + // Enum definitions + // --------------------------------------------- + + /** + * Type of the mini cluster to start. + */ + public enum MiniClusterType { + OLD, + FLIP6 + } } diff --git a/pom.xml b/pom.xml index 6b68932efe577..ee07414648a73 100644 --- a/pom.xml +++ b/pom.xml @@ -129,6 +129,7 @@ under the License. org.apache.flink.testutils.category.Flip6 + old + flip6 @@ -1131,6 +1133,7 @@ under the License. 0${surefire.forkNumber} ${log4j.configuration} + ${codebase} -Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber} -XX:+UseG1GC From 64a910e5347cf6513e9b8bffbe6354d3819987f9 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 1 Dec 2017 19:19:29 +0100 Subject: [PATCH 3/5] [hotfix] [tests] Refactor TypeHintITCase to extend AbstractTestBase --- .../flink/test/operators/TypeHintITCase.java | 371 ++++++++---------- 1 file changed, 170 insertions(+), 201 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java index b63400506124f..62b5186a478da 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java @@ -32,227 +32,196 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.test.operators.util.CollectionDataSets; -import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.util.Collector; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; +import org.junit.Test; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; /** * Integration tests for {@link org.apache.flink.api.common.typeinfo.TypeHint}. */ -@RunWith(Parameterized.class) -public class TypeHintITCase extends JavaProgramTestBase { +public class TypeHintITCase extends AbstractTestBase { - private static final int NUM_PROGRAMS = 9; + @Test + public void testIdentityMapWithMissingTypesAndStringTypeHint() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); - private final int curProgId; + DataSet> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet> identityMapDs = ds + .map(new Mapper, Tuple3>()) + .returns("Tuple3"); + List> result = identityMapDs.collect(); - public TypeHintITCase(int curProgId) { - this.curProgId = curProgId; + String expectedResult = "(2,2,Hello)\n" + + "(3,2,Hello world)\n" + + "(1,1,Hi)\n"; + + compareResultAsText(result, expectedResult); } - @Override - protected void testProgram() throws Exception { - TypeHintProgs.runProgram(curProgId); + @Test + public void testIdentityMapWithMissingTypesAndTypeInformationTypeHint() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + + DataSet> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet> identityMapDs = ds + // all following generics get erased during compilation + .map(new Mapper, Tuple3>()) + .returns(new TupleTypeInfo>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)); + List> result = identityMapDs + .collect(); + + String expectedResult = "(2,2,Hello)\n" + + "(3,2,Hello world)\n" + + "(1,1,Hi)\n"; + + compareResultAsText(result, expectedResult); } - @Parameters - public static Collection getConfigurations() { + @Test + public void testFlatMapWithClassTypeHint() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); - Collection parameters = new ArrayList<>(NUM_PROGRAMS); + DataSet> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet identityMapDs = ds + .flatMap(new FlatMapper, Integer>()) + .returns(Integer.class); + List result = identityMapDs.collect(); - for (int i = 1; i <= NUM_PROGRAMS; i++) { - parameters.add(new Object[]{i}); - } + String expectedResult = "2\n" + + "3\n" + + "1\n"; - return parameters; + compareResultAsText(result, expectedResult); } - private static class TypeHintProgs { - - public static void runProgram(int progId) throws Exception { - switch(progId) { - // Test identity map with missing types and string type hint - case 1: { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet> ds = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet> identityMapDs = ds - .map(new Mapper, Tuple3>()) - .returns("Tuple3"); - List> result = identityMapDs.collect(); - - String expectedResult = "(2,2,Hello)\n" + - "(3,2,Hello world)\n" + - "(1,1,Hi)\n"; - - compareResultAsText(result, expectedResult); - break; - } - // Test identity map with missing types and type information type hint - case 2: { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet> ds = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet> identityMapDs = ds - // all following generics get erased during compilation - .map(new Mapper, Tuple3>()) - .returns(new TupleTypeInfo>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)); - List> result = identityMapDs - .collect(); - - String expectedResult = "(2,2,Hello)\n" + - "(3,2,Hello world)\n" + - "(1,1,Hi)\n"; - - compareResultAsText(result, expectedResult); - break; - } - // Test flat map with class type hint - case 3: { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet> ds = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet identityMapDs = ds - .flatMap(new FlatMapper, Integer>()) - .returns(Integer.class); - List result = identityMapDs.collect(); - - String expectedResult = "2\n" + - "3\n" + - "1\n"; - - compareResultAsText(result, expectedResult); - break; - } - // Test join with type information type hint - case 4: { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet resultDs = ds1 - .join(ds2) - .where(0) - .equalTo(0) - .with(new Joiner, Tuple3, Integer>()) - .returns(BasicTypeInfo.INT_TYPE_INFO); - List result = resultDs.collect(); - - String expectedResult = "2\n" + - "3\n" + - "1\n"; - - compareResultAsText(result, expectedResult); - break; - } - // Test flat join with type information type hint - case 5: { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet resultDs = ds1 - .join(ds2) - .where(0) - .equalTo(0) - .with(new FlatJoiner, Tuple3, Integer>()) - .returns(BasicTypeInfo.INT_TYPE_INFO); - List result = resultDs.collect(); - - String expectedResult = "2\n" + - "3\n" + - "1\n"; - - compareResultAsText(result, expectedResult); - break; - } - // Test unsorted group reduce with type information type hint - case 6: { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet> ds = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet resultDs = ds - .groupBy(0) - .reduceGroup(new GroupReducer, Integer>()) - .returns(BasicTypeInfo.INT_TYPE_INFO); - List result = resultDs.collect(); - - String expectedResult = "2\n" + - "3\n" + - "1\n"; - - compareResultAsText(result, expectedResult); - break; - } - // Test sorted group reduce with type information type hint - case 7: { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet> ds = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet resultDs = ds - .groupBy(0) - .sortGroup(0, Order.ASCENDING) - .reduceGroup(new GroupReducer, Integer>()) - .returns(BasicTypeInfo.INT_TYPE_INFO); - List result = resultDs.collect(); - - String expectedResult = "2\n" + - "3\n" + - "1\n"; - - compareResultAsText(result, expectedResult); - break; - } - // Test combine group with type information type hint - case 8: { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet> ds = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet resultDs = ds - .groupBy(0) - .combineGroup(new GroupCombiner, Integer>()) - .returns(BasicTypeInfo.INT_TYPE_INFO); - List result = resultDs.collect(); - - String expectedResult = "2\n" + - "3\n" + - "1\n"; - - compareResultAsText(result, expectedResult); - break; - } - // Test cogroup with type information type hint - case 9: { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet resultDs = ds1 - .coGroup(ds2) - .where(0) - .equalTo(0) - .with(new CoGrouper, Tuple3, Integer>()) - .returns(BasicTypeInfo.INT_TYPE_INFO); - List result = resultDs.collect(); - - String expectedResult = "2\n" + - "3\n" + - "1\n"; - - compareResultAsText(result, expectedResult); - break; - } - default: - throw new IllegalArgumentException("Invalid program id"); - } - } + @Test + public void testJoinWithTypeInformationTypeHint() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + + DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet resultDs = ds1 + .join(ds2) + .where(0) + .equalTo(0) + .with(new Joiner, Tuple3, Integer>()) + .returns(BasicTypeInfo.INT_TYPE_INFO); + List result = resultDs.collect(); + + String expectedResult = "2\n" + + "3\n" + + "1\n"; + + compareResultAsText(result, expectedResult); + } + + @Test + public void testFlatJoinWithTypeInformationTypeHint() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + + DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet resultDs = ds1 + .join(ds2) + .where(0) + .equalTo(0) + .with(new FlatJoiner, Tuple3, Integer>()) + .returns(BasicTypeInfo.INT_TYPE_INFO); + List result = resultDs.collect(); + + String expectedResult = "2\n" + + "3\n" + + "1\n"; + + compareResultAsText(result, expectedResult); + } + + @Test + public void testUnsortedGroupReduceWithTypeInformationTypeHint() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + + DataSet> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet resultDs = ds + .groupBy(0) + .reduceGroup(new GroupReducer, Integer>()) + .returns(BasicTypeInfo.INT_TYPE_INFO); + List result = resultDs.collect(); + + String expectedResult = "2\n" + + "3\n" + + "1\n"; + + compareResultAsText(result, expectedResult); + } + + @Test + public void testSortedGroupReduceWithTypeInformationTypeHint() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + + DataSet> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet resultDs = ds + .groupBy(0) + .sortGroup(0, Order.ASCENDING) + .reduceGroup(new GroupReducer, Integer>()) + .returns(BasicTypeInfo.INT_TYPE_INFO); + List result = resultDs.collect(); + + String expectedResult = "2\n" + + "3\n" + + "1\n"; + + compareResultAsText(result, expectedResult); + } + + @Test + public void testCombineGroupWithTypeInformationTypeHint() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + + DataSet> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet resultDs = ds + .groupBy(0) + .combineGroup(new GroupCombiner, Integer>()) + .returns(BasicTypeInfo.INT_TYPE_INFO); + List result = resultDs.collect(); + + String expectedResult = "2\n" + + "3\n" + + "1\n"; + + compareResultAsText(result, expectedResult); + } + + @Test + public void testCoGroupWithTypeInformationTypeHint() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + + DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet resultDs = ds1 + .coGroup(ds2) + .where(0) + .equalTo(0) + .with(new CoGrouper, Tuple3, Integer>()) + .returns(BasicTypeInfo.INT_TYPE_INFO); + List result = resultDs.collect(); + + String expectedResult = "2\n" + + "3\n" + + "1\n"; + + compareResultAsText(result, expectedResult); } // -------------------------------------------------------------------------------------------- From 608185804f076b68aec6c06aa300e3d63e392339 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 1 Dec 2017 19:20:04 +0100 Subject: [PATCH 4/5] [hotfix] [tests] Fix JavaProgramTestBase to reset MiniClusterResource#TestEnvironment --- .../java/org/apache/flink/test/util/JavaProgramTestBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java index de536d81bac19..9262ae6283187 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java @@ -210,7 +210,7 @@ public void testJobCollectionExecution() throws Exception { e.printStackTrace(); Assert.fail("Error while calling the test program: " + e.getMessage()); } finally { - CollectionTestEnvironment.unsetAsContext(); + miniClusterResource.getTestEnvironment().setAsContext(); } Assert.assertNotNull("The test program never triggered an execution.", this.latestExecutionResult); From 32eb9b30be773da939cf23f2a707924eda72b28d Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 1 Dec 2017 19:20:35 +0100 Subject: [PATCH 5/5] [hotfix] [tests] Fix PageRankITCase, AggregatorsITCase and DataSinkITCase to use fresh result path --- .../main/java/org/apache/flink/core/fs/FileSystem.java | 2 +- .../apache/flink/test/example/java/PageRankITCase.java | 6 +++++- .../test/iterative/aggregators/AggregatorsITCase.java | 8 +++++--- .../org/apache/flink/test/operators/DataSinkITCase.java | 6 +++++- 4 files changed, 16 insertions(+), 6 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index 07a1e76c0c3c2..8698595cc61a6 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -752,7 +752,7 @@ public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean cre return true; } else { // file may not be overwritten - throw new IOException("File or directory already exists. Existing files and directories " + + throw new IOException("File or directory " + outPath + " already exists. Existing files and directories " + "are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories."); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java index ee7bf826364da..2fae57d799e06 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java @@ -33,6 +33,7 @@ import org.junit.runners.Parameterized; import java.io.File; +import java.util.UUID; /** * Test for {@link PageRank}. @@ -54,7 +55,10 @@ public PageRankITCase(TestExecutionMode mode){ @Before public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); + final File folder = tempFolder.newFolder(); + final File resultFile = new File(folder, UUID.randomUUID().toString()); + resultPath = resultFile.toURI().toString(); + File verticesFile = tempFolder.newFile(); FileUtils.writeFileUtf8(verticesFile, PageRankData.VERTICES); diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java index 64ee98a599c24..bd42ac287a5c3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java @@ -49,6 +49,7 @@ import java.io.FileReader; import java.io.FileWriter; import java.util.Random; +import java.util.UUID; import static org.junit.Assert.assertEquals; @@ -78,9 +79,10 @@ public AggregatorsITCase(TestExecutionMode mode){ @Before public void before() throws Exception{ - File tempFile = tempFolder.newFile(); - testPath = tempFile.toString(); - resultPath = tempFile.toURI().toString(); + final File folder = tempFolder.newFolder(); + final File resultFile = new File(folder, UUID.randomUUID().toString()); + testPath = resultFile.toString(); + resultPath = resultFile.toURI().toString(); } @After diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java index deb51708fcc17..b35e8185a93da 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java @@ -35,7 +35,9 @@ import org.junit.runners.Parameterized; import java.io.BufferedReader; +import java.io.File; import java.util.Random; +import java.util.UUID; import static org.junit.Assert.assertTrue; @@ -57,7 +59,9 @@ public DataSinkITCase(TestExecutionMode mode) { @Before public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); + final File folder = tempFolder.newFolder(); + final File resultFile = new File(folder, UUID.randomUUID().toString()); + resultPath = resultFile.toURI().toString(); } @Test