diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java index 605c79c041b6f..e114de88bed0b 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java @@ -82,7 +82,7 @@ protected void init() throws Exception { } @Override - protected void performDefaultAction(DefaultActionContext context) throws Exception { + protected void processInput(DefaultActionContext context) throws Exception { if (input.hasNext()) { reuse.replace(input.next()); headOperator.setKeyContextElement1(reuse); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index 45d1a1a1f2ae7..2ce1e6e9e40c9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -111,7 +111,7 @@ protected void cleanup() { } @Override - protected void performDefaultAction(DefaultActionContext context) throws Exception { + protected void processInput(DefaultActionContext context) throws Exception { context.suspendDefaultAction(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java index 4a031afde6db9..68db112cb2023 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java @@ -67,7 +67,7 @@ public StreamIterationHead(Environment env) { // ------------------------------------------------------------------------ @Override - protected void performDefaultAction(DefaultActionContext context) throws Exception { + protected void processInput(DefaultActionContext context) throws Exception { StreamRecord nextRecord = shouldWait ? dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) : dataChannel.take(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 617a3673b2faa..827e6ad9069c4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -247,7 +247,7 @@ protected StreamTask( this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap(); this.recordWriters = createRecordWriters(configuration, environment); this.syncSavepointLatch = new SynchronousSavepointLatch(); - this.mailboxProcessor = new MailboxProcessor(this::performDefaultAction); + this.mailboxProcessor = new MailboxProcessor(this::processInput); this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment); } @@ -273,7 +273,7 @@ protected void cleanup() throws Exception { * @param context context object for collaborative interaction between the action and the stream task. * @throws Exception on any problems in the action. */ - protected void performDefaultAction(DefaultActionContext context) throws Exception { + protected void processInput(DefaultActionContext context) throws Exception { if (!inputProcessor.processInput()) { if (inputProcessor.isFinished()) { context.allActionsCompleted(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java index 3e4f8374b50f9..8caeda7bb00d4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java @@ -194,14 +194,14 @@ private static class TestSelectiveReadingTask extends TwoInputStr } @Override - protected void performDefaultAction(DefaultActionContext context) throws Exception { + protected void processInput(DefaultActionContext context) throws Exception { if (!started) { synchronized (this) { this.wait(); } } - super.performDefaultAction(context); + super.processInput(context); } public void startProcessing() { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java index 4f5ba0300b131..916460edd5bab 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -226,7 +226,7 @@ protected void init() { } @Override - protected void performDefaultAction(DefaultActionContext context) throws Exception { + protected void processInput(DefaultActionContext context) throws Exception { RUN_LATCH.trigger(); // wait until we have started an asynchronous checkpoint CHECKPOINTING_LATCH.await(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index ca6af8cf19a35..77e1ee2b89f85 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -1174,7 +1174,7 @@ protected void init() throws Exception { } @Override - protected void performDefaultAction(DefaultActionContext context) throws Exception { + protected void processInput(DefaultActionContext context) throws Exception { if (fail) { throw new RuntimeException(); } @@ -1262,7 +1262,7 @@ public CancelLockingTask(Environment env) { protected void init() {} @Override - protected void performDefaultAction(DefaultActionContext context) throws Exception { + protected void processInput(DefaultActionContext context) throws Exception { holder = new LockHolder(getCheckpointLock(), latch); holder.start(); latch.await(); @@ -1307,7 +1307,7 @@ public CancelFailingTask(Environment env) { protected void init() {} @Override - protected void performDefaultAction(DefaultActionContext context) throws Exception { + protected void processInput(DefaultActionContext context) throws Exception { final OneShotLatch latch = new OneShotLatch(); final Object lock = new Object(); @@ -1374,9 +1374,9 @@ public void onProcessingTime(long timestamp) throws Exception { } @Override - protected void performDefaultAction(DefaultActionContext context) throws Exception { + protected void processInput(DefaultActionContext context) throws Exception { syncLatch.await(); - super.performDefaultAction(context); + super.processInput(context); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java index eee3a62c34bcd..6ee1974820e40 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java @@ -138,7 +138,7 @@ public SynchronousCheckpointTestingTask(Environment environment) { } @Override - protected void performDefaultAction(DefaultActionContext context) throws Exception { + protected void processInput(DefaultActionContext context) throws Exception { if (!isRunning) { isRunning = true; eventQueue.put(Event.TASK_IS_RUNNING); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java index 0b36ad7dd0229..8e22758456ab8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java @@ -172,10 +172,10 @@ private static class StreamTaskUnderTest extends NoOpStreamTask { } @Override - protected void performDefaultAction(DefaultActionContext context) throws Exception { + protected void processInput(DefaultActionContext context) throws Exception { runningLatch.trigger(); execLatch.await(); - super.performDefaultAction(context); + super.processInput(context); } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java index 7360508f0f3a8..9b74c01e45d74 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java @@ -475,7 +475,7 @@ public TestStreamTask(Environment env) { public void init() {} @Override - protected void performDefaultAction(DefaultActionContext context) throws Exception { + protected void processInput(DefaultActionContext context) throws Exception { triggerCheckpointOnBarrier( new CheckpointMetaData( 11L, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java index 059f72f8d92ca..de8ecafab0fda 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java @@ -86,7 +86,7 @@ public void init() { } @Override - protected void performDefaultAction(DefaultActionContext context) throws Exception { + protected void processInput(DefaultActionContext context) throws Exception { context.allActionsCompleted(); } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java index 7dd07db59e6ed..bac15bf84d6af 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java @@ -287,7 +287,7 @@ public ExceptionOnCallbackStreamTask(final Environment environment) { } @Override - protected void performDefaultAction(DefaultActionContext context) throws Exception { + protected void processInput(DefaultActionContext context) throws Exception { final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask(); if (taskIndex == 0) { numberOfRestarts.countDown(); @@ -344,7 +344,7 @@ public NoOpBlockingStreamTask(final Environment environment) { } @Override - protected void performDefaultAction(DefaultActionContext context) throws Exception { + protected void processInput(DefaultActionContext context) throws Exception { invokeLatch.countDown(); finishLatch.await(); context.allActionsCompleted();