@@ -47,6 +47,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collection;
@@ -135,7 +137,10 @@ class KafkaWriter<IN>
this.kafkaProducerConfig = checkNotNull(kafkaProducerConfig, "kafkaProducerConfig");
this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix, "transactionalIdPrefix");
this.recordSerializer = checkNotNull(recordSerializer, "recordSerializer");
this.deliveryCallback = new WriterCallback(sinkInitContext.getMailboxExecutor());
this.deliveryCallback =
new WriterCallback(
sinkInitContext.getMailboxExecutor(),
sinkInitContext.<RecordMetadata>metadataConsumer().orElse(null));
this.disabledMetrics =
kafkaProducerConfig.containsKey(KEY_DISABLE_METRICS)
&& Boolean.parseBoolean(
@@ -389,9 +394,13 @@ private void registerMetricSync() {

private class WriterCallback implements Callback {
private final MailboxExecutor mailboxExecutor;
@Nullable private final Consumer<RecordMetadata> metadataConsumer;

public WriterCallback(MailboxExecutor mailboxExecutor) {
public WriterCallback(
MailboxExecutor mailboxExecutor,
@Nullable Consumer<RecordMetadata> metadataConsumer) {
this.mailboxExecutor = mailboxExecutor;
this.metadataConsumer = metadataConsumer;
}

@Override
@@ -403,6 +412,10 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
() -> throwException(metadata, exception, producer),
"Failed to send data to Kafka");
}

if (metadataConsumer != null) {
metadataConsumer.accept(metadata);
}
}

private void throwException(
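With the change above, WriterCallback.onCompletion hands each acknowledged record's RecordMetadata to the optional consumer directly on the producer's callback thread, so the consumer itself must not block. A minimal sketch of a consumer that offloads the real work to its own executor — the class and handler names here are hypothetical and not part of this PR:

```java
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Hypothetical metadata consumer that does its work on a dedicated thread. */
class OffloadingMetadataConsumer implements Consumer<RecordMetadata> {

    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @Override
    public void accept(RecordMetadata metadata) {
        // Return immediately; the producer callback thread must not be blocked.
        executor.execute(() -> handle(metadata));
    }

    private void handle(RecordMetadata metadata) {
        // Placeholder for real work, e.g. tracking per-partition offsets.
        System.out.printf(
                "acked %s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
    }
}
```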
Expand Up @@ -40,6 +40,7 @@

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -54,8 +55,11 @@
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;

import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@@ -64,6 +68,7 @@
import java.util.PriorityQueue;
import java.util.Properties;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;
import java.util.stream.IntStream;

import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
@@ -180,6 +185,25 @@ public void testCurrentSendTimeMetric() throws Exception {
}
}

@Test
public void testMetadataPublisher() throws Exception {
List<String> metadataList = new ArrayList<>();
try (final KafkaWriter<Integer> writer =
createWriterWithConfiguration(
getKafkaClientConfiguration(),
DeliveryGuarantee.AT_LEAST_ONCE,
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
meta -> metadataList.add(meta.toString()))) {
List<String> expected = new ArrayList<>();
for (int i = 0; i < 100; i++) {
writer.write(1, SINK_WRITER_CONTEXT);
expected.add("testMetadataPublisher-0@" + i);
}
writer.prepareCommit();
org.assertj.core.api.Assertions.assertThat(metadataList).isEqualTo(expected);
}
}

/** Tests that the producer is not accidentally recreated and that the producer pool is used. */
@Test
void testLingeringTransaction() throws Exception {
@@ -341,11 +365,19 @@ private KafkaWriter<Integer> createWriterWithConfiguration(
Properties config,
DeliveryGuarantee guarantee,
SinkWriterMetricGroup sinkWriterMetricGroup) {
return createWriterWithConfiguration(config, guarantee, sinkWriterMetricGroup, null);
}

private KafkaWriter<Integer> createWriterWithConfiguration(
Properties config,
DeliveryGuarantee guarantee,
SinkWriterMetricGroup sinkWriterMetricGroup,
@Nullable Consumer<RecordMetadata> metadataConsumer) {
return new KafkaWriter<>(
guarantee,
config,
"test-prefix",
new SinkInitContext(sinkWriterMetricGroup, timeService),
new SinkInitContext(sinkWriterMetricGroup, timeService, metadataConsumer),
new DummyRecordSerializer(),
new DummySchemaContext(),
ImmutableList.of());
@@ -366,10 +398,15 @@ private static class SinkInitContext implements Sink.InitContext {

private final SinkWriterMetricGroup metricGroup;
private final ProcessingTimeService timeService;
@Nullable private final Consumer<RecordMetadata> metadataConsumer;

SinkInitContext(SinkWriterMetricGroup metricGroup, ProcessingTimeService timeService) {
SinkInitContext(
SinkWriterMetricGroup metricGroup,
ProcessingTimeService timeService,
@Nullable Consumer<RecordMetadata> metadataConsumer) {
this.metricGroup = metricGroup;
this.timeService = timeService;
this.metadataConsumer = metadataConsumer;
}

@Override
@@ -412,6 +449,11 @@ public OptionalLong getRestoredCheckpointId() {
asSerializationSchemaInitializationContext() {
return null;
}

@Override
public <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
return Optional.ofNullable((Consumer<MetaT>) metadataConsumer);
}
}

private class DummyRecordSerializer implements KafkaRecordSerializationSchema<Integer> {
@@ -18,6 +18,7 @@

package org.apache.flink.api.connector.sink2;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
@@ -27,7 +28,9 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Consumer;

/**
* Base interface for developing a sink. A basic {@link Sink} is a stateless sink that can flush
@@ -105,5 +108,18 @@ interface InitContext {
* Provides a view on this context as a {@link SerializationSchema.InitializationContext}.
*/
SerializationSchema.InitializationContext asSerializationSchemaInitializationContext();

/**
* Returns a metadata consumer; the {@link SinkWriter} can publish metadata events of type
* {@link MetaT} to the consumer.
*
* <p>It is recommended to use a separate thread pool to publish the metadata, because
* enqueuing a lot of these messages in the mailbox may lead to a performance decrease;
* the {@link Consumer#accept} method must therefore be executed very fast.
*/
@Experimental
default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
Contributor: I'd mark this @Experimental to leave some room to change the threading model later if necessary.

return Optional.empty();
}
}
}
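Putting the pieces together: a writer that wants this hook resolves the consumer once from its InitContext and publishes only when one is present — the same pattern the KafkaWriter diff above follows. A condensed sketch; the surrounding class is hypothetical, only the metadataConsumer() call comes from this PR:

```java
import org.apache.flink.api.connector.sink2.Sink;

import org.apache.kafka.clients.producer.RecordMetadata;

import javax.annotation.Nullable;

import java.util.function.Consumer;

/** Hypothetical writer fragment showing how the new hook is consumed. */
class MetadataPublishingWriter {

    // Resolved once; null simply means no downstream party asked for metadata.
    @Nullable private final Consumer<RecordMetadata> metadataConsumer;

    MetadataPublishingWriter(Sink.InitContext context) {
        this.metadataConsumer = context.<RecordMetadata>metadataConsumer().orElse(null);
    }

    void onRecordAcknowledged(RecordMetadata metadata) {
        if (metadataConsumer != null) {
            metadataConsumer.accept(metadata);
        }
    }
}
```

Since metadataConsumer() defaults to Optional.empty(), existing InitContext implementations and sinks keep working unchanged; only contexts that override the method ever see metadata published.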