Skip to content
This repository was archived by the owner on Oct 29, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
<dependency>
<groupId>com.google.cloud.genomics</groupId>
<artifactId>google-genomics-utils</artifactId>
<version>v1-0.2</version>
<version>v1-0.3</version>
<exclusions>
<!-- Exclude an old version of guava which is being pulled
in by a transitive dependency google-api-client. -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,19 @@
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.genomics.utils.OfflineAuth;
import com.google.cloud.genomics.utils.ShardBoundary;
import com.google.cloud.genomics.utils.grpc.MergeNonVariantSegmentsWithSnps;
import com.google.cloud.genomics.utils.grpc.VariantEmitterStrategy;
import com.google.cloud.genomics.utils.grpc.VariantMergeStrategy;
import com.google.cloud.genomics.utils.grpc.VariantStreamIterator;
import com.google.cloud.genomics.utils.grpc.VariantUtils;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.genomics.v1.StreamVariantsRequest;
import com.google.genomics.v1.StreamVariantsResponse;
import com.google.genomics.v1.Variant;
import com.google.genomics.v1.Variant.Builder;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/**
Expand All @@ -66,6 +62,11 @@ public static interface Options extends PipelineOptions {
int getBinSize();
void setBinSize(int binSize);

@Description("The class that determines the strategy for merging non-variant segments and variants.")
@Default.Class(MergeNonVariantSegmentsWithSnps.class)
Class<? extends VariantMergeStrategy> getVariantMergeStrategy();
void setVariantMergeStrategy(Class<? extends VariantMergeStrategy> mergeStrategy);

public static class Methods {
public static void validateOptions(Options options) {
Preconditions.checkArgument(0 < options.getBinSize(), "binSize must be greater than zero");
Expand Down Expand Up @@ -211,85 +212,32 @@ public void processElement(DoFn<StreamVariantsRequest, Iterable<Variant>>.Proces
}
}

/**
* This DoFn converts data with non-variant segments (such as data that was in
* source format Genome VCF (gVCF) or Complete Genomics) to variant-only data with calls from
* non-variant-segments merged into the variants with which they overlap.
*
* This is currently done only for SNP variants. Indels and structural variants are left as-is.
*/
public static final class CombineVariantsFn extends DoFn<Iterable<Variant>, Variant> {
private VariantMergeStrategy merger;

/**
* Dev note: this code aims to minimize the amount of data held in memory. It should only
* be the current variant we are considering and any non-variant segments that overlap it.
*/
@Override
public void processElement(ProcessContext context) throws Exception {
List<Variant> records = Lists.newArrayList(context.element());

// The sort order is critical here so that candidate overlapping reference matching blocks
// occur prior to any variants they may overlap.
Collections.sort(records, NON_VARIANT_SEGMENT_COMPARATOR);

// The upper bound on potential overlaps is the sample size plus the number of
// block records that occur between actual variants.
List<Variant> blockRecords = new LinkedList<>();

for (Variant record : records) {
if (!VariantUtils.IS_NON_VARIANT_SEGMENT.apply(record)) {
// Dataflow does not allow the output of modified input items, so we make a copy and
// modify that, if applicable.
Builder updatedRecord = Variant.newBuilder(record);
// TODO: determine and implement the correct criteria for overlaps of non-SNP variants
if (VariantUtils.IS_SNP.apply(record)) {
for (Iterator<Variant> iterator = blockRecords.iterator(); iterator.hasNext();) {
Variant blockRecord = iterator.next();
if (isOverlapping(blockRecord, record)) {
updatedRecord.addAllCalls(blockRecord.getCallsList());
} else {
// Remove the current element from the iterator and the list since it is
// left of the genomic region we are currently working on due to our sort.
iterator.remove();
}
}
}
// Emit this variant and move on (no need to hang onto it in memory).
context.output(updatedRecord.build());
} else {
blockRecords.add(record);
}
}
public void startBundle(DoFn<Iterable<Variant>, Variant>.Context c) throws Exception {
super.startBundle(c);
Options options = c.getPipelineOptions().as(Options.class);
merger = options.getVariantMergeStrategy().newInstance();
}

static final Ordering<Variant> BY_START = Ordering.natural().onResultOf(
new Function<Variant, Long>() {
@Override
public Long apply(Variant variant) {
return variant.getStart();
}
});
@Override
public void processElement(ProcessContext context) throws Exception {
merger.merge(context.element(), new DataflowVariantEmitter(context));
}
}

static final Ordering<Variant> BY_FIRST_OF_ALTERNATE_BASES = Ordering.natural()
.nullsFirst().onResultOf(new Function<Variant, String>() {
@Override
public String apply(Variant variant) {
if (null == variant.getAlternateBasesList() || variant.getAlternateBasesList().isEmpty()) {
return null;
}
return variant.getAlternateBases(0);
}
});
public static class DataflowVariantEmitter implements VariantEmitterStrategy {
private final DoFn<Iterable<Variant>, Variant>.ProcessContext context;

// Special-purpose comparator for use in dealing with both variant and non-variant segment data.
// Sort by start position ascending and ensure that if a variant and a ref-matching block are at
// the same position, the non-variant segment record comes first.
static final Comparator<Variant> NON_VARIANT_SEGMENT_COMPARATOR = BY_START
.compound(BY_FIRST_OF_ALTERNATE_BASES);
public DataflowVariantEmitter(DoFn<Iterable<Variant>, Variant>.ProcessContext context) {
this.context = context;
}

static final boolean isOverlapping(Variant blockRecord, Variant variant) {
return blockRecord.getStart() <= variant.getStart()
&& blockRecord.getEnd() >= variant.getStart() + 1;
@Override
public void emit(Variant variant) {
context.output(variant);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.dataflow.sdk.transforms.SimpleFunction;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.genomics.utils.Contig;
import com.google.genomics.v1.StreamReadsRequest;
import com.google.genomics.v1.StreamVariantsRequest;

import java.util.logging.Logger;
Expand All @@ -45,7 +46,6 @@
* </ol>
*
* The fields may be comma, tab, or whitespace delimited.
*
*/
public class SitesToShards {

Expand All @@ -62,6 +62,9 @@ public static interface Options extends PipelineOptions {

private static final Pattern SITE_PATTERN = Pattern.compile("^\\s*([\\w\\.]+)\\W+(\\d+)\\W+(\\d+).*$");

/**
* Given a string encoding a site, parse it into a Contig object.
*/
public static class SitesToContigsFn extends DoFn<String, Contig> {

@Override
Expand All @@ -78,41 +81,95 @@ public void processElement(DoFn<String, Contig>.ProcessContext context) throws E
}
}

/**
* Given a contig object and request prototype, construct a request spanning the region
* defined by the contig.
*/
public static class ContigsToStreamVariantsRequestsFn extends
SimpleFunction<Contig, StreamVariantsRequest> {

private final String variantSetId;
private final StreamVariantsRequest prototype;

public ContigsToStreamVariantsRequestsFn(String variantSetId) {
public ContigsToStreamVariantsRequestsFn(StreamVariantsRequest prototype) {
super();
this.variantSetId = variantSetId;
this.prototype = prototype;
}

@Override
public StreamVariantsRequest apply(Contig contig) {
if (null == contig) {
return null;
}
return contig.getStreamVariantsRequest(variantSetId);
return contig.getStreamVariantsRequest(prototype);
}

}

/**
* Use this transform when you have file(s) of sites that should be converted into
* streaming requests that each span the region for a site.
*/
public static class SitesToStreamVariantsShardsTransform extends
PTransform<PCollection<String>, PCollection<StreamVariantsRequest>> {

private final String variantSetId;
private final StreamVariantsRequest prototype;

public SitesToStreamVariantsShardsTransform(String variantSetId) {
public SitesToStreamVariantsShardsTransform(StreamVariantsRequest prototype) {
super();
this.variantSetId = variantSetId;
this.prototype = prototype;
}

@Override
public PCollection<StreamVariantsRequest> apply(PCollection<String> lines) {
return lines.apply(ParDo.of(new SitesToContigsFn()))
.apply("Contigs to StreamVariantsRequests",
MapElements.via(new ContigsToStreamVariantsRequestsFn(variantSetId)));
MapElements.via(new ContigsToStreamVariantsRequestsFn(prototype)));
}
}

/**
 * Maps a {@link Contig} to a {@link StreamReadsRequest} spanning the contig's region,
 * copying all other request fields from the supplied prototype.
 */
public static class ContigsToStreamReadsRequestsFn extends
    SimpleFunction<Contig, StreamReadsRequest> {

  private final StreamReadsRequest prototype;

  /**
   * @param prototype the request whose non-region fields are carried into each result
   */
  public ContigsToStreamReadsRequestsFn(StreamReadsRequest prototype) {
    this.prototype = prototype;
  }

  @Override
  public StreamReadsRequest apply(Contig contig) {
    // Propagate null inputs unchanged; otherwise narrow the prototype to this contig's region.
    return (contig == null) ? null : contig.getStreamReadsRequest(prototype);
  }
}

/**
 * Transform that parses lines of text describing sites and emits one
 * {@link StreamReadsRequest} per site, each spanning that site's region.
 */
public static class SitesToStreamReadsShardsTransform extends
    PTransform<PCollection<String>, PCollection<StreamReadsRequest>> {

  private final StreamReadsRequest prototype;

  /**
   * @param prototype the request whose non-region fields are carried into each shard request
   */
  public SitesToStreamReadsShardsTransform(StreamReadsRequest prototype) {
    this.prototype = prototype;
  }

  @Override
  public PCollection<StreamReadsRequest> apply(PCollection<String> lines) {
    // First parse each site line into a Contig, then map each Contig onto a request.
    PCollection<Contig> contigs = lines.apply(ParDo.of(new SitesToContigsFn()));
    return contigs.apply("Contigs to StreamReadsRequests",
        MapElements.via(new ContigsToStreamReadsRequestsFn(prototype)));
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.genomics.dataflow.utils.CallFilters;
import com.google.cloud.genomics.dataflow.utils.PairGenerator;
import com.google.cloud.genomics.utils.grpc.VariantUtils;
import com.google.cloud.genomics.utils.grpc.VariantCallUtils;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.genomics.v1.Variant;
Expand Down Expand Up @@ -56,7 +56,7 @@ public void processElement(ProcessContext context) {
CallSimilarityCalculator callSimilarityCalculator =
callSimilarityCalculatorFactory.get(isReferenceMajor(variant));
for (KV<VariantCall, VariantCall> pair : PairGenerator.WITHOUT_REPLACEMENT.allPairs(
getSamplesWithVariant(variant), VariantUtils.CALL_COMPARATOR)) {
getSamplesWithVariant(variant), VariantCallUtils.CALL_COMPARATOR)) {
accumulateCallSimilarity(callSimilarityCalculator, pair.getKey(), pair.getValue());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.genomics.dataflow.coders.GenericJsonCoder;
import com.google.cloud.genomics.dataflow.utils.AnnotationUtils;
import com.google.cloud.genomics.dataflow.utils.AnnotationUtils.VariantEffect;
import com.google.cloud.genomics.dataflow.utils.CallSetNamesOptions;
import com.google.cloud.genomics.dataflow.utils.GCSOutputOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
import com.google.cloud.genomics.dataflow.utils.ShardOptions;
Expand Down Expand Up @@ -87,20 +88,25 @@
*/
public final class AnnotateVariants extends DoFn<StreamVariantsRequest, KV<String, VariantAnnotation>> {

public static interface Options extends ShardOptions, GCSOutputOptions {
public static interface Options extends
// Options for call set names.
CallSetNamesOptions,
// Options for calculating over regions, chromosomes, or whole genomes.
ShardOptions,
// Options for the output destination.
GCSOutputOptions {

@Override
@Description("The ID of the Google Genomics variant set this pipeline is accessing. "
+ "Defaults to 1000 Genomes.")
@Default.String("10473108253681171589")
String getVariantSetId();

void setVariantSetId(String variantSetId);

@Description("The IDs of the Google Genomics call sets this pipeline is working with, comma "
+ "delimited.Defaults to 1000 Genomes HG00261.")
@Default.String("10473108253681171589-0")
String getCallSetIds();
void setCallSetIds(String callSetIds);
@Override
@Description("The names of the Google Genomics call sets this pipeline is working with, comma "
+ "delimited. Defaults to 1000 Genomes HG00261.")
@Default.String("HG00261")
String getCallSetNames();

@Description("The IDs of the Google Genomics transcript sets this pipeline is working with, "
+ "comma delimited. Defaults to UCSC refGene (hg19).")
Expand Down Expand Up @@ -296,23 +302,22 @@ public static void main(String[] args) throws Exception {
// Option validation is not yet automatic, we make an explicit call here.
Options.Methods.validateOptions(options);

// Set up the prototype request and auth.
StreamVariantsRequest prototype = CallSetNamesOptions.Methods.getRequestPrototype(options);
OfflineAuth auth = GenomicsOptions.Methods.getGenomicsAuth(options);
Genomics genomics = GenomicsFactory.builder().build().fromOfflineAuth(auth);

List<String> callSetIds = ImmutableList.of();
if (!Strings.isNullOrEmpty(options.getCallSetIds().trim())) {
callSetIds = ImmutableList.copyOf(options.getCallSetIds().split(","));
}
List<String> callSetIds = CallSetNamesOptions.Methods.getCallSetIds(options);
List<String> transcriptSetIds =
validateAnnotationSetsFlag(genomics, options.getTranscriptSetIds(), "TRANSCRIPT");
List<String> variantAnnotationSetIds =
validateAnnotationSetsFlag(genomics, options.getVariantAnnotationSetIds(), "VARIANT");
validateRefsetForAnnotationSets(genomics, transcriptSetIds);

List<StreamVariantsRequest> requests = options.isAllReferences() ?
ShardUtils.getVariantRequests(options.getVariantSetId(), ShardUtils.SexChromosomeFilter.EXCLUDE_XY,
ShardUtils.getVariantRequests(prototype, ShardUtils.SexChromosomeFilter.EXCLUDE_XY,
options.getBasesPerShard(), auth) :
ShardUtils.getVariantRequests(options.getVariantSetId(), options.getReferences(), options.getBasesPerShard());
ShardUtils.getVariantRequests(prototype, options.getBasesPerShard(), options.getReferences());

Pipeline p = Pipeline.create(options);
p.getCoderRegistry().setFallbackCoderProvider(GenericJsonCoder.PROVIDER);
Expand Down
Loading