From 213eef8bc835c9fa85333356b413cb5886987ba7 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Wed, 10 May 2017 12:18:47 -0700 Subject: [PATCH 1/2] This closes #3023 --- .../beam/examples/WindowedWordCount.java | 7 +- .../common/WriteOneFilePerWindow.java | 24 +- .../beam/examples/WindowedWordCountIT.java | 23 +- .../construction/PTransformMatchersTest.java | 2 +- .../direct/WriteWithShardingFactoryTest.java | 2 +- ...ltCoderCloudObjectTranslatorRegistrar.java | 2 - .../java/org/apache/beam/sdk/io/AvroSink.java | 14 +- .../org/apache/beam/sdk/io/FileBasedSink.java | 316 +++++++++++------- .../org/apache/beam/sdk/io/TFRecordIO.java | 16 +- .../java/org/apache/beam/sdk/io/TextSink.java | 14 +- .../org/apache/beam/sdk/io/WriteFiles.java | 275 ++++++++------- .../apache/beam/sdk/io/FileBasedSinkTest.java | 162 +++++---- .../org/apache/beam/sdk/io/SimpleSink.java | 4 +- .../org/apache/beam/sdk/io/xml/XmlSink.java | 8 +- 14 files changed, 495 insertions(+), 374 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java index 45746af3efd1..20b48e435929 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java @@ -161,12 +161,15 @@ public interface Options extends WordCount.WordCountOptions, @Default.InstanceFactory(DefaultToMinTimestampPlusOneHour.class) Long getMaxTimestampMillis(); void setMaxTimestampMillis(Long value); + + @Description("Fixed number of shards to produce per window, or null for runner-chosen sharding") + Integer getNumShards(); + void setNumShards(Integer numShards); } public static void main(String[] args) throws IOException { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); final String output = options.getOutput(); - final Duration windowSize = Duration.standardMinutes(options.getWindowSize()); final Instant minTimestamp = new Instant(options.getMinTimestampMillis()); final Instant maxTimestamp = new Instant(options.getMaxTimestampMillis()); @@ -207,7 +210,7 @@ public static void main(String[] args) throws IOException { */ wordCounts .apply(MapElements.via(new WordCount.FormatAsTextFn())) - .apply(new WriteOneFilePerWindow(output)); + .apply(new WriteOneFilePerWindow(output, options.getNumShards())); PipelineResult result = pipeline.run(); try { diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java index ed35c8a05503..5e6df9c781fd 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java @@ -19,6 +19,7 @@ import static com.google.common.base.Verify.verifyNotNull; +import javax.annotation.Nullable; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.TextIO; @@ -40,12 +41,14 @@ * lessons. */ public class WriteOneFilePerWindow extends PTransform, PDone> { - private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute(); - private final String filenamePrefix; + private String filenamePrefix; + @Nullable + private Integer numShards; - public WriteOneFilePerWindow(String filenamePrefix) { + public WriteOneFilePerWindow(String filenamePrefix, Integer numShards) { this.filenamePrefix = filenamePrefix; + this.numShards = numShards; } @Override @@ -61,12 +64,15 @@ public PDone expand(PCollection input) { resource); } - return input.apply( - TextIO.write() - .to(resource.getCurrentDirectory()) - .withFilenamePolicy(new PerWindowFiles(prefix)) - .withWindowedWrites() - .withNumShards(3)); + + TextIO.Write write = TextIO.write() + .to(resource.getCurrentDirectory()) + .withFilenamePolicy(new PerWindowFiles(prefix)) + .withWindowedWrites(); + if (numShards != null) { + write = write.withNumShards(numShards); + } + return input.apply(write); } /** diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index 93c4543e0739..eb7e4c4e936f 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -86,14 +86,29 @@ public static void setUp() { } @Test - public void testWindowedWordCountInBatch() throws Exception { - testWindowedWordCountPipeline(defaultOptions()); + public void testWindowedWordCountInBatchDynamicSharding() throws Exception { + WindowedWordCountITOptions options = batchOptions(); + // This is the default value, but make it explicit + options.setNumShards(null); + testWindowedWordCountPipeline(options); } + @Test + public void testWindowedWordCountInBatchStaticSharding() throws Exception { + WindowedWordCountITOptions options = batchOptions(); + options.setNumShards(3); + testWindowedWordCountPipeline(options); + } + + // TODO: add a test with streaming and dynamic sharding after resolving + // https://issues.apache.org/jira/browse/BEAM-1438 + @Test @Category(StreamingIT.class) - public void testWindowedWordCountInStreaming() throws Exception { - testWindowedWordCountPipeline(streamingOptions()); + public void testWindowedWordCountInStreamingStaticSharding() throws Exception { + WindowedWordCountITOptions options = streamingOptions(); + options.setNumShards(3); + testWindowedWordCountPipeline(options); } private WindowedWordCountITOptions defaultOptions() throws Exception { diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index 9ae236be0737..cfea62ffb537 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -510,7 +510,7 @@ public void writeWithRunnerDeterminedSharding() { WriteFiles.to( new FileBasedSink(StaticValueProvider.of(outputDirectory), policy) { @Override - public FileBasedWriteOperation createWriteOperation() { + public WriteOperation createWriteOperation() { return null; } }); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java index 5a2a328beab0..5c4fea1cb753 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java @@ -134,7 +134,7 @@ public void withNoShardingSpecifiedReturnsNewTransform() { WriteFiles original = WriteFiles.to( new FileBasedSink(StaticValueProvider.of(outputDirectory), policy) { @Override - public FileBasedWriteOperation createWriteOperation() { + public WriteOperation createWriteOperation() { throw new IllegalArgumentException("Should not be used"); } }); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java index 45670988be15..5d42a5fedde0 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java @@ -47,7 +47,6 @@ import org.apache.beam.sdk.coders.TextualIntegerCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder; import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder; import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder; @@ -91,7 +90,6 @@ public class DefaultCoderCloudObjectTranslatorRegistrar ByteCoder.class, DoubleCoder.class, DurationCoder.class, - FileResultCoder.class, FooterCoder.class, InstantCoder.class, IsmShardCoder.class, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java index 7d42574447b9..6c362664d8fa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java @@ -51,12 +51,12 @@ class AvroSink extends FileBasedSink { } @Override - public FileBasedWriteOperation createWriteOperation() { + public WriteOperation createWriteOperation() { return new AvroWriteOperation<>(this, coder, codec, metadata); } - /** A {@link FileBasedWriteOperation FileBasedWriteOperation} for Avro files. */ - private static class AvroWriteOperation extends FileBasedWriteOperation { + /** A {@link WriteOperation WriteOperation} for Avro files. */ + private static class AvroWriteOperation extends WriteOperation { private final AvroCoder coder; private final SerializableAvroCodecFactory codec; private final ImmutableMap metadata; @@ -72,19 +72,19 @@ private AvroWriteOperation(AvroSink sink, } @Override - public FileBasedWriter createWriter() throws Exception { + public Writer createWriter() throws Exception { return new AvroWriter<>(this, coder, codec, metadata); } } - /** A {@link FileBasedWriter FileBasedWriter} for Avro files. */ - private static class AvroWriter extends FileBasedWriter { + /** A {@link Writer Writer} for Avro files. */ + private static class AvroWriter extends Writer { private final AvroCoder coder; private DataFileWriter dataFileWriter; private SerializableAvroCodecFactory codec; private final ImmutableMap metadata; - public AvroWriter(FileBasedWriteOperation writeOperation, + public AvroWriter(WriteOperation writeOperation, AvroCoder coder, SerializableAvroCodecFactory codec, ImmutableMap metadata) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 32aa9c388d89..7f729a7ea828 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -22,10 +22,12 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verifyNotNull; +import static org.apache.beam.sdk.io.WriteFiles.UNKNOWN_SHARDNUM; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import java.io.IOException; import java.io.InputStream; @@ -34,7 +36,9 @@ import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -43,10 +47,12 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.WindowedContext; import org.apache.beam.sdk.io.fs.MatchResult; @@ -63,6 +69,7 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder; import org.apache.beam.sdk.util.MimeTypes; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; @@ -78,7 +85,7 @@ * type, etc.). * *

At pipeline construction time, the methods of FileBasedSink are called to validate the sink - * and to create a {@link FileBasedWriteOperation} that manages the process of writing to the sink. + * and to create a {@link WriteOperation} that manages the process of writing to the sink. * *

The process of writing to file-based sink is as follows: *

    @@ -89,8 +96,8 @@ * *

    In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the * event of failure/retry or for redundancy). However, exactly one of these executions will have its - * result passed to the finalize method. Each call to {@link FileBasedWriter#openWindowed} - * or {@link FileBasedWriter#openUnwindowed} is passed a unique bundle id when it is called + * result passed to the finalize method. Each call to {@link Writer#openWindowed} + * or {@link Writer#openUnwindowed} is passed a unique bundle id when it is called * by the WriteFiles transform, so even redundant or retried bundles will have a unique way of * identifying * their output. @@ -306,8 +313,7 @@ public void populateDisplayData(DisplayData.Builder builder) { } /** The policy used to generate names of files to be produced. */ - @VisibleForTesting - final FilenamePolicy filenamePolicy; + private final FilenamePolicy filenamePolicy; /** The directory to which files will be written. */ private final ValueProvider baseOutputDirectoryProvider; @@ -359,10 +365,10 @@ public final FilenamePolicy getFilenamePolicy() { public void validate(PipelineOptions options) {} /** - * Return a subclass of {@link FileBasedSink.FileBasedWriteOperation} that will manage the write + * Return a subclass of {@link WriteOperation} that will manage the write * to the sink. */ - public abstract FileBasedWriteOperation createWriteOperation(); + public abstract WriteOperation createWriteOperation(); public void populateDisplayData(DisplayData.Builder builder) { getFilenamePolicy().populateDisplayData(builder); @@ -371,24 +377,24 @@ public void populateDisplayData(DisplayData.Builder builder) { /** * Abstract operation that manages the process of writing to {@link FileBasedSink}. * - *

    The primary responsibilities of the FileBasedWriteOperation is the management of output - * files. During a write, {@link FileBasedSink.FileBasedWriter}s write bundles to temporary file + *

    The primary responsibilities of the WriteOperation is the management of output + * files. During a write, {@link Writer}s write bundles to temporary file * locations. After the bundles have been written, *

      - *
    1. {@link FileBasedSink.FileBasedWriteOperation#finalize} is given a list of the temporary + *
    2. {@link WriteOperation#finalize} is given a list of the temporary * files containing the output bundles. *
    3. During finalize, these temporary files are copied to final output locations and named * according to a file naming template. *
    4. Finally, any temporary files that were created during the write are removed. *
    * - *

    Subclass implementations of FileBasedWriteOperation must implement - * {@link FileBasedSink.FileBasedWriteOperation#createWriter} to return a concrete + *

    Subclass implementations of WriteOperation must implement + * {@link WriteOperation#createWriter} to return a concrete * FileBasedSinkWriter. * *

    Temporary and Output File Naming:

    During the write, bundles are written to temporary * files using the tempDirectory that can be provided via the constructor of - * FileBasedWriteOperation. These temporary files will be named + * WriteOperation. These temporary files will be named * {@code {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle. * For example, if tempDirectory is "gs://my-bucket/my_temp_output", the output for a * bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output/15723". @@ -408,7 +414,7 @@ public void populateDisplayData(DisplayData.Builder builder) { * * @param the type of values written to the sink. */ - public abstract static class FileBasedWriteOperation implements Serializable { + public abstract static class WriteOperation implements Serializable { /** * The Sink that this WriteOperation will write to. */ @@ -427,7 +433,7 @@ protected static ResourceId buildTemporaryFilename(ResourceId tempDirectory, Str } /** - * Constructs a FileBasedWriteOperation using the default strategy for generating a temporary + * Constructs a WriteOperation using the default strategy for generating a temporary * directory from the base output filename. * *

    Default is a uniquely named sibling of baseOutputFilename, e.g. if baseOutputFilename is @@ -435,7 +441,7 @@ protected static ResourceId buildTemporaryFilename(ResourceId tempDirectory, Str * * @param sink the FileBasedSink that will be used to configure this write operation. */ - public FileBasedWriteOperation(FileBasedSink sink) { + public WriteOperation(FileBasedSink sink) { this(sink, NestedValueProvider.of( sink.getBaseOutputDirectoryProvider(), new TemporaryDirectoryBuilder())); } @@ -461,16 +467,16 @@ public ResourceId apply(ResourceId baseOutputDirectory) { } /** - * Create a new FileBasedWriteOperation. + * Create a new WriteOperation. * * @param sink the FileBasedSink that will be used to configure this write operation. * @param tempDirectory the base directory to be used for temporary output files. */ - public FileBasedWriteOperation(FileBasedSink sink, ResourceId tempDirectory) { + public WriteOperation(FileBasedSink sink, ResourceId tempDirectory) { this(sink, StaticValueProvider.of(tempDirectory)); } - private FileBasedWriteOperation( + private WriteOperation( FileBasedSink sink, ValueProvider tempDirectory) { this.sink = sink; this.tempDirectory = tempDirectory; @@ -478,10 +484,10 @@ private FileBasedWriteOperation( } /** - * Clients must implement to return a subclass of {@link FileBasedSink.FileBasedWriter}. This + * Clients must implement to return a subclass of {@link Writer}. This * method must not mutate the state of the object. */ - public abstract FileBasedWriter createWriter() throws Exception; + public abstract Writer createWriter() throws Exception; /** * Indicates that the operation will be performing windowed writes. @@ -523,31 +529,66 @@ public void finalize(Iterable writerResults) throws Exception { protected final Map buildOutputFilenames( Iterable writerResults) { + int numShards = Iterables.size(writerResults); Map outputFilenames = new HashMap<>(); - List files = new ArrayList<>(); + + FilenamePolicy policy = getSink().getFilenamePolicy(); + ResourceId baseOutputDir = getSink().getBaseOutputDirectoryProvider().get(); + + // Either all results have a shard number set (if the sink is configured with a fixed + // number of shards), or they all don't (otherwise). + Boolean isShardNumberSetEverywhere = null; for (FileResult result : writerResults) { - if (result.getDestinationFilename() != null) { - outputFilenames.put(result.getFilename(), result.getDestinationFilename()); + boolean isShardNumberSetHere = (result.getShard() != UNKNOWN_SHARDNUM); + if (isShardNumberSetEverywhere == null) { + isShardNumberSetEverywhere = isShardNumberSetHere; } else { - files.add(result.getFilename()); + checkArgument( + isShardNumberSetEverywhere == isShardNumberSetHere, + "Found a mix of files with and without shard number set: %s", + result); } } - // writerResults won't contain destination filenames, so we dynamically generate them here. - if (files.size() > 0) { - checkArgument(outputFilenames.isEmpty()); - // Sort files for idempotence. - files = Ordering.usingToString().sortedCopy(files); - ResourceId outputDirectory = getSink().getBaseOutputDirectoryProvider().get(); - FilenamePolicy filenamePolicy = getSink().filenamePolicy; - for (int i = 0; i < files.size(); i++) { - outputFilenames.put(files.get(i), - filenamePolicy.unwindowedFilename(outputDirectory, new Context(i, files.size()), - getSink().getExtension())); + if (isShardNumberSetEverywhere == null) { + isShardNumberSetEverywhere = true; + } + + List resultsWithShardNumbers = Lists.newArrayList(); + if (isShardNumberSetEverywhere) { + resultsWithShardNumbers = Lists.newArrayList(writerResults); + } else { + // Sort files for idempotence. Sort by temporary filename. + // Note that this codepath should not be used when processing triggered windows. In the + // case of triggers, the list of FileResult objects in the Finalize iterable is not + // deterministic, and might change over retries. This breaks the assumption below that + // sorting the FileResult objects provides idempotency. + List sortedByTempFilename = + Ordering.from( + new Comparator() { + @Override + public int compare(FileResult first, FileResult second) { + String firstFilename = first.getTempFilename().toString(); + String secondFilename = second.getTempFilename().toString(); + return firstFilename.compareTo(secondFilename); + } + }) + .sortedCopy(writerResults); + for (int i = 0; i < sortedByTempFilename.size(); i++) { + resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i)); } } - int numDistinctShards = new HashSet(outputFilenames.values()).size(); + for (FileResult result : resultsWithShardNumbers) { + checkArgument( + result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result); + outputFilenames.put( + result.getTempFilename(), + result.getDestinationFile( + policy, baseOutputDir, numShards, getSink().getExtension())); + } + + int numDistinctShards = new HashSet<>(outputFilenames.values()).size(); checkState(numDistinctShards == outputFilenames.size(), "Only generated %s distinct file names for %s files.", numDistinctShards, outputFilenames.size()); @@ -558,7 +599,7 @@ protected final Map buildOutputFilenames( /** * Copy temporary files to final output filenames using the file naming template. * - *

    Can be called from subclasses that override {@link FileBasedWriteOperation#finalize}. + *

    Can be called from subclasses that override {@link WriteOperation#finalize}. * *

    Files will be named according to the file naming template. The order of the output files * will be the same as the sorted order of the input filenames. In other words, if the input @@ -591,7 +632,7 @@ final void copyToOutputFiles(Map filenames) /** * Removes temporary output files. Uses the temporary directory to find files to remove. * - *

    Can be called from subclasses that override {@link FileBasedWriteOperation#finalize}. + *

    Can be called from subclasses that override {@link WriteOperation#finalize}. * Note:If finalize is overridden and does not rename or otherwise finalize * temporary files, this method will remove them. */ @@ -647,6 +688,19 @@ final void removeTemporaryFiles( public FileBasedSink getSink() { return sink; } + + @Override + public String toString() { + String tempDirectoryStr = + tempDirectory.isAccessible() ? tempDirectory.get().toString() : tempDirectory.toString(); + return getClass().getSimpleName() + + "{" + + "tempDirectory=" + + tempDirectoryStr + + ", windowedWrites=" + + windowedWrites + + '}'; + } } /** Returns the extension that will be written to the produced files. */ @@ -667,15 +721,15 @@ protected final String getExtension() { * after the values in a bundle, respectively, as well as provide a MIME type for the output * channel. * - *

    Multiple {@link FileBasedWriter} instances may be created on the same worker, and therefore + *

    Multiple {@link Writer} instances may be created on the same worker, and therefore * any access to static members or methods should be thread safe. * * @param the type of values to write. */ - public abstract static class FileBasedWriter { - private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriter.class); + public abstract static class Writer { + private static final Logger LOG = LoggerFactory.getLogger(Writer.class); - private final FileBasedWriteOperation writeOperation; + private final WriteOperation writeOperation; /** Unique id for this output bundle. */ private String id; @@ -683,7 +737,6 @@ public abstract static class FileBasedWriter { private BoundedWindow window; private PaneInfo paneInfo; private int shard = -1; - private int numShards = -1; /** The output file for this bundle. May be null if opening failed. */ private @Nullable ResourceId outputFile; @@ -704,9 +757,9 @@ public abstract static class FileBasedWriter { private final String mimeType; /** - * Construct a new {@link FileBasedWriter} that will produce files of the given MIME type. + * Construct a new {@link Writer} that will produce files of the given MIME type. */ - public FileBasedWriter(FileBasedWriteOperation writeOperation, String mimeType) { + public Writer(WriteOperation writeOperation, String mimeType) { checkNotNull(writeOperation); this.writeOperation = writeOperation; this.mimeType = mimeType; @@ -738,24 +791,23 @@ protected void writeFooter() throws Exception {} protected void finishWrite() throws Exception {} /** - * Performs bundle initialization. For example, creates a temporary file for writing or - * initializes any state that will be used across calls to {@link FileBasedWriter#write}. + * Performs bundle initialization. For example, creates a temporary file for writing or + * initializes any state that will be used across calls to {@link Writer#write}. * - *

    The unique id that is given to open should be used to ensure that the writer's output - * does not interfere with the output of other Writers, as a bundle may be executed many - * times for fault tolerance. + *

    The unique id that is given to open should be used to ensure that the writer's output does + * not interfere with the output of other Writers, as a bundle may be executed many times for + * fault tolerance. * - *

    The window and paneInfo arguments are populated when windowed writes are requested. - * shard and numShards are populated for the case of static sharding. In cases where the - * runner is dynamically picking sharding, shard and numShards might both be set to -1. + *

    The window and paneInfo arguments are populated when windowed writes are requested. shard + * id populated for the case of static sharding. In cases where the runner is dynamically + * picking sharding, shard might be set to -1. */ - public final void openWindowed( - String uId, BoundedWindow window, PaneInfo paneInfo, int shard, int numShards) + public final void openWindowed(String uId, BoundedWindow window, PaneInfo paneInfo, int shard) throws Exception { if (!getWriteOperation().windowedWrites) { throw new IllegalStateException("openWindowed called a non-windowed sink."); } - open(uId, window, paneInfo, shard, numShards); + open(uId, window, paneInfo, shard); } /** @@ -767,13 +819,11 @@ public final void openWindowed( * Similar to {@link #openWindowed} however for the case where unwindowed writes were * requested. */ - public final void openUnwindowed(String uId, - int shard, - int numShards) throws Exception { + public final void openUnwindowed(String uId, int shard) throws Exception { if (getWriteOperation().windowedWrites) { throw new IllegalStateException("openUnwindowed called a windowed sink."); } - open(uId, null, null, shard, numShards); + open(uId, null, null, shard); } // Helper function to close a channel, on exception cases. @@ -792,13 +842,11 @@ private static void closeChannelAndThrow( private void open(String uId, @Nullable BoundedWindow window, @Nullable PaneInfo paneInfo, - int shard, - int numShards) throws Exception { + int shard) throws Exception { this.id = uId; this.window = window; this.paneInfo = paneInfo; this.shard = shard; - this.numShards = numShards; ResourceId tempDirectory = getWriteOperation().tempDirectory.get(); outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE); verifyNotNull( @@ -818,7 +866,7 @@ private void open(String uId, closeChannelAndThrow(tempChannel, outputFile, e); } - // The caller shouldn't have to close() this FileBasedWriter if it fails to open(), so close + // The caller shouldn't have to close() this Writer if it fails to open(), so close // the channel if prepareWrite() or writeHeader() fails. String step = ""; try { @@ -874,30 +922,15 @@ public final FileResult close() throws Exception { throw new IOException(String.format("Failed closing channel to %s", outputFile), e); } - FileBasedSink sink = getWriteOperation().getSink(); - ResourceId outputDirectory = sink.getBaseOutputDirectoryProvider().get(); - FilenamePolicy filenamePolicy = sink.filenamePolicy; - String extension = sink.getExtension(); - @Nullable ResourceId destinationFile; - if (window != null) { - destinationFile = filenamePolicy.windowedFilename(outputDirectory, new WindowedContext( - window, paneInfo, shard, numShards), extension); - } else if (numShards > 0) { - destinationFile = filenamePolicy.unwindowedFilename( - outputDirectory, new Context(shard, numShards), extension); - } else { - // Destination filename to be generated in the next step. - destinationFile = null; - } - FileResult result = new FileResult(outputFile, destinationFile); - LOG.debug("Result for bundle {}: {} {}", this.id, outputFile, destinationFile); + FileResult result = new FileResult(outputFile, shard, window, paneInfo); + LOG.debug("Result for bundle {}: {}", this.id, outputFile); return result; } /** - * Return the FileBasedWriteOperation that this Writer belongs to. + * Return the WriteOperation that this Writer belongs to. */ - public FileBasedWriteOperation getWriteOperation() { + public WriteOperation getWriteOperation() { return writeOperation; } } @@ -906,31 +939,58 @@ public FileBasedWriteOperation getWriteOperation() { * Result of a single bundle write. Contains the filename produced by the bundle, and if known * the final output filename. */ - public static final class FileResult implements Serializable { - private final ResourceId filename; - @Nullable private final ResourceId destinationFilename; + public static final class FileResult { + private final ResourceId tempFilename; + private final int shard; + private final BoundedWindow window; + private final PaneInfo paneInfo; + + public FileResult(ResourceId tempFilename, int shard, BoundedWindow window, PaneInfo paneInfo) { + this.tempFilename = tempFilename; + this.shard = shard; + this.window = window; + this.paneInfo = paneInfo; + } - public FileResult(ResourceId filename, @Nullable ResourceId destinationFilename) { - this.filename = filename; - this.destinationFilename = destinationFilename; + public ResourceId getTempFilename() { + return tempFilename; } - public ResourceId getFilename() { - return filename; + public int getShard() { + return shard; } - /** - * The filename to be written. Will be null if the output filename is unknown because the number - * of shards is determined dynamically by the runner. - */ - @Nullable public ResourceId getDestinationFilename() { - return destinationFilename; + public FileResult withShard(int shard) { + return new FileResult(tempFilename, shard, window, paneInfo); + } + + public BoundedWindow getWindow() { + return window; + } + + public PaneInfo getPaneInfo() { + return paneInfo; + } + + public ResourceId getDestinationFile(FilenamePolicy policy, ResourceId outputDirectory, + int numShards, String extension) { + checkArgument(getShard() != UNKNOWN_SHARDNUM); + checkArgument(numShards > 0); + if (getWindow() != null) { + return policy.windowedFilename(outputDirectory, new WindowedContext( + getWindow(), getPaneInfo(), getShard(), numShards), extension); + } else { + return policy.unwindowedFilename(outputDirectory, new Context(getShard(), numShards), + extension); + } } public String toString() { return MoreObjects.toStringHelper(FileResult.class) - .add("filename", filename) - .add("destinationFilename", destinationFilename) + .add("tempFilename", tempFilename) + .add("shard", shard) + .add("window", window) + .add("paneInfo", paneInfo) .toString(); } } @@ -938,12 +998,24 @@ public String toString() { /** * A coder for {@link FileResult} objects. */ - public static final class FileResultCoder extends AtomicCoder { - private static final FileResultCoder INSTANCE = new FileResultCoder(); - private final NullableCoder stringCoder = NullableCoder.of(StringUtf8Coder.of()); + public static final class FileResultCoder extends StructuredCoder { + private static final Coder FILENAME_CODER = StringUtf8Coder.of(); + private static final Coder SHARD_CODER = VarIntCoder.of(); + private static final Coder PANE_INFO_CODER = NullableCoder.of(PaneInfoCoder.INSTANCE); - public static FileResultCoder of() { - return INSTANCE; + private final Coder windowCoder; + + protected FileResultCoder(Coder windowCoder) { + this.windowCoder = NullableCoder.of(windowCoder); + } + + public static FileResultCoder of(Coder windowCoder) { + return new FileResultCoder(windowCoder); + } + + @Override + public List> getCoderArguments() { + return Arrays.asList(windowCoder); } @Override @@ -952,29 +1024,29 @@ public void encode(FileResult value, OutputStream outStream) if (value == null) { throw new CoderException("cannot encode a null value"); } - stringCoder.encode(value.getFilename().toString(), outStream); - if (value.getDestinationFilename() == null) { - stringCoder.encode(null, outStream); - } else { - stringCoder.encode(value.getDestinationFilename().toString(), outStream); - } + FILENAME_CODER.encode(value.getTempFilename().toString(), outStream); + windowCoder.encode(value.getWindow(), outStream); + PANE_INFO_CODER.encode(value.getPaneInfo(), outStream); + SHARD_CODER.encode(value.getShard(), outStream); } @Override - public FileResult decode(InputStream inStream) throws IOException { - String filename = stringCoder.decode(inStream); - assert filename != null; // fixes a compiler warning - @Nullable String destinationFilename = stringCoder.decode(inStream); - return new FileResult( - FileSystems.matchNewResource(filename, false /* isDirectory */), - destinationFilename == null - ? null - : FileSystems.matchNewResource(destinationFilename, false /* isDirectory */)); + public FileResult decode(InputStream inStream) + throws IOException { + String tempFilename = FILENAME_CODER.decode(inStream); + BoundedWindow window = windowCoder.decode(inStream); + PaneInfo paneInfo = PANE_INFO_CODER.decode(inStream); + int shard = SHARD_CODER.decode(inStream); + return new FileResult(FileSystems.matchNewResource(tempFilename, false /* isDirectory */), + shard, window, paneInfo); } @Override public void verifyDeterministic() throws NonDeterministicException { - stringCoder.verifyDeterministic(); + FILENAME_CODER.verifyDeterministic(); + windowCoder.verifyDeterministic(); + PANE_INFO_CODER.verifyDeterministic(); + SHARD_CODER.verifyDeterministic(); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java index 6350fb57aa98..f73d6f3d284f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java @@ -567,7 +567,7 @@ public ResourceId apply(ResourceId input) { } @Override - public FileBasedWriteOperation createWriteOperation() { + public WriteOperation createWriteOperation() { return new TFRecordWriteOperation(this); } @@ -587,29 +587,29 @@ private static WritableByteChannelFactory writableByteChannelFactory( } /** - * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation - * FileBasedWriteOperation} for TFRecord files. + * A {@link WriteOperation + * WriteOperation} for TFRecord files. */ - private static class TFRecordWriteOperation extends FileBasedWriteOperation { + private static class TFRecordWriteOperation extends WriteOperation { private TFRecordWriteOperation(TFRecordSink sink) { super(sink); } @Override - public FileBasedWriter createWriter() throws Exception { + public Writer createWriter() throws Exception { return new TFRecordWriter(this); } } /** - * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter} + * A {@link Writer Writer} * for TFRecord files. */ - private static class TFRecordWriter extends FileBasedWriter { + private static class TFRecordWriter extends Writer { private WritableByteChannel outChannel; private TFRecordCodec codec; - private TFRecordWriter(FileBasedWriteOperation writeOperation) { + private TFRecordWriter(WriteOperation writeOperation) { super(writeOperation, MimeTypes.BINARY); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java index 0ba537e554a7..511d6976a4e9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java @@ -49,12 +49,12 @@ class TextSink extends FileBasedSink { this.footer = footer; } @Override - public FileBasedWriteOperation createWriteOperation() { + public WriteOperation createWriteOperation() { return new TextWriteOperation(this, header, footer); } - /** A {@link FileBasedWriteOperation FileBasedWriteOperation} for text files. */ - private static class TextWriteOperation extends FileBasedWriteOperation { + /** A {@link WriteOperation WriteOperation} for text files. */ + private static class TextWriteOperation extends WriteOperation { @Nullable private final String header; @Nullable private final String footer; @@ -65,20 +65,20 @@ private TextWriteOperation(TextSink sink, @Nullable String header, @Nullable Str } @Override - public FileBasedWriter createWriter() throws Exception { + public Writer createWriter() throws Exception { return new TextWriter(this, header, footer); } } - /** A {@link FileBasedWriter FileBasedWriter} for text files. */ - private static class TextWriter extends FileBasedWriter { + /** A {@link Writer Writer} for text files. */ + private static class TextWriter extends Writer { private static final String NEWLINE = "\n"; @Nullable private final String header; @Nullable private final String footer; private OutputStreamWriter out; public TextWriter( - FileBasedWriteOperation writeOperation, + WriteOperation writeOperation, @Nullable String header, @Nullable String footer) { super(writeOperation, MimeTypes.TEXT); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 5b219027e64c..0786e5d72fe9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -22,18 +22,21 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation; -import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter; import org.apache.beam.sdk.io.FileBasedSink.FileResult; import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder; +import org.apache.beam.sdk.io.FileBasedSink.WriteOperation; +import org.apache.beam.sdk.io.FileBasedSink.Writer; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -48,6 +51,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -63,7 +67,7 @@ * finalization of the write. The output of a write is {@link PDone}. * *

    By default, every bundle in the input {@link PCollection} will be processed by a - * {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation}, so the number of output + * {@link WriteOperation}, so the number of output * will vary based on runner behavior, though at least 1 output will always be produced. The * exact parallelism of the write stage can be controlled using {@link WriteFiles#withNumShards}, * typically used to control how many files are produced or to globally limit the number of @@ -82,11 +86,9 @@ public class WriteFiles extends PTransform, PDone> { private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class); - private static final int UNKNOWN_SHARDNUM = -1; - private static final int UNKNOWN_NUMSHARDS = -1; - + static final int UNKNOWN_SHARDNUM = -1; private FileBasedSink sink; - private FileBasedWriteOperation writeOperation; + private WriteOperation writeOperation; // This allows the number of shards to be dynamically computed based on the input // PCollection. @Nullable @@ -119,9 +121,18 @@ private WriteFiles( @Override public PDone expand(PCollection input) { - checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites, - "%s can only be applied to an unbounded PCollection if doing windowed writes", - WriteFiles.class.getSimpleName()); + if (input.isBounded() == IsBounded.UNBOUNDED) { + checkArgument(windowedWrites, + "Must use windowed writes when applying %s to an unbounded PCollection", + WriteFiles.class.getSimpleName()); + // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438 + // and similar behavior in other runners. + checkArgument( + computeNumShards != null || numShardsProvider != null, + "When applying %s to an unbounded PCollection, " + + "must specify number of output shards explicitly", + WriteFiles.class.getSimpleName()); + } this.writeOperation = sink.createWriteOperation(); this.writeOperation.setWindowedWrites(windowedWrites); return createWrite(input); @@ -238,61 +249,94 @@ public WriteFiles withWindowedWrites() { } /** - * Writes all the elements in a bundle using a {@link FileBasedWriter} produced by the - * {@link FileBasedSink.FileBasedWriteOperation} associated with the {@link FileBasedSink}. + * Writes all the elements in a bundle using a {@link Writer} produced by the + * {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes enabled. */ - private class WriteBundles extends DoFn { + private class WriteWindowedBundles extends DoFn { + private Map, Writer> windowedWriters; + + @StartBundle + public void startBundle(StartBundleContext c) { + // Reset state in case of reuse. We need to make sure that each bundle gets unique writers. + windowedWriters = Maps.newHashMap(); + } + + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + PaneInfo paneInfo = c.pane(); + Writer writer; + // If we are doing windowed writes, we need to ensure that we have separate files for + // data in different windows/panes. + KV key = KV.of(window, paneInfo); + writer = windowedWriters.get(key); + if (writer == null) { + String uuid = UUID.randomUUID().toString(); + LOG.info( + "Opening writer {} for write operation {}, window {} pane {}", + uuid, + writeOperation, + window, + paneInfo); + writer = writeOperation.createWriter(); + writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM); + windowedWriters.put(key, writer); + LOG.debug("Done opening writer"); + } + + writeOrClose(writer, c.element()); + } + + @FinishBundle + public void finishBundle(FinishBundleContext c) throws Exception { + for (Map.Entry, Writer> entry : windowedWriters.entrySet()) { + FileResult result = entry.getValue().close(); + BoundedWindow window = entry.getKey().getKey(); + c.output(result, window.maxTimestamp(), window); + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.delegate(WriteFiles.this); + } + } + + /** + * Writes all the elements in a bundle using a {@link Writer} produced by the + * {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes disabled. + */ + private class WriteUnwindowedBundles extends DoFn { // Writer that will write the records in this bundle. Lazily // initialized in processElement. - private FileBasedWriter writer = null; + private Writer writer = null; private BoundedWindow window = null; - WriteBundles() { + @StartBundle + public void startBundle(StartBundleContext c) { + // Reset state in case of reuse. We need to make sure that each bundle gets unique writers. + writer = null; } @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) throws Exception { - // Lazily initialize the Writer + // Cache a single writer for the bundle. if (writer == null) { LOG.info("Opening writer for write operation {}", writeOperation); writer = writeOperation.createWriter(); - - if (windowedWrites) { - writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM, - UNKNOWN_NUMSHARDS); - } else { - writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS); - } - this.window = window; - LOG.debug("Done opening writer {} for operation {}", writer, writeOperation); - } - try { - writer.write(c.element()); - } catch (Exception e) { - // Discard write result and close the write. - try { - writer.close(); - // The writer does not need to be reset, as this DoFn cannot be reused. - } catch (Exception closeException) { - if (closeException instanceof InterruptedException) { - // Do not silently ignore interrupted state. - Thread.currentThread().interrupt(); - } - // Do not mask the exception that caused the write to fail. - e.addSuppressed(closeException); - } - throw e; + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM); + LOG.debug("Done opening writer"); } + this.window = window; + writeOrClose(this.writer, c.element()); } @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { - if (writer != null) { - FileResult result = writer.close(); - c.output(result, window.maxTimestamp(), window); - // Reset state in case of reuse. - writer = null; + if (writer == null) { + return; } + FileResult result = writer.close(); + c.output(result, window.maxTimestamp(), window); } @Override @@ -302,50 +346,26 @@ public void populateDisplayData(DisplayData.Builder builder) { } /** - * Like {@link WriteBundles}, but where the elements for each shard have been collected into - * a single iterable. - * - * @see WriteBundles + * Like {@link WriteWindowedBundles} and {@link WriteUnwindowedBundles}, but where the elements + * for each shard have been collected into a single iterable. */ private class WriteShardedBundles extends DoFn>, FileResult> { - private final PCollectionView numShardsView; - - WriteShardedBundles(PCollectionView numShardsView) { - this.numShardsView = numShardsView; - } - @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) throws Exception { - int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get(); // In a sharded write, single input element represents one shard. We can open and close // the writer in each call to processElement. LOG.info("Opening writer for write operation {}", writeOperation); - FileBasedWriter writer = writeOperation.createWriter(); + Writer writer = writeOperation.createWriter(); if (windowedWrites) { - writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(), - numShards); + writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey()); } else { - writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS); + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM); } - LOG.debug("Done opening writer {} for operation {}", writer, writeOperation); + LOG.debug("Done opening writer"); try { - try { - for (T t : c.element().getValue()) { - writer.write(t); - } - } catch (Exception e) { - try { - writer.close(); - } catch (Exception closeException) { - if (closeException instanceof InterruptedException) { - // Do not silently ignore interrupted state. - Thread.currentThread().interrupt(); - } - // Do not mask the exception that caused the write to fail. - e.addSuppressed(closeException); - } - throw e; + for (T t : c.element().getValue()) { + writeOrClose(writer, t); } // Close the writer; if this throws let the error propagate. @@ -364,6 +384,24 @@ public void populateDisplayData(DisplayData.Builder builder) { } } + private static void writeOrClose(Writer writer, T t) throws Exception { + try { + writer.write(t); + } catch (Exception e) { + try { + writer.close(); + } catch (Exception closeException) { + if (closeException instanceof InterruptedException) { + // Do not silently ignore interrupted state. + Thread.currentThread().interrupt(); + } + // Do not mask the exception that caused the write to fail. + e.addSuppressed(closeException); + } + throw e; + } + } + private static class ApplyShardingKey extends DoFn> { private final PCollectionView numShardsView; private final ValueProvider numShardsProvider; @@ -378,7 +416,7 @@ private static class ApplyShardingKey extends DoFn> { @ProcessElement public void processElement(ProcessContext context) { - int shardCount = 0; + final int shardCount; if (numShardsView != null) { shardCount = context.sideInput(numShardsView); } else { @@ -404,29 +442,29 @@ public void processElement(ProcessContext context) { /** * A write is performed as sequence of three {@link ParDo}'s. * - *

    This singleton collection containing the FileBasedWriteOperation is then used as a side + *

    This singleton collection containing the WriteOperation is then used as a side * input to a ParDo over the PCollection of elements to write. In this bundle-writing phase, - * {@link FileBasedWriteOperation#createWriter} is called to obtain a {@link FileBasedWriter}. - * {@link FileBasedWriter#open} and {@link FileBasedWriter#close} are called in + * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}. + * {@link Writer#open} and {@link Writer#close} are called in * {@link DoFn.StartBundle} and {@link DoFn.FinishBundle}, respectively, and - * {@link FileBasedWriter#write} method is called for every element in the bundle. The output + * {@link Writer#write} method is called for every element in the bundle. The output * of this ParDo is a PCollection of writer result objects (see {@link FileBasedSink} * for a description of writer results)-one for each bundle. * *

    The final do-once ParDo uses a singleton collection asinput and the collection of writer - * results as a side-input. In this ParDo, {@link FileBasedWriteOperation#finalize} is called + * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called * to finalize the write. * - *

    If the write of any element in the PCollection fails, {@link FileBasedWriter#close} will be + *

    If the write of any element in the PCollection fails, {@link Writer#close} will be * called before the exception that caused the write to fail is propagated and the write result * will be discarded. * - *

    Since the {@link FileBasedWriteOperation} is serialized after the initialization ParDo and + *

    Since the {@link WriteOperation} is serialized after the initialization ParDo and * deserialized in the bundle-writing and finalization phases, any state change to the - * FileBasedWriteOperation object that occurs during initialization is visible in the latter - * phases. However, the FileBasedWriteOperation is not serialized after the bundle-writing + * WriteOperation object that occurs during initialization is visible in the latter + * phases. However, the WriteOperation is not serialized after the bundle-writing * phase. This is why implementations should guarantee that - * {@link FileBasedWriteOperation#createWriter} does not mutate FileBasedWriteOperation). + * {@link WriteOperation#createWriter} does not mutate WriteOperation). */ private PDone createWrite(PCollection input) { Pipeline p = input.getPipeline(); @@ -442,41 +480,42 @@ private PDone createWrite(PCollection input) { // Perform the per-bundle writes as a ParDo on the input PCollection (with the - // FileBasedWriteOperation as a side input) and collect the results of the writes in a + // WriteOperation as a side input) and collect the results of the writes in a // PCollection. There is a dependency between this ParDo and the first (the - // FileBasedWriteOperation PCollection as a side input), so this will happen after the + // WriteOperation PCollection as a side input), so this will happen after the // initial ParDo. PCollection results; final PCollectionView numShardsView; + Coder shardedWindowCoder = + (Coder) input.getWindowingStrategy().getWindowFn().windowCoder(); if (computeNumShards == null && numShardsProvider == null) { - if (windowedWrites) { - throw new IllegalStateException("When doing windowed writes, numShards must be set" - + "explicitly to a positive value"); - } numShardsView = null; - results = input - .apply("WriteBundles", - ParDo.of(new WriteBundles())); + results = + input.apply( + "WriteBundles", + ParDo.of(windowedWrites ? new WriteWindowedBundles() : new WriteUnwindowedBundles())); } else { + List> sideInputs = Lists.newArrayList(); if (computeNumShards != null) { numShardsView = input.apply(computeNumShards); - results = input - .apply("ApplyShardLabel", ParDo.of( - new ApplyShardingKey(numShardsView, null)).withSideInputs(numShardsView)) - .apply("GroupIntoShards", GroupByKey.create()) - .apply("WriteShardedBundles", - ParDo.of(new WriteShardedBundles(numShardsView)) - .withSideInputs(numShardsView)); + sideInputs.add(numShardsView); } else { numShardsView = null; - results = input - .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey(null, numShardsProvider))) - .apply("GroupIntoShards", GroupByKey.create()) - .apply("WriteShardedBundles", - ParDo.of(new WriteShardedBundles(null))); } + + PCollection>> sharded = + input + .apply("ApplyShardLabel", ParDo.of( + new ApplyShardingKey(numShardsView, + (numShardsView != null) ? null : numShardsProvider)) + .withSideInputs(sideInputs)) + .apply("GroupIntoShards", GroupByKey.create()); + shardedWindowCoder = + (Coder) sharded.getWindowingStrategy().getWindowFn().windowCoder(); + + results = sharded.apply("WriteShardedBundles", ParDo.of(new WriteShardedBundles())); } - results.setCoder(FileResultCoder.of()); + results.setCoder(FileResultCoder.of(shardedWindowCoder)); if (windowedWrites) { // When processing streaming windowed writes, results will arrive multiple times. This @@ -486,7 +525,8 @@ private PDone createWrite(PCollection input) { // whenever new data arrives. PCollection> keyedResults = results.apply("AttachSingletonKey", WithKeys.of((Void) null)); - keyedResults.setCoder(KvCoder.of(VoidCoder.of(), FileResultCoder.of())); + keyedResults.setCoder(KvCoder.of(VoidCoder.of(), + FileResultCoder.of(shardedWindowCoder))); // Is the continuation trigger sufficient? keyedResults @@ -497,7 +537,7 @@ public void processElement(ProcessContext c) throws Exception { LOG.info("Finalizing write operation {}.", writeOperation); List results = Lists.newArrayList(c.element().getValue()); writeOperation.finalize(results); - LOG.debug("Done finalizing write operation {}", writeOperation); + LOG.debug("Done finalizing write operation"); } })); } else { @@ -511,7 +551,7 @@ public void processElement(ProcessContext c) throws Exception { // Finalize the write in another do-once ParDo on the singleton collection containing the // Writer. The results from the per-bundle writes are given as an Iterable side input. - // The FileBasedWriteOperation's state is the same as after its initialization in the first + // The WriteOperation's state is the same as after its initialization in the first // do-once ParDo. There is a dependency between this ParDo and the parallel write (the writer // results collection as a side input), so it will happen after the parallel write. // For the non-windowed case, we guarantee that if no data is written but the user has @@ -542,9 +582,8 @@ public void processElement(ProcessContext c) throws Exception { "Creating {} empty output shards in addition to {} written for a total of {}.", extraShardsNeeded, results.size(), minShardsNeeded); for (int i = 0; i < extraShardsNeeded; ++i) { - FileBasedWriter writer = writeOperation.createWriter(); - writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, - UNKNOWN_NUMSHARDS); + Writer writer = writeOperation.createWriter(); + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM); FileResult emptyWrite = writer.close(); results.add(emptyWrite); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index d9bcef441d43..caad75989cbd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -46,12 +46,12 @@ import java.util.Map; import java.util.zip.GZIPInputStream; import org.apache.beam.sdk.io.FileBasedSink.CompressionType; -import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation; -import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter; import org.apache.beam.sdk.io.FileBasedSink.FileResult; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; +import org.apache.beam.sdk.io.FileBasedSink.WriteOperation; +import org.apache.beam.sdk.io.FileBasedSink.Writer; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; @@ -67,8 +67,7 @@ */ @RunWith(JUnit4.class) public class FileBasedSinkTest { - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); private final String tempDirectoryName = "temp"; @@ -88,13 +87,13 @@ private ResourceId getBaseTempDirectory() { } /** - * FileBasedWriter opens the correct file, writes the header, footer, and elements in the - * correct order, and returns the correct filename. + * Writer opens the correct file, writes the header, footer, and elements in the correct + * order, and returns the correct filename. */ @Test public void testWriter() throws Exception { String testUid = "testId"; - ResourceId expectedFile = getBaseTempDirectory() + ResourceId expectedTempFile = getBaseTempDirectory() .resolve(testUid, StandardResolveOptions.RESOLVE_FILE); List values = Arrays.asList("sympathetic vulture", "boresome hummingbird"); List expected = new ArrayList<>(); @@ -104,14 +103,15 @@ public void testWriter() throws Exception { SimpleSink.SimpleWriter writer = buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter(); - writer.openUnwindowed(testUid, -1, -1); + writer.openUnwindowed(testUid, -1); for (String value : values) { writer.write(value); } FileResult result = writer.close(); - assertEquals(expectedFile, result.getFilename()); - assertFileContains(expected, expectedFile); + FileBasedSink sink = writer.getWriteOperation().getSink(); + assertEquals(expectedTempFile, result.getTempFilename()); + assertFileContains(expected, expectedTempFile); } /** @@ -131,9 +131,7 @@ private void assertFileContains(List expected, ResourceId file) throws E } } - /** - * Write lines to a file. - */ + /** Write lines to a file. */ private void writeFile(List lines, File file) throws Exception { try (PrintWriter writer = new PrintWriter(new FileOutputStream(file))) { for (String line : lines) { @@ -150,18 +148,14 @@ public void testRemoveWithTempFilename() throws Exception { testRemoveTemporaryFiles(3, getBaseTempDirectory()); } - /** - * Finalize copies temporary files to output files and removes any temporary files. - */ + /** Finalize copies temporary files to output files and removes any temporary files. */ @Test public void testFinalize() throws Exception { List files = generateTemporaryFilesForFinalize(3); runFinalize(buildWriteOperation(), files); } - /** - * Finalize can be called repeatedly. - */ + /** Finalize can be called repeatedly. */ @Test public void testFinalizeMultipleCalls() throws Exception { List files = generateTemporaryFilesForFinalize(3); @@ -170,9 +164,7 @@ public void testFinalizeMultipleCalls() throws Exception { runFinalize(writeOp, files); } - /** - * Finalize can be called when some temporary files do not exist and output files exist. - */ + /** Finalize can be called when some temporary files do not exist and output files exist. */ @Test public void testFinalizeWithIntermediateState() throws Exception { SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(); @@ -186,14 +178,12 @@ public void testFinalizeWithIntermediateState() throws Exception { runFinalize(writeOp, files); } - /** - * Generate n temporary files using the temporary file pattern of FileBasedWriter. - */ + /** Generate n temporary files using the temporary file pattern of Writer. */ private List generateTemporaryFilesForFinalize(int numFiles) throws Exception { List temporaryFiles = new ArrayList<>(); for (int i = 0; i < numFiles; i++) { ResourceId temporaryFile = - FileBasedWriteOperation.buildTemporaryFilename(getBaseTempDirectory(), "" + i); + WriteOperation.buildTemporaryFilename(getBaseTempDirectory(), "" + i); File tmpFile = new File(tmpFolder.getRoot(), temporaryFile.toString()); tmpFile.getParentFile().mkdirs(); assertTrue(tmpFile.createNewFile()); @@ -203,18 +193,20 @@ private List generateTemporaryFilesForFinalize(int numFiles) throws Except return temporaryFiles; } - /** - * Finalize and verify that files are copied and temporary files are optionally removed. - */ + /** Finalize and verify that files are copied and temporary files are optionally removed. */ private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List temporaryFiles) throws Exception { int numFiles = temporaryFiles.size(); List fileResults = new ArrayList<>(); // Create temporary output bundles and output File objects. - for (File f : temporaryFiles) { - ResourceId file = LocalResources.fromFile(f, false); - fileResults.add(new FileResult(file, null)); + for (int i = 0; i < numFiles; i++) { + fileResults.add( + new FileResult( + LocalResources.fromFile(temporaryFiles.get(i), false), + WriteFiles.UNKNOWN_SHARDNUM, + null, + null)); } writeOp.finalize(fileResults); @@ -233,8 +225,8 @@ private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List tem } /** - * Create n temporary and output files and verify that removeTemporaryFiles only - * removes temporary files. + * Create n temporary and output files and verify that removeTemporaryFiles only removes temporary + * files. */ private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory) throws Exception { @@ -242,14 +234,14 @@ private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory) SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), prefix, "", ""); - FileBasedWriteOperation writeOp = + WriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink, tempDirectory); List temporaryFiles = new ArrayList<>(); List outputFiles = new ArrayList<>(); for (int i = 0; i < numFiles; i++) { ResourceId tempResource = - FileBasedWriteOperation.buildTemporaryFilename(tempDirectory, prefix + i); + WriteOperation.buildTemporaryFilename(tempDirectory, prefix + i); File tmpFile = new File(tempResource.toString()); tmpFile.getParentFile().mkdirs(); assertTrue("not able to create new temp file", tmpFile.createNewFile()); @@ -276,9 +268,7 @@ private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory) } } - /** - * Output files are copied to the destination location with the correct names and contents. - */ + /** Output files are copied to the destination location with the correct names and contents. */ @Test public void testCopyToOutputFiles() throws Exception { SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(); @@ -329,6 +319,7 @@ public List generateDestinationFilenames( /** * Output filenames are generated correctly when an extension is supplied. */ + @Test public void testGenerateOutputFilenames() { List expected; @@ -357,9 +348,7 @@ public void testGenerateOutputFilenames() { assertEquals(expected, actual); } - /** - * Reject non-distinct output filenames. - */ + /** Reject non-distinct output filenames. */ @Test public void testCollidingOutputFilenames() throws IOException { ResourceId root = getBaseOutputDirectory(); @@ -372,22 +361,19 @@ public void testCollidingOutputFilenames() throws IOException { ResourceId output = root.resolve("file-03.test", StandardResolveOptions.RESOLVE_FILE); // More than one shard does. try { - Iterable results = Lists.newArrayList( - new FileResult(temp1, output), - new FileResult(temp2, output), - new FileResult(temp3, output)); - + Iterable results = + Lists.newArrayList( + new FileResult(temp1, 1, null, null), + new FileResult(temp2, 1, null, null), + new FileResult(temp3, 1, null, null)); writeOp.buildOutputFilenames(results); fail("Should have failed."); } catch (IllegalStateException exn) { - assertEquals("Only generated 1 distinct file names for 3 files.", - exn.getMessage()); + assertEquals("Only generated 1 distinct file names for 3 files.", exn.getMessage()); } } - /** - * Output filenames are generated correctly when an extension is not supplied. - */ + /** Output filenames are generated correctly when an extension is not supplied. */ @Test public void testGenerateOutputFilenamesWithoutExtension() { List expected; @@ -415,53 +401,59 @@ public void testGenerateOutputFilenamesWithoutExtension() { assertEquals(expected, actual); } - /** - * {@link CompressionType#BZIP2} correctly writes BZip2 data. - */ + /** {@link CompressionType#BZIP2} correctly writes BZip2 data. */ @Test public void testCompressionTypeBZIP2() throws FileNotFoundException, IOException { final File file = writeValuesWithWritableByteChannelFactory(CompressionType.BZIP2, "abc", "123"); // Read Bzip2ed data back in using Apache commons API (de facto standard). - assertReadValues(new BufferedReader(new InputStreamReader( - new BZip2CompressorInputStream(new FileInputStream(file)), StandardCharsets.UTF_8.name())), - "abc", "123"); + assertReadValues( + new BufferedReader( + new InputStreamReader( + new BZip2CompressorInputStream(new FileInputStream(file)), + StandardCharsets.UTF_8.name())), + "abc", + "123"); } - /** - * {@link CompressionType#GZIP} correctly writes Gzipped data. - */ + /** {@link CompressionType#GZIP} correctly writes Gzipped data. */ @Test public void testCompressionTypeGZIP() throws FileNotFoundException, IOException { final File file = writeValuesWithWritableByteChannelFactory(CompressionType.GZIP, "abc", "123"); // Read Gzipped data back in using standard API. - assertReadValues(new BufferedReader(new InputStreamReader( - new GZIPInputStream(new FileInputStream(file)), StandardCharsets.UTF_8.name())), "abc", + assertReadValues( + new BufferedReader( + new InputStreamReader( + new GZIPInputStream(new FileInputStream(file)), StandardCharsets.UTF_8.name())), + "abc", "123"); } - /** - * {@link CompressionType#DEFLATE} correctly writes deflate data. - */ + /** {@link CompressionType#DEFLATE} correctly writes deflate data. */ @Test public void testCompressionTypeDEFLATE() throws FileNotFoundException, IOException { - final File file = writeValuesWithWritableByteChannelFactory( - CompressionType.DEFLATE, "abc", "123"); + final File file = + writeValuesWithWritableByteChannelFactory(CompressionType.DEFLATE, "abc", "123"); // Read Gzipped data back in using standard API. - assertReadValues(new BufferedReader(new InputStreamReader(new DeflateCompressorInputStream( - new FileInputStream(file)), StandardCharsets.UTF_8.name())), "abc", "123"); + assertReadValues( + new BufferedReader( + new InputStreamReader( + new DeflateCompressorInputStream(new FileInputStream(file)), + StandardCharsets.UTF_8.name())), + "abc", + "123"); } - /** - * {@link CompressionType#UNCOMPRESSED} correctly writes uncompressed data. - */ + /** {@link CompressionType#UNCOMPRESSED} correctly writes uncompressed data. */ @Test public void testCompressionTypeUNCOMPRESSED() throws FileNotFoundException, IOException { final File file = writeValuesWithWritableByteChannelFactory(CompressionType.UNCOMPRESSED, "abc", "123"); // Read uncompressed data back in using standard API. - assertReadValues(new BufferedReader(new InputStreamReader( - new FileInputStream(file), StandardCharsets.UTF_8.name())), "abc", + assertReadValues( + new BufferedReader( + new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8.name())), + "abc", "123"); } @@ -487,17 +479,17 @@ private File writeValuesWithWritableByteChannelFactory(final WritableByteChannel } /** - * {@link FileBasedWriter} writes to the {@link WritableByteChannel} provided by - * {@link DrunkWritableByteChannelFactory}. + * {@link Writer} writes to the {@link WritableByteChannel} provided by {@link + * DrunkWritableByteChannelFactory}. */ @Test public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception { final String testUid = "testId"; ResourceId root = getBaseOutputDirectory(); - FileBasedWriteOperation writeOp = + WriteOperation writeOp = new SimpleSink(root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory()) .createWriteOperation(); - final FileBasedWriter writer = writeOp.createWriter(); + final Writer writer = writeOp.createWriter(); final ResourceId expectedFile = writeOp.tempDirectory.get().resolve(testUid, StandardResolveOptions.RESOLVE_FILE); @@ -511,18 +503,16 @@ public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception expected.add("footer"); expected.add("footer"); - writer.openUnwindowed(testUid, -1, -1); + writer.openUnwindowed(testUid, -1); writer.write("a"); writer.write("b"); final FileResult result = writer.close(); - assertEquals(expectedFile, result.getFilename()); + assertEquals(expectedFile, result.getTempFilename()); assertFileContains(expected, expectedFile); } - /** - * Build a SimpleSink with default options. - */ + /** Build a SimpleSink with default options. */ private SimpleSink buildSink() { return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", ".test"); } @@ -535,9 +525,7 @@ private SimpleSink.SimpleWriteOperation buildWriteOperationWithTempDir(ResourceI return new SimpleSink.SimpleWriteOperation(sink, tempDirectory); } - /** - * Build a write operation with the default options for it and its parent sink. - */ + /** Build a write operation with the default options for it and its parent sink. */ private SimpleSink.SimpleWriteOperation buildWriteOperation() { return buildSink().createWriteOperation(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java index 9265520b2486..c97313d397dd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java @@ -45,7 +45,7 @@ public SimpleWriteOperation createWriteOperation() { return new SimpleWriteOperation(this); } - static final class SimpleWriteOperation extends FileBasedWriteOperation { + static final class SimpleWriteOperation extends WriteOperation { public SimpleWriteOperation(SimpleSink sink, ResourceId tempOutputDirectory) { super(sink, tempOutputDirectory); } @@ -60,7 +60,7 @@ public SimpleWriter createWriter() throws Exception { } } - static final class SimpleWriter extends FileBasedWriter { + static final class SimpleWriter extends Writer { static final String HEADER = "header"; static final String FOOTER = "footer"; diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java index 963ab1b10884..60075a777221 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java @@ -76,9 +76,9 @@ void populateFileBasedDisplayData(DisplayData.Builder builder) { } /** - * {@link FileBasedSink.FileBasedWriteOperation} for XML {@link FileBasedSink}s. + * {@link WriteOperation} for XML {@link FileBasedSink}s. */ - protected static final class XmlWriteOperation extends FileBasedWriteOperation { + protected static final class XmlWriteOperation extends WriteOperation { public XmlWriteOperation(XmlSink sink) { super(sink); } @@ -113,9 +113,9 @@ ResourceId getTemporaryDirectory() { } /** - * A {@link FileBasedWriter} that can write objects as XML elements. + * A {@link Writer} that can write objects as XML elements. */ - protected static final class XmlWriter extends FileBasedWriter { + protected static final class XmlWriter extends Writer { final Marshaller marshaller; private OutputStream os = null; From ede0e3ba00c2bd380738e92002f437cada0f0510 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Wed, 10 May 2017 12:30:24 -0700 Subject: [PATCH 2/2] This closes #3059 --- examples/java/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 68dea2ece03d..1ca6b2adf43c 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -472,6 +472,11 @@ google-api-services-bigquery + + com.google.code.findbugs + jsr305 + + com.google.http-client google-http-client