Skip to content
Merged
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Release Notes.
* Fix possible IllegalStateException when using Micrometer.
* Support Grizzly Work ThreadPool Metric Monitor
* Fix the gson dependency in the kafka-reporter-plugin.
* Fix deserialization of kafka producer json config in the kafka-reporter-plugin.
* Support to config custom decode methods for kafka configurations

#### Documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package org.apache.skywalking.apm.agent.core.kafka;

import com.google.gson.reflect.TypeToken;
import com.google.gson.Gson;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -34,7 +34,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand Down Expand Up @@ -107,11 +106,7 @@ public void run() {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Kafka.BOOTSTRAP_SERVERS);

if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) {
Gson gson = new Gson();
Map<String, String> config = (Map<String, String>) gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, Map.class);
decode(config).forEach(properties::setProperty);
}
setPropertiesFromJsonConfig(properties);
decode(Kafka.PRODUCER_CONFIG).forEach(properties::setProperty);

try (AdminClient adminClient = AdminClient.create(properties)) {
Expand All @@ -131,12 +126,12 @@ public void run() {
})
.filter(Objects::nonNull)
.collect(Collectors.toSet());

if (!topics.isEmpty()) {
LOGGER.warn("kafka topics {} is not exist, connect to kafka cluster abort", topics);
return;
}

try {
producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer());
} catch (Exception e) {
Expand All @@ -149,6 +144,15 @@ public void run() {
}
}

/**
 * Merges the producer settings supplied as a JSON string
 * ({@code Kafka.PRODUCER_CONFIG_JSON}) into the given {@link Properties}.
 * Each entry is passed through {@code decode(...)} before being applied.
 * A missing or empty JSON config is a no-op.
 *
 * @param properties the producer properties to populate; mutated in place
 */
void setPropertiesFromJsonConfig(Properties properties) {
    final String json = Kafka.PRODUCER_CONFIG_JSON;
    if (!StringUtil.isNotEmpty(json)) {
        return;
    }
    // TypeToken preserves the Map<String, String> generic type through Gson
    // deserialization, so values arrive as Strings rather than Gson's default
    // Double for JSON numbers.
    Map<String, String> jsonConfig = new Gson().fromJson(
        json, new TypeToken<Map<String, String>>() { }.getType());
    decode(jsonConfig).forEach(properties::setProperty);
}

private void notifyListeners(KafkaConnectionStatus status) {
for (KafkaConnectionStatusListener listener : listeners) {
listener.onStatusChanged(status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@

package org.apache.skywalking.apm.agent.core.kafka;

import org.junit.Test;

import static org.junit.Assert.assertEquals;
import java.lang.reflect.Method;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;

public class KafkaProducerManagerTest {
@Test
Expand All @@ -39,8 +38,8 @@ public void testAddListener() throws Exception {
kafkaProducerManager.addListener(new MockListener(counter));
}
Method notifyListeners = kafkaProducerManager
.getClass()
.getDeclaredMethod("notifyListeners", KafkaConnectionStatus.class);
.getClass()
.getDeclaredMethod("notifyListeners", KafkaConnectionStatus.class);
notifyListeners.setAccessible(true);
notifyListeners.invoke(kafkaProducerManager, KafkaConnectionStatus.CONNECTED);

Expand All @@ -60,6 +59,17 @@ public void testFormatTopicNameThenRegister() {
assertEquals(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS, value);
}

@Test
public void testSetPropertiesFromJsonConfig() {
    KafkaProducerManager kafkaProducerManager = new KafkaProducerManager();
    Properties properties = new Properties();

    // The JSON value is a number; deserializing via TypeToken<Map<String, String>>
    // must yield the String "32768" so Properties#setProperty accepts it.
    KafkaReporterPluginConfig.Plugin.Kafka.PRODUCER_CONFIG_JSON = "{\"batch.size\":32768}";
    kafkaProducerManager.setPropertiesFromJsonConfig(properties);

    // JUnit convention: expected value first, actual second — otherwise a
    // failure message reports the two values swapped.
    assertEquals("32768", properties.get("batch.size"));
}

Comment on lines +62 to +72
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this UT prove? The codes are not shaded ever in the UT scope, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Show an example of how a JSON configuration can be correctly set to properties.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this UT prove? The codes are not shaded ever in the UT scope, right?

right.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, the point of this PR is TypeToken, which cast the type accordingly.

But still import com.google.common.reflect.TypeToken; seems not from gson package. Why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, the point of this PR is TypeToken, which cast the type accordingly.

But still import com.google.common.reflect.TypeToken; seems not from gson package. Why?

I had used the wrong one of the two `TypeToken` classes — it should be `com.google.gson.reflect.TypeToken`, not `com.google.common.reflect.TypeToken`.

@Test
public void testDecode() throws Exception {
KafkaReporterPluginConfig.Plugin.Kafka.DECODE_CLASS = "org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManagerTest$DecodeTool";
Expand Down