From daf58f3ecbe329c1fb15bea6b6f226e383f4e1d5 Mon Sep 17 00:00:00 2001 From: gzlicanyi Date: Fri, 26 May 2023 16:53:02 +0800 Subject: [PATCH 1/8] Fix the gson dependency in the kafka-reporter-plugin. (#10848) --- CHANGES.md | 1 + .../kafka-reporter-plugin/pom.xml | 13 +++++++++++++ 2 files changed, 14 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index b87d8aa5ba..a0b3338e91 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -18,6 +18,7 @@ Release Notes. * Support Grizzly Trace * Fix possible IllegalStateException when using Micrometer. * Support Grizzly Work ThreadPool Metric Monitor +* Fix the gson dependency in the kafka-reporter-plugin. #### Documentation diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml index a0f4fd92d8..bac27707d0 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml @@ -28,12 +28,21 @@ kafka-reporter-plugin jar + + com.google + + org.apache.kafka kafka-clients ${kafka-clients.version} + + com.google.code.gson + gson + ${gson.version} + @@ -76,6 +85,10 @@ org.slf4j ${shade.package}/org.slf4j + + ${shade.com.google.source} + ${shade.package}.${shade.com.google.source} + From 9a1ad5dc562037493b1c7f69e178aa5c0b05b376 Mon Sep 17 00:00:00 2001 From: gzlicanyi Date: Fri, 26 May 2023 18:17:06 +0800 Subject: [PATCH 2/8] Fix the gson dependency by maven-shade-plugin in the kafka-reporter-plugin. --- .../kafka-reporter-plugin/pom.xml | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml index bac27707d0..ec7d4cb272 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml @@ -28,21 +28,12 @@ kafka-reporter-plugin jar - - com.google - - org.apache.kafka kafka-clients ${kafka-clients.version} - - com.google.code.gson - gson - ${gson.version} - @@ -86,8 +77,8 @@ ${shade.package}/org.slf4j - ${shade.com.google.source} - ${shade.package}.${shade.com.google.source} + com.google.gson + ${shade.package}.com.google.gson From c4c7e496a1ed004ab0aba9453f4369587ef5f0f0 Mon Sep 17 00:00:00 2001 From: gzlicanyi Date: Wed, 31 May 2023 15:58:12 +0800 Subject: [PATCH 3/8] Fix deserialization of kafka producer json config in the kafka-reporter-plugin. --- .../core/kafka/KafkaProducerManager.java | 21 ++++++++++++------- .../core/kafka/KafkaProducerManagerTest.java | 16 ++++++++++++-- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java index 9bf52f94da..f42501217b 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java @@ -18,6 +18,7 @@ package org.apache.skywalking.apm.agent.core.kafka; +import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import java.util.ArrayList; import java.util.HashSet; @@ -32,7 +33,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; - import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.producer.KafkaProducer; @@ -105,11 +105,7 @@ public void run() { Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Kafka.BOOTSTRAP_SERVERS); - if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) { - Gson gson = new Gson(); - Map config = (Map) gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, Map.class); - config.forEach(properties::setProperty); - } + setPropertiesFromJsonConfig(properties); Kafka.PRODUCER_CONFIG.forEach(properties::setProperty); try (AdminClient adminClient = AdminClient.create(properties)) { @@ -129,12 +125,12 @@ public void run() { }) .filter(Objects::nonNull) .collect(Collectors.toSet()); - + if (!topics.isEmpty()) { LOGGER.warn("kafka topics {} is not exist, connect to kafka cluster abort", topics); return; } - + try { producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer()); } catch (Exception e) { @@ -147,6 +143,15 @@ public void run() { } } + void setPropertiesFromJsonConfig(Properties properties) { + if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) { + Gson gson = new Gson(); + Map config = gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, + new TypeToken>() { }.getType()); + config.forEach(properties::setProperty); + } + } + private void notifyListeners(KafkaConnectionStatus status) { for (KafkaConnectionStatusListener listener : listeners) { listener.onStatusChanged(status); diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java index 20dc5935dd..a817620bd5 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import java.lang.reflect.Method; +import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; @@ -33,8 +34,8 @@ public void testAddListener() throws Exception { kafkaProducerManager.addListener(new MockListener(counter)); } Method notifyListeners = kafkaProducerManager - .getClass() - .getDeclaredMethod("notifyListeners", KafkaConnectionStatus.class); + .getClass() + .getDeclaredMethod("notifyListeners", KafkaConnectionStatus.class); notifyListeners.setAccessible(true); notifyListeners.invoke(kafkaProducerManager, KafkaConnectionStatus.CONNECTED); @@ -54,6 +55,17 @@ public void testFormatTopicNameThenRegister() { assertEquals(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS, value); } + @Test + public void testSetPropertiesFromJsonConfig() { + KafkaProducerManager kafkaProducerManager = new KafkaProducerManager(); + Properties properties = new Properties(); + + KafkaReporterPluginConfig.Plugin.Kafka.PRODUCER_CONFIG_JSON = "{\"batch.size\":32768}"; + kafkaProducerManager.setPropertiesFromJsonConfig(properties); + + assertEquals(properties.get("batch.size"), "32768"); + } + static class MockListener implements KafkaConnectionStatusListener { private AtomicInteger counter; From f0c0c7c62d357449332f327d5e4001cc1cf1721c Mon Sep 17 00:00:00 2001 From: gzlicanyi Date: Wed, 31 May 2023 16:03:06 +0800 Subject: [PATCH 4/8] add changes doc --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index a0b3338e91..31f7576116 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -19,6 +19,7 @@ Release Notes. * Fix possible IllegalStateException when using Micrometer. * Support Grizzly Work ThreadPool Metric Monitor * Fix the gson dependency in the kafka-reporter-plugin. +* Fix deserialization of kafka producer json config in the kafka-reporter-plugin. #### Documentation From e953bf68701ce985ef1d06611e95bc383e251b31 Mon Sep 17 00:00:00 2001 From: gzlicanyi Date: Wed, 31 May 2023 16:24:30 +0800 Subject: [PATCH 5/8] add com.google.common shade in the kafka-reporter-plugin. --- .../optional-reporter-plugins/kafka-reporter-plugin/pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml index ec7d4cb272..5b54a76474 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml @@ -80,6 +80,10 @@ com.google.gson ${shade.package}.com.google.gson + + com.google.gson + ${shade.package}.com.google.common + From 1df8d6460cb987d9eed72df024e6b1d244455383 Mon Sep 17 00:00:00 2001 From: gzlicanyi Date: Wed, 31 May 2023 17:55:05 +0800 Subject: [PATCH 6/8] Fix com.google.common shade in the kafka-reporter-plugin. --- .../optional-reporter-plugins/kafka-reporter-plugin/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml index 5b54a76474..2d7b77dd9d 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml @@ -81,7 +81,7 @@ ${shade.package}.com.google.gson - com.google.gson + com.google.common ${shade.package}.com.google.common From c4ef0e516decab7e25895117135c913f40223412 Mon Sep 17 00:00:00 2001 From: gzlicanyi Date: Wed, 31 May 2023 18:11:00 +0800 Subject: [PATCH 7/8] Change use gson TypeToken in the KafkaProducerManager. --- .../optional-reporter-plugins/kafka-reporter-plugin/pom.xml | 4 ---- .../skywalking/apm/agent/core/kafka/KafkaProducerManager.java | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml index 2d7b77dd9d..ec7d4cb272 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml @@ -80,10 +80,6 @@ com.google.gson ${shade.package}.com.google.gson - - com.google.common - ${shade.package}.com.google.common - diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java index f42501217b..28ba6abead 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java @@ -18,7 +18,7 @@ package org.apache.skywalking.apm.agent.core.kafka; -import com.google.common.reflect.TypeToken; +import com.google.gson.reflect.TypeToken; import com.google.gson.Gson; import java.util.ArrayList; import java.util.HashSet; From 0ca303da4ce5029976d59094b9bf7ca63601e4a1 Mon Sep 17 00:00:00 2001 From: gzlicanyi Date: Wed, 31 May 2023 18:37:38 +0800 Subject: [PATCH 8/8] delete kafka code --- .../skywalking/apm/agent/core/kafka/KafkaProducerManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java index 5ee6224fd6..31dc8482f1 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java @@ -107,7 +107,6 @@ public void run() { properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Kafka.BOOTSTRAP_SERVERS); setPropertiesFromJsonConfig(properties); - Kafka.PRODUCER_CONFIG.forEach(properties::setProperty); decode(Kafka.PRODUCER_CONFIG).forEach(properties::setProperty); try (AdminClient adminClient = AdminClient.create(properties)) {