From 981c2f4aae87563d3e426accd8f7f7ac3679138f Mon Sep 17 00:00:00 2001 From: Borisa Zivkovic Date: Mon, 15 May 2017 08:56:19 +0100 Subject: [PATCH 1/4] adding windowing into default filename policy --- .../beam/sdk/io/DefaultFilenamePolicy.java | 78 +++++++++++++++---- .../sdk/io/DefaultFilenamePolicyTest.java | 22 ++++++ 2 files changed, 85 insertions(+), 15 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java index 07bc2db4d804..dfc2a11e6777 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java @@ -32,13 +32,16 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; /** - * A default {@link FilenamePolicy} for unwindowed files. This policy is constructed using three - * parameters that together define the output name of a sharded file, in conjunction with the number - * of shards and index of the particular file, using {@link #constructName}. + * A default {@link FilenamePolicy} for windowed and unwindowed files. This policy is constructed + * using three parameters that together define the output name of a sharded file, in conjunction + * with the number of shards and index of the particular file, using {@link #constructName}. * - *

Most users of unwindowed files will use this {@link DefaultFilenamePolicy}. For more advanced + *

Most users will use this {@link DefaultFilenamePolicy}. For more advanced * uses in generating different files for each window and other sharding controls, see the * {@code WriteOneFilePerWindow} example pipeline. */ @@ -46,8 +49,10 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { /** The default sharding name template used in {@link #constructUsingStandardParameters}. */ public static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; + private static final String DEFAULT_WINDOWED_SHARED_TEMPLATE_SUFFIX = "-PPP-W"; + // Pattern that matches shard placeholders within a shard template. - private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)"); + private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+|P+|W)"); /** * Constructs a new {@link DefaultFilenamePolicy}. @@ -95,29 +100,45 @@ public static DefaultFilenamePolicy constructUsingStandardParameters( * strings. * *

Within a shard template, repeating sequences of the letters "S" or "N" - * are replaced with the shard number, or number of shards respectively. The - * numbers are formatted with leading zeros to match the length of the + * are replaced with the shard number, or number of shards respectively. + * Repeating sequence of "P" is replaced with the window + * The numbers are formatted with leading zeros to match the length of the * repeated sequence of letters. + * "W" is replaced by stringification of current window. * *

For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is * produced: "output-001-of-100.txt". */ - public static String constructName( - String prefix, String shardTemplate, String suffix, int shardNum, int numShards) { + static String constructName( + String prefix, String shardTemplate, String suffix, int shardNum, int numShards, + long currentPaneIndex, String windowStr) { // Matcher API works with StringBuffer, rather than StringBuilder. StringBuffer sb = new StringBuffer(); sb.append(prefix); Matcher m = SHARD_FORMAT_RE.matcher(shardTemplate); while (m.find()) { - boolean isShardNum = (m.group(1).charAt(0) == 'S'); + boolean isCurrentShardNum = (m.group(1).charAt(0) == 'S'); + boolean isNumberOfShards = (m.group(1).charAt(0) == 'N'); + boolean isCurrentPaneIndex = (m.group(1).charAt(0) == 'P') && currentPaneIndex > -1; + boolean isWindow = (m.group(1).charAt(0) == 'W') && windowStr != null; char[] zeros = new char[m.end() - m.start()]; Arrays.fill(zeros, '0'); DecimalFormat df = new DecimalFormat(String.valueOf(zeros)); - String formatted = df.format(isShardNum ? shardNum : numShards); - m.appendReplacement(sb, formatted); + if (isCurrentShardNum) { + String formatted = df.format(shardNum); + m.appendReplacement(sb, formatted); + } else if (isNumberOfShards) { + String formatted = df.format(numShards); + m.appendReplacement(sb, formatted); + } else if (isCurrentPaneIndex) { + String formatted = df.format(currentPaneIndex); + m.appendReplacement(sb, formatted); + } else if (isWindow) { + m.appendReplacement(sb, windowStr); + } } m.appendTail(sb); @@ -125,6 +146,11 @@ public static String constructName( return sb.toString(); } + static String constructName(String prefix, String shardTemplate, String suffix, int shardNum, + int numShards) { + return constructName(prefix, shardTemplate, suffix, shardNum, numShards, -1L, null); + } + @Override @Nullable public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context, @@ -138,9 +164,31 @@ public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context @Override public ResourceId windowedFilename(ResourceId outputDirectory, - WindowedContext c, String extension) { - throw new UnsupportedOperationException("There is no default policy for windowed file" - + " output. Please provide an explicit FilenamePolicy to generate filenames."); + WindowedContext context, String extension) { + long currentPaneIndex = (context.getPaneInfo() == null ? -1L + : context.getPaneInfo().getIndex()); + String windowStr = windowToString(context.getWindow()); + String windowShardTemplate = shardTemplate + DEFAULT_WINDOWED_SHARED_TEMPLATE_SUFFIX; + String filename = constructName(prefix.get(), windowShardTemplate, suffix, + context.getShardNumber(), + context.getNumShards(), currentPaneIndex, windowStr) + + extension; + return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); + } + + /* + * Since not all windows have toString() that is nice or is compatible to be part of file name. + */ + private String windowToString(BoundedWindow window) { + if (window instanceof GlobalWindow) { + return "GlobalWindow"; + } + if (window instanceof IntervalWindow) { + IntervalWindow iw = (IntervalWindow) window; + return String.format("IntervalWindow-from-%s-to-%s", iw.start().toString(), + iw.end().toString()); + } + return window.toString(); } @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java index c895da87f718..f5b58ffc6619 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java @@ -52,4 +52,26 @@ public void testConstructNameWithLargeShardCount() { assertEquals("out-100-of-5000.txt", constructName("out", "-SS-of-NN", ".txt", 100, 5000)); } + + @Test + public void testConstructWindowedName() { + assertEquals("output-001-of-123.txt", + constructName("output", "-SSS-of-NNN", ".txt", 1, 123, -1, null)); + + assertEquals("output-001-of-123-PPP-W.txt", + constructName("output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, -1, null)); + + assertEquals("out.txt/part-00042-003-myWindowStr", + constructName("out.txt", "/part-SSSSS-PPP-W", "", 42, 100, 3, "myWindowStr")); + + assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, 2, "anotherWindowStr")); + + assertEquals("out0102shard-oneMoreWindowStr-0013.txt", + constructName("out", "SSNNshard-W-PPPP", ".txt", 1, 2, 13, "oneMoreWindowStr")); + + assertEquals("out-2/1.part-1-of-2-slidingWindow1-33-windowslidingWindow1-pane033.txt", + constructName("out", + "-N/S.part-S-of-N-W-PP-windowW-panePPP", ".txt", 1, 2, 33, "slidingWindow1")); + } + } From 9f9d61b3320553f421da04e72b7482418bc53d30 Mon Sep 17 00:00:00 2001 From: Borisa Zivkovic Date: Mon, 15 May 2017 11:30:19 +0100 Subject: [PATCH 2/4] adding windowed file naming policy to DefaultFileNamePolicy --- .../org/apache/beam/sdk/io/DefaultFilenamePolicy.java | 10 ++++++++-- .../src/main/java/org/apache/beam/sdk/io/TextIO.java | 8 ++++---- .../test/java/org/apache/beam/sdk/io/TextIOTest.java | 10 ---------- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java index dfc2a11e6777..eb500dec8d8f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java @@ -49,6 +49,10 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { /** The default sharding name template used in {@link #constructUsingStandardParameters}. */ public static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; + /** The default windowed sharding name template used when writing windowed files. + * Currently this is automatically appended to provided sharding name template + * when there is a need to write windowed files. + */ private static final String DEFAULT_WINDOWED_SHARED_TEMPLATE_SUFFIX = "-PPP-W"; // Pattern that matches shard placeholders within a shard template. @@ -74,7 +78,9 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { *

Any filename component of the provided resource will be used as the filename prefix. * *

If provided, the shard name template will be used; otherwise {@link #DEFAULT_SHARD_TEMPLATE} - * will be used. + * will be used. Shard name template will automatically be expanded in case when there is + * need to write windowed files. There is no need to specify any template for how windowed + * file names will be constructed. * *

If provided, the suffix will be used; otherwise the files will have an empty suffix. */ @@ -101,7 +107,7 @@ public static DefaultFilenamePolicy constructUsingStandardParameters( * *

Within a shard template, repeating sequences of the letters "S" or "N" * are replaced with the shard number, or number of shards respectively. - * Repeating sequence of "P" is replaced with the window + * Repeating sequence of "P" is replaced with the index of the window pane. * The numbers are formatted with leading zeros to match the length of the * repeated sequence of letters. * "W" is replaced by stringification of current window. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 5c068ce06441..afb5849ef7c0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -70,8 +70,10 @@ * {@link TextIO.Write#withWindowedWrites()} will cause windowing and triggering to be * preserved. When producing windowed writes, the number of output shards must be set explicitly * using {@link TextIO.Write#withNumShards(int)}; some runners may set this for you to a - * runner-chosen value, so you may need not set it yourself. A {@link FilenamePolicy} must be - * set, and unique windows and triggers must produce unique filenames. + * runner-chosen value, so you may need not set it yourself. A {@link FilenamePolicy} can also be + * set in case you need better control over naming files created by unique windows. + * {@link DefaultFilenamePolicy} policy for producing unique filenames might not be appropriate + * for your use case. * *

Any existing files with the same names as generated output files will be overwritten. * @@ -434,8 +436,6 @@ public PDone expand(PCollection input) { (getFilenamePolicy() == null) || (getShardTemplate() == null && getFilenameSuffix() == null), "Cannot set a filename policy and also a filename template or suffix."); - checkState(!getWindowedWrites() || (getFilenamePolicy() != null), - "When using windowed writes, a filename policy must be set via withFilenamePolicy()."); FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); if (usedFilenamePolicy == null) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 0d8fbbd34fa3..6c7a53f87664 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -1103,15 +1103,5 @@ public void testInitialSplitGzipModeGz() throws Exception { SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); } - @Test - public void testWindowedWriteRequiresFilenamePolicy() { - PCollection emptyInput = p.apply(Create.empty(StringUtf8Coder.of())); - TextIO.Write write = TextIO.write().to("/tmp/some/file").withWindowedWrites(); - - expectedException.expect(IllegalStateException.class); - expectedException.expectMessage( - "When using windowed writes, a filename policy must be set via withFilenamePolicy()"); - emptyInput.apply(write); - } } From c6dac59d02dc91184f280693a3121f42ee45c121 Mon Sep 17 00:00:00 2001 From: Borisa Zivkovic Date: Fri, 19 May 2017 15:30:17 +0100 Subject: [PATCH 3/4] fixing after initial code review --- .../beam/sdk/io/DefaultFilenamePolicy.java | 94 +++++++++++++++---- .../sdk/io/DefaultFilenamePolicyTest.java | 48 ++++++++-- 2 files changed, 120 insertions(+), 22 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java index eb500dec8d8f..5daf6ee08c5c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java @@ -35,17 +35,24 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A default {@link FilenamePolicy} for windowed and unwindowed files. This policy is constructed * using three parameters that together define the output name of a sharded file, in conjunction - * with the number of shards and index of the particular file, using {@link #constructName}. + * with the number of shards, index of the particular file, current window and pane information, + * using {@link #constructName}. * *

Most users will use this {@link DefaultFilenamePolicy}. For more advanced * uses in generating different files for each window and other sharding controls, see the * {@code WriteOneFilePerWindow} example pipeline. */ public final class DefaultFilenamePolicy extends FilenamePolicy { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultFilenamePolicy.class); + /** The default sharding name template used in {@link #constructUsingStandardParameters}. */ public static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; @@ -53,10 +60,28 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { * Currently this is automatically appended to provided sharding name template * when there is a need to write windowed files. */ - private static final String DEFAULT_WINDOWED_SHARED_TEMPLATE_SUFFIX = "-PPP-W"; + private static final String DEFAULT_WINDOWED_SHARED_TEMPLATE_SUFFIX = + "-PPP-firstpane-F-lastpane-L-W"; + + /* + * pattern for only non-windowed file names + */ + private static final String NON_WINDOWED_ONLY_PATTERN = "S+|N+"; + + /* + * pattern for only windowed file names + */ + private static final String WINDOWED_ONLY_PATTERN = "P+|L|F|W"; + + /* + * pattern for both windowed and non-windowed file names + */ + private static final String TEMPLATE_PATTERN = "(" + NON_WINDOWED_ONLY_PATTERN + "|" + + WINDOWED_ONLY_PATTERN + ")"; // Pattern that matches shard placeholders within a shard template. - private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+|P+|W)"); + private static final Pattern SHARD_FORMAT_RE = Pattern.compile(TEMPLATE_PATTERN); + private static final Pattern WINDOWED_FORMAT_RE = Pattern.compile(WINDOWED_ONLY_PATTERN); /** * Constructs a new {@link DefaultFilenamePolicy}. @@ -68,6 +93,11 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { this.prefix = prefix; this.shardTemplate = shardTemplate; this.suffix = suffix; + boolean isWindowed = isWindowedTemplate(this.shardTemplate); + if (!isWindowed){ + LOG.info("Template {} does not have enough information to create windowed file names." + + "Will use default template for windowed file names if needed.", this.shardTemplate); + } } /** @@ -78,9 +108,11 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { *

Any filename component of the provided resource will be used as the filename prefix. * *

If provided, the shard name template will be used; otherwise {@link #DEFAULT_SHARD_TEMPLATE} - * will be used. Shard name template will automatically be expanded in case when there is - * need to write windowed files. There is no need to specify any template for how windowed - * file names will be constructed. + * will be used. + * + *

Shard name template will automatically be expanded in case when there is + * need to write windowed files and user did not provide enough information in the + * template to deal with windowed file names. * *

If provided, the suffix will be used; otherwise the files will have an empty suffix. */ @@ -98,6 +130,17 @@ public static DefaultFilenamePolicy constructUsingStandardParameters( private final String shardTemplate; private final String suffix; + /* + * Checks whether given template contains enough information to form windowed file names + */ + static boolean isWindowedTemplate(String template){ + if (template != null){ + Matcher m = WINDOWED_FORMAT_RE.matcher(template); + return m.find(); + } + return false; + } + /** * Constructs a fully qualified name from components. * @@ -108,17 +151,20 @@ public static DefaultFilenamePolicy constructUsingStandardParameters( *

Within a shard template, repeating sequences of the letters "S" or "N" * are replaced with the shard number, or number of shards respectively. * Repeating sequence of "P" is replaced with the index of the window pane. - * The numbers are formatted with leading zeros to match the length of the - * repeated sequence of letters. + * "L" is replaced with "true" in case of last pane or "false" otherwise. + * "F" is replaced with "true" in case of first pane or "false" otherwise. * "W" is replaced by stringification of current window. * + *

The numbers are formatted with leading zeros to match the length of the + * repeated sequence of letters. + * *

For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is * produced: "output-001-of-100.txt". */ static String constructName( String prefix, String shardTemplate, String suffix, int shardNum, int numShards, - long currentPaneIndex, String windowStr) { + long currentPaneIndex, boolean firstPane, boolean lastPane, String windowStr) { // Matcher API works with StringBuffer, rather than StringBuilder. StringBuffer sb = new StringBuffer(); sb.append(prefix); @@ -129,6 +175,8 @@ static String constructName( boolean isNumberOfShards = (m.group(1).charAt(0) == 'N'); boolean isCurrentPaneIndex = (m.group(1).charAt(0) == 'P') && currentPaneIndex > -1; boolean isWindow = (m.group(1).charAt(0) == 'W') && windowStr != null; + boolean isLastPane = (m.group(1).charAt(0) == 'L'); + boolean isFirstPane = (m.group(1).charAt(0) == 'F'); char[] zeros = new char[m.end() - m.start()]; Arrays.fill(zeros, '0'); @@ -144,6 +192,10 @@ static String constructName( m.appendReplacement(sb, formatted); } else if (isWindow) { m.appendReplacement(sb, windowStr); + } else if (isFirstPane){ + m.appendReplacement(sb, String.valueOf(firstPane)); + } else if (isLastPane){ + m.appendReplacement(sb, String.valueOf(lastPane)); } } m.appendTail(sb); @@ -154,7 +206,8 @@ static String constructName( static String constructName(String prefix, String shardTemplate, String suffix, int shardNum, int numShards) { - return constructName(prefix, shardTemplate, suffix, shardNum, numShards, -1L, null); + return constructName(prefix, shardTemplate, suffix, shardNum, numShards, -1L, false, + false, null); } @Override @@ -171,19 +224,28 @@ public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context @Override public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) { - long currentPaneIndex = (context.getPaneInfo() == null ? -1L - : context.getPaneInfo().getIndex()); + + final PaneInfo paneInfo = context.getPaneInfo(); + long currentPaneIndex = (paneInfo == null ? -1L + : paneInfo.getIndex()); + boolean firstPane = (paneInfo == null ? false : paneInfo.isFirst()); + boolean lastPane = (paneInfo == null ? false : paneInfo.isLast()); String windowStr = windowToString(context.getWindow()); - String windowShardTemplate = shardTemplate + DEFAULT_WINDOWED_SHARED_TEMPLATE_SUFFIX; - String filename = constructName(prefix.get(), windowShardTemplate, suffix, + + String templateToUse = shardTemplate; + if (!isWindowedTemplate(shardTemplate)){ + templateToUse = shardTemplate + DEFAULT_WINDOWED_SHARED_TEMPLATE_SUFFIX; + } + + String filename = constructName(prefix.get(), templateToUse, suffix, context.getShardNumber(), - context.getNumShards(), currentPaneIndex, windowStr) + context.getNumShards(), currentPaneIndex, firstPane, lastPane, windowStr) + extension; return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); } /* - * Since not all windows have toString() that is nice or is compatible to be part of file name. + * Since not all windows have toString() that is nice or is compatible to be a part of file name. */ private String windowToString(BoundedWindow window) { if (window instanceof GlobalWindow) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java index f5b58ffc6619..bba9d73839c8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java @@ -18,7 +18,10 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.io.DefaultFilenamePolicy.constructName; +import static org.apache.beam.sdk.io.DefaultFilenamePolicy.isWindowedTemplate; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import org.junit.Test; import org.junit.runner.RunWith; @@ -29,6 +32,7 @@ */ @RunWith(JUnit4.class) public class DefaultFilenamePolicyTest { + @Test public void testConstructName() { assertEquals("output-001-of-123.txt", @@ -53,25 +57,57 @@ public void testConstructNameWithLargeShardCount() { constructName("out", "-SS-of-NN", ".txt", 100, 5000)); } + @Test + public void testIsWindowedTemplate(){ + assertTrue(isWindowedTemplate("-SSS-of-NNN-PPP-W")); + assertTrue(isWindowedTemplate("-SSS-of-NNN-PPP-W")); + assertTrue(isWindowedTemplate("-SSS-of-NNN-L")); + assertTrue(isWindowedTemplate("-SSS-of-NNN-firstPane=F")); + + assertFalse(isWindowedTemplate("-SSS-of-NNN")); + assertFalse(isWindowedTemplate("-SSS-of-lp")); + } + @Test public void testConstructWindowedName() { assertEquals("output-001-of-123.txt", - constructName("output", "-SSS-of-NNN", ".txt", 1, 123, -1, null)); + constructName("output", "-SSS-of-NNN", ".txt", 1, 123, -1, false, false, null)); assertEquals("output-001-of-123-PPP-W.txt", - constructName("output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, -1, null)); + constructName("output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, -1, false, false, null)); assertEquals("out.txt/part-00042-003-myWindowStr", - constructName("out.txt", "/part-SSSSS-PPP-W", "", 42, 100, 3, "myWindowStr")); + constructName("out.txt", "/part-SSSSS-PPP-W", "", 42, 100, 3, false, false, + "myWindowStr")); - assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, 2, "anotherWindowStr")); + assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, 2, false, false, + "anotherWindowStr")); assertEquals("out0102shard-oneMoreWindowStr-0013.txt", - constructName("out", "SSNNshard-W-PPPP", ".txt", 1, 2, 13, "oneMoreWindowStr")); + constructName("out", "SSNNshard-W-PPPP", ".txt", 1, 2, 13, false, false, + "oneMoreWindowStr")); assertEquals("out-2/1.part-1-of-2-slidingWindow1-33-windowslidingWindow1-pane033.txt", constructName("out", - "-N/S.part-S-of-N-W-PP-windowW-panePPP", ".txt", 1, 2, 33, "slidingWindow1")); + "-N/S.part-S-of-N-W-PP-windowW-panePPP", ".txt", 1, 2, 33, false, false, + "slidingWindow1")); + + // test first/last pane + assertEquals("out.txt/part-00042-003-myWindowStr-last=false-first=false", + constructName("out.txt", "/part-SSSSS-PPP-W-last=L-first=F", "", 42, 100, 3, false, + false, "myWindowStr")); + + assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, 2, true, true, + "anotherWindowStr")); + + assertEquals("out0102shard-oneMoreWindowStr-first_true13-0013.txt", + constructName("out", "SSNNshard-W-first_FP-PPPP", ".txt", 1, 2, 13, true, false, + "oneMoreWindowStr")); + + assertEquals("out-2/1.part-1-of-2-sWindow1-winsWindow1-p033-lastpane=true-firstpane=true.txt", + constructName("out", + "-N/S.part-S-of-N-W-winW-pPPP-lastpane=L-firstpane=F", ".txt", 1, 2, 33, + true, true, "sWindow1")); } } From 44f92494f3d34e259ba25550d7f2ce6a9199f67d Mon Sep 17 00:00:00 2001 From: Borisa Zivkovic Date: Wed, 24 May 2017 11:14:04 +0100 Subject: [PATCH 4/4] fixing after code review --- .../beam/sdk/io/DefaultFilenamePolicy.java | 85 ++++++++++--------- .../sdk/io/DefaultFilenamePolicyTest.java | 45 +++++----- 2 files changed, 66 insertions(+), 64 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java index 5daf6ee08c5c..50738546b14e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java @@ -57,11 +57,13 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { public static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; /** The default windowed sharding name template used when writing windowed files. - * Currently this is automatically appended to provided sharding name template - * when there is a need to write windowed files. + * This is used as default in cases when user did not specify shard template to + * be used and there is a need to write windowed files. In cases when user does + * specify shard template to be used then provided template will be used for both + * windowed and non-windowed file names. */ - private static final String DEFAULT_WINDOWED_SHARED_TEMPLATE_SUFFIX = - "-PPP-firstpane-F-lastpane-L-W"; + private static final String DEFAULT_WINDOWED_SHARD_TEMPLATE = + "P-W" + DEFAULT_SHARD_TEMPLATE; /* * pattern for only non-windowed file names @@ -71,7 +73,7 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { /* * pattern for only windowed file names */ - private static final String WINDOWED_ONLY_PATTERN = "P+|L|F|W"; + private static final String WINDOWED_ONLY_PATTERN = "P|W"; /* * pattern for both windowed and non-windowed file names @@ -93,11 +95,6 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { this.prefix = prefix; this.shardTemplate = shardTemplate; this.suffix = suffix; - boolean isWindowed = isWindowedTemplate(this.shardTemplate); - if (!isWindowed){ - LOG.info("Template {} does not have enough information to create windowed file names." - + "Will use default template for windowed file names if needed.", this.shardTemplate); - } } /** @@ -108,11 +105,8 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { *

Any filename component of the provided resource will be used as the filename prefix. * *

If provided, the shard name template will be used; otherwise {@link #DEFAULT_SHARD_TEMPLATE} - * will be used. - * - *

Shard name template will automatically be expanded in case when there is - * need to write windowed files and user did not provide enough information in the - * template to deal with windowed file names. + * will be used for non-windowed file names and {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE} will + * be used for windowed file names. * *

If provided, the suffix will be used; otherwise the files will have an empty suffix. */ @@ -131,7 +125,9 @@ public static DefaultFilenamePolicy constructUsingStandardParameters( private final String suffix; /* - * Checks whether given template contains enough information to form windowed file names + * Checks whether given template contains enough information to form + * meaningful windowed file names - ie whether it uses pane and window + * info. */ static boolean isWindowedTemplate(String template){ if (template != null){ @@ -150,9 +146,7 @@ static boolean isWindowedTemplate(String template){ * *

Within a shard template, repeating sequences of the letters "S" or "N" * are replaced with the shard number, or number of shards respectively. - * Repeating sequence of "P" is replaced with the index of the window pane. - * "L" is replaced with "true" in case of last pane or "false" otherwise. - * "F" is replaced with "true" in case of first pane or "false" otherwise. + * "P" is replaced with by stringification of current pane. * "W" is replaced by stringification of current window. * *

The numbers are formatted with leading zeros to match the length of the @@ -164,7 +158,7 @@ static boolean isWindowedTemplate(String template){ */ static String constructName( String prefix, String shardTemplate, String suffix, int shardNum, int numShards, - long currentPaneIndex, boolean firstPane, boolean lastPane, String windowStr) { + String paneStr, String windowStr) { // Matcher API works with StringBuffer, rather than StringBuilder. StringBuffer sb = new StringBuffer(); sb.append(prefix); @@ -173,10 +167,8 @@ static String constructName( while (m.find()) { boolean isCurrentShardNum = (m.group(1).charAt(0) == 'S'); boolean isNumberOfShards = (m.group(1).charAt(0) == 'N'); - boolean isCurrentPaneIndex = (m.group(1).charAt(0) == 'P') && currentPaneIndex > -1; + boolean isPane = (m.group(1).charAt(0) == 'P') && paneStr != null; boolean isWindow = (m.group(1).charAt(0) == 'W') && windowStr != null; - boolean isLastPane = (m.group(1).charAt(0) == 'L'); - boolean isFirstPane = (m.group(1).charAt(0) == 'F'); char[] zeros = new char[m.end() - m.start()]; Arrays.fill(zeros, '0'); @@ -187,15 +179,10 @@ static String constructName( } else if (isNumberOfShards) { String formatted = df.format(numShards); m.appendReplacement(sb, formatted); - } else if (isCurrentPaneIndex) { - String formatted = df.format(currentPaneIndex); - m.appendReplacement(sb, formatted); + } else if (isPane) { + m.appendReplacement(sb, paneStr); } else if (isWindow) { m.appendReplacement(sb, windowStr); - } else if (isFirstPane){ - m.appendReplacement(sb, String.valueOf(firstPane)); - } else if (isLastPane){ - m.appendReplacement(sb, String.valueOf(lastPane)); } } m.appendTail(sb); @@ -206,8 +193,7 @@ static String constructName( static String constructName(String prefix, String shardTemplate, String suffix, int shardNum, int numShards) { - return constructName(prefix, shardTemplate, suffix, shardNum, numShards, -1L, false, - false, null); + return constructName(prefix, shardTemplate, suffix, shardNum, numShards, null, null); } @Override @@ -225,21 +211,30 @@ public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) { + boolean shardTemplateProvidedByUser = !this.shardTemplate.equals(DEFAULT_SHARD_TEMPLATE); + + if (shardTemplateProvidedByUser){ + boolean isWindowed = isWindowedTemplate(this.shardTemplate); + if (!isWindowed){ + LOG.info("Template you provided {} does not have enough information to create" + + "meaningful windowed file names. Consider using P and W in your template", + this.shardTemplate); + } + } + final PaneInfo paneInfo = context.getPaneInfo(); - long currentPaneIndex = (paneInfo == null ? -1L - : paneInfo.getIndex()); - boolean firstPane = (paneInfo == null ? false : paneInfo.isFirst()); - boolean lastPane = (paneInfo == null ? false : paneInfo.isLast()); + String paneStr = paneInfoToString(paneInfo); String windowStr = windowToString(context.getWindow()); String templateToUse = shardTemplate; - if (!isWindowedTemplate(shardTemplate)){ - templateToUse = shardTemplate + DEFAULT_WINDOWED_SHARED_TEMPLATE_SUFFIX; + if (!shardTemplateProvidedByUser){ + LOG.info("User did not provide shard template. For creating windowed file names " + + "default template {} will be used", DEFAULT_WINDOWED_SHARD_TEMPLATE); + templateToUse = DEFAULT_WINDOWED_SHARD_TEMPLATE; } String filename = constructName(prefix.get(), templateToUse, suffix, - context.getShardNumber(), - context.getNumShards(), currentPaneIndex, firstPane, lastPane, windowStr) + context.getShardNumber(), context.getNumShards(), paneStr, windowStr) + extension; return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); } @@ -253,12 +248,20 @@ private String windowToString(BoundedWindow window) { } if (window instanceof IntervalWindow) { IntervalWindow iw = (IntervalWindow) window; - return String.format("IntervalWindow-from-%s-to-%s", iw.start().toString(), + return String.format("IntervalWindow-%s-%s", iw.start().toString(), iw.end().toString()); } return window.toString(); } + private String paneInfoToString(PaneInfo paneInfo){ + long currentPaneIndex = (paneInfo == null ? -1L + : paneInfo.getIndex()); + boolean firstPane = (paneInfo == null ? false : paneInfo.isFirst()); + boolean lastPane = (paneInfo == null ? false : paneInfo.isLast()); + return String.format("pane-%s-%b-%b", currentPaneIndex, firstPane, lastPane); + } + @Override public void populateDisplayData(DisplayData.Builder builder) { String filenamePattern; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java index bba9d73839c8..787403bad308 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java @@ -59,10 +59,10 @@ public void testConstructNameWithLargeShardCount() { @Test public void testIsWindowedTemplate(){ - assertTrue(isWindowedTemplate("-SSS-of-NNN-PPP-W")); - assertTrue(isWindowedTemplate("-SSS-of-NNN-PPP-W")); - assertTrue(isWindowedTemplate("-SSS-of-NNN-L")); - assertTrue(isWindowedTemplate("-SSS-of-NNN-firstPane=F")); + assertTrue(isWindowedTemplate("-SSS-of-NNN-P-W")); + assertTrue(isWindowedTemplate("-SSS-of-NNN-W")); + assertTrue(isWindowedTemplate("-SSS-of-NNN-P")); + assertTrue(isWindowedTemplate("W-SSS-of-NNN")); assertFalse(isWindowedTemplate("-SSS-of-NNN")); assertFalse(isWindowedTemplate("-SSS-of-lp")); @@ -71,43 +71,42 @@ public void testIsWindowedTemplate(){ @Test public void testConstructWindowedName() { assertEquals("output-001-of-123.txt", - constructName("output", "-SSS-of-NNN", ".txt", 1, 123, -1, false, false, null)); + constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null)); assertEquals("output-001-of-123-PPP-W.txt", - constructName("output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, -1, false, false, null)); + constructName("output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, null, null)); - assertEquals("out.txt/part-00042-003-myWindowStr", - constructName("out.txt", "/part-SSSSS-PPP-W", "", 42, 100, 3, false, false, + assertEquals("out.txt/part-00042-myPaneStr-myWindowStr", + constructName("out.txt", "/part-SSSSS-P-W", "", 42, 100, "myPaneStr", "myWindowStr")); - assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, 2, false, false, + assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, "myPaneStr2", "anotherWindowStr")); - assertEquals("out0102shard-oneMoreWindowStr-0013.txt", - constructName("out", "SSNNshard-W-PPPP", ".txt", 1, 2, 13, false, false, + assertEquals("out0102shard-oneMoreWindowStr-anotherPaneStr.txt", + constructName("out", "SSNNshard-W-P", ".txt", 1, 2, "anotherPaneStr", "oneMoreWindowStr")); - assertEquals("out-2/1.part-1-of-2-slidingWindow1-33-windowslidingWindow1-pane033.txt", - constructName("out", - "-N/S.part-S-of-N-W-PP-windowW-panePPP", ".txt", 1, 2, 33, false, false, + assertEquals("out-2/1.part-1-of-2-slidingWindow1-myPaneStr3-windowslidingWindow1-" + + "panemyPaneStr3.txt", + constructName("out", "-N/S.part-S-of-N-W-P-windowW-paneP", ".txt", 1, 2, "myPaneStr3", "slidingWindow1")); // test first/last pane - assertEquals("out.txt/part-00042-003-myWindowStr-last=false-first=false", - constructName("out.txt", "/part-SSSSS-PPP-W-last=L-first=F", "", 42, 100, 3, false, - false, "myWindowStr")); + assertEquals("out.txt/part-00042-myWindowStr-pane-11-true-false", + constructName("out.txt", "/part-SSSSS-W-P", "", 42, 100, "pane-11-true-false", + "myWindowStr")); - assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, 2, true, true, + assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, "pane", "anotherWindowStr")); - assertEquals("out0102shard-oneMoreWindowStr-first_true13-0013.txt", - constructName("out", "SSNNshard-W-first_FP-PPPP", ".txt", 1, 2, 13, true, false, + assertEquals("out0102shard-oneMoreWindowStr-pane--1-false-false-pane--1-false-false.txt", + constructName("out", "SSNNshard-W-P-P", ".txt", 1, 2, "pane--1-false-false", "oneMoreWindowStr")); - assertEquals("out-2/1.part-1-of-2-sWindow1-winsWindow1-p033-lastpane=true-firstpane=true.txt", + assertEquals("out-2/1.part-1-of-2-sWindow1-winsWindow1-ppaneL.txt", constructName("out", - "-N/S.part-S-of-N-W-winW-pPPP-lastpane=L-firstpane=F", ".txt", 1, 2, 33, - true, true, "sWindow1")); + "-N/S.part-S-of-N-W-winW-pP", ".txt", 1, 2, "paneL", "sWindow1")); } }