Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: Fixed the bug that iterator.hasNext() == true after removal from empty ConcurrentIterableLinkedQueue #12514

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private static class LinkedListNode<E> {
private E data;
private LinkedListNode<E> next;

private LinkedListNode(E data) {
private LinkedListNode(final E data) {
this.data = data;
this.next = null;
}
Expand All @@ -65,7 +65,7 @@ private LinkedListNode(E data) {
*
* @param e the element to be added, which cannot be {@code null}
*/
public void add(E e) {
public void add(final E e) {
if (e == null) {
throw new IllegalArgumentException("Null element is not allowed.");
}
Expand Down Expand Up @@ -125,7 +125,11 @@ public long tryRemoveBefore(long newFirstIndex) {
}

firstNode = currentNode;
pilotNode.next = firstNode;
// pilotNode.next shall be null when the queue is empty and firstNode == pilotNode
// to make iterator.hasNext() == false when the iterator is on the pilotNode
if (firstNode != pilotNode) {
pilotNode.next = firstNode;
}

// Reset firstNode and lastNode to pilotNode if the queue becomes empty
if (firstNode == null) {
Expand Down Expand Up @@ -200,7 +204,7 @@ public long getTailIndex() {
}
}

public void setFirstIndex(long firstIndex) {
public void setFirstIndex(final long firstIndex) {
lock.writeLock().lock();
try {
this.firstIndex = firstIndex;
Expand All @@ -218,7 +222,7 @@ public void setFirstIndex(long firstIndex) {
* If the queue is empty, the given index is valid if it is equal to {@link
* ConcurrentIterableLinkedQueue#firstIndex}.
*/
public boolean isNextIndexValid(long nextIndex) {
public boolean isNextIndexValid(final long nextIndex) {
lock.readLock().lock();
try {
return firstIndex <= nextIndex && nextIndex <= tailIndex;
Expand All @@ -231,7 +235,7 @@ public boolean hasAnyIterators() {
return !iteratorSet.isEmpty();
}

public DynamicIterator iterateFrom(long offset) {
public DynamicIterator iterateFrom(final long offset) {
final DynamicIterator iterator = new DynamicIterator(offset);
iteratorSet.put(iterator, iterator);
return iterator;
Expand Down Expand Up @@ -289,7 +293,7 @@ public E next() {
* @return the next element in the queue. {@code null} if the queue is closed, or if the waiting
* time elapsed, or the thread is interrupted
*/
public E next(long waitTimeMillis) {
public E next(final long waitTimeMillis) {
lock.writeLock().lock();
try {
while (!hasNext()) {
Expand All @@ -306,7 +310,7 @@ public E next(long waitTimeMillis) {
++nextIndex;

return currentNode.data;
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("Interrupted while waiting for next element.", e);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

public class ConcurrentIterableLinkedQueueTest {

ConcurrentIterableLinkedQueue<Integer> queue;
private ConcurrentIterableLinkedQueue<Integer> queue;

@Before
public void setUp() {
Expand All @@ -60,15 +60,15 @@ public void testSeek() {
queue.add(2);
queue.add(3);

ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
itr.seek(2);
Assert.assertEquals(Integer.valueOf(3), itr.next());
}

@Test(timeout = 60000)
public void testTimedGet() {
queue.add(1);
ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFromEarliest();
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFromEarliest();
Assert.assertEquals(1, (int) itr.next(1000));
Assert.assertNull(itr.next(1000));
}
Expand All @@ -80,8 +80,8 @@ public void testInsertNull() {

@Test(timeout = 60000)
public void testConcurrentAddAndRemove() throws InterruptedException {
int numberOfAdds = 500;
ExecutorService executor = Executors.newFixedThreadPool(2);
final int numberOfAdds = 500;
final ExecutorService executor = Executors.newFixedThreadPool(2);

// Thread 1 adds elements to the queue
executor.submit(
Expand Down Expand Up @@ -110,18 +110,18 @@ public void testConcurrentAddAndRemove() throws InterruptedException {

@Test(timeout = 60000)
public void testIterateFromEmptyQueue() {
ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(1);
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(1);

AtomicInteger value = new AtomicInteger(-1);
final AtomicInteger value = new AtomicInteger(-1);
new Thread(() -> value.set(itr.next())).start();
queue.add(3);
Awaitility.await().untilAsserted(() -> Assert.assertEquals(3, value.get()));
}

@Test(timeout = 60000)
public void testContinuousEmptyNext() throws InterruptedException {
ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
AtomicInteger consumedValue = new AtomicInteger(0);
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
final AtomicInteger consumedValue = new AtomicInteger(0);
new Thread(
() -> {
while (true) {
Expand All @@ -143,7 +143,7 @@ public void testContinuousEmptyNext() throws InterruptedException {
public void testRemove() {
queue.add(1);
queue.add(2);
ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(1);
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(1);

Assert.assertEquals(1, queue.tryRemoveBefore(Long.MAX_VALUE));
Assert.assertEquals(2, (int) itr.next());
Expand All @@ -153,7 +153,7 @@ public void testRemove() {
public void testRemoveAgainstNewestItr() {
queue.add(1);
queue.add(2);
ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFromLatest();
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFromLatest();

Assert.assertEquals(2, queue.tryRemoveBefore(Long.MAX_VALUE));
queue.add(3);
Expand All @@ -164,7 +164,7 @@ public void testRemoveAgainstNewestItr() {
public void testClear() {
queue.add(1);
queue.add(2);
ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(1);
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(1);
queue.clear();

assertFalse(queue.hasAnyIterators());
Expand All @@ -186,12 +186,12 @@ public void testIntegratedOperations() {
Assert.assertEquals(1, queue.getFirstIndex());
Assert.assertEquals(2, queue.getTailIndex());

ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it = queue.iterateFromEarliest();
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it = queue.iterateFromEarliest();
Assert.assertEquals(2, (int) it.next());

ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it2 = queue.iterateFromLatest();
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it2 = queue.iterateFromLatest();
Assert.assertEquals(2, it2.getNextIndex());
AtomicInteger value = new AtomicInteger(-1);
final AtomicInteger value = new AtomicInteger(-1);
new Thread(() -> value.set(it2.next())).start();
queue.add(3);
Awaitility.await().untilAsserted(() -> Assert.assertEquals(3, value.get()));
Expand All @@ -206,10 +206,10 @@ public void testIntegratedOperations() {

@Test(timeout = 60000)
public void testConcurrentReadWrite() {
AtomicBoolean failure = new AtomicBoolean(false);
List<Thread> threadList = new ArrayList<>(102);
final AtomicBoolean failure = new AtomicBoolean(false);
final List<Thread> threadList = new ArrayList<>(102);

Thread thread1 =
final Thread thread1 =
new Thread(
() -> {
try {
Expand All @@ -223,7 +223,7 @@ public void testConcurrentReadWrite() {
threadList.add(thread1);
thread1.start();

Thread thread2 =
final Thread thread2 =
new Thread(
() -> {
try {
Expand All @@ -238,7 +238,7 @@ public void testConcurrentReadWrite() {
thread2.start();

for (int i = 0; i < 100; ++i) {
Thread thread =
final Thread thread =
new Thread(
() -> {
try {
Expand Down Expand Up @@ -282,13 +282,13 @@ public void testEmptyQueueBehavior() {
@Test(timeout = 60000)
public void testBoundaryConditions() {
queue.add(1);
ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(10);
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(10);
Assert.assertFalse(itr.hasNext());
}

@Test(timeout = 60000)
public void testConcurrentExceptionHandling() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
final ExecutorService executor = Executors.newFixedThreadPool(2);

executor.submit(
() -> {
Expand All @@ -301,11 +301,11 @@ public void testConcurrentExceptionHandling() throws InterruptedException {
queue.clear();
});

AtomicBoolean caughtException = new AtomicBoolean(false);
final AtomicBoolean caughtException = new AtomicBoolean(false);
executor.submit(
() -> {
try {
ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFromEarliest();
while (itr.hasNext()) {
itr.next();
Expand All @@ -327,7 +327,7 @@ public void testHighLoadPerformance() {
queue.add(i);
}

ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
for (int i = 0; i < numberOfElements; i++) {
Assert.assertTrue(itr.hasNext());
Assert.assertEquals(Integer.valueOf(i), itr.next());
Expand All @@ -337,8 +337,8 @@ public void testHighLoadPerformance() {

@Test(timeout = 60000)
public void testMultiThreadedConsistency() throws InterruptedException {
int numberOfElements = 1000;
ExecutorService executor = Executors.newFixedThreadPool(10);
final int numberOfElements = 1000;
final ExecutorService executor = Executors.newFixedThreadPool(10);

for (int i = 0; i < numberOfElements; i++) {
int finalI = i;
Expand All @@ -348,8 +348,8 @@ public void testMultiThreadedConsistency() throws InterruptedException {
executor.shutdown();
Assert.assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES));

ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFromEarliest();
HashSet<Integer> elements = new HashSet<>();
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFromEarliest();
final HashSet<Integer> elements = new HashSet<>();
while (itr.hasNext()) {
elements.add(itr.next());
}
Expand All @@ -363,7 +363,7 @@ public void testIteratorBasicFunctionality() {
queue.add(i);
}

ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
for (int i = 0; i < 5; i++) {
Assert.assertTrue(itr.hasNext());
Assert.assertEquals(Integer.valueOf(i), itr.next());
Expand All @@ -378,7 +378,7 @@ public void testIteratorAfterRemoval() {
}

queue.tryRemoveBefore(3);
ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
for (int i = 3; i < 5; i++) {
Assert.assertTrue(itr.hasNext());
Assert.assertEquals(Integer.valueOf(i), itr.next());
Expand All @@ -392,8 +392,8 @@ public void testIteratorConcurrentAccess() throws InterruptedException {
queue.add(i);
}

ExecutorService executor = Executors.newFixedThreadPool(10);
AtomicInteger count = new AtomicInteger(0);
final ExecutorService executor = Executors.newFixedThreadPool(10);
final AtomicInteger count = new AtomicInteger(0);

for (int i = 0; i < 10; i++) {
executor.submit(
Expand All @@ -417,7 +417,7 @@ public void testIteratorDuringQueueModification() {
queue.add(i);
}

ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
for (int i = 0; i < 3; i++) {
Assert.assertTrue(itr.hasNext());
Assert.assertEquals(Integer.valueOf(i), itr.next());
Expand Down Expand Up @@ -447,10 +447,20 @@ public void testIteratorDuringQueueModification() {
Assert.assertFalse(itr.hasNext());
}

@Test(timeout = 60000)
public void testIterateAfterRemoveFromEmptyQueue() {
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
Assert.assertFalse(itr.hasNext());
queue.tryRemoveBefore(0);
Assert.assertFalse(itr.hasNext());
itr.next(10);
Assert.assertEquals(0, itr.getNextIndex());
}

@Test(timeout = 60000)
public void testIteratorExceptionHandling() {
queue.add(1);
ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
queue.clear();

Assert.assertFalse(itr.hasNext());
Expand All @@ -464,8 +474,8 @@ public void testIteratorSeek() {
queue.add(i);
}

ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
long newNextIndex = itr.seek(5);
final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
final long newNextIndex = itr.seek(5);
Assert.assertEquals(5, newNextIndex);
Assert.assertTrue(itr.hasNext());
Assert.assertEquals(Integer.valueOf(5), itr.next());
Expand Down
Loading