Skip to content

Commit

Permalink
[FLINK-3655] [core] Add support for multiple file paths for FileInput…
Browse files Browse the repository at this point in the history
…Format.

- Reverted API-breaking changes.
- Enable multi-path support for the following InputFormats:
  - AvroInputFormat,
  - [Pojo,Row,Tuple]CsvInputFormat,
  - OrcInputFormat,
  - TextInputFormat,
  - TextValueInputFormat
  • Loading branch information
fhueske committed Feb 16, 2018
1 parent 632996f commit 892eb85
Show file tree
Hide file tree
Showing 14 changed files with 558 additions and 534 deletions.
Expand Up @@ -393,6 +393,11 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
} }
} }


@Override
public boolean supportsMultiPaths() {
return true;
}

// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
// Getter methods for tests // Getter methods for tests
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -111,13 +111,13 @@ public long getBlockSize() {


@Override @Override
public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
List<FileStatus> files = this.getFiles(); final List<FileStatus> files = this.getFiles();

final FileSystem fs = getFilePath().getFileSystem();
final long blockSize = this.blockSize == NATIVE_BLOCK_SIZE ? fs.getDefaultBlockSize() : this.blockSize;


final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(minNumSplits); final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(minNumSplits);
for (FileStatus file : files) { for (FileStatus file : files) {
final FileSystem fs = file.getPath().getFileSystem();
final long blockSize = this.blockSize == NATIVE_BLOCK_SIZE ? fs.getDefaultBlockSize() : this.blockSize;

for (long pos = 0, length = file.getLen(); pos < length; pos += blockSize) { for (long pos = 0, length = file.getLen(); pos < length; pos += blockSize) {
long remainingLength = Math.min(pos + blockSize, length) - pos; long remainingLength = Math.min(pos + blockSize, length) - pos;


Expand All @@ -132,10 +132,10 @@ public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {


if (inputSplits.size() < minNumSplits) { if (inputSplits.size() < minNumSplits) {
LOG.warn(String.format( LOG.warn(String.format(
"With the given block size %d, the file %s cannot be split into %d blocks. Filling up with empty splits...", "With the given block size %d, the files %s cannot be split into %d blocks. Filling up with empty splits...",
blockSize, getFilePath(), minNumSplits)); blockSize, Arrays.toString(getFilePaths()), minNumSplits));
FileStatus last = files.get(files.size() - 1); FileStatus last = files.get(files.size() - 1);
final BlockLocation[] blocks = fs.getFileBlockLocations(last, 0, last.getLen()); final BlockLocation[] blocks = last.getPath().getFileSystem().getFileBlockLocations(last, 0, last.getLen());
for (int index = files.size(); index < minNumSplits; index++) { for (int index = files.size(); index < minNumSplits; index++) {
inputSplits.add(new FileInputSplit(index, last.getPath(), last.getLen(), 0, blocks[0].getHosts())); inputSplits.add(new FileInputSplit(index, last.getPath(), last.getLen(), 0, blocks[0].getHosts()));
} }
Expand All @@ -146,9 +146,9 @@ public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {


protected List<FileStatus> getFiles() throws IOException { protected List<FileStatus> getFiles() throws IOException {
// get all the files that are involved in the splits // get all the files that are involved in the splits
List<FileStatus> files = new ArrayList<FileStatus>(); List<FileStatus> files = new ArrayList<>();


for (Path filePath: this.filePathList) { for (Path filePath: getFilePaths()) {
final FileSystem fs = filePath.getFileSystem(); final FileSystem fs = filePath.getFileSystem();
final FileStatus pathFile = fs.getFileStatus(filePath); final FileStatus pathFile = fs.getFileStatus(filePath);


Expand All @@ -172,10 +172,10 @@ public SequentialStatistics getStatistics(BaseStatistics cachedStats) {


final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ? final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ?
(FileBaseStatistics) cachedStats : null; (FileBaseStatistics) cachedStats : null;

try { try {
final ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>(1); final ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>(1);
final FileBaseStatistics stats = getFileStats(cachedFileStats, this.filePathList, allFiles); final FileBaseStatistics stats = getFileStats(cachedFileStats, getFilePaths(), allFiles);
if (stats == null) { if (stats == null) {
return null; return null;
} }
Expand All @@ -187,19 +187,18 @@ public SequentialStatistics getStatistics(BaseStatistics cachedStats) {
} catch (IOException ioex) { } catch (IOException ioex) {
if (LOG.isWarnEnabled()) { if (LOG.isWarnEnabled()) {
LOG.warn( LOG.warn(
String.format("Could not determine complete statistics for files in '%s' due to an I/O error", String.format("Could not determine complete statistics for files '%s' due to an I/O error",
this.filePathList), Arrays.toString(getFilePaths())),
ioex); ioex);
} }
} catch (Throwable t) { } catch (Throwable t) {
if (LOG.isErrorEnabled()) { if (LOG.isErrorEnabled()) {
LOG.error( LOG.error(
String.format("Unexpected problem while getting the file statistics for file in'%s'", String.format("Unexpected problem while getting the file statistics for files '%s'",
this.filePathList), Arrays.toString(getFilePaths())),
t); t);
} }
} }

// no stats available // no stats available
return null; return null;
} }
Expand Down
Expand Up @@ -350,33 +350,28 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
final int oldLineLengthLimit = this.lineLengthLimit; final int oldLineLengthLimit = this.lineLengthLimit;
try { try {


final ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>(1); final ArrayList<FileStatus> allFiles = new ArrayList<>(1);


// let the file input format deal with the up-to-date check and the // let the file input format deal with the up-to-date check and the basic size
// basic size final FileBaseStatistics stats = getFileStats(cachedFileStats, getFilePaths(), allFiles);
final FileBaseStatistics stats = getFileStats(cachedFileStats, this.filePathList, allFiles);
if (stats == null) { if (stats == null) {
return null; return null;
} }


// check whether the width per record is already known or the total // check whether the width per record is already known or the total size is unknown as well
// size is unknown as well
// in both cases, we return the stats as they are // in both cases, we return the stats as they are
if (stats.getAverageRecordWidth() != FileBaseStatistics.AVG_RECORD_BYTES_UNKNOWN if (stats.getAverageRecordWidth() != FileBaseStatistics.AVG_RECORD_BYTES_UNKNOWN ||
|| stats.getTotalInputSize() == FileBaseStatistics.SIZE_UNKNOWN) { stats.getTotalInputSize() == FileBaseStatistics.SIZE_UNKNOWN) {
return stats; return stats;
} }


// disabling sampling for unsplittable files since the logic below // disabling sampling for unsplittable files since the logic below assumes splitability.
// assumes splitability. // TODO: Add sampling for unsplittable files. Right now, only compressed text files are affected by this limitation.
// TODO: Add sampling for unsplittable files. Right now, only
// compressed text files are affected by this limitation.
if (unsplittable) { if (unsplittable) {
return stats; return stats;
} }


// compute how many samples to take, depending on the defined upper // compute how many samples to take, depending on the defined upper and lower bound
// and lower bound
final int numSamples; final int numSamples;
if (this.numLineSamples != NUM_SAMPLES_UNDEFINED) { if (this.numLineSamples != NUM_SAMPLES_UNDEFINED) {
numSamples = this.numLineSamples; numSamples = this.numLineSamples;
Expand All @@ -385,24 +380,23 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
final int calcSamples = (int) (stats.getTotalInputSize() / 1024); final int calcSamples = (int) (stats.getTotalInputSize() / 1024);
numSamples = Math.min(DEFAULT_MAX_NUM_SAMPLES, Math.max(DEFAULT_MIN_NUM_SAMPLES, calcSamples)); numSamples = Math.min(DEFAULT_MAX_NUM_SAMPLES, Math.max(DEFAULT_MIN_NUM_SAMPLES, calcSamples));
} }

// check if sampling is disabled. // check if sampling is disabled.
if (numSamples == 0) { if (numSamples == 0) {
return stats; return stats;
} }
if (numSamples < 0) { if (numSamples < 0) {
throw new RuntimeException("Error: Invalid number of samples: " + numSamples); throw new RuntimeException("Error: Invalid number of samples: " + numSamples);
} }

// make sure that the sampling times out after a while if the file
// system does not answer in time // make sure that the sampling times out after a while if the file system does not answer in time
this.openTimeout = 10000; this.openTimeout = 10000;
// set a small read buffer size // set a small read buffer size
this.bufferSize = 4 * 1024; this.bufferSize = 4 * 1024;
// prevent overly large records, for example if we have an // prevent overly large records, for example if we have an incorrectly configured delimiter
// incorrectly configured delimiter
this.lineLengthLimit = MAX_SAMPLE_LEN; this.lineLengthLimit = MAX_SAMPLE_LEN;

long offset = 0; long offset = 0;
long totalNumBytes = 0; long totalNumBytes = 0;
long stepSize = stats.getTotalInputSize() / numSamples; long stepSize = stats.getTotalInputSize() / numSamples;
Expand Down Expand Up @@ -436,28 +430,28 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
fileNum++; fileNum++;
} }
} }

// we have the width, store it // we have the width, store it
return new FileBaseStatistics(stats.getLastModificationTime(), stats.getTotalInputSize(), return new FileBaseStatistics(stats.getLastModificationTime(),
totalNumBytes / (float) samplesTaken); stats.getTotalInputSize(), totalNumBytes / (float) samplesTaken);

} catch (IOException ioex) { } catch (IOException ioex) {
if (LOG.isWarnEnabled()) { if (LOG.isWarnEnabled()) {
LOG.warn("Could not determine statistics for file(s) in '" + this.filePathList LOG.warn("Could not determine statistics for files '" + Arrays.toString(getFilePaths()) + "' " +
+ "' due to an io error: " + ioex.getMessage()); "due to an io error: " + ioex.getMessage());
} }
} catch (Throwable t) { }
catch (Throwable t) {
if (LOG.isErrorEnabled()) { if (LOG.isErrorEnabled()) {
LOG.error("Unexpected problen while getting the file statistics for file(s) in'" + this.filePathList LOG.error("Unexpected problem while getting the file statistics for files '" + Arrays.toString(getFilePaths()) + "': "
+ "': " + t.getMessage(), t); + t.getMessage(), t);
} }
} finally { } finally {
// restore properties (even on return) // restore properties (even on return)
this.openTimeout = oldTimeout; this.openTimeout = oldTimeout;
this.bufferSize = oldBufferSize; this.bufferSize = oldBufferSize;
this.lineLengthLimit = oldLineLengthLimit; this.lineLengthLimit = oldLineLengthLimit;
} }



// no statistics possible // no statistics possible
return null; return null;
Expand Down

0 comments on commit 892eb85

Please sign in to comment.