Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,12 @@ public void flattenWithDuplicateInputsNonFlatten() {
@Test
public void writeWithRunnerDeterminedSharding() {
ResourceId outputDirectory = LocalResources.fromString("/foo/bar", true /* isDirectory */);
FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
FilenamePolicy policy =
DefaultFilenamePolicy.constructUsingStandardParameters(
StaticValueProvider.of(outputDirectory),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
"",
false);
WriteFiles<Integer> write =
WriteFiles.to(
new FileBasedSink<Integer>(StaticValueProvider.of(outputDirectory), policy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,20 @@ public void dynamicallyReshardedWrite() throws Exception {
@Test
public void withNoShardingSpecifiedReturnsNewTransform() {
ResourceId outputDirectory = LocalResources.fromString("/foo", true /* isDirectory */);
FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
WriteFiles<Object> original = WriteFiles.to(
new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) {
@Override
public WriteOperation<Object> createWriteOperation() {
throw new IllegalArgumentException("Should not be used");
}
});
FilenamePolicy policy =
DefaultFilenamePolicy.constructUsingStandardParameters(
StaticValueProvider.of(outputDirectory),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
"",
false);
WriteFiles<Object> original =
WriteFiles.to(
new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) {
@Override
public WriteOperation<Object> createWriteOperation() {
throw new IllegalArgumentException("Should not be used");
}
});
@SuppressWarnings("unchecked")
PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ public PDone expand(PCollection<T> input) {
FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
if (usedFilenamePolicy == null) {
usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters(
getFilenamePrefix(), getShardTemplate(), getFilenameSuffix());
getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
}

WriteFiles<T> write = WriteFiles.to(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -54,7 +55,7 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
private static final Logger LOG = LoggerFactory.getLogger(DefaultFilenamePolicy.class);

/** The default sharding name template used in {@link #constructUsingStandardParameters}. */
public static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
public static final String DEFAULT_UNWINDOWED_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;

/** The default windowed sharding name template used when writing windowed files.
* This is used as default in cases when user did not specify shard template to
Expand All @@ -63,27 +64,12 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
* windowed and non-windowed file names.
*/
private static final String DEFAULT_WINDOWED_SHARD_TEMPLATE =
"P-W" + DEFAULT_SHARD_TEMPLATE;

/*
* pattern for only non-windowed file names
*/
private static final String NON_WINDOWED_ONLY_PATTERN = "S+|N+";

/*
* pattern for only windowed file names
*/
private static final String WINDOWED_ONLY_PATTERN = "P|W";
"W-P" + DEFAULT_UNWINDOWED_SHARD_TEMPLATE;

/*
* pattern for both windowed and non-windowed file names
*/
private static final String TEMPLATE_PATTERN = "(" + NON_WINDOWED_ONLY_PATTERN + "|"
+ WINDOWED_ONLY_PATTERN + ")";

// Pattern that matches shard placeholders within a shard template.
private static final Pattern SHARD_FORMAT_RE = Pattern.compile(TEMPLATE_PATTERN);
private static final Pattern WINDOWED_FORMAT_RE = Pattern.compile(WINDOWED_ONLY_PATTERN);
private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+|W|P)");

/**
* Constructs a new {@link DefaultFilenamePolicy}.
Expand All @@ -104,39 +90,30 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
*
* <p>Any filename component of the provided resource will be used as the filename prefix.
*
* <p>If provided, the shard name template will be used; otherwise {@link #DEFAULT_SHARD_TEMPLATE}
* will be used for non-windowed file names and {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE} will
* be used for windowed file names.
* <p>If provided, the shard name template will be used; otherwise
* {@link #DEFAULT_UNWINDOWED_SHARD_TEMPLATE} will be used for non-windowed file names and
* {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE} will be used for windowed file names.
*
* <p>If provided, the suffix will be used; otherwise the files will have an empty suffix.
*/
public static DefaultFilenamePolicy constructUsingStandardParameters(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of these changes are backwards incompatible, and this class appears to be on the public API and not marked with @Experimental or @Internal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have marked all of FileBasedSink experimental, and this is used by that. That should cover use of FilenamePolicy as well

ValueProvider<ResourceId> outputPrefix,
@Nullable String shardTemplate,
@Nullable String filenameSuffix) {
@Nullable String filenameSuffix,
boolean windowedWrites) {
// Pick the appropriate default policy based on whether windowed writes are being performed.
String defaultTemplate =
windowedWrites ? DEFAULT_WINDOWED_SHARD_TEMPLATE : DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
return new DefaultFilenamePolicy(
NestedValueProvider.of(outputPrefix, new ExtractFilename()),
firstNonNull(shardTemplate, DEFAULT_SHARD_TEMPLATE),
firstNonNull(shardTemplate, defaultTemplate),
firstNonNull(filenameSuffix, ""));
}

private final ValueProvider<String> prefix;
private final String shardTemplate;
private final String suffix;

/*
* Checks whether given template contains enough information to form
* meaningful windowed file names - ie whether it uses pane and window
* info.
*/
static boolean isWindowedTemplate(String template){
if (template != null){
Matcher m = WINDOWED_FORMAT_RE.matcher(template);
return m.find();
}
return false;
}

/**
* Constructs a fully qualified name from components.
*
Expand Down Expand Up @@ -191,51 +168,23 @@ static String constructName(
return sb.toString();
}

static String constructName(String prefix, String shardTemplate, String suffix, int shardNum,
int numShards) {
return constructName(prefix, shardTemplate, suffix, shardNum, numShards, null, null);
}

@Override
@Nullable
public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context,
String extension) {
String filename =
constructName(
prefix.get(), shardTemplate, suffix, context.getShardNumber(), context.getNumShards())
+ extension;
String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(),
context.getNumShards(), null, null) + extension;
return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
}

@Override
public ResourceId windowedFilename(ResourceId outputDirectory,
WindowedContext context, String extension) {

boolean shardTemplateProvidedByUser = !this.shardTemplate.equals(DEFAULT_SHARD_TEMPLATE);

if (shardTemplateProvidedByUser){
boolean isWindowed = isWindowedTemplate(this.shardTemplate);
if (!isWindowed){
LOG.info("Template you provided {} does not have enough information to create"
+ "meaningful windowed file names. Consider using P and W in your template",
this.shardTemplate);
}
}

final PaneInfo paneInfo = context.getPaneInfo();
String paneStr = paneInfoToString(paneInfo);
String windowStr = windowToString(context.getWindow());

String templateToUse = shardTemplate;
if (!shardTemplateProvidedByUser){
LOG.info("User did not provide shard template. For creating windowed file names "
+ "default template {} will be used", DEFAULT_WINDOWED_SHARD_TEMPLATE);
templateToUse = DEFAULT_WINDOWED_SHARD_TEMPLATE;
}

String filename = constructName(prefix.get(), templateToUse, suffix,
context.getShardNumber(), context.getNumShards(), paneStr, windowStr)
+ extension;
String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(),
context.getNumShards(), paneStr, windowStr) + extension;
return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
}

Expand All @@ -248,18 +197,20 @@ private String windowToString(BoundedWindow window) {
}
if (window instanceof IntervalWindow) {
IntervalWindow iw = (IntervalWindow) window;
return String.format("IntervalWindow-%s-%s", iw.start().toString(),
iw.end().toString());
return String.format("%s-%s", iw.start().toString(), iw.end().toString());
}
return window.toString();
}

private String paneInfoToString(PaneInfo paneInfo){
long currentPaneIndex = (paneInfo == null ? -1L
: paneInfo.getIndex());
boolean firstPane = (paneInfo == null ? false : paneInfo.isFirst());
boolean lastPane = (paneInfo == null ? false : paneInfo.isLast());
return String.format("pane-%s-%b-%b", currentPaneIndex, firstPane, lastPane);
private String paneInfoToString(PaneInfo paneInfo) {
String paneString = String.format("pane-%d", paneInfo.getIndex());
if (paneInfo.getTiming() == Timing.LATE) {
paneString = String.format("%s-late", paneString);
}
if (paneInfo.isLast()) {
paneString = String.format("%s-last", paneString);
}
return paneString;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ static class TFRecordSink extends FileBasedSink<byte[]> {
super(
outputPrefix,
DefaultFilenamePolicy.constructUsingStandardParameters(
outputPrefix, shardTemplate, suffix),
outputPrefix, shardTemplate, suffix, false),
writableByteChannelFactory(compressionType));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ public PDone expand(PCollection<String> input) {
FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
if (usedFilenamePolicy == null) {
usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters(
getFilenamePrefix(), getShardTemplate(), getFilenameSuffix());
getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
}
WriteFiles<String> write =
WriteFiles.to(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,8 @@ private void runTestWrite(String[] expectedElements, int numShards) throws IOExc
p.run();

String shardNameTemplate =
firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE);
firstNonNull(write.getShardTemplate(),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);

assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate);
}
Expand All @@ -493,7 +494,13 @@ public static void assertTestOutputs(
expectedFiles.add(
new File(
DefaultFilenamePolicy.constructName(
outputFilePrefix, shardNameTemplate, "" /* no suffix */, i, numShards)));
outputFilePrefix,
shardNameTemplate,
"" /* no suffix */,
i,
numShards,
null,
null)));
}

List<String> actualElements = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
package org.apache.beam.sdk.io;

import static org.apache.beam.sdk.io.DefaultFilenamePolicy.constructName;
import static org.apache.beam.sdk.io.DefaultFilenamePolicy.isWindowedTemplate;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -36,36 +33,25 @@ public class DefaultFilenamePolicyTest {
@Test
public void testConstructName() {
assertEquals("output-001-of-123.txt",
constructName("output", "-SSS-of-NNN", ".txt", 1, 123));
constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null));

assertEquals("out.txt/part-00042",
constructName("out.txt", "/part-SSSSS", "", 42, 100));
constructName("out.txt", "/part-SSSSS", "", 42, 100, null, null));

assertEquals("out.txt",
constructName("ou", "t.t", "xt", 1, 1));
constructName("ou", "t.t", "xt", 1, 1, null, null));

assertEquals("out0102shard.txt",
constructName("out", "SSNNshard", ".txt", 1, 2));
constructName("out", "SSNNshard", ".txt", 1, 2, null, null));

assertEquals("out-2/1.part-1-of-2.txt",
constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2));
constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2, null, null));
}

@Test
public void testConstructNameWithLargeShardCount() {
assertEquals("out-100-of-5000.txt",
constructName("out", "-SS-of-NN", ".txt", 100, 5000));
}

@Test
public void testIsWindowedTemplate(){
assertTrue(isWindowedTemplate("-SSS-of-NNN-P-W"));
assertTrue(isWindowedTemplate("-SSS-of-NNN-W"));
assertTrue(isWindowedTemplate("-SSS-of-NNN-P"));
assertTrue(isWindowedTemplate("W-SSS-of-NNN"));

assertFalse(isWindowedTemplate("-SSS-of-NNN"));
assertFalse(isWindowedTemplate("-SSS-of-lp"));
constructName("out", "-SS-of-NN", ".txt", 100, 5000, null, null));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ private void runTestWrite(
p.run();

assertOutputFiles(elems, header, footer, numShards, baseDir, outputName,
firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE));
firstNonNull(write.getShardTemplate(),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE));
}

public static void assertOutputFiles(
Expand All @@ -337,7 +338,7 @@ public static void assertOutputFiles(
new File(
rootLocation.toString(),
DefaultFilenamePolicy.constructName(
outputName, shardNameTemplate, "", i, numShards)));
outputName, shardNameTemplate, "", i, numShards, null, null)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class XmlSink<T> extends FileBasedSink<T> {

private static DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<?> spec) {
return DefaultFilenamePolicy.constructUsingStandardParameters(
spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION);
spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION, false);
}

XmlSink(XmlIO.Write<T> spec) {
Expand Down