Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[hotfix][runtime] Rename StreamTask's performDefaultAction method to processInput #9635

Merged
merged 1 commit into from Sep 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -82,7 +82,7 @@ protected void init() throws Exception {
}

@Override
protected void performDefaultAction(DefaultActionContext context) throws Exception {
protected void processInput(DefaultActionContext context) throws Exception {
if (input.hasNext()) {
reuse.replace(input.next());
headOperator.setKeyContextElement1(reuse);
Expand Down
Expand Up @@ -111,7 +111,7 @@ protected void cleanup() {
}

@Override
protected void performDefaultAction(DefaultActionContext context) throws Exception {
protected void processInput(DefaultActionContext context) throws Exception {

context.suspendDefaultAction();

Expand Down
Expand Up @@ -67,7 +67,7 @@ public StreamIterationHead(Environment env) {
// ------------------------------------------------------------------------

@Override
protected void performDefaultAction(DefaultActionContext context) throws Exception {
protected void processInput(DefaultActionContext context) throws Exception {
StreamRecord<OUT> nextRecord = shouldWait ?
dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) :
dataChannel.take();
Expand Down
Expand Up @@ -247,7 +247,7 @@ protected StreamTask(
this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
this.recordWriters = createRecordWriters(configuration, environment);
this.syncSavepointLatch = new SynchronousSavepointLatch();
this.mailboxProcessor = new MailboxProcessor(this::performDefaultAction);
this.mailboxProcessor = new MailboxProcessor(this::processInput);
this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
}

Expand All @@ -273,7 +273,7 @@ protected void cleanup() throws Exception {
* @param context context object for collaborative interaction between the action and the stream task.
* @throws Exception on any problems in the action.
*/
protected void performDefaultAction(DefaultActionContext context) throws Exception {
protected void processInput(DefaultActionContext context) throws Exception {
if (!inputProcessor.processInput()) {
if (inputProcessor.isFinished()) {
context.allActionsCompleted();
Expand Down
Expand Up @@ -194,14 +194,14 @@ private static class TestSelectiveReadingTask<IN1, IN2, OUT> extends TwoInputStr
}

@Override
protected void performDefaultAction(DefaultActionContext context) throws Exception {
protected void processInput(DefaultActionContext context) throws Exception {
if (!started) {
synchronized (this) {
this.wait();
}
}

super.performDefaultAction(context);
super.processInput(context);
}

public void startProcessing() {
Expand Down
Expand Up @@ -226,7 +226,7 @@ protected void init() {
}

@Override
protected void performDefaultAction(DefaultActionContext context) throws Exception {
protected void processInput(DefaultActionContext context) throws Exception {
RUN_LATCH.trigger();
// wait until we have started an asynchronous checkpoint
CHECKPOINTING_LATCH.await();
Expand Down
Expand Up @@ -1174,7 +1174,7 @@ protected void init() throws Exception {
}

@Override
protected void performDefaultAction(DefaultActionContext context) throws Exception {
protected void processInput(DefaultActionContext context) throws Exception {
if (fail) {
throw new RuntimeException();
}
Expand Down Expand Up @@ -1262,7 +1262,7 @@ public CancelLockingTask(Environment env) {
protected void init() {}

@Override
protected void performDefaultAction(DefaultActionContext context) throws Exception {
protected void processInput(DefaultActionContext context) throws Exception {
holder = new LockHolder(getCheckpointLock(), latch);
holder.start();
latch.await();
Expand Down Expand Up @@ -1307,7 +1307,7 @@ public CancelFailingTask(Environment env) {
protected void init() {}

@Override
protected void performDefaultAction(DefaultActionContext context) throws Exception {
protected void processInput(DefaultActionContext context) throws Exception {
final OneShotLatch latch = new OneShotLatch();
final Object lock = new Object();

Expand Down Expand Up @@ -1374,9 +1374,9 @@ public void onProcessingTime(long timestamp) throws Exception {
}

@Override
protected void performDefaultAction(DefaultActionContext context) throws Exception {
protected void processInput(DefaultActionContext context) throws Exception {
syncLatch.await();
super.performDefaultAction(context);
super.processInput(context);
}
}

Expand Down
Expand Up @@ -138,7 +138,7 @@ public SynchronousCheckpointTestingTask(Environment environment) {
}

@Override
protected void performDefaultAction(DefaultActionContext context) throws Exception {
protected void processInput(DefaultActionContext context) throws Exception {
if (!isRunning) {
isRunning = true;
eventQueue.put(Event.TASK_IS_RUNNING);
Expand Down
Expand Up @@ -172,10 +172,10 @@ private static class StreamTaskUnderTest extends NoOpStreamTask {
}

@Override
protected void performDefaultAction(DefaultActionContext context) throws Exception {
protected void processInput(DefaultActionContext context) throws Exception {
runningLatch.trigger();
execLatch.await();
super.performDefaultAction(context);
super.processInput(context);
}
}
}
Expand Up @@ -475,7 +475,7 @@ public TestStreamTask(Environment env) {
public void init() {}

@Override
protected void performDefaultAction(DefaultActionContext context) throws Exception {
protected void processInput(DefaultActionContext context) throws Exception {
triggerCheckpointOnBarrier(
new CheckpointMetaData(
11L,
Expand Down
Expand Up @@ -86,7 +86,7 @@ public void init() {
}

@Override
protected void performDefaultAction(DefaultActionContext context) throws Exception {
protected void processInput(DefaultActionContext context) throws Exception {
context.allActionsCompleted();
}

Expand Down
Expand Up @@ -287,7 +287,7 @@ public ExceptionOnCallbackStreamTask(final Environment environment) {
}

@Override
protected void performDefaultAction(DefaultActionContext context) throws Exception {
protected void processInput(DefaultActionContext context) throws Exception {
final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
if (taskIndex == 0) {
numberOfRestarts.countDown();
Expand Down Expand Up @@ -344,7 +344,7 @@ public NoOpBlockingStreamTask(final Environment environment) {
}

@Override
protected void performDefaultAction(DefaultActionContext context) throws Exception {
protected void processInput(DefaultActionContext context) throws Exception {
invokeLatch.countDown();
finishLatch.await();
context.allActionsCompleted();
Expand Down