create SpanLink from spark.streaming_batch span to databricks.task.execution span for spark instrumentation if applicable. #6816

Merged
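In short, with this change the Databricks parent context is always computed in addDatabricksSpecificTags, and the span is either re-parented onto the databricks.task.execution span or linked to it. A rough reconstruction of the resulting logic, pieced together from the diff below (a sketch, not the verbatim merged code; builder, withParentContext, and the databricks*Id values come from the enclosing method):

// Sketch reconstructed from the diff below; not the verbatim merged code.
AgentSpan.Context parentContext =
    new DatabricksParentContext(databricksJobId, databricksJobRunId, databricksTaskRunId);

if (parentContext.getTraceId() != DDTraceId.ZERO) {
  if (withParentContext) {
    // When re-parenting applies, the span becomes a child of the databricks.task.execution span.
    builder.asChildOf(parentContext);
  } else {
    // Otherwise (e.g. spark.streaming_batch), the span keeps its own parent and carries a SpanLink instead.
    builder.withLink(SpanLink.from(parentContext));
  }
}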
@@ -9,6 +9,7 @@
import datadog.trace.api.sampling.SamplingMechanism;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.SpanLink;
import de.thetaphi.forbiddenapis.SuppressForbidden;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -260,12 +261,14 @@ private void addDatabricksSpecificTags(
builder.withTag("databricks_job_run_id", databricksJobRunId);
builder.withTag("databricks_task_run_id", databricksTaskRunId);

-    if (withParentContext) {
-      AgentSpan.Context parentContext =
-          new DatabricksParentContext(databricksJobId, databricksJobRunId, databricksTaskRunId);
+    AgentSpan.Context parentContext =
+        new DatabricksParentContext(databricksJobId, databricksJobRunId, databricksTaskRunId);

-      if (parentContext.getTraceId() != DDTraceId.ZERO) {
+    if (parentContext.getTraceId() != DDTraceId.ZERO) {
+      if (withParentContext) {
        builder.asChildOf(parentContext);
+      } else {
+        builder.withLink(SpanLink.from(parentContext));
+      }
      }
    }
  }
@@ -1,12 +1,15 @@
package datadog.trace.instrumentation.spark

import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.DDTags
import datadog.trace.api.Platform
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.api.sampling.SamplingMechanism
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery
import scala.Option
import scala.collection.JavaConverters
import scala.collection.immutable.Seq
@@ -23,6 +26,26 @@ class AbstractSparkStructuredStreamingTest extends AgentTestRunner {
injectSysConfig("dd.integration.spark.enabled", "true")
}

private SparkSession createSparkSession(String appName) {
return SparkSession.builder()
.appName(appName)
.config("spark.master", "local[2]")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
}

private SparkSession createDatabricksSparkSession(String appName) {
return SparkSession.builder()
.appName(appName)
.config("spark.master", "local[2]")
.config("spark.sql.shuffle.partitions", "2")
.config("spark.databricks.sparkContextId", "3291395623902517763")
.config("spark.databricks.job.id", "3822225623902514353")
.config("spark.databricks.job.parentRunId", "3851395623902519743")
.config("spark.databricks.job.runId", "3851395623902519743")
.getOrCreate()
}

private memoryStream(SparkSession sparkSession) {
if (TestSparkComputation.sparkVersion >= '3') {
return new MemoryStream<String>(1, sparkSession.sqlContext(), Option.empty(), Encoders.STRING())
@@ -31,18 +54,8 @@ class AbstractSparkStructuredStreamingTest extends AgentTestRunner {
return new MemoryStream<String>(1, sparkSession.sqlContext(), Encoders.STRING())
}

def "generate spark structured streaming batches"() {
setup:
def sparkSession = SparkSession.builder()
.config("spark.master", "local[2]")
.config("spark.sql.shuffle.partitions", "2") // Small parallelism to speed up tests
.appName("Sample Streaming Application")
.getOrCreate()

def inputStream = memoryStream(sparkSession)

def query = inputStream
.toDS()
private StreamingQuery generateTestStreamingComputation(Dataset<String> dataSet) {
return dataSet
.selectExpr("value", "current_timestamp() as event_time")
.withWatermark("event_time", "0 seconds")
.groupBy("value")
@@ -52,6 +65,14 @@ class AbstractSparkStructuredStreamingTest extends AgentTestRunner {
.outputMode("complete")
.format("console")
.start()
}

def "generate spark structured streaming batches"() {
setup:
def sparkSession = createSparkSession("Sample Streaming Application")

def inputStream = memoryStream(sparkSession)
def query = generateTestStreamingComputation(inputStream.toDS())

inputStream.addData(JavaConverters.asScalaBuffer(["foo", "foo", "bar"]).toSeq() as Seq<String>)
query.processAllAvailable()
@@ -102,6 +123,8 @@ class AbstractSparkStructuredStreamingTest extends AgentTestRunner {
if (TestSparkComputation.sparkVersion >= '3') {
"spark.latest_offset_duration" Long
}
// In non-databricks running environment, SpanLinks should be absent.
assert tag(DDTags.SPAN_LINKS) == null

// Regular spark tags
"spark.available_executor_time" Long
@@ -206,14 +229,9 @@ class AbstractSparkStructuredStreamingTest extends AgentTestRunner {

def "handle failure during streaming processing"() {
setup:
-    def sparkSession = SparkSession.builder()
-      .config("spark.master", "local[2]")
-      .config("spark.sql.shuffle.partitions", "2") // Small parallelism to speed up tests
-      .appName("Failing Streaming Application")
-      .getOrCreate()
+    def sparkSession = createSparkSession("Failing Streaming Application")

    def inputStream = memoryStream(sparkSession)
-
    def query = TestSparkComputation.generateTestFailingStreamingComputation(inputStream.toDS())

try {
@@ -260,4 +278,56 @@ class AbstractSparkStructuredStreamingTest extends AgentTestRunner {
}
}
}

def "add span links from spark.streaming_batch to databricks.task.execution if applicable"() {
setup:
def sparkSession = createDatabricksSparkSession("Example App")

def inputStream = memoryStream(sparkSession)
def query = generateTestStreamingComputation(inputStream.toDS())

inputStream.addData(JavaConverters.asScalaBuffer(["foo", "foo", "bar"]).toSeq() as Seq<String>)
query.processAllAvailable()

query.stop()
sparkSession.stop()

expect:
assertTraces(1) {
trace(5) {
span {
operationName "spark.streaming_batch"
resourceName "test-query"
spanType "spark"
parent()
assert span.tags.containsKey(DDTags.SPAN_LINKS)
assert span.tags[DDTags.SPAN_LINKS] != null
}
span {
operationName "spark.sql"
spanType "spark"
childOf(span(0))
assert !span.tags.containsKey(DDTags.SPAN_LINKS)
}
span {
operationName "spark.job"
spanType "spark"
childOf(span(1))
assert !span.tags.containsKey(DDTags.SPAN_LINKS)
}
span {
operationName "spark.stage"
spanType "spark"
childOf(span(2))
assert !span.tags.containsKey(DDTags.SPAN_LINKS)
}
span {
operationName "spark.stage"
spanType "spark"
childOf(span(2))
assert !span.tags.containsKey(DDTags.SPAN_LINKS)
}
}
}
}
}