diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index ac4897bac0a42..447cc0ea51a7f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.job.JobPlanHandler; import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler; import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler; +import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; @@ -57,6 +58,7 @@ import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; import org.apache.flink.runtime.rest.messages.JobPlanHeaders; import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders; @@ -217,6 +219,14 @@ protected Collection> in executionGraphCache, executor); + JobVertexAccumulatorsHandler jobVertexAccumulatorsHandler = new JobVertexAccumulatorsHandler( + restAddressFuture, + leaderRetriever, + timeout, + JobVertexAccumulatorsHeaders.getInstance(), + executionGraphCache, + executor); + final File tmpDir = restConfiguration.getTmpDir(); Optional> optWebContent; @@ -244,6 +254,7 @@ protected Collection> in handlers.add(Tuple2.of(JobPlanHeaders.getInstance(), jobPlanHandler)); handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler)); handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler)); + handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler)); BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout); handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java new file mode 100644 index 0000000000000..e89052bf2cb34 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsInfo; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler for the job vertex accumulators. + */ +public class JobVertexAccumulatorsHandler extends AbstractExecutionGraphHandler { + + public JobVertexAccumulatorsHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + MessageHeaders messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor) { + super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor); + } + + @Override + protected JobVertexAccumulatorsInfo handleRequest(HandlerRequest request, AccessExecutionGraph executionGraph) throws RestHandlerException { + JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class); + AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID); + if (null != jobVertex) { + StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified(); + List userAccumulatorList = new ArrayList<>(); + for (StringifiedAccumulatorResult acc : accs) { + userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator( + acc.getName(), + acc.getType(), + acc.getValue())); + } + + return new JobVertexAccumulatorsInfo(jobVertex.getJobVertexId().toString(), userAccumulatorList); + } else { + throw new RestHandlerException("There is no accumulator for vertex " + jobVertexID + '.', HttpResponseStatus.NOT_FOUND); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java new file mode 100644 index 0000000000000..0e1179f31fdff --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for the {@link JobVertexAccumulatorsHandler}. + */ +public class JobVertexAccumulatorsHeaders implements MessageHeaders { + + private static final JobVertexAccumulatorsHeaders INSTANCE = new JobVertexAccumulatorsHeaders(); + + public static final String URL = "/jobs/:jobid/vertices/:vertexid/accumulators"; + + private JobVertexAccumulatorsHeaders() {} + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class getResponseClass() { + return JobVertexAccumulatorsInfo.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public JobMessageParameters getUnresolvedMessageParameters() { + return new JobMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static JobVertexAccumulatorsHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java new file mode 100644 index 0000000000000..3f64a06d1d5d3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexAccumulatorsHandler}. + */ +public class JobVertexAccumulatorsInfo implements ResponseBody { + + public static final String FIELD_NAME_ID = "id"; + public static final String FIELD_NAME_USER_ACCUMULATORS = "user-accumulators"; + + @JsonProperty(FIELD_NAME_ID) + private String id; + + @JsonProperty(FIELD_NAME_USER_ACCUMULATORS) + private List userAccumulatorList; + + @JsonCreator + public JobVertexAccumulatorsInfo( + @JsonProperty(FIELD_NAME_ID) String id, + @JsonProperty(FIELD_NAME_USER_ACCUMULATORS) List userAccumulatorList) { + this.id = id; + this.userAccumulatorList = userAccumulatorList; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobVertexAccumulatorsInfo that = (JobVertexAccumulatorsInfo) o; + return Objects.equals(id, that.id) && + Objects.equals(userAccumulatorList, that.userAccumulatorList); + } + + @Override + public int hashCode() { + return Objects.hash(id, userAccumulatorList); + } + + //--------------------------------------------------------------------------------- + // Static helper classes + //--------------------------------------------------------------------------------- + + /** + * Json serializer for the {@link JobVertexAccumulatorsInfo}. + */ + public static final class UserAccumulator { + + public static final String FIELD_NAME_ACC_NAME = "name"; + public static final String FIELD_NAME_ACC_TYPE = "type"; + public static final String FIELD_NAME_ACC_VALUE = "value"; + + @JsonProperty(FIELD_NAME_ACC_NAME) + private String name; + + @JsonProperty(FIELD_NAME_ACC_TYPE) + private String type; + + @JsonProperty(FIELD_NAME_ACC_VALUE) + private String value; + + @JsonCreator + public UserAccumulator( + @JsonProperty(FIELD_NAME_ACC_NAME) String name, + @JsonProperty(FIELD_NAME_ACC_TYPE) String type, + @JsonProperty(FIELD_NAME_ACC_VALUE) String value) { + this.name = name; + this.type = type; + this.value = value; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UserAccumulator that = (UserAccumulator) o; + return Objects.equals(name, that.name) && + Objects.equals(type, that.type) && + Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(name, type, value); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java new file mode 100644 index 0000000000000..fd4fe788cfe38 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests that the {@link JobVertexAccumulatorsInfo} can be marshalled and unmarshalled. + */ +public class JobVertexAccumulatorsInfoTest extends RestResponseMarshallingTestBase { + @Override + protected Class getTestResponseClass() { + return JobVertexAccumulatorsInfo.class; + } + + @Override + protected JobVertexAccumulatorsInfo getTestResponseInstance() throws Exception { + List userAccumulatorList = new ArrayList<>(); + userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator( + "test name1", + "test type1", + "test value1")); + userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator( + "test name2", + "test type2", + "test value2")); + userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator( + "test name3", + "test type3", + "test value3")); + + return new JobVertexAccumulatorsInfo("testId", userAccumulatorList); + } +}