-
Notifications
You must be signed in to change notification settings - Fork 318
Track schema registry usage #9974
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Track schema registry usage #9974
Conversation
This comment has been minimized.
This comment has been minimized.
BenchmarksStartupParameters
See matching parameters
SummaryFound 0 performance improvements and 0 performance regressions! Performance is the same for 59 metrics, 6 unstable metrics. Startup time reports for insecure-bankgantt
title insecure-bank - global startup overhead: candidate=1.56.0-SNAPSHOT~325ae25d71, baseline=1.56.0-SNAPSHOT~5adec51856
dateFormat X
axisFormat %s
section tracing
Agent [baseline] (1.098 s) : 0, 1097796
Total [baseline] (8.921 s) : 0, 8921060
Agent [candidate] (1.098 s) : 0, 1098038
Total [candidate] (8.874 s) : 0, 8874118
section iast
Agent [baseline] (1.241 s) : 0, 1240766
Total [baseline] (9.606 s) : 0, 9606500
Agent [candidate] (1.245 s) : 0, 1244581
Total [candidate] (9.546 s) : 0, 9545743
gantt
title insecure-bank - break down per module: candidate=1.56.0-SNAPSHOT~325ae25d71, baseline=1.56.0-SNAPSHOT~5adec51856
dateFormat X
axisFormat %s
section tracing
crashtracking [baseline] (1.446 ms) : 0, 1446
crashtracking [candidate] (1.444 ms) : 0, 1444
BytebuddyAgent [baseline] (705.032 ms) : 0, 705032
BytebuddyAgent [candidate] (705.282 ms) : 0, 705282
GlobalTracer [baseline] (248.132 ms) : 0, 248132
GlobalTracer [candidate] (248.54 ms) : 0, 248540
AppSec [baseline] (32.275 ms) : 0, 32275
AppSec [candidate] (32.367 ms) : 0, 32367
Debugger [baseline] (63.62 ms) : 0, 63620
Debugger [candidate] (63.272 ms) : 0, 63272
Remote Config [baseline] (627.635 µs) : 0, 628
Remote Config [candidate] (633.094 µs) : 0, 633
Telemetry [baseline] (8.215 ms) : 0, 8215
Telemetry [candidate] (8.116 ms) : 0, 8116
Flare Poller [baseline] (3.713 ms) : 0, 3713
Flare Poller [candidate] (3.637 ms) : 0, 3637
section iast
crashtracking [baseline] (1.457 ms) : 0, 1457
crashtracking [candidate] (1.461 ms) : 0, 1461
BytebuddyAgent [baseline] (833.301 ms) : 0, 833301
BytebuddyAgent [candidate] (836.266 ms) : 0, 836266
GlobalTracer [baseline] (237.216 ms) : 0, 237216
GlobalTracer [candidate] (238.921 ms) : 0, 238921
IAST [baseline] (28.342 ms) : 0, 28342
IAST [candidate] (28.529 ms) : 0, 28529
AppSec [baseline] (33.266 ms) : 0, 33266
AppSec [candidate] (32.995 ms) : 0, 32995
Debugger [baseline] (60.409 ms) : 0, 60409
Debugger [candidate] (59.746 ms) : 0, 59746
Remote Config [baseline] (552.552 µs) : 0, 553
Remote Config [candidate] (539.317 µs) : 0, 539
Telemetry [baseline] (7.743 ms) : 0, 7743
Telemetry [candidate] (7.652 ms) : 0, 7652
Flare Poller [baseline] (3.467 ms) : 0, 3467
Flare Poller [candidate] (3.452 ms) : 0, 3452
Startup time reports for petclinicgantt
title petclinic - global startup overhead: candidate=1.56.0-SNAPSHOT~325ae25d71, baseline=1.56.0-SNAPSHOT~5adec51856
dateFormat X
axisFormat %s
section tracing
Agent [baseline] (1.104 s) : 0, 1104402
Total [baseline] (10.811 s) : 0, 10811487
Agent [candidate] (1.096 s) : 0, 1096257
Total [candidate] (10.759 s) : 0, 10758585
section appsec
Agent [baseline] (1.282 s) : 0, 1281716
Total [baseline] (11.125 s) : 0, 11124816
Agent [candidate] (1.278 s) : 0, 1278212
Total [candidate] (11.15 s) : 0, 11150089
section iast
Agent [baseline] (1.257 s) : 0, 1257479
Total [baseline] (11.359 s) : 0, 11358852
Agent [candidate] (1.238 s) : 0, 1238484
Total [candidate] (11.281 s) : 0, 11280554
section profiling
Agent [baseline] (1.233 s) : 0, 1232776
Total [baseline] (11.098 s) : 0, 11097565
Agent [candidate] (1.23 s) : 0, 1230262
Total [candidate] (11.025 s) : 0, 11024991
gantt
title petclinic - break down per module: candidate=1.56.0-SNAPSHOT~325ae25d71, baseline=1.56.0-SNAPSHOT~5adec51856
dateFormat X
axisFormat %s
section tracing
crashtracking [baseline] (1.464 ms) : 0, 1464
crashtracking [candidate] (1.437 ms) : 0, 1437
BytebuddyAgent [baseline] (708.449 ms) : 0, 708449
BytebuddyAgent [candidate] (703.624 ms) : 0, 703624
GlobalTracer [baseline] (249.784 ms) : 0, 249784
GlobalTracer [candidate] (247.984 ms) : 0, 247984
AppSec [baseline] (32.477 ms) : 0, 32477
AppSec [candidate] (32.153 ms) : 0, 32153
Debugger [baseline] (64.579 ms) : 0, 64579
Debugger [candidate] (63.872 ms) : 0, 63872
Remote Config [baseline] (637.092 µs) : 0, 637
Remote Config [candidate] (627.966 µs) : 0, 628
Telemetry [baseline] (8.339 ms) : 0, 8339
Telemetry [candidate] (8.141 ms) : 0, 8141
Flare Poller [baseline] (3.772 ms) : 0, 3772
Flare Poller [candidate] (3.719 ms) : 0, 3719
section appsec
crashtracking [baseline] (1.451 ms) : 0, 1451
crashtracking [candidate] (1.448 ms) : 0, 1448
BytebuddyAgent [baseline] (728.704 ms) : 0, 728704
BytebuddyAgent [candidate] (728.782 ms) : 0, 728782
GlobalTracer [baseline] (241.681 ms) : 0, 241681
GlobalTracer [candidate] (239.72 ms) : 0, 239720
IAST [baseline] (25.144 ms) : 0, 25144
IAST [candidate] (24.725 ms) : 0, 24725
AppSec [baseline] (175.584 ms) : 0, 175584
AppSec [candidate] (174.217 ms) : 0, 174217
Debugger [baseline] (60.897 ms) : 0, 60897
Debugger [candidate] (61.512 ms) : 0, 61512
Remote Config [baseline] (698.812 µs) : 0, 699
Remote Config [candidate] (662.018 µs) : 0, 662
Telemetry [baseline] (8.498 ms) : 0, 8498
Telemetry [candidate] (8.402 ms) : 0, 8402
Flare Poller [baseline] (4.068 ms) : 0, 4068
Flare Poller [candidate] (3.895 ms) : 0, 3895
section iast
crashtracking [baseline] (1.464 ms) : 0, 1464
crashtracking [candidate] (1.453 ms) : 0, 1453
BytebuddyAgent [baseline] (843.872 ms) : 0, 843872
BytebuddyAgent [candidate] (830.018 ms) : 0, 830018
GlobalTracer [baseline] (240.424 ms) : 0, 240424
GlobalTracer [candidate] (237.768 ms) : 0, 237768
IAST [baseline] (26.982 ms) : 0, 26982
IAST [candidate] (28.429 ms) : 0, 28429
AppSec [baseline] (35.564 ms) : 0, 35564
AppSec [candidate] (33.204 ms) : 0, 33204
Debugger [baseline] (61.967 ms) : 0, 61967
Debugger [candidate] (60.949 ms) : 0, 60949
Remote Config [baseline] (563.33 µs) : 0, 563
Remote Config [candidate] (535.993 µs) : 0, 536
Telemetry [baseline] (7.823 ms) : 0, 7823
Telemetry [candidate] (7.675 ms) : 0, 7675
Flare Poller [baseline] (3.576 ms) : 0, 3576
Flare Poller [candidate] (3.514 ms) : 0, 3514
section profiling
crashtracking [baseline] (1.421 ms) : 0, 1421
crashtracking [candidate] (1.429 ms) : 0, 1429
BytebuddyAgent [baseline] (733.097 ms) : 0, 733097
BytebuddyAgent [candidate] (729.972 ms) : 0, 729972
GlobalTracer [baseline] (223.64 ms) : 0, 223640
GlobalTracer [candidate] (223.22 ms) : 0, 223220
AppSec [baseline] (32.551 ms) : 0, 32551
AppSec [candidate] (32.456 ms) : 0, 32456
Debugger [baseline] (62.857 ms) : 0, 62857
Debugger [candidate] (62.979 ms) : 0, 62979
Remote Config [baseline] (643.404 µs) : 0, 643
Remote Config [candidate] (647.452 µs) : 0, 647
Telemetry [baseline] (8.004 ms) : 0, 8004
Telemetry [candidate] (8.106 ms) : 0, 8106
Flare Poller [baseline] (3.767 ms) : 0, 3767
Flare Poller [candidate] (3.839 ms) : 0, 3839
ProfilingAgent [baseline] (97.074 ms) : 0, 97074
ProfilingAgent [candidate] (98.131 ms) : 0, 98131
Profiling [baseline] (97.663 ms) : 0, 97663
Profiling [candidate] (98.72 ms) : 0, 98720
LoadParameters
See matching parameters
SummaryFound 2 performance improvements and 2 performance regressions! Performance is the same for 15 metrics, 17 unstable metrics.
Request duration reports for insecure-bankgantt
title insecure-bank - request duration [CI 0.99] : candidate=1.56.0-SNAPSHOT~325ae25d71, baseline=1.56.0-SNAPSHOT~5adec51856
dateFormat X
axisFormat %s
section baseline
no_agent (1.205 ms) : 1193, 1217
. : milestone, 1205,
iast (3.185 ms) : 3147, 3224
. : milestone, 3185,
iast_FULL (5.818 ms) : 5759, 5876
. : milestone, 5818,
iast_GLOBAL (3.673 ms) : 3620, 3727
. : milestone, 3673,
profiling (2.008 ms) : 1989, 2026
. : milestone, 2008,
tracing (1.863 ms) : 1847, 1879
. : milestone, 1863,
section candidate
no_agent (1.197 ms) : 1185, 1209
. : milestone, 1197,
iast (3.218 ms) : 3182, 3253
. : milestone, 3218,
iast_FULL (5.823 ms) : 5765, 5881
. : milestone, 5823,
iast_GLOBAL (3.469 ms) : 3412, 3527
. : milestone, 3469,
profiling (2.304 ms) : 2281, 2327
. : milestone, 2304,
tracing (1.793 ms) : 1779, 1808
. : milestone, 1793,
Request duration reports for petclinicgantt
title petclinic - request duration [CI 0.99] : candidate=1.56.0-SNAPSHOT~325ae25d71, baseline=1.56.0-SNAPSHOT~5adec51856
dateFormat X
axisFormat %s
section baseline
no_agent (17.856 ms) : 17676, 18037
. : milestone, 17856,
appsec (19.787 ms) : 19584, 19989
. : milestone, 19787,
code_origins (17.766 ms) : 17591, 17940
. : milestone, 17766,
iast (17.51 ms) : 17338, 17682
. : milestone, 17510,
profiling (18.481 ms) : 18300, 18662
. : milestone, 18481,
tracing (17.561 ms) : 17386, 17736
. : milestone, 17561,
section candidate
no_agent (19.473 ms) : 19274, 19672
. : milestone, 19473,
appsec (18.614 ms) : 18423, 18805
. : milestone, 18614,
code_origins (17.791 ms) : 17615, 17968
. : milestone, 17791,
iast (17.786 ms) : 17609, 17962
. : milestone, 17786,
profiling (18.929 ms) : 18738, 19120
. : milestone, 18929,
tracing (17.548 ms) : 17372, 17723
. : milestone, 17548,
DacapoParameters
See matching parameters
SummaryFound 0 performance improvements and 0 performance regressions! Performance is the same for 12 metrics, 0 unstable metrics. Execution time for biojavagantt
title biojava - execution time [CI 0.99] : candidate=1.56.0-SNAPSHOT~325ae25d71, baseline=1.56.0-SNAPSHOT~5adec51856
dateFormat X
axisFormat %s
section baseline
no_agent (15.072 s) : 15072000, 15072000
. : milestone, 15072000,
appsec (14.52 s) : 14520000, 14520000
. : milestone, 14520000,
iast (18.529 s) : 18529000, 18529000
. : milestone, 18529000,
iast_GLOBAL (17.759 s) : 17759000, 17759000
. : milestone, 17759000,
profiling (15.019 s) : 15019000, 15019000
. : milestone, 15019000,
tracing (14.788 s) : 14788000, 14788000
. : milestone, 14788000,
section candidate
no_agent (14.732 s) : 14732000, 14732000
. : milestone, 14732000,
appsec (14.834 s) : 14834000, 14834000
. : milestone, 14834000,
iast (18.55 s) : 18550000, 18550000
. : milestone, 18550000,
iast_GLOBAL (18.203 s) : 18203000, 18203000
. : milestone, 18203000,
profiling (14.559 s) : 14559000, 14559000
. : milestone, 14559000,
tracing (14.92 s) : 14920000, 14920000
. : milestone, 14920000,
Execution time for tomcatgantt
title tomcat - execution time [CI 0.99] : candidate=1.56.0-SNAPSHOT~325ae25d71, baseline=1.56.0-SNAPSHOT~5adec51856
dateFormat X
axisFormat %s
section baseline
no_agent (1.474 ms) : 1462, 1485
. : milestone, 1474,
appsec (2.462 ms) : 2411, 2513
. : milestone, 2462,
iast (2.2 ms) : 2136, 2263
. : milestone, 2200,
iast_GLOBAL (2.244 ms) : 2180, 2308
. : milestone, 2244,
profiling (2.064 ms) : 2012, 2116
. : milestone, 2064,
tracing (2.022 ms) : 1972, 2071
. : milestone, 2022,
section candidate
no_agent (1.477 ms) : 1466, 1489
. : milestone, 1477,
appsec (2.475 ms) : 2423, 2528
. : milestone, 2475,
iast (2.213 ms) : 2148, 2278
. : milestone, 2213,
iast_GLOBAL (2.253 ms) : 2188, 2317
. : milestone, 2253,
profiling (2.089 ms) : 2034, 2143
. : milestone, 2089,
tracing (2.042 ms) : 1991, 2093
. : milestone, 2042,
|
Kafka / producer-benchmarkParameters
See matching parameters
SummaryFound 0 performance improvements and 0 performance regressions! Performance is the same for 3 metrics, 0 unstable metrics. See unchanged results
|
|
Hi! 👋 Thanks for your pull request! 🎉 To help us review it, please make sure to:
If you need help, please check our contributing guidelines. |
amarziali
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are adjustments to be done to avoid using reflection
| public static class ConfigureAdvice { | ||
| @Advice.OnMethodExit(suppress = Throwable.class) | ||
| public static void onExit( | ||
| @Advice.This org.apache.kafka.common.serialization.Deserializer deserializer, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: you can import Deserialzer I think
| InstrumentationContext.get( | ||
| org.apache.kafka.common.serialization.Deserializer.class, Boolean.class) | ||
| .get(deserializer); | ||
| boolean isKey = isKeyObj != null ? isKeyObj : false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| boolean isKey = isKeyObj != null ? isKeyObj : false; | |
| boolean isKey = isKeyObj != null && isKeyObj; |
| public static class ConfigureAdvice { | ||
| @Advice.OnMethodExit(suppress = Throwable.class) | ||
| public static void onExit( | ||
| @Advice.This org.apache.kafka.common.serialization.Serializer serializer, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be imported?
| "datadog.trace.instrumentation.confluentschemaregistry.ClusterIdHolder", | ||
| false, | ||
| consumer.getClass().getClassLoader()); | ||
| holderClass.getMethod("set", String.class).invoke(null, clusterId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally speaking, resolving the method on each run is inefficient. Also, calling a method by reflection is as well inefficient since it implies checking access that can be avoided by unrolling directly the method call with a MethodHandle.
Now the question I have is why should we use reflection to call a method that we own? Instead
- Declare that class as a helper to inject
- Add a depedency on the module that declare that class
If the depedency cannot be added (i.e. circular?) so that class needs to be moved in a common module or in the bootstrap in order to be visible by all the modules that need it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense. I actually realized we have a kafka-common package, that was done exactly for this use case (used for kafka-streams). So I put that class in that package.
| // Set cluster ID for Schema Registry instrumentation | ||
| if (clusterId != null) { | ||
| try { | ||
| Class<?> holderClass = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
| // Clear cluster ID from Schema Registry instrumentation | ||
| try { | ||
| Class<?> holderClass = | ||
| Class.forName( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
| boolean isSuccess, | ||
| boolean isKey, | ||
| String operation) { | ||
| log.info( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logging as an info will generate a lot of verbosity on customer side. I would have changed to debug
| writer.startMap(3 + (hasBacklogs ? 1 : 0)); | ||
| boolean hasSchemaRegistryUsages = !bucket.getSchemaRegistryUsages().isEmpty(); | ||
| int mapSize = 3; | ||
| if (hasBacklogs) mapSize++; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: use brackets
| boolean isKey = isKeyObj != null ? isKeyObj : false; | ||
|
|
||
| // Get cluster ID from thread-local (set by Kafka producer instrumentation) | ||
| String clusterId = ClusterIdHolder.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit] Should we clear the cluster Id from the holder? Just to be extra careful.
As of today, I do not see a path that can bring us here without having passed for poll (which sets the new value)...but just in case in case of api changes in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are clearing it in the place we are setting it.
For the producer, we could clear it after using it, but for the consumer, it's not possible, because you can poll a batch of messages, so if we clear it after using it, we would only have the Kafka cluster ID for the first message.
So for consistency, I was thinking of only setting the Kafka cluster ID from the Kafka instrumentation. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes total sense about the issue with batching.
So for consistency, I was thinking of only setting the Kafka cluster ID from the Kafka instrumentation. What do you think?
Unsure of what you mean, but the current implementation (setting in poll enter + the ClusterIdHolder.clear() that you just added to the poll exit) feels like enough to me. Do you have a way in mind to improve on it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, that's what I was thinking of 👍
| public void trackBacklog(DataStreamsTags tags, long value) {} | ||
|
|
||
| @Override | ||
| public void setSchemaRegistryUsage( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe reportSchemaRegistryUsage?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Renamed
| * registry serializer/deserializer instrumentation reads it. | ||
| */ | ||
| public class ClusterIdHolder { | ||
| private static final ThreadLocal<String> CLUSTER_ID = new ThreadLocal<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need this to be ThreadLocal? Why don't just use FixedSizeCache (DDCaches.newFixedSizeCache)?
| int schemaId = -1; | ||
|
|
||
| // Extract schema ID from the serialized bytes if successful | ||
| if (isSuccess && result != null && result.length >= 5 && result[0] == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this looks duplicated code. It can be factored in a helper
| cleanup: | ||
| serializer.close() | ||
| deserializer.close() | ||
| ClusterIdHolder.clear() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that rather cleaning the threadLocal it would be good to test that the insturmentation correctly cleaned the threadlocal since it should do that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have just added a test in the Kafka instrumentation for that. In this test, it's set manually.
| } | ||
|
|
||
| public static void clear() { | ||
| CLUSTER_ID.remove(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using set(null) is way more performant
| schemaId = | ||
| ((data[1] & 0xFF) << 24) | ||
| | ((data[2] & 0xFF) << 16) | ||
| | ((data[3] & 0xFF) << 8) | ||
| | (data[4] & 0xFF); | ||
| } catch (Throwable ignored) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that logic is duplicated twice and can be factored in a helper perhaps
amarziali
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks OK. Please be careful when using threadlocals since it can leave stale data. However in this case it does not seem possible to pass data around consumer/producer and serializer/deserializers since they does not look sharing common object that can be used to store an instrumentation context
Kafka / consumer-benchmarkParameters
See matching parameters
SummaryFound 0 performance improvements and 0 performance regressions! Performance is the same for 3 metrics, 0 unstable metrics. See unchanged results
|
What Does This Do
Tracks schema registry usage for Data Streams Monitoring.
This is going to power the Data Streams schema page.
We will get schemas from the schema registry, and know which services use them & how much based on this data.
Motivation
Being able to link data from the schema registry (schemas, each one has an ID), and data from applications (which application uses which schema). This unlocks two features:
Additional Notes
Contributor Checklist
type:and (comp:orinst:) labels in addition to any useful labelsclose,fixor any linking keywords when referencing an issue.Use
solvesinstead, and assign the PR milestone to the issueJira ticket: DSMON-1141