Support reading VCF files from Hadoop.
tomwhite committed Aug 24, 2015
1 parent e5ec90e commit 73fc6a7
Showing 5 changed files with 97 additions and 7 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -118,7 +118,7 @@ dependencies {
     //compile 'com.google.appengine.tools:appengine-gcs-client:0.4.4'
     compile 'org.jgrapht:jgrapht-core:0.9.1'
     compile 'org.testng:testng:6.9.6' //compile instead of testCompile because it is needed for test infrastructure that needs to be packaged
-    compile 'com.cloudera.dataflow.spark:spark-dataflow:0.4.0'
+    compile 'com.cloudera.dataflow.spark:spark-dataflow:0.4.0.1'
     compile('org.seqdoop:hadoop-bam:7.1.0') {
         exclude group: 'org.apache.hadoop'
         exclude module: 'htsjdk'

@@ -31,19 +31,24 @@ public class VariantsDataflowSource {
     private final static Logger logger = LogManager.getLogger(VariantsDataflowSource.class);
     private final List<String> variantSources;
     private final Pipeline pipeline;
+    private final boolean hadoopUrl;

     /**
      * VariantsDataflowSource sets up source using local files (or eventually GCS buckets).
      * @param variantFiles, list of files (or eventually buckets) to read from
      * @param pipeline, options to get credentials to access GCS buckets.
      */
     public VariantsDataflowSource(final List<String> variantFiles, final Pipeline pipeline) {
+        boolean hadoop = false;
         for (final String variantSource : variantFiles) {
             if (BucketUtils.isCloudStorageUrl(variantSource)) {
                 // This problem is tracked with issue 632.
                 throw new UnsupportedOperationException("Cloud storage URIs not supported");
+            } else if (BucketUtils.isHadoopUrl(variantSource)) {
+                hadoop = true;
             }
         }
+        this.hadoopUrl = hadoop;

         this.variantSources = variantFiles;
         this.pipeline = pipeline;
@@ -54,12 +59,16 @@ public VariantsDataflowSource(final List<String> variantFiles, final Pipeline pipeline) {
      * @return a PCollection of variants found in the file.
      */
     public PCollection<Variant> getAllVariants() {
-        final List<Variant> aggregatedResults = getVariantsList(variantSources);
-        if (aggregatedResults.isEmpty()) {
-            // empty list of interval type is something that Dataflow isn't happy with.
-            logger.warn("Warning: variant source is empty, you may see a coder failure.");
-        }
-        return pipeline.apply(Create.of(aggregatedResults)).setName("creatingVariants");
+        if (hadoopUrl) {
+            return VariantsHadoopSource.getAllVariants(variantSources, pipeline);
+        } else {
+            final List<Variant> aggregatedResults = getVariantsList(variantSources);
+            if (aggregatedResults.isEmpty()) {
+                // empty list of interval type is something that Dataflow isn't happy with.
+                logger.warn("Warning: variant source is empty, you may see a coder failure.");
+            }
+            return pipeline.apply(Create.of(aggregatedResults)).setName("creatingVariants");
+        }
     }

     @SuppressWarnings("unchecked")
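
With this change the caller does not need to know which backend serves the variants: if any input path is a Hadoop URL, getAllVariants() delegates to the new VariantsHadoopSource; otherwise the existing in-memory path is used. A minimal usage sketch follows (the hdfs:// host, port, and file path are illustrative assumptions, not taken from this commit):

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.values.PCollection;
import org.broadinstitute.hellbender.engine.dataflow.datasources.VariantsDataflowSource;
import org.broadinstitute.hellbender.utils.variant.Variant;

import java.util.Collections;

public class HadoopVcfUsageSketch {
    public static PCollection<Variant> readVariants(final Pipeline pipeline) {
        // Assuming BucketUtils.isHadoopUrl() recognizes the hdfs:// scheme, the
        // constructor records that a Hadoop path was supplied, and getAllVariants()
        // is routed through VariantsHadoopSource rather than the local-file path.
        final VariantsDataflowSource source = new VariantsDataflowSource(
                Collections.singletonList("hdfs://namenode:8020/data/example.vcf"), pipeline);
        return source.getAllVariants();
    }
}
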
@@ -0,0 +1,59 @@
+package org.broadinstitute.hellbender.engine.dataflow.datasources;
+
+import com.cloudera.dataflow.hadoop.HadoopIO;
+import com.cloudera.dataflow.hadoop.WritableCoder;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import htsjdk.variant.variantcontext.VariantContext;
+import org.apache.hadoop.io.LongWritable;
+import org.broadinstitute.hellbender.utils.variant.Variant;
+import org.broadinstitute.hellbender.utils.variant.VariantContextVariantAdapter;
+import org.seqdoop.hadoop_bam.VCFInputFormat;
+import org.seqdoop.hadoop_bam.VariantContextWritable;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Loads variants into PCollections using a Hadoop filesystem.
+ */
+public final class VariantsHadoopSource implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Reads variants from the specified files.
+     * @param commaSeparatedPaths one or more Hadoop filesystem paths, separated by commas
+     * @param pipeline the pipeline
+     * @return a PCollection of variants found in the file.
+     */
+    public static PCollection<Variant> getAllVariants(String commaSeparatedPaths, Pipeline pipeline) {
+        PCollection<KV<LongWritable, VariantContextWritable>> input = pipeline.apply(
+                HadoopIO.Read.from(commaSeparatedPaths, VCFInputFormat.class, LongWritable.class, VariantContextWritable.class));
+        input.setCoder(KvCoder.of(WritableCoder.of(LongWritable.class), WritableCoder.of(VariantContextWritable.class)));
+        return input.apply(ParDo.of(new ConvertToVariant()));
+    }
+
+    /**
+     * Reads variants from the specified files.
+     * @param paths one or more Hadoop filesystem paths
+     * @param pipeline the pipeline
+     * @return a PCollection of variants found in the file.
+     */
+    public static PCollection<Variant> getAllVariants(List<String> paths, Pipeline pipeline) {
+        return getAllVariants(Joiner.on(",").join(paths), pipeline);
+    }
+
+    static class ConvertToVariant extends DoFn<KV<LongWritable, VariantContextWritable>, Variant> {
+        private static final long serialVersionUID = 1L;
+        @Override
+        public void processElement(ProcessContext c) throws Exception {
+            VariantContext variantContext = c.element().getValue().get();
+            c.output(new VariantContextVariantAdapter(variantContext));
+        }
+    }
+}
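
The explicit setCoder(...) call supplies coders for the Hadoop Writable key and value types, which the Dataflow SDK does not otherwise know how to encode; WritableCoder comes from the spark-dataflow library bumped in build.gradle. Below is a short sketch of calling the two overloads (the paths are illustrative assumptions; any Hadoop filesystem paths readable by VCFInputFormat should behave the same way):

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.values.PCollection;
import org.broadinstitute.hellbender.engine.dataflow.datasources.VariantsHadoopSource;
import org.broadinstitute.hellbender.utils.variant.Variant;

import java.util.Arrays;
import java.util.List;

public class VariantsHadoopSourceUsageSketch {
    public static void readFromSeveralFiles(final Pipeline pipeline) {
        final List<String> paths = Arrays.asList(
                "hdfs://namenode:8020/vcf/sample1.vcf",
                "hdfs://namenode:8020/vcf/sample2.vcf");

        // The List overload joins the paths with commas before delegating.
        final PCollection<Variant> fromList = VariantsHadoopSource.getAllVariants(paths, pipeline);

        // The String overload accepts the comma-separated form directly.
        final PCollection<Variant> fromString = VariantsHadoopSource.getAllVariants(
                "hdfs://namenode:8020/vcf/sample1.vcf,hdfs://namenode:8020/vcf/sample2.vcf", pipeline);
    }
}
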

@@ -2,6 +2,7 @@

 import htsjdk.samtools.util.Locatable;

+import java.io.Serializable;
 import java.util.UUID;

 /**
@@ -10,7 +11,7 @@
  * NOTE: getStart() and getEnd() are 1-base inclusive (which matches the current GATK tools).
  * This does not match the GA4GH spec.
  */
-public interface Variant extends Locatable {
+public interface Variant extends Locatable, Serializable {
     boolean isSnp();
     boolean isIndel();
     UUID getUUID();
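
Extending Serializable means Variant implementations such as VariantContextVariantAdapter are expected to survive Java serialization, which matters once variants are produced on one worker and consumed on another (for example by the Spark runner used in the new test). One concrete consequence, shown here as an illustration rather than as the motivation recorded in this commit, is that a PCollection<Variant> can now use the SDK's SerializableCoder, whose of() factory requires a Serializable-bounded type:

import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.values.PCollection;
import org.broadinstitute.hellbender.utils.variant.Variant;

public class VariantCoderSketch {
    public static void useJavaSerialization(final PCollection<Variant> variants) {
        // This compiles only because Variant now extends Serializable:
        // SerializableCoder.of(Class<T>) is declared with T extends Serializable.
        variants.setCoder(SerializableCoder.of(Variant.class));
    }
}
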

@@ -1,5 +1,6 @@
 package org.broadinstitute.hellbender.engine.dataflow.datasources;

+import com.cloudera.dataflow.spark.SparkPipelineRunner;
 import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
 import com.google.cloud.dataflow.sdk.transforms.Create;
@@ -69,6 +70,26 @@ public void testVariantPCollection(final List<Variant> variants) {
         p.run();
     }

+    @Test(dataProvider = "RealVariantData")
+    public void testVariantPCollectionFromHadoop(final List<Variant> variants) {
+        for (int i = 0; i < variants.size(); ++i) {
+            // Update the element with a cleared UUID.
+            variants.set(i, createVariantContextVariantAdapterForTesting(((VariantContextVariantAdapter) variants.get(i)), defaultUUID()));
+        }
+        // Now make a PCollection, to verify that variants can be coded.
+        Pipeline p = GATKTestPipeline.create();
+
+        // Now, we can test that we can get a PCollection from Hadoop
+        String file = FEATURE_DATA_SOURCE_TEST_DIRECTORY + "feature_data_source_test.vcf";
+        PCollection<Variant> allVariants = VariantsHadoopSource.getAllVariants(file, p);
+
+        // We have to clear the UUIDs to make the comparison.
+        PCollection<Variant> allVariants2 = allVariants.apply(ParDo.of(new clearUUIDDoFn()));
+        DataflowAssert.that(allVariants2).containsInAnyOrder(variants);
+
+        SparkPipelineRunner.create().run(p);
+    }
+
     @Test(dataProvider = "VariantDataProvider")
     public void testVariantAdapter(final List<Variant> expectedVariantList) {
         // The test suite for reading in VCF files is FeatureDataSourceUnitTest.
