Skip to content
Permalink
Browse files
[FLINK-27174][connector/kafka] Fix checking of bootstrapServers when …
…already provided in producer Properties
  • Loading branch information
zhangzhengqi3 authored and fapaul committed May 19, 2022
1 parent 124e4ad commit b4bb9c8bffe1e37ad6912348d8b3bef89af42286
Showing 2 changed files with 36 additions and 14 deletions.
@@ -74,7 +74,6 @@

private final Properties kafkaProducerConfig;
private KafkaRecordSerializationSchema<IN> recordSerializer;
private String bootstrapServers;

KafkaSinkBuilder() {
kafkaProducerConfig = new Properties();
@@ -188,8 +187,19 @@ public KafkaSinkBuilder<IN> setTransactionalIdPrefix(String transactionalIdPrefi
* @return {@link KafkaSinkBuilder}
*/
public KafkaSinkBuilder<IN> setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = checkNotNull(bootstrapServers);
return this;
return setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
}

private void sanityCheck() {
checkNotNull(
kafkaProducerConfig.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
"bootstrapServers");
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
checkState(
transactionalIdPrefix != null,
"EXACTLY_ONCE delivery guarantee requires a transactionIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster.");
}
checkNotNull(recordSerializer, "recordSerializer");
}

/**
@@ -198,17 +208,8 @@ public KafkaSinkBuilder<IN> setBootstrapServers(String bootstrapServers) {
* @return {@link KafkaSink}
*/
public KafkaSink<IN> build() {
checkNotNull(bootstrapServers);
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
checkState(
transactionalIdPrefix != null,
"EXACTLY_ONCE delivery guarantee requires a transactionIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster.");
}
kafkaProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
sanityCheck();
return new KafkaSink<>(
deliveryGuarantee,
kafkaProducerConfig,
transactionalIdPrefix,
checkNotNull(recordSerializer, "recordSerializer"));
deliveryGuarantee, kafkaProducerConfig, transactionalIdPrefix, recordSerializer);
}
}
@@ -79,6 +79,18 @@ public void testPropertyHandling() {
});
}

@Test
public void testBootstrapServerSetting() {
Properties testConf1 = new Properties();
testConf1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "testServer");

validateProducerConfig(
getNoServerBuilder().setKafkaProducerConfig(testConf1),
p -> {
Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
});
}

private void validateProducerConfig(
KafkaSinkBuilder<?> builder, Consumer<Properties> validator) {
validator.accept(builder.build().getKafkaProducerConfig());
@@ -93,4 +105,13 @@ private KafkaSinkBuilder<String> getBasicBuilder() {
.setValueSerializationSchema(new SimpleStringSchema())
.build());
}

private KafkaSinkBuilder<String> getNoServerBuilder() {
return new KafkaSinkBuilder<String>()
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build());
}
}

0 comments on commit b4bb9c8

Please sign in to comment.