diff --git a/pom.xml b/pom.xml index 4ae0d75..524fbe1 100644 --- a/pom.xml +++ b/pom.xml @@ -65,11 +65,15 @@ - + + com.google.protobuf + protobuf-java + ${protobuf.java.version} + io.netty netty-codec-http2 - 4.1.0.Final + ${io.netty.version} io.grpc @@ -82,58 +86,10 @@ - - com.google.cloud.dataflow - google-cloud-dataflow-java-sdk-all - ${google.dataflow.version} - - - com.google.api-client - google-api-client - ${google.api.version} - - - - com.google.guava - guava-jdk5 - - - - - com.google.oauth-client - google-oauth-client - ${google.api.version} - - - com.google.oauth-client - google-oauth-client-jetty - ${google.api.version} - - - com.google.oauth-client - google-oauth-client-java6 - ${google.api.version} - - - com.google.http-client - google-http-client - ${google.api.version} - - - com.google.http-client - google-http-client-jackson2 - ${google.api.version} - - - com.google.apis - google-api-services-genomics - ${google.api.genomics.version} - com.google.cloud.genomics google-genomics-utils - v1-0.3 + v1-0.6 @@ -144,9 +100,9 @@ - com.google.guava - guava - ${google.guava.version} + com.google.cloud.dataflow + google-cloud-dataflow-java-sdk-all + ${google.dataflow.version} gov.nist.math.jama @@ -158,11 +114,6 @@ commons-math3 3.2 - - org.reflections - reflections - 0.9.9 - junit junit @@ -175,45 +126,11 @@ 1.3 test - - org.mortbay.jetty - jetty - ${jetty.version} - runtime - - - org.mortbay.jetty - jetty-util - ${jetty.version} - runtime - - - javax.servlet - servlet-api - 2.5 - runtime - - - com.github.samtools - htsjdk - 1.128 - - - org.testng - testng - - - com.google.cloud.genomics gatk-tools-java 1.0 - - com.google.protobuf - protobuf-java - ${protobuf.java.version} - @@ -378,7 +295,8 @@ 1.6.0 - 0.14.0 + 4.1.1.Final + 0.15.0 3.0.0-beta-1 1.7 diff --git a/src/main/java/com/google/cloud/genomics/dataflow/functions/JoinNonVariantSegmentsWithVariants.java b/src/main/java/com/google/cloud/genomics/dataflow/functions/JoinNonVariantSegmentsWithVariants.java index 279897b..1794af0 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/functions/JoinNonVariantSegmentsWithVariants.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/functions/JoinNonVariantSegmentsWithVariants.java @@ -28,7 +28,6 @@ import com.google.cloud.genomics.utils.grpc.VariantEmitterStrategy; import com.google.cloud.genomics.utils.grpc.VariantMergeStrategy; import com.google.cloud.genomics.utils.grpc.VariantStreamIterator; -import com.google.cloud.genomics.utils.grpc.VariantUtils; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.genomics.v1.StreamVariantsRequest; @@ -105,7 +104,6 @@ public PCollection apply(PCollection input) { return input .apply(ParDo.of(new BinVariantsFn())) .apply(GroupByKey., Variant>create()) - .apply(ParDo.of(new RetrieveWindowOfVariantsFn())) .apply(ParDo.of(new CombineVariantsFn())); } @@ -125,27 +123,13 @@ public static final long getEndBin(int binSize, Variant variant) { public void processElement(ProcessContext context) { Options options = context.getPipelineOptions().as(Options.class); + int binSize = options.getBinSize(); Variant variant = context.element(); - long startBin = getStartBin(options.getBinSize(), variant); - long endBin = - VariantUtils.IS_NON_VARIANT_SEGMENT.apply(variant) ? getEndBin(options.getBinSize(), - variant) : startBin; - for (long bin = startBin; bin <= endBin; bin++) { - context.output(KV.of(KV.of(variant.getReferenceName(), bin), variant)); - } - } - } - - static final class RetrieveWindowOfVariantsFn extends - DoFn, Iterable>, Iterable> { - - @Override - public void processElement(ProcessContext context) { - - // The upper bound on number of variants in the iterable is dependent upon the binSize - // used in the prior step to construct the key. - KV, Iterable> kv = context.element(); - context.output(kv.getValue()); + long startBin = getStartBin(binSize, variant); + long endBin = getEndBin(binSize, variant); + for (long bin = startBin; bin <= endBin; bin++) { + context.output(KV.of(KV.of(variant.getReferenceName(), bin * binSize), variant)); + } } } } @@ -153,6 +137,9 @@ public void processElement(ProcessContext context) { /** * Use this transform when working with a collection of sites across the genome. * + * It passes the data onto the next step retaining the ordering imposed by the + * Google Genomics API which is sorted by (variantset id, contig, start pos, variant id). + * * The amount of RAM needed during the combine step is controlled by the number of * base pairs between the start and end position of each site. * Compared to the BinShuffleAndCombineTransform, this transform has: @@ -183,7 +170,7 @@ public PCollection apply(PCollection input) { .apply(ParDo.of(new CombineVariantsFn())); } - static final class RetrieveFn extends DoFn> { + public static final class RetrieveFn extends DoFn, Iterable>> { private final OfflineAuth auth; private String fields; @@ -194,11 +181,12 @@ public RetrieveFn(OfflineAuth auth, String fields) { } @Override - public void processElement(DoFn>.ProcessContext context) + public void processElement(DoFn, Iterable>>.ProcessContext context) throws Exception { + StreamVariantsRequest request = context.element(); - Iterator iter = VariantStreamIterator.enforceShardBoundary(auth, context.element(), - ShardBoundary.Requirement.NON_VARIANT_OVERLAPS, fields); + Iterator iter = VariantStreamIterator.enforceShardBoundary(auth, request, + ShardBoundary.Requirement.OVERLAPS, fields); if (iter.hasNext()) { // We do have some data overlapping this site. @@ -206,17 +194,22 @@ public void processElement(DoFn>.Proces while (iter.hasNext()) { allVariantsForRequest.add(iter.next().getVariantsList()); } - context.output(Iterables.concat(allVariantsForRequest)); + context.output(KV.of(KV.of(request.getReferenceName(), request.getStart()), Iterables.concat(allVariantsForRequest))); } } } } - public static final class CombineVariantsFn extends DoFn, Variant> { + /** + * Pass a window of variants from Dataflow to the merge strategy implementation. + * + * See {@link VariantMergeStrategy} for more detail. + */ + public static final class CombineVariantsFn extends DoFn, Iterable>, Variant> { private VariantMergeStrategy merger; @Override - public void startBundle(DoFn, Variant>.Context c) throws Exception { + public void startBundle(DoFn, Iterable>, Variant>.Context c) throws Exception { super.startBundle(c); Options options = c.getPipelineOptions().as(Options.class); merger = options.getVariantMergeStrategy().newInstance(); @@ -224,14 +217,19 @@ public void startBundle(DoFn, Variant>.Context c) throws Excep @Override public void processElement(ProcessContext context) throws Exception { - merger.merge(context.element(), new DataflowVariantEmitter(context)); + merger.merge(context.element().getKey().getValue(), context.element().getValue(), new DataflowVariantEmitter(context)); } } + /** + * Emit a merged variant to the Dataflow pipeline. + * + * See {@link VariantEmitterStrategy} for more detail. + */ public static class DataflowVariantEmitter implements VariantEmitterStrategy { - private final DoFn, Variant>.ProcessContext context; + private final DoFn, Iterable>, Variant>.ProcessContext context; - public DataflowVariantEmitter(DoFn, Variant>.ProcessContext context) { + public DataflowVariantEmitter(DoFn, Iterable>, Variant>.ProcessContext context) { this.context = context; } diff --git a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByState.java b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByState.java index 5bc96cd..ea222e3 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByState.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByState.java @@ -121,10 +121,10 @@ public static void main(String[] args) throws Exception { new JoinNonVariantSegmentsWithVariants.RetrieveAndCombineTransform(auth, VARIANT_FIELDS)); } else { processedVariants = requests.apply( - new VariantStreamer(auth, ShardBoundary.Requirement.NON_VARIANT_OVERLAPS, VARIANT_FIELDS)); + new VariantStreamer(auth, ShardBoundary.Requirement.STRICT, VARIANT_FIELDS)); } } else { - // Computing IBS over genomic region(s) or the whole genome. + // Compute IBS over genomic region(s) or the whole genome. List requests = options.isAllReferences() ? ShardUtils.getVariantRequests(prototype, ShardUtils.SexChromosomeFilter.EXCLUDE_XY, options.getBasesPerShard(), auth) : diff --git a/src/test/java/com/google/cloud/genomics/dataflow/functions/JoinNonVariantSegmentsWithVariantsTest.java b/src/test/java/com/google/cloud/genomics/dataflow/functions/JoinNonVariantSegmentsWithVariantsTest.java index f5d7233..de29252 100644 --- a/src/test/java/com/google/cloud/genomics/dataflow/functions/JoinNonVariantSegmentsWithVariantsTest.java +++ b/src/test/java/com/google/cloud/genomics/dataflow/functions/JoinNonVariantSegmentsWithVariantsTest.java @@ -150,10 +150,10 @@ public void testIsOverlapping() { @Test public void testCombineVariantsFn() { - DoFnTester, Variant> fn = + DoFnTester, Iterable>, Variant> fn = DoFnTester.of(new JoinNonVariantSegmentsWithVariants.CombineVariantsFn()); - Assert.assertThat(fn.processBatch(Arrays.asList(input)), + Assert.assertThat(fn.processBatch(KV.of(KV.of("chr7", 200000L), (Iterable) Arrays.asList(input))), CoreMatchers.hasItems(expectedSnp1, expectedSnp2, expectedInsert)); } @@ -163,14 +163,14 @@ public void testBinVariantsFn() { DoFnTester.of(new JoinNonVariantSegmentsWithVariants.BinShuffleAndCombineTransform.BinVariantsFn()); List, Variant>> binVariantsOutput = binVariantsFn.processBatch(input); - assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200L), snp1))); - assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200L), snp2))); - assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200L), insert))); - assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 199L), blockRecord1))); - assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200L), blockRecord1))); - assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 201L), blockRecord1))); - assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 202L), blockRecord1))); - assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200L), blockRecord2))); + assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200000L), snp1))); + assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200000L), snp2))); + assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200000L), insert))); + assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 199000L), blockRecord1))); + assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200000L), blockRecord1))); + assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 201000L), blockRecord1))); + assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 202000L), blockRecord1))); + assertThat(binVariantsOutput, CoreMatchers.hasItem(KV.of(KV.of("chr7", 200000L), blockRecord2))); assertEquals(8, binVariantsOutput.size()); } diff --git a/src/test/java/com/google/cloud/genomics/dataflow/pipelines/IdentifyPrivateVariantsITCase.java b/src/test/java/com/google/cloud/genomics/dataflow/pipelines/IdentifyPrivateVariantsITCase.java index 315fadd..0b3e835 100644 --- a/src/test/java/com/google/cloud/genomics/dataflow/pipelines/IdentifyPrivateVariantsITCase.java +++ b/src/test/java/com/google/cloud/genomics/dataflow/pipelines/IdentifyPrivateVariantsITCase.java @@ -164,7 +164,7 @@ private void testBase(String[] ARGS, String[] expectedResult) throws Exception { } // Check the pipeline results. - assertEquals(results.size(), expectedResult.length); + assertEquals(expectedResult.length, results.size()); assertThat(results, CoreMatchers.allOf(CoreMatchers.hasItems(expectedResult))); } diff --git a/src/test/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByStateITCase.java b/src/test/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByStateITCase.java index 1d27da0..e2b033b 100644 --- a/src/test/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByStateITCase.java +++ b/src/test/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByStateITCase.java @@ -46,7 +46,7 @@ * mvn install -DskipITs * * To run one test: - * mvn -Dit.test=IdentityByStateITCase#testStreamingLocal verify + * mvn -Dit.test=IdentityByStateITCase#testLocal verify * * See also http://maven.apache.org/surefire/maven-failsafe-plugin/examples/single-test.html */ @@ -357,7 +357,7 @@ public void tearDown() throws Exception { } @Test - public void testSitesFilepathStreamingLocal() throws Exception { + public void testSitesFilepathLocal() throws Exception { String[] ARGS = { "--sitesFilepath=" + SITES_FILEPATH, @@ -369,7 +369,7 @@ public void testSitesFilepathStreamingLocal() throws Exception { } @Test - public void testStreamingLocal() throws Exception { + public void testLocal() throws Exception { String[] ARGS = { "--references=" + helper.PLATINUM_GENOMES_BRCA1_REFERENCES, "--variantSetId=" + helper.PLATINUM_GENOMES_DATASET, @@ -380,7 +380,7 @@ public void testStreamingLocal() throws Exception { } @Test - public void testStreamingCloud() throws Exception { + public void testCloud() throws Exception { String[] ARGS = { "--references=" + helper.PLATINUM_GENOMES_BRCA1_REFERENCES, "--variantSetId=" + helper.PLATINUM_GENOMES_DATASET, diff --git a/src/test/resources/com/google/cloud/genomics/dataflow/pipelines/family.txt b/src/test/resources/com/google/cloud/genomics/dataflow/pipelines/family.txt index e8c82d0..c7089eb 100644 --- a/src/test/resources/com/google/cloud/genomics/dataflow/pipelines/family.txt +++ b/src/test/resources/com/google/cloud/genomics/dataflow/pipelines/family.txt @@ -1,13 +1,13 @@ NA12877 -NA12893 -NA12885 -NA12887 +NA12878 +NA12879 +NA12880 NA12881 -NA12888 NA12882 -NA12879 NA12883 -NA12886 -NA12878 NA12884 -NA12880 +NA12885 +NA12886 +NA12887 +NA12888 +NA12893