@kezhuw
Member

@kezhuw kezhuw commented Nov 19, 2020

What is the purpose of the change

Fix unpredictable Thread.getState in StreamTaskTestHarness.waitForInputProcessing due to concurrent class loading
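
As a general illustration of why inferring "all input processed" from Thread.getState is fragile, here is a minimal JDK-only sketch. It is not the harness code: the class, the queue and the `unrelatedGate` latch are hypothetical, and the latch merely stands in for any wait that has nothing to do with input (the PR attributes the real one to concurrent class loading). The point is that WAITING says the thread is parked, not why.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical demo class; not part of Flink. */
final class ThreadStateAmbiguity {

    /** Polls until the thread reports WAITING, as a state-based idle check would. */
    static void awaitWaiting(Thread thread) throws InterruptedException {
        while (thread.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }
    }

    /** Returns how many elements were still unprocessed when the worker looked idle. */
    static int pendingWhenWorkerLooksIdle() throws InterruptedException {
        BlockingQueue<Integer> input = new LinkedBlockingQueue<>();
        input.add(1);
        input.add(2);
        CountDownLatch unrelatedGate = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                // Parked for a reason unrelated to input availability.
                unrelatedGate.await();
                while (input.poll() != null) {
                    // "Process" the element.
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        awaitWaiting(worker);          // the state check is satisfied here...
        int pending = input.size();    // ...although nothing has been processed yet

        unrelatedGate.countDown();
        worker.join();
        return pending;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("pending while WAITING: " + pendingWhenWorkerLooksIdle());
    }
}
```

Asking the worker itself, on its own thread, avoids guessing from the outside, which is the direction this change takes.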

Brief change log

Query whether all input has been processed using MailboxProcessor.isDefaultActionUnavailable through MailboxExecutor.execute.

Verifying this change

This change is already covered by existing tests:

  • TwoInputStreamTaskTest.testWatermarkMetrics and other tests depending on StreamTaskTestHarness.waitForInputProcessing.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit b3576e2 (Thu Nov 19 14:35:22 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@kezhuw
Member Author

kezhuw commented Nov 19, 2020

@flinkbot attention @AHeise

@flinkbot
Collaborator

flinkbot commented Nov 19, 2020

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

try {
    final CountDownLatch latch = new CountDownLatch(1);
    mailboxExecutor.execute(() -> {
        allInputProcessed.set(mailboxProcessor.isDefaultActionUnavailable());
Contributor

I think isDefaultActionUnavailable() is not the best choice here because suspension is a temporary state; some input may come after this check.

What about using mailboxProcessor.isMailboxLoopRunning() instead?
It is updated on InputStatus.END_OF_INPUT which seems exactly what is needed here.

Member Author

I may have chosen a misleading name for allInputProcessed. It should express that all currently available input has been processed, not that the end of input has been reached. What StreamTaskTestHarness.waitForInputProcessing does is wait until the currently available input has been processed, so that the test code that follows can run its post-processing assertions.

        latch.countDown();
    }, "query-whether-processInput-has-suspend-itself");
    // Mail could be dropped due to task exception, so we do timed-await here.
    latch.await(1, TimeUnit.SECONDS);
Contributor

With this await, is the sleep below still necessary?

Member Author

This await has two purposes here:

  1. Wait until the posted mail has been processed, so we can query allInputProcessed safely.
  2. If the posted mail has been dropped due to a task exception, break out of an otherwise indefinite wait.

It does not serve as a sleep that yields control to the mailbox thread. Without the sleep, the testing thread and the mailbox thread may play ping-pong between process-one-element and execute-one-mail.

I tend to keep it; at the least, it does not affect correctness.
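
The pattern discussed in this thread (post a query mail, await it with a timeout, sleep between attempts) can be sketched with plain JDK classes. This is a toy model under stated assumptions, not the Flink implementation: `ToyMailboxLoop`, its `suspended` flag and the resume mail posted by `addInput` are hypothetical stand-ins for the mailbox thread, `isDefaultActionUnavailable()` and the availability callback, and the overall timeout is an addition of this sketch.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a mailbox thread; not Flink code. */
final class ToyMailboxLoop implements Runnable {
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private final Queue<Integer> input = new ConcurrentLinkedQueue<>();
    private final AtomicInteger processed = new AtomicInteger();
    // Both flags are read and written only on the loop thread (inside mails or the loop).
    private boolean suspended;
    private boolean running = true;

    void execute(Runnable mail) { mailbox.add(mail); }

    void addInput(int value) {
        input.add(value);
        execute(() -> suspended = false); // resume the default action
    }

    void stop() { execute(() -> running = false); }

    int processedCount() { return processed.get(); }

    @Override
    public void run() {
        try {
            while (running) {
                // Mails take priority; block only while the default action is suspended.
                Runnable mail = suspended ? mailbox.take() : mailbox.poll();
                if (mail != null) {
                    mail.run();
                } else if (input.poll() != null) {
                    processed.incrementAndGet(); // default action: process one element
                } else {
                    suspended = true;            // nothing available: suspend
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns true once the loop reports that it suspended itself, false on timeout. */
    boolean waitForInputProcessing(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (deadline - System.nanoTime() > 0) {
            AtomicBoolean allProcessed = new AtomicBoolean();
            CountDownLatch latch = new CountDownLatch(1);
            execute(() -> {
                allProcessed.set(suspended); // evaluated on the loop thread
                latch.countDown();
            });
            // Timed await: the mail is never run if the loop thread has died.
            latch.await(1, TimeUnit.SECONDS);
            if (allProcessed.get()) {
                return true;
            }
            Thread.sleep(1); // let the loop make progress instead of ping-ponging
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ToyMailboxLoop loop = new ToyMailboxLoop();
        Thread thread = new Thread(loop, "toy-mailbox");
        thread.start();
        for (int i = 0; i < 1000; i++) {
            loop.addInput(i);
        }
        boolean done = loop.waitForInputProcessing(5_000);
        System.out.println("done=" + done + ", processed=" + loop.processedCount());
        loop.stop();
        thread.join();
    }
}
```

In this model the query mail is queued behind every resume mail posted earlier by the same thread, so a `true` answer means the loop found the input empty after all of those elements were added. The sleep plays no role in that argument; it only reduces the number of query mails.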

@kezhuw
Member Author

kezhuw commented Nov 20, 2020

Hi @AHeise, I will try to list the existing alternatives below:

  1. Migrate dependent tests to use StreamTaskMailboxTestHarness.
  2. Introduce TaskMailbox.isMailboxThreadBlocked to let testing thread query status of mailbox thread concurrently.
  3. Use MailboxExecutor to query whether all input has been processed.
    a. Use StreamTask.getInputOutputJointFuture(InputStatus.NOTHING_AVAILABLE).isDone()
    b. Use MailboxProcessor.isDefaultActionUnavailable().

First, compared to MailboxExecutor, TaskMailbox.isMailboxThreadBlocked is intrusive and undermines the encapsulation that the mailbox model is trying to provide. I think we have converged on avoiding this approach.

Second, migration may require a lot of work since there are almost 53 dependent tests, as you counted. Personally, I think it would be nice if we could fix the unstable StreamTaskTestHarness.waitForInputProcessing with a few changes before the migration. But it is entirely up to you and/or other committers to decide whether that is worthwhile.

Third, I think 3(a) or something similar may be what you suggested in Jira. Together with the all-queues-empty while-loop, 3(a) and 3(b) should have the same effect. I noticed that there are some optimizations in UnionInputGate that cause UnionInputGate.getAvailableFuture().isDone() to return false while there is cached data. If we drop the all-queues-empty while-loop, 3(a) will fail because of these optimizations. I lean towards MailboxProcessor.isDefaultActionUnavailable(), since it is resistant to these optimizations.

@AHeise
Copy link
Contributor

AHeise commented Nov 20, 2020

Thank you very much for your deep investigation. I asked Roman to assess the solution and the alternatives, as he is much more adept at threading issues than I am.

In theory, I'd go with the first approach, but I understand that this is hardly feasible. So I like your current fix in most regards (details may or may not be improved).

One more idea: couldn't we also inject an END_OF_INPUT StreamStatus at the beginning of allInputProcessed? I was hoping that the thread would then eventually terminate by itself and we could simply join.

@kezhuw
Member Author

kezhuw commented Nov 21, 2020

@AHeise I am getting confused. We probably diverge fundamentally on what StreamTaskTestHarness.waitForInputProcessing should do. In my understanding, it should wait until all currently available input has been processed, not until the end of the stream. It waits for an intermediate state, which can occur several times for a single test harness, say three times in TwoInputStreamTaskTest.testWatermarkMetrics. What you propose here should already be covered by the combination of StreamTaskTestHarness.endInput and StreamTaskTestHarness.waitForTaskCompletion. That combination waits for task termination, which is a terminal state and should occur at most once for a single test harness.

@rkhachatryan gave a similar suggestion in the previous review cycle, so I think we should align on what StreamTaskTestHarness.waitForInputProcessing should do.

@AHeise
Contributor

AHeise commented Nov 23, 2020

You are completely right. It's just very difficult to realize the original implementation without some kind of hacks and assumptions - any hotfix is prone to fail with a slight change again. I'd probably go your way but also start migrating the tests - the harness has been deprecated for a reason.

Contributor

@rkhachatryan rkhachatryan left a comment

From my side, all the concerns were resolved.
Thanks for the fix!

@AHeise
Contributor

AHeise commented Nov 23, 2020

Thank you very much for the contribution. Merging.

@AHeise AHeise merged commit eb500b8 into apache:master Nov 23, 2020
@kezhuw
Member Author

kezhuw commented Nov 25, 2020

I think StreamTaskTestHarness.waitForInputProcessing plays a similar role to StreamTaskMailboxTestHarness.processAll. The current fix and StreamTaskMailboxTestHarness.processAll share one assumption: the default action will be suspended after, and only after, all available input has been processed. The main functional difference between the two methods is that the latter guarantees the mailbox is drained before returning. MailboxExecutorImpl.isIdle or something similar could fill that gap if StreamTaskTestHarness turns out to need that guarantee.

In my opinion, the key difference between StreamTaskTestHarness and StreamTaskMailboxTestHarness is the threading model. With StreamTaskMailboxTestHarness, assertion code and the stream task execute in the same thread, so it is easy to drive task execution (MailboxProcessor.runMailboxStep) without getting involved in concurrency. This is not possible with StreamTaskTestHarness, which makes it hard to probe and evaluate.
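
To make the threading-model point concrete, here is a minimal single-threaded sketch. It is a toy, not either Flink harness: `ToySameThreadHarness`, `runStep` and `processAll` are hypothetical names that only loosely mirror `MailboxProcessor.runMailboxStep` and `StreamTaskMailboxTestHarness.processAll`. Because the test thread drives every step itself, no latch, sleep or thread-state probing is needed, and the mailbox is drained by construction when `processAll` returns.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical same-thread harness; the test thread drives every step itself. */
final class ToySameThreadHarness {
    private final Deque<Runnable> mailbox = new ArrayDeque<>();
    private final Deque<Integer> input = new ArrayDeque<>();
    private int processed;

    void addInput(int value) { input.add(value); }

    void execute(Runnable mail) { mailbox.add(mail); }

    /** Runs one mail if present, otherwise processes one element; false if idle. */
    boolean runStep() {
        Runnable mail = mailbox.poll();
        if (mail != null) {
            mail.run();
            return true;
        }
        if (input.poll() != null) {
            processed++;
            return true;
        }
        return false;
    }

    /** Steps until neither mail nor input is left. */
    void processAll() {
        while (runStep()) {
            // keep stepping
        }
    }

    int processedCount() { return processed; }

    public static void main(String[] args) {
        ToySameThreadHarness harness = new ToySameThreadHarness();
        StringBuilder log = new StringBuilder();
        harness.addInput(1);
        harness.addInput(2);
        harness.execute(() -> log.append("mail"));
        harness.processAll();
        // Assertions can follow immediately: everything ran on this thread.
        System.out.println(harness.processedCount() + " elements, log=" + log);
    }
}
```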

That is all I have got now.

@AHeise Thank you for pointing out the inappropriateness of TaskMailbox.isMailboxThreadBlocked.

@AHeise @rkhachatryan Thank you for reviewing.
