From fac751c008189a57093f407bca46917450daad8b Mon Sep 17 00:00:00 2001 From: Mark Liu Date: Tue, 25 Oct 2016 14:57:13 -0700 Subject: [PATCH 1/3] [BEAM-747] Fix FileChecksumMatcher That Inconsistent With FS --- .../org/apache/beam/examples/WordCountIT.java | 2 +- .../beam/sdk/testing/FileChecksumMatcher.java | 142 ++++++++++++++---- .../sdk/testing/FileChecksumMatcherTest.java | 100 +++++++++++- 3 files changed, 212 insertions(+), 32 deletions(-) diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java index f2afe6aefbda6..01438de336b0a 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java @@ -65,7 +65,7 @@ public void testE2EWordCount() throws Exception { "output", "results")); options.setOnSuccessMatcher( - new FileChecksumMatcher(DEFAULT_OUTPUT_CHECKSUM, options.getOutput() + "*")); + new FileChecksumMatcher(DEFAULT_OUTPUT_CHECKSUM, options.getOutput() + "*-of-*")); WordCount.main(TestPipeline.convertToArgs(options)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java index de6cea383e8c7..b1d88d872d227 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java @@ -20,7 +20,12 @@ import static com.google.common.base.Preconditions.checkArgument; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.google.common.hash.HashCode; import com.google.common.hash.Hashing; import 
com.google.common.io.CharStreams; @@ -28,14 +33,21 @@ import java.io.Reader; import java.nio.channels.Channels; import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nonnull; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.hamcrest.Description; import org.hamcrest.TypeSafeMatcher; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,20 +55,41 @@ * Matcher to verify file checksum in E2E test. * *

<p>For example:
- * <pre>{@code [
- *   assertTrue(job, new FileChecksumMatcher(checksumString, filePath));
- * ]}</pre>
+ * <pre>{@code
+ *   assertThat(job, new FileChecksumMatcher(checksumString, filePath));
+ * }</pre>
+ * or
+ * <pre>{@code
+ *   assertThat(job, new FileChecksumMatcher(checksumString, filePath, shardTemplate));
+ * }</pre>
+ *
+ * <p>
Checksum of outputs is generated based on SHA-1 algorithm. If output file is empty, + * SHA-1 hash of empty string (da39a3ee5e6b4b0d3255bfef95601890afd80709) is used as expected. */ public class FileChecksumMatcher extends TypeSafeMatcher implements SerializableMatcher { private static final Logger LOG = LoggerFactory.getLogger(FileChecksumMatcher.class); + static final int MAX_READ_RETRIES = 4; + static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L); + static final FluentBackoff BACK_OFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(DEFAULT_SLEEP_DURATION) + .withMaxRetries(MAX_READ_RETRIES); + + private static final String DEFAULT_SHARD_TEMPLATE = "\\S*\\d+-of-(\\d+)$"; + private final String expectedChecksum; private final String filePath; + private final Pattern shardTemplate; private String actualChecksum; public FileChecksumMatcher(String checksum, String filePath) { + this(checksum, filePath, null); + } + + public FileChecksumMatcher(String checksum, String filePath, String shardTemplate) { checkArgument( !Strings.isNullOrEmpty(checksum), "Expected valid checksum, but received %s", checksum); @@ -66,48 +99,105 @@ public FileChecksumMatcher(String checksum, String filePath) { this.expectedChecksum = checksum; this.filePath = filePath; + this.shardTemplate = + Pattern.compile(shardTemplate == null ? DEFAULT_SHARD_TEMPLATE : shardTemplate); } @Override public boolean matchesSafely(PipelineResult pipelineResult) { + // Load output data + List outputs; try { - // Load output data - List outputs = readLines(filePath); + outputs = readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); + } catch (Exception e) { + throw new RuntimeException(String.format("Failed to read from: %s", filePath), e); + } - // Verify outputs. Checksum is computed using SHA-1 algorithm - actualChecksum = hashing(outputs); - LOG.info("Generated checksum for output data: {}", actualChecksum); + // Verify outputs. 
Checksum is computed using SHA-1 algorithm + actualChecksum = computeHash(outputs); + LOG.info("Generated checksum: {}", actualChecksum); - return actualChecksum.equals(expectedChecksum); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to read from path: %s", filePath)); - } + return actualChecksum.equals(expectedChecksum); } - private List readLines(String path) throws IOException { - List readData = new ArrayList<>(); - IOChannelFactory factory = IOChannelUtils.getFactory(path); - - // Match inputPath which may contains glob - Collection files = factory.match(path); + @VisibleForTesting + List readFilesWithRetries(Sleeper sleeper, BackOff backOff) + throws IOException, InterruptedException { + IOChannelFactory factory = IOChannelUtils.getFactory(filePath); + IOException lastException = null; + + do { + try { + // Match inputPath which may contains glob + Collection files = factory.match(filePath); + LOG.info("Found {} file(s) by matching the path: {}", files.size(), filePath); + + if (files.isEmpty() || !checkTotalNumOfFiles(files)) { + continue; + } + + // Read data from file paths + return readLines(files, factory); + } catch (IOException e) { + // Ignore and retry + lastException = e; + LOG.warn("Error in file reading. 
Ignore and retry."); + } + } while(BackOffUtils.next(sleeper, backOff)); + // Failed after max retries + throw new IOException( + String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES), + lastException); + } - // Read data from file paths - int i = 0; + @VisibleForTesting + List readLines(Collection files, IOChannelFactory factory) throws IOException { + List allLines = Lists.newArrayList(); + int i = 1; for (String file : files) { try (Reader reader = - Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) { + Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) { List lines = CharStreams.readLines(reader); - readData.addAll(lines); + allLines.addAll(lines); LOG.info( - "[{} of {}] Read {} lines from file: {}", i, files.size() - 1, lines.size(), file); + "[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file); } i++; } - return readData; + return allLines; + } + + /** + * Check if total number of files is correct by comparing with the number that + * is parsed from shard name using a name template. If no template are specified, + * "SSSS-of-NNNN" will be used as default, and "NNNN" will be the expected total + * number of files. 
+ */ + @VisibleForTesting + boolean checkTotalNumOfFiles(Collection files) { + for (String filePath : files.toArray(new String[0])) { + Path fileName = Paths.get(filePath).getFileName(); + if (fileName == null) { + // this path has zero elements + continue; + } + Matcher matcher = shardTemplate.matcher(fileName.toString()); + if (!matcher.matches()) { + // shard name doesn't match the pattern, check with the next shard + continue; + } + // once match, extract total number of shards and compare to file list + return files.size() == Integer.parseInt(matcher.group(1)); + } + LOG.warn("No name matches the shard template: {}", shardTemplate); + return false; } - private String hashing(List strs) { + private String computeHash(@Nonnull List strs) { + if (strs.isEmpty()) { + return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString(); + } + List hashCodes = new ArrayList<>(); for (String str : strs) { hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java index b2f2ec853573a..bf57ee41663a9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java @@ -19,12 +19,18 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import com.google.api.client.util.BackOff; import com.google.common.io.Files; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.util.IOChannelFactory; import 
org.apache.beam.sdk.util.IOChannelUtils; import org.junit.Rule; import org.junit.Test; @@ -42,10 +48,14 @@ public class FileChecksumMatcherTest { public TemporaryFolder tmpFolder = new TemporaryFolder(); @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule + public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper(); @Mock private PipelineResult pResult = Mockito.mock(PipelineResult.class); + private BackOff backOff = FileChecksumMatcher.BACK_OFF_FACTORY.backoff(); + @Test public void testPreconditionChecksumIsNull() throws IOException { String tmpPath = tmpFolder.newFile().getPath(); @@ -79,8 +89,8 @@ public void testPreconditionFilePathIsEmpty() { } @Test - public void testMatcherVerifySingleFile() throws IOException{ - File tmpFile = tmpFolder.newFile(); + public void testMatcherThatVerifiesSingleFile() throws IOException{ + File tmpFile = tmpFolder.newFile("result-000-of-001"); Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); FileChecksumMatcher matcher = new FileChecksumMatcher("a8772322f5d7b851777f820fc79d050f9d302915", tmpFile.getPath()); @@ -89,16 +99,96 @@ public void testMatcherVerifySingleFile() throws IOException{ } @Test - public void testMatcherVerifyMultipleFilesInOneDir() throws IOException { - File tmpFile1 = tmpFolder.newFile(); - File tmpFile2 = tmpFolder.newFile(); + public void testMatcherThatVerifiesMultipleFiles() throws IOException { + File tmpFile1 = tmpFolder.newFile("result-000-of-002"); + File tmpFile2 = tmpFolder.newFile("result-001-of-002"); + File tmpFile3 = tmpFolder.newFile("tmp"); Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8); Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8); + Files.write("tmp", tmpFile3, StandardCharsets.UTF_8); + FileChecksumMatcher matcher = new FileChecksumMatcher( "90552392c28396935fe4f123bd0b5c2d0f6260c8", + IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "result-*")); + + 
assertThat(pResult, matcher); + } + + @Test + public void testMatcherThatVerifiesFileWithEmptyContent() throws IOException { + File emptyFile = tmpFolder.newFile("result-000-of-001"); + Files.write("", emptyFile, StandardCharsets.UTF_8); + FileChecksumMatcher matcher = + new FileChecksumMatcher( + "da39a3ee5e6b4b0d3255bfef95601890afd80709", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); assertThat(pResult, matcher); } + + @Test + public void testReadWithRetriesFailsWhenTemplateIncorrect() + throws IOException, InterruptedException { + File tmpFile = tmpFolder.newFile(); + Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); + + FileChecksumMatcher matcher = new FileChecksumMatcher( + "mock-checksum", + IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), + "incorrect-template"); + + thrown.expect(IOException.class); + thrown.expectMessage( + containsString( + "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES)); + matcher.readFilesWithRetries(fastClock, backOff); + } + + @Test + public void testReadWithRetriesFailsSinceFilesystemError() throws Exception { + File tmpFile = tmpFolder.newFile(); + Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); + + FileChecksumMatcher matcher = + spy(new FileChecksumMatcher( + "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"))); + doThrow(IOException.class) + .when(matcher).readLines(anyCollection(), any(IOChannelFactory.class)); + + thrown.expect(IOException.class); + thrown.expectMessage( + containsString( + "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES)); + matcher.readFilesWithRetries(fastClock, backOff); + } + + @Test + public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception { + FileChecksumMatcher matcher = + new FileChecksumMatcher( + "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); + + 
thrown.expect(IOException.class); + thrown.expectMessage( + containsString( + "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES)); + matcher.readFilesWithRetries(fastClock, backOff); + } + + @Test + public void testReadWithRetriesFailsWhenRedundantFileLoaded() throws Exception { + tmpFolder.newFile("result-000-of-001"); + tmpFolder.newFile("tmp-result-000-of-001"); + + FileChecksumMatcher matcher = + new FileChecksumMatcher( + "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); + + thrown.expect(IOException.class); + thrown.expectMessage( + containsString( + "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES)); + matcher.readFilesWithRetries(fastClock, backOff); + } } From 5f26347dacffd74b65d66db0f9fff66498aca970 Mon Sep 17 00:00:00 2001 From: Mark Liu Date: Mon, 7 Nov 2016 15:42:03 -0800 Subject: [PATCH 2/3] fixup! Add More Unit Tests --- .../beam/sdk/testing/FileChecksumMatcher.java | 4 ++-- .../sdk/testing/FileChecksumMatcherTest.java | 20 +++++++++++++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java index b1d88d872d227..18ba498588694 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java @@ -86,7 +86,7 @@ public class FileChecksumMatcher extends TypeSafeMatcher private String actualChecksum; public FileChecksumMatcher(String checksum, String filePath) { - this(checksum, filePath, null); + this(checksum, filePath, DEFAULT_SHARD_TEMPLATE); } public FileChecksumMatcher(String checksum, String filePath, String shardTemplate) { @@ -169,7 +169,7 @@ List readLines(Collection files, IOChannelFactory factory) throw /** * Check if total number of files is correct by comparing 
with the number that - * is parsed from shard name using a name template. If no template are specified, + * is parsed from shard name using a name template. If no template is specified, * "SSSS-of-NNNN" will be used as default, and "NNNN" will be the expected total * number of files. */ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java index bf57ee41663a9..952edca5d1aff 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java @@ -128,8 +128,24 @@ public void testMatcherThatVerifiesFileWithEmptyContent() throws IOException { } @Test - public void testReadWithRetriesFailsWhenTemplateIncorrect() - throws IOException, InterruptedException { + public void testMatcherThatUsesCustomizedTemplate() throws Exception { + // Customized template: resultSSS-totalNNN + File tmpFile1 = tmpFolder.newFile("result0-total2"); + File tmpFile2 = tmpFolder.newFile("result1-total2"); + Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8); + Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8); + + String customizedTemplate = "result\\d+-total(\\d+)$"; + FileChecksumMatcher matcher = new FileChecksumMatcher( + "90552392c28396935fe4f123bd0b5c2d0f6260c8", + IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), + customizedTemplate); + + assertThat(pResult, matcher); + } + + @Test + public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception { File tmpFile = tmpFolder.newFile(); Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); From ae339c1282d9c1c2939c1ab91dc2804c6ebc74fe Mon Sep 17 00:00:00 2001 From: Mark Liu Date: Tue, 29 Nov 2016 14:26:37 -0800 Subject: [PATCH 3/3] fixup! 
Improve shard template and javadoc --- .../beam/sdk/testing/FileChecksumMatcher.java | 44 ++++++++++++++----- .../sdk/testing/FileChecksumMatcherTest.java | 19 +++++++- 2 files changed, 51 insertions(+), 12 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java index 18ba498588694..4b249fed38c56 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.testing; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; @@ -78,29 +79,50 @@ public class FileChecksumMatcher extends TypeSafeMatcher .withInitialBackoff(DEFAULT_SLEEP_DURATION) .withMaxRetries(MAX_READ_RETRIES); - private static final String DEFAULT_SHARD_TEMPLATE = "\\S*\\d+-of-(\\d+)$"; + private static final Pattern DEFAULT_SHARD_TEMPLATE = + Pattern.compile("(?x) \\S* (?<shardnum> \\d+) -of- (?<numshards> \\d+)"); private final String expectedChecksum; private final String filePath; private final Pattern shardTemplate; private String actualChecksum; + /** + * Constructor that uses default shard template. + * + * @param checksum expected checksum string used to verify file content. + * @param filePath path of files that's to be verified. + */ public FileChecksumMatcher(String checksum, String filePath) { this(checksum, filePath, DEFAULT_SHARD_TEMPLATE); } - public FileChecksumMatcher(String checksum, String filePath, String shardTemplate) { + /** + * Constructor. + * + * @param checksum expected checksum string used to verify file content. + * @param filePath path of files that's to be verified.
+ * @param shardTemplate template of shard name to parse out the total number of shards + * which is used in I/O retry to avoid inconsistency of filesystem. + * Customized template should assign name "numshards" to capturing + * group - total shard number. + */ + public FileChecksumMatcher(String checksum, String filePath, Pattern shardTemplate) { checkArgument( !Strings.isNullOrEmpty(checksum), "Expected valid checksum, but received %s", checksum); checkArgument( !Strings.isNullOrEmpty(filePath), "Expected valid file path, but received %s", filePath); + checkNotNull( + shardTemplate, + "Expected non-null shard pattern. " + + "Please call the other constructor to use default pattern: %s", + DEFAULT_SHARD_TEMPLATE); this.expectedChecksum = checksum; this.filePath = filePath; - this.shardTemplate = - Pattern.compile(shardTemplate == null ? DEFAULT_SHARD_TEMPLATE : shardTemplate); + this.shardTemplate = shardTemplate; } @Override @@ -115,7 +137,7 @@ public boolean matchesSafely(PipelineResult pipelineResult) { // Verify outputs. 
Checksum is computed using SHA-1 algorithm actualChecksum = computeHash(outputs); - LOG.info("Generated checksum: {}", actualChecksum); + LOG.debug("Generated checksum: {}", actualChecksum); return actualChecksum.equals(expectedChecksum); } @@ -130,7 +152,7 @@ List readFilesWithRetries(Sleeper sleeper, BackOff backOff) try { // Match inputPath which may contains glob Collection files = factory.match(filePath); - LOG.info("Found {} file(s) by matching the path: {}", files.size(), filePath); + LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePath); if (files.isEmpty() || !checkTotalNumOfFiles(files)) { continue; @@ -159,7 +181,7 @@ List readLines(Collection files, IOChannelFactory factory) throw Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) { List lines = CharStreams.readLines(reader); allLines.addAll(lines); - LOG.info( + LOG.debug( "[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file); } i++; @@ -172,10 +194,13 @@ List readLines(Collection files, IOChannelFactory factory) throw * is parsed from shard name using a name template. If no template is specified, * "SSSS-of-NNNN" will be used as default, and "NNNN" will be the expected total * number of files. + * + * @return {@code true} if at least one shard name matches template and total number + * of given files equals the number that is parsed from shard name. 
*/ @VisibleForTesting boolean checkTotalNumOfFiles(Collection files) { - for (String filePath : files.toArray(new String[0])) { + for (String filePath : files) { Path fileName = Paths.get(filePath).getFileName(); if (fileName == null) { // this path has zero elements @@ -187,9 +212,8 @@ boolean checkTotalNumOfFiles(Collection files) { continue; } // once match, extract total number of shards and compare to file list - return files.size() == Integer.parseInt(matcher.group(1)); + return files.size() == Integer.parseInt(matcher.group("numshards")); } - LOG.warn("No name matches the shard template: {}", shardTemplate); return false; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java index 952edca5d1aff..0dc307d528c29 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java @@ -29,6 +29,8 @@ import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.regex.Pattern; + import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; @@ -88,6 +90,18 @@ public void testPreconditionFilePathIsEmpty() { new FileChecksumMatcher("checksumString", ""); } + @Test + public void testPreconditionShardTemplateIsNull() throws IOException { + String tmpPath = tmpFolder.newFile().getPath(); + + thrown.expect(NullPointerException.class); + thrown.expectMessage( + containsString( + "Expected non-null shard pattern.
" + "Please call the other constructor to use default pattern:")); + new FileChecksumMatcher("checksumString", tmpPath, null); + } + @Test public void testMatcherThatVerifiesSingleFile() throws IOException{ File tmpFile = tmpFolder.newFile("result-000-of-001"); @@ -135,7 +149,8 @@ public void testMatcherThatUsesCustomizedTemplate() throws Exception { Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8); Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8); - String customizedTemplate = "result\\d+-total(\\d+)$"; + Pattern customizedTemplate = + Pattern.compile("(?x) result (?<shardnum>\\d+) - total (?<numshards>\\d+)"); FileChecksumMatcher matcher = new FileChecksumMatcher( "90552392c28396935fe4f123bd0b5c2d0f6260c8", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), @@ -152,7 +167,7 @@ public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception { FileChecksumMatcher matcher = new FileChecksumMatcher( "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), - "incorrect-template"); + Pattern.compile("incorrect-template")); thrown.expect(IOException.class); thrown.expectMessage(