From dab3c654fafecf8caee6453b7da8965174646771 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Thu, 25 Feb 2016 16:55:04 +0100 Subject: [PATCH 1/2] [FLINK-3513] [runtime] Fix interplay of automatic Operator UID and Changing name of WindowOperator --- .../api/graph/StreamingJobGraphGenerator.java | 2 -- ...treamingJobGraphGeneratorNodeHashTest.java | 24 +++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index da4642430953d..e3e1ac612dff1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -736,8 +736,6 @@ private void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) { hasher.putInt(node.getParallelism()); - hasher.putString(node.getOperatorName(), Charset.forName("UTF-8")); - if (node.getOperator() instanceof AbstractUdfStreamOperator) { String udfClassName = ((AbstractUdfStreamOperator) node.getOperator()) .getUserFunction().getClass().getName(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java index 98750d098f67c..9e1e9b4250183 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java @@ -31,9 +31,13 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.util.NoOpSink; import org.junit.Test; +import java.lang.reflect.Field; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -319,6 +323,26 @@ public void testNodeHashIdenticalNodes() throws Exception { } } + /** + * Tests that a changed operator name does not affect the hash. + */ + @Test + public void testChangedOperatorName() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); + env.addSource(new NoOpSourceFunction(), "A").map(new NoOpMapFunction()); + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + + JobVertexID expected = jobGraph.getVerticesAsArray()[0].getID(); + + env = StreamExecutionEnvironment.createLocalEnvironment(); + env.addSource(new NoOpSourceFunction(), "B").map(new NoOpMapFunction()); + jobGraph = env.getStreamGraph().getJobGraph(); + + JobVertexID actual = jobGraph.getVerticesAsArray()[0].getID(); + + assertEquals(expected, actual); + } + // ------------------------------------------------------------------------ // Manual hash assignment // ------------------------------------------------------------------------ From 3c3a81acee8de1128a3eee7f59ca6f8803acbd5c Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Thu, 25 Feb 2016 16:55:38 +0100 Subject: [PATCH 2/2] [FLINK-3512] [runtime] Savepoint backend should not revert to 'jobmanager' --- .../checkpoint/SavepointStoreFactory.java | 63 +++++++------------ .../checkpoint/SavepointStoreFactoryTest.java | 24 ++++--- 2 files changed, 37 insertions(+), 50 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java index 04a32276f6202..6d25e18f28322 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.checkpoint; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +32,7 @@ public class SavepointStoreFactory { public static final String SAVEPOINT_BACKEND_KEY = "savepoints.state.backend"; public static final String SAVEPOINT_DIRECTORY_KEY = "savepoints.state.backend.fs.dir"; + public static final String DEFAULT_SAVEPOINT_BACKEND = "jobmanager"; public static final Logger LOG = LoggerFactory.getLogger(SavepointStoreFactory.class); @@ -52,55 +53,33 @@ public static SavepointStore createFromConfig( Configuration config) throws Exception { // Try a the savepoint-specific configuration first. - String savepointBackend = config.getString(SAVEPOINT_BACKEND_KEY, null); + String savepointBackend = config.getString(SAVEPOINT_BACKEND_KEY, DEFAULT_SAVEPOINT_BACKEND); if (savepointBackend == null) { LOG.info("No savepoint state backend configured. " + "Using job manager savepoint state backend."); return createJobManagerSavepointStore(); - } - else if (savepointBackend.equals("jobmanager")) { + } else if (savepointBackend.equals("jobmanager")) { LOG.info("Using job manager savepoint state backend."); return createJobManagerSavepointStore(); - } - else if (savepointBackend.equals("filesystem")) { - // Sanity check that the checkpoints are not stored on the job manager only - String checkpointBackend = config.getString( - ConfigConstants.STATE_BACKEND, "jobmanager"); - - if (checkpointBackend.equals("jobmanager")) { - LOG.warn("The combination of file system backend for savepoints and " + - "jobmanager backend for checkpoints does not work. The savepoint " + - "will *not* be recoverable after the job manager shuts down. " + - "Falling back to job manager savepoint state backend."); - - return createJobManagerSavepointStore(); + } else if (savepointBackend.equals("filesystem")) { + String rootPath = config.getString(SAVEPOINT_DIRECTORY_KEY, null); + + if (rootPath == null) { + throw new IllegalConfigurationException("Using filesystem as savepoint state backend, " + + "but did not specify directory. Please set the " + + "following configuration key: '" + SAVEPOINT_DIRECTORY_KEY + + "' (e.g. " + SAVEPOINT_DIRECTORY_KEY + ": hdfs:///flink/savepoints/). " + + "Falling back to job manager savepoint backend."); + } else { + LOG.info("Using filesystem savepoint backend (root path: {}).", rootPath); + + return createFileSystemSavepointStore(rootPath); } - else { - String rootPath = config.getString(SAVEPOINT_DIRECTORY_KEY, null); - - if (rootPath == null) { - LOG.warn("Using filesystem as savepoint state backend, " + - "but did not specify directory. Please set the " + - "following configuration key: '" + SAVEPOINT_DIRECTORY_KEY + - "' (e.g. " + SAVEPOINT_DIRECTORY_KEY + ": hdfs:///flink/savepoints/). " + - "Falling back to job manager savepoint backend."); - - return createJobManagerSavepointStore(); - } - else { - LOG.info("Using filesystem savepoint backend (root path: {}).", rootPath); - - return createFileSystemSavepointStore(rootPath); - } - } - } - else { - // Fallback - LOG.warn("Unexpected savepoint backend configuration '{}'. " + - "Falling back to job manager savepoint state backend.", savepointBackend); - - return createJobManagerSavepointStore(); + } else { + throw new IllegalConfigurationException("Unexpected savepoint backend " + + "configuration '" + savepointBackend + "'. " + + "Falling back to job manager savepoint state backend."); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java index 69b6f814a75a8..c0605f7e0c3e0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java @@ -20,11 +20,13 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.Path; import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class SavepointStoreFactoryTest { @@ -61,28 +63,34 @@ public void testSavepointBackendFileSystem() throws Exception { @Test public void testSavepointBackendFileSystemButCheckpointBackendJobManager() throws Exception { Configuration config = new Configuration(); - + String rootPath = System.getProperty("java.io.tmpdir"); // This combination does not make sense, because the checkpoints will be // lost after the job manager shuts down. config.setString(ConfigConstants.STATE_BACKEND, "jobmanager"); config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem"); + config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, rootPath); + SavepointStore store = SavepointStoreFactory.createFromConfig(config); - assertTrue(store.getStateStore() instanceof HeapStateStore); + assertTrue(store.getStateStore() instanceof FileSystemStateStore); + + FileSystemStateStore stateStore = (FileSystemStateStore) + store.getStateStore(); + assertEquals(new Path(rootPath), stateStore.getRootPath()); } - @Test + @Test(expected = IllegalConfigurationException.class) public void testSavepointBackendFileSystemButNoDirectory() throws Exception { Configuration config = new Configuration(); config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem"); - SavepointStore store = SavepointStoreFactory.createFromConfig(config); - assertTrue(store.getStateStore() instanceof HeapStateStore); + SavepointStoreFactory.createFromConfig(config); + fail("Did not throw expected Exception"); } - @Test + @Test(expected = IllegalConfigurationException.class) public void testUnexpectedSavepointBackend() throws Exception { Configuration config = new Configuration(); config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "unexpected"); - SavepointStore store = SavepointStoreFactory.createFromConfig(config); - assertTrue(store.getStateStore() instanceof HeapStateStore); + SavepointStoreFactory.createFromConfig(config); + fail("Did not throw expected Exception"); } }