diff --git a/pom.xml b/pom.xml
index 9e1c59c..cf93de8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -204,6 +204,12 @@
 <artifactId>protobuf-java</artifactId>
 <version>3.0.0-alpha-3</version>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ <version>3.2</version>
+ <type>jar</type>
+ </dependency>
diff --git a/src/main/java/com/google/cloud/genomics/dataflow/functions/LikelihoodFn.java b/src/main/java/com/google/cloud/genomics/dataflow/functions/LikelihoodFn.java
new file mode 100644
index 0000000..32fc76c
--- /dev/null
+++ b/src/main/java/com/google/cloud/genomics/dataflow/functions/LikelihoodFn.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2015 Google.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.genomics.dataflow.functions;
+
+import com.google.api.services.genomics.model.Position;
+import com.google.cloud.genomics.dataflow.model.ReadCounts;
+import com.google.cloud.genomics.dataflow.model.ReadQualityCount;
+import com.google.cloud.genomics.dataflow.model.ReadQualityCount.Base;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.commons.math3.analysis.UnivariateFunction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Implementation of the likelihood function in equation (2) in
+ * G. Jun, M. Flickinger, K. N. Hetrick, J. M. Romm, K. F. Doheny,
+ * G. Abecasis, M. Boehnke, and H. M. Kang, Detecting and Estimating
+ * Contamination of Human DNA Samples in Sequencing and Array-Based Genotype
+ * Data, American Journal of Human Genetics, vol. 91, no. 5, pp. 839-848,
+ * doi:10.1016/j.ajhg.2012.09.004
+ * http://www.sciencedirect.com/science/article/pii/S0002929712004788
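+ *
+ * The function is meant to be handed to a univariate optimizer and maximized over the
+ * contamination fraction alpha. As an illustration only (not necessarily how the pipeline
+ * itself wires it up), a Commons Math optimizer could be applied roughly as follows,
+ * assuming a populated readCounts map and a plausible search interval of [0, 0.5]:
+ * <pre>{@code
+ * // Classes below come from org.apache.commons.math3.optim and its subpackages.
+ * UnivariateFunction fn = new LikelihoodFn(readCounts);
+ * UnivariatePointValuePair best = new BrentOptimizer(1e-8, 1e-6).optimize(
+ *     new MaxEval(100),
+ *     new UnivariateObjectiveFunction(fn),
+ *     GoalType.MAXIMIZE,
+ *     new SearchInterval(0.0, 0.5)); // assumed range for the contamination fraction
+ * double estimatedAlpha = best.getPoint();
+ * }</pre>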
+ */
+public class LikelihoodFn implements UnivariateFunction {
+
+ /** Possible genotypes for a SNP with a single alternate */
+ enum Genotype {
+ REF_HOMOZYGOUS, HETEROZYGOUS, NONREF_HOMOZYGOUS
+ }
+ /** Possible error statuses for a base in a read */
+ enum ReadStatus {
+ CORRECT, ERROR
+ }
+
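+ /**
+ * Flatten the combination of observed base, true genotype, and read error status into a
+ * single index into the P_OBS_GIVEN_TRUTH table below.
+ */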
+ static int toTableIndex(Base observed, Genotype trueGenotype, ReadStatus status) {
+ return observed.ordinal()
+ + Base.values().length * (status.ordinal()
+ + ReadStatus.values().length * trueGenotype.ordinal());
+ }
+
+ /*
+ * P_OBS_GIVEN_TRUTH contains the probability of observing a particular
+ * base (reference, non-reference, or other) given the true genotype
+ * and the error status of the read. See Table 1 of Jun et al.
+ */
+ private static final ImmutableList<Double> P_OBS_GIVEN_TRUTH;
+ static {
+ final ImmutableList<Double> pTable = ImmutableList.of(
+ // Observed base
+ // REF NONREF OTHER
+ 1.0, 0.0, 0.0, // P(base | REF_HOMOZYGOUS, CORRECT)
+ 0.0, 1.0 / 3.0, 2.0 / 3.0, // P(base | REF_HOMOZYGOUS, ERROR)
+ 0.5, 0.5, 0.0, // P(base | HETEROZYGOUS, CORRECT)
+ 1.0 / 6.0, 1.0 / 6.0, 2.0 / 3.0, // P(base | HETEROZYGOUS, ERROR)
+ 0.0, 1.0, 0.0, // P(base | NONREF_HOMOZYGOUS, CORRECT)
+ 1.0 / 3.0, 0.0, 2.0 / 3.0); // P(base | NONREF_HOMOZYGOUS, ERROR)
+ Iterator<Double> itProb = pTable.iterator();
+ ArrayList<Double> pCond = new ArrayList<>();
+ pCond.addAll(Collections.nCopies(
+ Base.values().length * ReadStatus.values().length * Genotype.values().length, 0.0));
+ for (
+ Genotype g : ImmutableList.of(Genotype.REF_HOMOZYGOUS, Genotype.HETEROZYGOUS,
+ Genotype.NONREF_HOMOZYGOUS)) {
+ for (ReadStatus r : ImmutableList.of(ReadStatus.CORRECT, ReadStatus.ERROR)) {
+ for (Base b : ImmutableList.of(Base.REF, Base.NONREF, Base.OTHER)) {
+ pCond.set(toTableIndex(b, g, r), itProb.next());
+ }
+ }
+ }
+ P_OBS_GIVEN_TRUTH = ImmutableList.copyOf(pCond);
+ }
+
+ private final Map<Position, ReadCounts> readCounts;
+
+ /**
+ * Create a new LikelihoodFn instance for a given set of read counts.
+ *
+ * @param readCounts counts of reads by quality for each position of interest
+ */
+ public LikelihoodFn(Map<Position, ReadCounts> readCounts) {
+ // copy the map so the counts don't get changed out from under us
+ this.readCounts = ImmutableMap.copyOf(readCounts);
+ }
+
+ /**
+ * Compute the probability of a genotype given the reference allele probability.
+ */
+ private static double pGenotype(Genotype g, double refProb) {
+ switch(g) {
+ case REF_HOMOZYGOUS:
+ return refProb * refProb;
+ case HETEROZYGOUS:
+ return refProb * (1.0 - refProb);
+ case NONREF_HOMOZYGOUS:
+ return (1.0 - refProb) * (1.0 - refProb);
+ default:
+ throw new IllegalArgumentException("Illegal genotype");
+ }
+ }
+
+ /**
+ * Look up the probability of an observation conditioned on the underlying state.
+ */
+ private static double probObsGivenTruth(Base observed, Genotype trueGenotype,
+ ReadStatus trueStatus) {
+ return P_OBS_GIVEN_TRUTH.get(toTableIndex(observed, trueGenotype, trueStatus));
+ }
+
+ /**
+ * Compute the log-likelihood of the observed read data for a given contaminant fraction alpha.
+ *
+ * See equation (2) in Jun et al.
+ */
+ @Override
+ public double value(double alpha) {
+ double logLikelihood = 0.0;
+ for (ReadCounts rc : readCounts.values()) {
+ double refProb = rc.getRefFreq();
+
+ double pPosition = 0.0;
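+ // Marginalize over the unobserved genotypes of the sequenced sample (trueGenotype1)
+ // and of the contaminating individual (trueGenotype2), weighting each pair by the
+ // genotype priors derived from the reference allele frequency (see pGenotype).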
+ for (Genotype trueGenotype1 : Genotype.values()) {
+ double pGenotype1 = pGenotype(trueGenotype1, refProb);
+ for (Genotype trueGenotype2 : Genotype.values()) {
+ double pGenotype2 = pGenotype(trueGenotype2, refProb);
+
+ double pObsGivenGenotype = 1.0;
+
+ for (ReadQualityCount rqc : rc.getReadQualityCounts()) {
+ Base base = rqc.getBase();
+ double pErr = phredToProb(rqc.getQuality());
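+ // Mixture model for one observed base: with probability (1 - alpha) the read comes
+ // from the sample (trueGenotype1) and with probability alpha from the contaminant
+ // (trueGenotype2); each term splits into a correct call and a sequencing error,
+ // weighted by the Phred-derived error probability pErr.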
+ double pObs
+ = ((1.0 - alpha)
+ * probObsGivenTruth(base, trueGenotype1, ReadStatus.CORRECT)
+ + (alpha)
+ * probObsGivenTruth(base, trueGenotype2, ReadStatus.CORRECT)
+ ) * (1.0 - pErr)
+ + ((1.0 - alpha)
+ * probObsGivenTruth(base, trueGenotype1, ReadStatus.ERROR)
+ + (alpha)
+ * probObsGivenTruth(base, trueGenotype2, ReadStatus.ERROR)
+ ) * pErr;
+ pObsGivenGenotype *= Math.pow(pObs, rqc.getCount());
+ }
+ pPosition += pObsGivenGenotype * pGenotype1 * pGenotype2;
+ }
+ }
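+ // Positions are treated as independent, so their log-probabilities sum; working in
+ // log space avoids underflow when many positions contribute.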
+ logLikelihood += Math.log(pPosition);
+ }
+ return logLikelihood;
+ }
+
+ /**
+ * Convert a Phred quality score Q to the corresponding error probability, 10^(-Q/10).
+ */
+ private static double phredToProb(int phred) {
+ return Math.pow(10.0, -(double) phred / 10.0);
+ }
+}
diff --git a/src/main/java/com/google/cloud/genomics/dataflow/model/AlleleFreq.java b/src/main/java/com/google/cloud/genomics/dataflow/model/AlleleFreq.java
new file mode 100644
index 0000000..da99231
--- /dev/null
+++ b/src/main/java/com/google/cloud/genomics/dataflow/model/AlleleFreq.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2015 Google.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.genomics.dataflow.model;
+
+import com.google.api.client.json.GenericJson;
+import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.genomics.dataflow.coders.GenericJsonCoder;
+
+import java.util.List;
+
+/**
+ * Contains the reference and alternate alleles and the reference allele frequency for a
+ * single position on a single chromosome. Used in VerifyBamId.
+ */
+@DefaultCoder(GenericJsonCoder.class)
+public class AlleleFreq extends GenericJson {
+ // The reference base: a string of length 1 containing one of the bases 'A', 'C', 'G', 'T'.
+ private String refBases;
+ // A list of length 1 holding a string of length 1 containing one of the bases
+ // 'A', 'C', 'G', 'T' (the single alternate of a biallelic SNP).
+ private List<String> altBases;
+ // Population frequency of the reference allele at the given position on the given
+ // chromosome, in the range [0, 1].
+ private double refFreq;
+
+ public String getRefBases() {
+ return refBases;
+ }
+
+ public void setRefBases(String refBases) {
+ this.refBases = refBases;
+ }
+
+ public List<String> getAltBases() {
+ return altBases;
+ }
+
+ public void setAltBases(List<String> altBases) {
+ this.altBases = altBases;
+ }
+
+ public double getRefFreq() {
+ return refFreq;
+ }
+
+ public void setRefFreq(double refFreq) {
+ this.refFreq = refFreq;
+ }
+}
diff --git a/src/main/java/com/google/cloud/genomics/dataflow/model/ReadCounts.java b/src/main/java/com/google/cloud/genomics/dataflow/model/ReadCounts.java
new file mode 100644
index 0000000..b591984
--- /dev/null
+++ b/src/main/java/com/google/cloud/genomics/dataflow/model/ReadCounts.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2015 Google.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.genomics.dataflow.model;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.util.Lists;
+import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.genomics.dataflow.coders.GenericJsonCoder;
+import com.google.cloud.genomics.dataflow.model.ReadQualityCount.Base;
+
+import java.util.List;
+
+/**
+ * Counts of reads for a single SNP with a single alternate value for use in the
+ * VerifyBamId pipeline. For each SNP, we accumulate counts of bases and quality scores
+ * for associated aligned reads.
+ */
+@DefaultCoder(GenericJsonCoder.class)
+public class ReadCounts extends GenericJson {
+ /**
+ * The count for a single base and quality score is stored in a ReadQualityCount object.
+ */
+ private List<ReadQualityCount> readQualityCounts = Lists.newArrayList();
+ /**
+ * refFreq contains the population frequency of the reference allele.
+ */
+ private double refFreq;
+
+ public List<ReadQualityCount> getReadQualityCounts() {
+ return readQualityCounts;
+ }
+
+ public void setReadQualityCounts(List<ReadQualityCount> readQualityCounts) {
+ this.readQualityCounts = readQualityCounts;
+ }
+
+ public void addReadQualityCount(Base base, int quality, long count) {
+ ReadQualityCount rqc = new ReadQualityCount();
+ rqc.setBase(base);
+ rqc.setCount(count);
+ rqc.setQuality(quality);
+ this.readQualityCounts.add(rqc);
+ }
+
+ public void addReadQualityCount(ReadQualityCount rqc) {
+ this.readQualityCounts.add(rqc);
+ }
+
+ public double getRefFreq() {
+ return refFreq;
+ }
+
+ public void setRefFreq(double refFreq) {
+ this.refFreq = refFreq;
+ }
+}
diff --git a/src/main/java/com/google/cloud/genomics/dataflow/model/ReadQualityCount.java b/src/main/java/com/google/cloud/genomics/dataflow/model/ReadQualityCount.java
new file mode 100644
index 0000000..7cf0669
--- /dev/null
+++ b/src/main/java/com/google/cloud/genomics/dataflow/model/ReadQualityCount.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2015 Google.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.genomics.dataflow.model;
+
+/**
+ * This class is used to count the number of reads aligned to a SNP that show the reference base,
+ * the non-reference base, some other base, or an unknown base. Within each category, we count the
+ * number with each quality score.
+ *
+ * For example, we might have 2 reads that show the reference base with quality 10, 5 reads that
+ * show the non-reference base with quality 60, and 1 read that shows a different nucleotide with
+ * quality 0.
+ */
+public class ReadQualityCount {
+
+ private Base base;
+ private int quality;
+ private long count;
+
+ /**
+ * Which type of Base this ReadQualityCount represents.
+ */
+ public enum Base {
+ UNKNOWN, REF, NONREF, OTHER
+ };
+
+ public Base getBase() {
+ return base;
+ }
+
+ public void setBase(Base base) {
+ this.base = base;
+ }
+
+ public int getQuality() {
+ return quality;
+ }
+
+ public void setQuality(int quality) {
+ this.quality = quality;
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+ public void setCount(long count) {
+ this.count = count;
+ }
+}
diff --git a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/VerifyBamId.java b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/VerifyBamId.java
new file mode 100644
index 0000000..ef861bb
--- /dev/null
+++ b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/VerifyBamId.java
@@ -0,0 +1,616 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.genomics.dataflow.pipelines;
+
+import com.google.api.services.genomics.Genomics;
+import com.google.api.services.genomics.model.Position;
+import com.google.api.services.genomics.model.ReadGroupSet;
+import com.google.api.services.genomics.model.Reference;
+import com.google.api.services.genomics.model.SearchReferencesRequest;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.cloud.genomics.dataflow.coders.GenericJsonCoder;
+import com.google.cloud.genomics.dataflow.functions.LikelihoodFn;
+import com.google.cloud.genomics.dataflow.model.AlleleFreq;
+import com.google.cloud.genomics.dataflow.model.ReadBaseQuality;
+import com.google.cloud.genomics.dataflow.model.ReadBaseWithReference;
+import com.google.cloud.genomics.dataflow.model.ReadCounts;
+import com.google.cloud.genomics.dataflow.model.ReadQualityCount;
+import com.google.cloud.genomics.dataflow.readers.ReadStreamer;
+import com.google.cloud.genomics.dataflow.readers.VariantStreamer;
+import com.google.cloud.genomics.dataflow.utils.DataflowWorkarounds;
+import com.google.cloud.genomics.dataflow.utils.GCSOptions;
+import com.google.cloud.genomics.dataflow.utils.GenomicsDatasetOptions;
+import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
+import com.google.cloud.genomics.dataflow.utils.ReadUtils;
+import com.google.cloud.genomics.dataflow.utils.Solver;
+import com.google.cloud.genomics.dataflow.utils.VariantUtils;
+import com.google.cloud.genomics.utils.Contig;
+import com.google.cloud.genomics.utils.GenomicsFactory;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMultiset;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multiset;
+import com.google.genomics.v1.Read;
+import com.google.genomics.v1.StreamReadsRequest;
+import com.google.genomics.v1.StreamVariantsRequest;
+import com.google.genomics.v1.Variant;
+import com.google.protobuf.ListValue;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test a set of reads for contamination.
+ *
+ * Takes as input the ReadGroupSets of reads to test, together with reference allele
+ * frequency statistics for SNPs with a single alternate, computed from a specified set of
+ * VariantSets.
+ *
+ * Uses the sequence-data-only approach described in:
+ * G. Jun, M. Flickinger, K. N. Hetrick, J. M. Romm, K. F. Doheny,
+ * G. Abecasis, M. Boehnke, and H. M. Kang, Detecting and Estimating
+ * Contamination of Human DNA Samples in Sequencing and Array-Based Genotype
+ * Data, American Journal of Human Genetics, vol. 91, no. 5, pp. 839-848,
+ * doi:10.1016/j.ajhg.2012.09.004
+ * http://www.sciencedirect.com/science/article/pii/S0002929712004788
+ */
+public class VerifyBamId {
+
+ private static VerifyBamId.VerifyBamIdOptions options;
+ private static Pipeline p;
+ private static GenomicsFactory.OfflineAuth auth;
+
+ /**
+ * Constant that represents the size that user given references will be parsed into for each
+ * individual request.
+ */
+ private static final long SHARD_SIZE = 10000000L;
+
+ /**
+ * String prefix used for the sampling hash function.
+ */
+ private static final String HASH_PREFIX = "";
+
+ /**
+ * Options required to run this pipeline.
+ */
+ public static interface VerifyBamIdOptions extends GenomicsDatasetOptions, GCSOptions {
+
+ @Description("A comma delimited list of the IDs of the Google Genomics ReadGroupSets this "
+ + "pipeline is working with. Default (empty) indicates all ReadGroupSets in InputDatasetId."
+ + " This(and variantSetIds) or InputDatasetId must be set. InputDatasetId overrides "
+ + "ReadGroupSetIds (if InputDatasetId is set, this field will be ignored).")
+ @Default.String("")
+ String getReadGroupSetIds();
+
+ void setReadGroupSetIds(String readGroupSetId);
+
+ @Description("A comma delimited list of the IDs of the Google Genomics VariantSets this "
+ + "pipeline is working with. Default (empty) indicates all VariantSets in InputDatasetId."
+ + " This(and readGroupSetIds) or InputDatasetId must be set. InputDatasetId overrides "
+ + "VariantSetIds (if InputDatasetId is set, this field will be ignored).")
+ @Default.String("")
+ String getVariantSetIds();
+
+ void setVariantSetIds(String variantSetId);
+
+ @Description("The ID of the Google Genomics Dataset that the pipeline will get its input reads"
+ + " from. Default (empty) means to use ReadGroupSetIds and VariantSetIds instead. This or"
+ + " ReadGroupSetIds and VariantSetIds must be set. InputDatasetId overrides"
+ + " ReadGroupSetIds and VariantSetIds (if this field is set, ReadGroupSetIds and"
+ + " VariantSetIds will be ignored).")
+ @Default.String("")
+ String getInputDatasetId();
+
+ void setInputDatasetId(String inputDatasetId);
+
+ @Description("The minimum allele frequency to use in analysis. Defaults to 0.01.")
+ @Default.Double(0.01)
+ double getMinFrequency();
+
+ void setMinFrequency(double minFrequency);
+
+ @Description("The fraction of positions to check. Defaults to 0.01.")
+ @Default.Double(0.01)
+ double getSamplingFraction();
+
+ void setSamplingFraction(double samplingFraction);
+ }
+
+ /**
+ * Run the VerifyBamId algorithm and output the resulting contamination estimate.
+ */
+ public static void main(String[] args) throws GeneralSecurityException, IOException {
+ // Register the options so that they show up via --help
+ PipelineOptionsFactory.register(VerifyBamIdOptions.class);
+ options = PipelineOptionsFactory.fromArgs(args)
+ .withValidation()
+ .as(VerifyBamId.VerifyBamIdOptions.class);
+ // Option validation is not yet automatic, so we make an explicit call here.
+ GenomicsDatasetOptions.Methods.validateOptions(options);
+ auth = GenomicsOptions.Methods.getGenomicsAuth(options);
+
+ p = Pipeline.create(options);
+ DataflowWorkarounds.registerGenomicsCoders(p);
+ DataflowWorkarounds.registerCoder(p, Read.class, SerializableCoder.of(Read.class));
+ DataflowWorkarounds.registerCoder(p, Variant.class, SerializableCoder.of(Variant.class));
+ DataflowWorkarounds.registerCoder(p, ReadBaseQuality.class,
+ GenericJsonCoder.of(ReadBaseQuality.class));
+ DataflowWorkarounds.registerCoder(p, AlleleFreq.class, GenericJsonCoder.of(AlleleFreq.class));
+ DataflowWorkarounds.registerCoder(p, ReadCounts.class, GenericJsonCoder.of(ReadCounts.class));
+
+ if (options.getInputDatasetId().isEmpty()
+ && (options.getReadGroupSetIds().isEmpty() || options.getVariantSetIds().isEmpty())) {
+ throw new IllegalArgumentException("InputDatasetId or ReadGroupSetIds and VariantSetIds must"
+ + " be specified");
+ }
+
+ List<String> rgsIds;
+ List<String> vsIds;
+ if (options.getInputDatasetId().isEmpty()) {
+ rgsIds = Lists.newArrayList(options.getReadGroupSetIds().split(","));
+ vsIds = Lists.newArrayList(options.getVariantSetIds().split(","));
+ } else {
+ rgsIds = ReadStreamer.getReadGroupSetIds(options.getInputDatasetId(), auth);
+ vsIds = VariantStreamer.getVariantSetIds(options.getInputDatasetId(), auth);
+ }
+
+ List<Contig> contigs;
+ String referenceSetId = checkReferenceSetIds(rgsIds);
+ if (options.isAllReferences()) {
+ contigs = getAllReferences(referenceSetId);
+ } else {
+ contigs = parseReferences(options.getReferences(), referenceSetId);
+ }
+
+ /*
+ TODO: We can reduce the number of requests needed to be created by doing the following:
+ 1. Stream the Variants first (rather than concurrently with the Reads). Select a subset of
+ them equal to some threshold (say 50K by default).
+ 2. Create the requests for streaming Reads by running a ParDo over the selected Variants
+ to get their ranges (we only need to stream Reads that overlap the selected Variants).
+ 3. Stream the Reads from the created requests.
+ */
+
+ // Read in the Reads.
+ PCollection<Read> reads = getReadsFromAPI(rgsIds);
+
+ // Read in the Variants. TODO: potentially provide an option to load the Variants from a file.
+ PCollection<Variant> variants = getVariantsFromAPI(vsIds);
+
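+ // Compute, keyed by position, the reference allele frequency for qualifying SNPs
+ // (a single alternate, and at least the configured minimum allele frequency).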
+ PCollection<KV<Position, AlleleFreq>> refFreq = getFreq(variants, options.getMinFrequency());
+
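+ // Sample a fraction of positions (using the hashing prefix) and accumulate, per sampled
+ // position, counts of reads by observed base and quality score together with the
+ // reference allele frequency.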
+ PCollection<KV<Position, ReadCounts>> readCountsTable =
+ combineReads(reads, options.getSamplingFraction(), HASH_PREFIX, refFreq);
+
+ // Converts our results to a single Map of Position keys to ReadCounts values.
+ PCollectionView