@liuyongvs
Contributor

What is the purpose of the change

Backport for FLINK-29749

Brief change log

(for example:)

  • The TaskInfo is stored in the blob store on job creation time as a persistent artifact
  • Deployments RPC transmits only the blob storage reference
  • TaskManagers retrieve the TaskInfo from the blob cache

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (100MB)
  • Extended integration test for recovery after master (JobManager) failure
  • Added test that validates that TaskInfo is transferred only once across recoveries
  • Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

zentol and others added 30 commits April 14, 2022 16:36
Need to make sure the job manager runner is complete, because the test runner does not implement required methods to query job details.
…ation Development > Table API & SQL" to Chinese. This closes apache#19401

Co-authored-by: Roc Marshal <flinker@126.com>
…cal sort is incorrect if adaptive batch scheduler is enabled

(cherry picked from commit 456ceb2)

This closes apache#19497
…binary/character string

Revert the behaviour of binary-to-string casts (and vice versa) so that they
no longer use hex encoding/decoding, but a simple UTF-8 bytes transformation
from a byte[] to a string and vice versa.
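
As a rough illustration of the "simple UTF-8 bytes transformation" described in this commit message, the following sketch converts between a byte[] and a string without any hex step. The class and method names are hypothetical and are not Flink's cast rule classes.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch; not the actual Flink cast rule implementation.
public class BinaryStringCastSketch {

    // Binary -> string: interpret the bytes as UTF-8 text (no hex encoding).
    static String binaryToString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // String -> binary: take the UTF-8 bytes of the text (no hex decoding).
    static byte[] stringToBinary(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = {0x61, 0x62, 0x63};
        System.out.println(binaryToString(raw));          // prints "abc"
        System.out.println(stringToBinary("abc").length); // prints 3
    }
}
```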

(cherry picked from commit 4cdafff)
… `x'ab3234f0'` format

Add an `isPrinting()` method to `CastRule.Context`. `RowDataToStringConverterImpl`
sets it to true, which selects a different casting behaviour in
`BinaryToStringCastRule` when printing binary columns, so that columns of
binary type are output as (for example) `x'ab03f98e'`, which can easily be
copy-pasted as a binary literal into another SQL query.
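
A minimal sketch of the two behaviours, assuming a plain boolean flag stands in for `isPrinting()`. The class and method names here are hypothetical; only the `x'…'` output format comes from the commit message.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch; the real logic lives in Flink's BinaryToStringCastRule.
public class BinaryLiteralPrintSketch {

    // Render bytes as a SQL binary literal, e.g. x'ab03f98e'.
    static String toSqlBinaryLiteral(byte[] bytes) {
        StringBuilder sb = new StringBuilder("x'");
        for (byte b : bytes) {
            // Mask to 0..255 so negative bytes print as two hex digits.
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.append("'").toString();
    }

    // When printing, emit the literal form; otherwise decode as UTF-8 text.
    static String cast(byte[] bytes, boolean isPrinting) {
        return isPrinting
                ? toSqlBinaryLiteral(bytes)
                : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] value = {(byte) 0xab, 0x03, (byte) 0xf9, (byte) 0x8e};
        System.out.println(cast(value, true)); // prints x'ab03f98e'
    }
}
```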

(cherry picked from commit 75007f6)

This closes apache#19516.
…Vulnerability. This closes apache#19479

Signed-off-by: David N Perkins <David.N.Perkins@ibm.com>
…rState may not have been updated when schema changes
…eachableCoalesceArgumentsRule

(cherry picked from commit 2e632af)

This closes apache#19566
dawidwys and others added 28 commits October 6, 2022 17:07
…Scheduler

We change the signature of JobVertex#initializeOnMaster/finalizeOnMaster to pass a Context object. In this context we can pass
the actual parallelism the vertex will be run with.
Enum fields have naming restrictions in some languages.
…otation to JUnit5 annotation for disabling tests on Java 11
…ointCommittableManagerImpl during deserialization.

When we recover the `CheckpointCommittableManager` we were ignoring the subtaskId it is recovered on. This becomes a problem when a sink uses a post-commit topology because multiple committer operators might forward committable summaries coming from the same subtaskId.

This ticket implements a fix to use the subtaskId already present in the CommittableCollectorSerializer when recreating CheckpointCommittableManagerImpl during recovery.

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>
…bleWithLineages with the CommittableSummary

Before this change, during recovery of the CommittableCollector we
always initialized the SubtaskCommittableManager with the initial
checkpoint id (1), but the holding CheckpointCommittable with the
checkpoint id in state. As a result, the emitted
CommittableWithLineages could update SubtaskCommittableManagers that they
do not belong to, causing "Unknown subtask for <id>" failures.
Co-authored-by: Yufan Sheng <yufan@streamnative.io>
…on (apache#21100)

This cherry-picks apache#21069.

Co-authored-by: Yufan Sheng <yufan@streamnative.io>
… more than 1 committable.

Recovering more than one Committable causes an `IllegalStateException` and prevents the cluster from starting.

When we recover the `CheckpointCommittableManager`, we deserialize SubtaskCommittableManager instances from the recovery state and put them into a `Map<Integer, SubtaskCommittableManager<CommT>>`. The key of this map is the subtaskId of the recovered manager. However, this fails if we have to recover more than one committable.

The fix is to call `SubtaskCommittableManager::merge` if a manager has already been deserialized for this subtaskId.
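
The merge-on-duplicate-key idea can be sketched as follows. `SubtaskManager` is a simplified, hypothetical stand-in for `SubtaskCommittableManager`, and the use of `Map.merge` is an assumption made for illustration, not necessarily how the actual fix is written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of merging managers recovered for the same subtaskId.
public class SubtaskMergeSketch {

    // Simplified stand-in for SubtaskCommittableManager.
    static final class SubtaskManager {
        final int subtaskId;
        final List<String> committables = new ArrayList<>();

        SubtaskManager(int subtaskId, String... committables) {
            this.subtaskId = subtaskId;
            this.committables.addAll(Arrays.asList(committables));
        }

        // Fold the other manager's committables into this one.
        SubtaskManager merge(SubtaskManager other) {
            committables.addAll(other.committables);
            return this;
        }
    }

    // Insert each recovered manager; on a duplicate subtaskId, merge instead of failing.
    static Map<Integer, SubtaskManager> collect(List<SubtaskManager> recovered) {
        Map<Integer, SubtaskManager> bySubtask = new HashMap<>();
        for (SubtaskManager manager : recovered) {
            bySubtask.merge(manager.subtaskId, manager, SubtaskManager::merge);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        Map<Integer, SubtaskManager> result = collect(Arrays.asList(
                new SubtaskManager(0, "c1"),
                new SubtaskManager(0, "c2"),
                new SubtaskManager(1, "c3")));
        System.out.println(result.get(0).committables); // prints [c1, c2]
    }
}
```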

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>