Skip to content

Commit

Permalink
[FLINK-14400] Shrink the scope of MemoryManager from TaskExecutor to …
Browse files Browse the repository at this point in the history
…slot

MemoryManager currently manages the memory bookkeeping for all slots/tasks inside one TaskExecutor.
For better abstraction and isolation of slots, we can shrink its scope and make it per slot.
The memory limits are fixed now per slot at the moment of slot creation. All operators, sharing the slot,
will get their fixed fractional limits.

In future, we might make it possible for operators to over-allocate beyond their fraction limit
if there is some available free memory in the slot but it should be possible to reclaim the over-allocated memory
at any time if other operator decides to claim its fair share within its limit.
  • Loading branch information
azagrebin committed Nov 19, 2019
1 parent 01d6972 commit 9fed0dd
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 80 deletions.
Expand Up @@ -89,6 +89,18 @@ public class MemoryManager {
/** Flag whether the close() has already been invoked. */
private volatile boolean isShutDown;

/**
* Creates a memory manager with the given memory types, capacity and given page size.
*
* @param memorySizeByType The total size of the memory to be managed by this memory manager for each type (heap / off-heap).
* @param pageSize The size of the pages handed out by the memory manager.
*/
public MemoryManager(
Map<MemoryType, Long> memorySizeByType,
int pageSize) {
this(memorySizeByType, 1, pageSize);
}

/**
* Creates a memory manager with the given memory types, capacity and given page size.
*
Expand Down Expand Up @@ -180,7 +192,6 @@ public boolean isShutdown() {
*
* @return True, if the memory manager is empty and valid, false if it is not empty or corrupted.
*/
@VisibleForTesting
public boolean verifyEmpty() {
return budgetByType.totalAvailableBudget() == budgetByType.maxTotalBudget();
}
Expand Down
Expand Up @@ -64,6 +64,7 @@
import org.apache.flink.runtime.jobmaster.ResourceManagerAddress;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.TaskBackPressureResponse;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
Expand Down Expand Up @@ -269,8 +270,7 @@ public TaskExecutor(

this.jobManagerConnections = new HashMap<>(4);

this.hardwareDescription = HardwareDescription.extractFromSystem(
taskExecutorServices.getMemoryManager().getMemorySize());
this.hardwareDescription = HardwareDescription.extractFromSystem(taskExecutorServices.getManagedMemorySize());

this.resourceManagerAddress = null;
this.resourceManagerConnection = null;
Expand Down Expand Up @@ -405,8 +405,6 @@ private void stopTaskExecutorServices() throws Exception {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

taskSlotTable.stop();

try {
resourceManagerLeaderRetriever.stop();
} catch (Exception e) {
Expand Down Expand Up @@ -553,6 +551,13 @@ public CompletableFuture<Acknowledge> submitTask(
taskRestore,
checkpointResponder);

MemoryManager memoryManager;
try {
memoryManager = taskSlotTable.getTaskMemoryManager(tdd.getAllocationId());
} catch (SlotNotFoundException e) {
throw new TaskSubmissionException("Could not submit task.", e);
}

Task task = new Task(
jobInformation,
taskInformation,
Expand All @@ -563,7 +568,7 @@ public CompletableFuture<Acknowledge> submitTask(
tdd.getProducedPartitions(),
tdd.getInputGates(),
tdd.getTargetSlotNumber(),
taskExecutorServices.getMemoryManager(),
memoryManager,
taskExecutorServices.getIOManager(),
taskExecutorServices.getShuffleEnvironment(),
taskExecutorServices.getKvStateService(),
Expand Down
Expand Up @@ -51,12 +51,13 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;

Expand All @@ -73,7 +74,7 @@ public class TaskManagerServices {

/** TaskManager services. */
private final TaskManagerLocation taskManagerLocation;
private final MemoryManager memoryManager;
private final long managedMemorySize;
private final IOManager ioManager;
private final ShuffleEnvironment<?, ?> shuffleEnvironment;
private final KvStateService kvStateService;
Expand All @@ -86,7 +87,7 @@ public class TaskManagerServices {

TaskManagerServices(
TaskManagerLocation taskManagerLocation,
MemoryManager memoryManager,
long managedMemorySize,
IOManager ioManager,
ShuffleEnvironment<?, ?> shuffleEnvironment,
KvStateService kvStateService,
Expand All @@ -98,7 +99,7 @@ public class TaskManagerServices {
TaskEventDispatcher taskEventDispatcher) {

this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
this.memoryManager = Preconditions.checkNotNull(memoryManager);
this.managedMemorySize = managedMemorySize;
this.ioManager = Preconditions.checkNotNull(ioManager);
this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
this.kvStateService = Preconditions.checkNotNull(kvStateService);
Expand All @@ -114,8 +115,8 @@ public class TaskManagerServices {
// Getter/Setter
// --------------------------------------------------------------------------------------------

public MemoryManager getMemoryManager() {
return memoryManager;
long getManagedMemorySize() {
return managedMemorySize;
}

public IOManager getIOManager() {
Expand Down Expand Up @@ -175,12 +176,6 @@ public void shutDown() throws FlinkException {
exception = e;
}

try {
memoryManager.shutdown();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

try {
ioManager.close();
} catch (Exception e) {
Expand Down Expand Up @@ -258,15 +253,15 @@ public static TaskManagerServices fromConfiguration(
taskManagerServicesConfiguration.getTaskManagerAddress(),
dataPort);

// this call has to happen strictly after the network stack has been initialized
final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration);

final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

// this call has to happen strictly after the network stack has been initialized
Map<MemoryType, Long> memorySizeByType = calculateMemorySizeByType(taskManagerServicesConfiguration);
final TaskSlotTable taskSlotTable = createTaskSlotTable(
taskManagerServicesConfiguration.getNumberOfSlots(),
memoryManager.getMemorySize(),
taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());
memorySizeByType,
taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
taskManagerServicesConfiguration.getPageSize());

final JobManagerTable jobManagerTable = new JobManagerTable();

Expand All @@ -287,7 +282,7 @@ public static TaskManagerServices fromConfiguration(

return new TaskManagerServices(
taskManagerLocation,
memoryManager,
memorySizeByType.values().stream().mapToLong(s -> s).sum(),
ioManager,
shuffleEnvironment,
kvStateService,
Expand All @@ -300,27 +295,25 @@ public static TaskManagerServices fromConfiguration(
}

private static TaskSlotTable createTaskSlotTable(
int numberOfSlots,
long managedMemorySize,
long timerServiceShutdownTimeout) {
final int numberOfSlots,
final Map<MemoryType, Long> memorySizeByType,
final long timerServiceShutdownTimeout,
final int pageSize) {
final List<ResourceProfile> resourceProfiles =
Collections.nCopies(numberOfSlots, computeSlotResourceProfile(numberOfSlots, managedMemorySize));
Collections.nCopies(numberOfSlots, computeSlotResourceProfile(numberOfSlots, memorySizeByType));
final TimerService<AllocationID> timerService = new TimerService<>(
new ScheduledThreadPoolExecutor(1),
timerServiceShutdownTimeout);
return new TaskSlotTable(createTaskSlotsFromResources(resourceProfiles), timerService);
return new TaskSlotTable(createTaskSlotsFromResources(resourceProfiles, pageSize), timerService);
}

private static List<TaskSlot> createTaskSlotsFromResources(Collection<ResourceProfile> resourceProfiles) {
int numberSlots = resourceProfiles.size();
Preconditions.checkArgument(0 < numberSlots, "The number of task slots must be greater than 0.");
List<TaskSlot> taskSlots = new ArrayList<>(numberSlots);
int index = 0;
for (ResourceProfile resourceProfile: resourceProfiles) {
taskSlots.add(new TaskSlot(index, resourceProfile));
++index;
}
return taskSlots;
private static List<TaskSlot> createTaskSlotsFromResources(
List<ResourceProfile> resourceProfiles,
int memoryPageSize) {
return IntStream
.range(0, resourceProfiles.size())
.mapToObj(index -> new TaskSlot(index, resourceProfiles.get(index), memoryPageSize))
.collect(Collectors.toList());
}

private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
Expand All @@ -343,12 +336,12 @@ private static List<TaskSlot> createTaskSlotsFromResources(Collection<ResourcePr
}

/**
* Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}.
* Computes memory size for each {@link MemoryType} from the given {@link TaskManagerServicesConfiguration}.
*
* @param taskManagerServicesConfiguration to create the memory manager from
* @return Memory manager
* @return map of {@link MemoryType} (heap/off-heap) to its size
*/
private static MemoryManager createMemoryManager(
private static Map<MemoryType, Long> calculateMemorySizeByType(
TaskManagerServicesConfiguration taskManagerServicesConfiguration) {
// computing the amount of memory to use depends on how much memory is available
// it strictly needs to happen AFTER the network stack has been initialized
Expand Down Expand Up @@ -386,12 +379,7 @@ private static MemoryManager createMemoryManager(
throw new RuntimeException("No supported memory type detected.");
}
}

// now start the memory manager
return new MemoryManager(
Collections.singletonMap(memType, memorySize),
taskManagerServicesConfiguration.getNumberOfSlots(),
taskManagerServicesConfiguration.getPageSize());
return Collections.singletonMap(memType, memorySize);
}

/**
Expand Down Expand Up @@ -511,14 +499,17 @@ private static void checkTempDirs(String[] tmpDirs) throws IOException {
}

public static ResourceProfile computeSlotResourceProfile(int numOfSlots, long managedMemorySize) {
int managedMemoryPerSlotMB = (int) bytesToMegabytes(managedMemorySize / numOfSlots);
// TODO: before operators separate on-heap/off-heap managed memory, we use on-heap managed memory to denote total managed memory
return computeSlotResourceProfile(numOfSlots, Collections.singletonMap(MemoryType.HEAP, managedMemorySize));
}

private static ResourceProfile computeSlotResourceProfile(int numOfSlots, Map<MemoryType, Long> memorySizeByType) {
return new ResourceProfile(
Double.MAX_VALUE,
MemorySize.MAX_VALUE,
MemorySize.MAX_VALUE,
// TODO: before operators separate on-heap/off-heap managed memory, we use on-heap managed memory to denote total managed memory
MemorySize.parse(managedMemoryPerSlotMB + "m"),
MemorySize.MAX_VALUE,
new MemorySize(memorySizeByType.getOrDefault(MemoryType.HEAP, 0L) / numOfSlots),
new MemorySize(memorySizeByType.getOrDefault(MemoryType.OFF_HEAP, 0L) / numOfSlots),
MemorySize.MAX_VALUE,
Collections.emptyMap());
}
Expand Down
Expand Up @@ -19,12 +19,18 @@
package org.apache.flink.runtime.taskexecutor.slot;

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
Expand All @@ -48,7 +54,8 @@
* <p>An allocated or active slot can only be freed if it is empty. If it is not empty, then it's state
* can be set to releasing indicating that it can be freed once it becomes empty.
*/
public class TaskSlot {
public class TaskSlot implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(TaskSlot.class);

/** Index of the task slot. */
private final int index;
Expand All @@ -59,6 +66,8 @@ public class TaskSlot {
/** Tasks running in this slot. */
private final Map<ExecutionAttemptID, Task> tasks;

private final MemoryManager memoryManager;

/** State of this slot. */
private TaskSlotState state;

Expand All @@ -68,7 +77,7 @@ public class TaskSlot {
/** Allocation id of this slot; null if not allocated. */
private AllocationID allocationId;

public TaskSlot(final int index, final ResourceProfile resourceProfile) {
public TaskSlot(final int index, final ResourceProfile resourceProfile, final int memoryPageSize) {
Preconditions.checkArgument(0 <= index, "The index must be greater than 0.");
this.index = index;
this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
Expand All @@ -78,6 +87,8 @@ public TaskSlot(final int index, final ResourceProfile resourceProfile) {

this.jobId = null;
this.allocationId = null;

this.memoryManager = createMemoryManager(resourceProfile, memoryPageSize);
}

// ----------------------------------------------------------------------------------
Expand Down Expand Up @@ -142,6 +153,10 @@ public Iterator<Task> getTasks() {
return tasks.values().iterator();
}

public MemoryManager getMemoryManager() {
return memoryManager;
}

// ----------------------------------------------------------------------------------
// State changing methods
// ----------------------------------------------------------------------------------
Expand Down Expand Up @@ -268,6 +283,7 @@ public boolean markInactive() {
public boolean markFree() {
if (isEmpty()) {
state = TaskSlotState.FREE;
verifyMemoryFreed();
this.jobId = null;
this.allocationId = null;

Expand Down Expand Up @@ -305,4 +321,23 @@ public String toString() {
return "TaskSlot(index:" + index + ", state:" + state + ", resource profile: " + resourceProfile +
", allocationId: " + (allocationId != null ? allocationId.toString() : "none") + ", jobId: " + (jobId != null ? jobId.toString() : "none") + ')';
}

@Override
public void close() {
verifyMemoryFreed();
this.memoryManager.shutdown();
}

private void verifyMemoryFreed() {
if (!memoryManager.verifyEmpty()) {
LOG.warn("Not all slot memory is freed, potential memory leak at {}", this);
}
}

private static MemoryManager createMemoryManager(ResourceProfile resourceProfile, int pageSize) {
Map<MemoryType, Long> memorySizeByType = new EnumMap<>(MemoryType.class);
memorySizeByType.put(MemoryType.HEAP, resourceProfile.getOnHeapManagedMemory().getBytes());
memorySizeByType.put(MemoryType.OFF_HEAP, resourceProfile.getOffHeapManagedMemory().getBytes());
return new MemoryManager(memorySizeByType, pageSize);
}
}

0 comments on commit 9fed0dd

Please sign in to comment.