MINOR: Remove deprecated constructors from Connect's Kafka*BackingStore classes #15865

Merged: 2 commits, May 17, 2024
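For context: the deprecated constructors being removed here let callers omit the `Supplier<TopicAdmin>`, in which case each store created and later closed its own `SharedTopicAdmin` (the `ownTopicAdmin` field). With this change the supplier is mandatory and the caller owns the admin client's lifecycle. A minimal sketch of the surviving wiring, assuming the caller uses a `SharedTopicAdmin` (which implements both `Supplier<TopicAdmin>` and `AutoCloseable`); the bootstrap address, converter setup, and class name are illustrative, not code from this PR:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.util.SharedTopicAdmin;

public class StatusStoreWiring {
    public static void main(String[] args) {
        // Illustrative admin client config; the bootstrap address is a placeholder.
        Map<String, Object> adminProps = new HashMap<>();
        adminProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        JsonConverter converter = new JsonConverter();
        converter.configure(Collections.singletonMap("schemas.enable", false), false);

        // The caller creates and owns the shared admin ...
        try (SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps)) {
            // ... and passes it to the store as its Supplier<TopicAdmin>; the store
            // no longer falls back to creating (and having to close) its own admin.
            KafkaStatusBackingStore store = new KafkaStatusBackingStore(
                    Time.SYSTEM, converter, sharedAdmin, "connect-distributed-");
            // store.configure(workerConfig); store.start(); ... store.stop();
        } // The admin client is closed here, by its owner, not inside store.stop().
    }
}
```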
KafkaConfigBackingStore.java:

@@ -53,7 +53,6 @@
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
-import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -300,7 +299,6 @@ public static String LOGGER_CLUSTER_KEY(String namespace) {
     final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
     final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
     private final Supplier<TopicAdmin> topicAdminSupplier;
-    private SharedTopicAdmin ownTopicAdmin;
     private final String clientId;
 
     // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
@@ -334,11 +332,6 @@ void setConfigLog(KafkaBasedLog<String, byte[]> configLog) {
         this.configLog = configLog;
     }
 
-    @Deprecated
-    public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer) {
-        this(converter, config, configTransformer, null, "connect-distributed-");
-    }
-
     public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, String clientIdBase) {
         this(converter, config, configTransformer, adminSupplier, clientIdBase, Time.SYSTEM);
     }
@@ -411,7 +404,6 @@ public void stop() {
         log.info("Closing KafkaConfigBackingStore");
 
         relinquishWritePrivileges();
-        Utils.closeQuietly(ownTopicAdmin, "admin for config topic");
         Utils.closeQuietly(configLog::stop, "KafkaBasedLog for config topic");
 
         log.info("Closed KafkaConfigBackingStore");
@@ -794,14 +786,7 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
         adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
-        Supplier<TopicAdmin> adminSupplier;
-        if (topicAdminSupplier != null) {
-            adminSupplier = topicAdminSupplier;
-        } else {
-            // Create our own topic admin supplier that we'll close when we're stopped
-            ownTopicAdmin = new SharedTopicAdmin(adminProps);
-            adminSupplier = ownTopicAdmin;
-        }
+
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                 ? ((DistributedConfig) config).configStorageTopicSettings()
                 : Collections.emptyMap();
@@ -812,7 +797,7 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
                 .replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG))
                 .build();
 
-        return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier, config, time);
+        return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, topicAdminSupplier, config, time);
     }
 
     /**
KafkaOffsetBackingStore.java:

@@ -37,7 +37,6 @@
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConvertingFutureCallback;
 import org.apache.kafka.connect.util.KafkaBasedLog;
-import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -149,7 +148,6 @@ private static String noClientId() {
     private Converter keyConverter;
     private final Supplier<TopicAdmin> topicAdminSupplier;
     private final Supplier<String> clientIdBase;
-    private SharedTopicAdmin ownTopicAdmin;
     protected boolean exactlyOnce;
 
     /**
@@ -211,17 +209,9 @@ public void configure(final WorkerConfig config) {
         Map<String, Object> adminProps = new HashMap<>(originals);
         adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
-        Supplier<TopicAdmin> adminSupplier;
-        if (topicAdminSupplier != null) {
-            adminSupplier = topicAdminSupplier;
-        } else {
-            // Create our own topic admin supplier that we'll close when we're stopped
-            this.ownTopicAdmin = new SharedTopicAdmin(adminProps);
-            adminSupplier = ownTopicAdmin;
-        }
         NewTopic topicDescription = newTopicDescription(topic, config);
 
-        this.offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier, config, Time.SYSTEM);
+        this.offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, topicAdminSupplier, config, Time.SYSTEM);
     }
 
     protected NewTopic newTopicDescription(final String topic, final WorkerConfig config) {
@@ -268,13 +258,7 @@ public void start() {
     @Override
     public void stop() {
         log.info("Stopping KafkaOffsetBackingStore");
-        try {
-            offsetLog.stop();
-        } finally {
-            if (ownTopicAdmin != null) {
-                ownTopicAdmin.close();
-            }
-        }
+        offsetLog.stop();
         log.info("Stopped KafkaOffsetBackingStore");
     }
 
KafkaStatusBackingStore.java:

@@ -44,7 +44,6 @@
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
-import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.apache.kafka.connect.util.Table;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.slf4j.Logger;
@@ -139,14 +138,8 @@ public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore implements StatusBackingStore {
     private String statusTopic;
     private KafkaBasedLog<String, byte[]> kafkaLog;
     private int generation;
-    private SharedTopicAdmin ownTopicAdmin;
     private ExecutorService sendRetryExecutor;
 
-    @Deprecated
-    public KafkaStatusBackingStore(Time time, Converter converter) {
-        this(time, converter, null, "connect-distributed-");
-    }
-
     public KafkaStatusBackingStore(Time time, Converter converter, Supplier<TopicAdmin> topicAdminSupplier, String clientIdBase) {
         this.time = time;
         this.converter = converter;
@@ -158,8 +151,9 @@ public KafkaStatusBackingStore(Time time, Converter converter, Supplier<TopicAdmin> topicAdminSupplier, String clientIdBase) {
     }
 
     // visible for testing
-    KafkaStatusBackingStore(Time time, Converter converter, String statusTopic, KafkaBasedLog<String, byte[]> kafkaLog) {
-        this(time, converter);
+    KafkaStatusBackingStore(Time time, Converter converter, String statusTopic, Supplier<TopicAdmin> topicAdminSupplier,
+                            KafkaBasedLog<String, byte[]> kafkaLog) {
+        this(time, converter, null, "connect-distributed-");
Review comment from a Contributor on the line above:

This constructor is only used by tests, and it sets topicAdminSupplier to null, so we have to handle a "null" topicAdminSupplier in production code just for testing. That is a bit awkward to me. Could we require those test cases to pass a topicAdminSupplier instead of null? Tests can pass a fake topicAdminSupplier to the constructor if they expect that it should never be called.

Reply from the PR author (Contributor):

Yeah, that's a good point. I think this and the "ownTopicAdmin" field are cruft left over from earlier refactors, and they're definitely no longer used anywhere in Connect itself (or MM2). Since they aren't part of Connect's public API, I think we should be fine with removing them.
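For illustration, the test-side pattern this thread converges on (and which the test diffs below adopt) is a supplier that never vends a real admin client. A hedged sketch; converter, STATUS_TOPIC, and kafkaBasedLog are assumed to be fields of the test class:

```java
// Pass a fake Supplier<TopicAdmin> instead of null so production code
// never needs a null check; the lambda is never invoked in these tests.
Supplier<TopicAdmin> fakeAdminSupplier = () -> null;

KafkaStatusBackingStore store = new KafkaStatusBackingStore(
        new MockTime(), converter, STATUS_TOPIC, fakeAdminSupplier, kafkaBasedLog);
```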

         this.kafkaLog = kafkaLog;
         this.statusTopic = statusTopic;
         sendRetryExecutor = Executors.newSingleThreadExecutor(
@@ -199,14 +193,6 @@ public void configure(final WorkerConfig config) {
         Map<String, Object> adminProps = new HashMap<>(originals);
         adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
-        Supplier<TopicAdmin> adminSupplier;
-        if (topicAdminSupplier != null) {
-            adminSupplier = topicAdminSupplier;
-        } else {
-            // Create our own topic admin supplier that we'll close when we're stopped
-            ownTopicAdmin = new SharedTopicAdmin(adminProps);
-            adminSupplier = ownTopicAdmin;
-        }
 
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                 ? ((DistributedConfig) config).statusStorageTopicSettings()
@@ -219,7 +205,7 @@
                 .build();
 
         Callback<ConsumerRecord<String, byte[]>> readCallback = (error, record) -> read(record);
-        this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminSupplier, config, time);
+        this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, topicAdminSupplier, config, time);
     }
 
     @Override
@@ -236,9 +222,6 @@ public void stop() {
             kafkaLog.stop();
         } finally {
             ThreadUtils.shutdownExecutorServiceQuietly(sendRetryExecutor, 10, TimeUnit.SECONDS);
-            if (ownTopicAdmin != null) {
-                ownTopicAdmin.close();
-            }
         }
     }
 
KafkaConfigBackingStore test (Mockito-based):

@@ -218,7 +218,7 @@ private void createStore() {
         doReturn("test-cluster").when(config).kafkaClusterId();
         configStorage = Mockito.spy(
                 new KafkaConfigBackingStore(
-                        converter, config, null, null, CLIENT_ID_BASE, time)
+                        converter, config, null, () -> null, CLIENT_ID_BASE, time)
         );
         configStorage.setConfigLog(configLog);
         configStorage.setUpdateListener(configUpdateListener);
KafkaConfigBackingStore test (EasyMock/PowerMock-based):

@@ -179,10 +179,11 @@ private void createStore() {
         // The kafkaClusterId is used in the constructor for KafkaConfigBackingStore
         // So temporarily enter replay mode in order to mock that call
         EasyMock.replay(config);
+        Supplier<TopicAdmin> topicAdminSupplier = () -> null;
         configStorage = PowerMock.createPartialMock(
                 KafkaConfigBackingStore.class,
                 new String[]{"createKafkaBasedLog", "createFencableProducer"},
-                converter, config, null, null, CLIENT_ID_BASE, time);
+                converter, config, null, topicAdminSupplier, CLIENT_ID_BASE, time);
         Whitebox.setInternalState(configStorage, "configLog", storeLog);
         configStorage.setUpdateListener(configUpdateListener);
         // The mock must be reset and re-mocked for the remainder of the test.
KafkaStatusBackingStoreFormatTest.java:

@@ -64,16 +64,15 @@ public class KafkaStatusBackingStoreFormatTest {
 
     private Time time;
     private KafkaStatusBackingStore store;
-    private JsonConverter converter;
 
-    private KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+    private final KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
 
     @Before
     public void setup() {
         time = new MockTime();
-        converter = new JsonConverter();
+        JsonConverter converter = new JsonConverter();
         converter.configure(Collections.singletonMap(SCHEMAS_ENABLE_CONFIG, false), false);
-        store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+        store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, () -> null, kafkaBasedLog);
     }
 
     @Test
KafkaStatusBackingStoreTest.java:

@@ -80,7 +80,7 @@ public class KafkaStatusBackingStoreTest {
 
     @Before
     public void setup() {
-        store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+        store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, () -> null, kafkaBasedLog);
     }
 
     @Test