Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is closed #24768

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
private final StateFutureFactory<K> stateFutureFactory;

/** The state executor where the {@link StateRequest} is actually executed. */
final StateExecutor stateExecutor;
private final StateExecutor stateExecutor;

/** The corresponding context that currently runs in task thread. */
RecordContext<K> currentContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@

import java.util.concurrent.CompletableFuture;

/** Executor for executing batch {@link StateRequest}s. */
/**
* Executor for executing batch {@link StateRequest}s.
*
* <p>Notice that the owner who create the {@code StateExecutor} is responsible for shutting down it
* when it is no longer in use.
*/
@Internal
public interface StateExecutor {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public interface AsyncKeyedStateBackend extends Disposable, Closeable {
* Creates a {@code StateExecutor} which supports to execute a batch of state requests
* asynchronously.
*
* <p>Notice that the {@code AsyncKeyedStateBackend} is responsible for shutting down the
* StateExecutors created by itself when they are no longer in use.
*
* @return a {@code StateExecutor} which supports to execute a batch of state requests
* asynchronously.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch;
Expand All @@ -39,6 +40,7 @@

import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -73,7 +75,9 @@ void setup(
long timeout,
int maxInFlight,
MailboxExecutor mailboxExecutor,
AsyncFrameworkExceptionHandler exceptionHandler) {
AsyncFrameworkExceptionHandler exceptionHandler,
CloseableRegistry closeableRegistry)
throws IOException {
StateExecutor stateExecutor = new TestStateExecutor();
ValueStateDescriptor<Integer> stateDescriptor =
new ValueStateDescriptor<>("test-value-state", BasicTypeInfo.INT_TYPE_INFO);
Expand All @@ -88,6 +92,8 @@ void setup(
} catch (Exception e) {
throw new RuntimeException(e);
}
closeableRegistry.registerCloseable(asyncKeyedStateBackend);
closeableRegistry.registerCloseable(asyncKeyedStateBackend::dispose);
aec =
new AsyncExecutionController<>(
mailboxExecutor,
Expand All @@ -109,13 +115,15 @@ void setup(
}

@Test
void testBasicRun() {
void testBasicRun() throws IOException {
CloseableRegistry resourceRegistry = new CloseableRegistry();
setup(
100,
10000L,
1000,
new SyncMailboxExecutor(),
new TestAsyncFrameworkExceptionHandler());
new TestAsyncFrameworkExceptionHandler(),
resourceRegistry);
// ============================ element1 ============================
String record1 = "key1-r1";
String key1 = "key1";
Expand Down Expand Up @@ -220,16 +228,20 @@ void testBasicRun() {
assertThat(aec.inFlightRecordNum.get()).isEqualTo(0);
assertThat(output.get()).isEqualTo(1);
assertThat(recordContext4.getReferenceCount()).isEqualTo(0);

resourceRegistry.close();
}

@Test
void testRecordsRunInOrder() {
void testRecordsRunInOrder() throws IOException {
CloseableRegistry resourceRegistry = new CloseableRegistry();
setup(
100,
10000L,
1000,
new SyncMailboxExecutor(),
new TestAsyncFrameworkExceptionHandler());
new TestAsyncFrameworkExceptionHandler(),
resourceRegistry);
// Record1 and record3 have the same key, record2 has a different key.
// Record2 should be processed before record3.

Expand Down Expand Up @@ -284,18 +296,22 @@ void testRecordsRunInOrder() {
assertThat(output.get()).isEqualTo(2);
assertThat(recordContext3.getReferenceCount()).isEqualTo(0);
assertThat(aec.inFlightRecordNum.get()).isEqualTo(0);

resourceRegistry.close();
}

@Test
void testInFlightRecordControl() {
void testInFlightRecordControl() throws IOException {
int batchSize = 5;
int maxInFlight = 10;
CloseableRegistry resourceRegistry = new CloseableRegistry();
setup(
batchSize,
10000L,
maxInFlight,
new SyncMailboxExecutor(),
new TestAsyncFrameworkExceptionHandler());
new TestAsyncFrameworkExceptionHandler(),
resourceRegistry);
// For records with different keys, the in-flight records is controlled by batch size.
for (int round = 0; round < 10; round++) {
for (int i = 0; i < batchSize; i++) {
Expand Down Expand Up @@ -334,16 +350,20 @@ void testInFlightRecordControl() {
assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(maxInFlight);
}

resourceRegistry.close();
}

@Test
public void testSyncPoint() {
public void testSyncPoint() throws IOException {
CloseableRegistry resourceRegistry = new CloseableRegistry();
setup(
1000,
10000L,
6000,
new SyncMailboxExecutor(),
new TestAsyncFrameworkExceptionHandler());
new TestAsyncFrameworkExceptionHandler(),
resourceRegistry);
AtomicInteger counter = new AtomicInteger(0);

// Test the sync point processing without a key occupied.
Expand Down Expand Up @@ -384,18 +404,22 @@ public void testSyncPoint() {
assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
recordContext2.release();

resourceRegistry.close();
}

@Test
void testBufferTimeout() {
void testBufferTimeout() throws IOException {
int batchSize = 5;
int timeout = 1000;
CloseableRegistry resourceRegistry = new CloseableRegistry();
setup(
batchSize,
timeout,
1000,
new SyncMailboxExecutor(),
new TestAsyncFrameworkExceptionHandler());
new TestAsyncFrameworkExceptionHandler(),
resourceRegistry);
ManuallyTriggeredScheduledExecutorService scheduledExecutor =
new ManuallyTriggeredScheduledExecutorService();
aec.stateRequestsBuffer.scheduledExecutor = scheduledExecutor;
Expand Down Expand Up @@ -457,18 +481,22 @@ void testBufferTimeout() {
assertThat(scheduledFuture.isDone()).isTrue();
assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(2);
assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(1);

resourceRegistry.close();
}

@Test
void testBufferTimeoutSkip() {
void testBufferTimeoutSkip() throws IOException {
int batchSize = 3;
int timeout = 1000;
CloseableRegistry resourceRegistry = new CloseableRegistry();
setup(
batchSize,
timeout,
1000,
new SyncMailboxExecutor(),
new TestAsyncFrameworkExceptionHandler());
new TestAsyncFrameworkExceptionHandler(),
resourceRegistry);
ManuallyTriggeredScheduledExecutorService scheduledExecutor =
new ManuallyTriggeredScheduledExecutorService();
aec.stateRequestsBuffer.scheduledExecutor = scheduledExecutor;
Expand Down Expand Up @@ -526,14 +554,17 @@ void testBufferTimeoutSkip() {
assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(2);
assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(1);
assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isTrue();

resourceRegistry.close();
}

@Test
void testUserCodeException() {
void testUserCodeException() throws IOException {
TestAsyncFrameworkExceptionHandler exceptionHandler =
new TestAsyncFrameworkExceptionHandler();
TestMailboxExecutor testMailboxExecutor = new TestMailboxExecutor(false);
setup(1000, 10000, 6000, testMailboxExecutor, exceptionHandler);
CloseableRegistry resourceRegistry = new CloseableRegistry();
setup(1000, 10000, 6000, testMailboxExecutor, exceptionHandler, resourceRegistry);
Runnable userCode =
() -> {
valueState
Expand Down Expand Up @@ -562,14 +593,17 @@ void testUserCodeException() {
.isEqualTo("Artificial exception in user code");
assertThat(exceptionHandler.exception).isNull();
assertThat(exceptionHandler.message).isNull();

resourceRegistry.close();
}

@Test
void testFrameworkException() {
void testFrameworkException() throws IOException {
TestAsyncFrameworkExceptionHandler exceptionHandler =
new TestAsyncFrameworkExceptionHandler();
TestMailboxExecutor testMailboxExecutor = new TestMailboxExecutor(true);
setup(1000, 10000, 6000, testMailboxExecutor, exceptionHandler);
CloseableRegistry resourceRegistry = new CloseableRegistry();
setup(1000, 10000, 6000, testMailboxExecutor, exceptionHandler, resourceRegistry);
Runnable userCode = () -> valueState.asyncValue().thenAccept(val -> {});
String record = "record";
String key = "key";
Expand All @@ -588,16 +622,20 @@ void testFrameworkException() {
.isEqualTo("java.lang.RuntimeException: Fail to execute.");
assertThat(exceptionHandler.message)
.isEqualTo("Caught exception when submitting StateFuture's callback.");

resourceRegistry.close();
}

@Test
void testEpochManager() {
void testEpochManager() throws Exception {
CloseableRegistry resourceRegistry = new CloseableRegistry();
setup(
1000,
10000,
6000,
new SyncMailboxExecutor(),
new TestAsyncFrameworkExceptionHandler());
new TestAsyncFrameworkExceptionHandler(),
resourceRegistry);
AtomicInteger output = new AtomicInteger(0);
Runnable userCode = () -> valueState.asyncValue().thenAccept(v -> output.incrementAndGet());

Expand All @@ -622,10 +660,13 @@ void testEpochManager() {
assertThat(output.get()).isEqualTo(3);
// SERIAL_BETWEEN_EPOCH mode would drain in-flight records on non-record arriving.
assertThat(epoch1.ongoingRecordCount).isEqualTo(0);

resourceRegistry.close();
}

@Test
void testMixEpochMode() {
void testMixEpochMode() throws Exception {
CloseableRegistry resourceRegistry = new CloseableRegistry();
// epoch1(parallel mode) -> epoch2(parallel mode) -> epoch3(serial mode),
// when epoch2 close, epoch1 is still in-flight.
// when epoch3 close, all in-flight records should drain, epoch1 and epoch2 should finish.
Expand All @@ -634,7 +675,8 @@ void testMixEpochMode() {
10000,
6000,
new SyncMailboxExecutor(),
new TestAsyncFrameworkExceptionHandler());
new TestAsyncFrameworkExceptionHandler(),
resourceRegistry);
AtomicInteger output = new AtomicInteger(0);
Runnable userCode = () -> valueState.asyncValue().thenAccept(v -> output.incrementAndGet());

Expand Down Expand Up @@ -678,6 +720,8 @@ void testMixEpochMode() {
assertThat(epoch2.ongoingRecordCount).isEqualTo(0);
assertThat(epoch3.ongoingRecordCount).isEqualTo(0);
assertThat(output.get()).isEqualTo(6);

resourceRegistry.close();
}

/** Simulate the underlying state that is actually used to execute the request. */
Expand Down