[FLINK-15918][FLINK-15917] Uptime Metric not reset on Job Restart / Root Exception not shown in Web UI #11032

Closed
wants to merge 2 commits into from

GJL
Member

@GJL GJL commented Feb 6, 2020

What is the purpose of the change

(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)

Brief change log

(for example:)

  • The TaskInfo is stored in the blob store on job creation time as a persistent artifact
  • Deployments RPC transmits only the blob storage reference
  • TaskManagers retrieve the TaskInfo from the blob cache

Verifying this change

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (100MB)
  • Extended integration test for recovery after master (JobManager) failure
  • Added test that validates that TaskInfo is transferred only once across recoveries
  • Manually verified the change by running a 4-node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@GJL GJL requested a review from tillrohrmann February 6, 2020 14:15
@GJL GJL force-pushed the FLINK-15918 branch 2 times, most recently from 354d125 to a1422e1 Compare February 6, 2020 14:17
@flinkbot
Collaborator

flinkbot commented Feb 6, 2020

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 354d125 (Thu Feb 06 14:19:08 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Feb 6, 2020

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@@ -210,6 +216,9 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe
final Set<ExecutionVertexVersion> executionVertexVersions =
new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());

transitionExecutionGraphState(JobStatus.RUNNING, JobStatus.RESTARTING);
Member Author

extract method

@@ -223,6 +232,11 @@ private Runnable restartTasks(final Set<ExecutionVertexVersion> executionVertexV
return () -> {
final Set<ExecutionVertexID> verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);

verticesWaitingForRestart.removeAll(verticesToRestart);
Member Author

extract method

@tillrohrmann
Contributor

I've started a separate build on travis-ci.com. You can track the progress here: https://travis-ci.com/flink-ci/flink/builds/147751241

Contributor

@tillrohrmann tillrohrmann left a comment

Thanks for creating this PR @GJL. I think at the moment we cannot directly merge the PR. The problem is that ExecutionGraph.cancel will transition the ExecutionGraph directly into a globally terminal state if it is in state RESTARTING. Maybe we could remove this shortcut and handle the RESTARTING case the same way we handle RUNNING and CREATED.
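
To make the concern concrete, here is a minimal, self-contained sketch. It is not Flink's ExecutionGraph: `MiniGraph`, `cancelWithShortcut`, and `cancelUniformly` are hypothetical names, and the premise that tasks may still be running while the job is in RESTARTING is an assumption of the sketch. It only contrasts the shortcut with handling RESTARTING like RUNNING and CREATED.

```java
import java.util.ArrayList;
import java.util.List;

final class CancelShortcutSketch {
    enum JobStatus { CREATED, RUNNING, RESTARTING, CANCELLING, CANCELED }
    enum TaskState { RUNNING, CANCELED }

    /** Hypothetical, heavily simplified stand-in for an execution graph. */
    static final class MiniGraph {
        JobStatus status;
        final List<TaskState> tasks = new ArrayList<>();

        MiniGraph(JobStatus status, int runningTasks) {
            this.status = status;
            for (int i = 0; i < runningTasks; i++) {
                tasks.add(TaskState.RUNNING);
            }
        }

        /** Shortcut: RESTARTING jumps straight to CANCELED and never touches the tasks. */
        void cancelWithShortcut() {
            if (status == JobStatus.RESTARTING) {
                status = JobStatus.CANCELED;
            } else {
                cancelUniformly();
            }
        }

        /** Suggested handling: RESTARTING takes the same path as RUNNING and CREATED. */
        void cancelUniformly() {
            if (status == JobStatus.CREATED
                    || status == JobStatus.RUNNING
                    || status == JobStatus.RESTARTING) {
                status = JobStatus.CANCELLING;
                // Synchronous stand-in for cancelling tasks and waiting for their acknowledgements.
                tasks.replaceAll(task -> TaskState.CANCELED);
                status = JobStatus.CANCELED;
            }
        }

        long runningTasks() {
            return tasks.stream().filter(task -> task == TaskState.RUNNING).count();
        }
    }

    public static void main(String[] args) {
        MiniGraph shortcut = new MiniGraph(JobStatus.RESTARTING, 2);
        shortcut.cancelWithShortcut();
        System.out.println(shortcut.status + " with " + shortcut.runningTasks() + " running tasks");

        MiniGraph uniform = new MiniGraph(JobStatus.RESTARTING, 2);
        uniform.cancelUniformly();
        System.out.println(uniform.status + " with " + uniform.runningTasks() + " running tasks");
    }
}
```

In this toy model both variants end in CANCELED, but only the uniform path leaves no task running, which is the difference the comment above is worried about.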

@GJL
Member Author

GJL commented Feb 6, 2020

https://api.travis-ci.org/v3/job/646941244/log.txt

2020-02-06 19:39:58,390 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task Source: Custom Source (1/1) (e809b769c817e73fa009ec5055e883e2) [CANCELED]
2020-02-06 19:39:58,394 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - Error while canceling task.
org.apache.flink.runtime.execution.CancelTaskException: Consumed partition PipelinedSubpartitionView(index: 0) of ResultPartition 20cd8853199f84e8a26339a17fe731e9@e809b769c817e73fa009ec5055e883e2 has been released.
	at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:190)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:509)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:487)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:475)
	at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:75)
	at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:125)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
	at java.lang.Thread.run(Thread.java:748)
2020-02-06 19:39:58,409 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source (1/1) e809b769c817e73fa009ec5055e883e2.
2020-02-06 19:39:58,425 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map -> Sink: Unnamed (1/1) (e2897fbac05b906ad242f5dd8e21fbbe) switched from CANCELING to CANCELED.
2020-02-06 19:39:58,425 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Map -> Sink: Unnamed (1/1) (e2897fbac05b906ad242f5dd8e21fbbe).
2020-02-06 19:39:58,426 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task Map -> Sink: Unnamed (1/1) (e2897fbac05b906ad242f5dd8e21fbbe) [CANCELED]
2020-02-06 19:39:58,427 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state CANCELED to JobManager for task Map -> Sink: Unnamed (1/1) e2897fbac05b906ad242f5dd8e21fbbe.
2020-02-06 19:39:58,451 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: 0fd30c7ec2d4e14753fe9e4803721b73, jobId: b3cc3c987e7aa8f99f5814b05d0057fe).
2020-02-06 19:39:58,452 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Remove job b3cc3c987e7aa8f99f5814b05d0057fe from job leader monitoring.
2020-02-06 19:39:58,452 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job b3cc3c987e7aa8f99f5814b05d0057fe.
2020-02-06 19:39:58,455 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job b3cc3c987e7aa8f99f5814b05d0057fe.
2020-02-06 19:39:58,455 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot reconnect to job b3cc3c987e7aa8f99f5814b05d0057fe because it is not registered.
Checking for non-empty .out files...
No non-empty .out files.

[FAIL] 'State Migration end-to-end test from 1.6' failed after 0 minutes and 17 seconds! Test exited with exit code 0 but the logs contained errors, exceptions or non-empty .out files

Shouldn't be related?

@zhuzhurk
Contributor

zhuzhurk commented Feb 7, 2020

https://api.travis-ci.org/v3/job/646941244/log.txt

It looks like this is not an issue. The upstream task was canceled first, which triggered a CancelTaskException in the downstream task that was still processing data on the same TM.

@GJL
Member Author

GJL commented Feb 7, 2020

I was just about to post that this is not a problem. It has been fixed on master but not on release-1.10. See https://issues.apache.org/jira/browse/FLINK-15751?focusedCommentId=17024366&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17024366

@tillrohrmann
Contributor

https://travis-ci.com/flink-ci/flink/builds/147754663 seems to pass as well.

Could we check whether we have a test ensuring that calling cancel on the ExecutionGraph while it is in RESTARTING stops all tasks and eventually moves the job into the state CANCELED?
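
The shape of such a test could look roughly like the sketch below. It does not use Flink's test utilities: `FakeGraph` and `FakeTask` are hypothetical stand-ins, and modelling task termination with `CompletableFuture` is an assumption made only to show the "eventually CANCELED" part of the check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CancelWhileRestartingSketch {
    enum JobStatus { RESTARTING, CANCELLING, CANCELED }

    /** Hypothetical task that terminates only once its cancellation is acknowledged. */
    static final class FakeTask {
        boolean cancelRequested;
        final CompletableFuture<Void> terminated = new CompletableFuture<>();

        void cancel() {
            cancelRequested = true;
        }

        void acknowledgeCancel() {
            terminated.complete(null);
        }
    }

    /** Hypothetical graph that starts out in RESTARTING. */
    static final class FakeGraph {
        JobStatus status = JobStatus.RESTARTING;
        final List<FakeTask> tasks = new ArrayList<>();

        void cancel() {
            status = JobStatus.CANCELLING;
            tasks.forEach(FakeTask::cancel);
            CompletableFuture<?>[] terminations =
                    tasks.stream().map(task -> task.terminated).toArray(CompletableFuture[]::new);
            // The job becomes CANCELED only after every task has terminated.
            CompletableFuture.allOf(terminations).thenRun(() -> status = JobStatus.CANCELED);
        }
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        FakeGraph graph = new FakeGraph();
        graph.tasks.add(new FakeTask());
        graph.tasks.add(new FakeTask());

        graph.cancel();
        check(graph.status == JobStatus.CANCELLING, "cancel must not jump to a terminal state");
        check(graph.tasks.stream().allMatch(task -> task.cancelRequested), "all tasks must be cancelled");

        graph.tasks.forEach(FakeTask::acknowledgeCancel);
        check(graph.status == JobStatus.CANCELED, "job must end in CANCELED");
        System.out.println("final status: " + graph.status);
    }
}
```

A real test would drive the actual scheduler and ExecutionGraph instead of these fakes; the point here is only the three assertions: no direct jump to a terminal state, every task receives a cancel call, and CANCELED is reached once all tasks have terminated.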

@GJL
Member Author

GJL commented Feb 7, 2020

Contributor

@zhuzhurk zhuzhurk left a comment

Thanks for opening this PR, @GJL. The change looks good to me.
+1 to merge it.

This fix ensures the backward compatibility of several metrics. However, I think some of the metrics are not very accurate now since region failover and batch scheduling are used by more and more users. I think in 1.11 we should revisit these metrics to see how to fix them for all these new scenarios.

Contributor

@tillrohrmann tillrohrmann left a comment

Thanks for addressing my comments @GJL. LGTM. I had a single comment concerning the newly added test. Once this comment is resolved +1 for merging.

…start

The 'uptime' metric is the time difference between 'now' and the timestamp when
the job transitioned to state RUNNING. The new scheduler until now never
transitioned out of status RUNNING when restarting tasks which had the effect
that the 'uptime' metric was not reset after a restart. This introduces new
state transitions to the job. We transition the job status to RESTARTING if at
least one ExecutionVertex is waiting to be restarted, and we transition from
RESTARTING immediately to RUNNING again after the restart.

This closes apache#11032.
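
The effect described in the commit message can be illustrated with a small sketch. It is not Flink's metric code: the `Job` class, its `transition` and `uptime` methods, and the choice to report 0 while the job is not RUNNING are assumptions made for illustration. It shows why uptime only resets if the job actually leaves and re-enters RUNNING.

```java
import java.util.EnumMap;
import java.util.Map;

final class UptimeSketch {
    enum JobStatus { CREATED, RUNNING, RESTARTING }

    /** Hypothetical stand-in for a job that remembers when it entered each status. */
    static final class Job {
        private JobStatus status = JobStatus.CREATED;
        private final Map<JobStatus, Long> enteredAt = new EnumMap<>(JobStatus.class);

        void transition(JobStatus expected, JobStatus next, long nowMillis) {
            if (status != expected) {
                throw new IllegalStateException("expected " + expected + " but was " + status);
            }
            status = next;
            enteredAt.put(next, nowMillis);
        }

        /** 'now' minus the latest RUNNING timestamp; 0 while not RUNNING (an assumption of this sketch). */
        long uptime(long nowMillis) {
            return status == JobStatus.RUNNING
                    ? nowMillis - enteredAt.get(JobStatus.RUNNING)
                    : 0L;
        }
    }

    public static void main(String[] args) {
        Job job = new Job();
        job.transition(JobStatus.CREATED, JobStatus.RUNNING, 1_000L);
        System.out.println(job.uptime(5_000L)); // 4000: measured from the first RUNNING timestamp

        // A task fails at t=5000 and the restart completes at t=5200.
        job.transition(JobStatus.RUNNING, JobStatus.RESTARTING, 5_000L);
        job.transition(JobStatus.RESTARTING, JobStatus.RUNNING, 5_200L);
        System.out.println(job.uptime(6_000L)); // 800: measured from the restart
    }
}
```

Without the two extra transitions the RUNNING timestamp would stay at 1000, and the second reading would be 5000 instead of 800.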
@GJL GJL changed the title [WIP][FLINK-15918] [FLINK-15918][FLINK-15917] Feb 7, 2020
@GJL GJL changed the title [FLINK-15918][FLINK-15917] [FLINK-15918][FLINK-15917] Uptime Metric not reset on Job Restart / Root Exception not shown in Web UI Feb 7, 2020
GJL added a commit that referenced this pull request Feb 7, 2020
GJL added a commit to GJL/flink that referenced this pull request Feb 7, 2020
@GJL GJL closed this in b902ed5 Feb 10, 2020
JTaky pushed a commit to JTaky/flink that referenced this pull request Feb 20, 2020
5 participants