Skip to content

Commit

Permalink
This closes #2724
Browse files Browse the repository at this point in the history
  • Loading branch information
dhalperi committed Apr 27, 2017
2 parents 1babed2 + 235a79a commit 0c740f4
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
Expand Down Expand Up @@ -104,6 +108,46 @@
*/
public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
// Pattern that matches shard placeholders within a shard template.
private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)");

/**
* Constructs a fully qualified name from components.
*
* <p>The name is built from a prefix, shard template (with shard numbers
* applied), and a suffix. All components are required, but may be empty
* strings.
*
* <p>Within a shard template, repeating sequences of the letters "S" or "N"
* are replaced with the shard number, or number of shards respectively. The
* numbers are formatted with leading zeros to match the length of the
* repeated sequence of letters.
*
* <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and
* suffix = ".txt", with shardNum = 1 and numShards = 100, the following is
* produced: "output-001-of-100.txt".
*/
public static String constructName(String prefix,
String shardTemplate, String suffix, int shardNum, int numShards) {
// Matcher API works with StringBuffer, rather than StringBuilder.
StringBuffer sb = new StringBuffer();
sb.append(prefix);

Matcher m = SHARD_FORMAT_RE.matcher(shardTemplate);
while (m.find()) {
boolean isShardNum = (m.group(1).charAt(0) == 'S');

char[] zeros = new char[m.end() - m.start()];
Arrays.fill(zeros, '0');
DecimalFormat df = new DecimalFormat(String.valueOf(zeros));
String formatted = df.format(isShardNum ? shardNum : numShards);
m.appendReplacement(sb, formatted);
}
m.appendTail(sb);

sb.append(suffix);
return sb.toString();
}

/**
* Directly supported file output compression types.
Expand Down Expand Up @@ -301,7 +345,7 @@ public String unwindowedFilename(FilenamePolicy.Context context) {
}

String suffix = getFileExtension(extension);
String filename = IOChannelUtils.constructName(
String filename = constructName(
baseOutputFilename.get(), fileNamingTemplate, suffix, context.getShardNumber(),
context.getNumShards());
return filename;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -43,6 +41,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;

Expand All @@ -54,9 +53,6 @@ public class IOChannelUtils {
private static final Map<String, IOChannelFactory> FACTORY_MAP =
Collections.synchronizedMap(new HashMap<String, IOChannelFactory>());

// Pattern that matches shard placeholders within a shard template.
private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)");

private static final ClassLoader CLASS_LOADER = ReflectHelpers.findClassLoader();

/**
Expand Down Expand Up @@ -201,7 +197,7 @@ public static ReadableByteChannel open(String filename)
public static WritableByteChannel create(String prefix, String shardTemplate,
String suffix, int numShards, String mimeType) throws IOException {
if (numShards == 1) {
return create(constructName(prefix, shardTemplate, suffix, 0, 1),
return create(FileBasedSink.constructName(prefix, shardTemplate, suffix, 0, 1),
mimeType);
}

Expand All @@ -213,7 +209,7 @@ public static WritableByteChannel create(String prefix, String shardTemplate,
Set<String> outputNames = new HashSet<>();
for (int i = 0; i < numShards; i++) {
String outputName =
constructName(prefix, shardTemplate, suffix, i, numShards);
FileBasedSink.constructName(prefix, shardTemplate, suffix, i, numShards);
if (!outputNames.add(outputName)) {
throw new IllegalArgumentException(
"Shard name collision detected for: " + outputName);
Expand All @@ -236,46 +232,6 @@ public static long getSizeBytes(String spec) throws IOException {
return getFactory(spec).getSizeBytes(spec);
}

/**
* Constructs a fully qualified name from components.
*
* <p>The name is built from a prefix, shard template (with shard numbers
* applied), and a suffix. All components are required, but may be empty
* strings.
*
* <p>Within a shard template, repeating sequences of the letters "S" or "N"
* are replaced with the shard number, or number of shards respectively. The
* numbers are formatted with leading zeros to match the length of the
* repeated sequence of letters.
*
* <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and
* suffix = ".txt", with shardNum = 1 and numShards = 100, the following is
* produced: "output-001-of-100.txt".
*/
public static String constructName(String prefix,
String shardTemplate, String suffix, int shardNum, int numShards) {
// Matcher API works with StringBuffer, rather than StringBuilder.
StringBuffer sb = new StringBuffer();
sb.append(prefix);

Matcher m = SHARD_FORMAT_RE.matcher(shardTemplate);
while (m.find()) {
boolean isShardNum = (m.group(1).charAt(0) == 'S');

char[] zeros = new char[m.end() - m.start()];
Arrays.fill(zeros, '0');
DecimalFormat df = new DecimalFormat(String.valueOf(zeros));
String formatted = df.format(isShardNum
? shardNum
: numShards);
m.appendReplacement(sb, formatted);
}
m.appendTail(sb);

sb.append(suffix);
return sb.toString();
}

private static final Pattern URI_SCHEME_PATTERN = Pattern.compile(
"(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ public static void assertTestOutputs(
for (int i = 0; i < numShards; i++) {
expectedFiles.add(
new File(
IOChannelUtils.constructName(
FileBasedSink.constructName(
outputFilePrefix, shardNameTemplate, "" /* no suffix */, i, numShards)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
*/
package org.apache.beam.sdk.io;

import static org.apache.beam.sdk.io.FileBasedSink.constructName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.common.collect.Lists;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
Expand All @@ -45,7 +45,6 @@
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPInputStream;

import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
Expand Down Expand Up @@ -87,6 +86,30 @@ private String getBaseTempDirectory() {
return appendToTempFolder(tempDirectory);
}

@Test
public void testConstructName() {
assertEquals("output-001-of-123.txt",
constructName("output", "-SSS-of-NNN", ".txt", 1, 123));

assertEquals("out.txt/part-00042",
constructName("out.txt", "/part-SSSSS", "", 42, 100));

assertEquals("out.txt",
constructName("ou", "t.t", "xt", 1, 1));

assertEquals("out0102shard.txt",
constructName("out", "SSNNshard", ".txt", 1, 2));

assertEquals("out-2/1.part-1-of-2.txt",
constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2));
}

@Test
public void testConstructNameWithLargeShardCount() {
assertEquals("out-100-of-5000.txt",
constructName("out", "-SS-of-NN", ".txt", 100, 5000));
}

/**
* FileBasedWriter opens the correct file, writes the header, footer, and elements in the
* correct order, and returns the correct filename.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ public static void assertOutputFiles(
expectedFiles.add(
new File(
rootLocation.toString(),
IOChannelUtils.constructName(outputName, shardNameTemplate, "", i, numShards)));
FileBasedSink.constructName(outputName, shardNameTemplate, "", i, numShards)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.io.File;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand All @@ -47,28 +46,6 @@ public class IOChannelUtilsTest {
@Rule
public ExpectedException thrown = ExpectedException.none();

@Test
public void testShardFormatExpansion() {
assertEquals("output-001-of-123.txt",
IOChannelUtils.constructName("output", "-SSS-of-NNN",
".txt",
1, 123));

assertEquals("out.txt/part-00042",
IOChannelUtils.constructName("out.txt", "/part-SSSSS", "",
42, 100));

assertEquals("out.txt",
IOChannelUtils.constructName("ou", "t.t", "xt", 1, 1));

assertEquals("out0102shard.txt",
IOChannelUtils.constructName("out", "SSNNshard", ".txt", 1, 2));

assertEquals("out-2/1.part-1-of-2.txt",
IOChannelUtils.constructName("out", "-N/S.part-S-of-N",
".txt", 1, 2));
}

@Test(expected = IllegalArgumentException.class)
public void testShardNameCollision() throws Exception {
File outFolder = tmpFolder.newFolder();
Expand All @@ -79,13 +56,6 @@ public void testShardNameCollision() throws Exception {
+ "to filename collision");
}

@Test
public void testLargeShardCount() {
Assert.assertEquals("out-100-of-5000.txt",
IOChannelUtils.constructName("out", "-SS-of-NN", ".txt",
100, 5000));
}

@Test
public void testHandlerNoScheme() throws Exception {
String pathToTempFolder = tmpFolder.getRoot().getAbsolutePath();
Expand Down

0 comments on commit 0c740f4

Please sign in to comment.