From cbf0ca99dab8de479d0978f1983ddc6e6f190c77 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Mon, 10 Jul 2017 21:30:50 -0700 Subject: [PATCH 1/2] Unbundle Context and WindowedContext. --- .../common/WriteOneFilePerWindow.java | 19 ++-- .../WriteFilesTranslationTest.java | 11 ++- .../beam/sdk/io/DefaultFilenamePolicy.java | 20 ++-- .../org/apache/beam/sdk/io/FileBasedSink.java | 93 +++---------------- .../org/apache/beam/sdk/io/AvroIOTest.java | 21 +++-- .../apache/beam/sdk/io/FileBasedSinkTest.java | 8 +- .../apache/beam/sdk/io/WriteFilesTest.java | 23 +++-- 7 files changed, 79 insertions(+), 116 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java index 49865ba60b7c..abd14b70118f 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java @@ -28,7 +28,9 @@ import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.joda.time.format.DateTimeFormatter; @@ -88,14 +90,18 @@ public String filenamePrefixForWindow(IntervalWindow window) { } @Override - public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) { - IntervalWindow window = (IntervalWindow) context.getWindow(); + public ResourceId windowedFilename(int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + OutputFileHints outputFileHints) { + IntervalWindow intervalWindow = (IntervalWindow) window; String filename = String.format( "%s-%s-of-%s%s", - filenamePrefixForWindow(window), - context.getShardNumber(), - context.getNumShards(), + filenamePrefixForWindow(intervalWindow), + shardNumber, + numShards, outputFileHints.getSuggestedFilenameSuffix()); return baseFilename .getCurrentDirectory() @@ -103,7 +109,8 @@ public ResourceId windowedFilename(WindowedContext context, OutputFileHints outp } @Override - public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) { + public ResourceId unwindowedFilename( + int shardNumber, int numShards, OutputFileHints outputFileHints) { throw new UnsupportedOperationException("Unsupported."); } } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java index 283df1657ded..a5b38fb01426 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java @@ -40,6 +40,8 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SerializableFunctions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.junit.Test; @@ -163,13 +165,18 @@ public FileBasedSink.Writer createWriter() throws Exception { private static class DummyFilenamePolicy extends FilenamePolicy { @Override - public ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints) { + public ResourceId windowedFilename(int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + OutputFileHints outputFileHints) { throw new UnsupportedOperationException("Should never be called."); } @Nullable @Override - public ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints) { + public ResourceId unwindowedFilename( + int shardNumber, int numShards, OutputFileHints outputFileHints) { throw new UnsupportedOperationException("Should never be called."); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java index 7a60e49ebfb0..59cd95f2ee71 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java @@ -284,28 +284,32 @@ static ResourceId constructName( @Override @Nullable - public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) { + public ResourceId unwindowedFilename( + int shardNumber, int numShards, OutputFileHints outputFileHints) { return constructName( params.baseFilename.get(), params.shardTemplate, params.suffix + outputFileHints.getSuggestedFilenameSuffix(), - context.getShardNumber(), - context.getNumShards(), + shardNumber, + numShards, null, null); } @Override - public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) { - final PaneInfo paneInfo = context.getPaneInfo(); + public ResourceId windowedFilename(int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + OutputFileHints outputFileHints) { String paneStr = paneInfoToString(paneInfo); - String windowStr = windowToString(context.getWindow()); + String windowStr = windowToString(window); return constructName( params.baseFilename.get(), params.shardTemplate, params.suffix + outputFileHints.getSuggestedFilenameSuffix(), - context.getShardNumber(), - context.getNumShards(), + shardNumber, + numShards, paneStr, windowStr); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 583af60df68b..0edeb45749d7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -58,8 +58,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context; -import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.WindowedContext; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions; @@ -284,86 +282,21 @@ final Coder getDestinationCoderWithDefault(CoderRegistry registry) /** A naming policy for output files. */ @Experimental(Kind.FILESYSTEM) public abstract static class FilenamePolicy implements Serializable { - /** - * Context used for generating a name based on shard number, and num shards. - * The policy must produce unique filenames for unique {@link Context} objects. - * - *

Be careful about adding fields to this as existing strategies will not notice the new - * fields, and may not produce unique filenames. - */ - public static class Context { - private int shardNumber; - private int numShards; - - - public Context(int shardNumber, int numShards) { - this.shardNumber = shardNumber; - this.numShards = numShards; - } - - public int getShardNumber() { - return shardNumber; - } - - - public int getNumShards() { - return numShards; - } - } - - /** - * Context used for generating a name based on window, pane, shard number, and num shards. - * The policy must produce unique filenames for unique {@link WindowedContext} objects. - * - *

Be careful about adding fields to this as existing strategies will not notice the new - * fields, and may not produce unique filenames. - */ - public static class WindowedContext { - private int shardNumber; - private int numShards; - private BoundedWindow window; - private PaneInfo paneInfo; - - public WindowedContext( - BoundedWindow window, - PaneInfo paneInfo, - int shardNumber, - int numShards) { - this.window = window; - this.paneInfo = paneInfo; - this.shardNumber = shardNumber; - this.numShards = numShards; - } - - public BoundedWindow getWindow() { - return window; - } - - public PaneInfo getPaneInfo() { - return paneInfo; - } - - public int getShardNumber() { - return shardNumber; - } - - public int getNumShards() { - return numShards; - } - } - /** * When a sink has requested windowed or triggered output, this method will be invoked to return * the file {@link ResourceId resource} to be created given the base output directory and a * {@link OutputFileHints} containing information about the file, including a suggested * extension (e.g. coming from {@link CompressionType}). * - *

The {@link WindowedContext} object gives access to the window and pane, as well as - * sharding information. The policy must return unique and consistent filenames for different - * windows and panes. + *

The policy must return unique and consistent filenames for different windows and panes. */ @Experimental(Kind.FILESYSTEM) - public abstract ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints); + public abstract ResourceId windowedFilename( + int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + OutputFileHints outputFileHints); /** * When a sink has not requested windowed or triggered output, this method will be invoked to @@ -371,12 +304,13 @@ public int getNumShards() { * a {@link OutputFileHints} containing information about the file, including a suggested (e.g. * coming from {@link CompressionType}). * - *

The {@link Context} object only provides sharding information, which is used by the policy - * to generate unique and consistent filenames. + *

The shardNumber and numShards parameters, should be used by the policy to generate + * unique and consistent filenames. */ @Experimental(Kind.FILESYSTEM) @Nullable - public abstract ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints); + public abstract ResourceId unwindowedFilename( + int shardNumber, int numShards, OutputFileHints outputFileHints); /** * Populates the display data. @@ -1063,10 +997,9 @@ public ResourceId getDestinationFile( FilenamePolicy policy = dynamicDestinations.getFilenamePolicy(destination); if (getWindow() != null) { return policy.windowedFilename( - new WindowedContext(getWindow(), getPaneInfo(), getShard(), numShards), - outputFileHints); + getShard(), numShards, getWindow(), getPaneInfo(), outputFileHints); } else { - return policy.unwindowedFilename(new Context(getShard(), numShards), outputFileHints); + return policy.unwindowedFilename(getShard(), numShards, outputFileHints); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 260e47a25a5c..12ff592153b6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -68,8 +68,10 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.PCollection; @@ -284,7 +286,11 @@ private static class WindowedFilenamePolicy extends FilenamePolicy { } @Override - public ResourceId windowedFilename(WindowedContext input, OutputFileHints outputFileHints) { + public ResourceId windowedFilename(int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + OutputFileHints outputFileHints) { String filenamePrefix = outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), ""); @@ -292,11 +298,11 @@ public ResourceId windowedFilename(WindowedContext input, OutputFileHints output String.format( "%s-%s-%s-of-%s-pane-%s%s%s", filenamePrefix, - input.getWindow(), - input.getShardNumber(), - input.getNumShards() - 1, - input.getPaneInfo().getIndex(), - input.getPaneInfo().isLast() ? "-final" : "", + window, + shardNumber, + numShards - 1, + paneInfo.getIndex(), + paneInfo.isLast() ? "-final" : "", outputFileHints.getSuggestedFilenameSuffix()); return outputFilePrefix .getCurrentDirectory() @@ -304,7 +310,8 @@ public ResourceId windowedFilename(WindowedContext input, OutputFileHints output } @Override - public ResourceId unwindowedFilename(Context input, OutputFileHints outputFileHints) { + public ResourceId unwindowedFilename( + int shardNumber, int numShards, OutputFileHints outputFileHints) { throw new UnsupportedOperationException("Expecting windowed outputs only"); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 755bb598524d..e14296d2b4ac 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -48,7 +48,6 @@ import org.apache.beam.sdk.io.FileBasedSink.CompressionType; import org.apache.beam.sdk.io.FileBasedSink.FileResult; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; -import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.FileBasedSink.WriteOperation; import org.apache.beam.sdk.io.FileBasedSink.Writer; @@ -218,7 +217,7 @@ private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List tem .getSink() .getDynamicDestinations() .getFilenamePolicy(null) - .unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED); + .unwindowedFilename(i, numFiles, CompressionType.UNCOMPRESSED); assertTrue(new File(outputFilename.toString()).exists()); assertFalse(temporaryFiles.get(i).exists()); } @@ -301,8 +300,7 @@ public void testCopyToOutputFiles() throws Exception { .getSink() .getDynamicDestinations() .getFilenamePolicy(null) - .unwindowedFilename( - new Context(i, inputFilenames.size()), CompressionType.UNCOMPRESSED)); + .unwindowedFilename(i, inputFilenames.size(), CompressionType.UNCOMPRESSED)); } // Copy input files to output files. @@ -320,7 +318,7 @@ public List generateDestinationFilenames( List filenames = new ArrayList<>(); for (int i = 0; i < numFiles; i++) { filenames.add( - policy.unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED)); + policy.unwindowedFilename(i, numFiles, CompressionType.UNCOMPRESSED)); } return filenames; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index 55f2a8720560..db61a8cac3fe 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -70,8 +70,10 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; @@ -594,14 +596,18 @@ public String filenamePrefixForWindow(IntervalWindow window) { } @Override - public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) { - IntervalWindow window = (IntervalWindow) context.getWindow(); + public ResourceId windowedFilename(int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + OutputFileHints outputFileHints) { + IntervalWindow intervalWindow = (IntervalWindow) window; String filename = String.format( "%s-%s-of-%s%s%s", - filenamePrefixForWindow(window), - context.getShardNumber(), - context.getNumShards(), + filenamePrefixForWindow(intervalWindow), + shardNumber, + numShards, outputFileHints.getSuggestedFilenameSuffix(), suffix); return baseFilename @@ -610,15 +616,16 @@ public ResourceId windowedFilename(WindowedContext context, OutputFileHints outp } @Override - public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) { + public ResourceId unwindowedFilename( + int shardNumber, int numShards, OutputFileHints outputFileHints) { String prefix = baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), ""); String filename = String.format( "%s-%s-of-%s%s%s", prefix, - context.getShardNumber(), - context.getNumShards(), + shardNumber, + numShards, outputFileHints.getSuggestedFilenameSuffix(), suffix); return baseFilename From 40e15d89b5dea09d2d424c39eea28d11183518ce Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 11 Jul 2017 21:46:55 -0700 Subject: [PATCH 2/2] Fix WriteToText.java. --- .../complete/game/utils/WriteToText.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java index 1d601987211b..6b7c928ee2e4 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.joda.time.DateTimeZone; @@ -143,20 +144,25 @@ public String filenamePrefixForWindow(IntervalWindow window) { } @Override - public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) { - IntervalWindow window = (IntervalWindow) context.getWindow(); + public ResourceId windowedFilename(int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + OutputFileHints outputFileHints) { + IntervalWindow intervalWindow = (IntervalWindow) window; String filename = String.format( "%s-%s-of-%s%s", - filenamePrefixForWindow(window), - context.getShardNumber(), - context.getNumShards(), + filenamePrefixForWindow(intervalWindow), + shardNumber, + numShards, outputFileHints.getSuggestedFilenameSuffix()); return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE); } @Override - public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) { + public ResourceId unwindowedFilename( + int shardNumber, int numShards, OutputFileHints outputFileHints) { throw new UnsupportedOperationException("Unsupported."); } }