[BEAM-11213] Display Beam Metrics in Spark History Server #13743

Merged: 28 commits, Feb 3, 2021. The diff below shows changes from 17 of the 28 commits.

Commits
291ced1  Create Spark Metrics in directory using Spark History Server format (Jan 13, 2021)
f044ec1  java spotless apply (Jan 13, 2021)
c578c84  add --spark-history-dir option in spark job server (Jan 14, 2021)
cb1b46f  add driver distribution logs (Jan 14, 2021)
864603a  remove star import check (Jan 14, 2021)
f0c7412  spotless apply (Jan 14, 2021)
ad359f5  render all metrics & add eventLogEnabled (Jan 24, 2021)
bb7c05e  add exception when eventLogEnabled is true (Jan 24, 2021)
cac34a7  handle eventLogEnabled (Jan 24, 2021)
e669894  apply spotless (Jan 24, 2021)
8be02d6  remove star import (Jan 24, 2021)
5392f66  run java spotless (Jan 24, 2021)
1da04b4  fix renderName is static from now on (Jan 24, 2021)
9bec263  spark.executor.id loop and switch to boolean (Jan 27, 2021)
31f86b8  spotless apply (Jan 27, 2021)
c98a89e  remove unnecessary code (Jan 28, 2021)
6529c7a  scala.Option.apply (Jan 30, 2021)
03c2457  remove options from spark job server configuration & fix the spark li… (Jan 31, 2021)
d3b7838  remove options from spark job server configuration & fix the spark li… (Jan 31, 2021)
ba68363  remove options from spark job server configuration & fix the spark li… (Jan 31, 2021)
6acf592  remove options from spark job server configuration & fix the spark li… (Jan 31, 2021)
a513fbf  remove options from spark job server configuration & fix the spark li… (Feb 2, 2021)
f383e17  remove options from spark job server configuration & fix the spark li… (Feb 2, 2021)
e105b9f  minor cleanup (Feb 3, 2021)
068a33e  minor cleanup (Feb 3, 2021)
b35c282  minor improvements (Feb 3, 2021)
4b64b5f  remove whitespace changes (Feb 3, 2021)
6329b57  remove whitespace changes (Feb 3, 2021)
4 changes: 4 additions & 0 deletions runners/spark/job-server/build.gradle
@@ -73,6 +73,10 @@ runShadow {
args += ["--clean-artifacts-per-job=${project.property('cleanArtifactsPerJob')}"]
if (project.hasProperty('sparkMasterUrl'))
args += ["--spark-master-url=${project.property('sparkMasterUrl')}"]
if (project.hasProperty('sparkHistoryDir'))
args += ["--spark-history-dir=${project.property('sparkHistoryDir')}"]
if (project.hasProperty('eventLogEnabled'))
args += ["--event-log-enabled=${project.property('eventLogEnabled')}"]

systemProperties System.properties

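Assuming these properties are passed like the neighboring ones, the job server can be launched with event logging enabled via Gradle; the task path and values below are illustrative, not taken from the PR:

  ./gradlew :runners:spark:job-server:runShadow -PsparkHistoryDir=/tmp/spark-events/ -PeventLogEnabled=true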
SparkCommonPipelineOptions.java
@@ -32,6 +32,8 @@
public interface SparkCommonPipelineOptions
extends PipelineOptions, StreamingOptions, ApplicationNameOptions {
String DEFAULT_MASTER_URL = "local[4]";
String DEFAULT_SPARK_HISTORY_DIR = "/tmp/spark-events/";
boolean DEFAULT_EVENT_LOG_ENABLED = false;

@Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).")
@Default.String(DEFAULT_MASTER_URL)
SparkJobInvoker.java
@@ -70,6 +70,12 @@ protected JobInvocation invokeWithExecutor(
if (sparkOptions.getSparkMaster().equals(SparkPipelineOptions.DEFAULT_MASTER_URL)) {
sparkOptions.setSparkMaster(configuration.getSparkMasterUrl());
}
if (sparkOptions.getSparkHistoryDir().equals(SparkPipelineOptions.DEFAULT_SPARK_HISTORY_DIR)) {
sparkOptions.setSparkHistoryDir(configuration.getSparkHistoryDir());
}
if (sparkOptions.getEventLogEnabled() == SparkPipelineOptions.DEFAULT_EVENT_LOG_ENABLED) {
sparkOptions.setEventLogEnabled(configuration.getEventLogEnabled());
}

// Options can't be translated to proto if runner class is unresolvable, so set it to null.
sparkOptions.setRunner(null);
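Note the pattern above: an option set on the pipeline wins, and the job-server configuration only fills in values that were left at their defaults. A condensed sketch of that precedence rule (the method and parameter names are illustrative, not from the PR):

  static String resolve(String pipelineValue, String defaultValue, String serverValue) {
    // The server configuration applies only when the pipeline option is still the default.
    return pipelineValue.equals(defaultValue) ? serverValue : pipelineValue;
  }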
SparkJobServerDriver.java
@@ -26,6 +26,7 @@
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.kohsuke.args4j.spi.ExplicitBooleanOptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -36,6 +37,27 @@ public class SparkJobServerDriver extends JobServerDriver {

/** Spark runner-specific Configuration for the jobServer. */
public static class SparkServerConfiguration extends ServerConfiguration {

@Option(
name = "--event-log-enabled",
usage = "Set it to true if event logs should be saved to Spark History Server directory",
handler = ExplicitBooleanOptionHandler.class)
private boolean eventLogEnabled = SparkPipelineOptions.DEFAULT_EVENT_LOG_ENABLED;

boolean getEventLogEnabled() {
return this.eventLogEnabled;
}

@Option(
name = "--spark-history-dir",
usage =
"Spark history dir path to store Spark History Server logs (e. g. /tmp/spark-events/)")
private String sparkHistoryDir = SparkPipelineOptions.DEFAULT_SPARK_HISTORY_DIR;

String getSparkHistoryDir() {
return this.sparkHistoryDir;
}

@Option(
name = "--spark-master-url",
usage = "Spark master url to submit job (e.g. spark://host:port, local[4])")
@@ -72,7 +94,6 @@ private static SparkJobServerDriver fromParams(String[] args) {
printUsage(parser);
throw new IllegalArgumentException("Unable to parse command line arguments.", e);
}

return fromConfig(configuration);
}

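Because --event-log-enabled uses args4j's ExplicitBooleanOptionHandler, it expects an explicit value (e.g. --event-log-enabled true or --event-log-enabled=true) rather than acting as a bare presence switch; the history directory is passed alongside it, for example --spark-history-dir=/tmp/spark-events/ --event-log-enabled=true (values illustrative).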
SparkPipelineOptions.java
@@ -34,6 +34,18 @@
*/
public interface SparkPipelineOptions extends SparkCommonPipelineOptions {

@Description("Set it to true if event logs should be saved to Spark History Server directory")
@Default.Boolean(false)
boolean getEventLogEnabled();

void setEventLogEnabled(boolean eventLogEnabled);

@Description("The directory to save Spark History Server logs")
@Default.String("/tmp/spark-events/")
Contributor: Should we set spark.eventLog.dir in the Spark conf? Or does that not matter?

Author: It doesn't matter; however, for consistency I think it would be good to configure it that way.
String getSparkHistoryDir();

void setSparkHistoryDir(String sparkHistoryDir);

@Description("Batch interval for Spark streaming in milliseconds.")
@Default.Long(500)
Long getBatchIntervalMillis();
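To illustrate how these options could be consumed, here is a minimal sketch (not part of the PR) that parses the new flags with Beam's PipelineOptionsFactory and, following the reviewer's suggestion above, mirrors them into Spark's standard event-log keys; the class name and flag values are made up for the example:

  import org.apache.beam.runners.spark.SparkPipelineOptions;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.spark.SparkConf;

  public class EventLogOptionsExample {
    public static void main(String[] args) {
      // Beam derives flag names from the getters: getSparkHistoryDir -> --sparkHistoryDir.
      SparkPipelineOptions options =
          PipelineOptionsFactory.fromArgs(
                  "--sparkHistoryDir=/tmp/spark-events/", "--eventLogEnabled=true")
              .as(SparkPipelineOptions.class);

      // Keep Spark's own event-log settings consistent with the Beam options.
      SparkConf conf = new SparkConf();
      conf.set("spark.eventLog.enabled", String.valueOf(options.getEventLogEnabled()));
      conf.set("spark.eventLog.dir", options.getSparkHistoryDir());
    }
  }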
SparkPipelineRunner.java
@@ -21,6 +21,8 @@
import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.hasUnboundedPCollections;
import static org.apache.beam.runners.spark.SparkPipelineOptions.prepareFilesToStage;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -42,6 +44,7 @@
import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.metrics.SparkBeamMetric;
import org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.SparkPortablePipelineTranslator;
@@ -56,22 +59,29 @@
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.scheduler.EventLoggingListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.cluster.ExecutorInfo;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
import org.joda.time.Instant;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.JavaConverters;

/** Runs a portable pipeline on Apache Spark. */
@SuppressWarnings({
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class SparkPipelineRunner implements PortablePipelineRunner {

private static final Logger LOG = LoggerFactory.getLogger(SparkPipelineRunner.class);

private final SparkPipelineOptions pipelineOptions;
@@ -81,7 +91,8 @@ public SparkPipelineRunner(SparkPipelineOptions pipelineOptions) {
}

@Override
public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)
throws URISyntaxException {
SparkPortablePipelineTranslator translator;
boolean isStreaming = pipelineOptions.isStreaming() || hasUnboundedPCollections(pipeline);
if (isStreaming) {
@@ -123,10 +134,33 @@ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)
"Will stage {} files. (Enable logging at DEBUG level to see which files will be staged.)",
pipelineOptions.getFilesToStage().size());
LOG.debug("Staging files: {}", pipelineOptions.getFilesToStage());

PortablePipelineResult result;
final JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineOptions);

EventLoggingListener eventLoggingListener = null;
if (pipelineOptions.getEventLogEnabled()) {
eventLoggingListener =
new EventLoggingListener(
jobInfo.jobId(),
scala.Option.apply(jobInfo.jobName()),
new URI(pipelineOptions.getSparkHistoryDir()),
jsc.getConf(),
jsc.hadoopConfiguration());
eventLoggingListener.initializeLogIfNecessary(false, false);
eventLoggingListener.start();
scala.collection.immutable.Map<String, String> logUrlMap =
new scala.collection.immutable.HashMap<String, String>();
Tuple2<String, String>[] sparkMasters = jsc.getConf().getAllWithPrefix("spark.master");
Tuple2<String, String>[] sparkExecutors = jsc.getConf().getAllWithPrefix("spark.executor.id");
for (int i = 0; i < sparkMasters.length; i++) {
eventLoggingListener.onExecutorAdded(
new SparkListenerExecutorAdded(
Instant.now().getMillis(),
sparkExecutors[i]._2(),
new ExecutorInfo(sparkMasters[i]._2(), 0, logUrlMap)));
}
}

LOG.info(String.format("Running job %s on Spark master %s", jobInfo.jobId(), jsc.master()));

// Initialize accumulators.
@@ -212,6 +246,22 @@ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)
pipelineOptions.as(MetricsOptions.class),
result);
metricsPusher.start();
if (pipelineOptions.getEventLogEnabled()) {
eventLoggingListener.onApplicationStart(
new SparkListenerApplicationStart(
jobInfo.jobId(),
scala.Option.apply(jobInfo.jobName()),
Instant.now().getMillis(),
jsc.sparkUser(),
scala.Option.apply(jobInfo.jobName()),
scala.Option.apply(
JavaConverters.mapAsScalaMapConverter(
SparkBeamMetric.renderAllToString(result.metrics()))
.asScala())));
eventLoggingListener.onApplicationEnd(
new SparkListenerApplicationEnd(Instant.now().getMillis()));
Contributor: I'm pretty sure pipeline end time (and also start time, for that matter) is itself a metric. To keep things consistent, it'd be better to use that metric here instead of Instant.now().

Author (@ghost, Jan 27, 2021): When I printed out the results of the renderAll method, I didn't find such a metric for the whole pipeline, only for its parts. Maybe not all metrics appear in renderAll, or should I filter for them specifically?

Contributor: I'm not sure; it's not a blocker for this PR though. Thanks for checking.

eventLoggingListener.stop();
}

return result;
}
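Once a pipeline runs with event logging enabled, a Spark History Server pointed at the same directory should pick up the written logs. An illustrative setup using Spark's standard configuration key (not part of this PR):

  spark.history.fs.logDirectory=/tmp/spark-events/

after which the server is started with Spark's stock sbin/start-history-server.sh script.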
SparkBeamMetric.java
@@ -38,13 +38,11 @@
/**
* An adapter between the {@link MetricsContainerStepMap} and Codahale's {@link Metric} interface.
*/
-class SparkBeamMetric implements Metric {
+public class SparkBeamMetric implements Metric {
private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9-]";

Map<String, ?> renderAll() {
static Map<String, ?> renderAll(MetricResults metricResults) {
Map<String, Object> metrics = new HashMap<>();
-MetricResults metricResults =
-    asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
MetricQueryResults metricQueryResults = metricResults.allMetrics();
for (MetricResult<Long> metricResult : metricQueryResults.getCounters()) {
metrics.put(renderName(metricResult), metricResult.getAttempted());
@@ -63,8 +61,24 @@ class SparkBeamMetric implements Metric {
return metrics;
}

public static Map<String, String> renderAllToString(MetricResults metricResults) {
Map<String, String> metricsString = new HashMap<>();
for (Map.Entry<String, ?> entry : renderAll(metricResults).entrySet()) {
String key = entry.getKey();
String value = String.valueOf(entry.getValue());
metricsString.put(key, value);
}
return metricsString;
}

Map<String, ?> renderAll() {
MetricResults metricResults =
asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
return renderAll(metricResults);
}

@VisibleForTesting
-String renderName(MetricResult<?> metricResult) {
+static String renderName(MetricResult<?> metricResult) {
MetricKey key = metricResult.getKey();
MetricName name = key.metricName();
String step = key.stepName();
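As a rough illustration of the sanitization above, a small standalone sketch (not from the PR; renderName's exact join format is not shown in this diff, so only the per-part replacement is demonstrated):

  public class RenderNameSketch {
    // Same pattern as SparkBeamMetric.ILLEGAL_CHARACTERS above.
    private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9-]";

    public static void main(String[] args) {
      // Inputs mirroring SparkBeamMetricTest below.
      System.out.println("myNameSpace//".replaceAll(ILLEGAL_CHARACTERS, "_")); // myNameSpace__
      System.out.println("myName()".replaceAll(ILLEGAL_CHARACTERS, "_"));      // myName__
    }
  }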
SparkBeamMetricTest.java
@@ -35,7 +35,7 @@ public void testRenderName() {
"myStep.one.two(three)", MetricName.named("myNameSpace//", "myName()")),
123,
456);
-String renderedName = new SparkBeamMetric().renderName(metricResult);
+String renderedName = SparkBeamMetric.renderName(metricResult);
assertThat(
"Metric name was not rendered correctly",
renderedName,