From e11b8118c1f95b2048a2e0e5db646100be989eea Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Tue, 9 Aug 2016 15:16:59 +0200 Subject: [PATCH 1/2] [FLINK-4340] Remove RocksDB Semi-Async Checkpoint Mode This was causing too many problems and is also incompatible with the upcoming key-group/sharding changes. --- .../streaming/state/RocksDBStateBackend.java | 264 +----------------- .../FullyAsyncRocksDBStateBackendTest.java | 65 ----- .../state/RocksDBAsyncKVSnapshotTest.java | 1 - 3 files changed, 2 insertions(+), 328 deletions(-) delete mode 100644 flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/FullyAsyncRocksDBStateBackendTest.java diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 9496d12c42283..476699c805320 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -50,7 +50,6 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot; import org.apache.flink.runtime.state.KvState; @@ -59,20 +58,12 @@ import org.apache.flink.api.common.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.util.SerializableObject; -import org.apache.flink.streaming.util.HDFSCopyFromLocal; -import org.apache.flink.streaming.util.HDFSCopyToLocal; -import org.apache.hadoop.fs.FileSystem; - 
-import org.rocksdb.BackupEngine; -import org.rocksdb.BackupableDBOptions; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; -import org.rocksdb.Env; import org.rocksdb.ReadOptions; -import org.rocksdb.RestoreOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; @@ -110,9 +101,6 @@ public class RocksDBStateBackend extends AbstractStateBackend { /** The state backend that stores the non-partitioned state */ private final AbstractStateBackend nonPartitionedStateBackend; - /** Whether we do snapshots fully asynchronous */ - private boolean fullyAsyncBackup = false; - /** Operator identifier that is used to uniqueify the RocksDB storage path. */ private String operatorIdentifier; @@ -389,70 +377,6 @@ public File[] getStoragePaths() { return new HashMap<>(); } - if (fullyAsyncBackup) { - return performFullyAsyncSnapshot(checkpointId, timestamp); - } else { - return performSemiAsyncSnapshot(checkpointId, timestamp); - } - } - - /** - * Performs a checkpoint by using the RocksDB backup feature to backup to a directory. - * This backup is the asynchronously copied to the final checkpoint location. - */ - private HashMap> performSemiAsyncSnapshot(long checkpointId, long timestamp) throws Exception { - // We don't snapshot individual k/v states since everything is stored in a central - // RocksDB data base. Create a dummy KvStateSnapshot that holds the information about - // that checkpoint. We use the in injectKeyValueStateSnapshots to restore. 
- - final File localBackupPath = new File(instanceBasePath, "local-chk-" + checkpointId); - final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId); - - if (!localBackupPath.exists()) { - if (!localBackupPath.mkdirs()) { - throw new RuntimeException("Could not create local backup path " + localBackupPath); - } - } - - long startTime = System.currentTimeMillis(); - - BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath()); - // we disabled the WAL - backupOptions.setBackupLogFiles(false); - // no need to sync since we use the backup only as intermediate data before writing to FileSystem snapshot - backupOptions.setSync(false); - - try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), backupOptions)) { - // wait before flush with "true" - backupEngine.createNewBackup(db, true); - } - - long endTime = System.currentTimeMillis(); - LOG.info("RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms."); - - // draw a copy in case it get's changed while performing the async snapshot - List kvStateInformationCopy = new ArrayList<>(); - for (Tuple2 state: kvStateInformation.values()) { - kvStateInformationCopy.add(state.f1); - } - SemiAsyncSnapshot dummySnapshot = new SemiAsyncSnapshot(localBackupPath, - backupUri, - kvStateInformationCopy, - checkpointId); - - - HashMap> result = new HashMap<>(); - result.put("dummy_state", dummySnapshot); - return result; - } - - /** - * Performs a checkpoint by drawing a {@link org.rocksdb.Snapshot} from RocksDB and then - * iterating over all key/value pairs in RocksDB to store them in the final checkpoint - * location. The only synchronous part is the drawing of the {@code Snapshot} which - * is essentially free. 
- */ - private HashMap> performFullyAsyncSnapshot(long checkpointId, long timestamp) throws Exception { // we draw a snapshot from RocksDB then iterate over all keys at that point // and store them in the backup location @@ -477,6 +401,7 @@ public File[] getStoragePaths() { HashMap> result = new HashMap<>(); result.put("dummy_state", dummySnapshot); + return result; } @@ -487,86 +412,13 @@ public final void injectKeyValueStateSnapshots(HashMap } KvStateSnapshot dummyState = keyValueStateSnapshots.get("dummy_state"); - if (dummyState instanceof FinalSemiAsyncSnapshot) { - restoreFromSemiAsyncSnapshot((FinalSemiAsyncSnapshot) dummyState); - } else if (dummyState instanceof FinalFullyAsyncSnapshot) { + if (dummyState instanceof FinalFullyAsyncSnapshot) { restoreFromFullyAsyncSnapshot((FinalFullyAsyncSnapshot) dummyState); } else { throw new RuntimeException("Unknown RocksDB snapshot: " + dummyState); } } - private void restoreFromSemiAsyncSnapshot(FinalSemiAsyncSnapshot snapshot) throws Exception { - // This does mostly the same work as initializeForJob, we remove the existing RocksDB - // directory and create a new one from the backup. - // This must be refactored. The StateBackend should either be initialized from - // scratch or from a snapshot. 
- - if (!instanceBasePath.exists()) { - if (!instanceBasePath.mkdirs()) { - throw new RuntimeException("Could not create RocksDB data directory."); - } - } - - db.dispose(); - - // clean it, this will remove the last part of the path but RocksDB will recreate it - try { - if (instanceRocksDBPath.exists()) { - LOG.warn("Deleting already existing db directory {}.", instanceRocksDBPath); - FileUtils.deleteDirectory(instanceRocksDBPath); - } - } catch (IOException e) { - throw new RuntimeException("Error cleaning RocksDB data directory.", e); - } - - final File localBackupPath = new File(instanceBasePath, "chk-" + snapshot.checkpointId); - - if (localBackupPath.exists()) { - try { - LOG.warn("Deleting already existing local backup directory {}.", localBackupPath); - FileUtils.deleteDirectory(localBackupPath); - } catch (IOException e) { - throw new RuntimeException("Error cleaning RocksDB local backup directory.", e); - } - } - - HDFSCopyToLocal.copyToLocal(snapshot.backupUri, instanceBasePath); - - try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(localBackupPath.getAbsolutePath()))) { - backupEngine.restoreDbFromLatestBackup(instanceRocksDBPath.getAbsolutePath(), instanceRocksDBPath.getAbsolutePath(), new RestoreOptions(true)); - } catch (RocksDBException|IllegalArgumentException e) { - throw new RuntimeException("Error while restoring RocksDB state from " + localBackupPath, e); - } finally { - try { - FileUtils.deleteDirectory(localBackupPath); - } catch (IOException e) { - LOG.error("Error cleaning up local restore directory " + localBackupPath, e); - } - } - - - List columnFamilyDescriptors = new ArrayList<>(snapshot.stateDescriptors.size()); - for (StateDescriptor stateDescriptor: snapshot.stateDescriptors) { - columnFamilyDescriptors.add(new ColumnFamilyDescriptor(stateDescriptor.getName().getBytes(), getColumnOptions())); - } - - // RocksDB seems to need this... 
- columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes())); - List columnFamilyHandles = new ArrayList<>(snapshot.stateDescriptors.size()); - try { - - db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles); - this.kvStateInformation = new HashMap<>(); - for (int i = 0; i < snapshot.stateDescriptors.size(); i++) { - this.kvStateInformation.put(snapshot.stateDescriptors.get(i).getName(), new Tuple2<>(columnFamilyHandles.get(i), snapshot.stateDescriptors.get(i))); - } - - } catch (RocksDBException e) { - throw new RuntimeException("Error while opening RocksDB instance.", e); - } - } - private void restoreFromFullyAsyncSnapshot(FinalFullyAsyncSnapshot snapshot) throws Exception { DataInputView inputView = snapshot.stateHandle.getState(userCodeClassLoader); @@ -604,96 +456,6 @@ private void restoreFromFullyAsyncSnapshot(FinalFullyAsyncSnapshot snapshot) thr } } - // ------------------------------------------------------------------------ - // Semi-asynchronous Backup Classes - // ------------------------------------------------------------------------ - - /** - * Upon snapshotting the RocksDB backup is created synchronously. The asynchronous part is - * copying the backup to a (possibly) remote filesystem. This is done in {@link #materialize()}. 
- */ - private static class SemiAsyncSnapshot extends AsynchronousKvStateSnapshot, ValueStateDescriptor, RocksDBStateBackend> { - private static final long serialVersionUID = 1L; - private final File localBackupPath; - private final URI backupUri; - private final List stateDescriptors; - private final long checkpointId; - - private SemiAsyncSnapshot(File localBackupPath, - URI backupUri, - List columnFamilies, - long checkpointId) { - this.localBackupPath = localBackupPath; - this.backupUri = backupUri; - this.stateDescriptors = columnFamilies; - this.checkpointId = checkpointId; - } - - @Override - public KvStateSnapshot, ValueStateDescriptor, RocksDBStateBackend> materialize() throws Exception { - try { - long startTime = System.currentTimeMillis(); - HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri); - long endTime = System.currentTimeMillis(); - LOG.info("RocksDB materialization from " + localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms."); - return new FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors); - } catch (Exception e) { - FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration()); - fs.delete(new org.apache.hadoop.fs.Path(backupUri), true); - throw e; - } finally { - FileUtils.deleteQuietly(localBackupPath); - } - } - } - - /** - * Dummy {@link KvStateSnapshot} that holds the state of our one RocksDB data base. This - * also stores the column families that we had at the time of the snapshot so that we can - * restore these. This results from {@link SemiAsyncSnapshot}. - */ - private static class FinalSemiAsyncSnapshot implements KvStateSnapshot, ValueStateDescriptor, RocksDBStateBackend> { - private static final long serialVersionUID = 1L; - - final URI backupUri; - final long checkpointId; - private final List stateDescriptors; - - /** - * Creates a new snapshot from the given state parameters. 
- */ - private FinalSemiAsyncSnapshot(URI backupUri, long checkpointId, List stateDescriptors) { - this.backupUri = backupUri; - this.checkpointId = checkpointId; - this.stateDescriptors = stateDescriptors; - } - - @Override - public final KvState, ValueStateDescriptor, RocksDBStateBackend> restoreState( - RocksDBStateBackend stateBackend, - TypeSerializer keySerializer, - ClassLoader classLoader) throws Exception { - throw new RuntimeException("Should never happen."); - } - - @Override - public final void discardState() throws Exception { - FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration()); - fs.delete(new org.apache.hadoop.fs.Path(backupUri), true); - } - - @Override - public final long getStateSize() throws Exception { - FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration()); - return fs.getContentSummary(new org.apache.hadoop.fs.Path(backupUri)).getLength(); - } - - @Override - public void close() throws IOException { - // cannot do much here - } - } - // ------------------------------------------------------------------------ // Fully asynchronous Backup Classes // ------------------------------------------------------------------------ @@ -940,28 +702,6 @@ public StateHandle checkpointStateSerializable( // Parameters // ------------------------------------------------------------------------ - /** - * Enables fully asynchronous snapshotting of the partitioned state held in RocksDB. - * - *

By default, this is disabled. This means that RocksDB state is copied in a synchronous - * step, during which normal processing of elements pauses, followed by an asynchronous step - * of copying the RocksDB backup to the final checkpoint location. Fully asynchronous - * snapshots take longer (linear time requirement with respect to number of unique keys) - * but normal processing of elements is not paused. - */ - public void enableFullyAsyncSnapshots() { - this.fullyAsyncBackup = true; - } - - /** - * Disables fully asynchronous snapshotting of the partitioned state held in RocksDB. - * - *

By default, this is disabled. - */ - public void disableFullyAsyncSnapshots() { - this.fullyAsyncBackup = false; - } - /** * Sets the path where the RocksDB local database files should be stored on the local * file system. Setting this path overrides the default behavior, where the diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/FullyAsyncRocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/FullyAsyncRocksDBStateBackendTest.java deleted file mode 100644 index 7861542521657..0000000000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/FullyAsyncRocksDBStateBackendTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.commons.io.FileUtils; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.runtime.state.StateBackendTestBase; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.util.OperatingSystem; -import org.junit.Assume; -import org.junit.Before; - -import java.io.File; -import java.io.IOException; -import java.util.UUID; - -/** - * Tests for the partitioned state part of {@link RocksDBStateBackend} with fully asynchronous - * checkpointing enabled. - */ -public class FullyAsyncRocksDBStateBackendTest extends StateBackendTestBase { - - private File dbDir; - private File chkDir; - - @Before - public void checkOperatingSystem() { - Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows()); - } - - @Override - protected RocksDBStateBackend getStateBackend() throws IOException { - dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state"); - chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots"); - - RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend()); - backend.setDbStoragePath(dbDir.getAbsolutePath()); - backend.enableFullyAsyncSnapshots(); - return backend; - } - - @Override - protected void cleanup() { - try { - FileUtils.deleteDirectory(dbDir); - FileUtils.deleteDirectory(chkDir); - } catch (IOException ignore) {} - } -} diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java index 7118cf6beeaf9..66bba0cb837b2 100644 --- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java @@ -214,7 +214,6 @@ public String getKey(String value) throws Exception { RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend()); backend.setDbStoragePath(dbDir.getAbsolutePath()); - backend.enableFullyAsyncSnapshots(); streamConfig.setStateBackend(backend); From 8bf3faa7d134b2662078387419905df1b268c0fd Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Tue, 9 Aug 2016 16:16:36 +0200 Subject: [PATCH 2/2] Address comments and fix test --- .../streaming/util/HDFSCopyFromLocal.java | 65 ------------ .../flink/streaming/util/HDFSCopyToLocal.java | 63 ------------ .../streaming/util/HDFSCopyUtilitiesTest.java | 99 ------------------- .../EventTimeWindowCheckpointingITCase.java | 12 +-- 4 files changed, 1 insertion(+), 238 deletions(-) delete mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java delete mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyToLocal.java delete mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java deleted file mode 100644 index 65b278a0d39a5..0000000000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.util; - -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import java.io.File; -import java.net.URI; - -/** - * Utility for copying from local file system to a HDFS {@link FileSystem}. - */ -public class HDFSCopyFromLocal { - - public static void copyFromLocal(final File localPath, - final URI remotePath) throws Exception { - // Do it in another Thread because HDFS can deadlock if being interrupted while copying - String threadName = "HDFS Copy from " + localPath + " to " + remotePath; - - final Tuple1 asyncException = Tuple1.of(null); - - Thread copyThread = new Thread(threadName) { - @Override - public void run() { - try { - Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration(); - - FileSystem fs = FileSystem.get(remotePath, hadoopConf); - - fs.copyFromLocalFile(new Path(localPath.getAbsolutePath()), - new Path(remotePath)); - } catch (Exception t) { - asyncException.f0 = t; - } - } - }; - - copyThread.setDaemon(true); - copyThread.start(); - copyThread.join(); - - if (asyncException.f0 != null) { - throw asyncException.f0; - } - } -} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyToLocal.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyToLocal.java deleted file mode 100644 index 7dfe7272b25f3..0000000000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyToLocal.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - 
* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.util; - -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import java.io.File; -import java.net.URI; - -/** - * Utility for copying from a HDFS {@link FileSystem} to the local file system. - */ -public class HDFSCopyToLocal { - - public static void copyToLocal(final URI remotePath, - final File localPath) throws Exception { - // Do it in another Thread because HDFS can deadlock if being interrupted while copying - String threadName = "HDFS Copy from " + remotePath + " to " + localPath; - - final Tuple1 asyncException = Tuple1.of(null); - - Thread copyThread = new Thread(threadName) { - @Override - public void run() { - try { - Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration(); - - FileSystem fs = FileSystem.get(remotePath, hadoopConf); - fs.copyToLocalFile(new Path(remotePath), new Path(localPath.getAbsolutePath())); - } catch (Exception t) { - asyncException.f0 = t; - } - } - }; - - copyThread.setDaemon(true); - copyThread.start(); - copyThread.join(); - - if (asyncException.f0 != null) { - throw asyncException.f0; - } - } -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java deleted file mode 100644 index f16750d8f184f..0000000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java +++ /dev/null @@ -1,99 
+0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.util; - -import org.apache.flink.core.fs.Path; -import org.apache.flink.util.OperatingSystem; -import org.junit.Assume; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; - -import static org.junit.Assert.assertTrue; - -public class HDFSCopyUtilitiesTest { - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void checkOperatingSystem() { - Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows()); - } - - - /** - * This test verifies that a hadoop configuration is correctly read in the external - * process copying tools. 
- */ - @Test - public void testCopyFromLocal() throws Exception { - - File testFolder = tempFolder.newFolder(); - - File originalFile = new File(testFolder, "original"); - File copyFile = new File(testFolder, "copy"); - - try (DataOutputStream out = new DataOutputStream(new FileOutputStream(originalFile))) { - out.writeUTF("Hello there, 42!"); - } - - HDFSCopyFromLocal.copyFromLocal( - originalFile, - new Path("file://" + copyFile.getAbsolutePath()).toUri()); - - try (DataInputStream in = new DataInputStream(new FileInputStream(copyFile))) { - assertTrue(in.readUTF().equals("Hello there, 42!")); - - } - } - - /** - * This test verifies that a hadoop configuration is correctly read in the external - * process copying tools. - */ - @Test - public void testCopyToLocal() throws Exception { - - File testFolder = tempFolder.newFolder(); - - File originalFile = new File(testFolder, "original"); - File copyFile = new File(testFolder, "copy"); - - try (DataOutputStream out = new DataOutputStream(new FileOutputStream(originalFile))) { - out.writeUTF("Hello there, 42!"); - } - - HDFSCopyToLocal.copyToLocal( - new Path("file://" + originalFile.getAbsolutePath()).toUri(), - copyFile); - - try (DataInputStream in = new DataInputStream(new FileInputStream(copyFile))) { - assertTrue(in.readUTF().equals("Hello there, 42!")); - - } - } - -} diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index 199a6af985256..2f954ea96d15c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -116,20 +116,11 @@ public void initStateBackend() throws IOException { this.stateBackend = new FsStateBackend("file://" + backups); break; } - case ROCKSDB: { - String 
rocksDb = tempFolder.newFolder().getAbsolutePath(); - String rocksDbBackups = tempFolder.newFolder().toURI().toString(); - RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()); - rdb.setDbStoragePath(rocksDb); - this.stateBackend = rdb; - break; - } case ROCKSDB_FULLY_ASYNC: { String rocksDb = tempFolder.newFolder().getAbsolutePath(); String rocksDbBackups = tempFolder.newFolder().toURI().toString(); RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()); rdb.setDbStoragePath(rocksDb); - rdb.enableFullyAsyncSnapshots(); this.stateBackend = rdb; break; } @@ -772,14 +763,13 @@ public static Collection parameters(){ return Arrays.asList(new Object[][] { {StateBackendEnum.MEM}, {StateBackendEnum.FILE}, - {StateBackendEnum.ROCKSDB}, {StateBackendEnum.ROCKSDB_FULLY_ASYNC} } ); } private enum StateBackendEnum { - MEM, FILE, ROCKSDB, ROCKSDB_FULLY_ASYNC + MEM, FILE, ROCKSDB_FULLY_ASYNC }