
[FLINK-34643] Fix concurrency issue in LoggerAuditingExtension#24550

Merged
rkhachatryan merged 2 commits into apache:master from rkhachatryan:f34643
Mar 22, 2024

Conversation

@rkhachatryan
Contributor

What is the purpose of the change

(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)

Brief change log

(for example:)

  • The TaskInfo is stored in the blob store on job creation time as a persistent artifact
  • Deployments RPC transmits only the blob storage reference
  • TaskManagers retrieve the TaskInfo from the blob cache

Verifying this change

Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (100MB)
  • Extended integration test for recovery after master (JobManager) failure
  • Added test that validates that TaskInfo is transferred only once across recoveries
  • Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@rkhachatryan rkhachatryan requested a review from XComp March 21, 2024 13:29
@flinkbot
Collaborator

flinkbot commented Mar 21, 2024

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@XComp
Contributor

XComp commented Mar 21, 2024

Can you elaborate a bit why this would solve the issue? Isn't beforeEach called only once per test method? 🤔

@rkhachatryan
Contributor Author

Sure. My theory is that although the ordering is correct, the write to this field (loggingEvents) might not be visible to other threads, including threads started by Flink and logging threads (if any).
So some threads might see null there, catch the NPE, and log it to ConsoleLogger.
I'm not 100% sure that this is the issue, but it seems more likely than async logging or buffering of log records.

The fix is either to make the field volatile or to avoid reading the field at all (capture the value in a closure instead). I'm doing both, in case new usages of the field are added later.
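
For readers following along, here is a minimal sketch of the two measures described above. It is not the actual LoggerAuditingExtension code: the class name `AuditingSketch`, the `Consumer<String>` stand-in for a log appender, and the `demo()` helper are all hypothetical, and only the names loggingEvents and loggingEventsLocal come from this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Hypothetical stand-in for a test extension that records log messages. */
public class AuditingSketch {

    // Measure 1: volatile, so a write made in beforeEach() is visible to
    // any thread that reads the field afterwards.
    private volatile ConcurrentLinkedQueue<String> loggingEvents;

    /** Returns the callback that a logging appender would invoke (assumption). */
    public Consumer<String> beforeEach() {
        // Measure 2: the callback captures this local reference (a closure)
        // and never re-reads the field, so it cannot observe null.
        ConcurrentLinkedQueue<String> loggingEventsLocal = new ConcurrentLinkedQueue<>();
        loggingEvents = loggingEventsLocal;
        return loggingEventsLocal::add;
    }

    public List<String> getMessages() {
        return new ArrayList<>(loggingEvents);
    }

    /** Logs one message from a worker thread and one from the calling thread. */
    public static List<String> demo() {
        AuditingSketch sketch = new AuditingSketch();
        Consumer<String> appender = sketch.beforeEach();
        Thread worker = new Thread(() -> appender.accept("from worker"));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        appender.accept("from main");
        return sketch.getMessages();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that in this small demo `Thread.start()` already provides the needed ordering, so it only illustrates the shape of the change, not a reproduction of the suspected visibility problem.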

Contributor

@RyanSkraba RyanSkraba left a comment


LGTM

I had to refresh my understanding of the volatile keyword :D, but it seems to me that this is correct. After this change, any assignment or reassignment to loggingEvents (1) will be visible to other threads that read the field afterwards and (2) is guaranteed to have happened before any such read by another thread (for example, a read made to assign to loggingEventsLocal).

Thanks!

@rkhachatryan
Contributor Author

Thanks for reviewing! Merging

4 participants