From 9bcd71acf5c72dc681a8d707f50632f6628a6a0f Mon Sep 17 00:00:00 2001 From: davidliu Date: Tue, 26 Jul 2022 12:48:07 +0800 Subject: [PATCH] [FLINK-27536][Connectors / Common] Rename method parameter in AsyncSinkWriter --- .../base/sink/writer/AsyncSinkWriter.java | 41 +++++++++---------- .../base/sink/writer/AsyncSinkWriterTest.java | 34 ++++++++------- 2 files changed, 36 insertions(+), 39 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java index 090504ab8bc61..faab5889ad0d3 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java @@ -17,15 +17,6 @@ package org.apache.flink.connector.base.sink.writer; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.operators.MailboxExecutor; -import org.apache.flink.api.common.operators.ProcessingTimeService; -import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.StatefulSink; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.groups.SinkWriterMetricGroup; -import org.apache.flink.util.Preconditions; - import java.io.IOException; import java.io.Serializable; import java.util.ArrayDeque; @@ -36,6 +27,14 @@ import java.util.List; import java.util.ListIterator; import java.util.function.Consumer; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.StatefulSink; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.util.Preconditions; /** * A generic sink writer that handles the general behaviour of a sink such as batching and flushing, @@ -170,14 +169,14 @@ public abstract class AsyncSinkWriterFrom a threading perspective, the mailbox thread will call this method and initiate the * asynchronous request to persist the {@code requestEntries}. NOTE: The client must support * asynchronous requests and the method called to persist the records must asynchronously * execute and return a future with the results of that request. A thread from the destination * client thread pool should complete the request and submit the failed entries that should be - * retried. The {@code requestResult} will then trigger the mailbox thread to requeue the + * retried. The {@code requestToRetry} will then trigger the mailbox thread to requeue the * unsuccessful elements. * *

An example implementation of this method is included: @@ -185,15 +184,15 @@ public abstract class AsyncSinkWriter{@code * @Override * protected void submitRequestEntries - * (List records, Consumer> requestResult) { + * (List records, Consumer> requestToRetry) { * Future response = destinationClient.putRecords(records); * response.whenComplete( * (response, error) -> { * if(error){ * List retryableFailedRecords = getRetryableFailed(response); - * requestResult.accept(retryableFailedRecords); + * requestToRetry.accept(retryableFailedRecords); * }else{ - * requestResult.accept(Collections.emptyList()); + * requestToRetry.accept(Collections.emptyList()); * } * } * ); @@ -205,14 +204,14 @@ public abstract class AsyncSinkWriter requestEntries, Consumer> requestResult); + List requestEntries, Consumer> requestToRetry); /** * This method allows the getting of the size of a {@code RequestEntryT} in bytes. The size in @@ -381,7 +380,7 @@ private void flush() throws InterruptedException { } long timestampOfRequest = System.currentTimeMillis(); - Consumer> requestResult = + Consumer> requestToRetry = failedRequestEntries -> mailboxExecutor.execute( () -> @@ -394,7 +393,7 @@ private void flush() throws InterruptedException { inFlightRequestsCount++; inFlightMessages += batchSize; - submitRequestEntries(batch, requestResult); + submitRequestEntries(batch, requestToRetry); } /** diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java index c8535bfcef1ca..52f382429537a 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java @@ -17,12 +17,11 @@ package org.apache.flink.connector.base.sink.writer; -import org.apache.flink.api.common.operators.MailboxExecutor; -import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; import java.io.IOException; import java.util.ArrayList; @@ -37,12 +36,11 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; - -import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.assertj.core.api.Assertions.fail; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; /** * Unit Tests the functionality of AsyncSinkWriter without any assumptions of what a concrete @@ -1059,12 +1057,12 @@ public void writeAsNonMailboxThread(String val) throws IOException, InterruptedE *

A limitation of this basic implementation is that each element written must be unique. * * @param requestEntries a set of request entries that should be persisted to {@code res} - * @param requestResult a Consumer that needs to accept a collection of failure elements + * @param requestToRetry a Consumer that needs to accept a collection of failure elements * once all request entries have been persisted */ @Override protected void submitRequestEntries( - List requestEntries, Consumer> requestResult) { + List requestEntries, Consumer> requestToRetry) { maybeDelay(); if (requestEntries.stream().anyMatch(val -> val > 100 && val <= 200)) { @@ -1087,10 +1085,10 @@ protected void submitRequestEntries( requestEntries.removeAll(firstTimeFailed); res.addAll(requestEntries); - requestResult.accept(firstTimeFailed); + requestToRetry.accept(firstTimeFailed); } else { res.addAll(requestEntries); - requestResult.accept(new ArrayList<>()); + requestToRetry.accept(new ArrayList<>()); } } @@ -1239,7 +1237,7 @@ public AsyncSinkReleaseAndBlockWriterImpl( @Override protected void submitRequestEntries( - List requestEntries, Consumer> requestResult) { + List requestEntries, Consumer> requestToRetry) { if (requestEntries.size() == 3) { try { delayedStartLatch.countDown(); @@ -1258,7 +1256,7 @@ protected void submitRequestEntries( } res.addAll(requestEntries); - requestResult.accept(new ArrayList<>()); + requestToRetry.accept(new ArrayList<>()); } } }