Skip to content

Commit

Permalink
fix evicting queue growing unbounded under high load
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Jul 7, 2021
1 parent f7284ca commit 183f1d8
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 8 deletions.
Expand Up @@ -18,6 +18,7 @@
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
Expand All @@ -34,10 +35,13 @@ final class DefaultEvictingQueue<E> extends AbstractQueue<E> implements Evicting

private final int capacity;
private final Queue<E> elements;
// keep track of queue size manually, because calling queue.size() is expensive
private final AtomicInteger size;

private DefaultEvictingQueue(final int capacity) {
this.capacity = capacity;
this.elements = new ConcurrentLinkedQueue<>();
elements = new ConcurrentLinkedQueue<>();
size = new AtomicInteger();
}

/**
Expand All @@ -58,15 +62,19 @@ public Iterator<E> iterator() {

@Override
public boolean offer(@Nullable final E e) {
if (capacity == elements.size()) {
elements.poll();
if (size.getAndIncrement() >= capacity) {
poll();
}
return elements.offer(e);
}

@Override
public E poll() {
return elements.poll();
final E pollResult = elements.poll();
if (pollResult != null) {
size.decrementAndGet();
}
return pollResult;
}

@Override
Expand All @@ -76,7 +84,7 @@ public E peek() {

@Override
public int size() {
return elements.size();
return size.get();
}

@Override
Expand All @@ -89,19 +97,21 @@ public boolean equals(@Nullable final Object o) {
}
final DefaultEvictingQueue<?> that = (DefaultEvictingQueue<?>) o;
return capacity == that.capacity &&
Objects.equals(elements, that.elements);
Objects.equals(elements, that.elements) &&
Objects.equals(size, that.size);
}

@Override
public int hashCode() {
return Objects.hash(capacity, elements);
return Objects.hash(capacity, elements, size);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"capacity=" + capacity +
", elements=" + elements +
", size=" + size +
"]";
}

Expand Down
Expand Up @@ -17,8 +17,10 @@

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.junit.Test;
Expand All @@ -43,8 +45,27 @@ public void verifyEviction() {
queue.addAll(remainingStrings);

assertThat(queue)
.containsOnlyElementsOf(remainingStrings)
.hasSameElementsAs(remainingStrings)
.hasSize(remainingStrings.size());
assertThat(queue.stream().count())
.isEqualTo(remainingStrings.size());
}

@Test
public void verifyEvictionUnderHighLoad() throws InterruptedException {
final EvictingQueue<String> queue = DefaultEvictingQueue.withCapacity(CAPACITY);

final CountDownLatch latch = new CountDownLatch(100000);

IntStream.range(0, 100000).parallel().forEach(i -> {
queue.add(Integer.toString(i));
latch.countDown();
});

latch.await();

assertThat(queue.size()).isEqualTo(CAPACITY);
assertThat(queue.stream().count()).isEqualTo(CAPACITY);
}

private List<String> createRandomStrings(final int n) {
Expand Down

0 comments on commit 183f1d8

Please sign in to comment.