From 418833001fe6dd581f42f7fcc3c35ef36f292007 Mon Sep 17 00:00:00 2001 From: Romain manni-Bucau Date: Sun, 19 Jun 2016 21:19:57 +0200 Subject: [PATCH 1/2] fixing build on windows --- .../beam/runners/flink/WriteSinkITCase.java | 13 + .../runners/spark/SimpleWordCountTest.java | 8 + .../runners/spark/io/AvroPipelineTest.java | 7 + .../beam/runners/spark/io/NumShardsTest.java | 7 + .../hadoop/HadoopFileFormatPipelineTest.java | 7 + .../translation/TransformTranslatorTest.java | 7 + .../src/main/resources/beam/checkstyle.xml | 4 +- .../org/apache/beam/sdk/io/FileBasedSink.java | 7 +- .../beam/sdk/testing/HadoopWorkarounds.java | 129 +++++++++ sdks/java/io/hdfs/pom.xml | 9 + .../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 264 +++++++++--------- sdks/java/maven-archetypes/starter/pom.xml | 3 + 12 files changed, 334 insertions(+), 131 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java index 36d3aef1424d..1a5635057254 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java @@ -35,6 +35,7 @@ import org.apache.flink.test.util.JavaProgramTestBase; import java.io.File; +import java.io.IOException; import java.io.PrintWriter; import java.net.URI; @@ -75,6 +76,18 @@ private static void runProgram(String resultPath) { p.run(); } + + @Override + public void stopCluster() throws Exception { + try { + super.stopCluster(); + } catch (final IOException ioe) { + if (ioe.getMessage().startsWith("Unable to delete file")) { + // that's ok for the test itself, just the OS playing with us on cleanup phase + } + } + } + /** * Simple custom sink which writes to a file. 
*/ diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java index 2b4464db303a..4980995bb342 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.HadoopWorkarounds; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; @@ -40,11 +41,13 @@ import com.google.common.collect.Sets; import org.apache.commons.io.FileUtils; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.File; +import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Set; @@ -61,6 +64,11 @@ public class SimpleWordCountTest { private static final Set EXPECTED_COUNT_SET = ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2"); + @BeforeClass + public static void initWin() throws IOException { + HadoopWorkarounds.winTests(); + } + @Test public void testInMem() throws Exception { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index f3588788974b..f6d0d556c9f7 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.AvroIO; import 
org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.HadoopWorkarounds; import org.apache.beam.sdk.values.PCollection; import com.google.common.collect.Lists; @@ -38,6 +39,7 @@ import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -58,6 +60,11 @@ public class AvroPipelineTest { @Rule public final TemporaryFolder tmpDir = new TemporaryFolder(); + @BeforeClass + public static void initWin() throws IOException { + HadoopWorkarounds.winTests(); + } + @Before public void setUp() throws IOException { inputFile = tmpDir.newFile("test.avro"); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java index 23d45921df7c..8a864c4ff850 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.HadoopWorkarounds; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.PCollection; @@ -38,6 +39,7 @@ import com.google.common.io.Files; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -64,6 +66,11 @@ public class NumShardsTest { @Rule public final TemporaryFolder tmpDir = new TemporaryFolder(); + @BeforeClass + public static void initWin() throws IOException { + HadoopWorkarounds.winTests(); + } + @Before public void setUp() throws IOException { outputDir = 
tmpDir.newFolder("out"); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java index eaa508c45f58..767682e2d7ab 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.HadoopWorkarounds; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -58,6 +60,11 @@ public class HadoopFileFormatPipelineTest { @Rule public final TemporaryFolder tmpDir = new TemporaryFolder(); + @BeforeClass + public static void initWin() throws IOException { + HadoopWorkarounds.winTests(); + } + @Before public void setUp() throws IOException { inputFile = tmpDir.newFile("test.seq"); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java index b59331650b82..fec0dc92b6c1 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java @@ -28,10 +28,12 @@ import org.apache.beam.sdk.options.PipelineOptions; import 
org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.testing.HadoopWorkarounds; import org.apache.beam.sdk.values.PCollection; import com.google.common.base.Charsets; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -51,6 +53,11 @@ public class TransformTranslatorTest { @Rule public TemporaryFolder tmp = new TemporaryFolder(); + @BeforeClass + public static void initWin() throws IOException { + HadoopWorkarounds.winTests(); + } + /** * Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline * in DirectRunner and on SparkPipelineRunner, with the mapped dataflow-to-spark diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml index 311f599df859..457675aaa6a3 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml @@ -29,7 +29,9 @@ page at http://checkstyle.sourceforge.net/config.html --> - + + + diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 521f54b55ffc..045d6add91d9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.nio.channels.WritableByteChannel; @@ -645,7 +646,11 @@ public void copy(List srcFilenames, List destFilenames) throws I private void copyOne(String source, String destination) throws IOException { try { // Copy the source file, replacing the existing destination. 
- Files.copy(Paths.get(source), Paths.get(destination), StandardCopyOption.REPLACE_EXISTING); + // Paths.get(x) will not work on win cause of the ":" after the drive letter + Files.copy( + new File(source).toPath(), + new File(destination).toPath(), + StandardCopyOption.REPLACE_EXISTING); } catch (NoSuchFileException e) { LOG.debug("{} does not exist.", source); // Suppress exception if file does not exist. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java new file mode 100644 index 000000000000..ee2e1359cf2e --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import org.apache.commons.compress.utils.IOUtils; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.MalformedURLException; +import java.net.URL; + +/** + * A simple class ensure winutils.exe can be found in the JVM. 
+ */ +public class HadoopWorkarounds { + /** + * In practise this method only needs to be called once by JVM + * since hadoop uses static variables to store it. + * + * Note: ensure invocation is done before hadoop reads it + * and ensure this folder survives tests + * (avoid temporary folder usage since tests can share it). + * + * @param hadoopHome where to fake hadoop home. + */ + public static void win(final File hadoopHome) { + // if (Shell.osType != Shell.OSType.OS_TYPE_WIN) { // don't do that to not load Shell yet + if (!System.getProperty("os.name", "").startsWith("Windows") + || System.getProperty("hadoop.home.dir") != null) { + return; + } + + // hadoop doesn't have winutils.exe :(: https://issues.apache.org/jira/browse/HADOOP-10051 + // so use this github repo temporarly then just use the main tar.gz + /* + String hadoopVersion = VersionInfo.getVersion(); + final URL url = new URL("https://archive.apache.org/dist/hadoop/common/ + hadoop-" + hadoopVersion + "/hadoop-" + hadoopVersion + ".tar.gz"); + final File hadoopTar = tmpFolder.newFile(); + try (final InputStream is = new GZIPInputStream(url.openStream()); + final OutputStream os = new FileOutputStream(hadoopTar)) { + System.out.println("Downloading Hadoop in " + hadoopTar + ", " + + "this can take a while, if you have it locally " + + "maybe set \"hadoop.home.dir\" system property"); + IOUtils.copyLarge(is, os, new byte[1024 * 1024]); + } + + final File hadoopHome = tmpFolder.newFolder(); + try (final ArchiveInputStream stream = new TarArchiveInputStream( + new FileInputStream(hadoopTar))) { + ArchiveEntry entry; + while ((entry = stream.getNextEntry()) != null) { + if (entry.isDirectory()) { + FileUtils.forceMkdir(new File(hadoopHome, entry.getName())); + continue; + } + final File out = new File(hadoopHome, entry.getName()); + FileUtils.forceMkdir(out.getParentFile()); + try (final OutputStream os = new FileOutputStream(out)) { + IOUtils.copy(stream, os); + } + } + } + + final String hadoopRoot = 
"hadoop-" + hadoopVersion; + final File[] files = hadoopHome.listFiles(new FileFilter() { + @Override + public boolean accept(final File pathname) { + return pathname.isDirectory() && pathname.getName().equals(hadoopRoot); + } + }); + if (files == null || files.length != 1) { + throw new IllegalStateException("Didn't find hadoop in " + hadoopHome); + } + System.setProperty("hadoop.home.dir", files[0].getAbsolutePath()); + */ + + System.out.println("You are on windows (sorry) and you don't set " + + "-Dhadoop.home.dir so we'll download winutils.exe"); + + new File(hadoopHome, "bin").mkdirs(); + final URL url; + try { + url = new URL("https://github.com/steveloughran/winutils/" + + "raw/master/hadoop-2.7.1/bin/winutils.exe"); + } catch (final MalformedURLException e) { // unlikely + throw new IllegalArgumentException(e); + } + try { + try (final InputStream is = url.openStream(); + final OutputStream os = new FileOutputStream( + new File(hadoopHome, "bin/winutils.exe"))) { + try { + IOUtils.copy(is, os, 1024 * 1024); + } catch (final IOException e) { + throw new IllegalStateException(e); + } + } + } catch (final IOException e) { + throw new IllegalStateException(e); + } + System.setProperty("hadoop.home.dir", hadoopHome.getAbsolutePath()); + } + + /** + * Just a convenient win(File) invocation for tests. 
+ */ + public static void winTests() { + win(new File("target/hadoop-win")); + } +} diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml index 9c307922e2e5..f8e3c142ec97 100644 --- a/sdks/java/io/hdfs/pom.xml +++ b/sdks/java/io/hdfs/pom.xml @@ -83,5 +83,14 @@ junit test + + diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java index 67df7bcb4bb4..2ce1af7ae659 100644 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java +++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java @@ -28,9 +28,9 @@ import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.HadoopWorkarounds; import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.values.KV; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; @@ -38,6 +38,7 @@ import org.apache.hadoop.io.SequenceFile.Writer; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -53,138 +54,143 @@ */ public class HDFSFileSourceTest { - Random random = new Random(0L); - - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Test - public void testFullyReadSingleFile() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - List> expectedResults = createRandomRecords(3, 10, 0); - File file = createFileWithData("tmp.seq", expectedResults); - - HDFSFileSource source = - HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class, - IntWritable.class, Text.class); - - assertEquals(file.length(), 
source.getEstimatedSizeBytes(null)); - - assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); - } - - @Test - public void testFullyReadFilePattern() throws IOException { - PipelineOptions options = PipelineOptionsFactory.create(); - List> data1 = createRandomRecords(3, 10, 0); - File file1 = createFileWithData("file1", data1); - - List> data2 = createRandomRecords(3, 10, 10); - createFileWithData("file2", data2); - - List> data3 = createRandomRecords(3, 10, 20); - createFileWithData("file3", data3); - - List> data4 = createRandomRecords(3, 10, 30); - createFileWithData("otherfile", data4); - - HDFSFileSource source = - HDFSFileSource.from(new File(file1.getParent(), "file*").toString(), - SequenceFileInputFormat.class, IntWritable.class, Text.class); - List> expectedResults = new ArrayList<>(); - expectedResults.addAll(data1); - expectedResults.addAll(data2); - expectedResults.addAll(data3); - assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); - } - - @Test - public void testCloseUnstartedFilePatternReader() throws IOException { - PipelineOptions options = PipelineOptionsFactory.create(); - List> data1 = createRandomRecords(3, 10, 0); - File file1 = createFileWithData("file1", data1); - - List> data2 = createRandomRecords(3, 10, 10); - createFileWithData("file2", data2); - - List> data3 = createRandomRecords(3, 10, 20); - createFileWithData("file3", data3); - - List> data4 = createRandomRecords(3, 10, 30); - createFileWithData("otherfile", data4); - - HDFSFileSource source = - HDFSFileSource.from(new File(file1.getParent(), "file*").toString(), - SequenceFileInputFormat.class, IntWritable.class, Text.class); - Source.Reader> reader = source.createReader(options); - // Closing an unstarted FilePatternReader should not throw an exception. 
- try { - reader.close(); - } catch (Exception e) { - fail("Closing an unstarted FilePatternReader should not throw an exception"); + Random random = new Random(0L); + + @Rule + public final TemporaryFolder tmpFolder = new TemporaryFolder(); + + @BeforeClass + public static void setUpOnWinWithMissingHadoopHome() throws IOException { + HadoopWorkarounds.winTests(); + } + + @Test + public void testFullyReadSingleFile() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + List> expectedResults = createRandomRecords(3, 10, 0); + File file = createFileWithData("tmp.seq", expectedResults); + + HDFSFileSource source = + HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class, + IntWritable.class, Text.class); + + assertEquals(file.length(), source.getEstimatedSizeBytes(null)); + + assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); } - } - - @Test - public void testSplits() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - - List> expectedResults = createRandomRecords(3, 10000, 0); - File file = createFileWithData("tmp.avro", expectedResults); - - HDFSFileSource source = - HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class, - IntWritable.class, Text.class); - - // Assert that the source produces the expected records - assertEquals(expectedResults, readFromSource(source, options)); - - // Split with a small bundle size (has to be at least size of sync interval) - List>> splits = source - .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options); - assertTrue(splits.size() > 2); - SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); - int nonEmptySplits = 0; - for (BoundedSource> subSource : splits) { - if (readFromSource(subSource, options).size() > 0) { - nonEmptySplits += 1; - } + + @Test + public void testFullyReadFilePattern() throws IOException { + PipelineOptions options = PipelineOptionsFactory.create(); + List> data1 = 
createRandomRecords(3, 10, 0); + File file1 = createFileWithData("file1", data1); + + List> data2 = createRandomRecords(3, 10, 10); + createFileWithData("file2", data2); + + List> data3 = createRandomRecords(3, 10, 20); + createFileWithData("file3", data3); + + List> data4 = createRandomRecords(3, 10, 30); + createFileWithData("otherfile", data4); + + HDFSFileSource source = + HDFSFileSource.from(new File(file1.getParent(), "file*").toString(), + SequenceFileInputFormat.class, IntWritable.class, Text.class); + List> expectedResults = new ArrayList<>(); + expectedResults.addAll(data1); + expectedResults.addAll(data2); + expectedResults.addAll(data3); + assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); + } + + @Test + public void testCloseUnstartedFilePatternReader() throws IOException { + PipelineOptions options = PipelineOptionsFactory.create(); + List> data1 = createRandomRecords(3, 10, 0); + File file1 = createFileWithData("file1", data1); + + List> data2 = createRandomRecords(3, 10, 10); + createFileWithData("file2", data2); + + List> data3 = createRandomRecords(3, 10, 20); + createFileWithData("file3", data3); + + List> data4 = createRandomRecords(3, 10, 30); + createFileWithData("otherfile", data4); + + HDFSFileSource source = + HDFSFileSource.from(new File(file1.getParent(), "file*").toString(), + SequenceFileInputFormat.class, IntWritable.class, Text.class); + Source.Reader> reader = source.createReader(options); + // Closing an unstarted FilePatternReader should not throw an exception. 
+ try { + reader.close(); + } catch (Exception e) { + fail("Closing an unstarted FilePatternReader should not throw an exception"); + } + } + + @Test + public void testSplits() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + + List> expectedResults = createRandomRecords(3, 10000, 0); + File file = createFileWithData("tmp.avro", expectedResults); + + HDFSFileSource source = + HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class, + IntWritable.class, Text.class); + + // Assert that the source produces the expected records + assertEquals(expectedResults, readFromSource(source, options)); + + // Split with a small bundle size (has to be at least size of sync interval) + List>> splits = source + .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options); + assertTrue(splits.size() > 2); + SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); + int nonEmptySplits = 0; + for (BoundedSource> subSource : splits) { + if (readFromSource(subSource, options).size() > 0) { + nonEmptySplits += 1; + } + } + assertTrue(nonEmptySplits > 2); } - assertTrue(nonEmptySplits > 2); - } - - private File createFileWithData(String filename, List> records) - throws IOException { - File tmpFile = tmpFolder.newFile(filename); - try (Writer writer = SequenceFile.createWriter(new Configuration(), - Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class), - Writer.file(new Path(tmpFile.toURI())))) { - - for (KV record : records) { - writer.append(record.getKey(), record.getValue()); - } + + private File createFileWithData(String filename, List> records) + throws IOException { + File tmpFile = tmpFolder.newFile(filename); + try (Writer writer = SequenceFile.createWriter(new Configuration(), + Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class), + Writer.file(new Path(tmpFile.toURI())))) { + + for (KV record : records) { + writer.append(record.getKey(), record.getValue()); + } + } + return tmpFile; } - return 
tmpFile; - } - - private List> createRandomRecords(int dataItemLength, - int numItems, int offset) { - List> records = new ArrayList<>(); - for (int i = 0; i < numItems; i++) { - IntWritable key = new IntWritable(i + offset); - Text value = new Text(createRandomString(dataItemLength)); - records.add(KV.of(key, value)); + + private List> createRandomRecords(int dataItemLength, + int numItems, int offset) { + List> records = new ArrayList<>(); + for (int i = 0; i < numItems; i++) { + IntWritable key = new IntWritable(i + offset); + Text value = new Text(createRandomString(dataItemLength)); + records.add(KV.of(key, value)); + } + return records; } - return records; - } - - private String createRandomString(int length) { - char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray(); - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < length; i++) { - builder.append(chars[random.nextInt(chars.length)]); + + private String createRandomString(int length) { + char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray(); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < length; i++) { + builder.append(chars[random.nextInt(chars.length)]); + } + return builder.toString(); } - return builder.toString(); - } } diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml index 5b6cb2ab2132..9fb21e90eda3 100644 --- a/sdks/java/maven-archetypes/starter/pom.xml +++ b/sdks/java/maven-archetypes/starter/pom.xml @@ -60,6 +60,9 @@ integration-test + + true + From 460d21cb7070603f789da9d13e12668194c91e9b Mon Sep 17 00:00:00 2001 From: Romain manni-Bucau Date: Tue, 21 Jun 2016 10:37:05 +0200 Subject: [PATCH 2/2] better comments for win workaround and basic sanity checks for winutils.exe --- .../beam/runners/flink/WriteSinkITCase.java | 2 +- .../beam/sdk/testing/HadoopWorkarounds.java | 109 ++++++++++++++++-- sdks/java/io/hdfs/pom.xml | 9 -- sdks/java/maven-archetypes/starter/pom.xml | 6 +- 4 files changed, 104 
insertions(+), 22 deletions(-) diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java index 1a5635057254..bb3778d70b0f 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java @@ -54,7 +54,7 @@ public WriteSinkITCase(){ @Override protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); + resultPath = getTempDirPath("result-" + System.nanoTime()); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java index ee2e1359cf2e..1c2aa2014fe8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.testing; +import static java.util.Arrays.asList; + import org.apache.commons.compress.utils.IOUtils; import java.io.File; @@ -26,15 +28,21 @@ import java.io.OutputStream; import java.net.MalformedURLException; import java.net.URL; +import java.nio.file.Files; +import java.util.Arrays; /** * A simple class ensure winutils.exe can be found in the JVM. + *

+ * See http://wiki.apache.org/hadoop/WindowsProblems for details. + *

+ * Note: don't forget to add org.bouncycastle:bcpg-jdk16 dependency to use it. */ public class HadoopWorkarounds { /** * In practise this method only needs to be called once by JVM * since hadoop uses static variables to store it. - * + *

* Note: ensure invocation is done before hadoop reads it * and ensure this folder survives tests * (avoid temporary folder usage since tests can share it). @@ -51,6 +59,8 @@ public static void win(final File hadoopHome) { // hadoop doesn't have winutils.exe :(: https://issues.apache.org/jira/browse/HADOOP-10051 // so use this github repo temporarly then just use the main tar.gz /* + // note this commented code requires commons-compress dependency (to add if we use that) + String hadoopVersion = VersionInfo.getVersion(); final URL url = new URL("https://archive.apache.org/dist/hadoop/common/ hadoop-" + hadoopVersion + "/hadoop-" + hadoopVersion + ".tar.gz"); @@ -97,19 +107,49 @@ public boolean accept(final File pathname) { + "-Dhadoop.home.dir so we'll download winutils.exe"); new File(hadoopHome, "bin").mkdirs(); - final URL url; - try { - url = new URL("https://github.com/steveloughran/winutils/" - + "raw/master/hadoop-2.7.1/bin/winutils.exe"); - } catch (final MalformedURLException e) { // unlikely - throw new IllegalArgumentException(e); + final File winutils = new File(hadoopHome, "bin/winutils.exe"); + + for (final String suffix : asList("", ".asc")) { + final URL url; + try { + // this code is not a random URL - read HADOOP-10051 + // it is provided and signed with an ASF gpg key. 
+ + // note: 2.6.3 because 2.6.4 and 2.7.1 don't have .asc + url = new URL("https://github.com/steveloughran/winutils/" + "raw/master/hadoop-2.6.3/bin/winutils.exe" + suffix); + } catch (final MalformedURLException e) { // unlikely + throw new IllegalArgumentException(e); + } + + // download winutils.exe + try { + try (final InputStream is = url.openStream(); + final OutputStream os = new FileOutputStream( + new File(hadoopHome, "bin/winutils.exe" + suffix))) { + try { + IOUtils.copy(is, os, 1024 * 1024); + } catch (final IOException e) { + throw new IllegalStateException(e); + } + } + } catch (final IOException e) { + throw new IllegalStateException(e); + } } + + // get the gpg key which is supposed to have signed the winutils.exe + final File gpg = new File(hadoopHome, "bin/gpg"); try { - try (final InputStream is = url.openStream(); - final OutputStream os = new FileOutputStream( - new File(hadoopHome, "bin/winutils.exe"))) { + /* + key is https://github.com/steveloughran/winutils/blob/master/KEYS + but we trust the ASF not github so use the one we trust. + */ + final URL gpgUrl = new URL("http://home.apache.org/keys/committer/stevel"); + try (final InputStream is = gpgUrl.openStream(); + final OutputStream os = new FileOutputStream(gpg)) { try { - IOUtils.copy(is, os, 1024 * 1024); + IOUtils.copy(is, os); } catch (final IOException e) { throw new IllegalStateException(e); } @@ -117,9 +157,56 @@ public boolean accept(final File pathname) { } catch (final IOException e) { throw new IllegalStateException(e); } + + final File ascFile = new File(winutils.getParentFile(), winutils.getName() + ".asc"); + try { + sanityCheck(winutils, ascFile, gpg); + } catch (IOException e) { + throw new IllegalStateException("Invalid download"); + } + System.setProperty("hadoop.home.dir", hadoopHome.getAbsolutePath()); } + // TODO: replace with gpg --verify?
+ // for now it is just some basic sanity checks to ensure we use the files we think + private static void sanityCheck( + final File winutils, final File ascFile, final File gpg) + throws IOException { + + final byte[] asc = Files.readAllBytes(ascFile.toPath()); + final byte[] expectedAsc = ("-----BEGIN PGP SIGNATURE-----\n" + + "Comment: GPGTools - https://gpgtools.org\n" + + "\n" + + "iQIcBAABCgAGBQJWeb5GAAoJEKkkVPkXR4a0qUgP/1u1Z5vV+IvU/8w79HIYX56+\n" + + "FHMRGxM5953dggqjhGSBtfx62YA8oxhDP+8qLpQWtfjTC3//CW1Oz5hrkL0m+Am5\n" + + "Kf+qiINDLqX3Fsc4wHQvnLMt2pJPmm4K9FtpkedCdAchLOiM6Wr7WtGiWYQAdUh0\n" + + "5FjUZLLVx95Kj3cTY+1B/BL+z/hB63Ry2AC29oZG4fCuAH1nTZjhH3vBD1/kzS+E\n" + + "LEKHrGh/pP6ADgg9AfJvVmRhidlCVi21ZfwWHAaitwDTMFvtFSGq03A3F6Xn2iyQ\n" + + "3H6RcZ8dqEbtUEa1jOh1xNGzqP4oipWe0KQJ/Lx2eiSh8te73k/Pfw1Ta9CuHXqk\n" + + "n8ko7cBc/pUm7nXbfjiURtWFJ4corT4oahJQna+GgvYR4BrYVLlSGb5VijTkzb7i\n" + + "0XU40BM5sOcDS/I0lkvqKP0mSi+mMJXbm10y0jw2S7KR7KeHLwzybsjco05DfWUD\n" + + "fSaCHK726g5SLsWJvZaurwna7+Mepzmo1HpAVy6nAuiAa2OQVIioNyFanIbuhbM3\n" + + "7PXBDWbfPOgr1WbYW4TASoepvsuJsAahYf2SlGagByOiDNliDHJi1z+ArfWsCFFh\n" + + "fAMMzPLKJwkmKPahyej3MrcywtntX68D7R8wTCAaj3xCxJsvX4IRv6YRk1+hQ2je\n" + + "EXQFW2c8nTI6XqtFpsbw\n" + + "=42+k\n" + + "-----END PGP SIGNATURE-----\n").getBytes("UTF-8"); + if (!Arrays.equals(asc, expectedAsc)) { + throw new IllegalArgumentException( + "Invalid asc file, did the repo get corrupted?"); + } + + final byte[] exe = Files.readAllBytes(winutils.toPath()); + if (exe.length != 108032 || exe[0] != 77 + || exe[exe.length - 1] != 0 || exe[exe.length / 3] != -127) { + throw new IllegalArgumentException( + "Invalid winutils.exe file, did the repo get corrupted?"); + } + + // for now we ignore gpg cause it is useless until we can use gpg tools + } + /** * Just a convenient win(File) invocation for tests. 
*/ diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml index f8e3c142ec97..9c307922e2e5 100644 --- a/sdks/java/io/hdfs/pom.xml +++ b/sdks/java/io/hdfs/pom.xml @@ -83,14 +83,5 @@ junit test - - diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml index 9fb21e90eda3..3d8267ebffd8 100644 --- a/sdks/java/maven-archetypes/starter/pom.xml +++ b/sdks/java/maven-archetypes/starter/pom.xml @@ -61,7 +61,11 @@ integration-test - true + + true