MINOR: Remove deprecated constructors from Connect's Kafka*BackingStore classes (#15865)

- These constructors were deprecated over 3 years ago in KAFKA-10021 (#9780, "Changed Kafka backing stores to use shared admin client to get end offsets and create topics").
- While these classes are not part of Connect's public API, they were deprecated rather than removed outright because they are useful utility classes that might have been used outside of Connect.
- KafkaOffsetBackingStore's deprecated constructor was already removed in KAFKA-14785 (#13434, "Connect offset read REST API").
- This patch removes the remaining deprecated constructors from KafkaConfigBackingStore and KafkaStatusBackingStore.
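
With the null-supplier fallback gone, the caller owns the admin client and its lifecycle. As a minimal caller-side sketch (not part of this commit; the bootstrap address and class name are assumptions for illustration), a store can be wired up with an explicit Supplier<TopicAdmin> the same way Connect's worker shares one admin client across its backing stores:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.util.SharedTopicAdmin;

public class BackingStoreWiringSketch {
    public static void main(String[] args) {
        // Hypothetical admin client properties; a real worker passes its own config.
        Map<String, Object> adminProps = new HashMap<>();
        adminProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // SharedTopicAdmin implements both Supplier<TopicAdmin> and AutoCloseable,
        // so it satisfies the remaining constructor and is closed by the caller,
        // not by the store.
        try (SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps)) {
            KafkaStatusBackingStore statusStore = new KafkaStatusBackingStore(
                    Time.SYSTEM, new JsonConverter(), sharedAdmin, "connect-distributed-");
            // statusStore.configure(workerConfig); statusStore.start(); ... statusStore.stop();
        }
    }
}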

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
yashmayya committed May 17, 2024
1 parent 93a5efc commit 6aac009
Showing 7 changed files with 15 additions and 63 deletions.
KafkaConfigBackingStore.java
@@ -53,7 +53,6 @@
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
-import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -300,7 +299,6 @@ public static String LOGGER_CLUSTER_KEY(String namespace) {
     final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
     final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
     private final Supplier<TopicAdmin> topicAdminSupplier;
-    private SharedTopicAdmin ownTopicAdmin;
     private final String clientId;

     // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
@@ -334,11 +332,6 @@ void setConfigLog(KafkaBasedLog<String, byte[]> configLog) {
         this.configLog = configLog;
     }

-    @Deprecated
-    public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer) {
-        this(converter, config, configTransformer, null, "connect-distributed-");
-    }
-
     public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, String clientIdBase) {
         this(converter, config, configTransformer, adminSupplier, clientIdBase, Time.SYSTEM);
     }
@@ -411,7 +404,6 @@ public void stop() {
         log.info("Closing KafkaConfigBackingStore");

         relinquishWritePrivileges();
-        Utils.closeQuietly(ownTopicAdmin, "admin for config topic");
         Utils.closeQuietly(configLog::stop, "KafkaBasedLog for config topic");

         log.info("Closed KafkaConfigBackingStore");
@@ -794,14 +786,7 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
         adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
-        Supplier<TopicAdmin> adminSupplier;
-        if (topicAdminSupplier != null) {
-            adminSupplier = topicAdminSupplier;
-        } else {
-            // Create our own topic admin supplier that we'll close when we're stopped
-            ownTopicAdmin = new SharedTopicAdmin(adminProps);
-            adminSupplier = ownTopicAdmin;
-        }
+
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                 ? ((DistributedConfig) config).configStorageTopicSettings()
                 : Collections.emptyMap();
@@ -812,7 +797,7 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
                 .replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG))
                 .build();

-        return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier, config, time);
+        return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, topicAdminSupplier, config, time);
     }

     /**
KafkaOffsetBackingStore.java
@@ -37,7 +37,6 @@
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConvertingFutureCallback;
 import org.apache.kafka.connect.util.KafkaBasedLog;
-import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -149,7 +148,6 @@ private static String noClientId() {
     private Converter keyConverter;
     private final Supplier<TopicAdmin> topicAdminSupplier;
     private final Supplier<String> clientIdBase;
-    private SharedTopicAdmin ownTopicAdmin;
     protected boolean exactlyOnce;

     /**
@@ -211,17 +209,9 @@ public void configure(final WorkerConfig config) {
         Map<String, Object> adminProps = new HashMap<>(originals);
         adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
-        Supplier<TopicAdmin> adminSupplier;
-        if (topicAdminSupplier != null) {
-            adminSupplier = topicAdminSupplier;
-        } else {
-            // Create our own topic admin supplier that we'll close when we're stopped
-            this.ownTopicAdmin = new SharedTopicAdmin(adminProps);
-            adminSupplier = ownTopicAdmin;
-        }
         NewTopic topicDescription = newTopicDescription(topic, config);

-        this.offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier, config, Time.SYSTEM);
+        this.offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, topicAdminSupplier, config, Time.SYSTEM);
     }

     protected NewTopic newTopicDescription(final String topic, final WorkerConfig config) {
@@ -268,13 +258,7 @@ public void start() {
     @Override
     public void stop() {
         log.info("Stopping KafkaOffsetBackingStore");
-        try {
-            offsetLog.stop();
-        } finally {
-            if (ownTopicAdmin != null) {
-                ownTopicAdmin.close();
-            }
-        }
+        offsetLog.stop();
         log.info("Stopped KafkaOffsetBackingStore");
     }

KafkaStatusBackingStore.java
@@ -44,7 +44,6 @@
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
-import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.apache.kafka.connect.util.Table;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.slf4j.Logger;
@@ -139,14 +138,8 @@ public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore impleme
     private String statusTopic;
     private KafkaBasedLog<String, byte[]> kafkaLog;
     private int generation;
-    private SharedTopicAdmin ownTopicAdmin;
     private ExecutorService sendRetryExecutor;

-    @Deprecated
-    public KafkaStatusBackingStore(Time time, Converter converter) {
-        this(time, converter, null, "connect-distributed-");
-    }
-
     public KafkaStatusBackingStore(Time time, Converter converter, Supplier<TopicAdmin> topicAdminSupplier, String clientIdBase) {
         this.time = time;
         this.converter = converter;
@@ -158,8 +151,9 @@ public KafkaStatusBackingStore(Time time, Converter converter, Supplier<TopicAdm
     }

     // visible for testing
-    KafkaStatusBackingStore(Time time, Converter converter, String statusTopic, KafkaBasedLog<String, byte[]> kafkaLog) {
-        this(time, converter);
+    KafkaStatusBackingStore(Time time, Converter converter, String statusTopic, Supplier<TopicAdmin> topicAdminSupplier,
+                            KafkaBasedLog<String, byte[]> kafkaLog) {
+        this(time, converter, topicAdminSupplier, "connect-distributed-");
         this.kafkaLog = kafkaLog;
         this.statusTopic = statusTopic;
         sendRetryExecutor = Executors.newSingleThreadExecutor(
@@ -199,14 +193,6 @@ public void configure(final WorkerConfig config) {
         Map<String, Object> adminProps = new HashMap<>(originals);
         adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
-        Supplier<TopicAdmin> adminSupplier;
-        if (topicAdminSupplier != null) {
-            adminSupplier = topicAdminSupplier;
-        } else {
-            // Create our own topic admin supplier that we'll close when we're stopped
-            ownTopicAdmin = new SharedTopicAdmin(adminProps);
-            adminSupplier = ownTopicAdmin;
-        }

         Map<String, Object> topicSettings = config instanceof DistributedConfig
                 ? ((DistributedConfig) config).statusStorageTopicSettings()
@@ -219,7 +205,7 @@
                 .build();

         Callback<ConsumerRecord<String, byte[]>> readCallback = (error, record) -> read(record);
-        this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminSupplier, config, time);
+        this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, topicAdminSupplier, config, time);
     }

     @Override
@@ -236,9 +222,6 @@ public void stop() {
             kafkaLog.stop();
         } finally {
             ThreadUtils.shutdownExecutorServiceQuietly(sendRetryExecutor, 10, TimeUnit.SECONDS);
-            if (ownTopicAdmin != null) {
-                ownTopicAdmin.close();
-            }
         }
     }

KafkaConfigBackingStoreMockitoTest.java
@@ -221,7 +221,7 @@ private void createStore() {
         doReturn("test-cluster").when(config).kafkaClusterId();
         configStorage = Mockito.spy(
                 new KafkaConfigBackingStore(
-                        converter, config, null, null, CLIENT_ID_BASE, time)
+                        converter, config, null, () -> null, CLIENT_ID_BASE, time)
         );
         configStorage.setConfigLog(configLog);
         configStorage.setUpdateListener(configUpdateListener);
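
Now that the stores no longer build their own SharedTopicAdmin when handed a null supplier, the test changes in this commit pass an explicit Supplier<TopicAdmin> rather than a bare null. A minimal sketch of that pattern (the class name is illustrative, not from the commit):

import java.util.function.Supplier;

import org.apache.kafka.connect.util.TopicAdmin;

class NullAdminSupplierSketch {
    // A lambda satisfies Supplier<TopicAdmin>; yielding null is harmless in tests
    // that never invoke get() on it, and it replaces the removed fallback path.
    static final Supplier<TopicAdmin> TOPIC_ADMIN_SUPPLIER = () -> null;
}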
KafkaConfigBackingStoreTest.java
@@ -178,10 +178,11 @@ private void createStore() {
         // The kafkaClusterId is used in the constructor for KafkaConfigBackingStore
         // So temporarily enter replay mode in order to mock that call
         EasyMock.replay(config);
+        Supplier<TopicAdmin> topicAdminSupplier = () -> null;
         configStorage = PowerMock.createPartialMock(
                 KafkaConfigBackingStore.class,
                 new String[]{"createKafkaBasedLog", "createFencableProducer"},
-                converter, config, null, null, CLIENT_ID_BASE, time);
+                converter, config, null, topicAdminSupplier, CLIENT_ID_BASE, time);
         Whitebox.setInternalState(configStorage, "configLog", storeLog);
         configStorage.setUpdateListener(configUpdateListener);
         // The mock must be reset and re-mocked for the remainder of the test.
KafkaStatusBackingStoreFormatTest.java
@@ -64,16 +64,15 @@ public class KafkaStatusBackingStoreFormatTest {

     private Time time;
     private KafkaStatusBackingStore store;
-    private JsonConverter converter;

-    private KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+    private final KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);

     @Before
     public void setup() {
         time = new MockTime();
-        converter = new JsonConverter();
+        JsonConverter converter = new JsonConverter();
         converter.configure(Collections.singletonMap(SCHEMAS_ENABLE_CONFIG, false), false);
-        store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+        store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, () -> null, kafkaBasedLog);
     }

     @Test
KafkaStatusBackingStoreTest.java
@@ -80,7 +80,7 @@ public class KafkaStatusBackingStoreTest {

     @Before
     public void setup() {
-        store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+        store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, () -> null, kafkaBasedLog);
     }

     @Test
