From 9678b1cc6c799767e48ebc4d9071db099b4d135d Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 3 Nov 2016 20:44:45 -0700 Subject: [PATCH 1/3] Add IntervalWindow coder to the standard registry --- .../src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java index 9110de058af6a..65f4209acfb16 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode; import org.apache.beam.sdk.coders.protobuf.ProtoCoder; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TimestampedValue; @@ -107,6 +108,7 @@ public void registerStandardCoders() { registerCoder(TimestampedValue.class, TimestampedValue.TimestampedValueCoder.class); registerCoder(Void.class, VoidCoder.class); registerCoder(byte[].class, ByteArrayCoder.class); + registerCoder(IntervalWindow.class, IntervalWindow.getCoder()); } /** From db41940f977bf3315ea7e5460d188d8f9b4fa119 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 5 Dec 2016 14:32:12 -0800 Subject: [PATCH 2/3] Factor out ShardedFile from FileChecksumMatcher --- .../beam/sdk/testing/FileChecksumMatcher.java | 114 ++------- .../beam/sdk/util/ExplicitShardedFile.java | 120 ++++++++++ .../beam/sdk/util/NumberedShardedFile.java | 220 ++++++++++++++++++ .../org/apache/beam/sdk/util/ShardedFile.java | 42 ++++ .../sdk/testing/FileChecksumMatcherTest.java | 77 ------ .../sdk/util/NumberedShardedFileTest.java | 181 ++++++++++++++ 6 files changed, 581 insertions(+), 173 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java index 4b249fed38c56..82a6b71176369 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java @@ -21,31 +21,19 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.Sleeper; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; -import com.google.common.collect.Lists; import com.google.common.hash.HashCode; import com.google.common.hash.Hashing; -import com.google.common.io.CharStreams; -import java.io.IOException; -import java.io.Reader; -import java.nio.channels.Channels; import java.nio.charset.StandardCharsets; -import java.nio.file.Path; -import java.nio.file.Paths; import 
java.util.ArrayList; -import java.util.Collection; import java.util.List; -import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nonnull; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.util.FluentBackoff; -import org.apache.beam.sdk.util.IOChannelFactory; -import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.NumberedShardedFile; +import org.apache.beam.sdk.util.ShardedFile; import org.hamcrest.Description; import org.hamcrest.TypeSafeMatcher; import org.joda.time.Duration; @@ -83,9 +71,8 @@ public class FileChecksumMatcher extends TypeSafeMatcher Pattern.compile("(?x) \\S* (? \\d+) -of- (? \\d+)"); private final String expectedChecksum; - private final String filePath; - private final Pattern shardTemplate; private String actualChecksum; + private final ShardedFile shardedFile; /** * Constructor that uses default shard template. @@ -98,7 +85,7 @@ public FileChecksumMatcher(String checksum, String filePath) { } /** - * Constructor. + * Constructor using a custom shard template. * * @param checksum expected checksum string used to verify file content. * @param filePath path of files that's to be verified. @@ -121,8 +108,17 @@ public FileChecksumMatcher(String checksum, String filePath, Pattern shardTempla DEFAULT_SHARD_TEMPLATE); this.expectedChecksum = checksum; - this.filePath = filePath; - this.shardTemplate = shardTemplate; + this.shardedFile = new NumberedShardedFile(filePath, shardTemplate); + } + + /** + * Constructor using an entirely custom {@link ShardedFile} implementation. + * + *
<p>
For internal use only. + */ + public FileChecksumMatcher(String expectedChecksum, ShardedFile shardedFile) { + this.expectedChecksum = expectedChecksum; + this.shardedFile = shardedFile; } @Override @@ -130,9 +126,10 @@ public boolean matchesSafely(PipelineResult pipelineResult) { // Load output data List outputs; try { - outputs = readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); + outputs = shardedFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); } catch (Exception e) { - throw new RuntimeException(String.format("Failed to read from: %s", filePath), e); + throw new RuntimeException( + String.format("Failed to read from: %s", shardedFile), e); } // Verify outputs. Checksum is computed using SHA-1 algorithm @@ -142,81 +139,6 @@ public boolean matchesSafely(PipelineResult pipelineResult) { return actualChecksum.equals(expectedChecksum); } - @VisibleForTesting - List readFilesWithRetries(Sleeper sleeper, BackOff backOff) - throws IOException, InterruptedException { - IOChannelFactory factory = IOChannelUtils.getFactory(filePath); - IOException lastException = null; - - do { - try { - // Match inputPath which may contains glob - Collection files = factory.match(filePath); - LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePath); - - if (files.isEmpty() || !checkTotalNumOfFiles(files)) { - continue; - } - - // Read data from file paths - return readLines(files, factory); - } catch (IOException e) { - // Ignore and retry - lastException = e; - LOG.warn("Error in file reading. Ignore and retry."); - } - } while(BackOffUtils.next(sleeper, backOff)); - // Failed after max retries - throw new IOException( - String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES), - lastException); - } - - @VisibleForTesting - List readLines(Collection files, IOChannelFactory factory) throws IOException { - List allLines = Lists.newArrayList(); - int i = 1; - for (String file : files) { - try (Reader reader = - Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) { - List lines = CharStreams.readLines(reader); - allLines.addAll(lines); - LOG.debug( - "[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file); - } - i++; - } - return allLines; - } - - /** - * Check if total number of files is correct by comparing with the number that - * is parsed from shard name using a name template. If no template is specified, - * "SSSS-of-NNNN" will be used as default, and "NNNN" will be the expected total - * number of files. - * - * @return {@code true} if at least one shard name matches template and total number - * of given files equals the number that is parsed from shard name. 
- */ - @VisibleForTesting - boolean checkTotalNumOfFiles(Collection files) { - for (String filePath : files) { - Path fileName = Paths.get(filePath).getFileName(); - if (fileName == null) { - // this path has zero elements - continue; - } - Matcher matcher = shardTemplate.matcher(fileName.toString()); - if (!matcher.matches()) { - // shard name doesn't match the pattern, check with the next shard - continue; - } - // once match, extract total number of shards and compare to file list - return files.size() == Integer.parseInt(matcher.group("numshards")); - } - return false; - } - private String computeHash(@Nonnull List strs) { if (strs.isEmpty()) { return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java new file mode 100644 index 0000000000000..5f5bf1ffaf725 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.io.CharStreams; +import java.io.IOException; +import java.io.Reader; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A sharded file where the file names are simply provided. */ +public class ExplicitShardedFile implements ShardedFile { + + private static final Logger LOG = LoggerFactory.getLogger(ExplicitShardedFile.class); + + private static final int MAX_READ_RETRIES = 4; + private static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L); + static final FluentBackoff BACK_OFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(DEFAULT_SLEEP_DURATION) + .withMaxRetries(MAX_READ_RETRIES); + + private final Collection files; + + /** Constructs an {@link ExplicitShardedFile} for the given files. 
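+   * <p>For example, with hypothetical shard paths:
+   * {@code new ExplicitShardedFile(ImmutableList.of("/tmp/out-000", "/tmp/out-001"))}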
*/ + public ExplicitShardedFile(Collection files) { + this.files = files; + } + + @Override + public List readFilesWithRetries(Sleeper sleeper, BackOff backOff) + throws IOException, InterruptedException { + if (files.isEmpty()) { + return Collections.emptyList(); + } + + IOChannelFactory factory = IOChannelUtils.getFactory(Iterables.get(files, 0)); + IOException lastException = null; + + do { + try { + // Read data from file paths + return readLines(files, factory); + } catch (IOException e) { + // Ignore and retry + lastException = e; + LOG.warn("Error in file reading. Ignore and retry."); + } + } while (BackOffUtils.next(sleeper, backOff)); + // Failed after max retries + throw new IOException( + String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES), + lastException); + } + + /** + * Discovers all shards of this file using the provided {@link Sleeper} and {@link BackOff}. + * + *
<p>
Because of eventual consistency, reads may discover no files or fewer files than the shard + * template implies. In this case, the read is considered to have failed. + */ + public List readFilesWithRetries() throws IOException, InterruptedException { + return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); + } + + @Override + public String toString() { + return String.format("explicit sharded file (%s)", Joiner.on(", ").join(files)); + } + + /** + * Reads all the lines of all the files. + * + *
<p>
Not suitable for use except in testing of small data, since the data size may be far more + * than can be reasonably processed serially, in-memory, by a single thread. + */ + @VisibleForTesting + List readLines(Collection files, IOChannelFactory factory) throws IOException { + List allLines = Lists.newArrayList(); + int i = 1; + for (String file : files) { + try (Reader reader = Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) { + List lines = CharStreams.readLines(reader); + allLines.addAll(lines); + LOG.debug("[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file); + } + i++; + } + return allLines; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java new file mode 100644 index 0000000000000..f9f2d6d5dc9bb --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; +import com.google.common.io.CharStreams; +import java.io.IOException; +import java.io.Reader; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nonnull; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility methods for working with sharded files. For internal use only; many parameters + * are just hardcoded to allow existing uses to work OK. + */ +public class NumberedShardedFile implements ShardedFile { + + private static final Logger LOG = LoggerFactory.getLogger(NumberedShardedFile.class); + + static final int MAX_READ_RETRIES = 4; + static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L); + static final FluentBackoff BACK_OFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(DEFAULT_SLEEP_DURATION) + .withMaxRetries(MAX_READ_RETRIES); + + private static final Pattern DEFAULT_SHARD_TEMPLATE = + Pattern.compile("(?x) \\S* (? \\d+) -of- (? 
<numshards>
\\d+)"); + + private final String filePath; + private final Pattern shardTemplate; + + /** + * Constructor that uses default shard template. + * + * @param filePath path or glob of files to include + */ + public NumberedShardedFile(String filePath) { + this(filePath, DEFAULT_SHARD_TEMPLATE); + } + + /** + * Constructor. + * + * @param filePath path or glob of files to include + * @param shardTemplate template of shard name to parse out the total number of shards + * which is used in I/O retry to avoid inconsistency of filesystem. + * Customized template should assign name "numshards" to capturing + * group - total shard number. + */ + public NumberedShardedFile(String filePath, Pattern shardTemplate) { + checkArgument( + !Strings.isNullOrEmpty(filePath), + "Expected valid file path, but received %s", filePath); + checkNotNull( + shardTemplate, + "Expected non-null shard pattern. " + + "Please call the other constructor to use default pattern: %s", + DEFAULT_SHARD_TEMPLATE); + + this.filePath = filePath; + this.shardTemplate = shardTemplate; + } + + public String getFilePath() { + return filePath; + } + + /** + * Discovers all shards of this file using the provided {@link Sleeper} and {@link BackOff}. + * + *
<p>
Because of eventual consistency, reads may discover no files or fewer files than + * the shard template implies. In this case, the read is considered to have failed. + */ + @Override + public List readFilesWithRetries(Sleeper sleeper, BackOff backOff) + throws IOException, InterruptedException { + IOChannelFactory factory = IOChannelUtils.getFactory(filePath); + IOException lastException = null; + + do { + try { + // Match inputPath which may contains glob + Collection files = factory.match(filePath); + LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePath); + + if (files.isEmpty() || !checkTotalNumOfFiles(files)) { + continue; + } + + // Read data from file paths + return readLines(files, factory); + } catch (IOException e) { + // Ignore and retry + lastException = e; + LOG.warn("Error in file reading. Ignore and retry."); + } + } while(BackOffUtils.next(sleeper, backOff)); + // Failed after max retries + throw new IOException( + String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES), + lastException); + } + + /** + * Discovers all shards of this file using the provided {@link Sleeper} and {@link BackOff}. + * + *
<p>
Because of eventual consistency, reads may discover no files or fewer files than + * the shard template implies. In this case, the read is considered to have failed. + */ + public List readFilesWithRetries() + throws IOException, InterruptedException { + return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); + } + + @Override + public String toString() { + return String.format("%s with shard template '%s'", filePath, shardTemplate); + } + + /** + * Reads all the lines of all the files. + * + *
<p>
Not suitable for use except in testing of small data, since the data size may be far more + * than can be reasonably processed serially, in-memory, by a single thread. + */ + @VisibleForTesting + List readLines(Collection files, IOChannelFactory factory) throws IOException { + List allLines = Lists.newArrayList(); + int i = 1; + for (String file : files) { + try (Reader reader = + Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) { + List lines = CharStreams.readLines(reader); + allLines.addAll(lines); + LOG.debug( + "[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file); + } + i++; + } + return allLines; + } + + /** + * Check if total number of files is correct by comparing with the number that + * is parsed from shard name using a name template. If no template is specified, + * "SSSS-of-NNNN" will be used as default, and "NNNN" will be the expected total + * number of files. + * + * @return {@code true} if at least one shard name matches template and total number + * of given files equals the number that is parsed from shard name. + */ + @VisibleForTesting + boolean checkTotalNumOfFiles(Collection files) { + for (String filePath : files) { + Path fileName = Paths.get(filePath).getFileName(); + if (fileName == null) { + // this path has zero elements + continue; + } + Matcher matcher = shardTemplate.matcher(fileName.toString()); + if (!matcher.matches()) { + // shard name doesn't match the pattern, check with the next shard + continue; + } + // once match, extract total number of shards and compare to file list + return files.size() == Integer.parseInt(matcher.group("numshards")); + } + return false; + } + + private String computeHash(@Nonnull List strs) { + if (strs.isEmpty()) { + return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString(); + } + + List hashCodes = new ArrayList<>(); + for (String str : strs) { + hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8)); + } + return Hashing.combineUnordered(hashCodes).toString(); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java new file mode 100644 index 0000000000000..ec9ed641976c1 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import com.google.api.client.util.BackOff; +import com.google.api.client.util.Sleeper; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import org.apache.beam.sdk.testing.SerializableMatcher; + +/** + * Bare-bones class for using sharded files. + * + *
<p>
For internal use only; used only in SDK tests. Must be {@link Serializable} so it can be + * shipped as a {@link SerializableMatcher}. + */ +public interface ShardedFile extends Serializable { + + /** + * Reads the lines from all shards of this file using the provided {@link Sleeper} and {@link + * BackOff}. + */ + List readFilesWithRetries(Sleeper sleeper, BackOff backOff) + throws IOException, InterruptedException; +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java index 0dc307d528c29..5438479273be1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java @@ -19,10 +19,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyCollection; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.spy; import com.google.api.client.util.BackOff; import com.google.common.io.Files; @@ -30,9 +26,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.regex.Pattern; - import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.junit.Rule; import org.junit.Test; @@ -76,13 +70,6 @@ public void testPreconditionChecksumIsEmpty() throws IOException { new FileChecksumMatcher("", tmpPath); } - @Test - public void testPreconditionFilePathIsNull() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(containsString("Expected valid file path, but received")); - new FileChecksumMatcher("checksumString", null); - } - @Test public void testPreconditionFilePathIsEmpty() { thrown.expect(IllegalArgumentException.class); @@ -158,68 +145,4 @@ public void testMatcherThatUsesCustomizedTemplate() throws Exception { assertThat(pResult, matcher); } - - @Test - public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception { - File tmpFile = tmpFolder.newFile(); - Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); - - FileChecksumMatcher matcher = new FileChecksumMatcher( - "mock-checksum", - IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), - Pattern.compile("incorrect-template")); - - thrown.expect(IOException.class); - thrown.expectMessage( - containsString( - "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES)); - matcher.readFilesWithRetries(fastClock, backOff); - } - - @Test - public void testReadWithRetriesFailsSinceFilesystemError() throws Exception { - File tmpFile = tmpFolder.newFile(); - Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); - - FileChecksumMatcher matcher = - spy(new FileChecksumMatcher( - "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"))); - doThrow(IOException.class) - .when(matcher).readLines(anyCollection(), any(IOChannelFactory.class)); - - thrown.expect(IOException.class); - thrown.expectMessage( - containsString( - "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES)); - matcher.readFilesWithRetries(fastClock, backOff); - } - - @Test - public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception { - FileChecksumMatcher matcher = - new 
FileChecksumMatcher( - "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); - - thrown.expect(IOException.class); - thrown.expectMessage( - containsString( - "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES)); - matcher.readFilesWithRetries(fastClock, backOff); - } - - @Test - public void testReadWithRetriesFailsWhenRedundantFileLoaded() throws Exception { - tmpFolder.newFile("result-000-of-001"); - tmpFolder.newFile("tmp-result-000-of-001"); - - FileChecksumMatcher matcher = - new FileChecksumMatcher( - "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); - - thrown.expect(IOException.class); - thrown.expectMessage( - containsString( - "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES)); - matcher.readFilesWithRetries(fastClock, backOff); - } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java new file mode 100644 index 0000000000000..475e459e73466 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; + +import com.google.api.client.util.BackOff; +import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.regex.Pattern; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; + +/** Tests for {@link NumberedShardedFile}. 
*/ +@RunWith(JUnit4.class) +public class NumberedShardedFileTest { + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper(); + + @Mock private PipelineResult pResult = Mockito.mock(PipelineResult.class); + + private final BackOff backOff = NumberedShardedFile.BACK_OFF_FACTORY.backoff(); + + @Test + public void testPreconditionFilePathIsNull() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(containsString("Expected valid file path, but received")); + new NumberedShardedFile(null); + } + + @Test + public void testPreconditionFilePathIsEmpty() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(containsString("Expected valid file path, but received")); + new NumberedShardedFile(""); + } + + @Test + public void testReadMultipleShards() throws Exception { + String + contents1 = "To be or not to be, ", + contents2 = "it is not a question.", + contents3 = "should not be included"; + + File tmpFile1 = tmpFolder.newFile("result-000-of-002"); + File tmpFile2 = tmpFolder.newFile("result-001-of-002"); + File tmpFile3 = tmpFolder.newFile("tmp"); + Files.write(contents1, tmpFile1, StandardCharsets.UTF_8); + Files.write(contents2, tmpFile2, StandardCharsets.UTF_8); + Files.write(contents3, tmpFile3, StandardCharsets.UTF_8); + + NumberedShardedFile shardedFile = + new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "result-*")); + + assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2)); + } + + @Test + public void testReadEmpty() throws Exception { + File emptyFile = tmpFolder.newFile("result-000-of-001"); + Files.write("", emptyFile, StandardCharsets.UTF_8); + NumberedShardedFile shardedFile = + new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); + + assertThat(shardedFile.readFilesWithRetries(), empty()); + } + + @Test + public void testReadCustomTemplate() throws Exception { + String contents1 = "To be or not to be, ", contents2 = "it is not a question."; + + // Customized template: resultSSS-totalNNN + File tmpFile1 = tmpFolder.newFile("result0-total2"); + File tmpFile2 = tmpFolder.newFile("result1-total2"); + Files.write(contents1, tmpFile1, StandardCharsets.UTF_8); + Files.write(contents2, tmpFile2, StandardCharsets.UTF_8); + + Pattern customizedTemplate = + Pattern.compile("(?x) result (?\\d+) - total (?\\d+)"); + NumberedShardedFile shardedFile = + new NumberedShardedFile( + IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), customizedTemplate); + + assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2)); + } + + @Test + public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception { + File tmpFile = tmpFolder.newFile(); + Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); + + NumberedShardedFile shardedFile = + new NumberedShardedFile( + IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), + Pattern.compile("incorrect-template")); + + thrown.expect(IOException.class); + thrown.expectMessage( + containsString( + "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES)); + shardedFile.readFilesWithRetries(fastClock, backOff); + } + + @Test + public void testReadWithRetriesFailsSinceFilesystemError() throws Exception { + File tmpFile = tmpFolder.newFile(); + Files.write("Test for file 
checksum verifier.", tmpFile, StandardCharsets.UTF_8); + + NumberedShardedFile shardedFile = + spy(new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"))); + doThrow(IOException.class) + .when(shardedFile) + .readLines(anyCollection(), any(IOChannelFactory.class)); + + thrown.expect(IOException.class); + thrown.expectMessage( + containsString( + "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES)); + shardedFile.readFilesWithRetries(fastClock, backOff); + } + + @Test + public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception { + NumberedShardedFile shardedFile = + new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); + + thrown.expect(IOException.class); + thrown.expectMessage( + containsString( + "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES)); + shardedFile.readFilesWithRetries(fastClock, backOff); + } + + @Test + public void testReadWithRetriesFailsWhenRedundantFileLoaded() throws Exception { + tmpFolder.newFile("result-000-of-001"); + tmpFolder.newFile("tmp-result-000-of-001"); + + NumberedShardedFile shardedFile = + new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); + + thrown.expect(IOException.class); + thrown.expectMessage( + containsString( + "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES)); + shardedFile.readFilesWithRetries(fastClock, backOff); + } +} From 42595dcd29c248bd3572596c9bb8464d18acd19b Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 3 Nov 2016 14:37:26 -0700 Subject: [PATCH 3/3] Revise WindowedWordCount for runner and execution mode portability --- .../beam/examples/WindowedWordCount.java | 177 +++++++++-------- .../common/WriteWindowedFilesDoFn.java | 77 ++++++++ .../beam/examples/WindowedWordCountIT.java | 182 +++++++++++++++--- 3 files changed, 326 insertions(+), 110 deletions(-) create mode 100644 examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java index 4e254bd0e56eb..5c19454d244ba 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java @@ -17,26 +17,25 @@ */ package org.apache.beam.examples; -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.examples.common.ExampleBigQueryTableOptions; import org.apache.beam.examples.common.ExampleOptions; -import org.apache.beam.examples.common.ExampleUtils; +import org.apache.beam.examples.common.WriteWindowedFilesDoFn; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import 
org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -63,7 +62,8 @@ * 2. Adding timestamps to data * 3. Windowing * 4. Re-using PTransforms over windowed PCollections - * 5. Writing to BigQuery + * 5. Accessing the window of an element + * 6. Writing data to per-window text files * * *
<p>
By default, the examples will run with the {@code DirectRunner}. @@ -74,25 +74,23 @@ * * See examples/java/README.md for instructions about how to configure different runners. * - *
<p>
Optionally specify the input file path via: - * {@code --inputFile=gs://INPUT_PATH}, - * which defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}. + *
<p>
To execute this pipeline locally, specify a local output file (if using the + * {@code DirectRunner}) or output prefix on a supported distributed file system. + *
* <pre>
{@code
+ *   --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
+ * }</pre>
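+ * <p>For example, a full local invocation might combine the documented flags
+ * (the values below are illustrative only, not enforced defaults):
+ * <pre>{@code
+ *   --runner=DirectRunner --inputFile=/tmp/kinglear.txt --output=/tmp/wordcounts --windowSize=10
+ * }</pre>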
* - *
<p>
Specify an output BigQuery dataset and optionally, a table for the output. If you don't - * specify the table, one will be created for you using the job name. If you don't specify the - * dataset, a dataset called {@code beam_examples} must already exist in your project. - * {@code --bigQueryDataset=YOUR-DATASET --bigQueryTable=YOUR-NEW-TABLE-NAME}. + *
<p>
The input file defaults to a public data set containing the text of King Lear, + * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}. + * + *
<p>
By default, the pipeline will do fixed windowing, on 1-minute windows. You can * change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10} * for 10-minute windows. * - *
<p>
The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C) - * and then exits. + *
<p>
The example will try to cancel the pipeline on the signal to terminate the process (CTRL-C). */ public class WindowedWordCount { - static final int WINDOW_SIZE = 1; // Default window duration in minutes - + static final int WINDOW_SIZE = 10; // Default window duration in minutes /** * Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for * this example, for the bounded data case. @@ -102,18 +100,22 @@ public class WindowedWordCount { * 2-hour period. */ static class AddTimestampFn extends DoFn { - private static final Duration RAND_RANGE = Duration.standardHours(2); + private static final Duration RAND_RANGE = Duration.standardHours(1); private final Instant minTimestamp; + private final Instant maxTimestamp; - AddTimestampFn() { - this.minTimestamp = new Instant(System.currentTimeMillis()); + AddTimestampFn(Instant minTimestamp, Instant maxTimestamp) { + this.minTimestamp = minTimestamp; + this.maxTimestamp = maxTimestamp; } @ProcessElement public void processElement(ProcessContext c) { - // Generate a timestamp that falls somewhere in the past two hours. - long randMillis = (long) (Math.random() * RAND_RANGE.getMillis()); - Instant randomTimestamp = minTimestamp.plus(randMillis); + Instant randomTimestamp = + new Instant( + ThreadLocalRandom.current() + .nextLong(minTimestamp.getMillis(), maxTimestamp.getMillis())); + /** * Concept #2: Set the data element with that timestamp. */ @@ -121,50 +123,29 @@ public void processElement(ProcessContext c) { } } - /** A DoFn that converts a Word and Count into a BigQuery table row. */ - static class FormatAsTableRowFn extends DoFn, TableRow> { - @ProcessElement - public void processElement(ProcessContext c) { - TableRow row = new TableRow() - .set("word", c.element().getKey()) - .set("count", c.element().getValue()) - // include a field for the window timestamp - .set("window_timestamp", c.timestamp().toString()); - c.output(row); + /** A {@link DefaultValueFactory} that returns the current system time. */ + public static class DefaultToCurrentSystemTime implements DefaultValueFactory { + @Override + public Long create(PipelineOptions options) { + return System.currentTimeMillis(); } } - /** - * Helper method that defines the BigQuery schema used for the output. - */ - private static TableSchema getSchema() { - List fields = new ArrayList<>(); - fields.add(new TableFieldSchema().setName("word").setType("STRING")); - fields.add(new TableFieldSchema().setName("count").setType("INTEGER")); - fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP")); - TableSchema schema = new TableSchema().setFields(fields); - return schema; - } - - /** - * Concept #5: We'll stream the results to a BigQuery table. The BigQuery output source is one - * that supports both bounded and unbounded data. This is a helper method that creates a - * TableReference from input options, to tell the pipeline where to write its BigQuery results. - */ - private static TableReference getTableReference(Options options) { - TableReference tableRef = new TableReference(); - tableRef.setProjectId(options.getProject()); - tableRef.setDatasetId(options.getBigQueryDataset()); - tableRef.setTableId(options.getBigQueryTable()); - return tableRef; + /** A {@link DefaultValueFactory} that returns the minimum timestamp plus one hour. 
*/ + public static class DefaultToMinTimestampPlusOneHour implements DefaultValueFactory { + @Override + public Long create(PipelineOptions options) { + return options.as(Options.class).getMinTimestampMillis() + + Duration.standardHours(1).getMillis(); + } } /** - * Options supported by {@link WindowedWordCount}. + * Options for {@link WindowedWordCount}. * - *
<p>
Inherits standard example configuration options, which allow specification of the BigQuery - * table, as well as the {@link WordCount.WordCountOptions} support for - * specification of the input file. + *
<p>
Inherits standard example configuration options, which allow specification of the + * runner, as well as the {@link WordCount.WordCountOptions} support for + * specification of the input and output files. */ public interface Options extends WordCount.WordCountOptions, ExampleOptions, ExampleBigQueryTableOptions { @@ -172,14 +153,24 @@ public interface Options extends WordCount.WordCountOptions, @Default.Integer(WINDOW_SIZE) Integer getWindowSize(); void setWindowSize(Integer value); + + @Description("Minimum randomly assigned timestamp, in milliseconds-since-epoch") + @Default.InstanceFactory(DefaultToCurrentSystemTime.class) + Long getMinTimestampMillis(); + void setMinTimestampMillis(Long value); + + @Description("Maximum randomly assigned timestamp, in milliseconds-since-epoch") + @Default.InstanceFactory(DefaultToMinTimestampPlusOneHour.class) + Long getMaxTimestampMillis(); + void setMaxTimestampMillis(Long value); } public static void main(String[] args) throws IOException { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - options.setBigQuerySchema(getSchema()); - // ExampleUtils creates the necessary input sources to simplify execution of this Pipeline. - ExampleUtils exampleUtils = new ExampleUtils(options); - exampleUtils.setup(); + final String output = options.getOutput(); + final Duration windowSize = Duration.standardMinutes(options.getWindowSize()); + final Instant minTimestamp = new Instant(options.getMinTimestampMillis()); + final Instant maxTimestamp = new Instant(options.getMaxTimestampMillis()); Pipeline pipeline = Pipeline.create(options); @@ -192,7 +183,7 @@ public static void main(String[] args) throws IOException { .apply(TextIO.Read.from(options.getInputFile())) // Concept #2: Add an element timestamp, using an artificial time just to show windowing. // See AddTimestampFn for more detail on this. - .apply(ParDo.of(new AddTimestampFn())); + .apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp))); /** * Concept #3: Window into fixed windows. The fixed window size for this example defaults to 1 @@ -200,9 +191,10 @@ public static void main(String[] args) throws IOException { * information on how fixed windows work, and for information on the other types of windowing * available (e.g., sliding windows). */ - PCollection windowedWords = input - .apply(Window.into( - FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))); + PCollection windowedWords = + input.apply( + Window.into( + FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))); /** * Concept #4: Re-use our existing CountWords transform that does not have knowledge of @@ -211,19 +203,40 @@ public static void main(String[] args) throws IOException { PCollection> wordCounts = windowedWords.apply(new WordCount.CountWords()); /** - * Concept #5: Format the results for a BigQuery table, then write to BigQuery. - * The BigQuery output source supports both bounded and unbounded data. + * Concept #5: Customize the output format using windowing information + * + *
<p>
At this point, the data is organized by window. We're writing text files and have no + * late data, so for simplicity we can use the window as the key and {@link GroupByKey} to get + * one output file per window. (If we had late data, this key would not be unique.) + * + *
<p>
To access the window in a {@link DoFn}, add a {@link BoundedWindow} parameter. This will + * be automatically detected and populated with the window for the current element. */ - wordCounts.apply(ParDo.of(new FormatAsTableRowFn())) - .apply(BigQueryIO.Write - .to(getTableReference(options)) - .withSchema(getSchema()) - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); + PCollection>> keyedByWindow = + wordCounts.apply( + ParDo.of( + new DoFn, KV>>() { + @ProcessElement + public void processElement(ProcessContext context, IntervalWindow window) { + context.output(KV.of(window, context.element())); + } + })); - PipelineResult result = pipeline.run(); + /** + * Concept #6: Format the results and write to a sharded file partitioned by window, using a + * simple ParDo operation. Because there may be failures followed by retries, the + * writes must be idempotent, but the details of writing to files is elided here. + */ + keyedByWindow + .apply(GroupByKey.>create()) + .apply(ParDo.of(new WriteWindowedFilesDoFn(output))); - // ExampleUtils will try to cancel the pipeline before the program exists. - exampleUtils.waitToFinish(result); + PipelineResult result = pipeline.run(); + try { + result.waitUntilFinish(); + } catch (Exception exc) { + result.cancel(); + } } + } diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java new file mode 100644 index 0000000000000..cd6baad442920 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.common; + +import com.google.common.annotations.VisibleForTesting; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.util.IOChannelFactory; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.values.KV; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; + +/** + * A {@link DoFn} that writes elements to files with names deterministically derived from the lower + * and upper bounds of their key (an {@link IntervalWindow}). + * + *
<p>
This is test utility code, not for end-users, so examples can be focused + * on their primary lessons. + */ +public class WriteWindowedFilesDoFn + extends DoFn>>, Void> { + + static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); + static final Coder STRING_CODER = StringUtf8Coder.of(); + + private static DateTimeFormatter formatter = ISODateTimeFormat.hourMinute(); + + private final String output; + + public WriteWindowedFilesDoFn(String output) { + this.output = output; + } + + @VisibleForTesting + public static String fileForWindow(String output, IntervalWindow window) { + return String.format( + "%s-%s-%s", output, formatter.print(window.start()), formatter.print(window.end())); + } + + @ProcessElement + public void processElement(ProcessContext context) throws Exception { + // Build a file name from the window + IntervalWindow window = context.element().getKey(); + String outputShard = fileForWindow(output, window); + + // Open the file and write all the values + IOChannelFactory factory = IOChannelUtils.getFactory(outputShard); + OutputStream out = Channels.newOutputStream(factory.create(outputShard, "text/plain")); + for (KV wordCount : context.element().getValue()) { + STRING_CODER.encode( + wordCount.getKey() + ": " + wordCount.getValue(), out, Coder.Context.OUTER); + out.write(NEWLINE); + } + out.close(); + } +} diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index 5d77dd5703958..e4570acb9533c 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -17,37 +17,59 @@ */ package org.apache.beam.examples; -import java.io.IOException; +import static org.hamcrest.Matchers.equalTo; + +import com.google.api.client.util.Sleeper; +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.util.Collections; import java.util.Date; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; +import org.apache.beam.examples.common.WriteWindowedFilesDoFn; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; -import org.apache.beam.sdk.testing.BigqueryMatcher; +import org.apache.beam.sdk.testing.FileChecksumMatcher; +import org.apache.beam.sdk.testing.SerializableMatcher; import org.apache.beam.sdk.testing.StreamingIT; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.util.ExplicitShardedFile; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.ShardedFile; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -/** - * End-to-end integration test of {@link WindowedWordCount}. - */ +/** End-to-end integration test of {@link WindowedWordCount}. 
*/ @RunWith(JUnit4.class) public class WindowedWordCountIT { private static final String DEFAULT_INPUT = "gs://apache-beam-samples/shakespeare/winterstale-personae"; - private static final String DEFAULT_OUTPUT_CHECKSUM = "cd5b52939257e12428a9fa085c32a84dd209b180"; + static final int MAX_READ_RETRIES = 4; + static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L); + static final FluentBackoff BACK_OFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(DEFAULT_SLEEP_DURATION) + .withMaxRetries(MAX_READ_RETRIES); - /** - * Options for the {@link WindowedWordCount} Integration Test. - */ + /** Options for the {@link WindowedWordCount} Integration Test. */ public interface WindowedWordCountITOptions - extends WindowedWordCount.Options, TestPipelineOptions, StreamingOptions { - } + extends WindowedWordCount.Options, TestPipelineOptions, StreamingOptions {} @BeforeClass public static void setUp() { @@ -55,36 +77,140 @@ public static void setUp() { } @Test - public void testWindowedWordCountInBatch() throws IOException { - testWindowedWordCountPipeline(false /* isStreaming */); + public void testWindowedWordCountInBatch() throws Exception { + testWindowedWordCountPipeline(defaultOptions()); } @Test @Category(StreamingIT.class) - public void testWindowedWordCountInStreaming() throws IOException { - testWindowedWordCountPipeline(true /* isStreaming */); + public void testWindowedWordCountInStreaming() throws Exception { + testWindowedWordCountPipeline(streamingOptions()); } - private void testWindowedWordCountPipeline(boolean isStreaming) throws IOException { + private WindowedWordCountITOptions defaultOptions() throws Exception { WindowedWordCountITOptions options = TestPipeline.testingPipelineOptions().as(WindowedWordCountITOptions.class); - options.setStreaming(isStreaming); options.setInputFile(DEFAULT_INPUT); + options.setTestTimeoutSeconds(1200L); + + options.setMinTimestampMillis(0L); + options.setMinTimestampMillis(Duration.standardHours(1).getMillis()); + options.setWindowSize(10); + + options.setOutput( + IOChannelUtils.resolve( + options.getTempRoot(), + String.format("WindowedWordCountIT-%tF-% expectedOutputFiles = Lists.newArrayListWithCapacity(6); + for (int startMinute : ImmutableList.of(0, 10, 20, 30, 40, 50)) { + Instant windowStart = + new Instant(options.getMinTimestampMillis()).plus(Duration.standardMinutes(startMinute)); + expectedOutputFiles.add( + WriteWindowedFilesDoFn.fileForWindow( + outputPrefix, + new IntervalWindow(windowStart, windowStart.plus(Duration.standardMinutes(10))))); + } - // Note: currently unused because the example writes to BigQuery, but WindowedWordCount.Options - // are tightly coupled to WordCount.Options, where the option is required. 
- options.setOutput(IOChannelUtils.resolve( - options.getTempRoot(), - String.format("WindowedWordCountIT-%tF-% expectedWordCounts = new TreeMap<>(); + for (String line : + inputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff())) { + String[] words = line.split("[^a-zA-Z']+"); + + for (String word : words) { + if (!word.isEmpty()) { + expectedWordCounts.put(word, + MoreObjects.firstNonNull(expectedWordCounts.get(word), 0L) + 1L); + } + } + } - String query = String.format("SELECT word, SUM(count) FROM [%s:%s.%s] GROUP BY word", - options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable()); options.setOnSuccessMatcher( - new BigqueryMatcher( - options.getAppName(), options.getProject(), query, DEFAULT_OUTPUT_CHECKSUM)); + new WordCountsMatcher(expectedWordCounts, new ExplicitShardedFile(expectedOutputFiles))); WindowedWordCount.main(TestPipeline.convertToArgs(options)); } + + /** + * A matcher that bakes in expected word counts, so they can be read directly via some other + * mechanism, and compares a sharded output file with the result. + */ + private static class WordCountsMatcher extends TypeSafeMatcher + implements SerializableMatcher { + + private static final Logger LOG = LoggerFactory.getLogger(FileChecksumMatcher.class); + + private final SortedMap expectedWordCounts; + private final ShardedFile outputFile; + private SortedMap actualCounts; + + public WordCountsMatcher(SortedMap expectedWordCounts, ShardedFile outputFile) { + this.expectedWordCounts = expectedWordCounts; + this.outputFile = outputFile; + } + + @Override + public boolean matchesSafely(PipelineResult pipelineResult) { + try { + // Load output data + List lines = + outputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); + + // Since the windowing is nondeterministic we only check the sums + actualCounts = new TreeMap<>(); + for (String line : lines) { + String[] splits = line.split(": "); + String word = splits[0]; + long count = Long.parseLong(splits[1]); + + Long current = actualCounts.get(word); + if (current == null) { + actualCounts.put(word, count); + } else { + actualCounts.put(word, current + count); + } + } + + return actualCounts.equals(expectedWordCounts); + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to read from sharded output: %s", outputFile)); + } + } + + @Override + public void describeTo(Description description) { + equalTo(expectedWordCounts).describeTo(description); + } + + @Override + public void describeMismatchSafely(PipelineResult pResult, Description description) { + equalTo(expectedWordCounts).describeMismatch(actualCounts, description); + } + } }
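Taken together, these three patches let a test discover windowed, sharded output and assert on it. The sketch below shows how the new utilities compose; it is illustrative only: the class name ShardedFileUsageSketch, the paths, the shard names, and the checksum are hypothetical placeholders, and it assumes just the constructors introduced above (NumberedShardedFile, ExplicitShardedFile, and the FileChecksumMatcher overload taking a ShardedFile).

// Illustrative sketch only: paths, shard names, and the checksum are hypothetical
// placeholders; only constructors introduced in the patches above are used.
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.beam.sdk.testing.FileChecksumMatcher;
import org.apache.beam.sdk.util.ExplicitShardedFile;
import org.apache.beam.sdk.util.NumberedShardedFile;
import org.apache.beam.sdk.util.ShardedFile;

public class ShardedFileUsageSketch {
  public static void main(String[] args) throws Exception {
    // Default template matches names like "result-000-of-002"; the read retries
    // until the number of matched files equals the "-of-NNN" total parsed from
    // a shard name, throwing IOException once the bounded retries run out.
    NumberedShardedFile numbered = new NumberedShardedFile("/tmp/out/result-*");
    List<String> lines = numbered.readFilesWithRetries();

    // A custom template must bind the named capturing group "numshards", which
    // supplies the expected total shard count for the consistency check.
    NumberedShardedFile custom =
        new NumberedShardedFile(
            "/tmp/out/*",
            Pattern.compile("(?x) result (?<shardnum>\\d+) - total (?<numshards>\\d+)"));

    // When shard names are fully known up front (as in WindowedWordCountIT),
    // list them explicitly; no template matching or shard counting happens.
    ShardedFile explicit =
        new ExplicitShardedFile(ImmutableList.of("/tmp/out/result-00:00-00:10"));

    // Either ShardedFile implementation plugs into the checksum matcher via
    // the new constructor. The SHA-1 value here is a placeholder.
    FileChecksumMatcher matcher =
        new FileChecksumMatcher("da39a3ee5e6b4b0d3255bfef95601890afd80709", explicit);
  }
}

In WindowedWordCountIT above, this same composition appears with a WordCountsMatcher in place of a checksum, since the nondeterministic window assignment makes per-window checksums unstable and only the per-word sums are checked.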