Commit
Merge pull request #869 from broadinstitute/jp_bigger_apiref
RefAPISource support for multiple pages, with corresponding test code.
jean-philippe-martin committed Aug 28, 2015
2 parents 8513941 + da4d697 commit bb424f1
Showing 4 changed files with 150 additions and 68 deletions.
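In short, getReferenceBases no longer assumes that a single request covers the whole interval: it now follows the nextPageToken returned by the Genomics API and concatenates the per-page sequences. A minimal sketch of that page-and-concatenate pattern, kept separate from the diff below (the helper name fetchAllPages is hypothetical; the types and calls are the same ones the diff uses):

// Hedged sketch of the paging pattern introduced by this commit.
// Genomics.References.Bases.List, ListBasesResponse and Bytes.concat are the same
// types/calls imported in RefAPISource below; only the helper name is made up here.
private static byte[] fetchAllPages(final Genomics.References.Bases.List listRequest) throws IOException {
    final List<byte[]> pages = new ArrayList<>();
    ListBasesResponse page = listRequest.execute();
    pages.add(page.getSequence().getBytes());
    while (page.getNextPageToken() != null) {            // keep fetching while the API reports more pages
        listRequest.setPageToken(page.getNextPageToken());
        page = listRequest.execute();
        pages.add(page.getSequence().getBytes());
    }
    return Bytes.concat(pages.toArray(new byte[pages.size()][]));  // join the pages in order
}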
@@ -11,11 +11,11 @@
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
import com.google.cloud.genomics.utils.GenomicsFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Bytes;
import htsjdk.samtools.SAMSequenceDictionary;
import htsjdk.samtools.SAMSequenceRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.Utils;
@@ -26,11 +26,7 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.*;


/**
@@ -51,7 +47,7 @@ public class RefAPISource implements ReferenceSource, Serializable {
public static final String URL_PREFIX = "gg://reference/";

private static final long serialVersionUID = 1L;
private static final int pageSize = 1000000; // The number results per request (the default is 200k).
private static final int defaultPageSize = 1_000_000; // The number of results per request
private final static Logger logger = LogManager.getLogger(RefAPISource.class);

// With our current design, there are three steps required (1) Create a map from reference name to Id,
@@ -87,50 +83,64 @@ public static String getReferenceSetID(String url) {
}


/**
* Query the Google Genomics API for reference bases spanning the specified interval from the specified
* reference name.
*
* Footnote: queries larger than pageSize will be truncated.
*
* @param pipelineOptions -- are used to get the credentials necessary to call the Genomics API
* @param interval - the range of bases to retrieve.
* @return the reference bases specified by interval and apiData (using the Google Genomics API).
*/
@Override
public ReferenceBases getReferenceBases(final PipelineOptions pipelineOptions, final SimpleInterval interval) {
Utils.nonNull(pipelineOptions);
Utils.nonNull(interval);

if (genomicsService == null) {
genomicsService = createGenomicsService(pipelineOptions);
}
if ( !referenceNameToIdTable.containsKey(interval.getContig()) ) {
throw new UserException("Contig " + interval.getContig() + " not in our set of reference names for this reference source");
}

try {
final Genomics.References.Bases.List listRequest = genomicsService.references().bases().list(referenceNameToIdTable.get(interval.getContig())).setPageSize(pageSize);
// We're subtracting 1 with the start but not the end because GA4GH is zero-based (inclusive,exclusive)
// for its intervals.
listRequest.setStart((long) interval.getGA4GHStart());
listRequest.setEnd((long) interval.getGA4GHEnd());

final ListBasesResponse result = listRequest.execute();
if ( result.getSequence() == null ) {
throw new UserException("No reference bases returned in query for interval " + interval + ". Is the interval valid for this reference?");
}
byte[] bases = result.getSequence().getBytes();
if (bases.length != interval.size()) {
throw new GATKException("Did not get all bases for query, is the query longer than pageSize?");
}

return new ReferenceBases(bases, interval);
}
catch ( IOException e ) {
throw new UserException("Query to genomics service failed for reference interval " + interval, e);
}
}
/**
* Query the Google Genomics API for reference bases spanning the specified interval from the specified
* reference name.
*
* @param pipelineOptions the pipeline options, used to obtain the credentials needed to call the Genomics API
* @param interval the range of bases to retrieve
* @return the reference bases spanning the given interval (fetched via the Google Genomics API)
*/
@Override
public ReferenceBases getReferenceBases(final PipelineOptions pipelineOptions, final SimpleInterval interval) {
return getReferenceBases(pipelineOptions, interval, defaultPageSize);
}

/**
* Query the Google Genomics API for reference bases spanning the specified interval from the specified
* reference name. Intervals longer than pageSize are fetched in multiple requests and the pages are
* concatenated before being returned.
*
* @param pipelineOptions the pipeline options, used to obtain the credentials needed to call the Genomics API
* @param interval the range of bases to retrieve
* @param pageSize the number of bases to request per API call
* @return the reference bases spanning the given interval (fetched via the Google Genomics API)
*/
public ReferenceBases getReferenceBases(final PipelineOptions pipelineOptions, final SimpleInterval interval, int pageSize) {
Utils.nonNull(pipelineOptions);
Utils.nonNull(interval);

if (genomicsService == null) {
genomicsService = createGenomicsService(pipelineOptions);
}
if (!referenceNameToIdTable.containsKey(interval.getContig())) {
throw new UserException("Contig " + interval.getContig() + " not in our set of reference names for this reference source");
}

try {
final Genomics.References.Bases.List listRequest = genomicsService.references().bases().list(referenceNameToIdTable.get(interval.getContig())).setPageSize(pageSize);
listRequest.setStart(interval.getGA4GHStart());
listRequest.setEnd(interval.getGA4GHEnd());
ListBasesResponse result = listRequest.execute();
if (result.getSequence() == null) {
throw new UserException("No reference bases returned in query for interval " + interval + ". Is the interval valid for this reference?");
}
byte[] received = result.getSequence().getBytes();
byte[] bases = received;
if (received.length < interval.size()) {
// The first response covered only part of the interval, so it spans more than one page:
// follow nextPageToken until the API stops returning one, collecting each page's bases.
ArrayList<byte[]> blobs = new ArrayList<byte[]>();
blobs.add(received);
while (result.getNextPageToken() != null) {
listRequest.setPageToken(result.getNextPageToken());
result = listRequest.execute();
blobs.add(result.getSequence().getBytes());
}
// Concatenate all pages into a single array covering the whole interval.
final byte[][] resultsArray = blobs.toArray(new byte[blobs.size()][]);
bases = Bytes.concat(resultsArray);
}
return new ReferenceBases(bases, interval);
} catch (IOException e) {
throw new UserException("Query to genomics service failed for reference interval " + interval, e);
}
}
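For callers, the new overload is exercised exactly as in the unit tests further down; a short usage sketch (pipeline and credential setup elided, values borrowed from those tests):

// Hedged usage sketch; pipelineOptions is assumed to already carry a valid API key and project,
// as done by the tests' setupPipeline() helper.
final RefAPISource refSource = new RefAPISource(pipelineOptions, RefAPISource.URL_PREFIX + RefAPISource.HS37D5_REF_ID);
final SimpleInterval interval = new SimpleInterval("1", 50000, 51000);
// A deliberately small page size forces several round trips; omit it to use the 1,000,000 default.
final ReferenceBases bases = refSource.getReferenceBases(pipelineOptions, interval, 300);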

/**
* Return a sequence dictionary for the reference.
@@ -174,7 +174,7 @@ public final int getStart(){
/**
* @return the 0-based start position (from the GA4GH spec).
*/
public final int getGA4GHStart() {return start - 1; }
public final long getGA4GHStart() {return start - 1; }

/**
* @return the 1-based closed-ended end position of the interval on the contig.
@@ -186,7 +186,7 @@ public final int getEnd(){
/**
* @return the typical end spans are [zero-start,end) (from the GA4GH spec).
*/
public final int getGA4GHEnd() { return end; }
public final long getGA4GHEnd() { return end; }
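As a quick sanity check on the conversion these two accessors implement (illustrative only, not part of the diff): a 1-based closed interval maps to a GA4GH 0-based half-open one, so the start shifts down by one and the end stays put.

// Hedged illustration of the 1-based-closed to GA4GH 0-based-half-open mapping.
final SimpleInterval iv = new SimpleInterval("1", 100, 200);
// iv.getGA4GHStart() == 99, iv.getGA4GHEnd() == 200, iv.size() == 101 (= 200 - 99)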

/**
* @return number of bases covered by this interval (will always be > 0)
@@ -65,15 +65,15 @@ public SimpleInterval getInterval() {

/**
* getSubset returns only the bases of the interval passed in.
* @param interval, the subset to be returned
* @param subsetInterval, the subset to be returned
* @return the subset of ReferenceBases
*/
public ReferenceBases getSubset(SimpleInterval interval) {
if (!this.interval.contains(interval)) {
throw new GATKException("Reference doesn't match input interval");
public ReferenceBases getSubset(SimpleInterval subsetInterval) {
if (!this.interval.contains(subsetInterval)) {
throw new GATKException("Reference doesn't match input interval (asked for "+subsetInterval.toString()+" but we have "+this.interval+")");
}
int start = interval.getStart() - this.interval.getStart();
int end = interval.getEnd() - this.interval.getStart();
return new ReferenceBases(Arrays.copyOfRange(this.bases, start, end + 1), interval);
int start = subsetInterval.getStart() - this.interval.getStart();
int end = subsetInterval.getEnd() - this.interval.getStart();
return new ReferenceBases(Arrays.copyOfRange(this.bases, start, end + 1), subsetInterval);
}
}
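Putting the pieces together, getSubset is how the multi-page tests below compare the overlapping stretch of two queries; a brief hedged sketch, assuming bases covers 1:50000-51000 (for example, fetched as in the earlier sketch):

// Hedged sketch: extract a sub-interval from an already-fetched ReferenceBases object.
final ReferenceBases middle = bases.getSubset(new SimpleInterval("1", 50100, 50200));
// Asking for an interval outside 1:50000-51000 now throws a GATKException whose message
// reports both the requested interval and the one actually held.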
@@ -1,13 +1,14 @@
package org.broadinstitute.hellbender.engine.dataflow;
package org.broadinstitute.hellbender.engine.dataflow.datasources;

import com.google.api.services.genomics.model.Reference;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;

import htsjdk.samtools.SAMSequenceDictionary;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.broadinstitute.hellbender.engine.dataflow.datasources.RefAPISource;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.reference.ReferenceBases;
@@ -18,21 +19,32 @@

import java.util.HashMap;
import java.util.Map;
import com.google.api.services.genomics.model.Reference;

public class RefAPISourceUnitTest extends BaseTest {

private ReferenceBases queryReferenceAPI( final String referenceName, final SimpleInterval interval ) {
GenomicsOptions options = PipelineOptionsFactory.create().as(GenomicsOptions.class);
options.setApiKey(getDataflowTestApiKey());
options.setProject(getDataflowTestProject());
private ReferenceBases queryReferenceAPI( final String referenceName, final SimpleInterval interval, int pageSize ) {
final Pipeline p = setupPipeline();

final Pipeline p = TestPipeline.create(options); // We don't use GATKTestPipeline because we need specific options.
RefAPISource refAPISource = new RefAPISource(p.getOptions(), RefAPISource.URL_PREFIX + referenceName);
return refAPISource.getReferenceBases(p.getOptions(), interval, pageSize);
}

private ReferenceBases queryReferenceAPI( final String referenceName, final SimpleInterval interval ) {
final Pipeline p = setupPipeline();

RefAPISource refAPISource = new RefAPISource(p.getOptions(), RefAPISource.URL_PREFIX + referenceName);
return refAPISource.getReferenceBases(p.getOptions(), interval);
}

private Pipeline setupPipeline() {
GenomicsOptions options = PipelineOptionsFactory.create().as(GenomicsOptions.class);
options.setApiKey(getDataflowTestApiKey());
options.setProject(getDataflowTestProject());

final Pipeline p = TestPipeline.create(options); // We don't use GATKTestPipeline because we need specific options.
return p;
}

@DataProvider(name = "sortData")
public Object[][] createSortData() {
return new String[][] {
@@ -112,6 +124,66 @@ public void testReferenceSourceQueryWithNullInterval() {
}


@Test(groups = "cloud")
public void testReferenceSourceMultiPageQuery() {
final int mio = 1_000_000;
final ReferenceBases bases1 = queryReferenceAPI(RefAPISource.HS37D5_REF_ID, new SimpleInterval("1", 50000, 50000 + mio + 50));
final ReferenceBases bases2 = queryReferenceAPI(RefAPISource.HS37D5_REF_ID, new SimpleInterval("1", 50025, 50025 + mio + 50));

Assert.assertNotNull(bases1);
Assert.assertNotNull(bases1.getBases());
Assert.assertNotNull(bases2);
Assert.assertNotNull(bases2.getBases());
// those SimpleIntervals include the end, hence +1
Assert.assertEquals(bases1.getBases().length, mio + 50 + 1, "Wrong number of bases returned");
Assert.assertEquals(bases2.getBases().length, mio + 50 + 1, "Wrong number of bases returned");

// grab some bases around the seam
ReferenceBases seam1 = bases1.getSubset(new SimpleInterval("1", 50000 + mio - 100, 50000 + mio + 50));
ReferenceBases seam2 = bases2.getSubset(new SimpleInterval("1", 50000 + mio - 100, 50000 + mio + 50));

Assert.assertEquals(seam1.getBases(), seam2.getBases(), "seam doesn't match (paging bug?)");

}

@Test(groups = "cloud")
public void testReferenceSourceMultiSmallPagesQuery() {
int pageSize = 300;
// not a multiple of pageSize (testing the fetching of a partial page)
final ReferenceBases bases1 = queryReferenceAPI(RefAPISource.HS37D5_REF_ID, new SimpleInterval("1", 50000, 51000), pageSize);
// multiple of pageSize (testing ending on an exact page boundary)
final ReferenceBases bases2 = queryReferenceAPI(RefAPISource.HS37D5_REF_ID, new SimpleInterval("1", 50025, 50924), pageSize);

Assert.assertNotNull(bases1);
Assert.assertNotNull(bases1.getBases());
Assert.assertNotNull(bases2);
Assert.assertNotNull(bases2.getBases());
// those SimpleIntervals include the end, hence +1
Assert.assertEquals(bases1.getBases().length, 1001, "Wrong number of bases returned");
Assert.assertEquals(bases2.getBases().length, 900, "Wrong number of bases returned");

// grab some bases they should have in common
ReferenceBases seam1 = bases1.getSubset(new SimpleInterval("1", 50025, 50902));
ReferenceBases seam2 = bases2.getSubset(new SimpleInterval("1", 50025, 50902));

Assert.assertEquals(seam1.getBases(), seam2.getBases(), "seam doesn't match (paging bug?)");
}

@Test(groups = "cloud")
public void testReferenceSourceVaryingPageSizeQuery() {

SimpleInterval interval = new SimpleInterval("1", 50000, 50050);
final ReferenceBases bases1 = queryReferenceAPI(RefAPISource.HS37D5_REF_ID, interval);
final ReferenceBases bases2 = queryReferenceAPI(RefAPISource.HS37D5_REF_ID, interval, 10);

Assert.assertNotNull(bases1);
Assert.assertNotNull(bases1.getBases());
Assert.assertNotNull(bases2);
Assert.assertNotNull(bases2.getBases());
Assert.assertEquals(bases1.getBases(), bases2.getBases(), "bases should match despite different paging size");
}


private Map<String, Reference> createDummyReferenceMap(String[] contig) {
HashMap<String,Reference> fake = new HashMap<>();
for (String s : contig) {
