Skip to content

Commit

Permalink
Full drain MpscQueue (#1032)
Browse files Browse the repository at this point in the history
Co-authored-by: Piotr Rżysko <piotr.rzysko@gmail.com>
  • Loading branch information
Daniel Krawczyk and piotrrzysko committed Jun 10, 2019
1 parent 018d42e commit 1a9cc70
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public enum Configs {

CONSUMER_COMMIT_OFFSET_PERIOD("consumer.commit.offset.period", 60),
CONSUMER_COMMIT_OFFSET_QUEUES_SIZE("consumer.commit.offset.queues.size", 200_000),
CONSUMER_COMMIT_OFFSET_QUEUES_INFLIGHT_DRAIN_FULL("consumer.commit.offset.queues.inflight.drain.full", false),

CONSUMER_SENDER_ASYNC_TIMEOUT_MS("consumer.sender.async.timeout.ms", 5_000),
CONSUMER_SENDER_ASYNC_TIMEOUT_THREAD_POOL_SIZE("consumer.sender.async.timeout.thread.pool.size", 32),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,27 @@
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.queue.FullDrainMpscQueue;
import pl.allegro.tech.hermes.consumers.queue.MpscQueue;
import pl.allegro.tech.hermes.consumers.queue.WaitFreeDrainMpscQueue;
import pl.allegro.tech.hermes.consumers.queue.MonitoredMpscQueue;

import javax.inject.Inject;

public class OffsetQueue {

private final MonitoredMpscQueue<SubscriptionPartitionOffset> inflightOffsetsQueue;
private final MpscQueue<SubscriptionPartitionOffset> inflightOffsetsQueue;

private final MonitoredMpscQueue<SubscriptionPartitionOffset> commitOffsetsQueue;
private final MpscQueue<SubscriptionPartitionOffset> commitOffsetsQueue;

@Inject
public OffsetQueue(HermesMetrics metrics, ConfigFactory configFactory) {
int queueSize = configFactory.getIntProperty(Configs.CONSUMER_COMMIT_OFFSET_QUEUES_SIZE);

this.inflightOffsetsQueue = new MonitoredMpscQueue<>(metrics, "inflightOffsets", queueSize);
this.commitOffsetsQueue = new MonitoredMpscQueue<>(metrics, "committedOffsets", queueSize);
boolean fullDrainInflightsQueue = configFactory.getBooleanProperty(Configs.CONSUMER_COMMIT_OFFSET_QUEUES_INFLIGHT_DRAIN_FULL);
this.inflightOffsetsQueue = new MonitoredMpscQueue<>(fullDrainInflightsQueue ?
new FullDrainMpscQueue<>(queueSize) : new WaitFreeDrainMpscQueue<>(queueSize), metrics, "inflightOffsets");
this.commitOffsetsQueue = new MonitoredMpscQueue<>(new WaitFreeDrainMpscQueue<>(queueSize), metrics, "committedOffsets");
}

public void offerInflightOffset(SubscriptionPartitionOffset offset) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package pl.allegro.tech.hermes.consumers.queue;

import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscArrayQueue;
import org.slf4j.Logger;

import static org.slf4j.LoggerFactory.getLogger;

public class FullDrainMpscQueue<T> implements MpscQueue<T> {

private static final Logger logger = getLogger(FullDrainMpscQueue.class);

private final MpscArrayQueue<T> queue;

public FullDrainMpscQueue(int capacity) {
this.queue = new MpscArrayQueue<>(capacity);
}

@Override
public boolean offer(T element) {
return queue.offer(element);
}

/**
* The {@link MpscArrayQueue#drain(MessagePassingQueue.Consumer)} method may skip items with allocated slots
* by producers (who won CAS) but were not added to the queue yet. This may happend to broken elements chain.
* See explanation at http://psy-lob-saw.blogspot.com/2014/07/poll-me-maybe.html
*
* This is an alternative approach which waits for all items to become available
* by using {@link MpscArrayQueue#poll()} underneath (which spin-waits when getting next item).
*/
@Override
public void drain(MessagePassingQueue.Consumer<T> consumer) {
int size = queue.size();
for (int i = 0; i < size; i++) {
T element = queue.poll();
if (element != null) {
consumer.accept(element);
} else {
logger.warn("Unexpected null value while draining queue [idx={}, size={}]", i, size);
break;
}
}
}

@Override
public int size() {
return queue.size();
}

@Override
public int capacity() {
return queue.capacity();
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
package pl.allegro.tech.hermes.consumers.queue;

import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;

public class MonitoredMpscQueue<T> {
public class MonitoredMpscQueue<T> implements MpscQueue<T> {

private static final Logger logger = LoggerFactory.getLogger(MonitoredMpscQueue.class);

private final MpscArrayQueue<T> queue;
private final MpscQueue<T> queue;

private final String name;

private final HermesMetrics metrics;

public MonitoredMpscQueue(HermesMetrics metrics, String name, int capacity) {
this.queue = new MpscArrayQueue<>(capacity);
public MonitoredMpscQueue(MpscQueue<T> queue, HermesMetrics metrics, String name) {
this.queue = queue;
this.name = name;
this.metrics = metrics;
metrics.registerGauge("queue." + name + ".utilization", () -> (double) queue.size() / queue.capacity());
}

@Override
public boolean offer(T element) {
boolean accepted = queue.offer(element);
if (!accepted) {
Expand All @@ -32,7 +32,18 @@ public boolean offer(T element) {
return accepted;
}

@Override
public void drain(MessagePassingQueue.Consumer<T> consumer) {
queue.drain(consumer);
}

@Override
public int size() {
return queue.size();
}

@Override
public int capacity() {
return queue.capacity();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package pl.allegro.tech.hermes.consumers.queue;

import org.jctools.queues.MessagePassingQueue;

public interface MpscQueue<T> {

boolean offer(T element);

void drain(MessagePassingQueue.Consumer<T> consumer);

int size();

int capacity();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package pl.allegro.tech.hermes.consumers.queue;

import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscArrayQueue;

public class WaitFreeDrainMpscQueue<T> implements MpscQueue<T> {

private final MpscArrayQueue<T> queue;

public WaitFreeDrainMpscQueue(int capacity) {
this.queue = new MpscArrayQueue<>(capacity);
}

@Override
public boolean offer(T element) {
return queue.offer(element);
}

@Override
public void drain(MessagePassingQueue.Consumer<T> consumer) {
queue.drain(consumer);
}

@Override
public int size() {
return queue.size();
}

@Override
public int capacity() {
return queue.capacity();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.queue.MonitoredMpscQueue;
import pl.allegro.tech.hermes.consumers.queue.MpscQueue;
import pl.allegro.tech.hermes.consumers.queue.WaitFreeDrainMpscQueue;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersExecutorService;

import java.time.Clock;
Expand All @@ -24,7 +26,7 @@ public class ConsumerProcessSupervisor implements Runnable {

private static final Logger logger = LoggerFactory.getLogger(ConsumerProcessSupervisor.class);

private final MonitoredMpscQueue<Signal> taskQueue;
private final MpscQueue<Signal> taskQueue;

private final RunningConsumerProcesses runningConsumerProcesses;

Expand All @@ -48,8 +50,8 @@ public ConsumerProcessSupervisor(ConsumersExecutorService executor,
this.executor = executor;
this.clock = clock;
this.metrics = metrics;
this.taskQueue = new MonitoredMpscQueue<>(metrics, "signalQueue",
configs.getIntProperty(Configs.CONSUMER_SIGNAL_PROCESSING_QUEUE_SIZE));
int signalQueueSize = configs.getIntProperty(Configs.CONSUMER_SIGNAL_PROCESSING_QUEUE_SIZE);
this.taskQueue = new MonitoredMpscQueue<>(new WaitFreeDrainMpscQueue<>(signalQueueSize), metrics, "signalQueue");
this.signalsFilter = new SignalsFilter(taskQueue, clock);
this.runningConsumerProcesses = new RunningConsumerProcesses(clock);
this.processKiller = new ConsumerProcessKiller(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package pl.allegro.tech.hermes.consumers.supervisor.process;

import com.google.common.collect.ImmutableMap;
import pl.allegro.tech.hermes.consumers.queue.MonitoredMpscQueue;
import pl.allegro.tech.hermes.consumers.queue.MpscQueue;
import pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType;

import java.time.Clock;
Expand All @@ -20,9 +20,9 @@ class SignalsFilter {

private final Clock clock;

private final MonitoredMpscQueue<Signal> taskQueue;
private final MpscQueue<Signal> taskQueue;

SignalsFilter(MonitoredMpscQueue<Signal> taskQueue, Clock clock) {
SignalsFilter(MpscQueue<Signal> taskQueue, Clock clock) {
this.taskQueue = taskQueue;
this.clock = clock;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package pl.allegro.tech.hermes.consumers.supervisor.process

import pl.allegro.tech.hermes.api.SubscriptionName
import pl.allegro.tech.hermes.common.metric.HermesMetrics
import pl.allegro.tech.hermes.consumers.queue.MonitoredMpscQueue
import pl.allegro.tech.hermes.consumers.queue.MpscQueue
import pl.allegro.tech.hermes.consumers.queue.WaitFreeDrainMpscQueue
import pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType
import spock.lang.Specification

Expand All @@ -14,7 +14,7 @@ class SignalsFilterTest extends Specification {

private final Clock clock = Clock.fixed(Instant.ofEpochMilli(1024), ZoneId.systemDefault())

private final MonitoredMpscQueue<Signal> taskQueue = new MonitoredMpscQueue<>(Mock(HermesMetrics), "queue", 10)
private final MpscQueue<Signal> taskQueue = new WaitFreeDrainMpscQueue<>(10)

private final SignalsFilter filter = new SignalsFilter(taskQueue, clock)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package pl.allegro.tech.hermes.consumers.queue;

public class FullDrainMpscQueueTest extends MpscQueuesAbstractTest {

@Override
protected <T> MpscQueue<T> createMpscQueue(int size) {
return new FullDrainMpscQueue<>(size);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package pl.allegro.tech.hermes.consumers.queue;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

public abstract class MpscQueuesAbstractTest {

protected abstract <T> MpscQueue<T> createMpscQueue(int size);

@Test
public void shouldDrainItemsFromNonEmptyQueue() {
// given
MpscQueue<Integer> queue = createMpscQueue(16);
assertThat(queue.capacity()).isEqualTo(16);

// when
queue.offer(1);
queue.offer(2);
queue.offer(3);

// then
assertThat(queue.size()).isEqualTo(3);

// when
List<Integer> drained = new ArrayList<>();
queue.drain(drained::add);

// then
assertThat(drained).contains(1, 2, 3);

// and
assertThat(queue.size()).isZero();
}

@Test
public void shouldDrainEmptyQueue() {
// given
MpscQueue<Integer> queue = createMpscQueue(16);

// when
List<Integer> drained = new ArrayList<>();
queue.drain(drained::add);

// then
assertThat(drained).isEmpty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package pl.allegro.tech.hermes.consumers.queue;

public class WaitFreeDrainMpscQueueTest extends MpscQueuesAbstractTest {

@Override
protected <T> MpscQueue<T> createMpscQueue(int size) {
return new WaitFreeDrainMpscQueue<>(size);
}
}
1 change: 1 addition & 0 deletions integration/src/test/resources/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ consumer.status.health.port=10802
kafka.simple.consumer.buffer.size=1024
kafka.consumer.auto.offset.reset=smallest
kafka.consumer.timeout.ms=1000
consumer.commit.offset.queues.inflight.drain.full=true

metrics.graphite.reporter=true
metrics.console.reporter=false
Expand Down

0 comments on commit 1a9cc70

Please sign in to comment.