From 34a7734c489b080d34ff2194a29d3c1d25d3ab45 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 29 Feb 2024 13:22:10 +0100 Subject: [PATCH] [FLINK-20090][rest] Expose slot sharing group info in REST API --- .../program/rest/RestClusterClientTest.java | 78 +++++++++++++++++++ .../src/test/resources/rest_api_v1.snapshot | 3 + .../AccessExecutionJobVertex.java | 8 ++ .../ArchivedExecutionGraph.java | 1 + .../ArchivedExecutionJobVertex.java | 11 +++ .../executiongraph/ExecutionJobVertex.java | 1 + .../runtime/instance/SlotSharingGroupId.java | 13 +++- .../rest/handler/job/JobDetailsHandler.java | 1 + .../rest/messages/job/JobDetailsInfo.java | 20 +++++ .../json/SlotSharingGroupIDDeserializer.java | 44 +++++++++++ .../json/SlotSharingGroupIDSerializer.java | 43 ++++++++++ .../handler/job/JobExceptionsHandlerTest.java | 2 + .../job/JobVertexFlameGraphHandlerTest.java | 3 + ...askExecutionAttemptDetailsHandlerTest.java | 2 + .../rest/messages/job/JobDetailsInfoTest.java | 2 + 15 files changed, 231 insertions(+), 1 deletion(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDDeserializer.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDSerializer.java diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index ce8bd8694e044..8e78ae8918e78 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -37,10 +37,13 @@ import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.io.network.partition.DataSetMetaInfo; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; @@ -71,6 +74,7 @@ import org.apache.flink.runtime.rest.messages.JobCancellationHeaders; import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters; import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.MessageParameters; @@ -86,6 +90,8 @@ import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetIdPathParameter; import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetListHeaders; import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetListResponseBody; +import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody; import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders; @@ -96,6 +102,7 @@ import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters; import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody; import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters; @@ -1236,6 +1243,58 @@ void testNotShowSuspendedJobStatus() throws Exception { } } + @Test + void testJobDetailsContainsSlotSharingGroupId() throws Exception { + final IOMetricsInfo jobVertexMetrics = + new IOMetricsInfo(0, false, 0, false, 0, false, 0, false, 0, 0, 0); + SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId(); + final Collection jobVertexDetailsInfos = + Collections.singletonList( + new JobDetailsInfo.JobVertexDetailsInfo( + new JobVertexID(), + slotSharingGroupId, + "jobVertex1", + 2, + 1, + ExecutionState.RUNNING, + 1, + 2, + 1, + Collections.singletonMap(ExecutionState.RUNNING, 0), + jobVertexMetrics)); + final JobDetailsInfo jobDetailsInfo = + new JobDetailsInfo( + jobId, + "foobar", + false, + JobStatus.RUNNING, + 1, + 2, + 1, + 2, + 10, + Collections.singletonMap(JobStatus.RUNNING, 1L), + jobVertexDetailsInfos, + Collections.singletonMap(ExecutionState.RUNNING, 1), + new JobPlanInfo.RawJson("{\"id\":\"1234\"}")); + final TestJobDetailsInfoHandler jobDetailsInfoHandler = + new TestJobDetailsInfoHandler(jobDetailsInfo); + + try (TestRestServerEndpoint restServerEndpoint = + createRestServerEndpoint(jobDetailsInfoHandler)) { + try (RestClusterClient restClusterClient = + createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) { + final CompletableFuture jobDetailsInfoFuture = + restClusterClient.getJobDetails(jobId); + Collection jobVertexInfos = + jobDetailsInfoFuture.get().getJobVertexInfos(); + assertThat(jobVertexInfos).hasSize(1); + assertThat(jobVertexInfos.iterator().next().getSlotSharingGroupId()) + .isEqualTo(slotSharingGroupId); + } + } + } + private class TestClientCoordinationHandler extends TestHandler< ClientCoordinationRequestBody, @@ -1402,6 +1461,25 @@ protected CompletableFuture handleRequest( } } + private class TestJobDetailsInfoHandler + extends TestHandler { + + private final JobDetailsInfo jobDetailsInfo; + + private TestJobDetailsInfoHandler(@Nonnull JobDetailsInfo jobDetailsInfo) { + super(JobDetailsHeaders.getInstance()); + this.jobDetailsInfo = checkNotNull(jobDetailsInfo); + } + + @Override + protected CompletableFuture handleRequest( + @Nonnull HandlerRequest request, + @Nonnull DispatcherGateway gateway) + throws RestHandlerException { + return CompletableFuture.completedFuture(jobDetailsInfo); + } + } + private abstract class TestHandler< R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler { diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 11b642e440631..a5fd771f88673 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -899,6 +899,9 @@ "id" : { "type" : "any" }, + "slotSharingGroupId" : { + "type" : "any" + }, "name" : { "type" : "string" }, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java index e5a610020f50c..065aed3fd41e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java @@ -21,6 +21,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; /** * Common interface for the runtime {@link ExecutionJobVertex} and {@link @@ -48,6 +49,13 @@ public interface AccessExecutionJobVertex { */ int getMaxParallelism(); + /** + * Returns the slot sharing group for this job vertex. + * + * @return slot sharing group for this job vertex. + */ + SlotSharingGroup getSlotSharingGroup(); + /** * Returns the resource profile for this job vertex. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java index eea5abbdb7ea8..c0f25b870c61c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java @@ -400,6 +400,7 @@ public static ArchivedExecutionGraph createSparseArchivedExecutionGraphWithJobVe jobVertex.getName(), parallelismInfo.getParallelism(), parallelismInfo.getMaxParallelism(), + jobVertex.getSlotSharingGroup(), ResourceProfile.fromResourceSpec( jobVertex.getMinResources(), MemorySize.ZERO), new StringifiedAccumulatorResult[0]); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java index a2c97ec49bae9..d08f8e80033b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java @@ -21,6 +21,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import java.io.Serializable; @@ -39,6 +40,8 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser private final int maxParallelism; + private final SlotSharingGroup slotSharingGroup; + private final ResourceProfile resourceProfile; private final StringifiedAccumulatorResult[] archivedUserAccumulators; @@ -55,6 +58,7 @@ public ArchivedExecutionJobVertex(ExecutionJobVertex jobVertex) { this.name = jobVertex.getJobVertex().getName(); this.parallelism = jobVertex.getParallelism(); this.maxParallelism = jobVertex.getMaxParallelism(); + this.slotSharingGroup = jobVertex.getSlotSharingGroup(); this.resourceProfile = jobVertex.getResourceProfile(); } @@ -64,6 +68,7 @@ public ArchivedExecutionJobVertex( String name, int parallelism, int maxParallelism, + SlotSharingGroup slotSharingGroup, ResourceProfile resourceProfile, StringifiedAccumulatorResult[] archivedUserAccumulators) { this.taskVertices = taskVertices; @@ -71,6 +76,7 @@ public ArchivedExecutionJobVertex( this.name = name; this.parallelism = parallelism; this.maxParallelism = maxParallelism; + this.slotSharingGroup = slotSharingGroup; this.resourceProfile = resourceProfile; this.archivedUserAccumulators = archivedUserAccumulators; } @@ -94,6 +100,11 @@ public int getMaxParallelism() { return maxParallelism; } + @Override + public SlotSharingGroup getSlotSharingGroup() { + return slotSharingGroup; + } + @Override public ResourceProfile getResourceProfile() { return resourceProfile; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 18131e603378a..f9c77f19e947a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -391,6 +391,7 @@ public InputSplitAssigner getSplitAssigner() { return splitAssigner; } + @Override public SlotSharingGroup getSlotSharingGroup() { return slotSharingGroup; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupId.java index d7beb0e6209df..06c8d4cb32c0c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupId.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupId.java @@ -19,13 +19,24 @@ package org.apache.flink.runtime.instance; import org.apache.flink.util.AbstractID; +import org.apache.flink.util.StringUtils; public class SlotSharingGroupId extends AbstractID { private static final long serialVersionUID = 8837647978345422042L; + public SlotSharingGroupId() { + super(); + } + public SlotSharingGroupId(long lowerPart, long upperPart) { super(lowerPart, upperPart); } - public SlotSharingGroupId() {} + public SlotSharingGroupId(byte[] bytes) { + super(bytes); + } + + public static SlotSharingGroupId fromHexString(String hexString) { + return new SlotSharingGroupId(StringUtils.hexStringToByte(hexString)); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java index 3de279850410e..cc6a62d666e2c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java @@ -230,6 +230,7 @@ private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo( return new JobDetailsInfo.JobVertexDetailsInfo( ejv.getJobVertexId(), + ejv.getSlotSharingGroup().getSlotSharingGroupId(), ejv.getName(), ejv.getMaxParallelism(), ejv.getParallelism(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java index 6b624090b5628..ca9ee5eeeab34 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.ResponseBody; @@ -29,6 +30,8 @@ import org.apache.flink.runtime.rest.messages.json.JobIDSerializer; import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer; import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer; +import org.apache.flink.runtime.rest.messages.json.SlotSharingGroupIDDeserializer; +import org.apache.flink.runtime.rest.messages.json.SlotSharingGroupIDSerializer; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; @@ -263,6 +266,8 @@ public static final class JobVertexDetailsInfo { public static final String FIELD_NAME_JOB_VERTEX_ID = "id"; + public static final String FIELD_NAME_SLOT_SHARING_GROUP_ID = "slotSharingGroupId"; + public static final String FIELD_NAME_JOB_VERTEX_NAME = "name"; public static final String FIELD_NAME_MAX_PARALLELISM = "maxParallelism"; @@ -285,6 +290,10 @@ public static final class JobVertexDetailsInfo { @JsonSerialize(using = JobVertexIDSerializer.class) private final JobVertexID jobVertexID; + @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_ID) + @JsonSerialize(using = SlotSharingGroupIDSerializer.class) + private final SlotSharingGroupId slotSharingGroupId; + @JsonProperty(FIELD_NAME_JOB_VERTEX_NAME) private final String name; @@ -317,6 +326,9 @@ public JobVertexDetailsInfo( @JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_JOB_VERTEX_ID) JobVertexID jobVertexID, + @JsonDeserialize(using = SlotSharingGroupIDDeserializer.class) + @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_ID) + SlotSharingGroupId slotSharingGroupId, @JsonProperty(FIELD_NAME_JOB_VERTEX_NAME) String name, @JsonProperty(FIELD_NAME_MAX_PARALLELISM) int maxParallelism, @JsonProperty(FIELD_NAME_PARALLELISM) int parallelism, @@ -328,6 +340,7 @@ public JobVertexDetailsInfo( Map tasksPerState, @JsonProperty(FIELD_NAME_JOB_VERTEX_METRICS) IOMetricsInfo jobVertexMetrics) { this.jobVertexID = Preconditions.checkNotNull(jobVertexID); + this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId); this.name = Preconditions.checkNotNull(name); this.maxParallelism = maxParallelism; this.parallelism = parallelism; @@ -344,6 +357,11 @@ public JobVertexID getJobVertexID() { return jobVertexID; } + @JsonIgnore + public SlotSharingGroupId getSlotSharingGroupId() { + return slotSharingGroupId; + } + @JsonIgnore public String getName() { return name; @@ -404,6 +422,7 @@ public boolean equals(Object o) { && endTime == that.endTime && duration == that.duration && Objects.equals(jobVertexID, that.jobVertexID) + && Objects.equals(slotSharingGroupId, that.slotSharingGroupId) && Objects.equals(name, that.name) && executionState == that.executionState && Objects.equals(tasksPerState, that.tasksPerState) @@ -414,6 +433,7 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash( jobVertexID, + slotSharingGroupId, name, maxParallelism, parallelism, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDDeserializer.java new file mode 100644 index 0000000000000..ecd19dcecb8f1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDDeserializer.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.json; + +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +import java.io.IOException; + +/** Jackson deserializer for {@link SlotSharingGroupId}. */ +public class SlotSharingGroupIDDeserializer extends StdDeserializer { + + private static final long serialVersionUID = -2908308366715321301L; + + protected SlotSharingGroupIDDeserializer() { + super(JobVertexID.class); + } + + @Override + public SlotSharingGroupId deserialize(JsonParser p, DeserializationContext ctxt) + throws IOException { + return SlotSharingGroupId.fromHexString(p.getValueAsString()); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDSerializer.java new file mode 100644 index 0000000000000..57b55c3292905 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDSerializer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.json; + +import org.apache.flink.runtime.instance.SlotSharingGroupId; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; + +import java.io.IOException; + +/** Jackson serializer for {@link SlotSharingGroupId}. */ +public class SlotSharingGroupIDSerializer extends StdSerializer { + + private static final long serialVersionUID = -4052148694985726120L; + + public SlotSharingGroupIDSerializer() { + super(SlotSharingGroupId.class); + } + + @Override + public void serialize(SlotSharingGroupId value, JsonGenerator gen, SerializerProvider provider) + throws IOException { + gen.writeString(value.toString()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java index f5354e5ce9077..a377ec83bb009 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionHistory; import org.apache.flink.runtime.failure.FailureEnricherUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.HandlerRequestException; import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache; @@ -476,6 +477,7 @@ private static ArchivedExecutionJobVertex createArchivedExecutionJobVertex( jobVertexID.toString(), 1, 1, + new SlotSharingGroup(), ResourceProfile.UNKNOWN, emptyAccumulators); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandlerTest.java index 8f73f9c7ac56b..0ffe0164bcd51 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandlerTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionHistory; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.HandlerRequestException; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; @@ -103,6 +104,7 @@ void testHandleMixedSubtasks() throws Exception { "test", 2, 2, + new SlotSharingGroup(), ResourceProfile.UNKNOWN, new StringifiedAccumulatorResult[0]); @@ -139,6 +141,7 @@ void testHandleFinishedJobVertex() throws Exception { "test", 2, 2, + new SlotSharingGroup(), ResourceProfile.UNKNOWN, new StringifiedAccumulatorResult[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java index 661e9e3982cf8..79fe8d1226a0f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionHistory; import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache; @@ -115,6 +116,7 @@ void testHandleRequest() throws Exception { "test", 1, 1, + new SlotSharingGroup(), ResourceProfile.UNKNOWN, emptyAccumulators); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java index 43459c51097f4..74e8960277a1d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase; @@ -108,6 +109,7 @@ private JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(Random ra int parallelism = 1 + (random.nextInt() / 3); return new JobDetailsInfo.JobVertexDetailsInfo( new JobVertexID(), + new SlotSharingGroupId(), "jobVertex" + random.nextLong(), 2 * parallelism, parallelism,