Skip to content

Conversation

@GuiSong01
Copy link
Contributor

@GuiSong01 GuiSong01 commented Nov 14, 2025

Support kafka-clients-3.9.x intercept

Upgrade kafka-clients version in optional-reporter-plugins to 3.9.1

@wu-sheng
Copy link
Member

To add kafka new client version support, you should add new version into test scenarios.

https://github.com/apache/skywalking-java/blob/main/test/plugin/scenarios/kafka-scenario/support-version.list#L11

As we have 3.7.1, you should add newer versions.

@wu-sheng wu-sheng added enhancement New feature or request plugin labels Nov 15, 2025
@wu-sheng
Copy link
Member

As we don't have Spring Kafka 3+, please add them as well.

https://github.com/apache/skywalking-java/tree/main/test/plugin/scenarios/spring-kafka-2.3.x-scenario

I am not sure whether you need a new scenario project. If so, please add one, and make it in the proper GHA control file.

@GuiSong01
Copy link
Contributor Author

To add kafka new client version support, you should add new version into test scenarios.

https://github.com/apache/skywalking-java/blob/main/test/plugin/scenarios/kafka-scenario/support-version.list#L11

As we have 3.7.1, you should add newer versions.

I have not modified the kafka-client plugin, so there is no need to add new test scenarios. This PR enhances the spring-kafka plugin by adding support for spring-kafka 3.1.0+ versions. The reason is that spring-kafka 3.1.0+ introduces an internal class named ExtendedKafkaConsumer in DefaultKafkaConsumerFactory, which caused the original spring-kafka 2.x version plugin to become ineffective.

In spring-kafka 2.x, span injection was implemented by intercepting the class org.apache.kafka.clients.consumer.KafkaConsumer in the kafka-plugin. However, in spring-kafka 3.1.0+, the consumer class has been replaced by ExtendedKafkaConsumer.

This change necessitates the enhancement to ensure compatibility with the newer versions of spring-kafka.

@GuiSong01
Copy link
Contributor Author

As we don't have Spring Kafka 3+, please add them as well.

https://github.com/apache/skywalking-java/tree/main/test/plugin/scenarios/spring-kafka-2.3.x-scenario

I am not sure whether you need a new scenario project. If so, please add one, and make it in the proper GHA control file.

Ok, I'll add a new scenario.

@wu-sheng
Copy link
Member

To add kafka new client version support, you should add new version into test scenarios.

https://github.com/apache/skywalking-java/blob/main/test/plugin/scenarios/kafka-scenario/support-version.list#L11

As we have 3.7.1, you should add newer versions.

I have not modified the kafka-client plugin, so there is no need to add new test scenarios. This PR enhances the spring-kafka plugin by adding support for spring-kafka 3.1.0+ versions. The reason is that spring-kafka 3.1.0+ introduces an internal class named ExtendedKafkaConsumer in DefaultKafkaConsumerFactory, which caused the original spring-kafka 2.x version plugin to become ineffective.

In spring-kafka 2.x, span injection was implemented by intercepting the class org.apache.kafka.clients.consumer.KafkaConsumer in the kafka-plugin. However, in spring-kafka 3.1.0+, the consumer class has been replaced by ExtendedKafkaConsumer.

This change necessitates the enhancement to ensure compatibility with the newer versions of spring-kafka.

This makes sense to me.
I was reminding you there are tests to be added.
And as you changed the plugin name, this is a breaking change. Make sure this is mentioned in change logs.

@wu-sheng
Copy link
Member

Your new test is not added into GitHub action control file. Please fix.

@GuiSong01
Copy link
Contributor Author

Your new test is not added into GitHub action control file. Please fix.

Fixed

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This pull request enhances the Spring Kafka plugin to support newer versions of kafka-clients (3.7.1+) and spring-kafka (3.1.0+), while maintaining backward compatibility with version 2.x. The PR renames the plugin module from spring-kafka-2.x-plugin to spring-kafka-2.x-3.x-plugin and upgrades the kafka-clients dependency to version 3.9.1.

Key changes include:

  • Adding support for Spring Kafka 3.x by instrumenting ExtendedKafkaConsumer class introduced in newer versions
  • Creating a new test scenario for spring-kafka-3.3.x to verify the enhanced functionality
  • Upgrading kafka-clients version from 2.4.1 to 3.9.1 in optional-reporter-plugins

Reviewed Changes

Copilot reviewed 21 out of 26 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
test/plugin/scenarios/spring-kafka-3.3.x-scenario/* New test scenario for spring-kafka 3.3.x with controller, configuration, and expected data
apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-2.x-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/kafka/* New interceptors and instrumentation for ExtendedKafkaConsumer to support spring-kafka 3.x
apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-2.x-3.x-plugin/src/test/java/* Unit tests for the new ExtendedKafkaConsumer interceptor
apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-2.x-3.x-plugin/pom.xml Renamed artifact ID and added kafka-clients 3.9.1 dependency with test dependencies
apm-sniffer/optional-reporter-plugins/pom.xml Upgraded kafka-clients version from 2.4.1 to 3.9.1
docs/en/setup/service-agent/java-agent/Plugin-list.md Updated documentation to reflect support for spring-kafka 2.x/3.x
CHANGES.md Added changelog entries for the enhancements
.github/workflows/plugins-test.3.yaml Added spring-kafka-3.3.x-scenario to test workflow

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@GuiSong01 GuiSong01 force-pushed the main branch 2 times, most recently from 162e088 to e7c145b Compare November 16, 2025 10:23
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;

public class ExtendedConstructorInterceptPoint implements InstanceConstructorInterceptor {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public class ExtendedConstructorInterceptPoint implements InstanceConstructorInterceptor {
public class ExtendedConstructorInterceptor implements InstanceConstructorInterceptor {

@Override
public void onConstruct(final EnhancedInstance objInst, final Object[] allArguments) throws Throwable {
ExtendedConsumerEnhanceRequiredInfo requiredInfo = new ExtendedConsumerEnhanceRequiredInfo();
extractConsumerConfig(allArguments, requiredInfo);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not check argument type and arg list in the interceptor. The more proper way is setting the ElementMatcher for the specific method(s).

@wu-sheng
Copy link
Member

For your new case

Error: COMPILATION ERROR :
Error: /home/runner/work/skywalking-java/skywalking-java/test/plugin/scenarios/spring-kafka-3.3.x-scenario/src/main/java/test/apache/skywalking/apm/testcase/spring/kafka/Application.java:[21,32] cannot access org.springframework.boot.SpringApplication
bad class file: /home/runner/.m2/repository/org/springframework/boot/spring-boot/3.3.10/spring-boot-3.3.10.jar(org/springframework/boot/SpringApplication.class)
class file has wrong version 61.0, should be 52.0
Please remove or make sure it appears in the correct subdirectory of the classpath.
Error: Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project spring-kafka-3.3.x-scenario: Compilation failure
Error: /home/runner/work/skywalking-java/skywalking-java/test/plugin/scenarios/spring-kafka-3.3.x-scenario/src/main/java/test/apache/skywalking/apm/testcase/spring/kafka/Application.java:[21,32] cannot access org.springframework.boot.SpringApplication
Error: bad class file: /home/runner/.m2/repository/org/springframework/boot/spring-boot/3.3.10/spring-boot-3.3.10.jar(org/springframework/boot/SpringApplication.class)
Error: class file has wrong version 61.0, should be 52.0
Error: Please remove or make sure it appears in the correct subdirectory of the classpath.
Error: -> [Help 1]
Error:
Error: To see the full stack trace of the errors, re-run Maven with the -e switch.
Error: Re-run Maven using the -X switch to enable full debug logging.

This compiling error should indicate JVM version incorrect. For newer JDK is requried, you should put this case into another other GHA control file.

@GuiSong01
Copy link
Contributor Author

For your new case

Error: COMPILATION ERROR :
Error: /home/runner/work/skywalking-java/skywalking-java/test/plugin/scenarios/spring-kafka-3.3.x-scenario/src/main/java/test/apache/skywalking/apm/testcase/spring/kafka/Application.java:[21,32] cannot access org.springframework.boot.SpringApplication
bad class file: /home/runner/.m2/repository/org/springframework/boot/spring-boot/3.3.10/spring-boot-3.3.10.jar(org/springframework/boot/SpringApplication.class)
class file has wrong version 61.0, should be 52.0
Please remove or make sure it appears in the correct subdirectory of the classpath.
Error: Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project spring-kafka-3.3.x-scenario: Compilation failure
Error: /home/runner/work/skywalking-java/skywalking-java/test/plugin/scenarios/spring-kafka-3.3.x-scenario/src/main/java/test/apache/skywalking/apm/testcase/spring/kafka/Application.java:[21,32] cannot access org.springframework.boot.SpringApplication
Error: bad class file: /home/runner/.m2/repository/org/springframework/boot/spring-boot/3.3.10/spring-boot-3.3.10.jar(org/springframework/boot/SpringApplication.class)
Error: class file has wrong version 61.0, should be 52.0
Error: Please remove or make sure it appears in the correct subdirectory of the classpath.
Error: -> [Help 1]
Error:
Error: To see the full stack trace of the errors, re-run Maven with the -e switch.
Error: Re-run Maven using the -X switch to enable full debug logging.

This compiling error should indicate JVM version incorrect. For newer JDK is requried, you should put this case into another other GHA control file.

Yes, I've identified this error. Spring Boot 3.x requires JDK 17, and I'll fix this issue.

@wu-sheng
Copy link
Member

To start mock collector
http://localhost:12800/receiveData: HTTP/1.1 200
Exception in thread "main" java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:569)
at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:102)
at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:64)
at org.springframework.boot.loader.launch.JarLauncher.main(JarLauncher.java:40)
Caused by: java.lang.IllegalStateException: java.lang.NoSuchMethodError: 'java.util.LinkedHashSet org.springframework.util.CollectionUtils.newLinkedHashSet(int)'
at org.springframework.boot.SpringApplication.handleRunFailure(SpringApplication.java:825)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:345)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1363)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1352)
at test.apache.skywalking.apm.testcase.spring.kafka.Application.main(Application.java:28)
... 7 more
Caused by: java.lang.NoSuchMethodError: 'java.util.LinkedHashSet org.springframework.util.CollectionUtils.newLinkedHashSet(int)'
at org.springframework.context.annotation.AnnotationConfigUtils.registerAnnotationConfigProcessors(AnnotationConfigUtils.java:157)
at org.springframework.context.annotation.AnnotationConfigUtils.registerAnnotationConfigProcessors(AnnotationConfigUtils.java:133)
at org.springframework.context.annotation.AnnotatedBeanDefinitionReader.(AnnotatedBeanDefinitionReader.java:89)
at org.springframework.context.annotation.AnnotatedBeanDefinitionReader.(AnnotatedBeanDefinitionReader.java:72)
at org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext.(AnnotationConfigServletWebServerApplicationContext.java:73)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContextFactory.createContext(ServletWebServerApplicationContextFactory.java:52)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContextFactory.create(ServletWebServerApplicationContextFactory.java:47)
at org.springframework.boot.DefaultApplicationContextFactory.getFromSpringFactories(DefaultApplicationContextFactory.java:70)
at org.springframework.boot.DefaultApplicationContextFactory.create(DefaultApplicationContextFactory.java:50)
at org.springframework.boot.SpringApplication.createApplicationContext(SpringApplication.java:589)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:332)
... 10 more
[ERROR] spring-kafka-3.3.x-scenario-3.3.10 url=http://localhost:8080/spring-kafka-3.3.x-scenario/case/healthCheck, status= health check failed!

It seems your demo app can't boot successfully.

@wu-sheng
Copy link
Member

Did you find the issue of testing failue?

@GuiSong01
Copy link
Contributor Author

Did you find the issue of testing failue?

Yes, the reason is the Spring version matching issue; I'll verify it thoroughly locally first

@GuiSong01 GuiSong01 changed the title Enhance spring-kafka plugin to support kafka-clients 3.7.1+ with spring-kafka 3.1.0+ & Upgrade kafka-clients version in optional-reporter-plugins to 3.9.1 Support kafka-clients-3.9.x intercept & Upgrade kafka-clients version in optional-reporter-plugins to 3.9.1 Nov 20, 2025
@GuiSong01
Copy link
Contributor Author

After local testing, I found the issue wasn't caused by spring-kafka's new ExtendKafkaConsumer class. The real cause was kafka-clients 3.9.0 renaming LegacyKafkaConsumer to ClassicKafkaConsumer, which broke the existing interceptor. I'll add a new interceptor, resubmit the code, and update the issue

@wu-sheng wu-sheng merged commit 67ab8b2 into apache:main Nov 20, 2025
207 of 230 checks passed
@wu-sheng wu-sheng added this to the 9.6.0 milestone Nov 23, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request plugin

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants