From 2edc8cfa638afed0e4095e8a95669e5a82e3192c Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Mon, 12 Dec 2016 15:50:49 -0800 Subject: [PATCH 1/5] Initial backport --- .../dataflow/sdk/io/CompressedSource.java | 2 +- .../cloud/dataflow/sdk/io/FileBasedSink.java | 79 +++++++++++++++---- .../dataflow/sdk/io/FileBasedSource.java | 59 ++++++++++---- .../google/cloud/dataflow/sdk/io/TextIO.java | 64 ++++++++++++--- .../google/cloud/dataflow/sdk/io/XmlSink.java | 4 +- .../sdk/runners/DataflowPipelineRunner.java | 7 +- .../sdk/runners/dataflow/ReadTranslator.java | 12 ++- .../cloud/dataflow/sdk/io/TextIOTest.java | 19 +++++ .../cloud/dataflow/sdk/io/XmlSinkTest.java | 10 +-- .../runners/DataflowPipelineRunnerTest.java | 20 +++++ .../DataflowPipelineTranslatorTest.java | 28 +++++++ 11 files changed, 246 insertions(+), 58 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java index f0651022b7..2f2d82e823 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java @@ -236,7 +236,7 @@ public CompressedSource withDecompression(DecompressingChannelFactory channel */ private CompressedSource( FileBasedSource sourceDelegate, DecompressingChannelFactory channelFactory) { - super(sourceDelegate.getFileOrPatternSpec(), Long.MAX_VALUE); + super(sourceDelegate.getFileOrPatternSpecProvider(), Long.MAX_VALUE); this.sourceDelegate = sourceDelegate; this.channelFactory = channelFactory; } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java index 87102e1bac..e6a1418059 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java @@ -22,6 +22,10 @@ import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.SerializableCoder; import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.ValueProvider; +import com.google.cloud.dataflow.sdk.options.ValueProvider.NestedValueProvider; +import com.google.cloud.dataflow.sdk.options.ValueProvider.StaticValueProvider; +import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.FileIOChannelFactory; import com.google.cloud.dataflow.sdk.util.GcsIOChannelFactory; @@ -31,8 +35,11 @@ import com.google.cloud.dataflow.sdk.util.IOChannelUtils; import com.google.cloud.dataflow.sdk.util.MimeTypes; +import org.joda.time.Instant; +import org.joda.time.format.DateTimeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.joda.time.Instant; import java.io.IOException; import java.io.Serializable; @@ -71,7 +78,7 @@ public abstract class FileBasedSink extends Sink { /** * Base filename for final output files. */ - protected final String baseOutputFilename; + protected final ValueProvider baseOutputFilename; /** * The extension to be used for the final output files. @@ -88,7 +95,7 @@ public abstract class FileBasedSink extends Sink { * Construct a FileBasedSink with the given base output filename and extension. */ public FileBasedSink(String baseOutputFilename, String extension) { - this(baseOutputFilename, extension, ShardNameTemplate.INDEX_OF_MAX); + this(StaticValueProvider.of(baseOutputFilename), extension, ShardNameTemplate.INDEX_OF_MAX); } /** @@ -98,6 +105,17 @@ public FileBasedSink(String baseOutputFilename, String extension) { *

See {@link ShardNameTemplate} for a description of file naming templates. */ public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate) { + this(StaticValueProvider.of(baseOutputFilename), extension, fileNamingTemplate); + } + + /** + * Construct a FileBasedSink with the given base output filename, extension, and file naming + * template. + * + *

See {@link ShardNameTemplate} for a description of file naming templates. + */ + public FileBasedSink(ValueProvider baseOutputFilename, + String extension, String fileNamingTemplate) { this.baseOutputFilename = baseOutputFilename; this.extension = extension; this.fileNamingTemplate = fileNamingTemplate; @@ -106,7 +124,7 @@ public FileBasedSink(String baseOutputFilename, String extension, String fileNam /** * Returns the base output filename for this file based sink. */ - public String getBaseOutputFilename() { + public ValueProvider getBaseOutputFilenameProvider() { return baseOutputFilename; } @@ -130,7 +148,9 @@ public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); String fileNamePattern = String.format("%s%s%s", - baseOutputFilename, fileNamingTemplate, getFileExtension(extension)); + baseOutputFilename.isAccessible() + ? baseOutputFilename.get() : baseOutputFilename.toString(), + fileNamingTemplate, getFileExtension(extension)); builder.add(DisplayData.item("fileNamePattern", fileNamePattern) .withLabel("File Name Pattern")); } @@ -220,7 +240,7 @@ public enum TemporaryFileRetention { /** * Base filename used for temporary output files. Default is the baseOutputFilename. */ - protected final String baseTemporaryFilename; + protected final ValueProvider baseTemporaryFilename; /** * Name separator for temporary files. Temporary files will be named @@ -243,7 +263,7 @@ protected static final String buildTemporaryFilename(String prefix, String suffi * @param sink the FileBasedSink that will be used to configure this write operation. */ public FileBasedWriteOperation(FileBasedSink sink) { - this(sink, sink.baseOutputFilename); + this(sink, sink.getBaseOutputFilenameProvider(), TemporaryFileRetention.REMOVE); } /** @@ -253,7 +273,7 @@ public FileBasedWriteOperation(FileBasedSink sink) { * @param baseTemporaryFilename the base filename to be used for temporary output files. */ public FileBasedWriteOperation(FileBasedSink sink, String baseTemporaryFilename) { - this(sink, baseTemporaryFilename, TemporaryFileRetention.REMOVE); + this(sink, StaticValueProvider.of(baseTemporaryFilename), TemporaryFileRetention.REMOVE); } /** @@ -265,8 +285,35 @@ public FileBasedWriteOperation(FileBasedSink sink, String baseTemporaryFilena */ public FileBasedWriteOperation(FileBasedSink sink, String baseTemporaryFilename, TemporaryFileRetention temporaryFileRetention) { + this(sink, StaticValueProvider.of(baseTemporaryFilename), TemporaryFileRetention.REMOVE); + } + + private static class TemporaryDirectoryBuilder + implements SerializableFunction { + // The intent of the code is to have a consistent value of tempDirectory across + // all workers, which wouldn't happen if now() was called inline. + Instant now = Instant.now(); + + @Override + public String apply(String baseOutputFilename) { + try { + IOChannelFactory factory = IOChannelUtils.getFactory(baseOutputFilename); + return factory.resolve(baseOutputFilename, + "-temp-" + + now.toString(DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss"))) + .toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private FileBasedWriteOperation(FileBasedSink sink, + ValueProvider baseTemporaryFilename, + TemporaryFileRetention temporaryFileRetention) { this.sink = sink; - this.baseTemporaryFilename = baseTemporaryFilename; + this.baseTemporaryFilename = NestedValueProvider.of( + baseTemporaryFilename, new TemporaryDirectoryBuilder()); this.temporaryFileRetention = temporaryFileRetention; } @@ -360,7 +407,7 @@ protected final List copyToOutputFiles(List filenames, PipelineO protected final List generateDestinationFilenames(int numFiles) { List destFilenames = new ArrayList<>(); String extension = getSink().extension; - String baseOutputFilename = getSink().baseOutputFilename; + String baseOutputFilename = getSink().baseOutputFilename.get(); String fileNamingTemplate = getSink().fileNamingTemplate; String suffix = getFileExtension(extension); @@ -395,17 +442,17 @@ protected final void removeTemporaryFiles(PipelineOptions options) throws IOExce */ protected final void removeTemporaryFiles( Collection knownFiles, PipelineOptions options) throws IOException { - String pattern = buildTemporaryFilename(baseTemporaryFilename, "*"); - LOG.debug("Finding temporary bundle output files matching {}.", pattern); - FileOperations fileOperations = FileOperationsFactory.getFileOperations(pattern, options); - IOChannelFactory factory = IOChannelUtils.getFactory(pattern); - Collection matches = factory.match(pattern); + String tempDir = baseTemporaryFilename.get(); + LOG.debug("Removing temporary bundle output files in {}.", tempDir); + IOChannelFactory factory = IOChannelUtils.getFactory(tempDir); + FileOperations fileOperations = FileOperationsFactory.getFileOperations(tempDir, options); + Collection matches = factory.match(tempDir); Set allMatches = new HashSet<>(matches); allMatches.addAll(knownFiles); LOG.debug( "Removing {} temporary files matching {} ({} matched glob, {} additional known files)", allMatches.size(), - pattern, + tempDir, matches.size(), allMatches.size() - matches.size()); fileOperations.remove(allMatches); @@ -508,7 +555,7 @@ protected void writeFooter() throws Exception {} public final void open(String uId) throws Exception { this.id = uId; filename = FileBasedWriteOperation.buildTemporaryFilename( - getWriteOperation().baseTemporaryFilename, uId); + getWriteOperation().baseTemporaryFilename.get(), uId); LOG.debug("Opening {}.", filename); channel = IOChannelUtils.create(filename, mimeType); try { diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java index 5e47eeaa62..e48b314717 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java @@ -18,6 +18,8 @@ import static com.google.common.base.Preconditions.checkState; import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.ValueProvider; +import com.google.cloud.dataflow.sdk.options.ValueProvider.StaticValueProvider; import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.IOChannelFactory; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; @@ -73,7 +75,7 @@ public abstract class FileBasedSource extends OffsetBasedSource { // Package-private for testing. static final int THREAD_POOL_SIZE = 128; - private final String fileOrPatternSpec; + private final ValueProvider fileOrPatternSpec; private final Mode mode; /** @@ -95,6 +97,16 @@ public enum Mode { * @param minBundleSize minimum bundle size in bytes. */ public FileBasedSource(String fileOrPatternSpec, long minBundleSize) { + this(StaticValueProvider.of(fileOrPatternSpec), minBundleSize); + } + + + /** + * Create a {@code FileBaseSource} based on a file or a file pattern specification. + * Same as the {@code String} constructor, but accepting a {@link ValueProvider} + * to allow for runtime configuration of the source. + */ + public FileBasedSource(ValueProvider fileOrPatternSpec, long minBundleSize) { super(0, Long.MAX_VALUE, minBundleSize); mode = Mode.FILEPATTERN; this.fileOrPatternSpec = fileOrPatternSpec; @@ -120,10 +132,14 @@ public FileBasedSource(String fileName, long minBundleSize, long startOffset, long endOffset) { super(startOffset, endOffset, minBundleSize); mode = Mode.SINGLE_FILE_OR_SUBRANGE; - this.fileOrPatternSpec = fileName; + this.fileOrPatternSpec = StaticValueProvider.of(fileName); } public final String getFileOrPatternSpec() { + return fileOrPatternSpec.get(); + } + + public final ValueProvider getFileOrPatternSpecProvider() { return fileOrPatternSpec; } @@ -142,7 +158,9 @@ public final FileBasedSource createSourceForSubrange(long start, long end) { + " of the subrange cannot be larger than the end offset value " + getEndOffset() + " of the parent source"); - FileBasedSource source = createForSubrangeOfFile(fileOrPatternSpec, start, end); + checkState(fileOrPatternSpec.isAccessible(), + "Subrange creation should only happen at execution time."); + FileBasedSource source = createForSubrangeOfFile(fileOrPatternSpec.get(), start, end); if (start > 0 || end != Long.MAX_VALUE) { checkArgument(source.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE, "Source created for the range [" + start + "," + end + ")" @@ -180,19 +198,21 @@ public final long getEstimatedSizeBytes(PipelineOptions options) throws Exceptio // we perform the size estimation of files and file patterns using the interface provided by // IOChannelFactory. - IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec); if (mode == Mode.FILEPATTERN) { + checkState(fileOrPatternSpec.isAccessible(), + "Size estimation should be done at execution time."); + IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get()); // TODO Implement a more efficient parallel/batch size estimation mechanism for file patterns. long startTime = System.currentTimeMillis(); long totalSize = 0; - Collection inputs = factory.match(fileOrPatternSpec); + Collection inputs = factory.match(fileOrPatternSpec.get()); if (inputs.size() <= MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT) { totalSize = getExactTotalSizeOfFiles(inputs, factory); - LOG.debug("Size estimation of all files of pattern " + fileOrPatternSpec + " took " + LOG.debug("Size estimation of all files of pattern " + fileOrPatternSpec.get() + " took " + (System.currentTimeMillis() - startTime) + " ms"); } else { totalSize = getEstimatedSizeOfFilesBySampling(inputs, factory); - LOG.debug("Size estimation of pattern " + fileOrPatternSpec + " by sampling took " + LOG.debug("Size estimation of pattern " + fileOrPatternSpec.get() + " by sampling took " + (System.currentTimeMillis() - startTime) + " ms"); } return totalSize; @@ -294,7 +314,9 @@ public final List> splitIntoBundles( ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE)); try { - for (final String file : FileBasedSource.expandFilePattern(fileOrPatternSpec)) { + checkState(fileOrPatternSpec.isAccessible(), + "Bundle splitting should only happen at execution time."); + for (final String file : FileBasedSource.expandFilePattern(fileOrPatternSpec.get())) { futures.add(createFutureForFileSplit(file, desiredBundleSizeBytes, options, service)); } List> splitResults = @@ -334,8 +356,10 @@ protected boolean isSplittable() throws Exception { // We split a file-based source into subranges only if the file is efficiently seekable. // If a file is not efficiently seekable it would be highly inefficient to create and read a // source based on a subrange of that file. - IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec); - return factory.isReadSeekEfficient(fileOrPatternSpec); + checkState(fileOrPatternSpec.isAccessible(), + "isSplittable should only be called at runtime."); + IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get()); + return factory.isReadSeekEfficient(fileOrPatternSpec.get()); } @Override @@ -345,7 +369,7 @@ public final BoundedReader createReader(PipelineOptions options) throws IOExc if (mode == Mode.FILEPATTERN) { long startTime = System.currentTimeMillis(); - Collection files = FileBasedSource.expandFilePattern(fileOrPatternSpec); + Collection files = FileBasedSource.expandFilePattern(fileOrPatternSpec.get()); List> fileReaders = new ArrayList<>(); for (String fileName : files) { long endOffset; @@ -373,9 +397,9 @@ public final BoundedReader createReader(PipelineOptions options) throws IOExc public String toString() { switch (mode) { case FILEPATTERN: - return fileOrPatternSpec; + return fileOrPatternSpec.toString(); case SINGLE_FILE_OR_SUBRANGE: - return fileOrPatternSpec + " range " + super.toString(); + return fileOrPatternSpec.toString() + " range " + super.toString(); default: throw new IllegalStateException("Unexpected mode: " + mode); } @@ -407,8 +431,8 @@ public final long getMaxEndOffset(PipelineOptions options) throws Exception { throw new IllegalArgumentException("Cannot determine the exact end offset of a file pattern"); } if (getEndOffset() == Long.MAX_VALUE) { - IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec); - return factory.getSizeBytes(fileOrPatternSpec); + IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get()); + return factory.getSizeBytes(fileOrPatternSpec.get()); } else { return getEndOffset(); } @@ -480,8 +504,9 @@ public synchronized FileBasedSource getCurrentSource() { @Override protected final boolean startImpl() throws IOException { FileBasedSource source = getCurrentSource(); - IOChannelFactory factory = IOChannelUtils.getFactory(source.getFileOrPatternSpec()); - this.channel = factory.open(source.getFileOrPatternSpec()); + IOChannelFactory factory = IOChannelUtils.getFactory( + source.getFileOrPatternSpecProvider().get()); + this.channel = factory.open(source.getFileOrPatternSpecProvider().get()); if (channel instanceof SeekableByteChannel) { SeekableByteChannel seekChannel = (SeekableByteChannel) channel; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java index 40a71132c1..23665f3171 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java @@ -17,6 +17,7 @@ package com.google.cloud.dataflow.sdk.io; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.cloud.dataflow.sdk.coders.Coder; @@ -25,6 +26,8 @@ import com.google.cloud.dataflow.sdk.coders.VoidCoder; import com.google.cloud.dataflow.sdk.io.Read.Bounded; import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.ValueProvider; +import com.google.cloud.dataflow.sdk.options.ValueProvider.StaticValueProvider; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.transforms.PTransform; @@ -148,6 +151,13 @@ public static Bound from(String filepattern) { return new Bound<>(DEFAULT_TEXT_CODER).from(filepattern); } + /** + * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. + */ + public static Bound from(ValueProvider filepattern) { + return new Bound<>(DEFAULT_TEXT_CODER).from(filepattern); + } + /** * Returns a transform for reading text files that uses the given * {@code Coder} to decode each of the lines of the file into a @@ -201,7 +211,7 @@ public static Bound withCompressionType(TextIO.CompressionType compressi */ public static class Bound extends PTransform> { /** The filepattern to read from. */ - @Nullable private final String filepattern; + @Nullable private final ValueProvider filepattern; /** The Coder to use to decode each line. */ private final Coder coder; @@ -216,8 +226,8 @@ public static class Bound extends PTransform> { this(null, null, coder, true, TextIO.CompressionType.AUTO); } - private Bound(String name, String filepattern, Coder coder, boolean validate, - TextIO.CompressionType compressionType) { + private Bound(@Nullable String name, @Nullable ValueProvider filepattern, + Coder coder, boolean validate, TextIO.CompressionType compressionType) { super(name); this.coder = coder; this.filepattern = filepattern; @@ -244,6 +254,16 @@ public Bound named(String name) { */ public Bound from(String filepattern) { + checkNotNull(filepattern, "Filepattern cannot be empty."); + return new Bound<>(name, StaticValueProvider.of(filepattern), coder, validate, + compressionType); + } + + /** + * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. + */ + public Bound from(ValueProvider filepattern) { + checkNotNull(filepattern, "Filepattern cannot be empty."); return new Bound<>(name, filepattern, coder, validate, compressionType); } @@ -295,14 +315,15 @@ public PCollection apply(PInput input) { } if (validate) { + checkState(filepattern.isAccessible(), "Cannot validate with a RVP."); try { checkState( - !IOChannelUtils.getFactory(filepattern).match(filepattern).isEmpty(), + !IOChannelUtils.getFactory(filepattern.get()).match(filepattern.get()).isEmpty(), "Unable to find any files matching %s", filepattern); } catch (IOException e) { throw new IllegalStateException( - String.format("Failed to validate %s", filepattern), e); + String.format("Failed to validate %s", filepattern.get()), e); } } @@ -352,7 +373,7 @@ protected Coder getDefaultOutputCoder() { } public String getFilepattern() { - return filepattern; + return filepattern.get(); } public boolean needsValidation() { @@ -399,6 +420,13 @@ public static Bound to(String prefix) { return new Bound<>(DEFAULT_TEXT_CODER).to(prefix); } + /** + * Like {@link #to(String)}, but with a {@link ValueProvider}. + */ + public static Bound to(ValueProvider prefix) { + return new Bound<>(DEFAULT_TEXT_CODER).to(prefix); + } + /** * Returns a transform for writing to text files that appends the specified suffix * to the created files. @@ -502,7 +530,7 @@ public static class Bound extends PTransform, PDone> { private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; /** The prefix of each file written, combined with suffix and shardTemplate. */ - @Nullable private final String filenamePrefix; + @Nullable private final ValueProvider filenamePrefix; /** The suffix of each file written, combined with prefix and shardTemplate. */ private final String filenameSuffix; @@ -528,7 +556,7 @@ public static class Bound extends PTransform, PDone> { this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true); } - private Bound(String name, String filenamePrefix, String filenameSuffix, + private Bound(String name, ValueProvider filenamePrefix, String filenameSuffix, @Nullable String header, @Nullable String footer, Coder coder, int numShards, String shardTemplate, boolean validate) { super(name); @@ -563,7 +591,15 @@ public Bound named(String name) { */ public Bound to(String filenamePrefix) { validateOutputComponent(filenamePrefix); - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + return new Bound<>(name, StaticValueProvider.of(filenamePrefix), filenameSuffix, + header, footer, coder, numShards, shardTemplate, validate); + } + + /** + * Like {@link #to(String)}, but with a {@link ValueProvider}. + */ + public Bound to(ValueProvider filenamePrefix) { + return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, shardTemplate, validate); } @@ -742,7 +778,7 @@ protected Coder getDefaultOutputCoder() { } public String getFilenamePrefix() { - return filenamePrefix; + return filenamePrefix.get(); } public String getShardTemplate() { @@ -852,6 +888,12 @@ static class TextSource extends FileBasedSource { this.coder = coder; } + @VisibleForTesting + TextSource(ValueProvider fileSpec, Coder coder) { + super(fileSpec, 1L); + this.coder = coder; + } + private TextSource(String fileName, long start, long end, Coder coder) { super(fileName, 1L, start, end); this.coder = coder; @@ -1054,7 +1096,7 @@ static class TextSink extends FileBasedSink { @VisibleForTesting TextSink( - String baseOutputFilename, String extension, + ValueProvider baseOutputFilename, String extension, @Nullable String header, @Nullable String footer, String fileNameTemplate, Coder coder) { super(baseOutputFilename, extension, fileNameTemplate); diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java index 104611ef94..720ef555b2 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java @@ -177,7 +177,7 @@ private Bound(Class classToBind, String rootElementName, String baseOutputFil *

The specified class must be able to be used to create a JAXB context. */ public Bound ofRecordClass(Class classToBind) { - return new Bound<>(classToBind, rootElementName, baseOutputFilename); + return new Bound<>(classToBind, rootElementName, baseOutputFilename.get()); } /** @@ -195,7 +195,7 @@ public Bound toFilenamePrefix(String baseOutputFilename) { * supplied name. */ public Bound withRootElement(String rootElementName) { - return new Bound<>(classToBind, rootElementName, baseOutputFilename); + return new Bound<>(classToBind, rootElementName, baseOutputFilename.get()); } /** diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java index 318d0abd20..c1a7b88414 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java @@ -2120,8 +2120,11 @@ public BatchWrite(DataflowPipelineRunner runner, Write.Bound transform) { public PDone apply(PCollection input) { if (transform.getSink() instanceof FileBasedSink) { FileBasedSink sink = (FileBasedSink) transform.getSink(); - PathValidator validator = runner.options.getPathValidator(); - validator.validateOutputFilePrefixSupported(sink.getBaseOutputFilename()); + if (sink.getBaseOutputFilenameProvider().isAccessible()) { + PathValidator validator = runner.options.getPathValidator(); + validator.validateOutputFilePrefixSupported( + sink.getBaseOutputFilenameProvider().get()); + } } return transform.apply(input); } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java index f110e84adc..1111d786c3 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java @@ -24,6 +24,7 @@ import com.google.cloud.dataflow.sdk.io.FileBasedSource; import com.google.cloud.dataflow.sdk.io.Read; import com.google.cloud.dataflow.sdk.io.Source; +import com.google.cloud.dataflow.sdk.options.ValueProvider; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext; @@ -50,10 +51,13 @@ public static void translateReadHelper(Source source, // TODO: Move this validation out of translation once IOChannelUtils is portable // and can be reconstructed on the worker. if (source instanceof FileBasedSource) { - String filePatternOrSpec = ((FileBasedSource) source).getFileOrPatternSpec(); - context.getPipelineOptions() - .getPathValidator() - .validateInputFilePatternSupported(filePatternOrSpec); + ValueProvider filePatternOrSpec = + ((FileBasedSource) source).getFileOrPatternSpecProvider(); + if (filePatternOrSpec.isAccessible()) { + context.getPipelineOptions() + .getPathValidator() + .validateInputFilePatternSupported(filePatternOrSpec.get()); + } } context.addStep(transform, "ParallelRead"); diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java index 1d65d1ce2d..063760681e 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java @@ -50,6 +50,7 @@ import com.google.cloud.dataflow.sdk.io.TextIO.TextSource; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.ValueProvider; import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.testing.SourceTestUtils; import com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineOptions; @@ -622,6 +623,24 @@ public void testBadWildcardRecursive() throws Exception { pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz")); } + /** Options for testing. */ + public interface RuntimeTestOptions extends PipelineOptions { + ValueProvider getInput(); + void setInput(ValueProvider value); + + ValueProvider getOutput(); + void setOutput(ValueProvider value); + } + + @Test + public void testRuntimeOptionsNotCalledInApply() throws Exception { + Pipeline pipeline = TestPipeline.create(); + RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); + pipeline + .apply(TextIO.Read.from(options.getInput()).withoutValidation()) + .apply(TextIO.Write.to(options.getOutput()).withoutValidation()); + } + @Test public void testReadWithoutValidationFlag() throws Exception { TextIO.Read.Bound read = TextIO.Read.from("gs://bucket/foo*/baz"); diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSinkTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSinkTest.java index 22c1b59f45..9cf1ad1245 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSinkTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSinkTest.java @@ -93,7 +93,7 @@ public void testBuildXmlSink() { .withRootElement(testRootElement); assertEquals(testClass, sink.classToBind); assertEquals(testRootElement, sink.rootElementName); - assertEquals(testFilePrefix, sink.baseOutputFilename); + assertEquals(testFilePrefix, sink.baseOutputFilename.get()); } /** @@ -105,7 +105,7 @@ public void testBuildXmlSinkDirect() { XmlSink.writeOf(Bird.class, testRootElement, testFilePrefix); assertEquals(testClass, sink.classToBind); assertEquals(testRootElement, sink.rootElementName); - assertEquals(testFilePrefix, sink.baseOutputFilename); + assertEquals(testFilePrefix, sink.baseOutputFilename.get()); } /** @@ -142,10 +142,10 @@ public void testCreateWriteOperations() { XmlSink.writeOf(testClass, testRootElement, testFilePrefix); XmlWriteOperation writeOp = sink.createWriteOperation(options); assertEquals(testClass, writeOp.getSink().classToBind); - assertEquals(testFilePrefix, writeOp.getSink().baseOutputFilename); + assertEquals(testFilePrefix, writeOp.getSink().baseOutputFilename.get()); assertEquals(testRootElement, writeOp.getSink().rootElementName); assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().extension); - assertEquals(testFilePrefix, writeOp.baseTemporaryFilename); + assertEquals(testFilePrefix, writeOp.baseTemporaryFilename.get()); } /** @@ -158,7 +158,7 @@ public void testCreateWriter() throws Exception { XmlSink.writeOf(testClass, testRootElement, testFilePrefix) .createWriteOperation(options); XmlWriter writer = writeOp.createWriter(options); - assertEquals(testFilePrefix, writer.getWriteOperation().baseTemporaryFilename); + assertEquals(testFilePrefix, writer.getWriteOperation().baseTemporaryFilename.get()); assertEquals(testRootElement, writer.getWriteOperation().getSink().rootElementName); assertNotNull(writer.marshaller); } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java index 24cb9eca69..5d513f333d 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java @@ -55,6 +55,7 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptions.CheckEnabled; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.ValueProvider; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.BatchViewAsList; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.BatchViewAsMap; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.BatchViewAsMultimap; @@ -334,6 +335,25 @@ protected boolean matchesSafely(DataflowJobAlreadyUpdatedException item) { p.run(); } + /** Options for testing. */ + public interface RuntimeTestOptions extends PipelineOptions { + ValueProvider getInput(); + void setInput(ValueProvider value); + + ValueProvider getOutput(); + void setOutput(ValueProvider value); + } + + @Test + public void testTextIOWithRuntimeParameters() throws IOException { + DataflowPipelineOptions dataflowOptions = buildPipelineOptions(); + RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class); + Pipeline p = buildDataflowPipeline(dataflowOptions); + p + .apply(TextIO.Read.from(options.getInput()).withoutValidation()) + .apply(TextIO.Write.to(options.getOutput()).withoutValidation()); + } + @Test public void testRunWithFiles() throws IOException { // Test that the function DataflowPipelineRunner.stageFiles works as diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java index 0c4b1e37b2..d679594c63 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java @@ -51,6 +51,7 @@ import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.ValueProvider; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext; import com.google.cloud.dataflow.sdk.transforms.Count; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -203,6 +204,33 @@ public void testSettingOfSdkPipelineOptions() throws IOException { assertThat(optionsMap, hasEntry("numberOfWorkerHarnessThreads", (Object) 0)); } + private static class TestValueProvider implements ValueProvider, Serializable { + @Override + public boolean isAccessible() { + return false; + } + + @Override + public String get() { + throw new RuntimeException("Should not be called."); + } + } + + @Test + public void testInaccessibleProvider() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + Pipeline pipeline = Pipeline.create(options); + DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options); + + pipeline.apply(TextIO.Read.from(new TestValueProvider()).withoutValidation()); + + // Check that translation does not fail. + t.translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.emptyList()); + } + @Test public void testNetworkConfig() throws IOException { final String testNetwork = "test-network"; From c416b5d6a42be3164ca4c0c13070a15431eab8ec Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Mon, 12 Dec 2016 16:33:33 -0800 Subject: [PATCH 2/5] Initial backport --- .../cloud/dataflow/sdk/io/FileBasedSink.java | 39 ++++--------------- 1 file changed, 7 insertions(+), 32 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java index e6a1418059..e0cb529025 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java @@ -23,7 +23,6 @@ import com.google.cloud.dataflow.sdk.coders.SerializableCoder; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.ValueProvider; -import com.google.cloud.dataflow.sdk.options.ValueProvider.NestedValueProvider; import com.google.cloud.dataflow.sdk.options.ValueProvider.StaticValueProvider; import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; @@ -35,11 +34,8 @@ import com.google.cloud.dataflow.sdk.util.IOChannelUtils; import com.google.cloud.dataflow.sdk.util.MimeTypes; -import org.joda.time.Instant; -import org.joda.time.format.DateTimeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.joda.time.Instant; import java.io.IOException; import java.io.Serializable; @@ -288,32 +284,11 @@ public FileBasedWriteOperation(FileBasedSink sink, String baseTemporaryFilena this(sink, StaticValueProvider.of(baseTemporaryFilename), TemporaryFileRetention.REMOVE); } - private static class TemporaryDirectoryBuilder - implements SerializableFunction { - // The intent of the code is to have a consistent value of tempDirectory across - // all workers, which wouldn't happen if now() was called inline. - Instant now = Instant.now(); - - @Override - public String apply(String baseOutputFilename) { - try { - IOChannelFactory factory = IOChannelUtils.getFactory(baseOutputFilename); - return factory.resolve(baseOutputFilename, - "-temp-" - + now.toString(DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss"))) - .toString(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - private FileBasedWriteOperation(FileBasedSink sink, ValueProvider baseTemporaryFilename, TemporaryFileRetention temporaryFileRetention) { this.sink = sink; - this.baseTemporaryFilename = NestedValueProvider.of( - baseTemporaryFilename, new TemporaryDirectoryBuilder()); + this.baseTemporaryFilename = baseTemporaryFilename; this.temporaryFileRetention = temporaryFileRetention; } @@ -442,17 +417,17 @@ protected final void removeTemporaryFiles(PipelineOptions options) throws IOExce */ protected final void removeTemporaryFiles( Collection knownFiles, PipelineOptions options) throws IOException { - String tempDir = baseTemporaryFilename.get(); - LOG.debug("Removing temporary bundle output files in {}.", tempDir); - IOChannelFactory factory = IOChannelUtils.getFactory(tempDir); - FileOperations fileOperations = FileOperationsFactory.getFileOperations(tempDir, options); - Collection matches = factory.match(tempDir); + String pattern = buildTemporaryFilename(baseTemporaryFilename.get(), "*"); + LOG.debug("Finding temporary bundle output files matching {}.", pattern); + FileOperations fileOperations = FileOperationsFactory.getFileOperations(pattern, options); + IOChannelFactory factory = IOChannelUtils.getFactory(pattern); + Collection matches = factory.match(pattern); Set allMatches = new HashSet<>(matches); allMatches.addAll(knownFiles); LOG.debug( "Removing {} temporary files matching {} ({} matched glob, {} additional known files)", allMatches.size(), - tempDir, + pattern, matches.size(), allMatches.size() - matches.size()); fileOperations.remove(allMatches); From a178bcc9b5585bbc833463f62cbcf464894b4174 Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Mon, 12 Dec 2016 16:54:31 -0800 Subject: [PATCH 3/5] Initial backport --- .../java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java | 3 +-- .../java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java | 2 +- .../dataflow/sdk/runners/DataflowPipelineTranslatorTest.java | 1 + 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java index e0cb529025..c695149334 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java @@ -24,7 +24,6 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.ValueProvider; import com.google.cloud.dataflow.sdk.options.ValueProvider.StaticValueProvider; -import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.FileIOChannelFactory; import com.google.cloud.dataflow.sdk.util.GcsIOChannelFactory; @@ -281,7 +280,7 @@ public FileBasedWriteOperation(FileBasedSink sink, String baseTemporaryFilena */ public FileBasedWriteOperation(FileBasedSink sink, String baseTemporaryFilename, TemporaryFileRetention temporaryFileRetention) { - this(sink, StaticValueProvider.of(baseTemporaryFilename), TemporaryFileRetention.REMOVE); + this(sink, StaticValueProvider.of(baseTemporaryFilename), temporaryFileRetention); } private FileBasedWriteOperation(FileBasedSink sink, diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java index e48b314717..ea67a9e22c 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java @@ -281,7 +281,7 @@ private static long getEstimatedSizeOfFilesBySampling( @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.add(DisplayData.item("filePattern", getFileOrPatternSpec()) + builder.add(DisplayData.item("filePattern", getFileOrPatternSpecProvider()) .withLabel("File Pattern")); } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java index d679594c63..78d905d160 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java @@ -219,6 +219,7 @@ public String get() { @Test public void testInaccessibleProvider() throws Exception { DataflowPipelineOptions options = buildPipelineOptions(); + options.setRunner(DataflowPipelineRunner.class); Pipeline pipeline = Pipeline.create(options); DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options); From af38399916559ade59e56c569fa4dd6ec1f3db16 Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Tue, 13 Dec 2016 11:08:36 -0800 Subject: [PATCH 4/5] Fixups --- .../com/google/cloud/dataflow/sdk/io/FileBasedSink.java | 7 +++++++ .../main/java/com/google/cloud/dataflow/sdk/io/TextIO.java | 3 ++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java index c695149334..428d843dae 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java @@ -116,6 +116,13 @@ public FileBasedSink(ValueProvider baseOutputFilename, this.fileNamingTemplate = fileNamingTemplate; } + /** + * Returns the base output filename for this file based sink. + */ + public String getBaseOutputFilename() { + return baseOutputFilename.get(); + } + /** * Returns the base output filename for this file based sink. */ diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java index 23665f3171..216c07fdcd 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java @@ -315,7 +315,8 @@ public PCollection apply(PInput input) { } if (validate) { - checkState(filepattern.isAccessible(), "Cannot validate with a RVP."); + checkState(filepattern.isAccessible(), + "Cannot validate with a filepattern provided at runtime."); try { checkState( !IOChannelUtils.getFactory(filepattern.get()).match(filepattern.get()).isEmpty(), From 7b9089a04a574fcc286d89790258112e1313d943 Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Tue, 13 Dec 2016 11:57:51 -0800 Subject: [PATCH 5/5] Fixups --- .../runners/DataflowPipelineRunnerTest.java | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java index 5d513f333d..ffcc688165 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java @@ -250,6 +250,25 @@ public void testRun() throws IOException { assertValidJob(jobCaptor.getValue()); } + /** Options for testing. */ + public interface RuntimeTestOptions extends PipelineOptions { + ValueProvider getInput(); + void setInput(ValueProvider value); + + ValueProvider getOutput(); + void setOutput(ValueProvider value); + } + + @Test + public void testTextIOWithRuntimeParameters() throws IOException { + DataflowPipelineOptions dataflowOptions = buildPipelineOptions(); + RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class); + Pipeline p = buildDataflowPipeline(dataflowOptions); + p + .apply(TextIO.Read.from(options.getInput()).withoutValidation()) + .apply(TextIO.Write.to(options.getOutput()).withoutValidation()); + } + @Test public void testRunReturnDifferentRequestId() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); @@ -335,25 +354,6 @@ protected boolean matchesSafely(DataflowJobAlreadyUpdatedException item) { p.run(); } - /** Options for testing. */ - public interface RuntimeTestOptions extends PipelineOptions { - ValueProvider getInput(); - void setInput(ValueProvider value); - - ValueProvider getOutput(); - void setOutput(ValueProvider value); - } - - @Test - public void testTextIOWithRuntimeParameters() throws IOException { - DataflowPipelineOptions dataflowOptions = buildPipelineOptions(); - RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class); - Pipeline p = buildDataflowPipeline(dataflowOptions); - p - .apply(TextIO.Read.from(options.getInput()).withoutValidation()) - .apply(TextIO.Write.to(options.getOutput()).withoutValidation()); - } - @Test public void testRunWithFiles() throws IOException { // Test that the function DataflowPipelineRunner.stageFiles works as