
Initial Spark dataflow commit, basic skeleton with some tests.
davidadamsphd committed Aug 21, 2015
1 parent 921e730 commit 2310062
Showing 26 changed files with 1,295 additions and 26 deletions.
16 changes: 10 additions & 6 deletions build.gradle
@@ -125,18 +125,22 @@ dependencies {
}
compile('org.apache.hadoop:hadoop-client:2.2.0') // should be a 'provided' dependency

//needed for DataflowAssert
testCompile 'org.hamcrest:hamcrest-all:1.3'
testCompile 'junit:junit:4.12'
testCompile 'org.apache.hadoop:hadoop-minicluster:2.7.1'
testCompile "org.mockito:mockito-core:1.10.19"
testCompile('org.apache.spark:spark-core_2.10:1.4.1') {
compile('org.apache.spark:spark-core_2.10:1.4.1') {
// JUL is used by Google Dataflow as the backend logger, so exclude jul-to-slf4j to avoid a loop
exclude module: 'jul-to-slf4j'
exclude module: 'javax.servlet'
exclude module: 'servlet-api'
exclude module: 'kryo'
}

compile 'com.esotericsoftware.kryo:kryo:2.24.0'
compile 'de.javakaffee:kryo-serializers:0.26'

//needed for DataflowAssert
testCompile 'org.hamcrest:hamcrest-all:1.3'
testCompile 'junit:junit:4.12'
testCompile 'org.apache.hadoop:hadoop-minicluster:2.7.1'
testCompile "org.mockito:mockito-core:1.10.19"
}

sourceCompatibility = 1.8
@@ -17,6 +17,7 @@
import com.google.cloud.genomics.utils.Contig;
import com.google.cloud.genomics.utils.GenomicsFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SamReader;
import htsjdk.samtools.SamReaderFactory;
@@ -7,6 +7,7 @@
import com.google.api.services.genomics.model.SearchReferencesRequest;
import com.google.api.services.genomics.model.SearchReferencesResponse;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.genomics.dataflow.utils.GCSOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
import com.google.cloud.genomics.utils.GenomicsFactory;
@@ -64,11 +65,20 @@ public class RefAPISource implements ReferenceSource, Serializable {

private Map<String, Reference> referenceMap;
private Map<String, String> referenceNameToIdTable;
private String apiKey;

public RefAPISource(final PipelineOptions pipelineOptions, final String referenceURL) {
String referenceName = getReferenceSetID(referenceURL);
this.referenceMap = getReferenceNameToReferenceTable(pipelineOptions, referenceName);
this.referenceNameToIdTable = getReferenceNameToIdTableFromMap(referenceMap);

// For Spark, we keep around the apiKey from the PipelineOptions since we don't have
// a "context" with PipelineOptions available on each worker.
// If we go with Spark, we'll end up refactoring this to store all of the secrets
// directly in this class and get rid of PipelineOptions as an argument to
// getReferenceBases.
Utils.nonNull(pipelineOptions);
this.apiKey = pipelineOptions.as(GCSOptions.class).getApiKey();
}

@VisibleForTesting
@@ -99,11 +109,17 @@ public static String getReferenceSetID(String url) {
*/
@Override
public ReferenceBases getReferenceBases(final PipelineOptions pipelineOptions, final SimpleInterval interval) {
Utils.nonNull(pipelineOptions);
Utils.nonNull(interval);

if (genomicsService == null) {
genomicsService = createGenomicsService(pipelineOptions);
if (pipelineOptions == null) {
// Fall back on the saved apiKey for Spark.
GCSOptions options = PipelineOptionsFactory.as(GCSOptions.class);
options.setApiKey(apiKey);
genomicsService = createGenomicsService(options);
} else {
genomicsService = createGenomicsService(pipelineOptions);
}
}
if ( !referenceNameToIdTable.containsKey(interval.getContig()) ) {
throw new UserException("Contig " + interval.getContig() + " not in our set of reference names for this reference source");
@@ -277,6 +293,8 @@ private Genomics createGenomicsService(final PipelineOptions pipelineOptions) {
}
}

// TODO: Move these to a CustomCoder. That will allow us to do something else (possibly better) for Spark.
// TODO: See Issue #849.
// implement methods for Java serialization, since Reference does not implement Serializable
private void writeObject(ObjectOutputStream stream) throws IOException {
JsonFactory jsonFactory = com.google.api.client.googleapis.util.Utils.getDefaultJsonFactory();
@@ -286,6 +304,7 @@ private void writeObject(ObjectOutputStream stream) throws IOException {
stream.writeUTF(jsonFactory.toString(e.getValue()));
}
stream.writeObject(referenceNameToIdTable);
stream.writeObject(apiKey);
}
private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
JsonFactory jsonFactory = com.google.api.client.googleapis.util.Utils.getDefaultJsonFactory();
@@ -296,7 +315,9 @@ private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
}
@SuppressWarnings("unchecked")
final Map<String, String> refTable = (Map<String, String>) stream.readObject();
final String apiKey = (String) stream.readObject();
this.referenceMap = refs;
this.referenceNameToIdTable = refTable;
this.apiKey = apiKey;
}
}
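
For illustration, a minimal sketch of how the apiKey fallback added above is meant to be used from Spark (the helper method name is hypothetical; types and imports are those of the file above):

// Sketch only: shows the API contract added in this commit. Normally the
// RefAPISource is built on the driver (where PipelineOptions exist) and then
// Java-serialized out to Spark workers.
public static ReferenceBases fetchBases(final PipelineOptions driverOptions,
                                        final String referenceURL,
                                        final SimpleInterval interval) {
    final RefAPISource refSource = new RefAPISource(driverOptions, referenceURL);
    // Passing null for PipelineOptions (as a Spark worker would) makes
    // getReferenceBases fall back on the apiKey saved by the constructor.
    return refSource.getReferenceBases(null, interval);
}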
@@ -0,0 +1,86 @@
package org.broadinstitute.hellbender.engine.spark;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.broadinstitute.hellbender.engine.dataflow.datasources.ReadContextData;
import org.broadinstitute.hellbender.engine.dataflow.datasources.ReferenceDataflowSource;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.reference.ReferenceBases;
import org.broadinstitute.hellbender.utils.variant.Variant;
import scala.Tuple2;

import java.util.List;
import java.util.Map;

/**
* AddContextDataToReadSpark pairs reference bases and overlapping variants with each GATKRead in the input RDD.
* The variants are obtained from a local file (later a GCS bucket). The reference bases come from the Google Genomics API.
*
* This transform is intended for direct use in pipelines.
*
* The reference bases paired with each read can be customized by passing in a reference window function
* inside the {@link ReferenceDataflowSource} argument to {@link #add}. See
* {@link org.broadinstitute.hellbender.engine.dataflow.datasources.RefWindowFunctions} for examples.
*/
public class AddContextDataToReadSpark {
public static JavaPairRDD<GATKRead, ReadContextData> add(
final JavaRDD<GATKRead> reads, final ReferenceDataflowSource referenceDataflowSource,
final JavaRDD<Variant> variants) {
// Join Reads and Variants, Reads and ReferenceBases
JavaPairRDD<GATKRead, Iterable<Variant>> readiVariants = JoinReadsWithVariants.join(reads, variants);
JavaPairRDD<GATKRead, ReferenceBases> readRefBases = JoinReadsWithRefBases.addBases(referenceDataflowSource, reads);

// For testing, we want to verify that the reads in the pairs coming back from JoinReadsWithVariants.join
// and JoinReadsWithRefBases.addBases are the same reads as in "reads".
boolean assertsEnabled = false;
assert assertsEnabled = true; // Intentional side-effect!!!
// Now assertsEnabled is set to the correct value
if (assertsEnabled) {
assertSameReads(reads, readRefBases, readiVariants);
}

JavaPairRDD<GATKRead, Tuple2<Iterable<Iterable<Variant>>, Iterable<ReferenceBases>>> cogroup = readiVariants.cogroup(readRefBases);
return cogroup.mapToPair(in -> {
List<Iterable<Variant>> liVariants = Lists.newArrayList(in._2()._1());
List<Variant> lVariants = Lists.newArrayList();
if (!liVariants.isEmpty()) {
final Iterable<Variant> iVariant = Iterables.getOnlyElement(in._2()._1());
// It's possible for iVariant to contain only a null variant; we don't
// want to add that to the ReadContextData.
final Variant next = iVariant.iterator().next();
if (next != null) {
lVariants = Lists.newArrayList(iVariant);
}
}

ReferenceBases refBases = Iterables.getOnlyElement(in._2()._2());
ReadContextData readContextData = new ReadContextData(refBases, lVariants);
return new Tuple2<>(in._1(), readContextData);
});
}

private static void assertSameReads(final JavaRDD<GATKRead> reads,
final JavaPairRDD<GATKRead, ReferenceBases> readRefBases,
final JavaPairRDD<GATKRead, Iterable<Variant>> readiVariants) {
List<GATKRead> vReads = reads.collect();
Map<GATKRead, ReferenceBases> vReadRef = readRefBases.collectAsMap();
Map<GATKRead, Iterable<Variant>> vReadVariant = readiVariants.collectAsMap();

// This assumes none of the RDDs contain duplicates.
JavaRDD<GATKRead> refBasesReads = readRefBases.keys();
JavaRDD<GATKRead> variantsReads = readiVariants.keys();
JavaRDD<GATKRead> distinctReads = reads.intersection(refBasesReads).intersection(variantsReads);

long counts = reads.count();
assert counts == distinctReads.count();
assert counts == refBasesReads.count();
assert counts == variantsReads.count();

assert vReadRef.size() == vReads.size();
assert vReadVariant.size() == vReads.size();
}

}
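
A brief usage sketch (the helper is hypothetical; reads, variants, and the reference source are assumed to be built elsewhere in the pipeline):

// Sketch only: attach a ReadContextData (reference bases plus overlapping
// variants) to every read in the RDD.
public static JavaPairRDD<GATKRead, ReadContextData> attachContext(
        final JavaRDD<GATKRead> reads,
        final JavaRDD<Variant> variants,
        final ReferenceDataflowSource referenceDataflowSource) {
    return AddContextDataToReadSpark.add(reads, referenceDataflowSource, variants);
}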

@@ -0,0 +1,27 @@
package org.broadinstitute.hellbender.engine.spark;

import com.esotericsoftware.kryo.Kryo;
import com.google.api.services.genomics.model.Read;
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
import org.apache.spark.serializer.KryoRegistrator;

import java.util.Collections;

/**
* GATKRegistrator registers Serializers for our project. We need a JsonSerializer for the Google Genomics classes
* and an UnmodifiableCollectionsSerializer to work around a bug in the version of Kryo we're on.
*/
public class GATKRegistrator implements KryoRegistrator {
@SuppressWarnings("unchecked")
@Override
public void registerClasses(Kryo kryo) {

// JsonSerializer is needed for the Google Genomics classes like Read and Reference.
kryo.register(Read.class, new JsonSerializer<Read>());
// htsjdk.variant.variantcontext.CommonInfo has a Map<String, Object> that defaults to
// a Collections.unmodifiableMap. This can't be handled by the version of Kryo used in Spark; it's fixed
// in newer versions, but we can't use those because of incompatibility with Spark, so we just include
// the fix here.
kryo.register(Collections.unmodifiableMap(Collections.EMPTY_MAP).getClass(), new UnmodifiableCollectionsSerializer());
}
}
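
For context, a registrator like this is typically hooked up through the Spark configuration; a sketch (not part of this commit, the app name is a placeholder):

// Sketch only: enable Kryo serialization and point Spark at GATKRegistrator.
// SparkConf and JavaSparkContext come from org.apache.spark.
SparkConf conf = new SparkConf()
        .setAppName("hellbender-spark")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "org.broadinstitute.hellbender.engine.spark.GATKRegistrator");
JavaSparkContext ctx = new JavaSparkContext(conf);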
@@ -0,0 +1,74 @@
package org.broadinstitute.hellbender.engine.spark;

import com.google.common.collect.Lists;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.broadinstitute.hellbender.engine.dataflow.datasources.ReferenceDataflowSource;
import org.broadinstitute.hellbender.engine.dataflow.datasources.ReferenceShard;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.reference.ReferenceBases;
import scala.Tuple2;

import java.util.List;

/**
* JoinReadsWithRefBases queries the Google Genomics API for reference bases overlapping all of the reads.
*
* step 1: key reads by reference shards
*
* |--------- shard 0 ----------|---------- shard 1 ----------|--------- shard 2 ----------|--------- shard 3 ----------|
* |------ read a -----| |-- read b --| |---- read c ----|
*
* step 2: group reads by the shard they start in
*
* |--------- shard 0 ----------|
* |------ read a -----|
*
*
* |--------- shard 2 ----------|
* |-- read b --| |---- read c ----|
*
* step 3: query the Google Genomics API for all bases needed for each shard
*
* |--- ref bases 1 ---| |--------- ref bases 2 -----------|
* |------ read a -----| |-- read b --| |---- read c ----|
*
* step 4: pair the ref bases needed for each read with the read
*
* |------ read a -----| |-- read b --| |---- read c ----|
* |-- ref bases 1a ---| |ref bases 2b| |- ref bases 2c -|
*
* or in code,
* KV<read a, ref bases 1a>
* KV<read b, ref bases 2b>
* KV<read c, ref bases 2c>
*
* The reference bases paired with each read can be customized by passing in a reference window function
* inside the {@link ReferenceDataflowSource} argument to {@link #addBases}. See {@link org.broadinstitute.hellbender.engine.dataflow.datasources.RefWindowFunctions} for examples.
*/
public class JoinReadsWithRefBases {
public static JavaPairRDD<GATKRead, ReferenceBases> addBases(final ReferenceDataflowSource referenceDataflowSource,
final JavaRDD<GATKRead> reads) {

JavaPairRDD<ReferenceShard, GATKRead> shardRead = reads.mapToPair(gatkRead -> {
ReferenceShard shard = ReferenceShard.getShardNumberFromInterval(gatkRead);
return new Tuple2<>(shard, gatkRead);
});

JavaPairRDD<ReferenceShard, Iterable<GATKRead>> shardiRead = shardRead.groupByKey();

return shardiRead.flatMapToPair(in -> {
List<Tuple2<GATKRead, ReferenceBases>> out = Lists.newArrayList();
Iterable<GATKRead> iReads = in._2();
SimpleInterval interval = SimpleInterval.getSpanningInterval(iReads);
ReferenceBases bases = referenceDataflowSource.getReferenceBases(null, interval);
for (GATKRead r : iReads) {
final ReferenceBases subset = bases.getSubset(new SimpleInterval(r));
out.add(new Tuple2<>(r, subset));
}
return out;
});
}
}
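
A brief usage sketch (the helper is hypothetical; inputs are assumed to come from the pipeline):

// Sketch only: pair each read with the reference bases spanning it, then
// collect locally and print how many bases each read was paired with.
// collect() is for illustration; real pipelines keep the data distributed.
public static void printBaseCounts(final JavaRDD<GATKRead> reads,
                                   final ReferenceDataflowSource referenceDataflowSource) {
    final JavaPairRDD<GATKRead, ReferenceBases> readRefBases =
            JoinReadsWithRefBases.addBases(referenceDataflowSource, reads);
    for (Tuple2<GATKRead, ReferenceBases> pair : readRefBases.collect()) {
        System.out.println(pair._1().getName() + " -> " + pair._2().getBases().length + " bases");
    }
}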

0 comments on commit 2310062
