diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
index e490588e503..f02d060a8b6 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
@@ -233,6 +233,9 @@ public void setupOpenLineage(DDTraceId traceId) {
   public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
     this.applicationStart = applicationStart;
 
+    if (isRunningOnDatabricks) {
+      log.info("databricksClusterName{}", databricksClusterName);
+    }
     if (openLineageSparkListener == null) {
       openLineageSparkListener =
           InstanceStore.of(SparkListenerInterface.class).get("openLineageListener");
@@ -246,6 +249,7 @@ public synchronized void onApplicationStart(SparkListenerApplic
           .orElse(predeterminedTraceIdContext.getTraceId()));
     }
     notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart);
+    log.info("end of application start");
   }
 
   private void initApplicationSpanIfNotInitialized() {
@@ -254,7 +258,7 @@ private void initApplicationSpanIfNotInitialized() {
     }
 
     log.debug("Starting tracer application span.");
-
+    log.info("Not a databricks span");
     AgentTracer.SpanBuilder builder = buildSparkSpan("spark.application", null);
 
     if (applicationStart != null) {
@@ -308,6 +312,9 @@ private void captureOpenlineageContextIfPresent(
 
   @Override
   public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
+    if (isRunningOnDatabricks) {
+      log.info("On Application End");
+    }
     log.info(
         "Received spark application end event, finish trace on this event: {}",
         finishTraceOnApplicationEnd);
@@ -325,17 +332,46 @@ public synchronized void finishApplication(
     log.info("Finishing spark application trace");
 
     if (applicationEnded) {
+      log.info(
+          "finishApplicationEnded: isRunningOnDatabricks={}, databricksClusterName={}, applicationSpan={}, jobCount={}",
+          isRunningOnDatabricks,
+          databricksClusterName,
+          applicationSpan != null ? "exists" : "null",
+          jobCount);
       return;
     }
     applicationEnded = true;
-    if (applicationSpan == null && jobCount > 0) {
+    log.info(
+        "finishApplication: isRunningOnDatabricks={}, databricksClusterName={}, applicationSpan={}, jobCount={}",
+        isRunningOnDatabricks,
+        databricksClusterName,
+        applicationSpan != null ? "exists" : "null",
+        jobCount);
+
+    if ((applicationSpan == null && jobCount > 0) || isRunningOnDatabricks) {
       // If the application span is not initialized, but spark jobs have been executed, all those
       // spark jobs were databricks or streaming. In this case we don't send the application span
+      log.info(
+          "finishApplicationInDatabricksCheck: isRunningOnDatabricks={}, databricksClusterName={}, applicationSpan={}, jobCount={}",
+          isRunningOnDatabricks,
+          databricksClusterName,
+          applicationSpan != null ? "exists" : "null",
+          jobCount);
       return;
     }
     initApplicationSpanIfNotInitialized();
+    // if (applicationSpan == null) {
+    //   // On Databricks or streaming environments, the application span is not created.
+    //   // Flush any remaining traces and return.
+    //   log.info(
+    //       "No application span created, skipping. isRunningOnDatabricks={}",
+    //       isRunningOnDatabricks);
+    //   tracer.flush();
+    //   return;
+    // }
+
     if (throwable != null) {
       applicationSpan.addThrowable(throwable);
     } else if (exitCode != 0) {