From bfa7630ab170abbc1dc8a839cbd9511513b6475a Mon Sep 17 00:00:00 2001
From: guisong <1217316870@qq.com>
Date: Fri, 26 May 2023 00:12:09 +0800
Subject: [PATCH 1/5] Support to config custom decryption methods for kafka
configurations and fix bug `kafka-reporter-plugin-8.15.0 throw exception
"java.lang.NoClassDefFoundError: com/google/gson/Gson"`
---
CHANGES.md | 2 +-
apm-sniffer/config/agent.config | 4 +++
.../kafka-reporter-plugin/pom.xml | 9 +++++
.../core/kafka/KafkaProducerManager.java | 36 ++++++++++---------
.../core/kafka/KafkaReporterPluginConfig.java | 8 +++++
.../core/kafka/KafkaProducerManagerTest.java | 36 +++++++++++++++++--
.../java-agent/configurations.md | 2 ++
7 files changed, 78 insertions(+), 19 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 9392da0b29..685b1a1881 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -17,7 +17,7 @@ Release Notes.
* Change the classloader to locate the agent path in AgentPackagePath, from `SystemClassLoader` to AgentPackagePath's loader.
* Support Grizzly Trace
* Fix possible IllegalStateException when using Micrometer.
-
+* Support to config custom decryption methods for kafka configurations and fix bug `kafka-reporter-plugin-8.15.0 throw exception "java.lang.NoClassDefFoundError: com/google/gson/Gson"`
#### Documentation
diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config
index fa172b7687..b043118fd0 100755
--- a/apm-sniffer/config/agent.config
+++ b/apm-sniffer/config/agent.config
@@ -263,6 +263,10 @@ plugin.kafka.topic_management=${SW_PLUGIN_KAFKA_TOPIC_MANAGEMENT:skywalking-mana
plugin.kafka.topic_logging=${SW_PLUGIN_KAFKA_TOPIC_LOGGING:skywalking-logs}
# isolate multi OAP server when using same Kafka cluster (final topic name will append namespace before Kafka topics with `-` ).
plugin.kafka.namespace=${SW_KAFKA_NAMESPACE:}
+# Specify which class to decrypt encrypted configuration of kafka.You can set encrypted information in `plugin.kafka.producer_config_json` or `plugin.kafka.producer_config` if you need.
+plugin.kafka.decrypt_class=${SW_KAFKA_DECRYPT_CLASS:}
+# Specify which mathed to decrypt encrypted configuration of kafka.
+plugin.kafka.decrypt_method=${SW_KAFKA_DECRYPT_METHOD:decrypt}
# Match spring beans with regular expression for the class name. Multiple expressions could be separated by a comma. This only works when `Spring annotation plugin` has been activated.
plugin.springannotation.classname_match_regex=${SW_SPRINGANNOTATION_CLASSNAME_MATCH_REGEX:}
# Whether or not to transmit logged data as formatted or un-formatted.
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml
index a0f4fd92d8..67839806e4 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml
@@ -34,6 +34,11 @@
kafka-clients
${kafka-clients.version}
+
+ com.google.code.gson
+ gson
+ ${gson.version}
+
@@ -76,6 +81,10 @@
org.slf4j
${shade.package}/org.slf4j
+
+ com.google.code.gson
+ ${shade.package}/com.google.code.gson
+
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
index 9bf52f94da..f0add04ef4 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
@@ -19,20 +19,6 @@
package org.apache.skywalking.apm.agent.core.kafka;
import com.google.gson.Gson;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -52,6 +38,12 @@
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.apm.util.StringUtil;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+
/**
* Configuring, initializing and holding a KafkaProducer instance for reporters.
*/
@@ -108,9 +100,9 @@ public void run() {
if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) {
Gson gson = new Gson();
Map config = (Map) gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, Map.class);
- config.forEach(properties::setProperty);
+ decrypt(config).forEach(properties::setProperty);
}
- Kafka.PRODUCER_CONFIG.forEach(properties::setProperty);
+ decrypt(Kafka.PRODUCER_CONFIG).forEach(properties::setProperty);
try (AdminClient adminClient = AdminClient.create(properties)) {
DescribeTopicsResult topicsResult = adminClient.describeTopics(topics);
@@ -153,6 +145,18 @@ private void notifyListeners(KafkaConnectionStatus status) {
}
}
+ private Map decrypt(Map config) {
+ try {
+ Class> decryptClazz = Class.forName(Kafka.DECRYPT_CLASS);
+ Method decryptMethod = decryptClazz.getMethod(Kafka.DECRYPT_METHOD, Map.class);
+ return (Map) decryptMethod.invoke(decryptClazz.newInstance(), config);
+ } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
+ // ignore
+ LOGGER.warn("The decrypt class {} is not exist, exception:{}.", Kafka.DECRYPT_CLASS, e);
+ return config;
+ }
+ }
+
/**
* Get the KafkaProducer instance to send data to Kafka broker.
* @return Kafka producer
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java
index e3e43d63e5..461ffd4bbe 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java
@@ -63,6 +63,14 @@ public static class Kafka {
* Timeout period of reading topics from the Kafka server, the unit is second.
*/
public static int GET_TOPIC_TIMEOUT = 10;
+ /**
+ * Class name of decrypting encrypted information in kafka configuration.
+ * */
+ public static String DECRYPT_CLASS = "";
+ /**
+ * Method name in decrypt class of decrypting encrypted information in kafka configuration.
+ * */
+ public static String DECRYPT_METHOD = "";
}
}
}
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java
index 20dc5935dd..62c2b30bc9 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java
@@ -18,10 +18,16 @@
package org.apache.skywalking.apm.agent.core.kafka;
-import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
public class KafkaProducerManagerTest {
@Test
@@ -54,6 +60,23 @@ public void testFormatTopicNameThenRegister() {
assertEquals(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS, value);
}
+ @Test
+ public void testDecrypt() throws Exception {
+ KafkaReporterPluginConfig.Plugin.Kafka.DECRYPT_CLASS = "org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManagerTest$DecryptTool";
+ KafkaReporterPluginConfig.Plugin.Kafka.DECRYPT_METHOD = "decrypt";
+ KafkaProducerManager kafkaProducerManager = new KafkaProducerManager();
+
+ Map config = new HashMap<>();
+ String value = "test.99998888";
+ config.put("test.password", Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8)));
+
+ Method decryptMethod = kafkaProducerManager.getClass().getDeclaredMethod("decrypt", Map.class);
+ decryptMethod.setAccessible(true);
+ Map encryptedConfig = (Map) decryptMethod.invoke(kafkaProducerManager, config);
+
+ assertEquals(value, encryptedConfig.get("test.password"));
+ }
+
static class MockListener implements KafkaConnectionStatusListener {
private AtomicInteger counter;
@@ -68,4 +91,13 @@ public void onStatusChanged(KafkaConnectionStatus status) {
}
}
+ static class DecryptTool {
+ public Map decrypt(Map config) {
+ if (config.containsKey("test.password")) {
+ config.put("test.password", new String(Base64.getDecoder().decode(config.get("test.password")), StandardCharsets.UTF_8));
+ }
+ return config;
+ }
+ }
+
}
diff --git a/docs/en/setup/service-agent/java-agent/configurations.md b/docs/en/setup/service-agent/java-agent/configurations.md
index 2df78b1271..5cd22169f4 100644
--- a/docs/en/setup/service-agent/java-agent/configurations.md
+++ b/docs/en/setup/service-agent/java-agent/configurations.md
@@ -103,6 +103,8 @@ This is the properties list supported in `agent/config/agent.config`.
| `plugin.kafka.topic_management` | Specify which Kafka topic name for the register or heartbeat data of Service Instance to report to. | SW_PLUGIN_KAFKA_TOPIC_MANAGEMENT | `skywalking-managements` |
| `plugin.kafka.topic_logging` | Specify which Kafka topic name for the logging data to report to. | SW_PLUGIN_KAFKA_TOPIC_LOGGING | `skywalking-logging` |
| `plugin.kafka.namespace` | isolate multi OAP server when using same Kafka cluster (final topic name will append namespace before Kafka topics with `-` ). | SW_KAFKA_NAMESPACE | `` |
+| `plugin.kafka.decrypt_class` | Specify which class to decrypt encrypted configuration of kafka.You can set encrypted information in `plugin.kafka.producer_config_json` or `plugin.kafka.producer_config` if you need. | SW_KAFKA_DECRYPT_CLASS | `` |
+| `plugin.kafka.decrypt_method` | Specify which mathed to decrypt encrypted configuration of kafka. | SW_KAFKA_DECRYPT_METHOD | `decrypt` |
| `plugin.springannotation.classname_match_regex` | Match spring beans with regular expression for the class name. Multiple expressions could be separated by a comma. This only works when `Spring annotation plugin` has been activated. | SW_SPRINGANNOTATION_CLASSNAME_MATCH_REGEX | `All the spring beans tagged with @Bean,@Service,@Dao, or @Repository.` |
| `plugin.toolkit.log.transmit_formatted` | Whether or not to transmit logged data as formatted or un-formatted. | SW_PLUGIN_TOOLKIT_LOG_TRANSMIT_FORMATTED | `true` |
| `plugin.lettuce.trace_redis_parameters` | If set to true, the parameters of Redis commands would be collected by Lettuce agent. | SW_PLUGIN_LETTUCE_TRACE_REDIS_PARAMETERS | `false` |
From 228c7c0c534f6be1d77ef12c3dbbb036ac73a58a Mon Sep 17 00:00:00 2001
From: guisong <1217316870@qq.com>
Date: Wed, 31 May 2023 00:00:46 +0800
Subject: [PATCH 2/5] [Feature] Support to config custom decryption methods for
kafka configurations
---
CHANGES.md | 2 +-
apm-sniffer/config/agent.config | 6 ++--
.../kafka-config-extension/pom.xml | 32 +++++++++++++++++
.../core/kafka/KafkaConfigExtension.java | 25 +++++++++++++
.../kafka-reporter-plugin/pom.xml | 5 +++
.../core/kafka/KafkaProducerManager.java | 21 ++++++-----
.../core/kafka/KafkaReporterPluginConfig.java | 10 ++----
.../core/kafka/KafkaProducerManagerTest.java | 18 +++++-----
apm-sniffer/optional-reporter-plugins/pom.xml | 1 +
.../java-agent/advanced-reporters.md | 36 +++++++++++--------
10 files changed, 112 insertions(+), 44 deletions(-)
create mode 100644 apm-sniffer/optional-reporter-plugins/kafka-config-extension/pom.xml
create mode 100644 apm-sniffer/optional-reporter-plugins/kafka-config-extension/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaConfigExtension.java
diff --git a/CHANGES.md b/CHANGES.md
index 932fc231af..b517839ee3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -19,7 +19,7 @@ Release Notes.
* Fix possible IllegalStateException when using Micrometer.
* Support Grizzly Work ThreadPool Metric Monitor
* Fix the gson dependency in the kafka-reporter-plugin.
-* Support to config custom decryption methods for kafka configurations
+* Support to config custom decode methods for kafka configurations
#### Documentation
diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config
index b043118fd0..af09ad6892 100755
--- a/apm-sniffer/config/agent.config
+++ b/apm-sniffer/config/agent.config
@@ -263,10 +263,8 @@ plugin.kafka.topic_management=${SW_PLUGIN_KAFKA_TOPIC_MANAGEMENT:skywalking-mana
plugin.kafka.topic_logging=${SW_PLUGIN_KAFKA_TOPIC_LOGGING:skywalking-logs}
# isolate multi OAP server when using same Kafka cluster (final topic name will append namespace before Kafka topics with `-` ).
plugin.kafka.namespace=${SW_KAFKA_NAMESPACE:}
-# Specify which class to decrypt encrypted configuration of kafka.You can set encrypted information in `plugin.kafka.producer_config_json` or `plugin.kafka.producer_config` if you need.
-plugin.kafka.decrypt_class=${SW_KAFKA_DECRYPT_CLASS:}
-# Specify which mathed to decrypt encrypted configuration of kafka.
-plugin.kafka.decrypt_method=${SW_KAFKA_DECRYPT_METHOD:decrypt}
+# Specify which class to decode encoded configuration of kafka.You can set encrypted information in `plugin.kafka.producer_config_json` or `plugin.kafka.producer_config` if you need.
+plugin.kafka.decode_class=${SW_KAFKA_DECODE_CLASS:}
# Match spring beans with regular expression for the class name. Multiple expressions could be separated by a comma. This only works when `Spring annotation plugin` has been activated.
plugin.springannotation.classname_match_regex=${SW_SPRINGANNOTATION_CLASSNAME_MATCH_REGEX:}
# Whether or not to transmit logged data as formatted or un-formatted.
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-config-extension/pom.xml b/apm-sniffer/optional-reporter-plugins/kafka-config-extension/pom.xml
new file mode 100644
index 0000000000..a048ed21ce
--- /dev/null
+++ b/apm-sniffer/optional-reporter-plugins/kafka-config-extension/pom.xml
@@ -0,0 +1,32 @@
+
+
+
+
+ optional-reporter-plugins
+ org.apache.skywalking
+ 8.16.0-SNAPSHOT
+
+ 4.0.0
+
+ kafka-config-extension
+ jar
+
+ http://maven.apache.org
+
\ No newline at end of file
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-config-extension/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaConfigExtension.java b/apm-sniffer/optional-reporter-plugins/kafka-config-extension/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaConfigExtension.java
new file mode 100644
index 0000000000..5ed0b9a3e5
--- /dev/null
+++ b/apm-sniffer/optional-reporter-plugins/kafka-config-extension/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaConfigExtension.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.kafka;
+
+import java.util.Map;
+
+public interface KafkaConfigExtension {
+ Map decode(Map config);
+}
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml
index ec7d4cb272..d18f20393c 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml
@@ -34,6 +34,11 @@
kafka-clients
${kafka-clients.version}
+
+ org.apache.skywalking
+ kafka-config-extension
+ ${project.version}
+
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
index 92aa40512e..784863479b 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
@@ -21,7 +21,6 @@
import com.google.gson.Gson;
import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -111,9 +110,9 @@ public void run() {
if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) {
Gson gson = new Gson();
Map config = (Map) gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, Map.class);
- decrypt(config).forEach(properties::setProperty);
+ decode(config).forEach(properties::setProperty);
}
- decrypt(Kafka.PRODUCER_CONFIG).forEach(properties::setProperty);
+ decode(Kafka.PRODUCER_CONFIG).forEach(properties::setProperty);
try (AdminClient adminClient = AdminClient.create(properties)) {
DescribeTopicsResult topicsResult = adminClient.describeTopics(topics);
@@ -156,16 +155,20 @@ private void notifyListeners(KafkaConnectionStatus status) {
}
}
- private Map decrypt(Map config) {
+ private Map decode(Map config) {
+ if (StringUtil.isBlank(Kafka.DECODE_CLASS)) {
+ return config;
+ }
try {
- Class> decryptClazz = Class.forName(Kafka.DECRYPT_CLASS);
- Method decryptMethod = decryptClazz.getMethod(Kafka.DECRYPT_METHOD, Map.class);
- return (Map) decryptMethod.invoke(decryptClazz.newInstance(), config);
+ Object decodeTool = Class.forName(Kafka.DECODE_CLASS).getDeclaredConstructor().newInstance();
+ if (decodeTool instanceof KafkaConfigExtension) {
+ return ((KafkaConfigExtension) decodeTool).decode(config);
+ }
} catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
// ignore
- LOGGER.warn("The decrypt class {} is not exist, exception:{}.", Kafka.DECRYPT_CLASS, e);
- return config;
+ LOGGER.warn("The decode class {} is not exist, exception:{}.", Kafka.DECODE_CLASS, e);
}
+ return config;
}
/**
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java
index 461ffd4bbe..23a0bb91f8 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java
@@ -64,13 +64,9 @@ public static class Kafka {
*/
public static int GET_TOPIC_TIMEOUT = 10;
/**
- * Class name of decrypting encrypted information in kafka configuration.
- * */
- public static String DECRYPT_CLASS = "";
- /**
- * Method name in decrypt class of decrypting encrypted information in kafka configuration.
- * */
- public static String DECRYPT_METHOD = "";
+ * Class name of decoding encoded information in kafka configuration.
+ */
+ public static String DECODE_CLASS = "";
}
}
}
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java
index 62c2b30bc9..4317fb03cc 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java
@@ -61,20 +61,19 @@ public void testFormatTopicNameThenRegister() {
}
@Test
- public void testDecrypt() throws Exception {
- KafkaReporterPluginConfig.Plugin.Kafka.DECRYPT_CLASS = "org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManagerTest$DecryptTool";
- KafkaReporterPluginConfig.Plugin.Kafka.DECRYPT_METHOD = "decrypt";
+ public void testDecode() throws Exception {
+ KafkaReporterPluginConfig.Plugin.Kafka.DECODE_CLASS = "org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManagerTest$DecodeTool";
KafkaProducerManager kafkaProducerManager = new KafkaProducerManager();
Map config = new HashMap<>();
String value = "test.99998888";
config.put("test.password", Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8)));
- Method decryptMethod = kafkaProducerManager.getClass().getDeclaredMethod("decrypt", Map.class);
- decryptMethod.setAccessible(true);
- Map encryptedConfig = (Map) decryptMethod.invoke(kafkaProducerManager, config);
+ Method decodeMethod = kafkaProducerManager.getClass().getDeclaredMethod("decode", Map.class);
+ decodeMethod.setAccessible(true);
+ Map decodeConfig = (Map) decodeMethod.invoke(kafkaProducerManager, config);
- assertEquals(value, encryptedConfig.get("test.password"));
+ assertEquals(value, decodeConfig.get("test.password"));
}
static class MockListener implements KafkaConnectionStatusListener {
@@ -91,8 +90,9 @@ public void onStatusChanged(KafkaConnectionStatus status) {
}
}
- static class DecryptTool {
- public Map decrypt(Map config) {
+ static class DecodeTool implements KafkaConfigExtension {
+ @Override
+ public Map decode(Map config) {
if (config.containsKey("test.password")) {
config.put("test.password", new String(Base64.getDecoder().decode(config.get("test.password")), StandardCharsets.UTF_8));
}
diff --git a/apm-sniffer/optional-reporter-plugins/pom.xml b/apm-sniffer/optional-reporter-plugins/pom.xml
index f21f0beb4d..57b2e67b8f 100644
--- a/apm-sniffer/optional-reporter-plugins/pom.xml
+++ b/apm-sniffer/optional-reporter-plugins/pom.xml
@@ -30,6 +30,7 @@
kafka-reporter-plugin
+ kafka-config-extension
diff --git a/docs/en/setup/service-agent/java-agent/advanced-reporters.md b/docs/en/setup/service-agent/java-agent/advanced-reporters.md
index 11e0ce8635..27699f898c 100644
--- a/docs/en/setup/service-agent/java-agent/advanced-reporters.md
+++ b/docs/en/setup/service-agent/java-agent/advanced-reporters.md
@@ -33,42 +33,50 @@ plugin.kafka.producer_config_json={"delivery.timeout.ms": 12000, "compression.ty
Currently, there are 2 ways to configure advanced configurations below. Notice that, the new way, configured in JSON format, will be overridden by `plugin.kafka.producer_config[key]=value` when they have the duplication keys.
-Since Skywaling-java Agent 8.16.0, support to configure custom decryption methods for kafka configurations avoid to set plain password in `plugin.kafka.producer_config_json`,`plugin.kafka.producer_config` or environment variable `SW_PLUGIN_KAFKA_PRODUCER_CONFIG_JSON`.
+Since Skywaling-java Agent 8.16.0, support to implement custom decode method for kafka configurations avoid to set plain password in `plugin.kafka.producer_config_json`,`plugin.kafka.producer_config` or environment variable `SW_PLUGIN_KAFKA_PRODUCER_CONFIG_JSON`.
-First, you have to implement a custom decryption Class with a decryption method.For example:
+First, you have to add a dependency `KafkaConfigExtension` and implement `decode` interface.For example:
+Add the `KafkaConfigExtension` dependency to your project.
+```
+
+ org.apache.skywalking
+ kafka-config-extension
+ ${skywalking.version}
+
+```
+Implement your custom decode method.Like this:
```
package org.apache.skywalking.apm.agent.sample;
+import org.apache.skywalking.apm.agent.core.kafka.KafkaConfigExtension;
import java.util.Map;
/**
- * Custom decryption class
+ * Custom decode class
*/
-public class DecryptUtil {
+public class DecodeUtil implements KafkaConfigExtension {
/**
- * Custom decryption method.
+ * Custom decode method.
* @param config the value of `plugin.kafka.producer_config` or `plugin.kafka.producer_config_json` in `agent.config`.
- * @return the decrypted configuration if you implement your custom decryption code.
+ * @return the decoded configuration if you implement your custom decode logic.
*/
- public Map decrypt(Map config) {
+ public Map decode(Map config) {
/**
- * implement your custom decryption code
+ * implement your custom decode logic
* */
return config;
}
}
```
-Second,you need to package your code into a jar and move to `agent/plugins` for decrypting.Notice,your jar package should contain all the dependencies required for your custom decryption code.
+Second,you need to package your code into a jar and move to `agent/plugins` for decoding.Notice,your jar package should contain all the dependencies required for your custom decode code.
-Third,configure decryption class and method in `agent.config`,like this:
+Third,configure decode class in `agent.config`,like this:
```
-plugin.kafka.decrypt_class="org.apache.skywalking.apm.agent.sample.DecryptUtil"
-plugin.kafka.decrypt_method="decrypt"
+plugin.kafka.decrypt_class="org.apache.skywalking.apm.agent.sample.DecodeUtil"
```
or configure by environment variable
```
-SW_KAFKA_DECRYPT_CLASS="org.apache.skywalking.apm.agent.sample.DecryptUtil"
-SW_KAFKA_DECRYPT_METHOD="decrypt"
+SW_KAFKA_DECRYPT_CLASS="org.apache.skywalking.apm.agent.sample.DecodeUtil"
```
## 3rd party reporters
There are other reporter implementations from out of the Apache Software Foundation.
From 11e813c4f0f4d1fc5916ff536dc649321e281f56 Mon Sep 17 00:00:00 2001
From: guisong <1217316870@qq.com>
Date: Wed, 31 May 2023 00:13:53 +0800
Subject: [PATCH 3/5] [Feature] Support to config custom decryption methods for
kafka configurations
---
apm-sniffer/config/agent.config | 2 +-
docs/en/setup/service-agent/java-agent/configurations.md | 3 +--
2 files changed, 2 insertions(+), 3 deletions(-)
diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config
index af09ad6892..14bf708b83 100755
--- a/apm-sniffer/config/agent.config
+++ b/apm-sniffer/config/agent.config
@@ -263,7 +263,7 @@ plugin.kafka.topic_management=${SW_PLUGIN_KAFKA_TOPIC_MANAGEMENT:skywalking-mana
plugin.kafka.topic_logging=${SW_PLUGIN_KAFKA_TOPIC_LOGGING:skywalking-logs}
# isolate multi OAP server when using same Kafka cluster (final topic name will append namespace before Kafka topics with `-` ).
plugin.kafka.namespace=${SW_KAFKA_NAMESPACE:}
-# Specify which class to decode encoded configuration of kafka.You can set encrypted information in `plugin.kafka.producer_config_json` or `plugin.kafka.producer_config` if you need.
+# Specify which class to decode encoded configuration of kafka.You can set encoded information in `plugin.kafka.producer_config_json` or `plugin.kafka.producer_config` if you need.
plugin.kafka.decode_class=${SW_KAFKA_DECODE_CLASS:}
# Match spring beans with regular expression for the class name. Multiple expressions could be separated by a comma. This only works when `Spring annotation plugin` has been activated.
plugin.springannotation.classname_match_regex=${SW_SPRINGANNOTATION_CLASSNAME_MATCH_REGEX:}
diff --git a/docs/en/setup/service-agent/java-agent/configurations.md b/docs/en/setup/service-agent/java-agent/configurations.md
index 5cd22169f4..b55dc4450f 100644
--- a/docs/en/setup/service-agent/java-agent/configurations.md
+++ b/docs/en/setup/service-agent/java-agent/configurations.md
@@ -103,8 +103,7 @@ This is the properties list supported in `agent/config/agent.config`.
| `plugin.kafka.topic_management` | Specify which Kafka topic name for the register or heartbeat data of Service Instance to report to. | SW_PLUGIN_KAFKA_TOPIC_MANAGEMENT | `skywalking-managements` |
| `plugin.kafka.topic_logging` | Specify which Kafka topic name for the logging data to report to. | SW_PLUGIN_KAFKA_TOPIC_LOGGING | `skywalking-logging` |
| `plugin.kafka.namespace` | isolate multi OAP server when using same Kafka cluster (final topic name will append namespace before Kafka topics with `-` ). | SW_KAFKA_NAMESPACE | `` |
-| `plugin.kafka.decrypt_class` | Specify which class to decrypt encrypted configuration of kafka.You can set encrypted information in `plugin.kafka.producer_config_json` or `plugin.kafka.producer_config` if you need. | SW_KAFKA_DECRYPT_CLASS | `` |
-| `plugin.kafka.decrypt_method` | Specify which mathed to decrypt encrypted configuration of kafka. | SW_KAFKA_DECRYPT_METHOD | `decrypt` |
+| `plugin.kafka.decode_class` | Specify which class to decode encoded configuration of kafka.You can set encoded information in `plugin.kafka.producer_config_json` or `plugin.kafka.producer_config` if you need. | SW_KAFKA_DECODE_CLASS | `` |
| `plugin.springannotation.classname_match_regex` | Match spring beans with regular expression for the class name. Multiple expressions could be separated by a comma. This only works when `Spring annotation plugin` has been activated. | SW_SPRINGANNOTATION_CLASSNAME_MATCH_REGEX | `All the spring beans tagged with @Bean,@Service,@Dao, or @Repository.` |
| `plugin.toolkit.log.transmit_formatted` | Whether or not to transmit logged data as formatted or un-formatted. | SW_PLUGIN_TOOLKIT_LOG_TRANSMIT_FORMATTED | `true` |
| `plugin.lettuce.trace_redis_parameters` | If set to true, the parameters of Redis commands would be collected by Lettuce agent. | SW_PLUGIN_LETTUCE_TRACE_REDIS_PARAMETERS | `false` |
From 37b510b8db1122c0d1715142e91c9912146dcdc6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?=
Date: Wed, 31 May 2023 17:16:07 +0800
Subject: [PATCH 4/5] Update
apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
---
.../skywalking/apm/agent/core/kafka/KafkaProducerManager.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
index 784863479b..5bb66e45ca 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
@@ -166,7 +166,7 @@ private Map decode(Map config) {
}
} catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
// ignore
- LOGGER.warn("The decode class {} is not exist, exception:{}.", Kafka.DECODE_CLASS, e);
+ LOGGER.warn("The decode class {} does not exist, exception:{}.", Kafka.DECODE_CLASS, e);
}
return config;
}
From 14063f6387c859a1c0c3b481f11cb62f2721d8c0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?=
Date: Wed, 31 May 2023 17:22:52 +0800
Subject: [PATCH 5/5] Update advanced-reporters.md
---
.../java-agent/advanced-reporters.md | 22 +++++++++++++------
1 file changed, 15 insertions(+), 7 deletions(-)
diff --git a/docs/en/setup/service-agent/java-agent/advanced-reporters.md b/docs/en/setup/service-agent/java-agent/advanced-reporters.md
index 27699f898c..44264d4e9f 100644
--- a/docs/en/setup/service-agent/java-agent/advanced-reporters.md
+++ b/docs/en/setup/service-agent/java-agent/advanced-reporters.md
@@ -33,18 +33,22 @@ plugin.kafka.producer_config_json={"delivery.timeout.ms": 12000, "compression.ty
Currently, there are 2 ways to configure advanced configurations below. Notice that, the new way, configured in JSON format, will be overridden by `plugin.kafka.producer_config[key]=value` when they have the duplication keys.
-Since Skywaling-java Agent 8.16.0, support to implement custom decode method for kafka configurations avoid to set plain password in `plugin.kafka.producer_config_json`,`plugin.kafka.producer_config` or environment variable `SW_PLUGIN_KAFKA_PRODUCER_CONFIG_JSON`.
+Since 8.16.0, users could implement their decoder for kafka configurations rather than using plain configurations(such as `password`) of Kafka producer,
+Including `plugin.kafka.producer_config_json`,`plugin.kafka.producer_config` or environment variable `SW_PLUGIN_KAFKA_PRODUCER_CONFIG_JSON`.
-First, you have to add a dependency `KafkaConfigExtension` and implement `decode` interface.For example:
-Add the `KafkaConfigExtension` dependency to your project.
+By doing that, add the `kafka-config-extension` dependency to your decoder project and implement `decode` interface.
+
+- Add the `KafkaConfigExtension` dependency to your project.
```
org.apache.skywalking
kafka-config-extension
${skywalking.version}
+ provided
```
-Implement your custom decode method.Like this:
+
+- Implement your custom decode method.Like this:
```
package org.apache.skywalking.apm.agent.sample;
@@ -68,9 +72,12 @@ public class DecodeUtil implements KafkaConfigExtension {
}
}
```
-Second,you need to package your code into a jar and move to `agent/plugins` for decoding.Notice,your jar package should contain all the dependencies required for your custom decode code.
-Third,configure decode class in `agent.config`,like this:
+Then, package your decoder project as a jar and move to `agent/plugins`.
+
+**Notice, the jar package should contain all the dependencies required for your custom decode code.**
+
+The last step is to activate the decoder class in `agent.config` like this:
```
plugin.kafka.decrypt_class="org.apache.skywalking.apm.agent.sample.DecodeUtil"
```
@@ -78,6 +85,7 @@ or configure by environment variable
```
SW_KAFKA_DECRYPT_CLASS="org.apache.skywalking.apm.agent.sample.DecodeUtil"
```
+
## 3rd party reporters
There are other reporter implementations from out of the Apache Software Foundation.
@@ -85,4 +93,4 @@ There are other reporter implementations from out of the Apache Software Foundat
Go to [Pulsar-reporter-plugin](https://github.com/SkyAPM/transporter-plugin-for-skywalking/blob/main/docs/en/pulsar/Pulsar-Reporter.md) for more details.
### RocketMQ Reporter
-Go to [RocketMQ-reporter-plugin](https://github.com/SkyAPM/transporter-plugin-for-skywalking/blob/main/docs/en/rocketmq/Rocketmq-Reporter.md) for more details.
\ No newline at end of file
+Go to [RocketMQ-reporter-plugin](https://github.com/SkyAPM/transporter-plugin-for-skywalking/blob/main/docs/en/rocketmq/Rocketmq-Reporter.md) for more details.