Skip to content

Commit

Permalink
Async Destination V0 - Split up BufferManager (#26331)
Browse files Browse the repository at this point in the history
Follow up to #26324 - here we split up the BufferManager and add tests and comments.

- Split up the buffer manager class into -> BufferManager, BufferEnqueue and BufferDequeue.
- Move all buffer related code to the buffers package.
- Rename test classes to match this split.
- Add java docs and tests as part of this split.
- Simplify the BufferDequeue interface to return a set streams representing the buffered streams instead of the underlying map of buffers. This lets us keep the memory queue package private.
- all getYMethods now return Optionals for better error handling. This would have resulted in NPEs previously.
  • Loading branch information
davinchia committed May 22, 2023
1 parent e6126c6 commit 988ce24
Show file tree
Hide file tree
Showing 13 changed files with 559 additions and 328 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@

package io.airbyte.integrations.destination_async;

import static io.airbyte.integrations.destination_async.BufferManager.QUEUE_FLUSH_THRESHOLD;
import static io.airbyte.integrations.destination_async.BufferManager.TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES;

import io.airbyte.integrations.destination_async.buffers.BufferDequeue;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -41,18 +38,20 @@
@Slf4j
public class FlushWorkers implements AutoCloseable {

public static final long TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES = (long) (Runtime.getRuntime().maxMemory() * 0.8);
public static final long QUEUE_FLUSH_THRESHOLD = 10 * 1024 * 1024; // 10MB
private static final long MAX_TIME_BETWEEN_REC_MINS = 5L;
private static final long SUPERVISOR_INITIAL_DELAY_SECS = 0L;
private static final long SUPERVISOR_PERIOD_SECS = 1L;
private static final long DEBUG_INITIAL_DELAY_SECS = 0L;
private static final long DEBUG_PERIOD_SECS = 10L;
private final ScheduledExecutorService supervisorThread = Executors.newScheduledThreadPool(1);
private final ExecutorService workerPool = Executors.newFixedThreadPool(5);
private final BufferManager.BufferManagerDequeue bufferManagerDequeue;
private final BufferDequeue bufferManagerDequeue;
private final DestinationFlushFunction flusher;
private final ScheduledExecutorService debugLoop = Executors.newSingleThreadScheduledExecutor();

public FlushWorkers(final BufferManager.BufferManagerDequeue bufferManagerDequeue, final DestinationFlushFunction flushFunction) {
public FlushWorkers(final BufferDequeue bufferManagerDequeue, final DestinationFlushFunction flushFunction) {
this.bufferManagerDequeue = bufferManagerDequeue;
flusher = flushFunction;
}
Expand Down Expand Up @@ -88,9 +87,8 @@ private void retrieveWork() {
// todo (cgardens) - build a score to prioritize which queue to flush next. e.g. if a queue is very
// large, flush it first. if a queue has not been flushed in a while, flush it next.
// otherwise, if each individual stream has crossed a specific threshold, flush
for (final Map.Entry<StreamDescriptor, MemoryBoundedLinkedBlockingQueue<AirbyteMessage>> entry : bufferManagerDequeue.getBuffers().entrySet()) {
final var stream = entry.getKey();
final var exceedSize = bufferManagerDequeue.getQueueSizeBytes(stream) >= QUEUE_FLUSH_THRESHOLD;
for (final StreamDescriptor stream : bufferManagerDequeue.getBufferedStreams()) {
final var exceedSize = bufferManagerDequeue.getQueueSizeBytes(stream).get() >= QUEUE_FLUSH_THRESHOLD;
final var tooLongSinceLastRecord = bufferManagerDequeue.getTimeOfLastRecord(stream)
.map(time -> time.isBefore(Instant.now().minus(MAX_TIME_BETWEEN_REC_MINS, ChronoUnit.MINUTES)))
.orElse(false);
Expand All @@ -115,7 +113,7 @@ private void printWorkerInfo() {

private void flushAll() {
log.info("Flushing all buffers..");
for (final StreamDescriptor desc : bufferManagerDequeue.getBuffers().keySet()) {
for (final StreamDescriptor desc : bufferManagerDequeue.getBufferedStreams()) {
flush(desc);
}
}
Expand All @@ -124,7 +122,7 @@ private void flush(final StreamDescriptor desc) {
workerPool.submit(() -> {
log.info("Worker picked up work..");
try {
log.info("Attempting to read from queue {}. Current queue size: {}", desc, bufferManagerDequeue.getQueueSizeInRecords(desc));
log.info("Attempting to read from queue {}. Current queue size: {}", desc, bufferManagerDequeue.getQueueSizeInRecords(desc).get());

try (final var batch = bufferManagerDequeue.take(desc, flusher.getOptimalBatchSizeBytes())) {
flusher.flush(desc, batch.getData());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,35 @@
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;

/**
* Responsible for managing global memory across multiple queues in a thread-safe way.
* <p>
* This means memory allocation and deallocation for each queue can be dynamically adjusted
* according to the overall available memory. Memory blocks are managed in chunks of
* {@link #BLOCK_SIZE_BYTES}, and the total amount of memory managed is configured at creation time.
* <p>
* As a destination has no information about incoming per-stream records, having static non-global
* queue sizes can cause unnecessary backpressure on a per-stream basis. By providing a dynamic,
* global view of memory management, this class allows each queue to free and consume memory
* dynamically, enabling effective sharing of global memory resources across all the queues. and
* avoiding accidental stream backpressure.
* <p>
* This becomes particularly useful in the following scenarios:
* <ul>
* <li>1. When the incoming records belong to a single stream. Dynamic allocation ensures this one
* stream can utilise all memory.</li>
* <li>2. When the incoming records are from multiple streams, such as with Change Data Capture
* (CDC). Here, dynamic allocation let us create as many queues as possible, allowing all streams to
* be processed in parallel without accidental backpressure from unnecessary eager flushing.</li>
* </ul>
*/
@Slf4j
public class GlobalMemoryManager {

private static final long BLOCK_SIZE_BYTES = 10 * 1024 * 1024; // 10MB
// In cases where a queue is rapidly expanding, a larger block size allows less allocation calls. On
// the flip size, a smaller block size allows more granular memory management. Since this overhead
// is minimal for now, err on a smaller block sizes.
public static final long BLOCK_SIZE_BYTES = 10 * 1024 * 1024; // 10MB
private final long maxMemoryBytes;

private final AtomicLong currentMemoryBytes = new AtomicLong(0);
Expand All @@ -27,7 +52,13 @@ public long getCurrentMemoryBytes() {
return currentMemoryBytes.get();
}

/**
* Requests a block of memory of {@link #BLOCK_SIZE_BYTES}. Return 0 if memory cannot be freed.
*
* @return the size of the allocated block, in bytes
*/
public synchronized long requestMemory() {
// todo(davin): what happens if the incoming record is larger than 10MB?
if (currentMemoryBytes.get() >= maxMemoryBytes) {
return 0L;
}
Expand All @@ -40,6 +71,12 @@ public synchronized long requestMemory() {
return toAllocateBytes;
}

/**
* Frees a block of memory of the given size. If the amount of memory freed exceeds the current
* memory allocation, a warning will be logged.
*
* @param bytes the size of the block to free, in bytes
*/
public void free(final long bytes) {
currentMemoryBytes.addAndGet(-bytes);

Expand Down

0 comments on commit 988ce24

Please sign in to comment.