
Commit feb33de

Fix muzzle mismatch causing Spark jobs on Databricks 17.3 to not be instrumented (#9872)

* relax muzzle check
* relax more
* spotlessApply
* fix muzzle block by avoiding streaming hard deps and relaxing listener bus matchers
* remove unnecessary defense on StreamExecution
* revert changes on OL listener
* improve getMicroBatchExecutionBatchIdKey
* import InvocationTargetException
* Update AbstractDatadogSparkListener.java
* no need reflection since key is known
* Update AbstractSparkInstrumentation.java
* reflection against SparkPlanInfoAdvice
* spotless
* [DJM-974] Use reflection for constructor in Scala 2.12, lookup by parameter classes (#9886)

Co-authored-by: Charles Yu <charles.yu@datadoghq.com>
1 parent e914e0b commit feb33de
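As described in the commit message, part of the fix replaces the advice's direct call to the five-argument SparkPlanInfo constructor with a reflective lookup by parameter count, so the instrumentation no longer hard-links a constructor signature that can differ between Spark builds and trip the muzzle check (as on Databricks 17.3). A minimal, self-contained sketch of that lookup pattern follows; Payload and build are hypothetical stand-ins for illustration, not classes or methods from the agent:

import java.lang.reflect.Constructor;

public class ReflectiveCtorSketch {
  // Hypothetical stand-in for a class whose constructor signature may vary across library versions.
  public static class Payload {
    private final String name;
    private final int size;

    public Payload(String name, int size) {
      this.name = name;
      this.size = size;
    }

    @Override
    public String toString() {
      return name + ":" + size;
    }
  }

  // Find a constructor by parameter count instead of referencing a fixed signature,
  // then invoke it reflectively; give up quietly if nothing matches.
  public static Payload build(String name, int size) {
    try {
      for (Constructor<?> c : Payload.class.getConstructors()) {
        if (c.getParameterCount() == 2) {
          return (Payload) c.newInstance(name, size);
        }
      }
    } catch (Throwable ignored) {
      // Swallow reflection failures, mirroring the defensive catch in the advice below.
    }
    return null; // no matching constructor found
  }

  public static void main(String[] args) {
    System.out.println(build("example", 3)); // prints "example:3"
  }
}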

File tree

3 files changed (+54 -17 lines):

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java


dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 27 additions & 8 deletions
@@ -6,6 +6,8 @@
 import com.google.auto.service.AutoService;
 import datadog.trace.agent.tooling.InstrumenterModule;
 import datadog.trace.api.Config;
+import de.thetaphi.forbiddenapis.SuppressForbidden;
+import java.lang.reflect.Constructor;
 import net.bytebuddy.asm.Advice;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.execution.SparkPlan;
@@ -14,6 +16,7 @@
 import org.slf4j.LoggerFactory;
 import scala.Predef;
 import scala.collection.JavaConverters;
+import scala.collection.immutable.Map;

 @AutoService(InstrumenterModule.class)
 public class Spark212Instrumentation extends AbstractSparkInstrumentation {
@@ -94,21 +97,37 @@ public static void enter(@Advice.This SparkContext sparkContext) {

   public static class SparkPlanInfoAdvice {
     @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
+    @SuppressForbidden
     public static void exit(
         @Advice.Return(readOnly = false) SparkPlanInfo planInfo,
         @Advice.Argument(0) SparkPlan plan) {
       if (planInfo.metadata().size() == 0
           && (Config.get().isDataJobsParseSparkPlanEnabled()
               || Config.get().isDataJobsExperimentalFeaturesEnabled())) {
         Spark212PlanSerializer planUtils = new Spark212PlanSerializer();
-        planInfo =
-            new SparkPlanInfo(
-                planInfo.nodeName(),
-                planInfo.simpleString(),
-                planInfo.children(),
-                JavaConverters.mapAsScalaMap(planUtils.extractFormattedProduct(plan))
-                    .toMap(Predef.$conforms()),
-                planInfo.metrics());
+        Map<String, String> meta =
+            JavaConverters.mapAsScalaMap(planUtils.extractFormattedProduct(plan))
+                .toMap(Predef.$conforms());
+        try {
+          Constructor<?> targetCtor = null;
+          for (Constructor<?> c : SparkPlanInfo.class.getConstructors()) {
+            if (c.getParameterCount() == 5) {
+              targetCtor = c;
+              break;
+            }
+          }
+          if (targetCtor != null) {
+            Object newInst =
+                targetCtor.newInstance(
+                    planInfo.nodeName(),
+                    planInfo.simpleString(),
+                    planInfo.children(),
+                    meta,
+                    planInfo.metrics());
+            planInfo = (SparkPlanInfo) newInst;
+          }
+        } catch (Throwable ignored) {
+        }
       }
     }
   }

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 26 additions & 7 deletions
@@ -6,6 +6,8 @@
 import com.google.auto.service.AutoService;
 import datadog.trace.agent.tooling.InstrumenterModule;
 import datadog.trace.api.Config;
+import de.thetaphi.forbiddenapis.SuppressForbidden;
+import java.lang.reflect.Constructor;
 import net.bytebuddy.asm.Advice;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.execution.SparkPlan;
@@ -14,6 +16,7 @@
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConverters;
 import scala.collection.immutable.HashMap;
+import scala.collection.immutable.Map;

 @AutoService(InstrumenterModule.class)
 public class Spark213Instrumentation extends AbstractSparkInstrumentation {
@@ -95,20 +98,36 @@ public static void enter(@Advice.This SparkContext sparkContext) {

   public static class SparkPlanInfoAdvice {
     @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
+    @SuppressForbidden
     public static void exit(
         @Advice.Return(readOnly = false) SparkPlanInfo planInfo,
         @Advice.Argument(0) SparkPlan plan) {
       if (planInfo.metadata().size() == 0
           && (Config.get().isDataJobsParseSparkPlanEnabled()
               || Config.get().isDataJobsExperimentalFeaturesEnabled())) {
         Spark213PlanSerializer planUtils = new Spark213PlanSerializer();
-        planInfo =
-            new SparkPlanInfo(
-                planInfo.nodeName(),
-                planInfo.simpleString(),
-                planInfo.children(),
-                HashMap.from(JavaConverters.asScala(planUtils.extractFormattedProduct(plan))),
-                planInfo.metrics());
+        Map<String, String> meta =
+            HashMap.from(JavaConverters.asScala(planUtils.extractFormattedProduct(plan)));
+        try {
+          Constructor<?> targetCtor = null;
+          for (Constructor<?> c : SparkPlanInfo.class.getConstructors()) {
+            if (c.getParameterCount() == 5) {
+              targetCtor = c;
+              break;
+            }
+          }
+          if (targetCtor != null) {
+            Object newInst =
+                targetCtor.newInstance(
+                    planInfo.nodeName(),
+                    planInfo.simpleString(),
+                    planInfo.children(),
+                    meta,
+                    planInfo.metrics());
+            planInfo = (SparkPlanInfo) newInst;
+          }
+        } catch (Throwable ignored) {
+        }
       }
     }
   }

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 1 addition & 2 deletions
@@ -43,7 +43,6 @@
 import org.apache.spark.sql.execution.SQLExecution;
 import org.apache.spark.sql.execution.SparkPlanInfo;
 import org.apache.spark.sql.execution.metric.SQLMetricInfo;
-import org.apache.spark.sql.execution.streaming.MicroBatchExecution;
 import org.apache.spark.sql.execution.streaming.StreamExecution;
 import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
 import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
@@ -1243,7 +1242,7 @@ private static String getStreamingBatchKey(Properties properties) {
     }

     Object queryId = properties.get(StreamExecution.QUERY_ID_KEY());
-    Object batchId = properties.get(MicroBatchExecution.BATCH_ID_KEY());
+    Object batchId = properties.get("streaming.sql.batchId");

     if (queryId == null || batchId == null) {
       return null;
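The listener change above drops the compile-time reference to MicroBatchExecution and reads the batch id with its known property key instead; the literal "streaming.sql.batchId" matches the value of MicroBatchExecution.BATCH_ID_KEY(). A small sketch of that design choice, using a hypothetical helper name not taken from the agent:

import java.util.Properties;

public class BatchIdKeySketch {
  // Hypothetical helper: read the streaming batch id by its literal key so this class
  // never needs to link against org.apache.spark.sql.execution.streaming.MicroBatchExecution.
  static Object readBatchId(Properties properties) {
    return properties.get("streaming.sql.batchId");
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("streaming.sql.batchId", "42"); // illustrative value
    System.out.println(readBatchId(props));   // prints 42
  }
}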
