Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-9212][flip6] Port SubtasksAllAccumulatorsHandler to new REST endpoint #5893

Closed
wants to merge 2 commits into from

Conversation

yew1eb
Copy link
Contributor

@yew1eb yew1eb commented Apr 22, 2018

What is the purpose of the change

Port SubtasksAllAccumulatorsHandler to new REST endpoint

Brief change log

  • Add SubtasksAllAccumulatorsHandler for WebMonitorEndpoint
  • Add SubtasksAllAccumulatorsHeaders/SubtasksAllAccumulatorsInfo for rest response
  • Add SubtasksAllAccumulatorsInfoTest

Verifying this change

Added test case SubtasksAllAccumulatorsInfoTest to generate the json response

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexHandler<SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> {

/**
* Instantiates a new subtasks all accumulators handler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we typically don't have javadocs on constructors as they rarely offer any value

import java.util.Collection;
import java.util.Objects;


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove empty line

));
}

return new SubtasksAllAccumulatorsInfo(vertexId, parallelism, subtaskAccumulatorsInfos);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the vertex ID should be passed as a JobVertexID, not as a string.

public static final String FILED_NMAE_SUBTASKS = "subtasks";

@JsonProperty(FIELD_NAME_ID)
private String id;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be a JobVertexID, and a more descriptive field name would be great. (the JSON field name should remain the same ;) )


final List<SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos = new ArrayList<>();

int index = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the actual subtask index. Use vertex.getCurrentExecutionAttempt().getParallelSubtaskIndex() instead.


@Override
protected SubtasksAllAccumulatorsInfo getTestResponseInstance() throws Exception {
List<SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you now the size of this array already. (3)

protected SubtasksAllAccumulatorsInfo getTestResponseInstance() throws Exception {
List<SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos = new ArrayList<>();

List<UserAccumulator> userAccumulators = new ArrayList<>(3);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you actually only have 2 accumulators.

@yew1eb
Copy link
Contributor Author

yew1eb commented Apr 23, 2018

@zentol thanks for your review. I have updated the PR.

@zentol
Copy link
Contributor

zentol commented Apr 23, 2018

You have not changed the type of SubtasksAllAccumulatorsInfo#jobVertexId to JobVertexID.

@yew1eb
Copy link
Contributor Author

yew1eb commented Apr 23, 2018

i saw legacy/SubtasksAllAccumulatorsHandler.java used gen.writeStringField("id", jobVertex.getJobVertexId().toString());.

if i change the type of SubtasksAllAccumulatorsInfo#jobVertexId to JobVertexID ,
run the SubtasksAllAccumulatorsInfoTest will fail, error log as follow:

org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "bytes" (class org.apache.flink.runtime.jobgraph.JobVertexID), not marked as ignorable (2 known properties: "lowerPart", "upperPart"])
at [Source: {"id":{"upperPart":-7327168959427448358,"lowerPart":4189391734336269051,"bytes":"OiO1lgYSKvuaUKqnBA7x2g=="},"parallelism":4,"subtasks":[{"subtask":0,"attempt":0,"host":"host-0","user-accumulators":[{"name":"test name1","type":"test type1","value":"test value1"},{"name":"test name2","type":"test type2","value":"test value2"}]},{"subtask":1,"attempt":1,"host":"host-1","user-accumulators":[{"name":"test name1","type":"test type1","value":"test value1"},{"name":"test name2","type":"test type2","value":"test value2"}]},{"subtask":2,"attempt":2,"host":"host-2","user-accumulators":[{"name":"test name1","type":"test type1","value":"test value1"},{"name":"test name2","type":"test type2","value":"test value2"}]}]}; line: 1, column: 82] (through reference chain: org.apache.flink.runtime.rest.messages.job.SubtasksAllAccumulatorsInfo["id"]->org.apache.flink.runtime.jobgraph.JobVertexID["bytes"])

@zentol
Copy link
Contributor

zentol commented Apr 23, 2018

You have to use the JobVertexIDSerializer for that field.

@zentol
Copy link
Contributor

zentol commented Apr 23, 2018

see JobVertexDetailsInfo on how to use it.

@yew1eb yew1eb force-pushed the FLINK-9212 branch 3 times, most recently from 267f294 to 7e8cc4a Compare April 23, 2018 13:48
@yew1eb
Copy link
Contributor Author

yew1eb commented Apr 23, 2018

@zentol thanks you. I updated the code. 😆

Copy link
Contributor

@zentol zentol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 minor comments, otherwise +1.

for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {

TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname();

gen.writeStartObject();

gen.writeNumberField("subtask", num++);
gen.writeNumberField("subtask", vertex.getCurrentExecutionAttempt().getParallelSubtaskIndex());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep this at it is.

public static final String FIELD_NAME_PARALLELISM = "parallelism";
public static final String FILED_NMAE_SUBTASKS = "subtasks";
public static final String FILED_NAME_SUBTASKS = "subtasks";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: FILED -> FIELD

@zentol
Copy link
Contributor

zentol commented Apr 24, 2018

I'll address the remaining issues myself when merging, thanks for working on this!

zentol pushed a commit to zentol/flink that referenced this pull request Apr 24, 2018
asfgit pushed a commit that referenced this pull request Apr 25, 2018
@asfgit asfgit closed this in 512083a Apr 25, 2018
sampathBhat pushed a commit to sampathBhat/flink that referenced this pull request Jul 26, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants