From 4d5c1753aeb1fd09c571c3a1a2b97012df164a05 Mon Sep 17 00:00:00 2001 From: zentol Date: Sat, 13 May 2017 12:56:56 +0200 Subject: [PATCH] [FLINK-6575] Disable tests on Windows that use HDFS --- .../fs/bucketing/BucketingSinkMigrationTest.java | 9 ++++++++- .../connectors/fs/bucketing/BucketingSinkTest.java | 8 +++++++- .../fs/bucketing/RollingSinkMigrationTest.java | 8 ++++++++ .../bucketing/RollingToBucketingMigrationTest.java | 8 ++++++++ .../ContinuousFileProcessingMigrationTest.java | 9 ++++++++- .../hdfstests/ContinuousFileProcessingTest.java | 8 +++++--- .../flink/hdfstests/FileStateBackendTest.java | 14 +++++++++++--- .../FsNegativeRunningJobsRegistryTest.java | 4 ++++ .../java/org/apache/flink/hdfstests/HDFSTest.java | 8 ++++++++ 9 files changed, 67 insertions(+), 9 deletions(-) diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java index f876a145014d5..bfb326c358d18 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java @@ -30,11 +30,13 @@ import org.apache.flink.streaming.util.OperatorSnapshotUtil; import org.apache.flink.streaming.util.migration.MigrationTestUtil; import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.OperatingSystem; import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.Path; - import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Ignore; import org.junit.Test; @@ -75,6 +77,11 @@ public class BucketingSinkMigrationTest { private static final String IN_PROGRESS_SUFFIX = ".in-progress"; private static final String VALID_LENGTH_SUFFIX = ".valid"; + @BeforeClass + public static void verifyOS() { + Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows()); + } + @Parameterized.Parameters(name = "Migration Savepoint / Bucket Files Prefix: {0}") public static Collection> parameters () { return Arrays.asList( diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java index 67af91fe7f9ba..d6852efad2200 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java @@ -33,6 +33,7 @@ import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.NetUtils; +import org.apache.flink.util.OperatingSystem; import org.apache.avro.Schema; import org.apache.avro.file.DataFileConstants; @@ -53,6 +54,7 @@ import org.apache.hadoop.io.Text; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Assume; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -135,6 +137,8 @@ private OneInputStreamOperatorTestHarness createTestSink( @BeforeClass public static void createHDFS() throws IOException { + Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows()); + Configuration conf = new Configuration(); File dataDir = tempFolder.newFolder(); @@ -152,7 +156,9 @@ public static void createHDFS() throws IOException { @AfterClass public static void destroyHDFS() { - hdfsCluster.shutdown(); + if (hdfsCluster != null) { + hdfsCluster.shutdown(); + } } @Test diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java index 75eb685d566b3..e0413795b2aa7 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java @@ -23,9 +23,12 @@ import org.apache.flink.streaming.connectors.fs.StringWriter; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.OperatingSystem; import org.apache.commons.io.FileUtils; import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -50,6 +53,11 @@ public class RollingSinkMigrationTest { private static final String IN_PROGRESS_SUFFIX = ".in-progress"; private static final String VALID_LENGTH_SUFFIX = ".valid"; + @BeforeClass + public static void verifyOS() { + Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows()); + } + @Test public void testMigration() throws Exception { diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java index ed4ab88d51945..8a8dbd6bc9301 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java @@ -23,9 +23,12 @@ import org.apache.flink.streaming.connectors.fs.StringWriter; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.OperatingSystem; import org.apache.commons.io.FileUtils; import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -49,6 +52,11 @@ public class RollingToBucketingMigrationTest { private static final String IN_PROGRESS_SUFFIX = ".in-progress"; private static final String VALID_LENGTH_SUFFIX = ".valid"; + @BeforeClass + public static void verifyOS() { + Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows()); + } + @Test public void testMigration() throws Exception { final File outDir = tempFolder.newFolder(); diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java index 0c2971c77d66a..78c57edabf953 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java @@ -42,10 +42,12 @@ import org.apache.flink.streaming.util.OperatorSnapshotUtil; import org.apache.flink.streaming.util.migration.MigrationTestUtil; import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.OperatingSystem; import org.apache.commons.io.FileUtils; - import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Ignore; import org.junit.Test; @@ -96,6 +98,11 @@ public ContinuousFileProcessingMigrationTest(Tuple2 migr @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder(); + @BeforeClass + public static void verifyOS() { + Assume.assumeTrue("HDFS cluster cannot be start on Windows without extensions.", !OperatingSystem.isWindows()); + } + /** * Manually run this to write binary snapshot data. Remove @Ignore to run. */ diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java index 2fc00c49b61f5..5d5a1c3d11054 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java @@ -42,6 +42,7 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.Preconditions; import org.apache.hadoop.fs.FSDataOutputStream; @@ -49,6 +50,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Assume; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -88,6 +90,8 @@ public class ContinuousFileProcessingTest { @BeforeClass public static void createHDFS() { + Assume.assumeTrue("HDFS cluster cannot be start on Windows without extensions.", !OperatingSystem.isWindows()); + try { File hdfsDir = tempFolder.newFolder(); @@ -109,10 +113,8 @@ public static void createHDFS() { @AfterClass public static void destroyHDFS() { - try { + if (hdfsCluster != null) { hdfsCluster.shutdown(); - } catch (Throwable t) { - throw new RuntimeException(t); } } diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java index 99587291c38ed..86d0bc2e2368e 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java @@ -29,11 +29,13 @@ import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.util.OperatingSystem; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.AfterClass; +import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Test; @@ -71,6 +73,8 @@ public class FileStateBackendTest extends StateBackendTestBase { @BeforeClass public static void createHDFS() { + Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows()); + try { tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); @@ -93,10 +97,14 @@ public static void createHDFS() { @AfterClass public static void destroyHDFS() { try { - hdfsCluster.shutdown(); - FileUtils.deleteDirectory(tempDir); + if (hdfsCluster != null) { + hdfsCluster.shutdown(); + } + if (tempDir != null) { + FileUtils.deleteDirectory(tempDir); + } + } catch (IOException ignored) { } - catch (Exception ignored) {} } @Override diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java index 6076c8fd22db9..1273a4ee9d221 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java @@ -22,10 +22,12 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry; import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus; +import org.apache.flink.util.OperatingSystem; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.AfterClass; +import org.junit.Assume; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -53,6 +55,8 @@ public class FsNegativeRunningJobsRegistryTest { @BeforeClass public static void createHDFS() throws Exception { + Assume.assumeTrue("HDFS cluster cannot be start on Windows without extensions.", !OperatingSystem.isWindows()); + final File tempDir = TEMP_DIR.newFolder(); Configuration hdConf = new Configuration(); diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java index 5f778f2f77056..c0cfb4f62d97c 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.util.FileUtils; +import org.apache.flink.util.OperatingSystem; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; @@ -44,7 +45,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import java.io.File; @@ -67,6 +70,11 @@ public class HDFSTest { private org.apache.hadoop.fs.Path hdPath; protected org.apache.hadoop.fs.FileSystem hdfs; + @BeforeClass + public static void verifyOS() { + Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows()); + } + @Before public void createHDFS() { try {