
[BEAM-11213] Display Beam Metrics in Spark History Server for Classic Runner #14409

Merged
merged 1 commit into from Apr 2, 2021
@@ -34,18 +34,6 @@
*/
public interface SparkPipelineOptions extends SparkCommonPipelineOptions {

@Description("Set it to true if event logs should be saved to Spark History Server directory")
@iemejia (Member, Author) commented on Apr 2, 2021:
Historically we avoid, where possible, introducing pipeline options that duplicate settings already defined as Spark configurations. These values should come from the Spark configuration, to avoid duplicated, unsynchronized configuration and confusion for end users. (A minimal configuration sketch follows this file's diff.)

A Contributor replied:
Makes sense. 👍

@Default.Boolean(false)
boolean getEventLogEnabled();

void setEventLogEnabled(boolean eventLogEnabled);

@Description("The directory to save Spark History Server logs")
@Default.String("/tmp/spark-events/")
String getSparkHistoryDir();

void setSparkHistoryDir(String sparkHistoryDir);

@Description("Batch interval for Spark streaming in milliseconds.")
@Default.Long(500)
Long getBatchIntervalMillis();
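
A minimal sketch of the configuration approach described in the comment above. This is hypothetical and not part of the PR: spark.eventLog.enabled and spark.eventLog.dir are standard Spark properties, while the master, app name, and directory below are illustrative assumptions.

// Hypothetical sketch: the event-log settings live only in the Spark
// configuration and are read back by the runner, instead of being
// duplicated as Beam pipeline options.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class EventLogConfSketch {
  public static void main(String[] args) {
    SparkConf conf =
        new SparkConf()
            .setMaster("local[2]")                            // illustrative master
            .setAppName("beam-on-spark-sketch")               // illustrative app name
            .set("spark.eventLog.enabled", "true")            // replaces getEventLogEnabled()
            .set("spark.eventLog.dir", "/tmp/spark-events/"); // replaces getSparkHistoryDir(); directory must exist

    JavaSparkContext jsc = new JavaSparkContext(conf);
    // The runner can read the same values back from the context's conf,
    // so there is a single source of truth for event logging.
    boolean enabled = jsc.getConf().getBoolean("spark.eventLog.enabled", false);
    String dir = jsc.getConf().get("spark.eventLog.dir", null);
    System.out.println("eventLog enabled=" + enabled + ", dir=" + dir);
    jsc.stop();
  }
}

The same properties can typically also be supplied at submission time (for example via spark-submit --conf), so no code changes are needed at all.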
@@ -20,9 +20,8 @@
import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.hasUnboundedPCollections;
import static org.apache.beam.runners.spark.SparkPipelineOptions.prepareFilesToStage;
import static org.apache.beam.runners.spark.util.SparkCommon.startEventLoggingListener;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -61,8 +60,6 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.scheduler.EventLoggingListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.cluster.ExecutorInfo;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
@@ -72,7 +69,6 @@
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/** Runs a portable pipeline on Apache Spark. */
@SuppressWarnings({
@@ -89,8 +85,7 @@ public SparkPipelineRunner(SparkPipelineOptions pipelineOptions) {
}

@Override
public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)
throws URISyntaxException {
public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
SparkPortablePipelineTranslator translator;
boolean isStreaming = pipelineOptions.isStreaming() || hasUnboundedPCollections(pipeline);
if (isStreaming) {
@@ -135,30 +130,9 @@ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)
PortablePipelineResult result;
final JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineOptions);

long startTime = Instant.now().getMillis();
EventLoggingListener eventLoggingListener = null;
if (pipelineOptions.getEventLogEnabled()) {
eventLoggingListener =
new EventLoggingListener(
jsc.getConf().getAppId(),
scala.Option.apply("1"),
new URI(pipelineOptions.getSparkHistoryDir()),
jsc.getConf(),
jsc.hadoopConfiguration());
eventLoggingListener.initializeLogIfNecessary(false, false);
eventLoggingListener.start();
scala.collection.immutable.Map<String, String> logUrlMap =
new scala.collection.immutable.HashMap<String, String>();
Tuple2<String, String>[] sparkMasters = jsc.getConf().getAllWithPrefix("spark.master");
Tuple2<String, String>[] sparkExecutors = jsc.getConf().getAllWithPrefix("spark.executor.id");
for (Tuple2<String, String> sparkExecutor : sparkExecutors) {
eventLoggingListener.onExecutorAdded(
new SparkListenerExecutorAdded(
startTime,
sparkExecutor._2(),
new ExecutorInfo(sparkMasters[0]._2(), 0, logUrlMap)));
}
}
final long startTime = Instant.now().getMillis();
EventLoggingListener eventLoggingListener =
startEventLoggingListener(jsc, pipelineOptions, startTime);

// Initialize accumulators.
AggregatorsAccumulator.init(pipelineOptions, jsc);
Expand Down Expand Up @@ -243,7 +217,8 @@ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)
pipelineOptions.as(MetricsOptions.class),
result);
metricsPusher.start();
if (pipelineOptions.getEventLogEnabled()) {

if (eventLoggingListener != null) {
eventLoggingListener.onApplicationStart(
SparkCompat.buildSparkListenerApplicationStart(jsc, pipelineOptions, startTime, result));
eventLoggingListener.onApplicationEnd(
@@ -19,6 +19,7 @@

import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
import static org.apache.beam.runners.spark.SparkPipelineOptions.prepareFilesToStage;
import static org.apache.beam.runners.spark.util.SparkCommon.startEventLoggingListener;

import java.util.Collection;
import java.util.HashMap;
@@ -43,6 +44,7 @@
import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarkAdvancingStreamingListener;
import org.apache.beam.runners.spark.util.SparkCompat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
@@ -67,9 +69,12 @@
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.metrics.MetricsSystem;
import org.apache.spark.scheduler.EventLoggingListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -97,7 +102,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);

/** Options used in this pipeline runner. */
private final SparkPipelineOptions mOptions;
private final SparkPipelineOptions pipelineOptions;
A Contributor commented:
+1 😄


/**
* Creates and returns a new SparkRunner with default options. In particular, against a spark
@@ -151,7 +156,7 @@ public static SparkRunner fromOptions(PipelineOptions options) {
* thread.
*/
private SparkRunner(SparkPipelineOptions options) {
mOptions = options;
pipelineOptions = options;
}

@Override
@@ -172,24 +177,29 @@ public SparkPipelineResult run(final Pipeline pipeline) {

// Default to using the primitive versions of Read.Bounded and Read.Unbounded if we are
// executing an unbounded pipeline or the user specifically requested it.
if (mOptions.isStreaming()
if (pipelineOptions.isStreaming()
|| ExperimentalOptions.hasExperiment(
pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
|| ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_deprecated_read")) {
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
}

pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(mOptions.isStreaming()));
pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(pipelineOptions.isStreaming()));

prepareFilesToStage(mOptions);
prepareFilesToStage(pipelineOptions);

if (mOptions.isStreaming()) {
CheckpointDir checkpointDir = new CheckpointDir(mOptions.getCheckpointDir());
final long startTime = Instant.now().getMillis();
EventLoggingListener eventLoggingListener = null;
JavaSparkContext jsc = null;
if (pipelineOptions.isStreaming()) {
CheckpointDir checkpointDir = new CheckpointDir(pipelineOptions.getCheckpointDir());
SparkRunnerStreamingContextFactory streamingContextFactory =
new SparkRunnerStreamingContextFactory(pipeline, mOptions, checkpointDir);
new SparkRunnerStreamingContextFactory(pipeline, pipelineOptions, checkpointDir);
final JavaStreamingContext jssc =
JavaStreamingContext.getOrCreate(
checkpointDir.getSparkCheckpointDir().toString(), streamingContextFactory);
jsc = jssc.sparkContext();
eventLoggingListener = startEventLoggingListener(jsc, pipelineOptions, startTime);

// Checkpoint aggregator/metrics values
jssc.addStreamingListener(
@@ -200,7 +210,8 @@
new MetricsAccumulator.AccumulatorCheckpointingSparkListener()));

// register user-defined listeners.
for (JavaStreamingListener listener : mOptions.as(SparkContextOptions.class).getListeners()) {
for (JavaStreamingListener listener :
pipelineOptions.as(SparkContextOptions.class).getListeners()) {
LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
}
@@ -213,7 +224,7 @@
// SparkRunnerStreamingContextFactory is because the factory is not called when resuming
// from checkpoint (When not resuming from checkpoint initAccumulators will be called twice
// but this is fine since it is idempotent).
initAccumulators(mOptions, jssc.sparkContext());
initAccumulators(pipelineOptions, jssc.sparkContext());

startPipeline =
executorService.submit(
@@ -225,15 +236,16 @@

result = new SparkPipelineResult.StreamingMode(startPipeline, jssc);
} else {
// create the evaluation context
final JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
final EvaluationContext evaluationContext = new EvaluationContext(jsc, pipeline, mOptions);
jsc = SparkContextFactory.getSparkContext(pipelineOptions);
eventLoggingListener = startEventLoggingListener(jsc, pipelineOptions, startTime);
final EvaluationContext evaluationContext =
new EvaluationContext(jsc, pipeline, pipelineOptions);
translator = new TransformTranslator.Translator();

// update the cache candidates
updateCacheCandidates(pipeline, translator, evaluationContext);

initAccumulators(mOptions, jsc);
initAccumulators(pipelineOptions, jsc);
startPipeline =
executorService.submit(
() -> {
@@ -246,17 +258,28 @@
result = new SparkPipelineResult.BatchMode(startPipeline, jsc);
}

if (mOptions.getEnableSparkMetricSinks()) {
registerMetricsSource(mOptions.getAppName());
if (pipelineOptions.getEnableSparkMetricSinks()) {
registerMetricsSource(pipelineOptions.getAppName());
}

// it would have been better to create MetricsPusher from runner-core but we need
// runner-specific
// MetricsContainerStepMap
MetricsPusher metricsPusher =
new MetricsPusher(
MetricsAccumulator.getInstance().value(), mOptions.as(MetricsOptions.class), result);
MetricsAccumulator.getInstance().value(),
pipelineOptions.as(MetricsOptions.class),
result);
metricsPusher.start();

if (eventLoggingListener != null && jsc != null) {
A Contributor asked:
I'm curious how jsc could be null here?

@iemejia (Member, Author) replied:
It can't; it would definitely have failed earlier, but I am double-checking just to be sure.

eventLoggingListener.onApplicationStart(
SparkCompat.buildSparkListenerApplicationStart(jsc, pipelineOptions, startTime, result));
eventLoggingListener.onApplicationEnd(
new SparkListenerApplicationEnd(Instant.now().getMillis()));
eventLoggingListener.stop();
}

return result;
}

@@ -288,7 +311,7 @@ private void detectTranslationMode(Pipeline pipeline) {
pipeline.traverseTopologically(detector);
if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) {
// set streaming mode if it's a streaming pipeline
this.mOptions.setStreaming(true);
this.pipelineOptions.setStreaming(true);
}
}

@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.util;

import java.net.URI;
import java.net.URISyntaxException;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.scheduler.EventLoggingListener;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.cluster.ExecutorInfo;
import org.checkerframework.checker.nullness.qual.Nullable;
import scala.Tuple2;

/** Common methods to build Spark specific objects used by different runners. */
@Internal
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class SparkCommon {

/**
* Starts an EventLoggingListener to save Beam Metrics on Spark's History Server if event logging
* is enabled.
*
* @return The associated EventLoggingListener or null if it could not be started.
*/
public static @Nullable EventLoggingListener startEventLoggingListener(
final JavaSparkContext jsc, SparkPipelineOptions pipelineOptions, long startTime) {
EventLoggingListener eventLoggingListener = null;
try {
if (jsc.getConf().getBoolean("spark.eventLog.enabled", false)) {
@iemejia (Member, Author) commented on Apr 2, 2021:
These are the official Spark settings for what the eventLogEnabled and sparkHistoryDir options were doing, so it is better to use them. (A usage sketch follows this file's diff.)

eventLoggingListener =
new EventLoggingListener(
jsc.getConf().getAppId(),
scala.Option.apply("1"),
new URI(jsc.getConf().get("spark.eventLog.dir", null)),
jsc.getConf(),
jsc.hadoopConfiguration());
eventLoggingListener.initializeLogIfNecessary(false, false);
eventLoggingListener.start();

scala.collection.immutable.Map<String, String> logUrlMap =
new scala.collection.immutable.HashMap<>();
Tuple2<String, String>[] sparkMasters = jsc.getConf().getAllWithPrefix("spark.master");
Tuple2<String, String>[] sparkExecutors =
jsc.getConf().getAllWithPrefix("spark.executor.id");
for (Tuple2<String, String> sparkExecutor : sparkExecutors) {
eventLoggingListener.onExecutorAdded(
new SparkListenerExecutorAdded(
startTime,
sparkExecutor._2(),
new ExecutorInfo(sparkMasters[0]._2(), 0, logUrlMap)));
}
return eventLoggingListener;
}
} catch (URISyntaxException e) {
throw new RuntimeException(
"The URI syntax in the Spark config \"spark.eventLog.dir\" is not correct", e);
}
return eventLoggingListener;
}
}
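
For completeness, a hypothetical end-to-end sketch of how a user might exercise this feature. The master, app name, event-log directory, and the use of SparkContextOptions with a provided SparkContext are illustrative assumptions, not prescribed by this PR; any Spark context whose conf enables event logging should behave the same way.

// Hypothetical end-to-end sketch: run a tiny Beam pipeline on the classic
// Spark runner with Spark event logging enabled, so application events
// (including Beam metrics) end up in the log read by the Spark History Server.
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class HistoryServerSketch {
  public static void main(String[] args) {
    SparkConf conf =
        new SparkConf()
            .setMaster("local[2]")                            // illustrative master
            .setAppName("beam-history-server-sketch")         // illustrative app name
            .set("spark.eventLog.enabled", "true")
            .set("spark.eventLog.dir", "/tmp/spark-events/"); // must exist and match the History Server config
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // Hand the pre-configured context to the Spark runner.
    SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
    options.setRunner(SparkRunner.class);
    options.setUsesProvidedSparkContext(true);
    options.setProvidedSparkContext(jsc);

    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(Create.of(1, 2, 3));
    pipeline.run().waitUntilFinish();
    // With spark.eventLog.enabled=true, startEventLoggingListener records the
    // application start/end events under /tmp/spark-events/.
    jsc.stop();
  }
}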
@@ -22,11 +22,11 @@
import java.lang.reflect.Method;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.metrics.SparkBeamMetric;
import org.apache.beam.runners.spark.translation.SparkCombineFn;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.spark.api.java.JavaPairRDD;
@@ -125,7 +125,7 @@ public static <K, InputT, AccumT, OutputT> JavaPairRDD<K, WindowedValue<OutputT>
}

public static SparkListenerApplicationStart buildSparkListenerApplicationStart(
JavaSparkContext jsc, PipelineOptions options, long time, PortablePipelineResult result) {
final JavaSparkContext jsc, SparkPipelineOptions options, long time, PipelineResult result) {
String appName = options.as(ApplicationNameOptions.class).getAppName();
Option<String> appId = Option.apply(jsc.getConf().getAppId());
Option<String> appAttemptId = Option.apply("1");
@@ -116,7 +116,7 @@ public void processElement(ProcessContext context) {

@Category(UsesMetricsPusher.class)
@Test
public void testInSBatchMode() throws Exception {
public void testInBatchMode() throws Exception {
pipeline.apply(Create.of(1, 2, 3, 4, 5, 6)).apply(ParDo.of(new CountingDoFn()));

pipeline.run();