95 changes: 95 additions & 0 deletions adam-core/src/main/java/org/bdgenomics/adam/io/FastqInputFormat.java
@@ -0,0 +1,95 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.io;

import htsjdk.samtools.util.BlockCompressedInputStream;
import java.io.BufferedInputStream;
import java.io.InputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.seqdoop.hadoop_bam.util.BGZFEnhancedGzipCodec;

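/**
* Base input format for reading FASTQ files.
*
* Determines whether the file behind each input path can be split, and
* records that decision in the Hadoop configuration so that the record
* reader can pick it up.
*/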
public abstract class FastqInputFormat extends FileInputFormat<Void, Text> {

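/**
* True if the file currently being checked can be split.
*/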
protected boolean splittable;

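/**
* Hadoop configuration key used to pass the splittable status of the file
* to the record reader.
*/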
static final String FILE_SPLITTABLE = "org.bdgenomics.adam.io.FastqInputFormat.fileIsSplittable";

/**
* Checks to see if the file we are looking at is splittable.
*
* A file is splittable if it is:
*
* - Uncompressed.
* - Compressed with the BGZFEnhancedGzipCodec _and_ the underlying stream is
* a BGZF stream. BGZFEnhancedGzipCodec looks for files with a .gz
* extension, which means that the codec may be selected if the file is a
* non-block GZIPed file, and thus is non-splittable. To validate this, we
* use HTSJDK's built-in mechanism for checking if a stream is a BGZF stream.
* - Any other splittable codec (e.g., .bgz/BGZFCodec, .bz2/BZip2Codec)
*
* @param context The job context to get the configuration from.
* @param filename The path the input file is saved at.
* @return Returns true if the file can be split, false otherwise.
*/
@Override protected boolean isSplitable(final JobContext context,
final Path filename) {
Configuration conf = context.getConfiguration();
final CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(filename);
if (codec == null) {
splittable = true;
} else if (codec instanceof BGZFEnhancedGzipCodec) {
// BGZFEnhancedGzipCodec extends SplittableCompressionCodec, so this
// should be unnecessary. but...
//
// as documented in the method javadoc, BGZFEnhancedGzipCodec matches
// to files with a .gz extension, which can either be BGZF or GZIP
// so we must actually look at the stream to determine if the file
// is BGZF and splittable or GZIP and not-splittable

try (InputStream is = filename.getFileSystem(conf).open(filename)) {

// htsjdk can only test a buffered input stream, and throws an
// exception if the stream is unbuffered, so we wrap the raw stream
// in a BufferedInputStream before probing it
splittable = BlockCompressedInputStream.isValidFile(new BufferedInputStream(is));
} catch (Exception e) {
splittable = false;
}
} else if (codec instanceof SplittableCompressionCodec) {
splittable = true;
} else {
splittable = false;
}

// the behavior of the record reader depends on whether the underlying
// stream is splittable. the only reliable way to pass this to the
// record reader is through the hadoop configuration.
conf.setBoolean(FILE_SPLITTABLE, splittable);

return splittable;
}
}
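The subtle case handled above is a .gz file that is plain gzip rather than BGZF: both match BGZFEnhancedGzipCodec's .gz extension, but only BGZF is splittable. As a rough standalone sketch of the same htsjdk probe outside Hadoop (the class name and default path below are illustrative, not part of this patch):

import htsjdk.samtools.util.BlockCompressedInputStream;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;

public final class BgzfProbe {
    public static void main(final String[] args) throws IOException {
        // illustrative path; a BGZF file reports true, a plain gzip file reports false
        final String path = args.length > 0 ? args[0] : "reads.fastq.gz";

        // isValidFile needs a stream that supports mark/reset, hence the BufferedInputStream
        try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(path))) {
            System.out.println(path + " is BGZF: " + BlockCompressedInputStream.isValidFile(in));
        }
    }
}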
219 changes: 167 additions & 52 deletions adam-core/src/main/java/org/bdgenomics/adam/io/FastqRecordReader.java
@@ -27,6 +27,8 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -104,14 +106,35 @@ abstract class FastqRecordReader extends RecordReader<Void, Text> {
*/
private static final int MAX_LINE_LENGTH = 10000;

/**
* True if the underlying data is splittable.
*/
protected boolean isSplittable = false;

/**
* True if the underlying data is compressed.
*/
protected boolean isCompressed = false;

/**
* True if the last read returned &lt;= 0 bytes.
*/
private boolean lastReadWasZeroBytes = false;

/**
* True if we hit the end of the split in a compressed stream.
*/
private boolean endOfCompressedSplit = false;

/**
* Builds a new record reader given a config file and an input split.
*
* @param conf The Hadoop configuration object. Used for gaining access
* to the underlying file system.
* @param split The file split to read.
*/
protected FastqRecordReader(final Configuration conf,
final FileSplit split) throws IOException {
file = split.getPath();
start = split.getStart();
end = start + split.getLength();
@@ -122,18 +145,44 @@ protected FastqRecordReader(final Configuration conf, final FileSplit split) thr
CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
CompressionCodec codec = codecFactory.getCodec(file);

// we expect this record reader to be used only with input formats that
// extend FastqInputFormat. the behavior of this record reader depends on
// whether the input stream is splittable or not. FastqInputFormat maintains
// a contract where it will put the file's splittable status into the hadoop
// configuration object.
isSplittable = conf.getBoolean(FastqInputFormat.FILE_SPLITTABLE, false);

if (codec == null) {
// no codec. Uncompressed file.
int bytesToSkip = positionAtFirstRecord(fileIn, null);
inputStream = fileIn;
inputStream.skip(bytesToSkip);
lineReader = new LineReader(inputStream);
} else if (isSplittable) {
// file is compressed, but uses a splittable codec
isCompressed = true;
int bytesToSkip = positionAtFirstRecord(fileIn, codec);

// apparent fun finding: if you don't seek back to 0,
// SplittableCompressionCodec.createInputStream will seek in the stream
// to a start position, and funny things happen..
fileIn.seek(0);
inputStream = ((SplittableCompressionCodec)codec).createInputStream(fileIn,
codec.createDecompressor(),
start,
end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);

inputStream.skip(bytesToSkip);
lineReader = new ResettableCompressedSplitLineReader((SplitCompressionInputStream)inputStream, conf);
} else {
// unsplittable compressed file
// expect a single split, first record at offset 0
isCompressed = true;
inputStream = codec.createInputStream(fileIn);
end = Long.MAX_VALUE; // read until the end of the file
lineReader = new LineReader(inputStream);
}
}

/**
@@ -152,55 +201,66 @@ protected FastqRecordReader(final Configuration conf, final FileSplit split) thr
*
* @param stream The stream to reposition.
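* @param codec The compression codec the file is compressed with, or null
* if the file is uncompressed.
* @return The number of bytes that must be skipped from the original start
* of the split to reach the first complete record.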
*/
protected final int positionAtFirstRecord(final FSDataInputStream stream,
final CompressionCodec codec) throws IOException {
Text buffer = new Text();
long originalStart = start;

// Advance to the start of the first record that ends with /1
// We use a temporary LineReader to read lines until we find the
// position of the right one. We then seek the file to that position.
stream.seek(start);
LineReader reader;
if (codec == null) {
reader = new LineReader(stream);
} else {
// see above note about
// SplittableCompressionCodec.createInputStream needing the stream
// to be at offset 0
stream.seek(0);
reader = new LineReader(((SplittableCompressionCodec)codec).createInputStream(stream,
codec.createDecompressor(),
start,
end,
SplittableCompressionCodec.READ_MODE.BYBLOCK));
}

int bytesRead = 0;
do {
bytesRead = reader.readLine(buffer, (int) Math.min(MAX_LINE_LENGTH, end - start));
int bufferLength = buffer.getLength();
if (bytesRead > 0 && !checkBuffer(bufferLength, buffer)) {
start += bytesRead;
} else {
// line starts with @. Read two more and verify that it starts with a +
//
// If this isn't the start of a record, we want to backtrack to its end
long backtrackPosition = start + bytesRead;

bytesRead = reader.readLine(buffer, (int) Math.min(MAX_LINE_LENGTH, end - start));
bytesRead = reader.readLine(buffer, (int) Math.min(MAX_LINE_LENGTH, end - start));
if (bytesRead > 0 && buffer.getLength() > 0 && buffer.getBytes()[0] == '+') {
break; // all good!
} else {
// backtrack to the end of the record we thought was the start.
start = backtrackPosition;
stream.seek(start);
reader = new LineReader(stream);
}
}
} while (bytesRead > 0);

pos = start;
stream.seek(originalStart);
return (int)(start - originalStart);
}

/**
* Method is a no-op.
*
* @param split The input split that we will parse.
* @param context The Hadoop task context.
*/
public final void initialize(final InputSplit split, final TaskAttemptContext context)
throws IOException, InterruptedException {
// this method does nothing but is required by
// org.apache.hadoop.mapreduce.RecordReader
}

/**
* FASTQ has no keys, so we return null.
*
Expand Down Expand Up @@ -277,6 +337,10 @@ protected final String makePositionMessage() {
protected final boolean lowLevelFastqRead(final Text readName, final Text value)
throws IOException {

if (endOfCompressedSplit) {
return false;
}

// ID line
readName.clear();
long skipped = appendLineInto(readName, true);
@@ -328,14 +392,65 @@ private int appendLineInto(final Text dest, final boolean eofOk) throws EOFExcep
Text buf = new Text();
int bytesRead = lineReader.readLine(buf, (int) Math.min(MAX_LINE_LENGTH, end - start));

// ok, so first, split/unsplit, compressed/uncompressed notwithstanding,
// there are three cases we can run into:
//
// 1. we read data
// 2. we are at an acceptable eof/end-of-split and don't read data
// 3. we are at an unacceptable eof/end-of-split and don't read data
//
// cases 1 and 2 are consistent across split/unsplit, compressed/uncompressed.
//
// case 3 is simple in the unsplit or uncompressed cases; something has
// gone wrong, we throw an EOFException, and move on with our lives
//
// case 3 is where working with split compressed files gets fun.
//
// with the split compression stream, the first time we read past the
// end of the last compression block within a file split, we get no
// bytes back. the BZip2Codec and BGZFCodec actually tell us that
// we'll get -2 back in this case, but we'll cast a wider net yet.
//
// this is important information---if we don't know this, we'll keep reading
// past the end of the split to the end of the file---but we still need to
// finish reading our multiline record, so we set some state to let us know
// that we're reading the last record in the split (endOfCompressedSplit)
// and repeat the read. if the read fails again, then that means that
// something has actually gone wrong, and we want to fall through and
// throw an EOFException or return no bytes read (depending on eofOk).
// that's why we have the lastReadWasZeroBytes flag around. we set this
// to true on the first read that gets bytesRead <= 0, and clear it on
// any read that reads more than 0 bytes.
if (isSplittable &&
isCompressed &&
!lastReadWasZeroBytes &&
bytesRead <= 0 &&
!eofOk) {

// we need to clear the reader state so we can continue reading
((ResettableCompressedSplitLineReader)lineReader).reset();

// set the state to stop us from reading another record and
// to catch back-to-back failed reads
lastReadWasZeroBytes = true;
endOfCompressedSplit = true;

// recursively call to redo the read
return appendLineInto(dest, eofOk);
} else if (bytesRead < 0 || (bytesRead == 0 && !eofOk)) {
throw new EOFException();
} else {
lastReadWasZeroBytes = false;
}

dest.append(buf.getBytes(), 0, buf.getLength());
dest.append(newline, 0, 1);
if (isSplittable && isCompressed) {
pos = ((SplitCompressionInputStream)inputStream).getPos();
} else {
pos += bytesRead;
}

return bytesRead;
}
}
}
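For intuition about the record-boundary search in positionAtFirstRecord, here is a rough, self-contained sketch of the same heuristic applied to lines held in memory: a line is treated as the start of a FASTQ record only if it begins with '@' and the line two past it begins with '+'. The class and sample data below are illustrative only, not part of this patch.

import java.util.Arrays;
import java.util.List;

public final class FastqSyncSketch {
    /**
     * Returns the index of the first line at or after startLine that looks like
     * the beginning of a FASTQ record: an '@' line whose line two past it is a '+' line.
     */
    static int firstRecordLine(final List<String> lines, final int startLine) {
        for (int i = startLine; i < lines.size(); i++) {
            if (lines.get(i).startsWith("@")
                && i + 2 < lines.size()
                && lines.get(i + 2).startsWith("+")) {
                return i;
            }
        }
        return -1;
    }

    public static void main(final String[] args) {
        final List<String> lines = Arrays.asList(
            "TTAGGC",     // trailing bases from a record split across the boundary
            "+",
            "IIIIII",
            "@read2/1",   // first complete record in this split
            "ACGTAC",
            "+",
            "IIIIII");
        System.out.println(firstRecordLine(lines, 0)); // prints 3
    }
}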