[BEAM-11212] Display Beam Metrics in Spark History Server for Classic Runner too

It also removes the SparkPipelineOptions related to event logging, since those settings are already configured and consumed through the standard Spark config.
iemejia committed Apr 2, 2021
1 parent df055c1 commit 8059182
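With those options gone, event logging is controlled purely by the standard Spark properties spark.eventLog.enabled and spark.eventLog.dir, which the new SparkCommon helper below reads from the job's SparkConf. As a rough sketch (not part of this commit), one way to supply them from user code is through a provided Spark context; the master, app name, and log directory here are illustrative:

```java
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class EventLogEnabledPipeline {
  public static void main(String[] args) {
    // Event logging is configured through regular Spark properties, not Beam options.
    SparkConf conf =
        new SparkConf()
            .setMaster("local[2]")                           // illustrative master
            .setAppName("event-log-enabled-pipeline")        // illustrative app name
            .set("spark.eventLog.enabled", "true")
            .set("spark.eventLog.dir", "/tmp/spark-events"); // illustrative dir; must exist

    // Hand the pre-configured context to the Spark runner.
    SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
    options.setRunner(SparkRunner.class);
    options.setUsesProvidedSparkContext(true);
    options.setProvidedSparkContext(new JavaSparkContext(conf));

    Pipeline pipeline = Pipeline.create(options);
    // ... apply transforms here ...
    pipeline.run().waitUntilFinish();
  }
}
```

When submitting with spark-submit, passing --conf spark.eventLog.enabled=true and --conf spark.eventLog.dir=... achieves the same result without a provided context.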
Showing 6 changed files with 131 additions and 66 deletions.
SparkPipelineOptions.java
@@ -34,18 +34,6 @@
*/
public interface SparkPipelineOptions extends SparkCommonPipelineOptions {

@Description("Set it to true if event logs should be saved to Spark History Server directory")
@Default.Boolean(false)
boolean getEventLogEnabled();

void setEventLogEnabled(boolean eventLogEnabled);

@Description("The directory to save Spark History Server logs")
@Default.String("/tmp/spark-events/")
String getSparkHistoryDir();

void setSparkHistoryDir(String sparkHistoryDir);

@Description("Batch interval for Spark streaming in milliseconds.")
@Default.Long(500)
Long getBatchIntervalMillis();
SparkPipelineRunner.java
@@ -20,9 +20,8 @@
import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.hasUnboundedPCollections;
import static org.apache.beam.runners.spark.SparkPipelineOptions.prepareFilesToStage;
import static org.apache.beam.runners.spark.util.SparkCommon.startEventLoggingListener;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -61,8 +60,6 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.scheduler.EventLoggingListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.cluster.ExecutorInfo;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
@@ -72,7 +69,6 @@
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/** Runs a portable pipeline on Apache Spark. */
@SuppressWarnings({
@@ -89,8 +85,7 @@ public SparkPipelineRunner(SparkPipelineOptions pipelineOptions) {
}

@Override
public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)
throws URISyntaxException {
public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
SparkPortablePipelineTranslator translator;
boolean isStreaming = pipelineOptions.isStreaming() || hasUnboundedPCollections(pipeline);
if (isStreaming) {
@@ -135,30 +130,9 @@ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)
PortablePipelineResult result;
final JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineOptions);

long startTime = Instant.now().getMillis();
EventLoggingListener eventLoggingListener = null;
if (pipelineOptions.getEventLogEnabled()) {
eventLoggingListener =
new EventLoggingListener(
jsc.getConf().getAppId(),
scala.Option.apply("1"),
new URI(pipelineOptions.getSparkHistoryDir()),
jsc.getConf(),
jsc.hadoopConfiguration());
eventLoggingListener.initializeLogIfNecessary(false, false);
eventLoggingListener.start();
scala.collection.immutable.Map<String, String> logUrlMap =
new scala.collection.immutable.HashMap<String, String>();
Tuple2<String, String>[] sparkMasters = jsc.getConf().getAllWithPrefix("spark.master");
Tuple2<String, String>[] sparkExecutors = jsc.getConf().getAllWithPrefix("spark.executor.id");
for (Tuple2<String, String> sparkExecutor : sparkExecutors) {
eventLoggingListener.onExecutorAdded(
new SparkListenerExecutorAdded(
startTime,
sparkExecutor._2(),
new ExecutorInfo(sparkMasters[0]._2(), 0, logUrlMap)));
}
}
final long startTime = Instant.now().getMillis();
EventLoggingListener eventLoggingListener =
startEventLoggingListener(jsc, pipelineOptions, startTime);

// Initialize accumulators.
AggregatorsAccumulator.init(pipelineOptions, jsc);
@@ -243,7 +217,8 @@ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)
pipelineOptions.as(MetricsOptions.class),
result);
metricsPusher.start();
if (pipelineOptions.getEventLogEnabled()) {

if (eventLoggingListener != null) {
eventLoggingListener.onApplicationStart(
SparkCompat.buildSparkListenerApplicationStart(jsc, pipelineOptions, startTime, result));
eventLoggingListener.onApplicationEnd(
SparkRunner.java
@@ -19,6 +19,7 @@

import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
import static org.apache.beam.runners.spark.SparkPipelineOptions.prepareFilesToStage;
import static org.apache.beam.runners.spark.util.SparkCommon.startEventLoggingListener;

import java.util.Collection;
import java.util.HashMap;
@@ -43,6 +44,7 @@
import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarkAdvancingStreamingListener;
import org.apache.beam.runners.spark.util.SparkCompat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
@@ -67,9 +69,12 @@
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.metrics.MetricsSystem;
import org.apache.spark.scheduler.EventLoggingListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -97,7 +102,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);

/** Options used in this pipeline runner. */
private final SparkPipelineOptions mOptions;
private final SparkPipelineOptions pipelineOptions;

/**
* Creates and returns a new SparkRunner with default options. In particular, against a spark
@@ -151,7 +156,7 @@ public static SparkRunner fromOptions(PipelineOptions options) {
* thread.
*/
private SparkRunner(SparkPipelineOptions options) {
mOptions = options;
pipelineOptions = options;
}

@Override
@@ -172,25 +177,31 @@ public SparkPipelineResult run(final Pipeline pipeline) {

// Default to using the primitive versions of Read.Bounded and Read.Unbounded if we are
// executing an unbounded pipeline or the user specifically requested it.
if (mOptions.isStreaming()
if (pipelineOptions.isStreaming()
|| ExperimentalOptions.hasExperiment(
pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
|| ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_deprecated_read")) {
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
}

pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(mOptions.isStreaming()));
pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(pipelineOptions.isStreaming()));

prepareFilesToStage(mOptions);
prepareFilesToStage(pipelineOptions);

if (mOptions.isStreaming()) {
CheckpointDir checkpointDir = new CheckpointDir(mOptions.getCheckpointDir());
final long startTime = Instant.now().getMillis();
EventLoggingListener eventLoggingListener = null;
JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineOptions);
if (pipelineOptions.isStreaming()) {
CheckpointDir checkpointDir = new CheckpointDir(pipelineOptions.getCheckpointDir());
SparkRunnerStreamingContextFactory streamingContextFactory =
new SparkRunnerStreamingContextFactory(pipeline, mOptions, checkpointDir);
new SparkRunnerStreamingContextFactory(pipeline, pipelineOptions, checkpointDir);
final JavaStreamingContext jssc =
JavaStreamingContext.getOrCreate(
checkpointDir.getSparkCheckpointDir().toString(), streamingContextFactory);

jsc = jssc.sparkContext();
eventLoggingListener = startEventLoggingListener(jsc, pipelineOptions, startTime);

// Checkpoint aggregator/metrics values
jssc.addStreamingListener(
new JavaStreamingListenerWrapper(
@@ -200,7 +211,8 @@
new MetricsAccumulator.AccumulatorCheckpointingSparkListener()));

// register user-defined listeners.
for (JavaStreamingListener listener : mOptions.as(SparkContextOptions.class).getListeners()) {
for (JavaStreamingListener listener :
pipelineOptions.as(SparkContextOptions.class).getListeners()) {
LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
}
Expand All @@ -213,7 +225,7 @@ public SparkPipelineResult run(final Pipeline pipeline) {
// SparkRunnerStreamingContextFactory is because the factory is not called when resuming
// from checkpoint (When not resuming from checkpoint initAccumulators will be called twice
// but this is fine since it is idempotent).
initAccumulators(mOptions, jssc.sparkContext());
initAccumulators(pipelineOptions, jssc.sparkContext());

startPipeline =
executorService.submit(
@@ -225,15 +237,15 @@

result = new SparkPipelineResult.StreamingMode(startPipeline, jssc);
} else {
// create the evaluation context
final JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
final EvaluationContext evaluationContext = new EvaluationContext(jsc, pipeline, mOptions);
eventLoggingListener = startEventLoggingListener(jsc, pipelineOptions, startTime);
final EvaluationContext evaluationContext =
new EvaluationContext(jsc, pipeline, pipelineOptions);
translator = new TransformTranslator.Translator();

// update the cache candidates
updateCacheCandidates(pipeline, translator, evaluationContext);

initAccumulators(mOptions, jsc);
initAccumulators(pipelineOptions, jsc);
startPipeline =
executorService.submit(
() -> {
@@ -246,17 +258,28 @@
result = new SparkPipelineResult.BatchMode(startPipeline, jsc);
}

if (mOptions.getEnableSparkMetricSinks()) {
registerMetricsSource(mOptions.getAppName());
if (pipelineOptions.getEnableSparkMetricSinks()) {
registerMetricsSource(pipelineOptions.getAppName());
}

// it would have been better to create MetricsPusher from runner-core but we need
// runner-specific
// MetricsContainerStepMap
MetricsPusher metricsPusher =
new MetricsPusher(
MetricsAccumulator.getInstance().value(), mOptions.as(MetricsOptions.class), result);
MetricsAccumulator.getInstance().value(),
pipelineOptions.as(MetricsOptions.class),
result);
metricsPusher.start();

if (eventLoggingListener != null) {
eventLoggingListener.onApplicationStart(
SparkCompat.buildSparkListenerApplicationStart(jsc, pipelineOptions, startTime, result));
eventLoggingListener.onApplicationEnd(
new SparkListenerApplicationEnd(Instant.now().getMillis()));
eventLoggingListener.stop();
}

return result;
}

@@ -288,7 +311,7 @@ private void detectTranslationMode(Pipeline pipeline) {
pipeline.traverseTopologically(detector);
if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) {
// set streaming mode if it's a streaming pipeline
this.mOptions.setStreaming(true);
this.pipelineOptions.setStreaming(true);
}
}

SparkCommon.java
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.util;

import java.net.URI;
import java.net.URISyntaxException;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.scheduler.EventLoggingListener;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.cluster.ExecutorInfo;
import org.checkerframework.checker.nullness.qual.Nullable;
import scala.Tuple2;

/** Common methods to build Spark specific objects used by different runners. */
@Internal
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class SparkCommon {

/**
* Starts an EventLoggingListener to save Beam Metrics on Spark's History Server if event logging
* is enabled.
*
* @return The associated EventLoggingListener or null if it could not be started.
*/
public static @Nullable EventLoggingListener startEventLoggingListener(
final JavaSparkContext jsc, SparkPipelineOptions pipelineOptions, long startTime) {
EventLoggingListener eventLoggingListener = null;
try {
if (jsc.getConf().getBoolean("spark.eventLog.enabled", false)) {
eventLoggingListener =
new EventLoggingListener(
jsc.getConf().getAppId(),
scala.Option.apply("1"),
new URI(jsc.getConf().get("spark.eventLog.dir", null)),
jsc.getConf(),
jsc.hadoopConfiguration());
eventLoggingListener.initializeLogIfNecessary(false, false);
eventLoggingListener.start();

scala.collection.immutable.Map<String, String> logUrlMap =
new scala.collection.immutable.HashMap<>();
Tuple2<String, String>[] sparkMasters = jsc.getConf().getAllWithPrefix("spark.master");
Tuple2<String, String>[] sparkExecutors =
jsc.getConf().getAllWithPrefix("spark.executor.id");
for (Tuple2<String, String> sparkExecutor : sparkExecutors) {
eventLoggingListener.onExecutorAdded(
new SparkListenerExecutorAdded(
startTime,
sparkExecutor._2(),
new ExecutorInfo(sparkMasters[0]._2(), 0, logUrlMap)));
}
return eventLoggingListener;
}
} catch (URISyntaxException e) {
throw new RuntimeException(
"The URI syntax in the Spark config \"spark.eventLog.dir\" is not correct", e);
}
return eventLoggingListener;
}
}
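Both runners now wrap pipeline execution with the same null-guarded lifecycle around this helper. The sketch below condenses that pattern; wrapRun and its parameters are illustrative, and the actual pipeline execution that produces `result` is elided:

```java
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.util.SparkCommon;
import org.apache.beam.runners.spark.util.SparkCompat;
import org.apache.beam.sdk.PipelineResult;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.scheduler.EventLoggingListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.joda.time.Instant;

/** Sketch of the null-guarded event-logging lifecycle both runners use. */
class EventLogLifecycleSketch {

  static void wrapRun(JavaSparkContext jsc, SparkPipelineOptions options, PipelineResult result) {
    final long startTime = Instant.now().getMillis();

    // Returns null when spark.eventLog.enabled is false, so every later call is guarded.
    EventLoggingListener listener =
        SparkCommon.startEventLoggingListener(jsc, options, startTime);

    // ... translate and run the pipeline here, producing `result` ...

    if (listener != null) {
      // Emit application start/end events so the run (and its Beam metrics)
      // shows up in the Spark History Server.
      listener.onApplicationStart(
          SparkCompat.buildSparkListenerApplicationStart(jsc, options, startTime, result));
      listener.onApplicationEnd(new SparkListenerApplicationEnd(Instant.now().getMillis()));
      listener.stop();
    }
  }
}
```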
SparkCompat.java
@@ -22,11 +22,11 @@
import java.lang.reflect.Method;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.metrics.SparkBeamMetric;
import org.apache.beam.runners.spark.translation.SparkCombineFn;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.spark.api.java.JavaPairRDD;
@@ -125,7 +125,7 @@ public static <K, InputT, AccumT, OutputT> JavaPairRDD<K, WindowedValue<OutputT>
}

public static SparkListenerApplicationStart buildSparkListenerApplicationStart(
JavaSparkContext jsc, PipelineOptions options, long time, PortablePipelineResult result) {
final JavaSparkContext jsc, SparkPipelineOptions options, long time, PipelineResult result) {
String appName = options.as(ApplicationNameOptions.class).getAppName();
Option<String> appId = Option.apply(jsc.getConf().getAppId());
Option<String> appAttemptId = Option.apply("1");
@@ -116,7 +116,7 @@ public void processElement(ProcessContext context) {

@Category(UsesMetricsPusher.class)
@Test
public void testInSBatchMode() throws Exception {
public void testInBatchMode() throws Exception {
pipeline.apply(Create.of(1, 2, 3, 4, 5, 6)).apply(ParDo.of(new CountingDoFn()));

pipeline.run();