Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Release Notes.
* Fix possible IllegalStateException when using Micrometer.
* Support Grizzly Work ThreadPool Metric Monitor
* Fix the gson dependency in the kafka-reporter-plugin.
* Support configuring custom decode methods for Kafka configurations

#### Documentation

Expand Down
2 changes: 2 additions & 0 deletions apm-sniffer/config/agent.config
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ plugin.kafka.topic_management=${SW_PLUGIN_KAFKA_TOPIC_MANAGEMENT:skywalking-mana
plugin.kafka.topic_logging=${SW_PLUGIN_KAFKA_TOPIC_LOGGING:skywalking-logs}
# isolate multi OAP server when using same Kafka cluster (final topic name will append namespace before Kafka topics with `-` ).
plugin.kafka.namespace=${SW_KAFKA_NAMESPACE:}
# Specify which class is used to decode encoded Kafka configuration items. You can set encoded values in `plugin.kafka.producer_config_json` or `plugin.kafka.producer_config` if needed.
plugin.kafka.decode_class=${SW_KAFKA_DECODE_CLASS:}
# Match spring beans with regular expression for the class name. Multiple expressions could be separated by a comma. This only works when `Spring annotation plugin` has been activated.
plugin.springannotation.classname_match_regex=${SW_SPRINGANNOTATION_CLASSNAME_MATCH_REGEX:}
# Whether or not to transmit logged data as formatted or un-formatted.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>optional-reporter-plugins</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.16.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>kafka-config-extension</artifactId>
<packaging>jar</packaging>

<url>http://maven.apache.org</url>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.agent.core.kafka;

import java.util.Map;

/**
 * Extension point for decoding encoded Kafka producer configuration.
 * <p>
 * Implementations are named by the agent setting {@code plugin.kafka.decode_class}
 * (environment variable {@code SW_KAFKA_DECODE_CLASS}) and must expose a public
 * no-argument constructor, since the agent instantiates them reflectively.
 */
@FunctionalInterface
public interface KafkaConfigExtension {
    /**
     * Decodes the encoded entries of the given Kafka configuration.
     *
     * @param config the raw configuration taken from {@code plugin.kafka.producer_config}
     *               or {@code plugin.kafka.producer_config_json}; may contain encoded values
     * @return the configuration with encoded values replaced by their decoded form
     */
    Map<String, String> decode(Map<String, String> config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>kafka-config-extension</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.skywalking.apm.agent.core.kafka;

import com.google.gson.Gson;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -108,9 +110,9 @@ public void run() {
if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) {
Gson gson = new Gson();
Map<String, String> config = (Map<String, String>) gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, Map.class);
config.forEach(properties::setProperty);
decode(config).forEach(properties::setProperty);
}
Kafka.PRODUCER_CONFIG.forEach(properties::setProperty);
decode(Kafka.PRODUCER_CONFIG).forEach(properties::setProperty);

try (AdminClient adminClient = AdminClient.create(properties)) {
DescribeTopicsResult topicsResult = adminClient.describeTopics(topics);
Expand Down Expand Up @@ -153,6 +155,22 @@ private void notifyListeners(KafkaConnectionStatus status) {
}
}

/**
 * Decodes the given Kafka configuration through the user-provided
 * {@link KafkaConfigExtension} implementation named by {@code plugin.kafka.decode_class}.
 * When no decode class is configured, or the configured class cannot be used,
 * the configuration is returned unchanged (best-effort, never throws).
 *
 * @param config raw, possibly encoded, Kafka producer configuration
 * @return the decoded configuration, or the original one when no decoder applies
 */
private Map<String, String> decode(Map<String, String> config) {
    if (StringUtil.isBlank(Kafka.DECODE_CLASS)) {
        return config;
    }
    try {
        Object decodeTool = Class.forName(Kafka.DECODE_CLASS).getDeclaredConstructor().newInstance();
        if (decodeTool instanceof KafkaConfigExtension) {
            return ((KafkaConfigExtension) decodeTool).decode(config);
        }
        // The class loaded fine but does not implement the extension point; warn
        // instead of silently ignoring the user's configuration.
        LOGGER.warn("The decode class {} does not implement KafkaConfigExtension, fall back to the original configuration.", Kafka.DECODE_CLASS);
    } catch (ReflectiveOperationException e) {
        // Covers ClassNotFoundException, NoSuchMethodException, InstantiationException,
        // IllegalAccessException and InvocationTargetException.
        LOGGER.warn("Failed to load or instantiate the decode class {}, exception: {}.", Kafka.DECODE_CLASS, e);
    }
    return config;
}

/**
* Get the KafkaProducer instance to send data to Kafka broker.
* @return Kafka producer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ public static class Kafka {
* Timeout period of reading topics from the Kafka server, the unit is second.
*/
public static int GET_TOPIC_TIMEOUT = 10;
/**
* Class name of decoding encoded information in kafka configuration.
*/
public static String DECODE_CLASS = "";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@

package org.apache.skywalking.apm.agent.core.kafka;

import static org.junit.Assert.assertEquals;
import org.junit.Test;

import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class KafkaProducerManagerTest {
@Test
Expand Down Expand Up @@ -54,6 +60,22 @@ public void testFormatTopicNameThenRegister() {
assertEquals(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS, value);
}

@Test
public void testDecode() throws Exception {
    // Route decoding through the Base64 test decoder declared below.
    KafkaReporterPluginConfig.Plugin.Kafka.DECODE_CLASS =
        "org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManagerTest$DecodeTool";
    KafkaProducerManager manager = new KafkaProducerManager();

    String plain = "test.99998888";
    String encoded = Base64.getEncoder().encodeToString(plain.getBytes(StandardCharsets.UTF_8));
    Map<String, String> encodedConfig = new HashMap<>();
    encodedConfig.put("test.password", encoded);

    // decode(Map) is private, so reach it via reflection.
    Method decode = KafkaProducerManager.class.getDeclaredMethod("decode", Map.class);
    decode.setAccessible(true);
    @SuppressWarnings("unchecked")
    Map<String, String> decoded = (Map<String, String>) decode.invoke(manager, encodedConfig);

    assertEquals(plain, decoded.get("test.password"));
}

static class MockListener implements KafkaConnectionStatusListener {

private AtomicInteger counter;
Expand All @@ -68,4 +90,14 @@ public void onStatusChanged(KafkaConnectionStatus status) {
}
}

/**
 * Test decoder: Base64-decodes the {@code test.password} entry, leaving every
 * other entry untouched.
 */
static class DecodeTool implements KafkaConfigExtension {
    @Override
    public Map<String, String> decode(Map<String, String> config) {
        if (config.containsKey("test.password")) {
            byte[] raw = Base64.getDecoder().decode(config.get("test.password"));
            config.put("test.password", new String(raw, StandardCharsets.UTF_8));
        }
        return config;
    }
}

}
1 change: 1 addition & 0 deletions apm-sniffer/optional-reporter-plugins/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

<modules>
<module>kafka-reporter-plugin</module>
<module>kafka-config-extension</module>
</modules>

<properties>
Expand Down
55 changes: 54 additions & 1 deletion docs/en/setup/service-agent/java-agent/advanced-reporters.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,64 @@ plugin.kafka.producer_config_json={"delivery.timeout.ms": 12000, "compression.ty

Currently, there are 2 ways to configure advanced configurations below. Notice that, the new way, configured in JSON format, will be overridden by `plugin.kafka.producer_config[key]=value` when they have the duplication keys.

Since 8.16.0, users can implement a custom decoder for Kafka configurations instead of storing plain-text values (such as a `password`) in the Kafka producer configuration,
including `plugin.kafka.producer_config_json`, `plugin.kafka.producer_config`, or the environment variable `SW_PLUGIN_KAFKA_PRODUCER_CONFIG_JSON`.

To do so, add the `kafka-config-extension` dependency to your decoder project and implement the `decode` interface.

- Add the `KafkaConfigExtension` dependency to your project.
```
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>kafka-config-extension</artifactId>
<version>${skywalking.version}</version>
<scope>provided</scope>
</dependency>
```

- Implement your custom decode method. For example:
```
package org.apache.skywalking.apm.agent.sample;

import org.apache.skywalking.apm.agent.core.kafka.KafkaConfigExtension;
import java.util.Map;

/**
* Custom decode class
*/
public class DecodeUtil implements KafkaConfigExtension {
/**
* Custom decode method.
* @param config the value of `plugin.kafka.producer_config` or `plugin.kafka.producer_config_json` in `agent.config`.
* @return the decoded configuration if you implement your custom decode logic.
*/
public Map<String, String> decode(Map<String, String> config) {
/**
* implement your custom decode logic
* */
return config;
}
}
```

Then, package your decoder project as a jar and move to `agent/plugins`.

**Notice, the jar package should contain all the dependencies required for your custom decode code.**

The last step is to activate the decoder class in `agent.config` like this:
```
plugin.kafka.decode_class=org.apache.skywalking.apm.agent.sample.DecodeUtil
```
or configure by environment variable
```
SW_KAFKA_DECODE_CLASS=org.apache.skywalking.apm.agent.sample.DecodeUtil
```

## 3rd party reporters
There are other reporter implementations from out of the Apache Software Foundation.

### Pulsar Reporter
Go to [Pulsar-reporter-plugin](https://github.com/SkyAPM/transporter-plugin-for-skywalking/blob/main/docs/en/pulsar/Pulsar-Reporter.md) for more details.

### RocketMQ Reporter
Go to [RocketMQ-reporter-plugin](https://github.com/SkyAPM/transporter-plugin-for-skywalking/blob/main/docs/en/rocketmq/Rocketmq-Reporter.md) for more details.
Go to [RocketMQ-reporter-plugin](https://github.com/SkyAPM/transporter-plugin-for-skywalking/blob/main/docs/en/rocketmq/Rocketmq-Reporter.md) for more details.
1 change: 1 addition & 0 deletions docs/en/setup/service-agent/java-agent/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ This is the properties list supported in `agent/config/agent.config`.
| `plugin.kafka.topic_management` | Specify which Kafka topic name for the register or heartbeat data of Service Instance to report to. | SW_PLUGIN_KAFKA_TOPIC_MANAGEMENT | `skywalking-managements` |
| `plugin.kafka.topic_logging` | Specify which Kafka topic name for the logging data to report to. | SW_PLUGIN_KAFKA_TOPIC_LOGGING | `skywalking-logging` |
| `plugin.kafka.namespace` | isolate multi OAP server when using same Kafka cluster (final topic name will append namespace before Kafka topics with `-` ). | SW_KAFKA_NAMESPACE | `` |
| `plugin.kafka.decode_class` | Specify which class is used to decode encoded configuration of Kafka. You can set encoded information in `plugin.kafka.producer_config_json` or `plugin.kafka.producer_config` if needed. | SW_KAFKA_DECODE_CLASS | `` |
| `plugin.springannotation.classname_match_regex` | Match spring beans with regular expression for the class name. Multiple expressions could be separated by a comma. This only works when `Spring annotation plugin` has been activated. | SW_SPRINGANNOTATION_CLASSNAME_MATCH_REGEX | `All the spring beans tagged with @Bean,@Service,@Dao, or @Repository.` |
| `plugin.toolkit.log.transmit_formatted` | Whether or not to transmit logged data as formatted or un-formatted. | SW_PLUGIN_TOOLKIT_LOG_TRANSMIT_FORMATTED | `true` |
| `plugin.lettuce.trace_redis_parameters` | If set to true, the parameters of Redis commands would be collected by Lettuce agent. | SW_PLUGIN_LETTUCE_TRACE_REDIS_PARAMETERS | `false` |
Expand Down