From fec2420dc0c1f7a5076e2086c25006661d28ed8f Mon Sep 17 00:00:00 2001 From: Nicole Deflaux Date: Wed, 18 May 2016 13:53:25 -0700 Subject: [PATCH] Update variant pipelines to take a subset of call sets. --- pom.xml | 2 +- .../dataflow/functions/SitesToShards.java | 75 ++++++++-- .../dataflow/pipelines/AnnotateVariants.java | 33 +++-- .../pipelines/IdentifyPrivateVariants.java | 46 +++--- .../dataflow/pipelines/IdentityByState.java | 14 +- .../dataflow/pipelines/VariantSimilarity.java | 37 +++-- .../dataflow/pipelines/VerifyBamId.java | 20 ++- .../dataflow/utils/CallSetNamesOptions.java | 140 ++++++++++++++++++ .../dataflow/functions/SitesToShardsTest.java | 13 +- .../pipelines/AnnotateVariantsITCase.java | 29 +++- .../IdentifyPrivateVariantsITCase.java | 4 +- .../pipelines/IntegrationTestHelper.java | 1 + .../pipelines/VariantSimilarityITCase.java | 27 +++- .../genomics/dataflow/pipelines/family.txt | 26 ++-- 14 files changed, 368 insertions(+), 99 deletions(-) create mode 100644 src/main/java/com/google/cloud/genomics/dataflow/utils/CallSetNamesOptions.java diff --git a/pom.xml b/pom.xml index 7971bad..53acab3 100644 --- a/pom.xml +++ b/pom.xml @@ -121,7 +121,7 @@ com.google.cloud.genomics google-genomics-utils - v1-0.2 + v1-0.3 diff --git a/src/main/java/com/google/cloud/genomics/dataflow/functions/SitesToShards.java b/src/main/java/com/google/cloud/genomics/dataflow/functions/SitesToShards.java index 24948a7..f589ee8 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/functions/SitesToShards.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/functions/SitesToShards.java @@ -22,6 +22,7 @@ import com.google.cloud.dataflow.sdk.transforms.SimpleFunction; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.genomics.utils.Contig; +import com.google.genomics.v1.StreamReadsRequest; import com.google.genomics.v1.StreamVariantsRequest; import java.util.logging.Logger; @@ -45,7 +46,6 @@ * * * The fields may be comma, tab, or whitespace delimited. - * */ public class SitesToShards { @@ -62,6 +62,9 @@ public static interface Options extends PipelineOptions { private static final Pattern SITE_PATTERN = Pattern.compile("^\\s*([\\w\\.]+)\\W+(\\d+)\\W+(\\d+).*$"); + /** + * Given a string encoding a site, parse it into a Contig object. + */ public static class SitesToContigsFn extends DoFn { @Override @@ -78,14 +81,18 @@ public void processElement(DoFn.ProcessContext context) throws E } } + /** + * Given a contig object and request prototype, construct a request spanning the region + * defined by the contig. + */ public static class ContigsToStreamVariantsRequestsFn extends SimpleFunction { - private final String variantSetId; + private final StreamVariantsRequest prototype; - public ContigsToStreamVariantsRequestsFn(String variantSetId) { + public ContigsToStreamVariantsRequestsFn(StreamVariantsRequest prototype) { super(); - this.variantSetId = variantSetId; + this.prototype = prototype; } @Override @@ -93,26 +100,76 @@ public StreamVariantsRequest apply(Contig contig) { if (null == contig) { return null; } - return contig.getStreamVariantsRequest(variantSetId); + return contig.getStreamVariantsRequest(prototype); } } + /** + * Use this transform when you have file(s) of sites that should be converted into + * streaming requests that each span the region for a site. + */ public static class SitesToStreamVariantsShardsTransform extends PTransform, PCollection> { - private final String variantSetId; + private final StreamVariantsRequest prototype; - public SitesToStreamVariantsShardsTransform(String variantSetId) { + public SitesToStreamVariantsShardsTransform(StreamVariantsRequest prototype) { super(); - this.variantSetId = variantSetId; + this.prototype = prototype; } @Override public PCollection apply(PCollection lines) { return lines.apply(ParDo.of(new SitesToContigsFn())) .apply("Contigs to StreamVariantsRequests", - MapElements.via(new ContigsToStreamVariantsRequestsFn(variantSetId))); + MapElements.via(new ContigsToStreamVariantsRequestsFn(prototype))); + } + } + + /** + * Given a contig object and request prototype, construct a request spanning the region + * defined by the contig. + */ + public static class ContigsToStreamReadsRequestsFn extends + SimpleFunction { + + private final StreamReadsRequest prototype; + + public ContigsToStreamReadsRequestsFn(StreamReadsRequest prototype) { + super(); + this.prototype = prototype; + } + + @Override + public StreamReadsRequest apply(Contig contig) { + if (null == contig) { + return null; + } + return contig.getStreamReadsRequest(prototype); + } + + } + + /** + * Use this transform when you have file(s) of sites that should be converted into + * streaming requests that each span the region for a site. + */ + public static class SitesToStreamReadsShardsTransform extends + PTransform, PCollection> { + + private final StreamReadsRequest prototype; + + public SitesToStreamReadsShardsTransform(StreamReadsRequest prototype) { + super(); + this.prototype = prototype; + } + + @Override + public PCollection apply(PCollection lines) { + return lines.apply(ParDo.of(new SitesToContigsFn())) + .apply("Contigs to StreamReadsRequests", + MapElements.via(new ContigsToStreamReadsRequestsFn(prototype))); } } } diff --git a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/AnnotateVariants.java b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/AnnotateVariants.java index 92a1a52..0f46530 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/AnnotateVariants.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/AnnotateVariants.java @@ -33,6 +33,7 @@ import com.google.cloud.genomics.dataflow.coders.GenericJsonCoder; import com.google.cloud.genomics.dataflow.utils.AnnotationUtils; import com.google.cloud.genomics.dataflow.utils.AnnotationUtils.VariantEffect; +import com.google.cloud.genomics.dataflow.utils.CallSetNamesOptions; import com.google.cloud.genomics.dataflow.utils.GCSOutputOptions; import com.google.cloud.genomics.dataflow.utils.GenomicsOptions; import com.google.cloud.genomics.dataflow.utils.ShardOptions; @@ -87,20 +88,25 @@ */ public final class AnnotateVariants extends DoFn> { - public static interface Options extends ShardOptions, GCSOutputOptions { + public static interface Options extends + // Options for call set names. + CallSetNamesOptions, + // Options for calculating over regions, chromosomes, or whole genomes. + ShardOptions, + // Options for the output destination. + GCSOutputOptions { + @Override @Description("The ID of the Google Genomics variant set this pipeline is accessing. " + "Defaults to 1000 Genomes.") @Default.String("10473108253681171589") String getVariantSetId(); - void setVariantSetId(String variantSetId); - - @Description("The IDs of the Google Genomics call sets this pipeline is working with, comma " - + "delimited.Defaults to 1000 Genomes HG00261.") - @Default.String("10473108253681171589-0") - String getCallSetIds(); - void setCallSetIds(String callSetIds); + @Override + @Description("The names of the Google Genomics call sets this pipeline is working with, comma " + + "delimited. Defaults to 1000 Genomes HG00261.") + @Default.String("HG00261") + String getCallSetNames(); @Description("The IDs of the Google Genomics transcript sets this pipeline is working with, " + "comma delimited. Defaults to UCSC refGene (hg19).") @@ -296,13 +302,12 @@ public static void main(String[] args) throws Exception { // Option validation is not yet automatic, we make an explicit call here. Options.Methods.validateOptions(options); + // Set up the prototype request and auth. + StreamVariantsRequest prototype = CallSetNamesOptions.Methods.getRequestPrototype(options); OfflineAuth auth = GenomicsOptions.Methods.getGenomicsAuth(options); Genomics genomics = GenomicsFactory.builder().build().fromOfflineAuth(auth); - List callSetIds = ImmutableList.of(); - if (!Strings.isNullOrEmpty(options.getCallSetIds().trim())) { - callSetIds = ImmutableList.copyOf(options.getCallSetIds().split(",")); - } + List callSetIds = CallSetNamesOptions.Methods.getCallSetIds(options); List transcriptSetIds = validateAnnotationSetsFlag(genomics, options.getTranscriptSetIds(), "TRANSCRIPT"); List variantAnnotationSetIds = @@ -310,9 +315,9 @@ public static void main(String[] args) throws Exception { validateRefsetForAnnotationSets(genomics, transcriptSetIds); List requests = options.isAllReferences() ? - ShardUtils.getVariantRequests(options.getVariantSetId(), ShardUtils.SexChromosomeFilter.EXCLUDE_XY, + ShardUtils.getVariantRequests(prototype, ShardUtils.SexChromosomeFilter.EXCLUDE_XY, options.getBasesPerShard(), auth) : - ShardUtils.getVariantRequests(options.getVariantSetId(), options.getReferences(), options.getBasesPerShard()); + ShardUtils.getVariantRequests(prototype, options.getBasesPerShard(), options.getReferences()); Pipeline p = Pipeline.create(options); p.getCoderRegistry().setFallbackCoderProvider(GenericJsonCoder.PROVIDER); diff --git a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentifyPrivateVariants.java b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentifyPrivateVariants.java index c41ab76..3f13988 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentifyPrivateVariants.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentifyPrivateVariants.java @@ -24,17 +24,15 @@ import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.genomics.dataflow.readers.VariantStreamer; +import com.google.cloud.genomics.dataflow.utils.CallSetNamesOptions; import com.google.cloud.genomics.dataflow.utils.GCSOutputOptions; import com.google.cloud.genomics.dataflow.utils.GenomicsOptions; import com.google.cloud.genomics.dataflow.utils.ShardOptions; import com.google.cloud.genomics.utils.OfflineAuth; import com.google.cloud.genomics.utils.ShardBoundary; import com.google.cloud.genomics.utils.ShardUtils; -import com.google.common.base.CharMatcher; import com.google.common.base.Joiner; -import com.google.common.base.Splitter; import com.google.common.collect.ImmutableSet; -import com.google.common.io.Files; import com.google.genomics.v1.StreamVariantsRequest; import com.google.genomics.v1.Variant; import com.google.genomics.v1.VariantCall; @@ -42,9 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.io.IOException; -import java.nio.charset.Charset; import java.security.GeneralSecurityException; import java.util.List; @@ -58,22 +54,24 @@ public class IdentifyPrivateVariants { public static interface Options extends - // Options for calculating over regions, chromosomes, or whole genomes. - ShardOptions, - // Options for the output destination. - GCSOutputOptions { + // Options for call set names. + CallSetNamesOptions, + // Options for calculating over regions, chromosomes, or whole genomes. + ShardOptions, + // Options for the output destination. + GCSOutputOptions { + @Override @Description("The ID of the Google Genomics variant set from which this pipeline " + "will identify private variants.") @Required String getVariantSetId(); - void setVariantSetId(String variantSetId); - @Description("A local file path to a list of newline-separated callset IDs. " - + "Any variants private to those callset IDs will be deleted.") + @Override + @Description("A local file path to a list of newline-separated callset names. " + + "Any variants private to those callsets will be identified.") @Required - String getCallSetIdsFilepath(); - void setCallSetIdsFilepath(String filepath); + String getCallSetNamesFilepath(); @Description("Whether variants with no callsets should also be identified. Defaults to false.") @Default.Boolean(false) @@ -140,17 +138,13 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept // Option validation is not yet automatic, we make an explicit call here. Options.Methods.validateOptions(options); + // Set up the prototype request and auth. + StreamVariantsRequest prototype = CallSetNamesOptions.Methods.getRequestPrototype(options); OfflineAuth auth = GenomicsOptions.Methods.getGenomicsAuth(options); - // Grab and parse the list of callset IDs. - String fileContents = - Files.toString(new File(options.getCallSetIdsFilepath()), Charset.defaultCharset()); - ImmutableSet callSetIds = - ImmutableSet - .builder() - .addAll( - Splitter.on(CharMatcher.breakingWhitespace()).omitEmptyStrings().trimResults() - .split(fileContents)).build(); + ImmutableSet callSetIds = ImmutableSet.builder() + .addAll(CallSetNamesOptions.Methods.getCallSetIds(options)) + .build(); LOG.info("The pipeline will identify and write to Cloud Storage variants " + "private to " + callSetIds.size() + " genomes with callSetIds: " + callSetIds); if (options.getIdentifyVariantsWithoutCalls()) { @@ -158,10 +152,10 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept } List shardRequests = - options.isAllReferences() ? ShardUtils.getVariantRequests(options.getVariantSetId(), + options.isAllReferences() ? ShardUtils.getVariantRequests(prototype, ShardUtils.SexChromosomeFilter.INCLUDE_XY, options.getBasesPerShard(), auth) - : ShardUtils.getVariantRequests(options.getVariantSetId(), options.getReferences(), - options.getBasesPerShard()); + : ShardUtils.getVariantRequests(prototype, options.getBasesPerShard(), + options.getReferences()); Pipeline p = Pipeline.create(options); PCollection variants = p.begin() diff --git a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByState.java b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByState.java index ab940b1..6e92e15 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByState.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByState.java @@ -33,6 +33,7 @@ import com.google.cloud.genomics.dataflow.functions.ibs.IBSCalculator; import com.google.cloud.genomics.dataflow.functions.ibs.SharedMinorAllelesCalculatorFactory; import com.google.cloud.genomics.dataflow.readers.VariantStreamer; +import com.google.cloud.genomics.dataflow.utils.CallSetNamesOptions; import com.google.cloud.genomics.dataflow.utils.GCSOutputOptions; import com.google.cloud.genomics.dataflow.utils.GenomicsOptions; import com.google.cloud.genomics.dataflow.utils.ShardOptions; @@ -54,6 +55,8 @@ public class IdentityByState { public static interface Options extends + // Options for call set names. + CallSetNamesOptions, // Options for calculating over regions, chromosomes, or whole genomes. ShardOptions, // Options for calculating over a list of sites. @@ -66,11 +69,13 @@ public static interface Options extends GCSOutputOptions { + @Override @Description("The ID of the Google Genomics variant set this pipeline is accessing. " + "Defaults to 1000 Genomes.") @Default.String("10473108253681171589") String getVariantSetId(); + @Override void setVariantSetId(String variantSetId); @Description("The class that determines the strategy for calculating the similarity of alleles.") @@ -100,6 +105,9 @@ public static void main(String[] args) throws Exception { PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); // Option validation is not yet automatic, we make an explicit call here. Options.Methods.validateOptions(options); + + // Set up the prototype request and auth. + StreamVariantsRequest prototype = CallSetNamesOptions.Methods.getRequestPrototype(options); OfflineAuth auth = GenomicsOptions.Methods.getGenomicsAuth(options); Pipeline p = Pipeline.create(options); @@ -109,7 +117,7 @@ public static void main(String[] args) throws Exception { // Compute IBS on a list of sites (e.g., SNPs). PCollection requests = p.apply(TextIO.Read.named("ReadSites") .from(options.getSitesFilepath())) - .apply(new SitesToShards.SitesToStreamVariantsShardsTransform(options.getVariantSetId())); + .apply(new SitesToShards.SitesToStreamVariantsShardsTransform(prototype)); if(options.getHasNonVariantSegments()) { processedVariants = requests.apply( @@ -121,9 +129,9 @@ public static void main(String[] args) throws Exception { } else { // Computing IBS over genomic region(s) or the whole genome. List requests = options.isAllReferences() ? - ShardUtils.getVariantRequests(options.getVariantSetId(), ShardUtils.SexChromosomeFilter.EXCLUDE_XY, + ShardUtils.getVariantRequests(prototype, ShardUtils.SexChromosomeFilter.EXCLUDE_XY, options.getBasesPerShard(), auth) : - ShardUtils.getVariantRequests(options.getVariantSetId(), options.getReferences(), options.getBasesPerShard()); + ShardUtils.getVariantRequests(prototype, options.getBasesPerShard(), options.getReferences()); PCollection variants = p.begin() .apply(Create.of(requests)) .apply(new VariantStreamer(auth, ShardBoundary.Requirement.STRICT, VARIANT_FIELDS)); diff --git a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/VariantSimilarity.java b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/VariantSimilarity.java index 03ed29a..b850e80 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/VariantSimilarity.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/VariantSimilarity.java @@ -27,6 +27,7 @@ import com.google.cloud.genomics.dataflow.functions.pca.ExtractSimilarCallsets; import com.google.cloud.genomics.dataflow.functions.pca.OutputPCoAFile; import com.google.cloud.genomics.dataflow.readers.VariantStreamer; +import com.google.cloud.genomics.dataflow.utils.CallSetNamesOptions; import com.google.cloud.genomics.dataflow.utils.GCSOutputOptions; import com.google.cloud.genomics.dataflow.utils.GenomicsOptions; import com.google.cloud.genomics.dataflow.utils.ShardOptions; @@ -36,6 +37,7 @@ import com.google.cloud.genomics.utils.ShardUtils; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; +import com.google.common.collect.Lists; import com.google.genomics.v1.StreamVariantsRequest; import java.io.IOException; @@ -52,20 +54,21 @@ public class VariantSimilarity { public static interface Options extends - // Options for calculating over regions, chromosomes, or whole genomes. - ShardOptions, - // Options for calculating over a list of sites. - SitesToShards.Options, - // Options for the output destination. - GCSOutputOptions { - + // Options for call set names. + CallSetNamesOptions, + // Options for calculating over regions, chromosomes, or whole genomes. + ShardOptions, + // Options for calculating over a list of sites. + SitesToShards.Options, + // Options for the output destination. + GCSOutputOptions { + + @Override @Description("The ID of the Google Genomics variant set this pipeline is accessing. " + "Defaults to 1000 Genomes.") @Default.String("10473108253681171589") String getVariantSetId(); - void setVariantSetId(String variantSetId); - public static class Methods { public static void validateOptions(Options options) { GCSOutputOptions.Methods.validateOptions(options); @@ -85,9 +88,14 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept // Option validation is not yet automatic, we make an explicit call here. Options.Methods.validateOptions(options); + // Set up the prototype request and auth. + StreamVariantsRequest prototype = CallSetNamesOptions.Methods.getRequestPrototype(options); OfflineAuth auth = GenomicsOptions.Methods.getGenomicsAuth(options); - List callSetNames = GenomicsUtils.getCallSetsNames(options.getVariantSetId() , auth); + // Make a bimap of the callsets so that the indices the pipeline is passing around are small. + List callSetNames = (0 < prototype.getCallSetIdsCount()) + ? Lists.newArrayList(CallSetNamesOptions.Methods.getCallSetNames(options)) + : GenomicsUtils.getCallSetsNames(options.getVariantSetId(), auth); Collections.sort(callSetNames); // Ensure a stable sort order for reproducible results. BiMap dataIndices = HashBiMap.create(); for(String callSetName : callSetNames) { @@ -97,17 +105,18 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept Pipeline p = Pipeline.create(options); p.begin(); - PCollection requests ; + PCollection requests; if(null != options.getSitesFilepath()) { // Compute PCA on a list of sites. requests = p.apply(TextIO.Read.named("ReadSites") .from(options.getSitesFilepath())) - .apply(new SitesToShards.SitesToStreamVariantsShardsTransform(options.getVariantSetId())); + .apply(new SitesToShards.SitesToStreamVariantsShardsTransform(prototype)); } else { + // Compute PCA over genomic regions. List shardRequests = options.isAllReferences() ? - ShardUtils.getVariantRequests(options.getVariantSetId(), ShardUtils.SexChromosomeFilter.EXCLUDE_XY, + ShardUtils.getVariantRequests(prototype, ShardUtils.SexChromosomeFilter.EXCLUDE_XY, options.getBasesPerShard(), auth) : - ShardUtils.getVariantRequests(options.getVariantSetId(), options.getReferences(), options.getBasesPerShard()); + ShardUtils.getVariantRequests(prototype, options.getBasesPerShard(), options.getReferences()); requests = p.apply(Create.of(shardRequests)); } diff --git a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/VerifyBamId.java b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/VerifyBamId.java index dfd4ece..d57e36e 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/VerifyBamId.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/VerifyBamId.java @@ -45,6 +45,7 @@ import com.google.cloud.genomics.dataflow.pipelines.CalculateCoverage.CheckMatchingReferenceSet; import com.google.cloud.genomics.dataflow.readers.ReadGroupStreamer; import com.google.cloud.genomics.dataflow.readers.VariantStreamer; +import com.google.cloud.genomics.dataflow.utils.CallSetNamesOptions; import com.google.cloud.genomics.dataflow.utils.GCSOutputOptions; import com.google.cloud.genomics.dataflow.utils.GenomicsOptions; import com.google.cloud.genomics.dataflow.utils.ShardOptions; @@ -94,7 +95,13 @@ public class VerifyBamId { /** * Options required to run this pipeline. */ - public static interface Options extends ShardOptions, GCSOutputOptions { + public static interface Options extends + // Options for call set names. + CallSetNamesOptions, + // Options for calculating over regions, chromosomes, or whole genomes. + ShardOptions, + // Options for the output destination. + GCSOutputOptions { @Description("A comma delimited list of the IDs of the Google Genomics ReadGroupSets this " + "pipeline is working with. Default (empty) indicates all ReadGroupSets in InputDatasetId." @@ -116,6 +123,7 @@ public static interface Options extends ShardOptions, GCSOutputOptions { void setInputDatasetId(String inputDatasetId); public String DEFAULT_VARIANTSET = "10473108253681171589"; + @Override @Description("The ID of the Google Genomics VariantSet this pipeline is working with." + " It assumes the variant set has INFO field 'AF' from which it retrieves the" + " allele frequency for the variant, such as 1,000 Genomes phase 1 or phase 3 variants." @@ -123,8 +131,6 @@ public static interface Options extends ShardOptions, GCSOutputOptions { @Default.String(DEFAULT_VARIANTSET) String getVariantSetId(); - void setVariantSetId(String variantSetId); - @Description("The minimum allele frequency to use in analysis. Defaults to 0.01.") @Default.Double(0.01) double getMinFrequency(); @@ -155,7 +161,7 @@ public static void validateOptions(Options options) { private static final String HASH_PREFIX = ""; // Tip: Use the API explorer to test which fields to include in partial responses. // https://developers.google.com/apis-explorer/#p/genomics/v1/genomics.variants.stream?fields=variants(alternateBases%252Ccalls(callSetName%252Cgenotype)%252CreferenceBases)&_h=3&resource=%257B%250A++%2522variantSetId%2522%253A+%25223049512673186936334%2522%252C%250A++%2522referenceName%2522%253A+%2522chr17%2522%252C%250A++%2522start%2522%253A+%252241196311%2522%252C%250A++%2522end%2522%253A+%252241196312%2522%252C%250A++%2522callSetIds%2522%253A+%250A++%255B%25223049512673186936334-0%2522%250A++%255D%250A%257D& - private static final String VARIANT_FIELDS = "variants(start,calls(genotype,callSetName))"; + private static final String VARIANT_FIELDS = "variants(alternateBases,filter,info,quality,referenceBases,referenceName,start)"; /** * Run the VerifyBamId algorithm and output the resulting contamination estimate. @@ -168,6 +174,8 @@ public static void main(String[] args) throws GeneralSecurityException, IOExcept // Option validation is not yet automatic, we make an explicit call here. Options.Methods.validateOptions(pipelineOptions); + // Set up the prototype request and auth. + StreamVariantsRequest prototype = CallSetNamesOptions.Methods.getRequestPrototype(pipelineOptions); auth = GenomicsOptions.Methods.getGenomicsAuth(pipelineOptions); p = Pipeline.create(pipelineOptions); @@ -213,9 +221,9 @@ to get their ranges (we only need to stream Reads that overlap the selected Vari // Reads in Variants. TODO potentially provide an option to load the Variants from a file. List variantRequests = pipelineOptions.isAllReferences() ? - ShardUtils.getVariantRequests(pipelineOptions.getVariantSetId(), ShardUtils.SexChromosomeFilter.INCLUDE_XY, + ShardUtils.getVariantRequests(prototype, ShardUtils.SexChromosomeFilter.INCLUDE_XY, pipelineOptions.getBasesPerShard(), auth) : - ShardUtils.getVariantRequests(pipelineOptions.getVariantSetId(), pipelineOptions.getReferences(), pipelineOptions.getBasesPerShard()); + ShardUtils.getVariantRequests(prototype, pipelineOptions.getBasesPerShard(), pipelineOptions.getReferences()); PCollection variants = p.apply(Create.of(variantRequests)) .apply(new VariantStreamer(auth, ShardBoundary.Requirement.STRICT, VARIANT_FIELDS)); diff --git a/src/main/java/com/google/cloud/genomics/dataflow/utils/CallSetNamesOptions.java b/src/main/java/com/google/cloud/genomics/dataflow/utils/CallSetNamesOptions.java new file mode 100644 index 0000000..4ac84a8 --- /dev/null +++ b/src/main/java/com/google/cloud/genomics/dataflow/utils/CallSetNamesOptions.java @@ -0,0 +1,140 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.genomics.dataflow.utils; + +import com.google.api.services.genomics.model.CallSet; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.Validation.Required; +import com.google.cloud.genomics.utils.CallSetUtils; +import com.google.cloud.genomics.utils.GenomicsUtils; +import com.google.common.base.CharMatcher; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.io.Files; +import com.google.genomics.v1.StreamVariantsRequest; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; + +/** + * A common options class for variant sets and call sets. + */ +public interface CallSetNamesOptions extends GenomicsOptions { + + @Required + @Description("The ID of the Google Genomics variant set this pipeline is accessing.") + String getVariantSetId(); + void setVariantSetId(String variantSetId); + + @Description("A comma-separated list of callset names. Use this option or " + + "--callSetNamesFilepath to specify the subset of callsets over which " + + "this pipeline should compute.") + String getCallSetNames(); + void setCallSetNames(String callSetNames); + + @Description("A local file path to a list of newline-separated callset names. " + + "Use this option or --callSetNames to specify the subset of callsets " + + "over which this pipeline should compute.") + String getCallSetNamesFilepath(); + void setCallSetNamesFilepath(String callSetNamesFilepath); + + public static class Methods { + + /** + * Construct a request prototype with several fields already filled in using option values. + * + * @param options + * @return the request prototype + * @throws IOException + */ + public static StreamVariantsRequest getRequestPrototype(final CallSetNamesOptions options) throws IOException { + return StreamVariantsRequest.newBuilder() + .setProjectId(options.getProject()) + .setVariantSetId(options.getVariantSetId()) + .addAllCallSetIds(getCallSetIds(options)) + .build(); + } + + /** + * Parse and return the unique call set names specified in the options. + * + * @param options + * @return a list of unique call set names + * @throws IOException + */ + public static List getCallSetNames(final CallSetNamesOptions options) throws IOException { + Preconditions.checkArgument( + null == options.getCallSetNames() || null == options.getCallSetNamesFilepath(), + "Only specify one of --callSetNamesList or --callSetNamesFilepath"); + if (null != options.getCallSetNamesFilepath()) { + String fileContents = + Files.toString(new File(options.getCallSetNamesFilepath()), Charset.defaultCharset()); + return ImmutableSet + .builder() + .addAll( + Splitter.on(CharMatcher.breakingWhitespace()).omitEmptyStrings().trimResults() + .split(fileContents)).build().asList(); + } + if (null != options.getCallSetNames()) { + return ImmutableSet + .builder() + .addAll( + Splitter.on(CharMatcher.is(',')).omitEmptyStrings().trimResults() + .split(options.getCallSetNames())).build().asList(); + } + return ImmutableSet.builder().build().asList(); + } + + /** + * Return the call set ids corresponding to the call set names provided in the options. + * + * This has a side-effect of confirming that the call set names within the variant set are unique. + * + * @param options + * @return a list of unique call set ids + * @throws IOException + */ + public static List getCallSetIds(final CallSetNamesOptions options) throws IOException { + List callSetNames = getCallSetNames(options); + if (callSetNames.isEmpty()) { + return callSetNames; // Return the empty list. + } + + ImmutableSet.Builder callSetIds = ImmutableSet.builder(); + Iterable callSets = GenomicsUtils.getCallSets(options.getVariantSetId(), + GenomicsOptions.Methods.getGenomicsAuth(options)); + BiMap nameToId = null; + try { + nameToId = CallSetUtils.getCallSetNameMapping(callSets); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("VariantSet " + options.getVariantSetId() + + " contains duplicate callset name(s).", e); + } + for (String callSetName : callSetNames) { + String id = nameToId.get(callSetName); + Preconditions.checkNotNull(id, + "Call set name '%s' does not correspond to a call set id in variant set id %s", + callSetName, options.getVariantSetId()); + callSetIds.add(id); + } + return callSetIds.build().asList(); + } + } +} diff --git a/src/test/java/com/google/cloud/genomics/dataflow/functions/SitesToShardsTest.java b/src/test/java/com/google/cloud/genomics/dataflow/functions/SitesToShardsTest.java index 04b9856..0de8900 100644 --- a/src/test/java/com/google/cloud/genomics/dataflow/functions/SitesToShardsTest.java +++ b/src/test/java/com/google/cloud/genomics/dataflow/functions/SitesToShardsTest.java @@ -82,15 +82,18 @@ public void testSitesToStreamVariantsShards() throws Exception { "1, 2000000, 3000000", ""}); // blank line + StreamVariantsRequest prototype = StreamVariantsRequest.newBuilder() + .setProjectId("theProjectId") + .setVariantSetId("theVariantSetId") + .build(); + List expectedOutput = new ArrayList(); - expectedOutput.add(StreamVariantsRequest.newBuilder() - .setVariantSetId("variantSetId") + expectedOutput.add(StreamVariantsRequest.newBuilder(prototype) .setReferenceName("chrX") .setStart(2000000) .setEnd(3000000) .build()); - expectedOutput.add(StreamVariantsRequest.newBuilder() - .setVariantSetId("variantSetId") + expectedOutput.add(StreamVariantsRequest.newBuilder(prototype) .setReferenceName("1") .setStart(2000000) .setEnd(3000000) @@ -101,7 +104,7 @@ public void testSitesToStreamVariantsShards() throws Exception { PCollection input = p.apply(Create.of(SITES).withCoder(StringUtf8Coder.of())); PCollection output = input.apply("test transform", - new SitesToShards.SitesToStreamVariantsShardsTransform("variantSetId")); + new SitesToShards.SitesToStreamVariantsShardsTransform(prototype)); DataflowAssert.that(output).containsInAnyOrder(expectedOutput); p.run(); diff --git a/src/test/java/com/google/cloud/genomics/dataflow/pipelines/AnnotateVariantsITCase.java b/src/test/java/com/google/cloud/genomics/dataflow/pipelines/AnnotateVariantsITCase.java index 7cf2522..cb2b50e 100644 --- a/src/test/java/com/google/cloud/genomics/dataflow/pipelines/AnnotateVariantsITCase.java +++ b/src/test/java/com/google/cloud/genomics/dataflow/pipelines/AnnotateVariantsITCase.java @@ -16,14 +16,18 @@ package com.google.cloud.genomics.dataflow.pipelines; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import com.google.api.client.util.Lists; import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; import org.hamcrest.CoreMatchers; +import org.hamcrest.core.IsInstanceOf; import org.junit.After; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -51,6 +55,9 @@ */ @RunWith(JUnit4.class) public class AnnotateVariantsITCase { + @Rule + public ExpectedException thrown = ExpectedException.none(); + static final String[] EXPECTED_RESULT = { "chr17:40714803:A:CI7s77ro84KpKhIFY2hyMTcYs4S1EyDwuoPB1PDR19AB: [{alternateBases=A, effect=NONSYNONYMOUS_SNP, " @@ -84,7 +91,27 @@ public void testLocal() throws Exception { "--variantSetId=" + helper.PLATINUM_GENOMES_DATASET, "--transcriptSetIds=CIjfoPXj9LqPlAEQ5vnql4KewYuSAQ", "--variantAnnotationSetIds=CILSqfjtlY6tHxC0nNH-4cu-xlQ", - "--callSetIds=3049512673186936334-0", + "--callSetNames=NA12877", + "--output=" + outputPrefix, + }; + + System.out.println(ARGS); + + testBase(ARGS, EXPECTED_RESULT); + } + + @Test + public void testBadCallSetName() throws Exception { + thrown.expect(IsInstanceOf.instanceOf(NullPointerException.class)); + thrown.expectMessage(containsString("Call set name 'NotInVariantSet' does not correspond to a call " + + "set id in variant set id 3049512673186936334")); + + String[] ARGS = { + "--references=chr17:40700000:40800000", + "--variantSetId=" + helper.PLATINUM_GENOMES_DATASET, + "--transcriptSetIds=CIjfoPXj9LqPlAEQ5vnql4KewYuSAQ", + "--variantAnnotationSetIds=CILSqfjtlY6tHxC0nNH-4cu-xlQ", + "--callSetNames=NotInVariantSet", "--output=" + outputPrefix, }; diff --git a/src/test/java/com/google/cloud/genomics/dataflow/pipelines/IdentifyPrivateVariantsITCase.java b/src/test/java/com/google/cloud/genomics/dataflow/pipelines/IdentifyPrivateVariantsITCase.java index 57ee723..998e409 100644 --- a/src/test/java/com/google/cloud/genomics/dataflow/pipelines/IdentifyPrivateVariantsITCase.java +++ b/src/test/java/com/google/cloud/genomics/dataflow/pipelines/IdentifyPrivateVariantsITCase.java @@ -54,7 +54,7 @@ public class IdentifyPrivateVariantsITCase { // This file contains mother, father, and children of CEPH pedigree 1463. The variants of // the grandparents are retained. - static final String CALLSET_IDS_FILEPATH = "src/test/resources/com/google/cloud/genomics/dataflow/pipelines/family.txt"; + static final String CALLSET_NAMES_FILEPATH = "src/test/resources/com/google/cloud/genomics/dataflow/pipelines/family.txt"; static final String[] EXPECTED_RESULT = { @@ -140,7 +140,7 @@ public void testLocal() throws Exception { String[] ARGS = { "--references=chr17:41198200:41198300", // smaller portion of BRCA1 "--variantSetId=" + helper.PLATINUM_GENOMES_DATASET, - "--callSetIdsFilepath=" + CALLSET_IDS_FILEPATH, + "--callSetNamesFilepath=" + CALLSET_NAMES_FILEPATH, "--output=" + outputPrefix, }; diff --git a/src/test/java/com/google/cloud/genomics/dataflow/pipelines/IntegrationTestHelper.java b/src/test/java/com/google/cloud/genomics/dataflow/pipelines/IntegrationTestHelper.java index e6c4889..5b90222 100644 --- a/src/test/java/com/google/cloud/genomics/dataflow/pipelines/IntegrationTestHelper.java +++ b/src/test/java/com/google/cloud/genomics/dataflow/pipelines/IntegrationTestHelper.java @@ -47,6 +47,7 @@ public class IntegrationTestHelper { public static final String PLATINUM_GENOMES_DATASET = "3049512673186936334"; public static final String PLATINUM_GENOMES_BRCA1_REFERENCES = "chr17:41196311:41277499"; public static final int PLATINUM_GENOMES_NUMBER_OF_SAMPLES = 17; + public static final String A_FEW_PLATINUM_GENOMES_CALLSET_NAMES = "NA12877,NA12880,NA12890"; private GenomicsOptions popts = PipelineOptionsFactory.create().as(GenomicsOptions.class); GcsUtil gcsUtil; diff --git a/src/test/java/com/google/cloud/genomics/dataflow/pipelines/VariantSimilarityITCase.java b/src/test/java/com/google/cloud/genomics/dataflow/pipelines/VariantSimilarityITCase.java index afdfcde..b03aa54 100644 --- a/src/test/java/com/google/cloud/genomics/dataflow/pipelines/VariantSimilarityITCase.java +++ b/src/test/java/com/google/cloud/genomics/dataflow/pipelines/VariantSimilarityITCase.java @@ -49,7 +49,7 @@ * mvn install -DskipITs * * To run one test: - * mvn -Dit.test=VariantSimilarityITCase#testStreamingLocal verify + * mvn -Dit.test=VariantSimilarityITCase#testLocal verify * * See also http://maven.apache.org/surefire/maven-failsafe-plugin/examples/single-test.html */ @@ -96,6 +96,12 @@ public class VariantSimilarityITCase { new GraphResult("NA12893", 5.18, -1.18) }; + static final GraphResult[] EXPECTED_CALLSETS_RESULT = { + new GraphResult("NA12877", 4.58, 2.63), + new GraphResult("NA12880", -9.1, 0.01), + new GraphResult("NA12890", 4.5, -2.66) + }; + static String outputPrefix; static IntegrationTestHelper helper; @@ -113,7 +119,7 @@ public void tearDown() throws Exception { } @Test - public void testStreamingLocal() throws IOException, GeneralSecurityException { + public void testLocal() throws IOException, GeneralSecurityException { String[] ARGS = { "--references=" + helper.PLATINUM_GENOMES_BRCA1_REFERENCES, "--variantSetId=" + helper.PLATINUM_GENOMES_DATASET, @@ -123,7 +129,7 @@ public void testStreamingLocal() throws IOException, GeneralSecurityException { } @Test - public void testSitesFilepathStreamingLocal() throws IOException, GeneralSecurityException { + public void testSitesFilepathLocal() throws IOException, GeneralSecurityException { String[] ARGS = { "--sitesFilepath=" + IdentityByStateITCase.SITES_FILEPATH, "--variantSetId=" + helper.PLATINUM_GENOMES_DATASET, @@ -133,7 +139,18 @@ public void testSitesFilepathStreamingLocal() throws IOException, GeneralSecurit } @Test - public void testStreamingCloud() throws IOException, GeneralSecurityException { + public void testCallSetsLocal() throws IOException, GeneralSecurityException { + String[] ARGS = { + "--references=" + helper.PLATINUM_GENOMES_BRCA1_REFERENCES, + "--variantSetId=" + helper.PLATINUM_GENOMES_DATASET, + "--callSetNames=" + helper.A_FEW_PLATINUM_GENOMES_CALLSET_NAMES, + "--output=" + outputPrefix, + }; + testBase(ARGS, EXPECTED_CALLSETS_RESULT); + } + + @Test + public void testCloud() throws IOException, GeneralSecurityException { String[] ARGS = { "--references=" + helper.PLATINUM_GENOMES_BRCA1_REFERENCES, "--variantSetId=" + helper.PLATINUM_GENOMES_DATASET, @@ -159,7 +176,7 @@ private void testBase(String[] ARGS, GraphResult[] expectedResult) throws IOExce } // Check the pipeline results. - assertEquals(helper.PLATINUM_GENOMES_NUMBER_OF_SAMPLES, results.size()); + assertEquals(expectedResult.length, results.size()); assertThat(results, CoreMatchers.allOf(CoreMatchers.hasItems(expectedResult))); diff --git a/src/test/resources/com/google/cloud/genomics/dataflow/pipelines/family.txt b/src/test/resources/com/google/cloud/genomics/dataflow/pipelines/family.txt index 690a6ac..e8c82d0 100644 --- a/src/test/resources/com/google/cloud/genomics/dataflow/pipelines/family.txt +++ b/src/test/resources/com/google/cloud/genomics/dataflow/pipelines/family.txt @@ -1,13 +1,13 @@ -3049512673186936334-0 NA12877 [u'3049512673186936334'] -3049512673186936334-1 NA12893 [u'3049512673186936334'] -3049512673186936334-2 NA12885 [u'3049512673186936334'] -3049512673186936334-4 NA12887 [u'3049512673186936334'] -3049512673186936334-5 NA12881 [u'3049512673186936334'] -3049512673186936334-6 NA12888 [u'3049512673186936334'] -3049512673186936334-7 NA12882 [u'3049512673186936334'] -3049512673186936334-8 NA12879 [u'3049512673186936334'] -3049512673186936334-10 NA12883 [u'3049512673186936334'] -3049512673186936334-12 NA12886 [u'3049512673186936334'] -3049512673186936334-14 NA12878 [u'3049512673186936334'] -3049512673186936334-15 NA12884 [u'3049512673186936334'] -3049512673186936334-16 NA12880 [u'3049512673186936334'] +NA12877 +NA12893 +NA12885 +NA12887 +NA12881 +NA12888 +NA12882 +NA12879 +NA12883 +NA12886 +NA12878 +NA12884 +NA12880