Skip to content
This repository was archived by the owner on Oct 29, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 12 additions & 94 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,15 @@
</distributionManagement>

<dependencies>
<!-- Remove this later https://github.com/googlegenomics/dataflow-java/issues/197 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.java.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<version>4.1.0.Final</version>
<version>${io.netty.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
Expand All @@ -82,58 +86,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>${google.dataflow.version}</version>
</dependency>
<dependency>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
<version>${google.api.version}</version>
<exclusions>
<!-- Exclude an old version of guava which is being pulled
in by a transitive dependency google-api-client. -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.oauth-client</groupId>
<artifactId>google-oauth-client</artifactId>
<version>${google.api.version}</version>
</dependency>
<dependency>
<groupId>com.google.oauth-client</groupId>
<artifactId>google-oauth-client-jetty</artifactId>
<version>${google.api.version}</version>
</dependency>
<dependency>
<groupId>com.google.oauth-client</groupId>
<artifactId>google-oauth-client-java6</artifactId>
<version>${google.api.version}</version>
</dependency>
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
<version>${google.api.version}</version>
</dependency>
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client-jackson2</artifactId>
<version>${google.api.version}</version>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-genomics</artifactId>
<version>${google.api.genomics.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.genomics</groupId>
<artifactId>google-genomics-utils</artifactId>
<version>v1-0.3</version>
<version>v1-0.6</version>
<exclusions>
<!-- Exclude an old version of guava which is being pulled
in by a transitive dependency google-api-client. -->
Expand All @@ -144,9 +100,9 @@
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${google.guava.version}</version>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>${google.dataflow.version}</version>
</dependency>
<dependency>
<groupId>gov.nist.math.jama</groupId>
Expand All @@ -158,11 +114,6 @@
<artifactId>commons-math3</artifactId>
<version>3.2</version>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.9</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand All @@ -175,45 +126,11 @@
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>${jetty.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>${jetty.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.github.samtools</groupId>
<artifactId>htsjdk</artifactId>
<version>1.128</version>
<exclusions>
<exclusion>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.cloud.genomics</groupId>
<artifactId>gatk-tools-java</artifactId>
<version>1.0</version>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the latest version is 1.1 based on this:

https://oss.sonatype.org/#nexus-search;quick~gatk-tools-java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I'm not going to update this particular dependency at this time.

</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.java.version}</version>
</dependency>
</dependencies>

<profiles>
Expand Down Expand Up @@ -378,7 +295,8 @@
<!-- Work around dataflow, grpc, and protobuf version issues
http://stackoverflow.com/questions/36257708/java-lang-noclassdeffounderror-while-deserializing-protobuf-messages -->
<google.dataflow.version>1.6.0</google.dataflow.version>
<io.grpc.version>0.14.0</io.grpc.version>
<io.netty.version>4.1.1.Final</io.netty.version>
<io.grpc.version>0.15.0</io.grpc.version>
<protobuf.java.version>3.0.0-beta-1</protobuf.java.version>

<java.version>1.7</java.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.cloud.genomics.utils.grpc.VariantEmitterStrategy;
import com.google.cloud.genomics.utils.grpc.VariantMergeStrategy;
import com.google.cloud.genomics.utils.grpc.VariantStreamIterator;
import com.google.cloud.genomics.utils.grpc.VariantUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.genomics.v1.StreamVariantsRequest;
Expand Down Expand Up @@ -105,7 +104,6 @@ public PCollection<Variant> apply(PCollection<Variant> input) {
return input
.apply(ParDo.of(new BinVariantsFn()))
.apply(GroupByKey.<KV<String, Long>, Variant>create())
.apply(ParDo.of(new RetrieveWindowOfVariantsFn()))
.apply(ParDo.of(new CombineVariantsFn()));
}

Expand All @@ -125,34 +123,23 @@ public static final long getEndBin(int binSize, Variant variant) {
public void processElement(ProcessContext context) {
Options options =
context.getPipelineOptions().as(Options.class);
int binSize = options.getBinSize();
Variant variant = context.element();
long startBin = getStartBin(options.getBinSize(), variant);
long endBin =
VariantUtils.IS_NON_VARIANT_SEGMENT.apply(variant) ? getEndBin(options.getBinSize(),
variant) : startBin;
for (long bin = startBin; bin <= endBin; bin++) {
context.output(KV.of(KV.of(variant.getReferenceName(), bin), variant));
}
}
}

static final class RetrieveWindowOfVariantsFn extends
DoFn<KV<KV<String, Long>, Iterable<Variant>>, Iterable<Variant>> {

@Override
public void processElement(ProcessContext context) {

// The upper bound on number of variants in the iterable is dependent upon the binSize
// used in the prior step to construct the key.
KV<KV<String, Long>, Iterable<Variant>> kv = context.element();
context.output(kv.getValue());
long startBin = getStartBin(binSize, variant);
long endBin = getEndBin(binSize, variant);
for (long bin = startBin; bin <= endBin; bin++) {
context.output(KV.of(KV.of(variant.getReferenceName(), bin * binSize), variant));
}
}
}
}

/**
* Use this transform when working with a collection of sites across the genome.
*
* It passes the data onto the next step retaining the ordering imposed by the
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/ordering/order

* Google Genomics API which is sorted by (variantset id, contig, start pos, variant id).
*
* The amount of RAM needed during the combine step is controlled by the number of
* base pairs between the start and end position of each site.
* Compared to the BinShuffleAndCombineTransform, this transform has:
Expand Down Expand Up @@ -183,7 +170,7 @@ public PCollection<Variant> apply(PCollection<StreamVariantsRequest> input) {
.apply(ParDo.of(new CombineVariantsFn()));
}

static final class RetrieveFn extends DoFn<StreamVariantsRequest, Iterable<Variant>> {
public static final class RetrieveFn extends DoFn<StreamVariantsRequest, KV<KV<String, Long>, Iterable<Variant>>> {
private final OfflineAuth auth;
private String fields;

Expand All @@ -194,44 +181,55 @@ public RetrieveFn(OfflineAuth auth, String fields) {
}

@Override
public void processElement(DoFn<StreamVariantsRequest, Iterable<Variant>>.ProcessContext context)
public void processElement(DoFn<StreamVariantsRequest, KV<KV<String, Long>, Iterable<Variant>>>.ProcessContext context)
throws Exception {
StreamVariantsRequest request = context.element();

Iterator<StreamVariantsResponse> iter = VariantStreamIterator.enforceShardBoundary(auth, context.element(),
ShardBoundary.Requirement.NON_VARIANT_OVERLAPS, fields);
Iterator<StreamVariantsResponse> iter = VariantStreamIterator.enforceShardBoundary(auth, request,
ShardBoundary.Requirement.OVERLAPS, fields);

if (iter.hasNext()) {
// We do have some data overlapping this site.
List<Iterable<Variant>> allVariantsForRequest = new ArrayList<>();
while (iter.hasNext()) {
allVariantsForRequest.add(iter.next().getVariantsList());
}
context.output(Iterables.concat(allVariantsForRequest));
context.output(KV.of(KV.of(request.getReferenceName(), request.getStart()), Iterables.concat(allVariantsForRequest)));
}
}
}
}

public static final class CombineVariantsFn extends DoFn<Iterable<Variant>, Variant> {
/**
* Pass a window of variants from Dataflow to the merge strategy implementation.
*
* See {@link VariantMergeStrategy} for more detail.
*/
public static final class CombineVariantsFn extends DoFn<KV<KV<String, Long>, Iterable<Variant>>, Variant> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the extra KV<String, Long>, and for what is would the extra KV in the object be useful as it expands each object? What was wrong with having Iterable<Variant> from before? It would be nice to have some JavaDoc documentation added to this function to provide the user some details regarding its purpose, and why it is built this way.

When adding the JavaDoc, it would be nice to say more than that the reference chromosome and start position is required, but more along the lines of why and how it would be used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question! Its covered in the JavaDoc for the merge strategy. I added a JavaDoc here to direct readers there.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for adding the documentation reference, though there is still some small confusion that I commented at the following post:

#206 (comment)

Thanks,
`p

private VariantMergeStrategy merger;

@Override
public void startBundle(DoFn<Iterable<Variant>, Variant>.Context c) throws Exception {
public void startBundle(DoFn<KV<KV<String, Long>, Iterable<Variant>>, Variant>.Context c) throws Exception {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above please regarding JavaDoc and reasoning.

super.startBundle(c);
Options options = c.getPipelineOptions().as(Options.class);
merger = options.getVariantMergeStrategy().newInstance();
}

@Override
public void processElement(ProcessContext context) throws Exception {
merger.merge(context.element(), new DataflowVariantEmitter(context));
merger.merge(context.element().getKey().getValue(), context.element().getValue(), new DataflowVariantEmitter(context));
}
}

/**
* Emit a merged variant to the Dataflow pipeline.
*
* See {@link VariantEmitterStrategy} for more detail.
*/
public static class DataflowVariantEmitter implements VariantEmitterStrategy {
private final DoFn<Iterable<Variant>, Variant>.ProcessContext context;
private final DoFn<KV<KV<String, Long>, Iterable<Variant>>, Variant>.ProcessContext context;

public DataflowVariantEmitter(DoFn<Iterable<Variant>, Variant>.ProcessContext context) {
public DataflowVariantEmitter(DoFn<KV<KV<String, Long>, Iterable<Variant>>, Variant>.ProcessContext context) {
this.context = context;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,10 @@ public static void main(String[] args) throws Exception {
new JoinNonVariantSegmentsWithVariants.RetrieveAndCombineTransform(auth, VARIANT_FIELDS));
} else {
processedVariants = requests.apply(
new VariantStreamer(auth, ShardBoundary.Requirement.NON_VARIANT_OVERLAPS, VARIANT_FIELDS));
new VariantStreamer(auth, ShardBoundary.Requirement.STRICT, VARIANT_FIELDS));
}
} else {
// Computing IBS over genomic region(s) or the whole genome.
// Compute IBS over genomic region(s) or the whole genome.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proposed substitution for the whole sentence:

Computes IBS over one or more genomic regions, or the entire genome.

Feel free to use whole instead of entire if that is preferred.

List<StreamVariantsRequest> requests = options.isAllReferences() ?
ShardUtils.getVariantRequests(prototype, ShardUtils.SexChromosomeFilter.EXCLUDE_XY,
options.getBasesPerShard(), auth) :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@ public void testIsOverlapping() {

@Test
public void testCombineVariantsFn() {
DoFnTester<Iterable<Variant>, Variant> fn =
DoFnTester<KV<KV<String, Long>, Iterable<Variant>>, Variant> fn =
DoFnTester.of(new JoinNonVariantSegmentsWithVariants.CombineVariantsFn());

Assert.assertThat(fn.processBatch(Arrays.asList(input)),
Assert.assertThat(fn.processBatch(KV.of(KV.of("chr7", 200000L), (Iterable<Variant>) Arrays.asList(input))),
CoreMatchers.hasItems(expectedSnp1, expectedSnp2, expectedInsert));
}

Expand All @@ -163,14 +163,14 @@ public void testBinVariantsFn() {
DoFnTester.of(new JoinNonVariantSegmentsWithVariants.BinShuffleAndCombineTransform.BinVariantsFn());

List<KV<KV<String, Long>, Variant>> binVariantsOutput = binVariantsFn.processBatch(input);
assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200L), snp1)));
assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200L), snp2)));
assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200L), insert)));
assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 199L), blockRecord1)));
assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200L), blockRecord1)));
assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 201L), blockRecord1)));
assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 202L), blockRecord1)));
assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200L), blockRecord2)));
assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200000L), snp1)));
assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200000L), snp2)));
assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200000L), insert)));
assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 199000L), blockRecord1)));
assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200000L), blockRecord1)));
assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 201000L), blockRecord1)));
assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 202000L), blockRecord1)));
assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200000L), blockRecord2)));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why create a KV<String, Long> as part of the binVariantsOutput object using chr7, if the Matcher checks if only at least one item of the Iterable matches, which in this case is in the set of {snp1, snp2, insert, blockRecord1, blockRecord2}? Basically what I'm asking, isn't the first KV too complex for a match that would not happen? A String probably would be easier or something else simple like that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a compound key here, (chromosome, start of bin), and the compound key is used in a key value pair. So that is why we wind up with a nested KV.

Copy link

@pgrosu pgrosu Jul 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I'm confused. It looks like you never use the chromosome string (i.e. "chr7"), since your VariantMergeStrategy is basically just this:

context.element().getKey().getValue(), context.element().getValue(), new DataflowVariantEmitter(context)

And you are merging the variant block of records starting with a start position, and the rest based on the maximum value in that list. So why not just remove the chromosome reference and just write an offset, as your variants are sorted by position. It would be better to actually utilize the reference with an offset, or even better by a genomic region to intersect by.

I recommend that the combine/filter strategy in utils-java of variants within non-variant segments be rewritten to provide better clarity, and with actually taking advantage of the chromosome and a location based on a region of interest to perform the merge on. It will make the code impossible to maintain as one expands to complex, mixed merges in parallel.

Trust me on this - it will save you major headaches later on,
~p

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The form of the KV is consistent throughout JoinNonVariantSegmentsWithVariants. That code is binning variants based on chromosome and position.

The variant merge strategy is different, it is handed a particular bin of variants to work on. It doesn't make sense to merge variants on different chromosomes so that's why chromosome isn't in the signature.

But your point is well taken that it could use more clarity - thanks for this feedback and I will address it in a future PR!

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the clarification, and that more documentation will be added with a future PR. Of course I agree it doesn't make sense to merge variants on different chromosomes, but to double-check that the reference is not included in the argument list of the called merger.merge() function, I added printouts to JoinNonVariantSegmentsWithVariants. Below is the output, and it does not seem to be collecting the reference information as a way to filter by, but is part of the Variant object that would need to be filtered:

Running com.google.cloud.genomics.dataflow.functions.JoinNonVariantSegmentsWithVariantsTest
The contents of `context.element().getKey().getValue()` are = 199000

The contents of `context.element().getValue()` are = [reference_bases: "A"
calls {
  genotype: 0
  genotype: 0
  call_set_name: "hom sample"
}
calls {
  genotype: -1
  genotype: -1
  call_set_name: "no call sample"
}
end: 202050
reference_name: "chr7"
start: 199005
]
...

What I am proposing are only simplifications to the way things are approached because they will not only be harder to debug later on, but more importantly expanded by users to increase the set of features available to this nice API, and other dependent APIs of Google Genomics.

assertEquals(8, binVariantsOutput.size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private void testBase(String[] ARGS, String[] expectedResult) throws Exception {
}

// Check the pipeline results.
assertEquals(results.size(), expectedResult.length);
assertEquals(expectedResult.length, results.size());
assertThat(results,
CoreMatchers.allOf(CoreMatchers.hasItems(expectedResult)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
* mvn install -DskipITs
*
* To run one test:
* mvn -Dit.test=IdentityByStateITCase#testStreamingLocal verify
* mvn -Dit.test=IdentityByStateITCase#testLocal verify
*
* See also http://maven.apache.org/surefire/maven-failsafe-plugin/examples/single-test.html
*/
Expand Down Expand Up @@ -357,7 +357,7 @@ public void tearDown() throws Exception {
}

@Test
public void testSitesFilepathStreamingLocal() throws Exception {
public void testSitesFilepathLocal() throws Exception {

String[] ARGS = {
"--sitesFilepath=" + SITES_FILEPATH,
Expand All @@ -369,7 +369,7 @@ public void testSitesFilepathStreamingLocal() throws Exception {
}

@Test
public void testStreamingLocal() throws Exception {
public void testLocal() throws Exception {
String[] ARGS = {
"--references=" + helper.PLATINUM_GENOMES_BRCA1_REFERENCES,
"--variantSetId=" + helper.PLATINUM_GENOMES_DATASET,
Expand All @@ -380,7 +380,7 @@ public void testStreamingLocal() throws Exception {
}

@Test
public void testStreamingCloud() throws Exception {
public void testCloud() throws Exception {
String[] ARGS = {
"--references=" + helper.PLATINUM_GENOMES_BRCA1_REFERENCES,
"--variantSetId=" + helper.PLATINUM_GENOMES_DATASET,
Expand Down
Loading