Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
import org.apache.flink.mesos.scheduler.LaunchCoordinator;
import org.apache.flink.mesos.scheduler.LaunchableTask;
import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
import org.apache.flink.mesos.scheduler.TaskMonitor;
import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
Expand Down Expand Up @@ -79,6 +78,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -124,6 +124,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
@Nullable
private final String webUiUrl;

private final Collection<ResourceProfile> slotsPerWorker;

/** Mesos scheduler driver. */
private SchedulerDriver schedulerDriver;

Expand Down Expand Up @@ -191,6 +193,9 @@ public MesosResourceManager(
this.workersInNew = new HashMap<>(8);
this.workersInLaunch = new HashMap<>(8);
this.workersBeingReturned = new HashMap<>(8);

final ContaineredTaskManagerParameters containeredTaskManagerParameters = taskManagerParameters.containeredParameters();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would create a method to create and return slotsPerWorker to unload constructor code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.

this.slotsPerWorker = createSlotsPerWorker(containeredTaskManagerParameters.numSlots());
}

protected ActorRef createSelfActor() {
Expand Down Expand Up @@ -352,7 +357,7 @@ private void recoverWorkers(final List<MesosWorkerStore.Worker> tasksFromPreviou
switch(worker.state()) {
case Launched:
workersInLaunch.put(extractResourceID(worker.taskID()), worker);
final LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile());
final LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
break;
case Released:
Expand Down Expand Up @@ -426,15 +431,15 @@ protected void internalDeregisterApplication(
}

@Override
public void startNewWorker(ResourceProfile resourceProfile) {
public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
LOG.info("Starting a new worker.");
try {
// generate new workers into persistent state and launch associated actors
MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newWorker(workerStore.newTaskID(), resourceProfile);
workerStore.putWorker(worker);
workersInNew.put(extractResourceID(worker.taskID()), worker);

LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), resourceProfile);
LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());

LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus).",
launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs());
Expand All @@ -443,9 +448,12 @@ public void startNewWorker(ResourceProfile resourceProfile) {
taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);

// tell the launch coordinator to launch the new tasks
launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList((LaunchableTask) launchable)), selfActor);
launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList(launchable)), selfActor);

return slotsPerWorker;
} catch (Exception ex) {
onFatalError(new ResourceManagerException("Unable to request new workers.", ex));
return Collections.emptyList();
}
}

Expand Down Expand Up @@ -691,36 +699,13 @@ private CompletableFuture<Boolean> stopActor(@Nullable final ActorRef actorRef,
/**
* Creates a launchable task for Fenzo to process.
*/
private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID, ResourceProfile resourceProfile) {

// create the specific TM parameters from the resource profile and some defaults
MesosTaskManagerParameters params = new MesosTaskManagerParameters(
resourceProfile.getCpuCores() < 1.0 ? taskManagerParameters.cpus() : resourceProfile.getCpuCores(),
taskManagerParameters.gpus(),
taskManagerParameters.containerType(),
taskManagerParameters.containerImageName(),
new ContaineredTaskManagerParameters(
ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : resourceProfile.getMemoryInMB(),
ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerHeapSizeMB() : resourceProfile.getHeapMemoryInMB(),
ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB() : resourceProfile.getDirectMemoryInMB(),
1,
new HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())),
taskManagerParameters.containerVolumes(),
taskManagerParameters.dockerParameters(),
taskManagerParameters.dockerForcePullImage(),
taskManagerParameters.constraints(),
taskManagerParameters.command(),
taskManagerParameters.bootstrapCommand(),
taskManagerParameters.getTaskManagerHostname(),
taskManagerParameters.uris()
);

LOG.debug("LaunchableMesosWorker parameters: {}", params);
private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
LOG.debug("LaunchableMesosWorker parameters: {}", taskManagerParameters);

LaunchableMesosWorker launchable =
new LaunchableMesosWorker(
artifactServer,
params,
taskManagerParameters,
taskManagerContainerSpec,
taskID,
mesosConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ static class Context implements AutoCloseable {
TestingMesosResourceManager resourceManager;

// domain objects for test purposes
final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 1);
final ResourceProfile resourceProfile1 = ResourceProfile.UNKNOWN;

Protos.FrameworkID framework1 = Protos.FrameworkID.newBuilder().setValue("framework1").build();
public Protos.SlaveID slave1 = Protos.SlaveID.newBuilder().setValue("slave1").build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile

public static final ResourceProfile UNKNOWN = new ResourceProfile(-1.0, -1);

/** ResourceProfile which matches any other ResourceProfile. */
public static final ResourceProfile ANY = new ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Collections.emptyMap());

// ------------------------------------------------------------------------

/** How many cpu cores are needed, use double so we can specify cpu like 0.1. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -1019,9 +1020,10 @@ protected abstract void internalDeregisterApplication(
* Allocates a resource using the resource profile.
*
* @param resourceProfile The resource description
* @return Collection of {@link ResourceProfile} describing the launched slots
*/
@VisibleForTesting
public abstract void startNewWorker(ResourceProfile resourceProfile);
public abstract Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile);

/**
* Callback when a worker was started.
Expand Down Expand Up @@ -1051,9 +1053,9 @@ public void releaseResource(InstanceID instanceId, Exception cause) {
}

@Override
public void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException {
public Collection<ResourceProfile> allocateResource(ResourceProfile resourceProfile) {
validateRunsInMainThread();
startNewWorker(resourceProfile);
return startNewWorker(resourceProfile);
}

@Override
Expand Down Expand Up @@ -1176,8 +1178,16 @@ public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
// Resource Management
// ------------------------------------------------------------------------

protected int getNumberPendingSlotRequests() {
return slotManager.getNumberPendingSlotRequests();
protected int getNumberRequiredTaskManagerSlots() {
return slotManager.getNumberPendingTaskManagerSlots();
}

// ------------------------------------------------------------------------
// Helper methods
// ------------------------------------------------------------------------

protected static Collection<ResourceProfile> createSlotsPerWorker(int numSlots) {
return Collections.nCopies(numSlots, ResourceProfile.ANY);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.Collections;

/**
* A standalone implementation of the resource manager. Used when the system is started in
* standalone mode (via scripts), rather than via a resource framework like YARN or Mesos.
Expand Down Expand Up @@ -74,7 +77,8 @@ protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nul
}

@Override
public void startNewWorker(ResourceProfile resourceProfile) {
public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
return Collections.emptyList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,31 @@
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.concurrent.CompletableFuture;

/**
* Class representing a pending slot request in the {@link SlotManager}.
*/
public class PendingSlotRequest {

private final SlotRequest slotRequest;

@Nullable
private CompletableFuture<Acknowledge> requestFuture;

@Nullable
private PendingTaskManagerSlot pendingTaskManagerSlot;

/** Timestamp when this pending slot request has been created. */
private final long creationTimestamp;

public PendingSlotRequest(SlotRequest slotRequest) {
this.slotRequest = Preconditions.checkNotNull(slotRequest);
this.requestFuture = null;
this.pendingTaskManagerSlot = null;
creationTimestamp = System.currentTimeMillis();
}

Expand Down Expand Up @@ -78,4 +87,18 @@ public void setRequestFuture(@Nullable CompletableFuture<Acknowledge> requestFut
public CompletableFuture<Acknowledge> getRequestFuture() {
return requestFuture;
}

@Nullable
public PendingTaskManagerSlot getAssignedPendingTaskManagerSlot() {
return pendingTaskManagerSlot;
}

public void assignPendingTaskManagerSlot(@Nonnull PendingTaskManagerSlot pendingTaskManagerSlotToAssign) {
Preconditions.checkState(pendingTaskManagerSlot == null);
this.pendingTaskManagerSlot = pendingTaskManagerSlotToAssign;
}

public void unassignPendingTaskManagerSlot() {
this.pendingTaskManagerSlot = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.resourcemanager.slotmanager;

import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* Represents a pending task manager slot in the {@link SlotManager}.
*/
public class PendingTaskManagerSlot {

private final TaskManagerSlotId taskManagerSlotId = TaskManagerSlotId.generate();

private final ResourceProfile resourceProfile;

@Nullable
private PendingSlotRequest pendingSlotRequest;

public PendingTaskManagerSlot(ResourceProfile resourceProfile) {
this.resourceProfile = resourceProfile;
}

public TaskManagerSlotId getTaskManagerSlotId() {
return taskManagerSlotId;
}

public ResourceProfile getResourceProfile() {
return resourceProfile;
}

public void assignPendingSlotRequest(@Nonnull PendingSlotRequest pendingSlotRequestToAssign) {
Preconditions.checkState(pendingSlotRequest == null);
pendingSlotRequest = pendingSlotRequestToAssign;
}

public void unassignPendingSlotRequest() {
pendingSlotRequest = null;
}

@Nullable
public PendingSlotRequest getAssignedPendingSlotRequest() {
return pendingSlotRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;

import java.util.Collection;

/**
* Resource related actions which the {@link SlotManager} can perform.
*/
Expand All @@ -41,9 +43,10 @@ public interface ResourceActions {
* Requests to allocate a resource with the given {@link ResourceProfile}.
*
* @param resourceProfile for the to be allocated resource
* @return Collection of {@link ResourceProfile} describing the allocated slots
* @throws ResourceManagerException if the resource cannot be allocated
*/
void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException;
Collection<ResourceProfile> allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException;

/**
* Notifies that an allocation failure has occurred.
Expand Down
Loading