Skip to content

Commit

Permalink
Async Code V0: Flush Worker Improvements. (#26384)
Browse files Browse the repository at this point in the history
Incorporate the changes from #26178.

- Update queue method from setMaxMemoryUsage to addMaxMemory.
- Update work retrieval logic to:
  - only assign work if there are free worker threads
  - account for in-progress worker threads when deciding whether to assign more work
  • Loading branch information
davinchia committed May 23, 2023
1 parent d261259 commit 1ed32e5
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;

/**
* Parallel flushing of Destination data.
Expand All @@ -39,7 +42,7 @@
public class FlushWorkers implements AutoCloseable {

public static final long TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES = (long) (Runtime.getRuntime().maxMemory() * 0.8);
public static final long QUEUE_FLUSH_THRESHOLD = 10 * 1024 * 1024; // 10MB
private static final long QUEUE_FLUSH_THRESHOLD_BYTES = 10 * 1024 * 1024; // 10MB
private static final long MAX_TIME_BETWEEN_REC_MINS = 5L;
private static final long SUPERVISOR_INITIAL_DELAY_SECS = 0L;
private static final long SUPERVISOR_PERIOD_SECS = 1L;
Expand All @@ -50,6 +53,7 @@ public class FlushWorkers implements AutoCloseable {
private final BufferDequeue bufferDequeue;
private final DestinationFlushFunction flusher;
private final ScheduledExecutorService debugLoop = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentHashMap<StreamDescriptor, AtomicInteger> streamToInProgressWorkers = new ConcurrentHashMap<>();

public FlushWorkers(final BufferDequeue bufferDequeue, final DestinationFlushFunction flushFunction) {
this.bufferDequeue = bufferDequeue;
Expand All @@ -74,6 +78,8 @@ public void close() throws Exception {
workerPool.shutdown();
final var workersShut = workerPool.awaitTermination(5L, TimeUnit.MINUTES);
log.info("Workers shut status: {}", workersShut);

debugLoop.shutdownNow();
}

private void retrieveWork() {
Expand All @@ -82,17 +88,42 @@ private void retrieveWork() {
// if the total size is > n, flush all buffers
if (bufferDequeue.getTotalGlobalQueueSizeBytes() > TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES) {
flushAll();
return;
}

final ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) workerPool;
var allocatableThreads = threadPoolExecutor.getMaximumPoolSize() - threadPoolExecutor.getActiveCount();

// todo (cgardens) - build a score to prioritize which queue to flush next. e.g. if a queue is very
// large, flush it first. if a queue has not been flushed in a while, flush it next.
// otherwise, if each individual stream has crossed a specific threshold, flush
for (final StreamDescriptor stream : bufferDequeue.getBufferedStreams()) {
final var exceedSize = bufferDequeue.getQueueSizeBytes(stream).get() >= QUEUE_FLUSH_THRESHOLD;
if (allocatableThreads == 0) {
break;
}

// while we allow out-of-order processing for speed improvements via multiple workers reading from
// the same queue, also avoid scheduling more workers than what is already in progress.
final var inProgressSizeByte = (bufferDequeue.getQueueSizeBytes(stream).get() -
streamToInProgressWorkers.getOrDefault(stream, new AtomicInteger(0)).get() * QUEUE_FLUSH_THRESHOLD_BYTES);
final var exceedSize = inProgressSizeByte >= QUEUE_FLUSH_THRESHOLD_BYTES;
final var tooLongSinceLastRecord = bufferDequeue.getTimeOfLastRecord(stream)
.map(time -> time.isBefore(Instant.now().minus(MAX_TIME_BETWEEN_REC_MINS, ChronoUnit.MINUTES)))
.orElse(false);

if (exceedSize || tooLongSinceLastRecord) {
log.info(
"Allocated stream {}, exceedSize:{}, tooLongSinceLastRecord: {}, bytes in queue: {} computed in-progress bytes: {} , threshold bytes: {}",
stream.getName(), exceedSize, tooLongSinceLastRecord,
FileUtils.byteCountToDisplaySize(bufferDequeue.getQueueSizeBytes(stream).get()),
FileUtils.byteCountToDisplaySize(inProgressSizeByte),
FileUtils.byteCountToDisplaySize(QUEUE_FLUSH_THRESHOLD_BYTES));
allocatableThreads--;
if (streamToInProgressWorkers.containsKey(stream)) {
streamToInProgressWorkers.get(stream).getAndAdd(1);
} else {
streamToInProgressWorkers.put(stream, new AtomicInteger(1));
}
flush(stream);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public synchronized long requestMemory() {
* @param bytes the size of the block to free, in bytes
*/
public void free(final long bytes) {
log.info("Freeing {} bytes..", bytes);
currentMemoryBytes.addAndGet(-bytes);

if (currentMemoryBytes.get() < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public MemoryAwareMessageBatch take(final StreamDescriptor streamDescriptor, fin

final var s = Stream.generate(() -> {
try {
return queue.poll(5, TimeUnit.MILLISECONDS);
return queue.poll(20, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
Expand All @@ -81,9 +81,7 @@ public MemoryAwareMessageBatch take(final StreamDescriptor streamDescriptor, fin
.toList()
.stream();

// todo (cgardens) - possible race where in between pulling records and new records going in that we
// reset the limit to be lower than number of bytes already in the queue. probably not a big deal.
queue.setMaxMemoryUsage(queue.getMaxMemoryUsage() - bytesRead.get());
queue.addMaxMemory(-bytesRead.get());

return new MemoryAwareMessageBatch(s, bytesRead.get(), memoryManager);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public BufferEnqueue(final long initialQueueSizeBytes,
*/
public void addRecord(final StreamDescriptor streamDescriptor, final AirbyteMessage message) {
if (!buffers.containsKey(streamDescriptor)) {
System.out.println("putting things in");
buffers.put(streamDescriptor, new MemoryBoundedLinkedBlockingQueue<>(initialQueueSizeBytes));
buffers.put(streamDescriptor, new MemoryBoundedLinkedBlockingQueue<>(memoryManager.requestMemory()));
}

// todo (cgardens) - handle estimating state message size.
Expand All @@ -60,9 +59,9 @@ public void addRecord(final StreamDescriptor streamDescriptor, final AirbyteMess
// todo (cgardens) - what if the record being added is bigger than the block size?
// if failed, try to increase memory and add to queue.
while (!addedToQueue) {
final var freeMem = memoryManager.requestMemory();
if (freeMem > 0) {
queue.setMaxMemoryUsage(queue.getMaxMemoryUsage() + freeMem);
final var newlyAllocatedMemory = memoryManager.requestMemory();
if (newlyAllocatedMemory > 0) {
queue.addMaxMemory(newlyAllocatedMemory);
}
addedToQueue = queue.offer(message, messageSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -33,8 +34,8 @@ public long getMaxMemoryUsage() {
return maxMemoryUsage.get();
}

public void setMaxMemoryUsage(final long maxMemoryUsage) {
this.maxMemoryUsage.set(maxMemoryUsage);
public void addMaxMemory(final long maxMemoryUsage) {
this.maxMemoryUsage.addAndGet(maxMemoryUsage);
}

public Optional<Instant> getTimeOfLastMessage() {
Expand Down Expand Up @@ -80,6 +81,16 @@ public MemoryBoundedLinkedBlockingQueue.MemoryItem<E> poll() {
return null;
}

/**
 * Timed variant of {@code poll}: delegates to the superclass's timed poll and, when an item is
 * returned, subtracts that item's recorded size from {@code currentMemoryUsage}.
 *
 * @param timeout how long to wait before giving up, in units of {@code unit}
 * @param unit the time unit of the {@code timeout} argument
 * @return the item handed back by the superclass, or {@code null} if the superclass returned none
 * @throws InterruptedException if the superclass's poll is interrupted while waiting
 */
@Override
public MemoryBoundedLinkedBlockingQueue.MemoryItem<E> poll(final long timeout, final TimeUnit unit) throws InterruptedException {
  final MemoryItem<E> polled = super.poll(timeout, unit);
  if (polled == null) {
    // Nothing was returned, so the tracked memory usage stays as it is.
    return null;
  }

  // Release the bytes this item was accounted for now that it has left the queue.
  currentMemoryUsage.addAndGet(-polled.size());
  return polled;
}

public record MemoryItem<E> (E item, long size) {}

}

0 comments on commit 1ed32e5

Please sign in to comment.