spark: Change SparkPropertyFacetBuilder to support recording spark run time … #2523

Merged
@@ -15,6 +15,7 @@
 import io.openlineage.spark.agent.facets.ErrorFacet;
 import io.openlineage.spark.agent.facets.SparkVersionFacet;
 import io.openlineage.spark.agent.facets.builder.SparkProcessingEngineRunFacetBuilderDelegate;
+import io.openlineage.spark.agent.facets.builder.SparkPropertyFacetBuilder;
 import io.openlineage.spark.agent.util.PathUtils;
 import io.openlineage.spark.agent.util.PlanUtils;
 import io.openlineage.spark.agent.util.ScalaConversionUtils;
@@ -46,14 +47,7 @@
 import org.apache.spark.rdd.HadoopRDD;
 import org.apache.spark.rdd.MapPartitionsRDD;
 import org.apache.spark.rdd.RDD;
-import org.apache.spark.scheduler.ActiveJob;
-import org.apache.spark.scheduler.JobFailed;
-import org.apache.spark.scheduler.JobResult;
-import org.apache.spark.scheduler.ResultStage;
-import org.apache.spark.scheduler.SparkListenerJobEnd;
-import org.apache.spark.scheduler.SparkListenerJobStart;
-import org.apache.spark.scheduler.SparkListenerStageCompleted;
-import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.*;
 import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
 import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
 import org.apache.spark.util.SerializableJobConf;
@@ -210,7 +204,7 @@ public void start(SparkListenerJobStart jobStart) {
             .eventType(OpenLineage.RunEvent.EventType.START)
             .inputs(buildInputs(inputs))
             .outputs(buildOutputs(outputs))
-            .run(ol.newRunBuilder().runId(runId).facets(buildRunFacets(null, ol)).build())
+            .run(ol.newRunBuilder().runId(runId).facets(buildRunFacets(null, ol, jobStart)).build())
             .job(buildJob(jobStart.jobId()))
             .build();

@@ -239,7 +233,7 @@ public void end(SparkListenerJobEnd jobEnd) {
             .run(
                 ol.newRunBuilder()
                     .runId(runId)
-                    .facets(buildRunFacets(buildJobErrorFacet(jobEnd.jobResult()), ol))
+                    .facets(buildRunFacets(buildJobErrorFacet(jobEnd.jobResult()), ol, jobEnd))
                     .build())
             .job(buildJob(jobEnd.jobId()))
             .build();
@@ -248,7 +242,8 @@ public void end(SparkListenerJobEnd jobEnd) {
     eventEmitter.emit(event);
   }

-  protected OpenLineage.RunFacets buildRunFacets(ErrorFacet jobError, OpenLineage ol) {
+  protected OpenLineage.RunFacets buildRunFacets(
+      ErrorFacet jobError, OpenLineage ol, SparkListenerEvent event) {
     OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder();
     runFacetsBuilder.parent(buildApplicationParentFacet());
     if (jobError != null) {
@@ -257,6 +252,7 @@ protected OpenLineage.RunFacets buildRunFacets(ErrorFacet jobError, OpenLineage

     addSparkVersionFacet(runFacetsBuilder);
     addProcessingEventFacet(runFacetsBuilder, ol);
+    addSparkPropertyFacet(runFacetsBuilder, event);

     return runFacetsBuilder.build();
   }
@@ -275,6 +271,10 @@ private void addProcessingEventFacet(OpenLineage.RunFacetsBuilder b0, OpenLineag
             });
   }

+  private void addSparkPropertyFacet(OpenLineage.RunFacetsBuilder b0, SparkListenerEvent event) {
+    b0.put("spark_properties", new SparkPropertyFacetBuilder().buildFacet(event));
+  }
+
   private OpenLineage.ParentRunFacet buildApplicationParentFacet() {
     return PlanUtils.parentRunFacet(
         eventEmitter.getApplicationRunId(),
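Taken together with the builder changes that follow, the listener-side change above attaches the spark_properties facet to the run on both the job start and the job end event, using the new no-argument SparkPropertyFacetBuilder. A minimal sketch of that call path, not part of the diff, using an illustrative event and assuming the imports shown above:

// Illustrative sketch only; mirrors what addSparkPropertyFacet(...) does.
// JobSucceeded$.MODULE$ is the Scala singleton for a successful JobResult.
SparkListenerJobEnd jobEnd =
    new SparkListenerJobEnd(1, System.currentTimeMillis(), JobSucceeded$.MODULE$);
SparkPropertyFacet facet = new SparkPropertyFacetBuilder().buildFacet(jobEnd);
// runFacetsBuilder.put("spark_properties", facet);  // as in buildRunFacets(..., event)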
SparkPropertyFacetBuilder.java:
@@ -8,26 +8,42 @@
 import io.openlineage.spark.agent.facets.SparkPropertyFacet;
 import io.openlineage.spark.api.CustomFacetBuilder;
 import io.openlineage.spark.api.OpenLineageContext;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.scheduler.SparkListenerEvent;
 import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.sql.SparkSession;

+@Slf4j
 public class SparkPropertyFacetBuilder
-    extends CustomFacetBuilder<SparkListenerJobStart, SparkPropertyFacet> {
+    extends CustomFacetBuilder<SparkListenerEvent, SparkPropertyFacet> {
   private static final Set<String> DEFAULT_ALLOWED_PROPERTIES =
       new HashSet<>(Arrays.asList("spark.master", "spark.app.name"));
   private static final String ALLOWED_PROPERTIES_KEY = "spark.openlineage.capturedProperties";
-  private final SparkConf conf;
-  private final Set<String> allowerProperties;
+  private SparkConf conf;
+  private Set<String> allowerProperties;

   public SparkPropertyFacetBuilder(OpenLineageContext context) {
-    conf = context.getSparkContext().getConf();
+    fillConfAndAllowerProperties(context.getSparkContext());
+  }
+
+  public SparkPropertyFacetBuilder() {
+    try {
+      SparkSession session = SparkSession.active();
+      fillConfAndAllowerProperties(session.sparkContext());
+    } catch (IllegalStateException ie) {
+      log.info("No active or default Spark session found");
+      conf = new SparkConf();
+      allowerProperties = new HashSet<>();
+    }
+  }
+
+  private void fillConfAndAllowerProperties(SparkContext context) {
+    conf = context.getConf();
     allowerProperties =
         conf.contains(ALLOWED_PROPERTIES_KEY)
             ? Arrays.stream(conf.get(ALLOWED_PROPERTIES_KEY).split(",")).collect(Collectors.toSet())
@@ -36,14 +52,30 @@ public SparkPropertyFacetBuilder(OpenLineageContext context) {

   @Override
   protected void build(
-      SparkListenerJobStart event, BiConsumer<String, ? super SparkPropertyFacet> consumer) {
+      SparkListenerEvent event, BiConsumer<String, ? super SparkPropertyFacet> consumer) {
+    consumer.accept("spark_properties", buildFacet(event));
+  }
+
+  public SparkPropertyFacet buildFacet(SparkListenerEvent event) {
     Map<String, Object> m = new HashMap<>();
     Arrays.stream(conf.getAll())
         .filter(t -> allowerProperties.contains(t._1))
         .forEach(t -> m.putIfAbsent(t._1, t._2));
-    event.properties().entrySet().stream()
-        .filter(e -> allowerProperties.contains(e.getKey()))
-        .forEach(e -> m.putIfAbsent(e.getKey().toString(), e.getValue()));
-    consumer.accept("spark_properties", new SparkPropertyFacet(m));
+    if (event instanceof SparkListenerJobStart) {
+      SparkListenerJobStart startEvent = (SparkListenerJobStart) event;
+      startEvent.properties().entrySet().stream()
+          .filter(e -> allowerProperties.contains(e.getKey()))
+          .forEach(e -> m.putIfAbsent(e.getKey().toString(), e.getValue()));
+    }
+
+    try {
+      SparkSession session = SparkSession.active();
+      allowerProperties.forEach(item -> m.putIfAbsent(item, session.conf().get(item)));
+    } catch (RuntimeException e) {
+      log.info(
+          "Cannot add SparkPropertyFacet: Spark session is in a wrong status or a key in capturedProperties does not exist in run-time config");
+    }
+
+    return new SparkPropertyFacet(m);
   }
 }
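As a usage sketch (not part of this PR): the keys that get recorded are still controlled by spark.openlineage.capturedProperties, and with this change they are also resolved against the active session's run-time conf, so values set after the session is created can be captured as well. The property names below are illustrative.

// Hypothetical configuration sketch for capturing a run-time property.
SparkSession spark =
    SparkSession.builder()
        .master("local[*]")
        .appName("runtime-properties-demo")
        // comma-separated list of keys the facet builder may record
        .config("spark.openlineage.capturedProperties", "spark.app.name,spark.custom.tag")
        .getOrCreate();

// Set after startup; SparkPropertyFacetBuilder reads it from session.conf()
// when it builds the spark_properties facet for a job start or job end event.
spark.conf().set("spark.custom.tag", "nightly-batch");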