Skip to content
This repository was archived by the owner on Oct 29, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
<dependency>
<groupId>com.google.cloud.genomics</groupId>
<artifactId>google-genomics-utils</artifactId>
<version>v1-0.2</version>
<version>v1-0.3</version>
<exclusions>
<!-- Exclude an old version of guava which is being pulled
in by a transitive dependency google-api-client. -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.dataflow.sdk.transforms.SimpleFunction;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.genomics.utils.Contig;
import com.google.genomics.v1.StreamReadsRequest;
import com.google.genomics.v1.StreamVariantsRequest;

import java.util.logging.Logger;
Expand All @@ -45,7 +46,6 @@
* </ol>
*
* The fields may be comma, tab, or whitespace delimited.
*
*/
public class SitesToShards {

Expand All @@ -62,6 +62,9 @@ public static interface Options extends PipelineOptions {

private static final Pattern SITE_PATTERN = Pattern.compile("^\\s*([\\w\\.]+)\\W+(\\d+)\\W+(\\d+).*$");

/**
* Given a string encoding a site, parse it into a Contig object.
*/
public static class SitesToContigsFn extends DoFn<String, Contig> {

@Override
Expand All @@ -78,41 +81,95 @@ public void processElement(DoFn<String, Contig>.ProcessContext context) throws E
}
}

/**
* Given a contig object and request prototype, construct a request spanning the region
* defined by the contig.
*/
public static class ContigsToStreamVariantsRequestsFn extends
SimpleFunction<Contig, StreamVariantsRequest> {

private final String variantSetId;
private final StreamVariantsRequest prototype;

public ContigsToStreamVariantsRequestsFn(String variantSetId) {
public ContigsToStreamVariantsRequestsFn(StreamVariantsRequest prototype) {
super();
this.variantSetId = variantSetId;
this.prototype = prototype;
}

@Override
public StreamVariantsRequest apply(Contig contig) {
if (null == contig) {
return null;
}
return contig.getStreamVariantsRequest(variantSetId);
return contig.getStreamVariantsRequest(prototype);
}

}

/**
* Use this transform when you have file(s) of sites that should be converted into
* streaming requests that each span the region for a site.
*/
public static class SitesToStreamVariantsShardsTransform extends
PTransform<PCollection<String>, PCollection<StreamVariantsRequest>> {

private final String variantSetId;
private final StreamVariantsRequest prototype;

public SitesToStreamVariantsShardsTransform(String variantSetId) {
public SitesToStreamVariantsShardsTransform(StreamVariantsRequest prototype) {
super();
this.variantSetId = variantSetId;
this.prototype = prototype;
}

@Override
public PCollection<StreamVariantsRequest> apply(PCollection<String> lines) {
return lines.apply(ParDo.of(new SitesToContigsFn()))
.apply("Contigs to StreamVariantsRequests",
MapElements.via(new ContigsToStreamVariantsRequestsFn(variantSetId)));
MapElements.via(new ContigsToStreamVariantsRequestsFn(prototype)));
}
}

/**
* Given a contig object and request prototype, construct a request spanning the region
* defined by the contig.
*/
public static class ContigsToStreamReadsRequestsFn extends
SimpleFunction<Contig, StreamReadsRequest> {

private final StreamReadsRequest prototype;

public ContigsToStreamReadsRequestsFn(StreamReadsRequest prototype) {
super();
this.prototype = prototype;
}

@Override
public StreamReadsRequest apply(Contig contig) {
if (null == contig) {
return null;
}
return contig.getStreamReadsRequest(prototype);
}

}

/**
* Use this transform when you have file(s) of sites that should be converted into
* streaming requests that each span the region for a site.
*/
public static class SitesToStreamReadsShardsTransform extends
PTransform<PCollection<String>, PCollection<StreamReadsRequest>> {

private final StreamReadsRequest prototype;

public SitesToStreamReadsShardsTransform(StreamReadsRequest prototype) {
super();
this.prototype = prototype;
}

@Override
public PCollection<StreamReadsRequest> apply(PCollection<String> lines) {
return lines.apply(ParDo.of(new SitesToContigsFn()))
.apply("Contigs to StreamReadsRequests",
MapElements.via(new ContigsToStreamReadsRequestsFn(prototype)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.genomics.dataflow.coders.GenericJsonCoder;
import com.google.cloud.genomics.dataflow.utils.AnnotationUtils;
import com.google.cloud.genomics.dataflow.utils.AnnotationUtils.VariantEffect;
import com.google.cloud.genomics.dataflow.utils.CallSetNamesOptions;
import com.google.cloud.genomics.dataflow.utils.GCSOutputOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
import com.google.cloud.genomics.dataflow.utils.ShardOptions;
Expand Down Expand Up @@ -87,20 +88,25 @@
*/
public final class AnnotateVariants extends DoFn<StreamVariantsRequest, KV<String, VariantAnnotation>> {

public static interface Options extends ShardOptions, GCSOutputOptions {
public static interface Options extends
// Options for call set names.
CallSetNamesOptions,
// Options for calculating over regions, chromosomes, or whole genomes.
ShardOptions,
// Options for the output destination.
GCSOutputOptions {

@Override
@Description("The ID of the Google Genomics variant set this pipeline is accessing. "
+ "Defaults to 1000 Genomes.")
@Default.String("10473108253681171589")
String getVariantSetId();

void setVariantSetId(String variantSetId);

@Description("The IDs of the Google Genomics call sets this pipeline is working with, comma "
+ "delimited.Defaults to 1000 Genomes HG00261.")
@Default.String("10473108253681171589-0")
String getCallSetIds();
void setCallSetIds(String callSetIds);
@Override
@Description("The names of the Google Genomics call sets this pipeline is working with, comma "
+ "delimited. Defaults to 1000 Genomes HG00261.")
@Default.String("HG00261")
String getCallSetNames();

@Description("The IDs of the Google Genomics transcript sets this pipeline is working with, "
+ "comma delimited. Defaults to UCSC refGene (hg19).")
Expand Down Expand Up @@ -296,23 +302,22 @@ public static void main(String[] args) throws Exception {
// Option validation is not yet automatic, we make an explicit call here.
Options.Methods.validateOptions(options);

// Set up the prototype request and auth.
StreamVariantsRequest prototype = CallSetNamesOptions.Methods.getRequestPrototype(options);
OfflineAuth auth = GenomicsOptions.Methods.getGenomicsAuth(options);
Genomics genomics = GenomicsFactory.builder().build().fromOfflineAuth(auth);

List<String> callSetIds = ImmutableList.of();
if (!Strings.isNullOrEmpty(options.getCallSetIds().trim())) {
callSetIds = ImmutableList.copyOf(options.getCallSetIds().split(","));
}
List<String> callSetIds = CallSetNamesOptions.Methods.getCallSetIds(options);
List<String> transcriptSetIds =
validateAnnotationSetsFlag(genomics, options.getTranscriptSetIds(), "TRANSCRIPT");
List<String> variantAnnotationSetIds =
validateAnnotationSetsFlag(genomics, options.getVariantAnnotationSetIds(), "VARIANT");
validateRefsetForAnnotationSets(genomics, transcriptSetIds);

List<StreamVariantsRequest> requests = options.isAllReferences() ?
ShardUtils.getVariantRequests(options.getVariantSetId(), ShardUtils.SexChromosomeFilter.EXCLUDE_XY,
ShardUtils.getVariantRequests(prototype, ShardUtils.SexChromosomeFilter.EXCLUDE_XY,
options.getBasesPerShard(), auth) :
ShardUtils.getVariantRequests(options.getVariantSetId(), options.getReferences(), options.getBasesPerShard());
ShardUtils.getVariantRequests(prototype, options.getBasesPerShard(), options.getReferences());

Pipeline p = Pipeline.create(options);
p.getCoderRegistry().setFallbackCoderProvider(GenericJsonCoder.PROVIDER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,23 @@
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.genomics.dataflow.readers.VariantStreamer;
import com.google.cloud.genomics.dataflow.utils.CallSetNamesOptions;
import com.google.cloud.genomics.dataflow.utils.GCSOutputOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
import com.google.cloud.genomics.dataflow.utils.ShardOptions;
import com.google.cloud.genomics.utils.OfflineAuth;
import com.google.cloud.genomics.utils.ShardBoundary;
import com.google.cloud.genomics.utils.ShardUtils;
import com.google.common.base.CharMatcher;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Files;
import com.google.genomics.v1.StreamVariantsRequest;
import com.google.genomics.v1.Variant;
import com.google.genomics.v1.VariantCall;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.security.GeneralSecurityException;
import java.util.List;

Expand All @@ -58,22 +54,24 @@
public class IdentifyPrivateVariants {

public static interface Options extends
// Options for calculating over regions, chromosomes, or whole genomes.
ShardOptions,
// Options for the output destination.
GCSOutputOptions {
// Options for call set names.
CallSetNamesOptions,
// Options for calculating over regions, chromosomes, or whole genomes.
ShardOptions,
// Options for the output destination.
GCSOutputOptions {

@Override
@Description("The ID of the Google Genomics variant set from which this pipeline "
+ "will identify private variants.")
@Required
String getVariantSetId();
void setVariantSetId(String variantSetId);

@Description("A local file path to a list of newline-separated callset IDs. "
+ "Any variants private to those callset IDs will be deleted.")
@Override
@Description("A local file path to a list of newline-separated callset names. "
+ "Any variants private to those callsets will be identified.")
@Required
String getCallSetIdsFilepath();
void setCallSetIdsFilepath(String filepath);
String getCallSetNamesFilepath();

@Description("Whether variants with no callsets should also be identified. Defaults to false.")
@Default.Boolean(false)
Expand Down Expand Up @@ -140,28 +138,24 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept
// Option validation is not yet automatic, we make an explicit call here.
Options.Methods.validateOptions(options);

// Set up the prototype request and auth.
StreamVariantsRequest prototype = CallSetNamesOptions.Methods.getRequestPrototype(options);
OfflineAuth auth = GenomicsOptions.Methods.getGenomicsAuth(options);

// Grab and parse the list of callset IDs.
String fileContents =
Files.toString(new File(options.getCallSetIdsFilepath()), Charset.defaultCharset());
ImmutableSet<String> callSetIds =
ImmutableSet
.<String>builder()
.addAll(
Splitter.on(CharMatcher.breakingWhitespace()).omitEmptyStrings().trimResults()
.split(fileContents)).build();
ImmutableSet<String> callSetIds = ImmutableSet.<String>builder()
.addAll(CallSetNamesOptions.Methods.getCallSetIds(options))
.build();
LOG.info("The pipeline will identify and write to Cloud Storage variants "
+ "private to " + callSetIds.size() + " genomes with callSetIds: " + callSetIds);
if (options.getIdentifyVariantsWithoutCalls()) {
LOG.info("* The pipeline will also identify variants with no callsets. *");
}

List<StreamVariantsRequest> shardRequests =
options.isAllReferences() ? ShardUtils.getVariantRequests(options.getVariantSetId(),
options.isAllReferences() ? ShardUtils.getVariantRequests(prototype,
ShardUtils.SexChromosomeFilter.INCLUDE_XY, options.getBasesPerShard(), auth)
: ShardUtils.getVariantRequests(options.getVariantSetId(), options.getReferences(),
options.getBasesPerShard());
: ShardUtils.getVariantRequests(prototype, options.getBasesPerShard(),
options.getReferences());

Pipeline p = Pipeline.create(options);
PCollection<Variant> variants = p.begin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.genomics.dataflow.functions.ibs.IBSCalculator;
import com.google.cloud.genomics.dataflow.functions.ibs.SharedMinorAllelesCalculatorFactory;
import com.google.cloud.genomics.dataflow.readers.VariantStreamer;
import com.google.cloud.genomics.dataflow.utils.CallSetNamesOptions;
import com.google.cloud.genomics.dataflow.utils.GCSOutputOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
import com.google.cloud.genomics.dataflow.utils.ShardOptions;
Expand All @@ -54,6 +55,8 @@
public class IdentityByState {

public static interface Options extends
// Options for call set names.
CallSetNamesOptions,
// Options for calculating over regions, chromosomes, or whole genomes.
ShardOptions,
// Options for calculating over a list of sites.
Expand All @@ -66,11 +69,13 @@ public static interface Options extends
GCSOutputOptions
{

@Override
@Description("The ID of the Google Genomics variant set this pipeline is accessing. "
+ "Defaults to 1000 Genomes.")
@Default.String("10473108253681171589")
String getVariantSetId();

@Override
void setVariantSetId(String variantSetId);

@Description("The class that determines the strategy for calculating the similarity of alleles.")
Expand Down Expand Up @@ -100,6 +105,9 @@ public static void main(String[] args) throws Exception {
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
// Option validation is not yet automatic, we make an explicit call here.
Options.Methods.validateOptions(options);

// Set up the prototype request and auth.
StreamVariantsRequest prototype = CallSetNamesOptions.Methods.getRequestPrototype(options);
OfflineAuth auth = GenomicsOptions.Methods.getGenomicsAuth(options);

Pipeline p = Pipeline.create(options);
Expand All @@ -109,7 +117,7 @@ public static void main(String[] args) throws Exception {
// Compute IBS on a list of sites (e.g., SNPs).
PCollection<StreamVariantsRequest> requests = p.apply(TextIO.Read.named("ReadSites")
.from(options.getSitesFilepath()))
.apply(new SitesToShards.SitesToStreamVariantsShardsTransform(options.getVariantSetId()));
.apply(new SitesToShards.SitesToStreamVariantsShardsTransform(prototype));

if(options.getHasNonVariantSegments()) {
processedVariants = requests.apply(
Expand All @@ -121,9 +129,9 @@ public static void main(String[] args) throws Exception {
} else {
// Computing IBS over genomic region(s) or the whole genome.
List<StreamVariantsRequest> requests = options.isAllReferences() ?
ShardUtils.getVariantRequests(options.getVariantSetId(), ShardUtils.SexChromosomeFilter.EXCLUDE_XY,
ShardUtils.getVariantRequests(prototype, ShardUtils.SexChromosomeFilter.EXCLUDE_XY,
options.getBasesPerShard(), auth) :
ShardUtils.getVariantRequests(options.getVariantSetId(), options.getReferences(), options.getBasesPerShard());
ShardUtils.getVariantRequests(prototype, options.getBasesPerShard(), options.getReferences());
PCollection<Variant> variants = p.begin()
.apply(Create.of(requests))
.apply(new VariantStreamer(auth, ShardBoundary.Requirement.STRICT, VARIANT_FIELDS));
Expand Down
Loading