Skip to content

Commit 7527a8b

Browse files
authored
MINOR: Cleanup Connect Module (4/n) (#20389)
Now that Kafka support Java 17, this PR makes some changes in connect module. The changes in this PR are limited to only some files. A future PR(s) shall follow. The changes mostly include: - Collections.emptyList(), Collections.singletonList() and Arrays.asList() are replaced with List.of() - Collections.emptyMap() and Collections.singletonMap() are replaced with Map.of() - Collections.singleton() is replaced with Set.of() Modules target: runtime/src/main Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent 92fe00e commit 7527a8b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+178
-228
lines changed

connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434

3535
import java.net.URI;
3636
import java.util.Arrays;
37-
import java.util.Collections;
37+
import java.util.List;
3838
import java.util.Map;
3939

4040
/**
@@ -82,15 +82,15 @@ protected abstract H createHerder(T config, String workerId, Plugins plugins,
8282
* Validate {@link #args}, process worker properties from the first CLI argument, and start {@link Connect}
8383
*/
8484
public void run() {
85-
if (args.length < 1 || Arrays.asList(args).contains("--help")) {
85+
if (args.length < 1 || List.of(args).contains("--help")) {
8686
log.info("Usage: {}", usage());
8787
Exit.exit(1);
8888
}
8989

9090
try {
9191
String workerPropsFile = args[0];
9292
Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
93-
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
93+
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Map.of();
9494
String[] extraArgs = Arrays.copyOfRange(args, 1, args.length);
9595
Connect<H> connect = startConnect(workerProps);
9696
processExtraArgs(connect, extraArgs);

connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@
3636
import org.apache.kafka.connect.util.ConnectUtils;
3737
import org.apache.kafka.connect.util.SharedTopicAdmin;
3838

39-
import java.util.Collections;
4039
import java.util.HashMap;
40+
import java.util.List;
4141
import java.util.Map;
4242

4343
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
@@ -77,7 +77,7 @@ protected DistributedHerder createHerder(DistributedConfig config, String worker
7777

7878
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase,
7979
plugins.newInternalConverter(true, JsonConverter.class.getName(),
80-
Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false")));
80+
Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false")));
8181
offsetBackingStore.configure(config);
8282

8383
Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
@@ -99,7 +99,7 @@ protected DistributedHerder createHerder(DistributedConfig config, String worker
9999
return new DistributedHerder(config, Time.SYSTEM, worker,
100100
kafkaClusterId, statusBackingStore, configBackingStore,
101101
restServer.advertisedUrl().toString(), restClient, connectorClientConfigOverridePolicy,
102-
Collections.emptyList(), sharedAdmin);
102+
List.of(), sharedAdmin);
103103
}
104104

105105
@Override

connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import java.io.File;
5050
import java.io.IOException;
5151
import java.nio.file.Paths;
52-
import java.util.Collections;
5352
import java.util.Map;
5453

5554
import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
@@ -163,7 +162,7 @@ protected StandaloneHerder createHerder(StandaloneConfig config, String workerId
163162
RestServer restServer, RestClient restClient) {
164163

165164
OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore(plugins.newInternalConverter(
166-
true, JsonConverter.class.getName(), Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false")));
165+
true, JsonConverter.class.getName(), Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false")));
167166
offsetBackingStore.configure(config);
168167

169168
Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore,

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ private ConfigInfos validateAllConverterConfigs(
555555
"header converter",
556556
HEADER_CONVERTER_CLASS_CONFIG,
557557
HEADER_CONVERTER_VERSION_CONFIG,
558-
Collections.singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()),
558+
Map.of(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()),
559559
connectorLoader,
560560
reportStage
561561
);
@@ -568,7 +568,7 @@ private ConfigInfos validateAllConverterConfigs(
568568
"key converter",
569569
KEY_CONVERTER_CLASS_CONFIG,
570570
KEY_CONVERTER_VERSION_CONFIG,
571-
Collections.singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.KEY.getName()),
571+
Map.of(ConverterConfig.TYPE_CONFIG, ConverterType.KEY.getName()),
572572
connectorLoader,
573573
reportStage
574574
);
@@ -582,7 +582,7 @@ private ConfigInfos validateAllConverterConfigs(
582582
"value converter",
583583
VALUE_CONVERTER_CLASS_CONFIG,
584584
VALUE_CONVERTER_VERSION_CONFIG,
585-
Collections.singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()),
585+
Map.of(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()),
586586
connectorLoader,
587587
reportStage
588588
);
@@ -1278,7 +1278,7 @@ public List<String> setWorkerLoggerLevel(String namespace, String desiredLevelSt
12781278

12791279
if (!loggers.isValidLevel(normalizedLevel)) {
12801280
log.warn("Ignoring request to set invalid level '{}' for namespace {}", desiredLevelStr, namespace);
1281-
return Collections.emptyList();
1281+
return List.of();
12821282
}
12831283

12841284
return loggers.setLevel(namespace, normalizedLevel);

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ public MetricGroupId(String groupName, Map<String, String> tags) {
253253
Objects.requireNonNull(groupName);
254254
Objects.requireNonNull(tags);
255255
this.groupName = groupName;
256-
this.tags = Collections.unmodifiableMap(new LinkedHashMap<>(tags));
256+
this.tags = Collections.unmodifiableMap(new LinkedHashMap<>(tags)); // To ensure the order of insertion, we have to use Collections.
257257
this.hc = Objects.hash(this.groupName, this.tags);
258258
StringBuilder sb = new StringBuilder(this.groupName);
259259
for (Map.Entry<String, String> entry : this.tags.entrySet()) {

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import org.apache.kafka.common.MetricNameTemplate;
2020

2121
import java.util.ArrayList;
22-
import java.util.Collections;
23-
import java.util.HashMap;
2422
import java.util.LinkedHashSet;
2523
import java.util.List;
2624
import java.util.Map;
@@ -135,7 +133,7 @@ public class ConnectMetricsRegistry {
135133
public final MetricNameTemplate predicateClass;
136134
public final MetricNameTemplate predicateVersion;
137135

138-
public Map<MetricNameTemplate, TaskStatus.State> connectorStatusMetrics;
136+
public final Map<MetricNameTemplate, TaskStatus.State> connectorStatusMetrics;
139137

140138
public ConnectMetricsRegistry() {
141139
this(new LinkedHashSet<>());
@@ -388,14 +386,14 @@ public ConnectMetricsRegistry(Set<String> tags) {
388386
WORKER_GROUP_NAME,
389387
"The number of restarting tasks of the connector on the worker.", workerConnectorTags);
390388

391-
connectorStatusMetrics = new HashMap<>();
392-
connectorStatusMetrics.put(connectorRunningTaskCount, TaskStatus.State.RUNNING);
393-
connectorStatusMetrics.put(connectorPausedTaskCount, TaskStatus.State.PAUSED);
394-
connectorStatusMetrics.put(connectorFailedTaskCount, TaskStatus.State.FAILED);
395-
connectorStatusMetrics.put(connectorUnassignedTaskCount, TaskStatus.State.UNASSIGNED);
396-
connectorStatusMetrics.put(connectorDestroyedTaskCount, TaskStatus.State.DESTROYED);
397-
connectorStatusMetrics.put(connectorRestartingTaskCount, TaskStatus.State.RESTARTING);
398-
connectorStatusMetrics = Collections.unmodifiableMap(connectorStatusMetrics);
389+
connectorStatusMetrics = Map.of(
390+
connectorRunningTaskCount, TaskStatus.State.RUNNING,
391+
connectorPausedTaskCount, TaskStatus.State.PAUSED,
392+
connectorFailedTaskCount, TaskStatus.State.FAILED,
393+
connectorUnassignedTaskCount, TaskStatus.State.UNASSIGNED,
394+
connectorDestroyedTaskCount, TaskStatus.State.DESTROYED,
395+
connectorRestartingTaskCount, TaskStatus.State.RESTARTING
396+
);
399397

400398
/* Worker rebalance level */
401399
Set<String> rebalanceTags = new LinkedHashSet<>(tags);
@@ -444,7 +442,7 @@ private MetricNameTemplate createTemplate(String name, String group, String doc,
444442
}
445443

446444
public List<MetricNameTemplate> getAllTemplates() {
447-
return Collections.unmodifiableList(allTemplates);
445+
return List.copyOf(allTemplates);
448446
}
449447

450448
public String connectorTagName() {

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.slf4j.LoggerFactory;
4848

4949
import java.util.ArrayList;
50-
import java.util.Collections;
5150
import java.util.HashSet;
5251
import java.util.LinkedHashSet;
5352
import java.util.List;
@@ -250,8 +249,8 @@ protected static ConfigDef configDef(
250249
.define(VALUE_CONVERTER_VERSION_CONFIG, Type.STRING, valueConverterDefaults.version, VALUE_CONVERTER_VERSION_VALIDATOR, Importance.LOW, VALUE_CONVERTER_VERSION_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_VERSION_DISPLAY, recommender.valueConverterPluginVersionRecommender())
251250
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, headerConverterDefaults.type, HEADER_CONVERTER_CLASS_VALIDATOR, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY, recommender.headerConverterPluginRecommender())
252251
.define(HEADER_CONVERTER_VERSION_CONFIG, Type.STRING, headerConverterDefaults.version, HEADER_CONVERTER_VERSION_VALIDATOR, Importance.LOW, HEADER_CONVERTER_VERSION_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_VERSION_DISPLAY, recommender.headerConverterPluginVersionRecommender())
253-
.define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("transformation"), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
254-
.define(PREDICATES_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("predicate"), Importance.LOW, PREDICATES_DOC, PREDICATES_GROUP, ++orderInGroup, Width.LONG, PREDICATES_DISPLAY)
252+
.define(TRANSFORMS_CONFIG, Type.LIST, List.of(), aliasValidator("transformation"), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
253+
.define(PREDICATES_CONFIG, Type.LIST, List.of(), aliasValidator("predicate"), Importance.LOW, PREDICATES_DOC, PREDICATES_GROUP, ++orderInGroup, Width.LONG, PREDICATES_DISPLAY)
255254
.define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING, CONFIG_RELOAD_ACTION_RESTART,
256255
in(CONFIG_RELOAD_ACTION_NONE, CONFIG_RELOAD_ACTION_RESTART), Importance.LOW,
257256
CONFIG_RELOAD_ACTION_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, CONFIG_RELOAD_ACTION_DISPLAY)
@@ -303,7 +302,7 @@ private static ConfigDef.CompositeValidator aliasValidator(String kind) {
303302
}
304303

305304
public ConnectorConfig(Plugins plugins) {
306-
this(plugins, Collections.emptyMap());
305+
this(plugins, Map.of());
307306
}
308307

309308
public ConnectorConfig(Plugins plugins, Map<String, String> props) {
@@ -646,7 +645,7 @@ void enrich(ConfigDef newDef, Plugins plugins) {
646645
newDef.define(typeConfig, Type.CLASS, ConfigDef.NO_DEFAULT_VALUE, typeValidator, Importance.HIGH,
647646
"Class for the '" + alias + "' " + aliasKind.toLowerCase(Locale.ENGLISH) + ".", group, orderInGroup++, Width.LONG,
648647
baseClass.getSimpleName() + " type for " + alias,
649-
Collections.emptyList(), new ClassRecommender());
648+
List.of(), new ClassRecommender());
650649

651650
// Add the version configuration
652651
final ConfigDef.Validator versionValidator = (name, value) -> {
@@ -664,7 +663,7 @@ void enrich(ConfigDef newDef, Plugins plugins) {
664663
newDef.define(versionConfig, Type.STRING, defaultVersion, versionValidator, Importance.HIGH,
665664
"Version of the '" + alias + "' " + aliasKind.toLowerCase(Locale.ENGLISH) + ".", group, orderInGroup++, Width.LONG,
666665
baseClass.getSimpleName() + " version for " + alias,
667-
Collections.emptyList(), versionRecommender(typeConfig));
666+
List.of(), versionRecommender(typeConfig));
668667

669668
final ConfigDef configDef = populateConfigDef(typeConfig, versionConfig, plugins);
670669
if (configDef == null) continue;
@@ -780,11 +779,7 @@ final class ClassRecommender implements ConfigDef.Recommender {
780779

781780
@Override
782781
public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
783-
List<Object> result = new ArrayList<>();
784-
for (PluginDesc<T> plugin : plugins()) {
785-
result.add(plugin.pluginClass());
786-
}
787-
return Collections.unmodifiableList(result);
782+
return plugins().stream().map(p -> (Object) p.pluginClass()).toList();
788783
}
789784

790785
@Override

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.kafka.connect.transforms.util.RegexValidator;
2828

2929
import java.util.ArrayList;
30-
import java.util.Collections;
3130
import java.util.LinkedHashMap;
3231
import java.util.List;
3332
import java.util.Map;
@@ -169,7 +168,7 @@ public static void validate(Map<String, String> props, Map<String, ConfigValue>
169168
private static void addErrorMessage(Map<String, ConfigValue> validatedConfig, String name, String value, String errorMessage) {
170169
validatedConfig.computeIfAbsent(
171170
name,
172-
p -> new ConfigValue(name, value, Collections.emptyList(), new ArrayList<>())
171+
p -> new ConfigValue(name, value, List.of(), new ArrayList<>())
173172
).addErrorMessage(
174173
errorMessage
175174
);
@@ -189,7 +188,7 @@ public static boolean hasDlqTopicConfig(Map<String, String> props) {
189188
public static List<String> parseTopicsList(Map<String, String> props) {
190189
List<String> topics = (List<String>) ConfigDef.parseType(TOPICS_CONFIG, props.get(TOPICS_CONFIG), Type.LIST);
191190
if (topics == null) {
192-
return Collections.emptyList();
191+
return List.of();
193192
}
194193
return topics
195194
.stream()

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@
2626
import org.slf4j.LoggerFactory;
2727

2828
import java.util.ArrayList;
29-
import java.util.Collections;
3029
import java.util.HashMap;
3130
import java.util.HashSet;
3231
import java.util.List;
3332
import java.util.Locale;
3433
import java.util.Map;
34+
import java.util.Set;
3535
import java.util.stream.Collectors;
3636

3737
import static org.apache.kafka.common.utils.Utils.enumOptions;
@@ -132,7 +132,7 @@ private static ConfigDef configDef(ConfigDef baseConfigDef) {
132132
.define(
133133
TOPIC_CREATION_GROUPS_CONFIG,
134134
ConfigDef.Type.LIST,
135-
Collections.emptyList(),
135+
List.of(),
136136
ConfigDef.CompositeValidator.of(
137137
new ConfigDef.NonNullValidator(),
138138
ConfigDef.LambdaValidator.with(
@@ -240,7 +240,7 @@ public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, String> prop
240240
if (topicCreationGroups.contains(DEFAULT_TOPIC_CREATION_GROUP)) {
241241
log.warn("'{}' topic creation group always exists and does not need to be listed explicitly",
242242
DEFAULT_TOPIC_CREATION_GROUP);
243-
topicCreationGroups.removeAll(Collections.singleton(DEFAULT_TOPIC_CREATION_GROUP));
243+
topicCreationGroups.removeAll(Set.of(DEFAULT_TOPIC_CREATION_GROUP));
244244
}
245245

246246
ConfigDef newDef = new ConfigDef(baseConfigDef);
@@ -332,7 +332,7 @@ public Integer topicCreationPartitions(String group) {
332332

333333
public Map<String, Object> topicCreationOtherConfigs(String group) {
334334
if (enrichedSourceConfig == null) {
335-
return Collections.emptyMap();
335+
return Map.of();
336336
}
337337
return enrichedSourceConfig.originalsWithPrefix(TOPIC_CREATION_PREFIX + group + '.').entrySet().stream()
338338
.filter(e -> {

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ record CommittableOffsets(Map<Map<String, Object>, Map<String, Object>> offsets,
250250
/**
251251
* An "empty" snapshot that contains no offsets to commit and whose metadata contains no committable or uncommitable messages.
252252
*/
253-
public static final CommittableOffsets EMPTY = new CommittableOffsets(Collections.emptyMap(), 0, 0, 0, 0, null);
253+
public static final CommittableOffsets EMPTY = new CommittableOffsets(Map.of(), 0, 0, 0, 0, null);
254254

255255
CommittableOffsets {
256256
offsets = Collections.unmodifiableMap(offsets);

0 commit comments

Comments
 (0)