From caf781800b4ee3ee27f4c56f6c87f04ac46225f3 Mon Sep 17 00:00:00 2001 From: Sela Date: Mon, 6 Mar 2017 11:17:00 +0200 Subject: [PATCH] [BEAM-1556] Make PipelineOptions a lazy-singleton and init IOs as part of it. --- .../translation/SparkRuntimeContext.java | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java index 9c3d79f0c8d61..4ccfeadcd9e30 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.spark.Accumulator; /** @@ -40,12 +41,10 @@ */ public class SparkRuntimeContext implements Serializable { private final String serializedPipelineOptions; + private transient CoderRegistry coderRegistry; - /** - * Map fo names to Beam aggregators. - */ + // map for names to Beam aggregators. private final Map> aggregators = new HashMap<>(); - private transient CoderRegistry coderRegistry; SparkRuntimeContext(Pipeline pipeline) { this.serializedPipelineOptions = serializePipelineOptions(pipeline.getOptions()); @@ -67,8 +66,8 @@ private static PipelineOptions deserializePipelineOptions(String serializedPipel } } - public synchronized PipelineOptions getPipelineOptions() { - return deserializePipelineOptions(serializedPipelineOptions); + public PipelineOptions getPipelineOptions() { + return PipelineOptionsHolder.getOrInit(serializedPipelineOptions); } /** @@ -118,6 +117,24 @@ public CoderRegistry getCoderRegistry() { return coderRegistry; } + private static class PipelineOptionsHolder { + // on executors, this should deserialize once. + private static transient volatile PipelineOptions pipelineOptions = null; + + static PipelineOptions getOrInit(String serializedPipelineOptions) { + if (pipelineOptions == null) { + synchronized (PipelineOptionsHolder.class) { + if (pipelineOptions == null) { + pipelineOptions = deserializePipelineOptions(serializedPipelineOptions); + } + } + // register IO factories. + IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions); + } + return pipelineOptions; + } + } + /** * Initialize spark aggregators exactly once. *