[FLINK-32495][connectors/common] Fix the bug that the shared thread factory causes the source alignment unit test to fail #22911

Merged

1996fanrui
Member

What is the purpose of the change

SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails.

I analyzed this CI run: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50668&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7&l=9089

Root cause:

  • A CoordinatorExecutorThreadFactory cannot create more than one thread, and many callers check coordinatorThreadFactory.isCurrentThreadCoordinatorThread(), for example SourceCoordinatorContext.attemptReady.
  • The CoordinatorExecutorThreadFactory is shared in SourceCoordinatorTestBase.
  • When it is used by multiple source coordinators, the second source coordinator overwrites CoordinatorExecutorThreadFactory#t, so the check fails for the first source.
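
The root cause above can be illustrated with a minimal sketch. It is not Flink code: SingleThreadFactory is a hypothetical, simplified stand-in for CoordinatorExecutorThreadFactory that keeps only the field t and the isCurrentThreadCoordinatorThread() check, and the class and method names around it are assumptions made for the example.

```java
import java.util.concurrent.ThreadFactory;

public class SharedFactoryDemo {

    /** Hypothetical stand-in for CoordinatorExecutorThreadFactory: remembers only the last thread. */
    static final class SingleThreadFactory implements ThreadFactory {
        private Thread t; // plays the role of CoordinatorExecutorThreadFactory#t

        @Override
        public synchronized Thread newThread(Runnable r) {
            t = new Thread(r); // a second call silently overwrites the first thread
            return t;
        }

        synchronized boolean isCurrentThreadCoordinatorThread() {
            return Thread.currentThread() == t;
        }
    }

    /** Returns what the first coordinator's thread observes after a second thread was created. */
    static boolean firstThreadStillRecognized() {
        SingleThreadFactory shared = new SingleThreadFactory();
        boolean[] result = new boolean[1];
        Thread first = shared.newThread(() -> result[0] = shared.isCurrentThreadCoordinatorThread());
        shared.newThread(() -> {}); // a second coordinator reuses the shared factory
        first.start();
        try {
            first.join(); // join makes the write to result[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        // prints "first coordinator thread recognized: false"
        System.out.println("first coordinator thread recognized: " + firstThreadStillRecognized());
    }
}
```

In this simplified model the first thread is no longer recognized once the shared factory hands out a second thread, which mirrors the failing check described in the list.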

Brief change log

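Don't share the CoordinatorExecutorThreadFactory between source coordinators; each coordinator gets its own factory, and the factory now rejects a second newThread call.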

Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@flinkbot
Collaborator

flinkbot commented Jun 30, 2023

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

…actory causes the source alignment unit test to fail
@1996fanrui 1996fanrui force-pushed the 32495/shared-coordinator-thread-factory-18 branch from 3c4c809 to e62427b Compare June 30, 2023 05:18
@1996fanrui 1996fanrui force-pushed the 32495/shared-coordinator-thread-factory-18 branch from e62427b to 85b96a1 Compare June 30, 2023 05:41
Contributor

@snuyanzin snuyanzin left a comment

Thanks @1996fanrui

I ran the tests locally with 10k repetitions.

LGTM % CI

Contributor

@RocMarshal RocMarshal left a comment

Hello @1996fanrui, thanks for driving the fix.
That looks good to me.

-    @Before
-    public void setup() {
+    @BeforeEach
+    void setup() {
         provider =
                 new SourceCoordinatorProvider<>(
                         "SourceCoordinatorProviderTest",
Contributor

How about

Suggested change
-                        "SourceCoordinatorProviderTest",
+                        this.getClass().getSimpleName(),

?

Member Author

It's not related to this fix, and this fix is urgent, so I will merge it first.

Comment on lines +126 to +139
    @Test
    void testCoordinatorExecutorThreadFactoryNewMultipleThread() {
        SourceCoordinatorProvider.CoordinatorExecutorThreadFactory
                coordinatorExecutorThreadFactory =
                        new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
                                "test_coordinator_thread",
                                new MockOperatorCoordinatorContext(
                                        new OperatorID(1234L, 5678L), 3));

        coordinatorExecutorThreadFactory.newThread(() -> {});
        // coordinatorExecutorThreadFactory cannot create multiple threads.
        assertThatThrownBy(() -> coordinatorExecutorThreadFactory.newThread(() -> {}))
                .isInstanceOf(IllegalStateException.class);
    }
Contributor

👍

Comment on lines +138 to +141
            checkState(
                    t == null,
                    "Please using the new CoordinatorExecutorThreadFactory,"
                            + " this factory cannot new multiple threads.");
Contributor

Nice.
This design follows the fail-fast principle: it fails as soon as an invalid operation is encountered.
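
As a rough illustration of the fail-fast guard discussed in this thread, here is a minimal sketch in plain Java. SingleUseThreadFactory is a hypothetical stand-in, not the Flink class, and the explicit IllegalStateException is assumed to play the role of the checkState(t == null, ...) call in the diff.

```java
import java.util.concurrent.ThreadFactory;

public class FailFastFactoryDemo {

    /** Hypothetical stand-in for the patched factory: refuses to create a second thread. */
    static final class SingleUseThreadFactory implements ThreadFactory {
        private Thread t;

        @Override
        public synchronized Thread newThread(Runnable r) {
            if (t != null) {
                // Plain-Java equivalent of a checkState(t == null, ...) guard.
                throw new IllegalStateException("this factory cannot create multiple threads");
            }
            t = new Thread(r);
            return t;
        }
    }

    /** Returns true if a second newThread call on the same factory is rejected. */
    static boolean secondThreadRejected() {
        SingleUseThreadFactory factory = new SingleUseThreadFactory();
        factory.newThread(() -> {});
        try {
            factory.newThread(() -> {});
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    /** Returns true if two coordinators that each own a factory both get a distinct thread. */
    static boolean separateFactoriesWork() {
        Thread a = new SingleUseThreadFactory().newThread(() -> {});
        Thread b = new SingleUseThreadFactory().newThread(() -> {});
        return a != b;
    }

    public static void main(String[] args) {
        System.out.println(secondThreadRejected());
        System.out.println(separateFactoriesWork());
    }
}
```

With a guard like this, accidentally sharing one factory between coordinators surfaces as an immediate exception at thread creation instead of a later, harder to diagnose thread check failure.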

@1996fanrui
Member Author

Hi @snuyanzin @RocMarshal, thanks for the quick review, merging.

@1996fanrui 1996fanrui merged commit 9d7b166 into apache:master Jun 30, 2023
@1996fanrui 1996fanrui deleted the 32495/shared-coordinator-thread-factory-18 branch June 30, 2023 12:54
4 participants