diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index eea1390bfd68d..0e0a874f023c3 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -47,6 +47,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Collection;
@@ -135,7 +137,10 @@ class KafkaWriter<IN>
         this.kafkaProducerConfig = checkNotNull(kafkaProducerConfig, "kafkaProducerConfig");
         this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix, "transactionalIdPrefix");
         this.recordSerializer = checkNotNull(recordSerializer, "recordSerializer");
-        this.deliveryCallback = new WriterCallback(sinkInitContext.getMailboxExecutor());
+        this.deliveryCallback =
+                new WriterCallback(
+                        sinkInitContext.getMailboxExecutor(),
+                        sinkInitContext.metadataConsumer().orElse(null));
         this.disabledMetrics =
                 kafkaProducerConfig.containsKey(KEY_DISABLE_METRICS)
                         && Boolean.parseBoolean(
@@ -389,9 +394,13 @@ private void registerMetricSync() {
     private class WriterCallback implements Callback {
         private final MailboxExecutor mailboxExecutor;
+        @Nullable private final Consumer<RecordMetadata> metadataConsumer;
 
-        public WriterCallback(MailboxExecutor mailboxExecutor) {
+        public WriterCallback(
+                MailboxExecutor mailboxExecutor,
+                @Nullable Consumer<RecordMetadata> metadataConsumer) {
             this.mailboxExecutor = mailboxExecutor;
+            this.metadataConsumer = metadataConsumer;
         }
 
         @Override
@@ -403,6 +412,10 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
                         () -> throwException(metadata, exception, producer),
                         "Failed to send data to Kafka");
             }
+
+            if (metadataConsumer != null) {
+                metadataConsumer.accept(metadata);
+            }
         }
 
         private void throwException(
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index b317f203d3bc3..559eb38bf500f 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -40,6 +40,7 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -54,8 +55,11 @@
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
@@ -64,6 +68,7 @@
 import java.util.PriorityQueue;
 import java.util.Properties;
 import java.util.concurrent.ScheduledFuture;
+import java.util.function.Consumer;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
@@ -180,6 +185,25 @@ public void testCurrentSendTimeMetric() throws Exception {
         }
     }
 
+    @Test
+    public void testMetadataPublisher() throws Exception {
+        List<String> metadataList = new ArrayList<>();
+        try (final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(
+                        getKafkaClientConfiguration(),
+                        DeliveryGuarantee.AT_LEAST_ONCE,
+                        InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+                        meta -> metadataList.add(meta.toString()))) {
+            List<String> expected = new ArrayList<>();
+            for (int i = 0; i < 100; i++) {
+                writer.write(1, SINK_WRITER_CONTEXT);
+                expected.add("testMetadataPublisher-0@" + i);
+            }
+            writer.prepareCommit();
+            org.assertj.core.api.Assertions.assertThat(metadataList).isEqualTo(expected);
+        }
+    }
+
     /** Test that producer is not accidentally recreated or pool is used. */
     @Test
     void testLingeringTransaction() throws Exception {
@@ -341,11 +365,19 @@ private KafkaWriter<Integer> createWriterWithConfiguration(
             Properties config,
             DeliveryGuarantee guarantee,
             SinkWriterMetricGroup sinkWriterMetricGroup) {
+        return createWriterWithConfiguration(config, guarantee, sinkWriterMetricGroup, null);
+    }
+
+    private KafkaWriter<Integer> createWriterWithConfiguration(
+            Properties config,
+            DeliveryGuarantee guarantee,
+            SinkWriterMetricGroup sinkWriterMetricGroup,
+            @Nullable Consumer<RecordMetadata> metadataConsumer) {
         return new KafkaWriter<>(
                 guarantee,
                 config,
                 "test-prefix",
-                new SinkInitContext(sinkWriterMetricGroup, timeService),
+                new SinkInitContext(sinkWriterMetricGroup, timeService, metadataConsumer),
                 new DummyRecordSerializer(),
                 new DummySchemaContext(),
                 ImmutableList.of());
@@ -366,10 +398,15 @@ private static class SinkInitContext implements Sink.InitContext {
 
         private final SinkWriterMetricGroup metricGroup;
         private final ProcessingTimeService timeService;
+        @Nullable private final Consumer<RecordMetadata> metadataConsumer;
 
-        SinkInitContext(SinkWriterMetricGroup metricGroup, ProcessingTimeService timeService) {
+        SinkInitContext(
+                SinkWriterMetricGroup metricGroup,
+                ProcessingTimeService timeService,
+                @Nullable Consumer<RecordMetadata> metadataConsumer) {
             this.metricGroup = metricGroup;
             this.timeService = timeService;
+            this.metadataConsumer = metadataConsumer;
         }
 
         @Override
@@ -412,6 +449,11 @@ public OptionalLong getRestoredCheckpointId() {
                 asSerializationSchemaInitializationContext() {
             return null;
         }
+
+        @Override
+        public <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
+            return Optional.ofNullable((Consumer<MetaT>) metadataConsumer);
+        }
     }
 
     private class DummyRecordSerializer implements KafkaRecordSerializationSchema<Integer> {
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
index 4052a6ce3bdae..c006ba5c12b7c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.connector.sink2;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.operators.ProcessingTimeService;
@@ -27,7 +28,9 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.function.Consumer;
 
 /**
  * Base interface for developing a sink. A basic {@link Sink} is a stateless sink that can flush
@@ -105,5 +108,18 @@ interface InitContext {
          * Provides a view on this context as a {@link SerializationSchema.InitializationContext}.
          */
         SerializationSchema.InitializationContext asSerializationSchemaInitializationContext();
+
+        /**
+         * Returns a metadata consumer; the {@link SinkWriter} can publish metadata events of
+         * type {@link MetaT} to the consumer.
+         *
+         * <p>It is recommended to use a separate thread pool to publish the metadata because
+         * enqueuing a lot of these messages in the mailbox may lead to a performance decrease.
+         * The {@link Consumer#accept} method may be called from the writer's I/O thread, so it
+         * should return very fast.
+         */
+        @Experimental
+        default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
+            return Optional.empty();
+        }
     }
 }
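For illustration, below is a minimal sketch of how another sink2 implementation could honor the new `InitContext#metadataConsumer()` contract, mirroring what `KafkaWriter` does above. This sketch is not part of the patch: `OffsetPublishingSink`, its writer, and the `Long` "offset" metadata are hypothetical names; only the `metadataConsumer()` accessor comes from this change.

```java
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.function.Consumer;

/** Hypothetical sink that publishes a Long "offset" for every record it writes. */
public class OffsetPublishingSink implements Sink<String> {

    @Override
    public SinkWriter<String> createWriter(InitContext context) {
        // As in KafkaWriter: unwrap the Optional once, at writer construction time.
        return new OffsetPublishingWriter(context.<Long>metadataConsumer().orElse(null));
    }

    private static class OffsetPublishingWriter implements SinkWriter<String> {
        @Nullable private final Consumer<Long> metadataConsumer;
        private long nextOffset;

        OffsetPublishingWriter(@Nullable Consumer<Long> metadataConsumer) {
            this.metadataConsumer = metadataConsumer;
        }

        @Override
        public void write(String element, Context context) throws IOException {
            // ... write the element to the external system ...
            long offset = nextOffset++;
            // Publish metadata only if the runtime registered a consumer,
            // mirroring the null check in KafkaWriter.WriterCallback#onCompletion.
            if (metadataConsumer != null) {
                metadataConsumer.accept(offset);
            }
        }

        @Override
        public void flush(boolean endOfInput) {}

        @Override
        public void close() {}
    }
}
```

Keeping the consumer as a nullable field rather than re-reading the `Optional` per record keeps the per-record path allocation-free, which matters because the consumer may fire once for every produced record.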