From 1a9cc709db699604fe414be43e3a597143491ad7 Mon Sep 17 00:00:00 2001 From: Daniel Krawczyk Date: Mon, 10 Jun 2019 10:41:30 +0200 Subject: [PATCH] Full drain MpscQueue (#1032) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Piotr Rżysko --- .../tech/hermes/common/config/Configs.java | 1 + .../consumer/offset/OffsetCommitter.java | 1 - .../consumer/offset/OffsetQueue.java | 13 +++-- .../consumers/queue/FullDrainMpscQueue.java | 55 +++++++++++++++++++ .../consumers/queue/MonitoredMpscQueue.java | 21 +++++-- .../hermes/consumers/queue/MpscQueue.java | 14 +++++ .../queue/WaitFreeDrainMpscQueue.java | 33 +++++++++++ .../process/ConsumerProcessSupervisor.java | 8 ++- .../supervisor/process/SignalsFilter.java | 6 +- .../process/SignalsFilterTest.groovy | 6 +- .../queue/FullDrainMpscQueueTest.java | 9 +++ .../queue/MpscQueuesAbstractTest.java | 51 +++++++++++++++++ .../queue/WaitFreeDrainMpscQueueTest.java | 9 +++ .../src/test/resources/config.properties | 1 + 14 files changed, 209 insertions(+), 19 deletions(-) create mode 100644 hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/queue/FullDrainMpscQueue.java create mode 100644 hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/queue/MpscQueue.java create mode 100644 hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/queue/WaitFreeDrainMpscQueue.java create mode 100644 hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/queue/FullDrainMpscQueueTest.java create mode 100644 hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/queue/MpscQueuesAbstractTest.java create mode 100644 hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/queue/WaitFreeDrainMpscQueueTest.java diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/config/Configs.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/config/Configs.java index 0868eb9893..31830bc1ae 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/config/Configs.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/config/Configs.java @@ -173,6 +173,7 @@ public enum Configs { CONSUMER_COMMIT_OFFSET_PERIOD("consumer.commit.offset.period", 60), CONSUMER_COMMIT_OFFSET_QUEUES_SIZE("consumer.commit.offset.queues.size", 200_000), + CONSUMER_COMMIT_OFFSET_QUEUES_INFLIGHT_DRAIN_FULL("consumer.commit.offset.queues.inflight.drain.full", false), CONSUMER_SENDER_ASYNC_TIMEOUT_MS("consumer.sender.async.timeout.ms", 5_000), CONSUMER_SENDER_ASYNC_TIMEOUT_THREAD_POOL_SIZE("consumer.sender.async.timeout.thread.pool.size", 32), diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/OffsetCommitter.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/OffsetCommitter.java index ae45f00f80..5b9147a074 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/OffsetCommitter.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/OffsetCommitter.java @@ -18,7 +18,6 @@ import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Function; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/OffsetQueue.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/OffsetQueue.java index 7609257616..311f82cb9a 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/OffsetQueue.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/OffsetQueue.java @@ -4,22 +4,27 @@ import pl.allegro.tech.hermes.common.config.ConfigFactory; import pl.allegro.tech.hermes.common.config.Configs; import pl.allegro.tech.hermes.common.metric.HermesMetrics; +import pl.allegro.tech.hermes.consumers.queue.FullDrainMpscQueue; +import pl.allegro.tech.hermes.consumers.queue.MpscQueue; +import pl.allegro.tech.hermes.consumers.queue.WaitFreeDrainMpscQueue; import pl.allegro.tech.hermes.consumers.queue.MonitoredMpscQueue; import javax.inject.Inject; public class OffsetQueue { - private final MonitoredMpscQueue inflightOffsetsQueue; + private final MpscQueue inflightOffsetsQueue; - private final MonitoredMpscQueue commitOffsetsQueue; + private final MpscQueue commitOffsetsQueue; @Inject public OffsetQueue(HermesMetrics metrics, ConfigFactory configFactory) { int queueSize = configFactory.getIntProperty(Configs.CONSUMER_COMMIT_OFFSET_QUEUES_SIZE); - this.inflightOffsetsQueue = new MonitoredMpscQueue<>(metrics, "inflightOffsets", queueSize); - this.commitOffsetsQueue = new MonitoredMpscQueue<>(metrics, "committedOffsets", queueSize); + boolean fullDrainInflightsQueue = configFactory.getBooleanProperty(Configs.CONSUMER_COMMIT_OFFSET_QUEUES_INFLIGHT_DRAIN_FULL); + this.inflightOffsetsQueue = new MonitoredMpscQueue<>(fullDrainInflightsQueue ? + new FullDrainMpscQueue<>(queueSize) : new WaitFreeDrainMpscQueue<>(queueSize), metrics, "inflightOffsets"); + this.commitOffsetsQueue = new MonitoredMpscQueue<>(new WaitFreeDrainMpscQueue<>(queueSize), metrics, "committedOffsets"); } public void offerInflightOffset(SubscriptionPartitionOffset offset) { diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/queue/FullDrainMpscQueue.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/queue/FullDrainMpscQueue.java new file mode 100644 index 0000000000..1f715ae925 --- /dev/null +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/queue/FullDrainMpscQueue.java @@ -0,0 +1,55 @@ +package pl.allegro.tech.hermes.consumers.queue; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpscArrayQueue; +import org.slf4j.Logger; + +import static org.slf4j.LoggerFactory.getLogger; + +public class FullDrainMpscQueue implements MpscQueue { + + private static final Logger logger = getLogger(FullDrainMpscQueue.class); + + private final MpscArrayQueue queue; + + public FullDrainMpscQueue(int capacity) { + this.queue = new MpscArrayQueue<>(capacity); + } + + @Override + public boolean offer(T element) { + return queue.offer(element); + } + + /** + * The {@link MpscArrayQueue#drain(MessagePassingQueue.Consumer)} method may skip items with allocated slots + * by producers (who won CAS) but were not added to the queue yet. This may happend to broken elements chain. + * See explanation at http://psy-lob-saw.blogspot.com/2014/07/poll-me-maybe.html + * + * This is an alternative approach which waits for all items to become available + * by using {@link MpscArrayQueue#poll()} underneath (which spin-waits when getting next item). + */ + @Override + public void drain(MessagePassingQueue.Consumer consumer) { + int size = queue.size(); + for (int i = 0; i < size; i++) { + T element = queue.poll(); + if (element != null) { + consumer.accept(element); + } else { + logger.warn("Unexpected null value while draining queue [idx={}, size={}]", i, size); + break; + } + } + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public int capacity() { + return queue.capacity(); + } +} diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/queue/MonitoredMpscQueue.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/queue/MonitoredMpscQueue.java index 7a19a60f25..d2bc341d14 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/queue/MonitoredMpscQueue.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/queue/MonitoredMpscQueue.java @@ -1,28 +1,28 @@ package pl.allegro.tech.hermes.consumers.queue; import org.jctools.queues.MessagePassingQueue; -import org.jctools.queues.MpscArrayQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.common.metric.HermesMetrics; -public class MonitoredMpscQueue { +public class MonitoredMpscQueue implements MpscQueue { private static final Logger logger = LoggerFactory.getLogger(MonitoredMpscQueue.class); - private final MpscArrayQueue queue; + private final MpscQueue queue; private final String name; private final HermesMetrics metrics; - public MonitoredMpscQueue(HermesMetrics metrics, String name, int capacity) { - this.queue = new MpscArrayQueue<>(capacity); + public MonitoredMpscQueue(MpscQueue queue, HermesMetrics metrics, String name) { + this.queue = queue; this.name = name; this.metrics = metrics; metrics.registerGauge("queue." + name + ".utilization", () -> (double) queue.size() / queue.capacity()); } + @Override public boolean offer(T element) { boolean accepted = queue.offer(element); if (!accepted) { @@ -32,7 +32,18 @@ public boolean offer(T element) { return accepted; } + @Override public void drain(MessagePassingQueue.Consumer consumer) { queue.drain(consumer); } + + @Override + public int size() { + return queue.size(); + } + + @Override + public int capacity() { + return queue.capacity(); + } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/queue/MpscQueue.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/queue/MpscQueue.java new file mode 100644 index 0000000000..c9fe254a53 --- /dev/null +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/queue/MpscQueue.java @@ -0,0 +1,14 @@ +package pl.allegro.tech.hermes.consumers.queue; + +import org.jctools.queues.MessagePassingQueue; + +public interface MpscQueue { + + boolean offer(T element); + + void drain(MessagePassingQueue.Consumer consumer); + + int size(); + + int capacity(); +} diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/queue/WaitFreeDrainMpscQueue.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/queue/WaitFreeDrainMpscQueue.java new file mode 100644 index 0000000000..88cc5a5f57 --- /dev/null +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/queue/WaitFreeDrainMpscQueue.java @@ -0,0 +1,33 @@ +package pl.allegro.tech.hermes.consumers.queue; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpscArrayQueue; + +public class WaitFreeDrainMpscQueue implements MpscQueue { + + private final MpscArrayQueue queue; + + public WaitFreeDrainMpscQueue(int capacity) { + this.queue = new MpscArrayQueue<>(capacity); + } + + @Override + public boolean offer(T element) { + return queue.offer(element); + } + + @Override + public void drain(MessagePassingQueue.Consumer consumer) { + queue.drain(consumer); + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public int capacity() { + return queue.capacity(); + } +} diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcessSupervisor.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcessSupervisor.java index 5df36b75b0..178f616b98 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcessSupervisor.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcessSupervisor.java @@ -8,6 +8,8 @@ import pl.allegro.tech.hermes.common.config.Configs; import pl.allegro.tech.hermes.common.metric.HermesMetrics; import pl.allegro.tech.hermes.consumers.queue.MonitoredMpscQueue; +import pl.allegro.tech.hermes.consumers.queue.MpscQueue; +import pl.allegro.tech.hermes.consumers.queue.WaitFreeDrainMpscQueue; import pl.allegro.tech.hermes.consumers.supervisor.ConsumersExecutorService; import java.time.Clock; @@ -24,7 +26,7 @@ public class ConsumerProcessSupervisor implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ConsumerProcessSupervisor.class); - private final MonitoredMpscQueue taskQueue; + private final MpscQueue taskQueue; private final RunningConsumerProcesses runningConsumerProcesses; @@ -48,8 +50,8 @@ public ConsumerProcessSupervisor(ConsumersExecutorService executor, this.executor = executor; this.clock = clock; this.metrics = metrics; - this.taskQueue = new MonitoredMpscQueue<>(metrics, "signalQueue", - configs.getIntProperty(Configs.CONSUMER_SIGNAL_PROCESSING_QUEUE_SIZE)); + int signalQueueSize = configs.getIntProperty(Configs.CONSUMER_SIGNAL_PROCESSING_QUEUE_SIZE); + this.taskQueue = new MonitoredMpscQueue<>(new WaitFreeDrainMpscQueue<>(signalQueueSize), metrics, "signalQueue"); this.signalsFilter = new SignalsFilter(taskQueue, clock); this.runningConsumerProcesses = new RunningConsumerProcesses(clock); this.processKiller = new ConsumerProcessKiller( diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/SignalsFilter.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/SignalsFilter.java index 4611bf7dc8..4209f89c17 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/SignalsFilter.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/SignalsFilter.java @@ -1,7 +1,7 @@ package pl.allegro.tech.hermes.consumers.supervisor.process; import com.google.common.collect.ImmutableMap; -import pl.allegro.tech.hermes.consumers.queue.MonitoredMpscQueue; +import pl.allegro.tech.hermes.consumers.queue.MpscQueue; import pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType; import java.time.Clock; @@ -20,9 +20,9 @@ class SignalsFilter { private final Clock clock; - private final MonitoredMpscQueue taskQueue; + private final MpscQueue taskQueue; - SignalsFilter(MonitoredMpscQueue taskQueue, Clock clock) { + SignalsFilter(MpscQueue taskQueue, Clock clock) { this.taskQueue = taskQueue; this.clock = clock; } diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/SignalsFilterTest.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/SignalsFilterTest.groovy index 9c057403f9..9e68dfb1ea 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/SignalsFilterTest.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/SignalsFilterTest.groovy @@ -1,8 +1,8 @@ package pl.allegro.tech.hermes.consumers.supervisor.process import pl.allegro.tech.hermes.api.SubscriptionName -import pl.allegro.tech.hermes.common.metric.HermesMetrics -import pl.allegro.tech.hermes.consumers.queue.MonitoredMpscQueue +import pl.allegro.tech.hermes.consumers.queue.MpscQueue +import pl.allegro.tech.hermes.consumers.queue.WaitFreeDrainMpscQueue import pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType import spock.lang.Specification @@ -14,7 +14,7 @@ class SignalsFilterTest extends Specification { private final Clock clock = Clock.fixed(Instant.ofEpochMilli(1024), ZoneId.systemDefault()) - private final MonitoredMpscQueue taskQueue = new MonitoredMpscQueue<>(Mock(HermesMetrics), "queue", 10) + private final MpscQueue taskQueue = new WaitFreeDrainMpscQueue<>(10) private final SignalsFilter filter = new SignalsFilter(taskQueue, clock) diff --git a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/queue/FullDrainMpscQueueTest.java b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/queue/FullDrainMpscQueueTest.java new file mode 100644 index 0000000000..c4ea984bab --- /dev/null +++ b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/queue/FullDrainMpscQueueTest.java @@ -0,0 +1,9 @@ +package pl.allegro.tech.hermes.consumers.queue; + +public class FullDrainMpscQueueTest extends MpscQueuesAbstractTest { + + @Override + protected MpscQueue createMpscQueue(int size) { + return new FullDrainMpscQueue<>(size); + } +} diff --git a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/queue/MpscQueuesAbstractTest.java b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/queue/MpscQueuesAbstractTest.java new file mode 100644 index 0000000000..ee0a22a2d9 --- /dev/null +++ b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/queue/MpscQueuesAbstractTest.java @@ -0,0 +1,51 @@ +package pl.allegro.tech.hermes.consumers.queue; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class MpscQueuesAbstractTest { + + protected abstract MpscQueue createMpscQueue(int size); + + @Test + public void shouldDrainItemsFromNonEmptyQueue() { + // given + MpscQueue queue = createMpscQueue(16); + assertThat(queue.capacity()).isEqualTo(16); + + // when + queue.offer(1); + queue.offer(2); + queue.offer(3); + + // then + assertThat(queue.size()).isEqualTo(3); + + // when + List drained = new ArrayList<>(); + queue.drain(drained::add); + + // then + assertThat(drained).contains(1, 2, 3); + + // and + assertThat(queue.size()).isZero(); + } + + @Test + public void shouldDrainEmptyQueue() { + // given + MpscQueue queue = createMpscQueue(16); + + // when + List drained = new ArrayList<>(); + queue.drain(drained::add); + + // then + assertThat(drained).isEmpty(); + } +} \ No newline at end of file diff --git a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/queue/WaitFreeDrainMpscQueueTest.java b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/queue/WaitFreeDrainMpscQueueTest.java new file mode 100644 index 0000000000..6923f10a3f --- /dev/null +++ b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/queue/WaitFreeDrainMpscQueueTest.java @@ -0,0 +1,9 @@ +package pl.allegro.tech.hermes.consumers.queue; + +public class WaitFreeDrainMpscQueueTest extends MpscQueuesAbstractTest { + + @Override + protected MpscQueue createMpscQueue(int size) { + return new WaitFreeDrainMpscQueue<>(size); + } +} diff --git a/integration/src/test/resources/config.properties b/integration/src/test/resources/config.properties index 20b1f872ae..1c09e96fc3 100644 --- a/integration/src/test/resources/config.properties +++ b/integration/src/test/resources/config.properties @@ -32,6 +32,7 @@ consumer.status.health.port=10802 kafka.simple.consumer.buffer.size=1024 kafka.consumer.auto.offset.reset=smallest kafka.consumer.timeout.ms=1000 +consumer.commit.offset.queues.inflight.drain.full=true metrics.graphite.reporter=true metrics.console.reporter=false