Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public CompressedSource<T> withDecompression(DecompressingChannelFactory channel
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private CompressedSource(
FileBasedSource<T> sourceDelegate, DecompressingChannelFactory channelFactory) {
super(sourceDelegate.getFileOrPatternSpec(), Long.MAX_VALUE);
super(sourceDelegate.getFileOrPatternSpecProvider(), Long.MAX_VALUE);
this.sourceDelegate = sourceDelegate;
this.channelFactory = channelFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.google.cloud.dataflow.sdk.coders.Coder;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lots of divergence here, but LGETM: https://www.diffchecker.com/kEkSAaQW

import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.ValueProvider;
import com.google.cloud.dataflow.sdk.options.ValueProvider.StaticValueProvider;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.FileIOChannelFactory;
import com.google.cloud.dataflow.sdk.util.GcsIOChannelFactory;
Expand Down Expand Up @@ -71,7 +73,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
/**
* Base filename for final output files.
*/
protected final String baseOutputFilename;
protected final ValueProvider<String> baseOutputFilename;

/**
* The extension to be used for the final output files.
Expand All @@ -88,7 +90,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
* Construct a FileBasedSink with the given base output filename and extension.
*/
public FileBasedSink(String baseOutputFilename, String extension) {
this(baseOutputFilename, extension, ShardNameTemplate.INDEX_OF_MAX);
this(StaticValueProvider.of(baseOutputFilename), extension, ShardNameTemplate.INDEX_OF_MAX);
}

/**
Expand All @@ -98,6 +100,17 @@ public FileBasedSink(String baseOutputFilename, String extension) {
* <p>See {@link ShardNameTemplate} for a description of file naming templates.
*/
public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate) {
// Wrap the fixed String in a StaticValueProvider and delegate to the
// ValueProvider<String> overload, which performs the field assignments.
this(StaticValueProvider.of(baseOutputFilename), extension, fileNamingTemplate);
}

/**
* Construct a FileBasedSink with the given base output filename, extension, and file naming
* template.
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to keep this in Dataflow -- removal is a breaking change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* <p>See {@link ShardNameTemplate} for a description of file naming templates.
*/
public FileBasedSink(ValueProvider<String> baseOutputFilename,
String extension, String fileNamingTemplate) {
this.baseOutputFilename = baseOutputFilename;
this.extension = extension;
this.fileNamingTemplate = fileNamingTemplate;
Expand All @@ -107,6 +120,13 @@ public FileBasedSink(String baseOutputFilename, String extension, String fileNam
* Returns the base output filename for this file based sink.
*/
public String getBaseOutputFilename() {
// Resolves the provider to its String value via get().
// NOTE(review): populateDisplayData checks isAccessible() before calling get(), so get()
// presumably cannot be relied on before the value is accessible -- confirm for callers.
return baseOutputFilename.get();
}

/**
 * Returns the base output filename for this file based sink as a {@link ValueProvider},
 * without calling {@code get()} on it.
 */
public ValueProvider<String> getBaseOutputFilenameProvider() {
return baseOutputFilename;
}

Expand All @@ -130,7 +150,9 @@ public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

String fileNamePattern = String.format("%s%s%s",
baseOutputFilename, fileNamingTemplate, getFileExtension(extension));
baseOutputFilename.isAccessible()
? baseOutputFilename.get() : baseOutputFilename.toString(),
fileNamingTemplate, getFileExtension(extension));
builder.add(DisplayData.item("fileNamePattern", fileNamePattern)
.withLabel("File Name Pattern"));
}
Expand Down Expand Up @@ -220,7 +242,7 @@ public enum TemporaryFileRetention {
/**
* Base filename used for temporary output files. Default is the baseOutputFilename.
*/
protected final String baseTemporaryFilename;
protected final ValueProvider<String> baseTemporaryFilename;

/**
* Name separator for temporary files. Temporary files will be named
Expand All @@ -243,7 +265,7 @@ protected static final String buildTemporaryFilename(String prefix, String suffi
* @param sink the FileBasedSink that will be used to configure this write operation.
*/
public FileBasedWriteOperation(FileBasedSink<T> sink) {
this(sink, sink.baseOutputFilename);
this(sink, sink.getBaseOutputFilenameProvider(), TemporaryFileRetention.REMOVE);
}

/**
Expand All @@ -253,7 +275,7 @@ public FileBasedWriteOperation(FileBasedSink<T> sink) {
* @param baseTemporaryFilename the base filename to be used for temporary output files.
*/
public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilename) {
this(sink, baseTemporaryFilename, TemporaryFileRetention.REMOVE);
this(sink, StaticValueProvider.of(baseTemporaryFilename), TemporaryFileRetention.REMOVE);
}

/**
Expand All @@ -265,6 +287,12 @@ public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilena
*/
public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilename,
TemporaryFileRetention temporaryFileRetention) {
// Wrap the fixed String in a StaticValueProvider and delegate to the private
// ValueProvider<String> constructor, which assigns the fields.
this(sink, StaticValueProvider.of(baseTemporaryFilename), temporaryFileRetention);
}

private FileBasedWriteOperation(FileBasedSink<T> sink,
ValueProvider<String> baseTemporaryFilename,
TemporaryFileRetention temporaryFileRetention) {
this.sink = sink;
this.baseTemporaryFilename = baseTemporaryFilename;
this.temporaryFileRetention = temporaryFileRetention;
Expand Down Expand Up @@ -360,7 +388,7 @@ protected final List<String> copyToOutputFiles(List<String> filenames, PipelineO
protected final List<String> generateDestinationFilenames(int numFiles) {
List<String> destFilenames = new ArrayList<>();
String extension = getSink().extension;
String baseOutputFilename = getSink().baseOutputFilename;
String baseOutputFilename = getSink().baseOutputFilename.get();
String fileNamingTemplate = getSink().fileNamingTemplate;

String suffix = getFileExtension(extension);
Expand Down Expand Up @@ -395,7 +423,7 @@ protected final void removeTemporaryFiles(PipelineOptions options) throws IOExce
*/
protected final void removeTemporaryFiles(
Collection<String> knownFiles, PipelineOptions options) throws IOException {
String pattern = buildTemporaryFilename(baseTemporaryFilename, "*");
String pattern = buildTemporaryFilename(baseTemporaryFilename.get(), "*");
LOG.debug("Finding temporary bundle output files matching {}.", pattern);
FileOperations fileOperations = FileOperationsFactory.getFileOperations(pattern, options);
IOChannelFactory factory = IOChannelUtils.getFactory(pattern);
Expand Down Expand Up @@ -508,7 +536,7 @@ protected void writeFooter() throws Exception {}
public final void open(String uId) throws Exception {
this.id = uId;
filename = FileBasedWriteOperation.buildTemporaryFilename(
getWriteOperation().baseTemporaryFilename, uId);
getWriteOperation().baseTemporaryFilename.get(), uId);
LOG.debug("Opening {}.", filename);
channel = IOChannelUtils.create(filename, mimeType);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import static com.google.common.base.Preconditions.checkState;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.ValueProvider;
import com.google.cloud.dataflow.sdk.options.ValueProvider.StaticValueProvider;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.IOChannelFactory;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
Expand Down Expand Up @@ -73,7 +75,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
// Package-private for testing.
static final int THREAD_POOL_SIZE = 128;

private final String fileOrPatternSpec;
private final ValueProvider<String> fileOrPatternSpec;
private final Mode mode;

/**
Expand All @@ -95,6 +97,16 @@ public enum Mode {
* @param minBundleSize minimum bundle size in bytes.
*/
public FileBasedSource(String fileOrPatternSpec, long minBundleSize) {
// Wrap the fixed String in a StaticValueProvider and delegate to the
// ValueProvider<String> overload of this constructor.
this(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
}


/**
* Create a {@code FileBasedSource} based on a file or a file pattern specification.
* Same as the {@code String} constructor, but accepting a {@link ValueProvider}
* to allow for runtime configuration of the source.
*/
public FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
super(0, Long.MAX_VALUE, minBundleSize);
mode = Mode.FILEPATTERN;
this.fileOrPatternSpec = fileOrPatternSpec;
Expand All @@ -120,10 +132,14 @@ public FileBasedSource(String fileName, long minBundleSize,
long startOffset, long endOffset) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noting that there is an API gap here -- you can't do a known offset with a VP filename.

This is probably okay, because if you don't know what file you're reading how could you know what positions you need to read?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree

super(startOffset, endOffset, minBundleSize);
mode = Mode.SINGLE_FILE_OR_SUBRANGE;
this.fileOrPatternSpec = fileName;
this.fileOrPatternSpec = StaticValueProvider.of(fileName);
}

/**
 * Returns the file name or file pattern of this source as a {@code String}, obtained by
 * calling {@code get()} on the underlying {@link ValueProvider}.
 *
 * <p>NOTE(review): other methods in this class guard {@code get()} with
 * {@code checkState(fileOrPatternSpec.isAccessible(), ...)}; this accessor does not, so it
 * presumably should only be used once the value is accessible -- confirm.
 */
public final String getFileOrPatternSpec() {
return fileOrPatternSpec.get();
}

/**
 * Returns the {@link ValueProvider} holding the file name or file pattern of this source,
 * without calling {@code get()} on it.
 */
public final ValueProvider<String> getFileOrPatternSpecProvider() {
return fileOrPatternSpec;
}

Expand All @@ -142,7 +158,9 @@ public final FileBasedSource<T> createSourceForSubrange(long start, long end) {
+ " of the subrange cannot be larger than the end offset value " + getEndOffset()
+ " of the parent source");

FileBasedSource<T> source = createForSubrangeOfFile(fileOrPatternSpec, start, end);
checkState(fileOrPatternSpec.isAccessible(),
"Subrange creation should only happen at execution time.");
FileBasedSource<T> source = createForSubrangeOfFile(fileOrPatternSpec.get(), start, end);
if (start > 0 || end != Long.MAX_VALUE) {
checkArgument(source.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
"Source created for the range [" + start + "," + end + ")"
Expand Down Expand Up @@ -180,19 +198,21 @@ public final long getEstimatedSizeBytes(PipelineOptions options) throws Exceptio
// we perform the size estimation of files and file patterns using the interface provided by
// IOChannelFactory.

IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
if (mode == Mode.FILEPATTERN) {
checkState(fileOrPatternSpec.isAccessible(),
"Size estimation should be done at execution time.");
IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
// TODO Implement a more efficient parallel/batch size estimation mechanism for file patterns.
long startTime = System.currentTimeMillis();
long totalSize = 0;
Collection<String> inputs = factory.match(fileOrPatternSpec);
Collection<String> inputs = factory.match(fileOrPatternSpec.get());
if (inputs.size() <= MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT) {
totalSize = getExactTotalSizeOfFiles(inputs, factory);
LOG.debug("Size estimation of all files of pattern " + fileOrPatternSpec + " took "
LOG.debug("Size estimation of all files of pattern " + fileOrPatternSpec.get() + " took "
+ (System.currentTimeMillis() - startTime) + " ms");
} else {
totalSize = getEstimatedSizeOfFilesBySampling(inputs, factory);
LOG.debug("Size estimation of pattern " + fileOrPatternSpec + " by sampling took "
LOG.debug("Size estimation of pattern " + fileOrPatternSpec.get() + " by sampling took "
+ (System.currentTimeMillis() - startTime) + " ms");
}
return totalSize;
Expand Down Expand Up @@ -261,7 +281,7 @@ private static long getEstimatedSizeOfFilesBySampling(
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("filePattern", getFileOrPatternSpec())
builder.add(DisplayData.item("filePattern", getFileOrPatternSpecProvider())
.withLabel("File Pattern"));
}

Expand Down Expand Up @@ -294,7 +314,9 @@ public final List<? extends FileBasedSource<T>> splitIntoBundles(
ListeningExecutorService service =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
try {
for (final String file : FileBasedSource.expandFilePattern(fileOrPatternSpec)) {
checkState(fileOrPatternSpec.isAccessible(),
"Bundle splitting should only happen at execution time.");
for (final String file : FileBasedSource.expandFilePattern(fileOrPatternSpec.get())) {
futures.add(createFutureForFileSplit(file, desiredBundleSizeBytes, options, service));
}
List<? extends FileBasedSource<T>> splitResults =
Expand Down Expand Up @@ -334,8 +356,10 @@ protected boolean isSplittable() throws Exception {
// We split a file-based source into subranges only if the file is efficiently seekable.
// If a file is not efficiently seekable it would be highly inefficient to create and read a
// source based on a subrange of that file.
IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
return factory.isReadSeekEfficient(fileOrPatternSpec);
checkState(fileOrPatternSpec.isAccessible(),
"isSplittable should only be called at runtime.");
IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
return factory.isReadSeekEfficient(fileOrPatternSpec.get());
}

@Override
Expand All @@ -345,7 +369,7 @@ public final BoundedReader<T> createReader(PipelineOptions options) throws IOExc

if (mode == Mode.FILEPATTERN) {
long startTime = System.currentTimeMillis();
Collection<String> files = FileBasedSource.expandFilePattern(fileOrPatternSpec);
Collection<String> files = FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
List<FileBasedReader<T>> fileReaders = new ArrayList<>();
for (String fileName : files) {
long endOffset;
Expand Down Expand Up @@ -373,9 +397,9 @@ public final BoundedReader<T> createReader(PipelineOptions options) throws IOExc
public String toString() {
switch (mode) {
case FILEPATTERN:
return fileOrPatternSpec;
return fileOrPatternSpec.toString();
case SINGLE_FILE_OR_SUBRANGE:
return fileOrPatternSpec + " range " + super.toString();
return fileOrPatternSpec.toString() + " range " + super.toString();
default:
throw new IllegalStateException("Unexpected mode: " + mode);
}
Expand Down Expand Up @@ -407,8 +431,8 @@ public final long getMaxEndOffset(PipelineOptions options) throws Exception {
throw new IllegalArgumentException("Cannot determine the exact end offset of a file pattern");
}
if (getEndOffset() == Long.MAX_VALUE) {
IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
return factory.getSizeBytes(fileOrPatternSpec);
IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
return factory.getSizeBytes(fileOrPatternSpec.get());
} else {
return getEndOffset();
}
Expand Down Expand Up @@ -480,8 +504,9 @@ public synchronized FileBasedSource<T> getCurrentSource() {
@Override
protected final boolean startImpl() throws IOException {
FileBasedSource<T> source = getCurrentSource();
IOChannelFactory factory = IOChannelUtils.getFactory(source.getFileOrPatternSpec());
this.channel = factory.open(source.getFileOrPatternSpec());
IOChannelFactory factory = IOChannelUtils.getFactory(
source.getFileOrPatternSpecProvider().get());
this.channel = factory.open(source.getFileOrPatternSpecProvider().get());

if (channel instanceof SeekableByteChannel) {
SeekableByteChannel seekChannel = (SeekableByteChannel) channel;
Expand Down
Loading