Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ public void finish() throws IOException {

@Override
public void close() {
storageMemoryManager.release();
super.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,16 @@ private BufferAccumulator createBufferAccumulator(
List<TieredStorageMemorySpec> tieredStorageMemorySpecs = new ArrayList<>();

tieredStorageMemorySpecs.add(
// Accumulators are also treated as {@code guaranteedReclaimable}, since these
// buffers can always be transferred to the other tiers.
new TieredStorageMemorySpec(
bufferAccumulator,
2
* Math.min(
numberOfSubpartitions + 1,
tieredStorageConfiguration
.getAccumulatorExclusiveBuffers())));
.getAccumulatorExclusiveBuffers()),
true));
List<Integer> tierExclusiveBuffers =
tieredStorageConfiguration.getEachTierExclusiveBufferNum();

Expand All @@ -208,8 +211,16 @@ private BufferAccumulator createBufferAccumulator(
numberOfSubpartitions),
tieredStorageConfiguration.getDiskIOSchedulerBufferRequestTimeout());
tierProducerAgents.add(producerAgent);
tieredStorageMemorySpecs.add(
new TieredStorageMemorySpec(producerAgent, tierExclusiveBuffers.get(index)));

if (tierFactory.getClass() == MemoryTierFactory.class) {
tieredStorageMemorySpecs.add(
new TieredStorageMemorySpec(
producerAgent, tierExclusiveBuffers.get(index), false));
} else {
tieredStorageMemorySpecs.add(
new TieredStorageMemorySpec(
producerAgent, tierExclusiveBuffers.get(index), true));
}
}
return Tuple2.of(tierProducerAgents, tieredStorageMemorySpecs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@
* can request. Instead, it only simply provides memory usage hints to memory users. It is very
* <b>important</b> to note that <b>only</b> users with non-reclaimable should check the memory
* hints by calling {@code getMaxNonReclaimableBuffers} before requesting buffers.
*
* <p>The {@link TieredStorageMemoryManager} needs to ensure that it would not hinder reclaimable
* users from acquiring buffers due to non-reclaimable users not releasing the buffers they have
* requested. So it is very <b>important</b> to note that <b>only</b> users with non-reclaimable
* should call {@code ensureCapacity} before requesting buffers to reserve enough buffers.
*/
public interface TieredStorageMemoryManager {

Expand Down Expand Up @@ -74,11 +79,11 @@ public interface TieredStorageMemoryManager {
void listenBufferReclaimRequest(Runnable onBufferReclaimRequest);

/**
* Request a {@link BufferBuilder} instance from {@link BufferPool} for a specific owner. The
* {@link TieredStorageMemoryManagerImpl} will not check whether a buffer can be requested. The
* manager only records the number of requested buffers. If the buffers in the {@link
* BufferPool} is not enough, the manager will request each tiered storage to reclaim their
* requested buffers as much as possible.
* Request a {@link BufferBuilder} instance for a specific owner. The {@link
* TieredStorageMemoryManagerImpl} will not check whether a buffer can be requested. The manager
* only records the number of requested buffers. If the buffers is not enough to meet the
* request, the manager will request each tiered storage to reclaim their requested buffers as
* much as possible.
*
* <p>This is not thread safe and is expected to be called only from the task thread.
*
Expand All @@ -101,6 +106,21 @@ public interface TieredStorageMemoryManager {
*/
int getMaxNonReclaimableBuffers(Object owner);

/**
* Try best to reserve enough buffers that are guaranteed reclaimable along with the additional
* ones.
*
* <p>Note that the available buffers are calculated dynamically based on some conditions, for
* example, the state of the {@link BufferPool}, the {@link TieredStorageMemorySpec} of the
* owner, etc. So the caller should always ensure capacity before requesting non-reclaimable
* buffers.
*
* @param numAdditionalBuffers the number of buffers that need to also be reserved in addition
* to guaranteed reclaimable buffers.
* @return True if the capacity meets the requirements, false otherwise.
*/
boolean ensureCapacity(int numAdditionalBuffers);

/**
* Return the number of requested buffers belonging to a specific owner.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,23 @@
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
Expand Down Expand Up @@ -91,6 +96,18 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage
*/
private final Map<Object, Integer> numOwnerRequestedBuffers;

/**
* The queue that contains all available buffers. This field should be thread-safe because it
* can be touched both by the task thread and the netty thread.
*/
private final BlockingQueue<MemorySegment> bufferQueue;

/** The lock guarding concurrency issues during releasing. */
private final ReadWriteLock releasedStateLock;

/** The number of buffers that are guaranteed to be reclaimed. */
private int numGuaranteedReclaimableBuffers;

/**
* Time gauge to measure that hard backpressure time. Pre-create it to avoid checkNotNull in
* hot-path for performance purpose.
Expand All @@ -108,14 +125,23 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage
private BufferPool bufferPool;

/**
* Indicate whether the {@link TieredStorageMemoryManagerImpl} is initialized. Before setting
* Indicates whether the {@link TieredStorageMemoryManagerImpl} is initialized. Before setting
* up, this field is false.
*
* <p>Note that before requesting buffers or getting the maximum allowed buffers, this
* initialized state should be checked.
*/
private boolean isInitialized;

/**
* Indicates whether the {@link TieredStorageMemoryManagerImpl} is released.
*
* <p>Note that before recycling buffers, this released state should be checked to determine
* whether to recycle the buffer back to the internal queue or to the buffer pool.
*/
@GuardedBy("readWriteLock")
private boolean isReleased;

/**
* The constructor of the {@link TieredStorageMemoryManagerImpl}.
*
Expand All @@ -131,6 +157,9 @@ public TieredStorageMemoryManagerImpl(
this.numRequestedBuffers = new AtomicInteger(0);
this.numOwnerRequestedBuffers = new ConcurrentHashMap<>();
this.bufferReclaimRequestListeners = new ArrayList<>();
this.bufferQueue = new LinkedBlockingQueue<>();
this.releasedStateLock = new ReentrantReadWriteLock();
this.isReleased = false;
this.isInitialized = false;
}

Expand All @@ -142,6 +171,8 @@ public void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMe
!tieredMemorySpecs.containsKey(memorySpec.getOwner()),
"Duplicated memory spec.");
tieredMemorySpecs.put(memorySpec.getOwner(), memorySpec);
numGuaranteedReclaimableBuffers +=
memorySpec.isGuaranteedReclaimable() ? memorySpec.getNumGuaranteedBuffers() : 0;
}

if (mayReclaimBuffer) {
Expand Down Expand Up @@ -173,22 +204,13 @@ public BufferBuilder requestBufferBlocking(Object owner) {

reclaimBuffersIfNeeded(0);

CompletableFuture<Void> requestBufferFuture = new CompletableFuture<>();
scheduleCheckRequestBufferFuture(
requestBufferFuture, INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS);
MemorySegment memorySegment = bufferPool.requestMemorySegment();

MemorySegment memorySegment = bufferQueue.poll();
if (memorySegment == null) {
try {
hardBackpressureTimerGauge.markStart();
memorySegment = bufferPool.requestMemorySegmentBlocking();
hardBackpressureTimerGauge.markEnd();
} catch (InterruptedException e) {
ExceptionUtils.rethrow(e);
}
memorySegment = requestBufferBlockingFromPool();
}
if (memorySegment == null) {
memorySegment = checkNotNull(requestBufferBlockingFromQueue());
}

requestBufferFuture.complete(null);

incNumRequestedBuffer(owner);
return new BufferBuilder(
Expand Down Expand Up @@ -218,6 +240,31 @@ public int getMaxNonReclaimableBuffers(Object owner) {
return bufferPool.getNumBuffers() - numBuffersUsedOrReservedForOtherOwners;
}

@Override
public boolean ensureCapacity(int numAdditionalBuffers) {
checkIsInitialized();

final int numRequestedByGuaranteedReclaimableOwners =
tieredMemorySpecs.values().stream()
.filter(TieredStorageMemorySpec::isGuaranteedReclaimable)
.mapToInt(spec -> numOwnerRequestedBuffer(spec.getOwner()))
.sum();

while (bufferQueue.size() + numRequestedByGuaranteedReclaimableOwners
< numGuaranteedReclaimableBuffers + numAdditionalBuffers) {
if (numRequestedBuffers.get() >= bufferPool.getNumBuffers()) {
return false;
}

MemorySegment memorySegment = requestBufferBlockingFromPool();
if (memorySegment == null) {
return false;
}
bufferQueue.add(memorySegment);
}
return true;
}

@Override
public int numOwnerRequestedBuffer(Object owner) {
return numOwnerRequestedBuffers.getOrDefault(owner, 0);
Expand All @@ -233,6 +280,12 @@ public void transferBufferOwnership(Object oldOwner, Object newOwner, Buffer buf

@Override
public void release() {
try {
releasedStateLock.writeLock().lock();
isReleased = true;
} finally {
releasedStateLock.writeLock().unlock();
}
if (executor != null) {
executor.shutdown();
try {
Expand All @@ -244,6 +297,59 @@ public void release() {
ExceptionUtils.rethrow(e);
}
}
while (!bufferQueue.isEmpty()) {
MemorySegment segment = bufferQueue.poll();
bufferPool.recycle(segment);
numRequestedBuffers.decrementAndGet();
}
}

/**
* @return a memory segment from the buffer pool or null if the memory manager has requested all
* segments of the buffer pool.
*/
@Nullable
private MemorySegment requestBufferBlockingFromPool() {
MemorySegment memorySegment = null;

hardBackpressureTimerGauge.markStart();
while (numRequestedBuffers.get() < bufferPool.getNumBuffers()) {
memorySegment = bufferPool.requestMemorySegment();
if (memorySegment == null) {
try {
// Wait until a buffer is available or timeout before entering the next loop
// iteration.
bufferPool.getAvailableFuture().get(100, TimeUnit.MILLISECONDS);
} catch (TimeoutException ignored) {
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
} else {
numRequestedBuffers.incrementAndGet();
break;
}
}
hardBackpressureTimerGauge.markEnd();

return memorySegment;
}

/** @return a memory segment from the internal buffer queue. */
private MemorySegment requestBufferBlockingFromQueue() {
CompletableFuture<Void> requestBufferFuture = new CompletableFuture<>();
scheduleCheckRequestBufferFuture(
requestBufferFuture, INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS);

MemorySegment memorySegment = null;
try {
memorySegment = bufferQueue.take();
} catch (InterruptedException e) {
ExceptionUtils.rethrow(e);
} finally {
requestBufferFuture.complete(null);
}

return memorySegment;
}

private void scheduleCheckRequestBufferFuture(
Expand Down Expand Up @@ -272,13 +378,11 @@ private void internalCheckRequestBufferFuture(
private void incNumRequestedBuffer(Object owner) {
numOwnerRequestedBuffers.compute(
owner, (ignore, numRequested) -> numRequested == null ? 1 : numRequested + 1);
numRequestedBuffers.incrementAndGet();
}

private void decNumRequestedBuffer(Object owner) {
numOwnerRequestedBuffers.compute(
owner, (ignore, numRequested) -> checkNotNull(numRequested) - 1);
numRequestedBuffers.decrementAndGet();
}

private void reclaimBuffersIfNeeded(long delayForNextCheckMs) {
Expand All @@ -293,17 +397,28 @@ private boolean shouldReclaimBuffersBeforeRequesting(long delayForNextCheckMs) {
// next iteration, the buffer reclaim will eventually be triggered.
int numTotal = bufferPool.getNumBuffers();
int numRequested = numRequestedBuffers.get();
return numRequested >= numTotal
// Because we do the checking before requesting buffers, we need add additional one
// buffer when calculating the usage ratio.
|| ((numRequested + 1) * 1.0 / numTotal) > numTriggerReclaimBuffersRatio

// Because we do the checking before requesting buffers, we need add additional one
// buffer when calculating the usage ratio.
return (numRequested + 1 - bufferQueue.size()) * 1.0 / numTotal
> numTriggerReclaimBuffersRatio
|| delayForNextCheckMs > MAX_DELAY_TIME_TO_TRIGGER_RECLAIM_BUFFER_MS
&& bufferPool.getNumberOfAvailableMemorySegments() == 0;
&& bufferQueue.size() == 0;
}

/** Note that this method may be called by the netty thread. */
private void recycleBuffer(Object owner, MemorySegment buffer) {
bufferPool.recycle(buffer);
try {
releasedStateLock.readLock().lock();
if (!isReleased && numRequestedBuffers.get() <= bufferPool.getNumBuffers()) {
bufferQueue.add(buffer);
} else {
bufferPool.recycle(buffer);
numRequestedBuffers.decrementAndGet();
}
} finally {
releasedStateLock.readLock().unlock();
}
decNumRequestedBuffer(owner);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,21 @@ public class TieredStorageMemorySpec {
/** The number of guaranteed buffers of this memory owner. */
private final int numGuaranteedBuffers;

/**
* Whether the buffers of this owner are guaranteed to be reclaimed, even if the downstream does
* not consume them promptly.
*/
private final boolean guaranteedReclaimable;

public TieredStorageMemorySpec(Object owner, int numGuaranteedBuffers) {
this(owner, numGuaranteedBuffers, true);
}

public TieredStorageMemorySpec(
Object owner, int numGuaranteedBuffers, boolean guaranteedReclaimable) {
this.owner = owner;
this.numGuaranteedBuffers = numGuaranteedBuffers;
this.guaranteedReclaimable = guaranteedReclaimable;
}

public Object getOwner() {
Expand All @@ -42,4 +54,8 @@ public Object getOwner() {
public int getNumGuaranteedBuffers() {
return numGuaranteedBuffers;
}

public boolean isGuaranteedReclaimable() {
return guaranteedReclaimable;
}
}
Loading