Skip to content

Commit

Permalink
[FLINK-8150] [flip6] Expose TaskExecutor's ResourceID as TaskExecutor id
Browse files Browse the repository at this point in the history
Before, the TaskExecutor's InstanceID was exposed as TaskExecutor id. This was wrong
since the InstanceID is bound to the registration of a TaskExecutor, whereas the
ResourceID is bound to the lifetime of the TaskExecutor. Thus, it is better to identify
the TaskExecutor by its ResourceID, which does not change.

This commit changes the behaviour accordingly on the ResourceManager and the
TaskManagerDetailsHandler.

This closes #5093.
  • Loading branch information
tillrohrmann committed Nov 29, 2017
1 parent 44d973d commit dc7ab13
Show file tree
Hide file tree
Showing 14 changed files with 412 additions and 66 deletions.
Expand Up @@ -500,14 +500,14 @@ public CompletableFuture<Integer> getNumberOfRegisteredTaskManagers() {
@Override @Override
public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Time timeout) { public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Time timeout) {


ArrayList<TaskManagerInfo> taskManagerInfos = new ArrayList<>(taskExecutors.size()); final ArrayList<TaskManagerInfo> taskManagerInfos = new ArrayList<>(taskExecutors.size());


for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> taskExecutorEntry : taskExecutors.entrySet()) { for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> taskExecutorEntry : taskExecutors.entrySet()) {
final WorkerRegistration<WorkerType> taskExecutor = taskExecutorEntry.getValue(); final WorkerRegistration<WorkerType> taskExecutor = taskExecutorEntry.getValue();


taskManagerInfos.add( taskManagerInfos.add(
new TaskManagerInfo( new TaskManagerInfo(
taskExecutor.getInstanceID(), taskExecutorEntry.getKey(),
taskExecutor.getTaskExecutorGateway().getAddress(), taskExecutor.getTaskExecutorGateway().getAddress(),
taskExecutor.getDataPort(), taskExecutor.getDataPort(),
taskManagerHeartbeatManager.getLastHeartbeatFrom(taskExecutorEntry.getKey()), taskManagerHeartbeatManager.getLastHeartbeatFrom(taskExecutorEntry.getKey()),
Expand All @@ -520,29 +520,20 @@ public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Tim
} }


@Override @Override
public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(InstanceID instanceId, Time timeout) { public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(ResourceID resourceId, Time timeout) {


Map.Entry<ResourceID, WorkerRegistration<WorkerType>> taskExecutorEntry = null; WorkerRegistration<WorkerType> taskExecutor = taskExecutors.get(resourceId);


// TODO: Implement more efficiently if (taskExecutor == null) {
for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerEntry : taskExecutors.entrySet()) { return FutureUtils.completedExceptionally(new FlinkException("Requested TaskManager " + resourceId + " is not registered."));
if (Objects.equals(workerEntry.getValue().getInstanceID(), instanceId)) {
taskExecutorEntry = workerEntry;
break;
}
}

if (taskExecutorEntry == null) {
return FutureUtils.completedExceptionally(new FlinkException("Requested TaskManager " + instanceId + " is not registered."));
} else { } else {
WorkerRegistration<?> taskExecutor = taskExecutorEntry.getValue();
final TaskManagerInfo taskManagerInfo = new TaskManagerInfo( final TaskManagerInfo taskManagerInfo = new TaskManagerInfo(
taskExecutor.getInstanceID(), resourceId,
taskExecutor.getTaskExecutorGateway().getAddress(), taskExecutor.getTaskExecutorGateway().getAddress(),
taskExecutor.getDataPort(), taskExecutor.getDataPort(),
taskManagerHeartbeatManager.getLastHeartbeatFrom(taskExecutorEntry.getKey()), taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
slotManager.getNumberRegisteredSlotsOf(instanceId), slotManager.getNumberRegisteredSlotsOf(taskExecutor.getInstanceID()),
slotManager.getNumberFreeSlotsOf(instanceId), slotManager.getNumberFreeSlotsOf(taskExecutor.getInstanceID()),
taskExecutor.getHardwareDescription()); taskExecutor.getHardwareDescription());


return CompletableFuture.completedFuture(taskManagerInfo); return CompletableFuture.completedFuture(taskManagerInfo);
Expand Down
Expand Up @@ -185,11 +185,11 @@ void notifySlotAvailable(
/** /**
* Requests information about the given {@link TaskExecutor}. * Requests information about the given {@link TaskExecutor}.
* *
* @param instanceId identifying the TaskExecutor for which to return information * @param taskManagerId identifying the TaskExecutor for which to return information
* @param timeout of the request * @param timeout of the request
* @return Future TaskManager information * @return Future TaskManager information
*/ */
CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(InstanceID instanceId, @RpcTimeout Time timeout); CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(ResourceID taskManagerId, @RpcTimeout Time timeout);


/** /**
* Requests the resource overview. The resource overview provides information about the * Requests the resource overview. The resource overview provides information about the
Expand Down
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.runtime.rest.handler.taskmanager; package org.apache.flink.runtime.rest.handler.taskmanager;


import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.RestHandlerException;
Expand Down Expand Up @@ -71,7 +71,7 @@ public TaskManagerDetailsHandler(
protected CompletableFuture<TaskManagerDetailsInfo> handleRequest( protected CompletableFuture<TaskManagerDetailsInfo> handleRequest(
@Nonnull HandlerRequest<EmptyRequestBody, TaskManagerMessageParameters> request, @Nonnull HandlerRequest<EmptyRequestBody, TaskManagerMessageParameters> request,
@Nonnull ResourceManagerGateway gateway) throws RestHandlerException { @Nonnull ResourceManagerGateway gateway) throws RestHandlerException {
final InstanceID taskManagerInstanceId = request.getPathParameter(TaskManagerIdPathParameter.class); final ResourceID taskManagerInstanceId = request.getPathParameter(TaskManagerIdPathParameter.class);


CompletableFuture<TaskManagerInfo> taskManagerInfoFuture = gateway.requestTaskManagerInfo(taskManagerInstanceId, timeout); CompletableFuture<TaskManagerInfo> taskManagerInfoFuture = gateway.requestTaskManagerInfo(taskManagerInstanceId, timeout);


Expand Down
Expand Up @@ -18,8 +18,7 @@


package org.apache.flink.runtime.rest.messages.json; package org.apache.flink.runtime.rest.messages.json;


import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.util.StringUtils;


import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
Expand All @@ -28,18 +27,18 @@
import java.io.IOException; import java.io.IOException;


/** /**
* Json deserializer for {@link InstanceID}. * Json deserializer for {@link ResourceID}.
*/ */
public class InstanceIDDeserializer extends StdDeserializer<InstanceID> { public class ResourceIDDeserializer extends StdDeserializer<ResourceID> {


private static final long serialVersionUID = -9058463293913469849L; private static final long serialVersionUID = -9058463293913469849L;


protected InstanceIDDeserializer() { protected ResourceIDDeserializer() {
super(InstanceID.class); super(ResourceID.class);
} }


@Override @Override
public InstanceID deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { public ResourceID deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
return new InstanceID(StringUtils.hexStringToByte(p.getValueAsString())); return new ResourceID(p.getValueAsString());
} }
} }
Expand Up @@ -18,6 +18,7 @@


package org.apache.flink.runtime.rest.messages.json; package org.apache.flink.runtime.rest.messages.json;


import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.instance.InstanceID;


import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
Expand All @@ -29,16 +30,16 @@
/** /**
* Json serializer for {@link InstanceID}. * Json serializer for {@link InstanceID}.
*/ */
public class InstanceIDSerializer extends StdSerializer<InstanceID> { public class ResourceIDSerializer extends StdSerializer<ResourceID> {


private static final long serialVersionUID = 5798852092159615938L; private static final long serialVersionUID = 5798852092159615938L;


protected InstanceIDSerializer() { protected ResourceIDSerializer() {
super(InstanceID.class); super(ResourceID.class);
} }


@Override @Override
public void serialize(InstanceID value, JsonGenerator gen, SerializerProvider provider) throws IOException { public void serialize(ResourceID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
gen.writeString(value.toString()); gen.writeString(value.toString());
} }
} }
Expand Up @@ -18,9 +18,9 @@


package org.apache.flink.runtime.rest.messages.taskmanager; package org.apache.flink.runtime.rest.messages.taskmanager;


import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.rest.messages.json.ResourceIDDeserializer;
import org.apache.flink.runtime.rest.messages.json.InstanceIDDeserializer;
import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;


Expand All @@ -43,7 +43,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {


@JsonCreator @JsonCreator
public TaskManagerDetailsInfo( public TaskManagerDetailsInfo(
@JsonDeserialize(using = InstanceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId, @JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId,
@JsonProperty(FIELD_NAME_ADDRESS) String address, @JsonProperty(FIELD_NAME_ADDRESS) String address,
@JsonProperty(FIELD_NAME_DATA_PORT) int dataPort, @JsonProperty(FIELD_NAME_DATA_PORT) int dataPort,
@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat, @JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,
Expand All @@ -52,7 +52,7 @@ public TaskManagerDetailsInfo(
@JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription, @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription,
@JsonProperty(FIELD_NAME_METRICS) TaskManagerMetricsInfo taskManagerMetrics) { @JsonProperty(FIELD_NAME_METRICS) TaskManagerMetricsInfo taskManagerMetrics) {
super( super(
instanceId, resourceId,
address, address,
dataPort, dataPort,
lastHeartbeat, lastHeartbeat,
Expand All @@ -65,7 +65,7 @@ public TaskManagerDetailsInfo(


public TaskManagerDetailsInfo(TaskManagerInfo taskManagerInfo, TaskManagerMetricsInfo taskManagerMetrics) { public TaskManagerDetailsInfo(TaskManagerInfo taskManagerInfo, TaskManagerMetricsInfo taskManagerMetrics) {
this( this(
taskManagerInfo.getInstanceId(), taskManagerInfo.getResourceId(),
taskManagerInfo.getAddress(), taskManagerInfo.getAddress(),
taskManagerInfo.getDataPort(), taskManagerInfo.getDataPort(),
taskManagerInfo.getLastHeartbeat(), taskManagerInfo.getLastHeartbeat(),
Expand Down
Expand Up @@ -18,15 +18,14 @@


package org.apache.flink.runtime.rest.messages.taskmanager; package org.apache.flink.runtime.rest.messages.taskmanager;


import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.rest.messages.ConversionException; import org.apache.flink.runtime.rest.messages.ConversionException;
import org.apache.flink.runtime.rest.messages.MessagePathParameter; import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import org.apache.flink.util.StringUtils;


/** /**
* TaskManager id path parameter used by TaskManager related handlers. * TaskManager id path parameter used by TaskManager related handlers.
*/ */
public class TaskManagerIdPathParameter extends MessagePathParameter<InstanceID> { public class TaskManagerIdPathParameter extends MessagePathParameter<ResourceID> {


public static final String KEY = "taskmanagerid"; public static final String KEY = "taskmanagerid";


Expand All @@ -35,12 +34,12 @@ protected TaskManagerIdPathParameter() {
} }


@Override @Override
protected InstanceID convertFromString(String value) throws ConversionException { protected ResourceID convertFromString(String value) throws ConversionException {
return new InstanceID(StringUtils.hexStringToByte(value)); return new ResourceID(value);
} }


@Override @Override
protected String convertToString(InstanceID value) { protected String convertToString(ResourceID value) {
return StringUtils.byteToHexString(value.getBytes()); return value.getResourceIdString();
} }
} }
Expand Up @@ -18,11 +18,11 @@


package org.apache.flink.runtime.rest.messages.taskmanager; package org.apache.flink.runtime.rest.messages.taskmanager;


import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.json.InstanceIDDeserializer; import org.apache.flink.runtime.rest.messages.json.ResourceIDDeserializer;
import org.apache.flink.runtime.rest.messages.json.InstanceIDSerializer; import org.apache.flink.runtime.rest.messages.json.ResourceIDSerializer;
import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;


Expand Down Expand Up @@ -53,8 +53,8 @@ public class TaskManagerInfo implements ResponseBody {
public static final String FIELD_NAME_HARDWARE = "hardware"; public static final String FIELD_NAME_HARDWARE = "hardware";


@JsonProperty(FIELD_NAME_INSTANCE_ID) @JsonProperty(FIELD_NAME_INSTANCE_ID)
@JsonSerialize(using = InstanceIDSerializer.class) @JsonSerialize(using = ResourceIDSerializer.class)
private final InstanceID instanceId; private final ResourceID resourceId;


@JsonProperty(FIELD_NAME_ADDRESS) @JsonProperty(FIELD_NAME_ADDRESS)
private final String address; private final String address;
Expand All @@ -76,14 +76,14 @@ public class TaskManagerInfo implements ResponseBody {


@JsonCreator @JsonCreator
public TaskManagerInfo( public TaskManagerInfo(
@JsonDeserialize(using = InstanceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId, @JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId,
@JsonProperty(FIELD_NAME_ADDRESS) String address, @JsonProperty(FIELD_NAME_ADDRESS) String address,
@JsonProperty(FIELD_NAME_DATA_PORT) int dataPort, @JsonProperty(FIELD_NAME_DATA_PORT) int dataPort,
@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat, @JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,
@JsonProperty(FIELD_NAME_NUMBER_SLOTS) int numberSlots, @JsonProperty(FIELD_NAME_NUMBER_SLOTS) int numberSlots,
@JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS) int numberAvailableSlots, @JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS) int numberAvailableSlots,
@JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription) { @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription) {
this.instanceId = Preconditions.checkNotNull(instanceId); this.resourceId = Preconditions.checkNotNull(resourceId);
this.address = Preconditions.checkNotNull(address); this.address = Preconditions.checkNotNull(address);
this.dataPort = dataPort; this.dataPort = dataPort;
this.lastHeartbeat = lastHeartbeat; this.lastHeartbeat = lastHeartbeat;
Expand All @@ -92,8 +92,8 @@ public TaskManagerInfo(
this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription); this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
} }


public InstanceID getInstanceId() { public ResourceID getResourceId() {
return instanceId; return resourceId;
} }


public String getAddress() { public String getAddress() {
Expand Down Expand Up @@ -133,15 +133,15 @@ public boolean equals(Object o) {
lastHeartbeat == that.lastHeartbeat && lastHeartbeat == that.lastHeartbeat &&
numberSlots == that.numberSlots && numberSlots == that.numberSlots &&
numberAvailableSlots == that.numberAvailableSlots && numberAvailableSlots == that.numberAvailableSlots &&
Objects.equals(instanceId, that.instanceId) && Objects.equals(resourceId, that.resourceId) &&
Objects.equals(address, that.address) && Objects.equals(address, that.address) &&
Objects.equals(hardwareDescription, that.hardwareDescription); Objects.equals(hardwareDescription, that.hardwareDescription);
} }


@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash( return Objects.hash(
instanceId, resourceId,
address, address,
dataPort, dataPort,
lastHeartbeat, lastHeartbeat,
Expand Down
Expand Up @@ -18,8 +18,6 @@


package org.apache.flink.runtime.clusterframework; package org.apache.flink.runtime.clusterframework;


import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.AkkaOptions;
Expand Down Expand Up @@ -62,6 +60,9 @@
import org.apache.flink.runtime.testutils.TestingResourceManager; import org.apache.flink.runtime.testutils.TestingResourceManager;
import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
Expand All @@ -70,8 +71,6 @@
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mockito; import org.mockito.Mockito;


import scala.Option;

import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
Expand All @@ -80,7 +79,11 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;


import static org.junit.Assert.*; import scala.Option;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
Expand Down

0 comments on commit dc7ab13

Please sign in to comment.