Skip to content

Commit

Permalink
serialize auth onto pipeline, default to API_KEY if auth is missing
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-philippe-martin committed Sep 2, 2015
1 parent bb424f1 commit 1ede474
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,62 @@
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.genomics.dataflow.utils.GCSOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
import com.google.cloud.genomics.utils.GenomicsFactory;
import com.google.common.annotations.VisibleForTesting;
import org.broadinstitute.hellbender.cmdline.Argument;
import org.broadinstitute.hellbender.cmdline.CommandLineParser;
import org.broadinstitute.hellbender.cmdline.CommandLineProgram;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.dataflow.DataflowUtils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.security.GeneralSecurityException;


public abstract class DataflowCommandLineProgram extends CommandLineProgram implements Serializable {
private static final long serialVersionUID = 1l;

// We need authentication options from GCSOptions, and Dataflow options from DataflowPipelineOptions.
// Neither inherits from the other, so we have to put them together like this.
public interface HellbenderDataflowOptions extends GCSOptions, DataflowPipelineOptions {}
// It also carries a serialized copy of the OfflineAuth on the PipelineOptions, so the credentials can be
// retrieved later (even when they come from client-secrets.json, which workers otherwise wouldn't be able to load).
public interface HellbenderDataflowOptions extends GCSOptions, DataflowPipelineOptions {

    /**
     * Static helpers that store a {@link GenomicsFactory.OfflineAuth} on the options as
     * Java-serialized bytes, and read it back again.
     */
    static class Methods {
        /**
         * Serializes {@code auth} with Java serialization and stores the resulting bytes on {@code opts}
         * via {@link HellbenderDataflowOptions#setSerializedOfflineAuth(byte[])}.
         *
         * @param opts the options object that will carry the serialized credentials
         * @param auth the credentials to serialize
         * @throws IOException if the credentials cannot be serialized
         */
        public static void setOfflineAuth(HellbenderDataflowOptions opts, GenomicsFactory.OfflineAuth auth) throws IOException {
            ByteArrayOutputStream os = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(os)) {
                oos.writeObject(auth);
                oos.flush();
            }
            opts.setSerializedOfflineAuth(os.toByteArray());
        }

        /**
         * Returns the credentials carried by {@code opts}.
         *
         * If serialized credentials are present they are deserialized and returned. Otherwise, if an
         * API key is set, credentials are built from the API key alone. If neither is available a
         * {@link GeneralSecurityException} is thrown.
         *
         * @param opts the options object to read the credentials from
         * @return the deserialized credentials, or credentials built from the API key
         * @throws IOException if the serialized credentials cannot be read
         * @throws ClassNotFoundException if the serialized credentials reference an unknown class
         * @throws GeneralSecurityException if {@code opts} holds neither serialized credentials nor an
         *         API key, or if building credentials from the API key fails
         */
        public static GenomicsFactory.OfflineAuth getOfflineAuth(HellbenderDataflowOptions opts) throws IOException, ClassNotFoundException, GeneralSecurityException {
            byte[] serialized = opts.getSerializedOfflineAuth();
            if (null != serialized) {
                // These bytes are expected to be the ones written by setOfflineAuth.
                try (ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
                    return (GenomicsFactory.OfflineAuth) (is.readObject());
                }
            }
            if (opts.getApiKey() == null) {
                // Without this check we would hand a null array to ByteArrayInputStream and fail
                // with an uninformative NullPointerException.
                throw new GeneralSecurityException(
                        "No credentials available: the pipeline options contain neither serialized credentials nor an API key");
            }
            // fall back to using the API key only (even if a secrets file was also specified).
            GenomicsFactory.Builder builder =
                    GenomicsFactory.builder(opts.getAppName()).setNumberOfRetries(opts.getNumberOfRetries());
            return builder.build().getOfflineAuthFromApiKey(opts.getApiKey());
        }
    }

    /**
     * Stores the serialized credentials. Prefer
     * {@link Methods#setOfflineAuth(HellbenderDataflowOptions, GenomicsFactory.OfflineAuth)},
     * which produces the bytes in the format {@link Methods#getOfflineAuth(HellbenderDataflowOptions)} reads.
     */
    void setSerializedOfflineAuth(byte[] auth);

    /**
     * Returns the serialized credentials. {@link Methods#getOfflineAuth(HellbenderDataflowOptions)}
     * treats a {@code null} return as "no serialized credentials stored".
     */
    byte[] getSerializedOfflineAuth();

}

protected enum PipelineRunnerType implements CommandLineParser.ClpEnum {
LOCAL(DirectPipelineRunner.class, "run the pipeline locally"),
Expand Down Expand Up @@ -118,6 +157,10 @@ private PipelineOptions buildPipelineOptions() {
options.setProject(projectID);
options.setStagingLocation(stagingLocation);
options.setRunner(this.runnerType.runner);
// n1-standard-4 is 4x the RAM and 4x the CPUs as the default machine, at only 4x the price.
options.setWorkerMachineType("n1-standard-1");
// this is new code. If there's a problem, odds are it's our fault and retrying won't help.
options.setNumberOfRetries(0);
if (numWorkers!=0) {
options.setNumWorkers(numWorkers);
}
Expand All @@ -127,6 +170,17 @@ private PipelineOptions buildPipelineOptions() {
logger.info("Loading " + clientSecret.getName());
options.setSecretsFile(clientSecret.getAbsolutePath());
}
if (apiKey!=null || clientSecret != null) {
// put a serialized version of the credentials in the pipelineOptions, so we can get to it later.
try {
GenomicsFactory.OfflineAuth auth = GCSOptions.Methods.createGCSAuth(options);
HellbenderDataflowOptions.Methods.setOfflineAuth(options, auth);
} catch (Exception x) {
throw new GATKException("Error with credentials",x);
}
}
String name = jobName();
if (null!=name) options.setJobName(name);
return options;
} else {
final SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
Expand Down Expand Up @@ -168,6 +222,14 @@ protected static void runPipeline(final Pipeline p) {
*/
protected void afterPipeline(Pipeline pipeline) {}

/**
 * Override to pick a name for the pipeline.
 * Note that Dataflow requires the name to be unique among running jobs.
 *
 * @return the job name to set on the pipeline options, or {@code null} (the default)
 *         to leave the job name unset.
 */
protected String jobName() {
return null;
}

// ---------------------------------------------------
// Helpers

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import htsjdk.samtools.SAMSequenceRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.broadinstitute.hellbender.engine.dataflow.DataflowCommandLineProgram;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.Utils;
Expand Down Expand Up @@ -142,6 +144,7 @@ public ReferenceBases getReferenceBases(final PipelineOptions pipelineOptions, f
}
}


/**
* Return a sequence dictionary for the reference.
* @param optReadSequenceDictionaryToMatch - (optional) the sequence dictionary of the reads, we'll match its order if possible.
Expand Down Expand Up @@ -276,10 +279,10 @@ private void fillGenomicsService(final PipelineOptions pipelineOptions) {

private Genomics createGenomicsService(final PipelineOptions pipelineOptions) {
try {
final GenomicsFactory.OfflineAuth auth = GenomicsOptions.Methods.getGenomicsAuth(pipelineOptions.as(GCSOptions.class));
final GenomicsFactory.OfflineAuth auth = DataflowCommandLineProgram.HellbenderDataflowOptions.Methods.getOfflineAuth(pipelineOptions.as(DataflowCommandLineProgram.HellbenderDataflowOptions.class));
return auth.getGenomics(auth.getDefaultFactory());
}
catch ( GeneralSecurityException e ) {
catch ( GeneralSecurityException|ClassNotFoundException e ) {
throw new UserException("Authentication failed for Google genomics service", e);
}
catch ( IOException e ) {
Expand All @@ -297,6 +300,7 @@ private void writeObject(ObjectOutputStream stream) throws IOException {
}
stream.writeObject(referenceNameToIdTable);
}

private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
JsonFactory jsonFactory = com.google.api.client.googleapis.util.Utils.getDefaultJsonFactory();
final Map<String, Reference> refs = new HashMap<>();
Expand Down

0 comments on commit 1ede474

Please sign in to comment.