Add a Spark Kryo serializer for SAMRecords.
This considerably reduces the serialized size of each record, which has a large effect
on the amount of data going through the shuffle.
tomwhite committed Sep 15, 2015
1 parent 1c89045 commit 112b376
Showing 4 changed files with 58 additions and 12 deletions.
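
The Kryo registrator modified in the first file below only takes effect when Spark is told to use Kryo for shuffle serialization. As a rough sketch of the driver-side wiring (not part of this commit; the registrator class name is a placeholder, since the diff does not show file names):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class KryoConfigSketch {
    public static JavaSparkContext createContext() {
        SparkConf sparkConf = new SparkConf()
                .setAppName("hellbender-spark")
                // Serialize shuffle data with Kryo instead of Java serialization.
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                // Placeholder name: point this at the KryoRegistrator edited in the first file below.
                .set("spark.kryo.registrator", "org.broadinstitute.hellbender.engine.spark.GATKRegistrator");
        return new JavaSparkContext(sparkConf);
    }
}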
@@ -4,6 +4,7 @@
import com.google.api.services.genomics.model.Read;
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
import org.apache.spark.serializer.KryoRegistrator;
import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter;

import java.util.Collections;

@@ -24,5 +25,7 @@ public void registerClasses(Kryo kryo) {
// fix here.
// We are tracking this issue with (#874)
kryo.register(Collections.unmodifiableMap(Collections.EMPTY_MAP).getClass(), new UnmodifiableCollectionsSerializer());

kryo.register(SAMRecordToGATKReadAdapter.class, new SAMRecordToGATKReadAdapterSerializer());
}
}
@@ -0,0 +1,29 @@
package org.broadinstitute.hellbender.engine.spark;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import htsjdk.samtools.BAMRecordCodec;
import htsjdk.samtools.DefaultSAMRecordFactory;
import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter;
import org.seqdoop.hadoop_bam.LazyBAMRecordFactory;

public class SAMRecordToGATKReadAdapterSerializer extends Serializer<SAMRecordToGATKReadAdapter> {

private BAMRecordCodec lazyCodec = new BAMRecordCodec(null, new LazyBAMRecordFactory());

@Override
public void write(Kryo kryo, Output output, SAMRecordToGATKReadAdapter adapter) {
final BAMRecordCodec codec = new BAMRecordCodec(adapter.getSamRecord().getHeader(),
DefaultSAMRecordFactory.getInstance());
codec.setOutputStream(output);
codec.encode(adapter.getSamRecord());
}

@Override
public SAMRecordToGATKReadAdapter read(Kryo kryo, Input input, Class<SAMRecordToGATKReadAdapter> type) {
lazyCodec.setInputStream(input);
return (SAMRecordToGATKReadAdapter) SAMRecordToGATKReadAdapter.sparkReadAdapter(lazyCodec.decode());
}
}
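
To see what the new serializer does in isolation, here is a minimal round-trip sketch (not part of the commit). It assumes htsjdk's SAMRecordSetBuilder for constructing a test record with a header attached; class and method names not shown in the diff are illustrative.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.SAMRecordSetBuilder;
import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class SAMRecordSerializerRoundTripSketch {
    public static void main(String[] args) {
        Kryo kryo = new Kryo();
        kryo.register(SAMRecordToGATKReadAdapter.class, new SAMRecordToGATKReadAdapterSerializer());

        // Build a small mapped record; SAMRecordSetBuilder attaches a header, which write() needs.
        SAMRecord record = new SAMRecordSetBuilder().addFrag("read1", 0, 100, false);
        SAMRecordToGATKReadAdapter adapter =
                (SAMRecordToGATKReadAdapter) SAMRecordToGATKReadAdapter.sparkReadAdapter(record);

        // Serialize: the serializer encodes the record with htsjdk's BAMRecordCodec.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Output output = new Output(bytes);
        kryo.writeObject(output, adapter);
        output.close();

        // Deserialize: the lazy BAM record factory defers field decoding until fields are accessed.
        Input input = new Input(new ByteArrayInputStream(bytes.toByteArray()));
        SAMRecordToGATKReadAdapter roundTripped = kryo.readObject(input, SAMRecordToGATKReadAdapter.class);
        input.close();

        System.out.println("names match: " + record.getReadName().equals(roundTripped.getSamRecord().getReadName()));
    }
}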
@@ -1,7 +1,5 @@
package org.broadinstitute.hellbender.engine.spark.datasources;

import com.google.api.services.genomics.model.Read;
import com.google.cloud.genomics.utils.ReadUtils;
import htsjdk.samtools.SAMException;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;
@@ -15,7 +13,7 @@
import org.broadinstitute.hellbender.utils.IntervalUtils;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.GoogleGenomicsReadToGATKReadAdapter;
import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter;
import org.seqdoop.hadoop_bam.AnySAMInputFormat;
import org.seqdoop.hadoop_bam.SAMRecordWritable;
import org.seqdoop.hadoop_bam.util.SAMHeaderReader;
@@ -42,7 +40,9 @@ public ReadsSparkSource(JavaSparkContext ctx) {
*/
public JavaRDD<GATKRead> getParallelReads(final String bam, final List<SimpleInterval> intervals) {
Configuration conf = new Configuration();
conf.set("mapred.max.split.size", "2097152");
// reads take more space in memory than on disk so we need to limit the split size
// TODO: make this configurable, or tune automatically
conf.set("mapred.max.split.size", "20971520");

JavaPairRDD<LongWritable, SAMRecordWritable> rdd2 = ctx.newAPIHadoopFile(
bam, AnySAMInputFormat.class, LongWritable.class, SAMRecordWritable.class,
@@ -52,14 +52,9 @@ public JavaRDD<GATKRead> getParallelReads(final String bam, final List<SimpleInterval> intervals) {
SAMRecord sam = v1._2().get();
if (samRecordOverlaps(sam, intervals)) {
try {
// TODO: Try using the SAMRecord without the header (#875)
Read read = ReadUtils.makeRead(sam);
if (read == null) {
throw new GATKException("null read, initial sam: " + sam);
}
return GoogleGenomicsReadToGATKReadAdapter.sparkReadAdapter(read);
return SAMRecordToGATKReadAdapter.sparkReadAdapter(sam);
} catch (SAMException e) {
// Do nothing.
// TODO: add stringency
}
}
return null;
@@ -103,7 +98,12 @@ private static boolean samRecordOverlaps(final SAMRecord record, final List<SimpleInterval> intervals) {
return true;
}
for (SimpleInterval interval : intervals) {
if (interval.overlaps(record)) {
if (record.getReadUnmappedFlag() && record.getAlignmentStart() != SAMRecord.NO_ALIGNMENT_START) {
// This follows the behavior of htsjdk's SamReader which states that "an unmapped read will be returned
// by this call if it has a coordinate for the purpose of sorting that is in the query region".
int start = record.getAlignmentStart();
return interval.getStart() <= start && interval.getEnd() >= start;
} else if (interval.overlaps(record)) {
return true;
}
}
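The TODO above asks for the 20 MiB split-size cap to be made configurable. One possible shape for that (purely illustrative; the property name and class below are not part of the commit) is to read the cap from a system property before handing the Configuration to newAPIHadoopFile:

import org.apache.hadoop.conf.Configuration;

public final class SplitSizeConfigSketch {
    // 20 MiB, matching the hard-coded value above.
    private static final long DEFAULT_MAX_SPLIT_SIZE = 20L * 1024 * 1024;

    public static Configuration newReadConfiguration() {
        // Illustrative property name; not defined anywhere in hellbender.
        long maxSplitSize = Long.getLong("hellbender.spark.maxSplitSize", DEFAULT_MAX_SPLIT_SIZE);
        Configuration conf = new Configuration();
        conf.set("mapred.max.split.size", Long.toString(maxSplitSize));
        return conf;
    }
}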
@@ -29,6 +29,16 @@ public SAMRecordToGATKReadAdapter( final SAMRecord samRecord ) {
this(samRecord, UUID.randomUUID());
}

/**
* Produces a SAMRecordToGATKReadAdapter with a 0L,0L UUID. Spark doesn't need the UUIDs,
* and random UUIDs would prevent joining when the reads are loaded twice (which can happen when caching is missing).
* @param samRecord Read to adapt
* @return adapted Read
*/
public static GATKRead sparkReadAdapter(final SAMRecord samRecord) {
return new SAMRecordToGATKReadAdapter(samRecord, new UUID(0L, 0L));
}

/**
* Constructor that allows an explicit UUID to be passed in -- only meant
* for internal use and test class use, which is why it's package protected.
@@ -498,6 +508,10 @@ public SAMRecord convertToSAMRecord( final SAMFileHeader header ) {
return samRecord;
}

public SAMRecord getSamRecord() {
return samRecord;
}

@Override
public Read convertToGoogleGenomicsRead() {
// TODO: this converter is imperfect/lossy and should either be patched or replaced
