Permalink
Browse files

Actual working version of unflushing VCFWriter

-- Uses high-performance local writer backed by byte array that writes the entire VCF line in some write operation to the underlying output stream.
-- Fixes problems with indexing of unflushed writes while still allowing efficient block zipping
-- Same (or better) IO performance as previous implementation
-- IndexingVariantContextWriter now properly closes the underlying output stream when it's closed
-- Updated compressed VCF output file
  • Loading branch information...
1 parent 848ba43 commit 76aa948eef993613ae3091206d020551d826b243 Mark DePristo committed Dec 13, 2012
@@ -93,16 +93,19 @@ public String getStreamName() {
* attempt to close the VCF file
*/
public void close() {
- // try to close the index stream (keep it separate to help debugging efforts)
- if ( indexer != null ) {
- try {
+ try {
+ // try to close the index stream (keep it separate to help debugging efforts)
+ if ( indexer != null ) {
Index index = indexer.finalizeIndex(positionalOutputStream.getPosition());
IndexDictionaryUtils.setIndexSequenceDictionary(index, refDict);
index.write(idxStream);
idxStream.close();
- } catch (IOException e) {
- throw new ReviewedStingException("Unable to close index for " + getStreamName(), e);
}
+
+ // close the underlying output stream as well
+ outputStream.close();
+ } catch (IOException e) {
+ throw new ReviewedStingException("Unable to close index for " + getStreamName(), e);
}
}
@@ -34,6 +34,7 @@
import java.io.*;
import java.lang.reflect.Array;
+import java.nio.charset.Charset;
import java.util.*;
/**
@@ -42,9 +43,6 @@
class VCFWriter extends IndexingVariantContextWriter {
private final static String VERSION_LINE = VCFHeader.METADATA_INDICATOR + VCFHeaderVersion.VCF4_1.getFormatString() + "=" + VCFHeaderVersion.VCF4_1.getVersionString();
- // the print stream we're writing to
- final protected BufferedWriter mWriter;
-
// should we write genotypes or just sites?
final protected boolean doNotWriteGenotypes;
@@ -53,15 +51,33 @@
final private boolean allowMissingFieldsInHeader;
+ /**
+ * The VCF writer uses an internal Writer, based by the ByteArrayOutputStream lineBuffer,
+ * to temp. buffer the header and per-site output before flushing the per line output
+ * in one go to the super.getOutputStream. This results in high-performance, proper encoding,
+ * and allows us to avoid flushing explicitly the output stream getOutputStream, which
+ * allows us to properly compress vcfs in gz format without breaking indexing on the fly
+ * for uncompressed streams.
+ */
+ private static final int INITIAL_BUFFER_SIZE = 1024 * 16;
+ private final ByteArrayOutputStream lineBuffer = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE);
+ private final Writer writer;
+
+ /**
+ * The encoding used for VCF files. ISO-8859-1
+ */
+ final private Charset charset;
+
private IntGenotypeFieldAccessors intGenotypeFieldAccessors = new IntGenotypeFieldAccessors();
public VCFWriter(final File location, final OutputStream output, final SAMSequenceDictionary refDict,
final boolean enableOnTheFlyIndexing, boolean doNotWriteGenotypes,
final boolean allowMissingFieldsInHeader ) {
super(writerName(location, output), location, output, refDict, enableOnTheFlyIndexing);
- mWriter = new BufferedWriter(new OutputStreamWriter(getOutputStream())); // todo -- fix buffer size
this.doNotWriteGenotypes = doNotWriteGenotypes;
this.allowMissingFieldsInHeader = allowMissingFieldsInHeader;
+ this.charset = Charset.forName("ISO-8859-1");
+ this.writer = new OutputStreamWriter(lineBuffer, charset);
}
// --------------------------------------------------------------------------------
@@ -70,14 +86,44 @@ public VCFWriter(final File location, final OutputStream output, final SAMSequen
//
// --------------------------------------------------------------------------------
+ /**
+ * Write String s to the internal buffered writer.
+ *
+ * flushBuffer() must be called to actually write the data to the true output stream.
+ *
+ * @param s the string to write
+ * @throws IOException
+ */
+ private void write(final String s) throws IOException {
+ writer.write(s);
+ }
+
+ /**
+ * Actually write the line buffer contents to the destination output stream.
+ *
+ * After calling this function the line buffer is reset, so the contents of the buffer can be reused
+ *
+ * @throws IOException
+ */
+ private void flushBuffer() throws IOException {
+ writer.flush();
+ getOutputStream().write(lineBuffer.toByteArray());
+ lineBuffer.reset();
+ }
+
@Override
public void writeHeader(VCFHeader header) {
// note we need to update the mHeader object after this call because they header
// may have genotypes trimmed out of it, if doNotWriteGenotypes is true
- mHeader = writeHeader(header, mWriter, doNotWriteGenotypes, getVersionLine(), getStreamName());
+ try {
+ mHeader = writeHeader(header, writer, doNotWriteGenotypes, getVersionLine(), getStreamName());
+ flushBuffer();
+ } catch ( IOException e ) {
+ throw new UserException.CouldNotCreateOutputFile(getStreamName(), e);
+ }
}
- public static final String getVersionLine() {
+ public static String getVersionLine() {
return VERSION_LINE;
}
@@ -138,8 +184,8 @@ public static VCFHeader writeHeader(VCFHeader header,
public void close() {
// try to close the vcf stream
try {
- mWriter.flush();
- mWriter.close();
+ // TODO -- would it be useful to null out the line buffer so we don't have it around unnecessarily?
+ writer.close();
} catch (IOException e) {
throw new ReviewedStingException("Unable to close " + getStreamName(), e);
}
@@ -166,51 +212,51 @@ public void add(VariantContext vc) {
Map<Allele, String> alleleMap = buildAlleleMap(vc);
// CHROM
- mWriter.write(vc.getChr());
- mWriter.write(VCFConstants.FIELD_SEPARATOR);
+ write(vc.getChr());
+ write(VCFConstants.FIELD_SEPARATOR);
// POS
- mWriter.write(String.valueOf(vc.getStart()));
- mWriter.write(VCFConstants.FIELD_SEPARATOR);
+ write(String.valueOf(vc.getStart()));
+ write(VCFConstants.FIELD_SEPARATOR);
// ID
String ID = vc.getID();
- mWriter.write(ID);
- mWriter.write(VCFConstants.FIELD_SEPARATOR);
+ write(ID);
+ write(VCFConstants.FIELD_SEPARATOR);
// REF
String refString = vc.getReference().getDisplayString();
- mWriter.write(refString);
- mWriter.write(VCFConstants.FIELD_SEPARATOR);
+ write(refString);
+ write(VCFConstants.FIELD_SEPARATOR);
// ALT
if ( vc.isVariant() ) {
Allele altAllele = vc.getAlternateAllele(0);
String alt = altAllele.getDisplayString();
- mWriter.write(alt);
+ write(alt);
for (int i = 1; i < vc.getAlternateAlleles().size(); i++) {
altAllele = vc.getAlternateAllele(i);
alt = altAllele.getDisplayString();
- mWriter.write(",");
- mWriter.write(alt);
+ write(",");
+ write(alt);
}
} else {
- mWriter.write(VCFConstants.EMPTY_ALTERNATE_ALLELE_FIELD);
+ write(VCFConstants.EMPTY_ALTERNATE_ALLELE_FIELD);
}
- mWriter.write(VCFConstants.FIELD_SEPARATOR);
+ write(VCFConstants.FIELD_SEPARATOR);
// QUAL
if ( !vc.hasLog10PError() )
- mWriter.write(VCFConstants.MISSING_VALUE_v4);
+ write(VCFConstants.MISSING_VALUE_v4);
else
- mWriter.write(formatQualValue(vc.getPhredScaledQual()));
- mWriter.write(VCFConstants.FIELD_SEPARATOR);
+ write(formatQualValue(vc.getPhredScaledQual()));
+ write(VCFConstants.FIELD_SEPARATOR);
// FILTER
String filters = getFilterString(vc);
- mWriter.write(filters);
- mWriter.write(VCFConstants.FIELD_SEPARATOR);
+ write(filters);
+ write(VCFConstants.FIELD_SEPARATOR);
// INFO
Map<String, String> infoFields = new TreeMap<String, String>();
@@ -229,8 +275,8 @@ public void add(VariantContext vc) {
// FORMAT
final GenotypesContext gc = vc.getGenotypes();
if ( gc.isLazyWithData() && ((LazyGenotypesContext)gc).getUnparsedGenotypeData() instanceof String ) {
- mWriter.write(VCFConstants.FIELD_SEPARATOR);
- mWriter.write(((LazyGenotypesContext)gc).getUnparsedGenotypeData().toString());
+ write(VCFConstants.FIELD_SEPARATOR);
+ write(((LazyGenotypesContext) gc).getUnparsedGenotypeData().toString());
} else {
List<String> genotypeAttributeKeys = calcVCFGenotypeKeys(vc, mHeader);
if ( ! genotypeAttributeKeys.isEmpty() ) {
@@ -240,16 +286,17 @@ public void add(VariantContext vc) {
final String genotypeFormatString = ParsingUtils.join(VCFConstants.GENOTYPE_FIELD_SEPARATOR, genotypeAttributeKeys);
- mWriter.write(VCFConstants.FIELD_SEPARATOR);
- mWriter.write(genotypeFormatString);
+ write(VCFConstants.FIELD_SEPARATOR);
+ write(genotypeFormatString);
addGenotypeData(vc, alleleMap, genotypeAttributeKeys);
}
}
- mWriter.write("\n");
+ write("\n");
// note that we cannot call flush here if we want block gzipping to work properly
// calling flush results in all gzipped blocks for each variant
+ flushBuffer();
} catch (IOException e) {
throw new RuntimeException("Unable to write the VCF object to " + getStreamName(), e);
}
@@ -305,7 +352,7 @@ private String formatQualValue(double qual) {
*/
private void writeInfoString(Map<String, String> infoFields) throws IOException {
if ( infoFields.isEmpty() ) {
- mWriter.write(VCFConstants.EMPTY_INFO_FIELD);
+ write(VCFConstants.EMPTY_INFO_FIELD);
return;
}
@@ -314,16 +361,16 @@ private void writeInfoString(Map<String, String> infoFields) throws IOException
if ( isFirst )
isFirst = false;
else
- mWriter.write(VCFConstants.INFO_FIELD_SEPARATOR);
+ write(VCFConstants.INFO_FIELD_SEPARATOR);
String key = entry.getKey();
- mWriter.write(key);
+ write(key);
if ( !entry.getValue().equals("") ) {
VCFInfoHeaderLine metaData = mHeader.getInfoHeaderLine(key);
if ( metaData == null || metaData.getCountType() != VCFHeaderLineCount.INTEGER || metaData.getCount() != 0 ) {
- mWriter.write("=");
- mWriter.write(entry.getValue());
+ write("=");
+ write(entry.getValue());
}
}
}
@@ -342,7 +389,7 @@ private void addGenotypeData(VariantContext vc, Map<Allele, String> alleleMap, L
final int ploidy = vc.getMaxPloidy(2);
for ( String sample : mHeader.getGenotypeSamples() ) {
- mWriter.write(VCFConstants.FIELD_SEPARATOR);
+ write(VCFConstants.FIELD_SEPARATOR);
Genotype g = vc.getGenotype(sample);
if ( g == null ) g = GenotypeBuilder.createMissing(sample, ploidy);
@@ -356,7 +403,7 @@ private void addGenotypeData(VariantContext vc, Map<Allele, String> alleleMap, L
writeAllele(g.getAllele(0), alleleMap);
for (int i = 1; i < g.getPloidy(); i++) {
- mWriter.write(g.isPhased() ? VCFConstants.PHASED : VCFConstants.UNPHASED);
+ write(g.isPhased() ? VCFConstants.PHASED : VCFConstants.UNPHASED);
writeAllele(g.getAllele(i), alleleMap);
}
@@ -420,8 +467,8 @@ else if ( intValues.length == 1 ) // fast path
for (int i = 0; i < attrs.size(); i++) {
if ( i > 0 || genotypeFormatKeys.contains(VCFConstants.GENOTYPE_KEY) )
- mWriter.write(VCFConstants.GENOTYPE_FIELD_SEPARATOR);
- mWriter.write(attrs.get(i));
+ write(VCFConstants.GENOTYPE_FIELD_SEPARATOR);
+ write(attrs.get(i));
}
}
}
@@ -435,7 +482,7 @@ private void writeAllele(Allele allele, Map<Allele, String> alleleMap) throws IO
String encoding = alleleMap.get(allele);
if ( encoding == null )
throw new TribbleException.InternalCodecException("Allele " + allele + " is not an allele in the variant context");
- mWriter.write(encoding);
+ write(encoding);
}
/**

0 comments on commit 76aa948

Please sign in to comment.