Skip to content

Commit

Permalink
This closes #2749
Browse files Browse the repository at this point in the history
  • Loading branch information
dhalperi committed Apr 28, 2017
2 parents 185dc47 + 34c3ee7 commit 47aaf11
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.api.client.util.Sleeper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.CharStreams;
import java.io.IOException;
Expand All @@ -32,7 +31,10 @@
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -49,11 +51,14 @@ public class ExplicitShardedFile implements ShardedFile {
.withInitialBackoff(DEFAULT_SLEEP_DURATION)
.withMaxRetries(MAX_READ_RETRIES);

private final Collection<String> files;
private final List<Metadata> files;

/** Constructs an {@link ExplicitShardedFile} for the given files. */
public ExplicitShardedFile(Collection<String> files) {
this.files = files;
public ExplicitShardedFile(Collection<String> files) throws IOException {
this.files = new LinkedList<>();
for (String file: files) {
this.files.add(FileSystems.matchSingleFileSpec(file));
}
}

@Override
Expand All @@ -63,13 +68,12 @@ public List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
return Collections.emptyList();
}

IOChannelFactory factory = IOChannelUtils.getFactory(Iterables.get(files, 0));
IOException lastException = null;

do {
try {
// Read data from file paths
return readLines(files, factory);
return readLines(files);
} catch (IOException e) {
// Ignore and retry
lastException = e;
Expand Down Expand Up @@ -104,11 +108,12 @@ public String toString() {
* than can be reasonably processed serially, in-memory, by a single thread.
*/
@VisibleForTesting
List<String> readLines(Collection<String> files, IOChannelFactory factory) throws IOException {
List<String> readLines(Collection<Metadata> files) throws IOException {
List<String> allLines = Lists.newArrayList();
int i = 1;
for (String file : files) {
try (Reader reader = Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) {
for (Metadata file : files) {
try (Reader reader = Channels.newReader(FileSystems.open(file.resourceId()),
StandardCharsets.UTF_8.name())) {
List<String> lines = CharStreams.readLines(reader);
allLines.addAll(lines);
LOG.debug("[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.api.client.util.Sleeper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
Expand All @@ -34,14 +36,15 @@
import java.io.Reader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -112,21 +115,22 @@ public String getFilePattern() {
@Override
public List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
throws IOException, InterruptedException {
IOChannelFactory factory = IOChannelUtils.getFactory(filePattern);
IOException lastException = null;

do {
try {
// Match inputPath which may contain a glob
Collection<String> files = factory.match(filePattern);
Collection<Metadata> files = Arrays.asList(Iterables.getOnlyElement(
FileSystems.match(ImmutableList.of(filePattern))).metadata());

LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePattern);

if (files.isEmpty() || !checkTotalNumOfFiles(files)) {
continue;
}

// Read data from file paths
return readLines(files, factory);
return readLines(files);
} catch (IOException e) {
// Ignore and retry
lastException = e;
Expand Down Expand Up @@ -162,12 +166,13 @@ public String toString() {
* than can be reasonably processed serially, in-memory, by a single thread.
*/
@VisibleForTesting
List<String> readLines(Collection<String> files, IOChannelFactory factory) throws IOException {
List<String> readLines(Collection<Metadata> files) throws IOException {
List<String> allLines = Lists.newArrayList();
int i = 1;
for (String file : files) {
for (Metadata file : files) {
try (Reader reader =
Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) {
Channels.newReader(FileSystems.open(file.resourceId()),
StandardCharsets.UTF_8.name())) {
List<String> lines = CharStreams.readLines(reader);
allLines.addAll(lines);
LOG.debug(
Expand All @@ -188,14 +193,16 @@ List<String> readLines(Collection<String> files, IOChannelFactory factory) throw
* of given files equals the number that is parsed from shard name.
*/
@VisibleForTesting
boolean checkTotalNumOfFiles(Collection<String> files) {
for (String filePath : files) {
Path fileName = Paths.get(filePath).getFileName();
boolean checkTotalNumOfFiles(Collection<Metadata> files) {
for (Metadata fileMedadata : files) {
String fileName = fileMedadata.resourceId().toString().substring(
fileMedadata.resourceId().getCurrentDirectory().toString().length());

if (fileName == null) {
// this path has zero elements
continue;
}
Matcher matcher = shardTemplate.matcher(fileName.toString());
Matcher matcher = shardTemplate.matcher(fileName);
if (!matcher.matches()) {
// shard name doesn't match the pattern, check with the next shard
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyCollection;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
Expand All @@ -33,7 +32,10 @@
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.LocalResources;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand All @@ -53,6 +55,13 @@ public class NumberedShardedFileTest {
@Mock private PipelineResult pResult = Mockito.mock(PipelineResult.class);

private final BackOff backOff = NumberedShardedFile.BACK_OFF_FACTORY.backoff();
private String filePattern;

/**
 * Initializes {@code filePattern} before each test to the {@code "*"} wildcard resolved as a
 * file against the root of {@code tmpFolder}, so tests that do not assign their own pattern
 * use this one.
 *
 * @throws IOException declared by the signature; which call raises it is not visible here
 */
@Before
public void setup() throws IOException {
// NOTE(review): the boolean passed to fromFile presumably marks the root as a directory -- confirm.
filePattern = LocalResources.fromFile(tmpFolder.getRoot(), true).resolve(
"*", StandardResolveOptions.RESOLVE_FILE).toString();
}

@Test
public void testPreconditionFilePathIsNull() {
Expand Down Expand Up @@ -82,8 +91,9 @@ public void testReadMultipleShards() throws Exception {
Files.write(contents2, tmpFile2, StandardCharsets.UTF_8);
Files.write(contents3, tmpFile3, StandardCharsets.UTF_8);

NumberedShardedFile shardedFile =
new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "result-*"));
filePattern = LocalResources.fromFile(tmpFolder.getRoot(), true).resolve(
"result-*", StandardResolveOptions.RESOLVE_FILE).toString();
NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern);

assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2));
}
Expand All @@ -92,8 +102,7 @@ public void testReadMultipleShards() throws Exception {
public void testReadEmpty() throws Exception {
File emptyFile = tmpFolder.newFile("result-000-of-001");
Files.write("", emptyFile, StandardCharsets.UTF_8);
NumberedShardedFile shardedFile =
new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern);

assertThat(shardedFile.readFilesWithRetries(), empty());
}
Expand All @@ -110,9 +119,7 @@ public void testReadCustomTemplate() throws Exception {

Pattern customizedTemplate =
Pattern.compile("(?x) result (?<shardnum>\\d+) - total (?<numshards>\\d+)");
NumberedShardedFile shardedFile =
new NumberedShardedFile(
IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), customizedTemplate);
NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern, customizedTemplate);

assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2));
}
Expand All @@ -122,10 +129,8 @@ public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception {
File tmpFile = tmpFolder.newFile();
Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);

NumberedShardedFile shardedFile =
new NumberedShardedFile(
IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"),
Pattern.compile("incorrect-template"));
NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern,
Pattern.compile("incorrect-template"));

thrown.expect(IOException.class);
thrown.expectMessage(
Expand All @@ -138,12 +143,10 @@ public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception {
public void testReadWithRetriesFailsSinceFilesystemError() throws Exception {
File tmpFile = tmpFolder.newFile();
Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);

NumberedShardedFile shardedFile =
spy(new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")));
NumberedShardedFile shardedFile = spy(new NumberedShardedFile(filePattern));
doThrow(IOException.class)
.when(shardedFile)
.readLines(anyCollection(), any(IOChannelFactory.class));
.readLines(anyCollection());

thrown.expect(IOException.class);
thrown.expectMessage(
Expand All @@ -154,8 +157,7 @@ public void testReadWithRetriesFailsSinceFilesystemError() throws Exception {

@Test
public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception {
NumberedShardedFile shardedFile =
new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern);

thrown.expect(IOException.class);
thrown.expectMessage(
Expand All @@ -169,8 +171,7 @@ public void testReadWithRetriesFailsWhenRedundantFileLoaded() throws Exception {
tmpFolder.newFile("result-000-of-001");
tmpFolder.newFile("tmp-result-000-of-001");

NumberedShardedFile shardedFile =
new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern);

thrown.expect(IOException.class);
thrown.expectMessage(
Expand Down

0 comments on commit 47aaf11

Please sign in to comment.