MINOR: Move parseCsvList to server-common (apache#16029)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
mimaison committed May 23, 2024
1 parent 14b5c4d commit ab0cc72
Showing 6 changed files with 42 additions and 38 deletions.
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
@@ -20,19 +20,21 @@

 package kafka.metrics

-import kafka.utils.{CoreUtils, VerifiableProperties}
+import kafka.utils.VerifiableProperties
 import org.apache.kafka.server.metrics.MetricConfigs
+import org.apache.kafka.server.util.Csv

 import scala.collection.Seq
+import scala.jdk.CollectionConverters._

 class KafkaMetricsConfig(props: VerifiableProperties) {

   /**
    * Comma-separated list of reporter types. These classes should be on the
    * classpath and will be instantiated at run-time.
    */
-  val reporters: Seq[String] = CoreUtils.parseCsvList(props.getString(MetricConfigs.KAFKA_METRICS_REPORTER_CLASSES_CONFIG,
-    MetricConfigs.KAFKA_METRIC_REPORTER_CLASSES_DEFAULT))
+  val reporters: Seq[String] = Csv.parseCsvList(props.getString(MetricConfigs.KAFKA_METRICS_REPORTER_CLASSES_CONFIG,
+    MetricConfigs.KAFKA_METRIC_REPORTER_CLASSES_DEFAULT)).asScala

   /**
    * The metrics polling interval (in seconds).
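The recurring pattern across the converted Scala call sites is the bridge from the java.util.List returned by the new server-common utility back to a scala.collection.Seq via scala.jdk.CollectionConverters. A minimal, self-contained sketch of that interop (AsScalaSketch and its parseCsvList stand-in are hypothetical, standing in for org.apache.kafka.server.util.Csv):

import java.util.{Arrays, List => JList}

import scala.collection.Seq
import scala.jdk.CollectionConverters._

object AsScalaSketch {
  // Hypothetical stand-in for Csv.parseCsvList: returns a Java list,
  // as the server-common utility does.
  def parseCsvList(csv: String): JList[String] =
    Arrays.asList(csv.split("\\s*,\\s*").filter(_.nonEmpty): _*)

  def main(args: Array[String]): Unit = {
    // .asScala wraps the Java list as a scala.collection.Seq without copying,
    // which is why each converted call site ends in .asScala.
    val reporters: Seq[String] = parseCsvList("reporterA, reporterB").asScala
    println(reporters) // Buffer(reporterA, reporterB)
  }
}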
5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -21,7 +21,6 @@ import java.{lang, util}
 import java.util.concurrent.TimeUnit
 import java.util.{Collections, Properties}
 import kafka.cluster.EndPoint
-import kafka.utils.CoreUtils.parseCsvList
 import kafka.utils.{CoreUtils, Logging}
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.CommonClientConfigs
@@ -895,7 +894,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   /** ********* Log Configuration ***********/
   val autoCreateTopicsEnable = getBoolean(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG)
   val numPartitions = getInt(ServerLogConfigs.NUM_PARTITIONS_CONFIG)
-  val logDirs = CoreUtils.parseCsvList(Option(getString(ServerLogConfigs.LOG_DIRS_CONFIG)).getOrElse(getString(ServerLogConfigs.LOG_DIR_CONFIG)))
+  val logDirs: Seq[String] = Csv.parseCsvList(Option(getString(ServerLogConfigs.LOG_DIRS_CONFIG)).getOrElse(getString(ServerLogConfigs.LOG_DIR_CONFIG))).asScala
   def logSegmentBytes = getInt(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG)
   def logFlushIntervalMessages = getLong(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG)
   val logCleanerThreads = getInt(CleanerConfig.LOG_CLEANER_THREADS_PROP)
@@ -1292,7 +1291,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       // check controller listener names (they won't appear in listeners when process.roles=broker)
       // as well as listeners for occurrences of SSL or SASL_*
       if (controllerListenerNames.exists(isSslOrSasl) ||
-        parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).exists(listenerValue => isSslOrSasl(EndPoint.parseListenerName(listenerValue)))) {
+        Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).asScala.exists(listenerValue => isSslOrSasl(EndPoint.parseListenerName(listenerValue)))) {
         mapValue // don't add default mappings since we found something that is SSL or SASL_*
       } else {
         // add the PLAINTEXT mappings for all controller listener names that are not explicitly PLAINTEXT
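One detail worth noting in the logDirs change: log.dirs takes precedence, and the broker only falls back to the legacy log.dir property when log.dirs is unset, with the winning value then CSV-parsed. A small sketch of that fallback under a hypothetical config map (getString here returns null for absent keys, mimicking the config accessor):

object LogDirsSketch extends App {
  // Hypothetical raw config; only the legacy single-directory key is set.
  val config = Map("log.dir" -> "/tmp/kafka-logs-1, /tmp/kafka-logs-2")
  def getString(key: String): String = config.getOrElse(key, null)

  // Option(null) is None, so log.dirs wins when present and log.dir is the fallback.
  val raw = Option(getString("log.dirs")).getOrElse(getString("log.dir"))
  val logDirs = raw.split("\\s*,\\s*").filter(_.nonEmpty).toSeq
  println(logDirs) // ArraySeq(/tmp/kafka-logs-1, /tmp/kafka-logs-2)
}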
16 changes: 3 additions & 13 deletions core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -32,6 +32,7 @@ import org.apache.commons.validator.routines.InetAddressValidator
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.util.Csv
 import org.slf4j.event.Level

 import java.util
@@ -109,17 +110,6 @@ object CoreUtils {
     }
   }

-  /**
-   * Parse a comma separated string into a sequence of strings.
-   * Whitespace surrounding the comma will be removed.
-   */
-  def parseCsvList(csvList: String): Seq[String] = {
-    if (csvList == null || csvList.isEmpty)
-      Seq.empty[String]
-    else
-      csvList.split("\\s*,\\s*").filter(v => !v.equals(""))
-  }
-
   /**
    * Create an instance of the class with the given class name
    */
@@ -219,8 +209,8 @@ object CoreUtils {
     }

     val endPoints = try {
-      val listenerList = parseCsvList(listeners)
-      listenerList.map(EndPoint.createEndPoint(_, Some(securityProtocolMap)))
+      val listenerList = Csv.parseCsvList(listeners)
+      listenerList.asScala.map(EndPoint.createEndPoint(_, Some(securityProtocolMap)))
     } catch {
       case e: Exception =>
         throw new IllegalArgumentException(s"Error creating broker listeners from '$listeners': ${e.getMessage}", e)
13 changes: 0 additions & 13 deletions core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
@@ -89,19 +89,6 @@ class CoreUtilsTest extends Logging {
     assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE))
   }

-  @Test
-  def testCsvList(): Unit = {
-    val emptyString:String = ""
-    val nullString:String = null
-    val emptyList = CoreUtils.parseCsvList(emptyString)
-    val emptyListFromNullString = CoreUtils.parseCsvList(nullString)
-    val emptyStringList = Seq.empty[String]
-    assertTrue(emptyList!=null)
-    assertTrue(emptyListFromNullString!=null)
-    assertTrue(emptyStringList.equals(emptyListFromNullString))
-    assertTrue(emptyStringList.equals(emptyList))
-  }
-
   @Test
   def testInLock(): Unit = {
     val lock = new ReentrantLock()
16 changes: 16 additions & 0 deletions server-common/src/main/java/org/apache/kafka/server/util/Csv.java
@@ -16,8 +16,12 @@
  */
 package org.apache.kafka.server.util;

+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

 public class Csv {

@@ -38,4 +42,16 @@ public static Map<String, String> parseCsvMap(String str) {
         }
         return map;
     }
+
+    /**
+     * Parse a comma separated string into a sequence of strings.
+     * Whitespace surrounding the comma will be removed.
+     */
+    public static List<String> parseCsvList(String csvList) {
+        if (csvList == null || csvList.isEmpty()) {
+            return Collections.emptyList();
+        } else {
+            return Stream.of(csvList.split("\\s*,\\s*")).filter(v -> !v.isEmpty()).collect(Collectors.toList());
+        }
+    }
 }
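The filter step in parseCsvList is not redundant: String.split with the default limit drops trailing empty strings but keeps interior ones, so consecutive commas still produce an empty token that must be removed. A short sketch of the same separator regex (the object name is illustrative):

object SplitSketch extends App {
  // Same separator regex as Csv.parseCsvList: a comma plus surrounding whitespace.
  val tokens = "a,b ,c, d,,e,".split("\\s*,\\s*").toList
  println(tokens)                    // List(a, b, c, d, , e): the fifth element is the empty string
  println(tokens.filter(_.nonEmpty)) // List(a, b, c, d, e)
}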
22 changes: 16 additions & 6 deletions server-common/src/test/java/org/apache/kafka/server/util/CsvTest.java
@@ -18,21 +18,19 @@

 import org.junit.jupiter.api.Test;

+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;

 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;

 public class CsvTest {

     @Test
     public void testCsvMap() {
-        String emptyString = "";
-        Map<String, String> emptyMap = Csv.parseCsvMap(emptyString);
-        Map<String, String> emptyStringMap = Collections.emptyMap();
-        assertNotNull(emptyMap);
-        assertEquals(emptyStringMap, emptyStringMap);
+        Map<String, String> emptyMap = Csv.parseCsvMap("");
+        assertEquals(Collections.emptyMap(), emptyMap);

         String kvPairsIpV6 = "a:b:c:v,a:b:c:v";
         Map<String, String> ipv6Map = Csv.parseCsvMap(kvPairsIpV6);
@@ -60,4 +58,16 @@ public void testCsvMap() {
             assertEquals("value", entry.getValue());
         }
     }
+
+    @Test
+    public void testCsvList() {
+        List<String> emptyList = Csv.parseCsvList("");
+        assertEquals(Collections.emptyList(), emptyList);
+
+        List<String> emptyListFromNullString = Csv.parseCsvList(null);
+        assertEquals(Collections.emptyList(), emptyListFromNullString);
+
+        List<String> csvList = Csv.parseCsvList("a,b ,c, d,,e,");
+        assertEquals(Arrays.asList("a", "b", "c", "d", "e"), csvList);
+    }
 }
