From 1ede4740e59a99da2fd9570d89cf6a64ffeaebb0 Mon Sep 17 00:00:00 2001 From: JP Martin Date: Wed, 2 Sep 2015 12:42:03 -0700 Subject: [PATCH] serialize auth onto pipeline, default to API_KEY if auth is missing --- .../dataflow/DataflowCommandLineProgram.java | 64 ++++++++++++++++++- .../dataflow/datasources/RefAPISource.java | 8 ++- 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/broadinstitute/hellbender/engine/dataflow/DataflowCommandLineProgram.java b/src/main/java/org/broadinstitute/hellbender/engine/dataflow/DataflowCommandLineProgram.java index e2375d95afd..e90c943dbbe 100644 --- a/src/main/java/org/broadinstitute/hellbender/engine/dataflow/DataflowCommandLineProgram.java +++ b/src/main/java/org/broadinstitute/hellbender/engine/dataflow/DataflowCommandLineProgram.java @@ -12,15 +12,24 @@ import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; import com.google.cloud.genomics.dataflow.utils.GCSOptions; +import com.google.cloud.genomics.dataflow.utils.GenomicsOptions; +import com.google.cloud.genomics.utils.GenomicsFactory; import com.google.common.annotations.VisibleForTesting; import org.broadinstitute.hellbender.cmdline.Argument; import org.broadinstitute.hellbender.cmdline.CommandLineParser; import org.broadinstitute.hellbender.cmdline.CommandLineProgram; +import org.broadinstitute.hellbender.exceptions.GATKException; import org.broadinstitute.hellbender.exceptions.UserException; import org.broadinstitute.hellbender.utils.dataflow.DataflowUtils; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.Serializable; +import java.security.GeneralSecurityException; public abstract class DataflowCommandLineProgram extends CommandLineProgram implements Serializable { @@ -28,7 +37,37 @@ public abstract class 
DataflowCommandLineProgram extends CommandLineProgram impl // We need authentication options from GCSOptions, and Dataflow options from DataflowPipelineOptions. // Neither inherits from the other, so we have to put them together like this. - public interface HellbenderDataflowOptions extends GCSOptions, DataflowPipelineOptions {} + // This also includes code to save the OfflineAuth onto the pipelineoptions, so we can get to them later + // (even if they're via client-secrets.json, which workers otherwise wouldn't be able to load). + public interface HellbenderDataflowOptions extends GCSOptions, DataflowPipelineOptions { + + static class Methods { + public static void setOfflineAuth(HellbenderDataflowOptions opts, GenomicsFactory.OfflineAuth auth) throws IOException { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(os)) { + oos.writeObject(auth); + oos.flush(); + } + opts.setSerializedOfflineAuth(os.toByteArray()); + } + public static GenomicsFactory.OfflineAuth getOfflineAuth(HellbenderDataflowOptions opts) throws IOException, ClassNotFoundException, GeneralSecurityException { + byte[] serialized = opts.getSerializedOfflineAuth(); + if (null==serialized && opts.getApiKey()!=null) { + // fall back to using the API key only (even if a secrets file was also specified). 
+ GenomicsFactory.Builder builder = + GenomicsFactory.builder(opts.getAppName()).setNumberOfRetries(opts.getNumberOfRetries()); + return builder.build().getOfflineAuthFromApiKey(opts.getApiKey()); + } + try (ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(serialized))) { + return (GenomicsFactory.OfflineAuth)(is.readObject()); + } + } + } + + void setSerializedOfflineAuth(byte[] auth); + byte[] getSerializedOfflineAuth(); + + } protected enum PipelineRunnerType implements CommandLineParser.ClpEnum { LOCAL(DirectPipelineRunner.class, "run the pipeline locally"), @@ -118,6 +157,10 @@ private PipelineOptions buildPipelineOptions() { options.setProject(projectID); options.setStagingLocation(stagingLocation); options.setRunner(this.runnerType.runner); + // explicitly pin the worker machine type to n1-standard-1 (the default size). + options.setWorkerMachineType("n1-standard-1"); + // this is new code. If there's a problem, odds are it's our fault and retrying won't help. + options.setNumberOfRetries(0); if (numWorkers!=0) { options.setNumWorkers(numWorkers); } @@ -127,6 +170,17 @@ private PipelineOptions buildPipelineOptions() { logger.info("Loading " + clientSecret.getName()); options.setSecretsFile(clientSecret.getAbsolutePath()); } + if (apiKey!=null || clientSecret != null) { + // put a serialized version of the credentials in the pipelineOptions, so we can get to it later. 
+ try { + GenomicsFactory.OfflineAuth auth = GCSOptions.Methods.createGCSAuth(options); + HellbenderDataflowOptions.Methods.setOfflineAuth(options, auth); + } catch (Exception x) { + throw new GATKException("Error with credentials",x); + } + } + String name = jobName(); + if (null!=name) options.setJobName(name); return options; } else { final SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); @@ -168,6 +222,14 @@ protected static void runPipeline(final Pipeline p) { */ protected void afterPipeline(Pipeline pipeline) {} + /** + * Override to pick a name for the pipeline. + * Note that Dataflow requires the name to be unique among running jobs. + */ + protected String jobName() { + return null; + } + // --------------------------------------------------- // Helpers diff --git a/src/main/java/org/broadinstitute/hellbender/engine/dataflow/datasources/RefAPISource.java b/src/main/java/org/broadinstitute/hellbender/engine/dataflow/datasources/RefAPISource.java index bbc0336d319..f49d0146c9a 100644 --- a/src/main/java/org/broadinstitute/hellbender/engine/dataflow/datasources/RefAPISource.java +++ b/src/main/java/org/broadinstitute/hellbender/engine/dataflow/datasources/RefAPISource.java @@ -16,6 +16,8 @@ import htsjdk.samtools.SAMSequenceRecord; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.broadinstitute.hellbender.engine.dataflow.DataflowCommandLineProgram; +import org.broadinstitute.hellbender.exceptions.GATKException; import org.broadinstitute.hellbender.exceptions.UserException; import org.broadinstitute.hellbender.utils.SimpleInterval; import org.broadinstitute.hellbender.utils.Utils; @@ -142,6 +144,7 @@ public ReferenceBases getReferenceBases(final PipelineOptions pipelineOptions, f } } + /** * Return a sequence dictionary for the reference. * @param optReadSequenceDictionaryToMatch - (optional) the sequence dictionary of the reads, we'll match its order if possible. 
@@ -276,10 +279,10 @@ private void fillGenomicsService(final PipelineOptions pipelineOptions) { private Genomics createGenomicsService(final PipelineOptions pipelineOptions) { try { - final GenomicsFactory.OfflineAuth auth = GenomicsOptions.Methods.getGenomicsAuth(pipelineOptions.as(GCSOptions.class)); + final GenomicsFactory.OfflineAuth auth = DataflowCommandLineProgram.HellbenderDataflowOptions.Methods.getOfflineAuth(pipelineOptions.as(DataflowCommandLineProgram.HellbenderDataflowOptions.class)); return auth.getGenomics(auth.getDefaultFactory()); } - catch ( GeneralSecurityException e ) { + catch ( GeneralSecurityException|ClassNotFoundException e ) { throw new UserException("Authentication failed for Google genomics service", e); } catch ( IOException e ) { @@ -297,6 +300,7 @@ private void writeObject(ObjectOutputStream stream) throws IOException { } stream.writeObject(referenceNameToIdTable); } + private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException { JsonFactory jsonFactory = com.google.api.client.googleapis.util.Utils.getDefaultJsonFactory(); final Map refs = new HashMap<>();