Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,6 @@

package org.apache.flink.connector.base.sink.writer;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
Expand All @@ -36,6 +27,14 @@
import java.util.List;
import java.util.ListIterator;
import java.util.function.Consumer;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.Preconditions;

/**
* A generic sink writer that handles the general behaviour of a sink such as batching and flushing,
Expand Down Expand Up @@ -170,30 +169,30 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
* the valid limits of the destination). The logic then needs to create and execute the request
* asynchronously against the destination (ideally by batching together multiple request entries
* to increase efficiency). The logic also needs to identify individual request entries that
* were not persisted successfully and resubmit them using the {@code requestResult} callback.
* were not persisted successfully and resubmit them using the {@code requestToRetry} callback.
*
* <p>From a threading perspective, the mailbox thread will call this method and initiate the
* asynchronous request to persist the {@code requestEntries}. NOTE: The client must support
* asynchronous requests and the method called to persist the records must asynchronously
* execute and return a future with the results of that request. A thread from the destination
* client thread pool should complete the request and submit the failed entries that should be
* retried. The {@code requestResult} will then trigger the mailbox thread to requeue the
* retried. The {@code requestToRetry} will then trigger the mailbox thread to requeue the
* unsuccessful elements.
*
* <p>An example implementation of this method is included:
*
* <pre>{@code
* @Override
* protected void submitRequestEntries
* (List<RequestEntryT> records, Consumer<Collection<RequestEntryT>> requestResult) {
* (List<RequestEntryT> records, Consumer<Collection<RequestEntryT>> requestToRetry) {
* Future<Response> response = destinationClient.putRecords(records);
* response.whenComplete(
* (response, error) -> {
* if(error){
* List<RequestEntryT> retryableFailedRecords = getRetryableFailed(response);
* requestResult.accept(retryableFailedRecords);
* requestToRetry.accept(retryableFailedRecords);
* }else{
* requestResult.accept(Collections.emptyList());
* requestToRetry.accept(Collections.emptyList());
* }
* }
* );
Expand All @@ -205,14 +204,14 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
* requests.
*
* @param requestEntries a set of request entries that should be sent to the destination
* @param requestResult the {@code accept} method should be called on this Consumer once the
* @param requestToRetry the {@code accept} method should be called on this Consumer once the
* processing of the {@code requestEntries} are complete. Any entries that encountered
* difficulties in persisting should be re-queued through {@code requestResult} by including
* that element in the collection of {@code RequestEntryT}s passed to the {@code accept}
* method. All other elements are assumed to have been successfully persisted.
* difficulties in persisting should be re-queued through {@code requestToRetry} by
* including that element in the collection of {@code RequestEntryT}s passed to the {@code
* accept} method. All other elements are assumed to have been successfully persisted.
*/
protected abstract void submitRequestEntries(
List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestToRetry);

/**
* This method allows the getting of the size of a {@code RequestEntryT} in bytes. The size in
Expand Down Expand Up @@ -381,7 +380,7 @@ private void flush() throws InterruptedException {
}

long timestampOfRequest = System.currentTimeMillis();
Consumer<List<RequestEntryT>> requestResult =
Consumer<List<RequestEntryT>> requestToRetry =
failedRequestEntries ->
mailboxExecutor.execute(
() ->
Expand All @@ -394,7 +393,7 @@ private void flush() throws InterruptedException {

inFlightRequestsCount++;
inFlightMessages += batchSize;
submitRequestEntries(batch, requestResult);
submitRequestEntries(batch, requestToRetry);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@

package org.apache.flink.connector.base.sink.writer;

import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -37,12 +36,11 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
* Unit Tests the functionality of AsyncSinkWriter without any assumptions of what a concrete
Expand Down Expand Up @@ -1059,12 +1057,12 @@ public void writeAsNonMailboxThread(String val) throws IOException, InterruptedE
* <p>A limitation of this basic implementation is that each element written must be unique.
*
* @param requestEntries a set of request entries that should be persisted to {@code res}
* @param requestResult a Consumer that needs to accept a collection of failure elements
* @param requestToRetry a Consumer that needs to accept a collection of failure elements
* once all request entries have been persisted
*/
@Override
protected void submitRequestEntries(
List<Integer> requestEntries, Consumer<List<Integer>> requestResult) {
List<Integer> requestEntries, Consumer<List<Integer>> requestToRetry) {
maybeDelay();

if (requestEntries.stream().anyMatch(val -> val > 100 && val <= 200)) {
Expand All @@ -1087,10 +1085,10 @@ protected void submitRequestEntries(

requestEntries.removeAll(firstTimeFailed);
res.addAll(requestEntries);
requestResult.accept(firstTimeFailed);
requestToRetry.accept(firstTimeFailed);
} else {
res.addAll(requestEntries);
requestResult.accept(new ArrayList<>());
requestToRetry.accept(new ArrayList<>());
}
}

Expand Down Expand Up @@ -1239,7 +1237,7 @@ public AsyncSinkReleaseAndBlockWriterImpl(

@Override
protected void submitRequestEntries(
List<Integer> requestEntries, Consumer<List<Integer>> requestResult) {
List<Integer> requestEntries, Consumer<List<Integer>> requestToRetry) {
if (requestEntries.size() == 3) {
try {
delayedStartLatch.countDown();
Expand All @@ -1258,7 +1256,7 @@ protected void submitRequestEntries(
}

res.addAll(requestEntries);
requestResult.accept(new ArrayList<>());
requestToRetry.accept(new ArrayList<>());
}
}
}