Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ public void setupOpenLineage(DDTraceId traceId) {
public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
this.applicationStart = applicationStart;

if (isRunningOnDatabricks) {
log.info("databricksClusterName{}", databricksClusterName);
}
if (openLineageSparkListener == null) {
openLineageSparkListener =
InstanceStore.of(SparkListenerInterface.class).get("openLineageListener");
Expand All @@ -246,6 +249,7 @@ public synchronized void onApplicationStart(SparkListenerApplicationStart applic
.orElse(predeterminedTraceIdContext.getTraceId()));
}
notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart);
log.info("end of application start");
}

private void initApplicationSpanIfNotInitialized() {
Expand All @@ -254,7 +258,7 @@ private void initApplicationSpanIfNotInitialized() {
}

log.debug("Starting tracer application span.");

log.info("Not a databricks span");
AgentTracer.SpanBuilder builder = buildSparkSpan("spark.application", null);

if (applicationStart != null) {
Expand Down Expand Up @@ -308,6 +312,9 @@ private void captureOpenlineageContextIfPresent(

@Override
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
if (isRunningOnDatabricks) {
log.info("On Application End");
}
log.info(
"Received spark application end event, finish trace on this event: {}",
finishTraceOnApplicationEnd);
Expand All @@ -325,17 +332,46 @@ public synchronized void finishApplication(
log.info("Finishing spark application trace");

if (applicationEnded) {
log.info(
"finishApplicationEnded: isRunningOnDatabricks={}, databricksClusterName={}, applicationSpan={}, jobCount={}",
isRunningOnDatabricks,
databricksClusterName,
applicationSpan != null ? "exists" : "null",
jobCount);
return;
}
applicationEnded = true;

if (applicationSpan == null && jobCount > 0) {
log.info(
"finishApplication: isRunningOnDatabricks={}, databricksClusterName={}, applicationSpan={}, jobCount={}",
isRunningOnDatabricks,
databricksClusterName,
applicationSpan != null ? "exists" : "null",
jobCount);

if ((applicationSpan == null && jobCount > 0) || isRunningOnDatabricks) {
// If the application span is not initialized, but spark jobs have been executed, all those
// spark jobs were databricks or streaming. In this case we don't send the application span
log.info(
"finishApplicationInDatabricksCheck: isRunningOnDatabricks={}, databricksClusterName={}, applicationSpan={}, jobCount={}",
isRunningOnDatabricks,
databricksClusterName,
applicationSpan != null ? "exists" : "null",
jobCount);
return;
}
initApplicationSpanIfNotInitialized();

// if (applicationSpan == null) {
// // On Databricks or streaming environments, the application span is not created.
// // Flush any remaining traces and return.
// log.info(
// "No application span created, skipping. isRunningOnDatabricks={}",
// isRunningOnDatabricks);
// tracer.flush();
// return;
// }

if (throwable != null) {
applicationSpan.addThrowable(throwable);
} else if (exitCode != 0) {
Expand Down