diff --git a/jctools-core/src/test/java/org/jctools/queues/MpqSanityTest.java b/jctools-core/src/test/java/org/jctools/queues/MpqSanityTest.java index acfb5e55..812eafdc 100644 --- a/jctools-core/src/test/java/org/jctools/queues/MpqSanityTest.java +++ b/jctools-core/src/test/java/org/jctools/queues/MpqSanityTest.java @@ -1,5 +1,6 @@ package org.jctools.queues; +import org.jctools.queues.atomic.AtomicQueueFactory; import org.jctools.queues.spec.ConcurrentQueueSpec; import org.jctools.queues.spec.Ordering; import org.jctools.queues.spec.Preference; @@ -19,7 +20,7 @@ public abstract class MpqSanityTest { - static final int SIZE = 8192 * 2; + public static final int SIZE = 8192 * 2; static final int CONCURRENT_TEST_DURATION = 500; static final int TEST_TIMEOUT = 30000; @@ -34,7 +35,7 @@ public MpqSanityTest(ConcurrentQueueSpec spec, MessagePassingQueue queu this.spec = spec; } - static Object[] makeMpq(int producers, int consumers, int capacity, Ordering ordering, Queue q) + public static Object[] makeMpq(int producers, int consumers, int capacity, Ordering ordering, Queue q) { ConcurrentQueueSpec spec = new ConcurrentQueueSpec(producers, consumers, capacity, ordering, Preference.NONE); @@ -45,6 +46,16 @@ static Object[] makeMpq(int producers, int consumers, int capacity, Ordering ord return new Object[] {spec, q}; } + public static Object[] makeAtomic(int producers, int consumers, int capacity, Ordering ordering, Queue q) + { + ConcurrentQueueSpec spec = new ConcurrentQueueSpec(producers, consumers, capacity, ordering, + Preference.NONE); + if (q == null) + { + q = AtomicQueueFactory.newQueue(spec); + } + return new Object[] {spec, q}; + } @After public void clear() throws InterruptedException { diff --git a/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpmcArray.java b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpmcArray.java new file mode 100644 index 00000000..73fb1548 --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpmcArray.java @@ -0,0 +1,29 @@ +package org.jctools.queues.atomic; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpqSanityTestMpmcArray; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class AtomicMpqSanityTestMpmcArray extends MpqSanityTestMpmcArray +{ + public AtomicMpqSanityTestMpmcArray(ConcurrentQueueSpec spec, MessagePassingQueue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeAtomic(0, 0, 2, Ordering.FIFO, null)); + list.add(makeAtomic(0, 0, SIZE, Ordering.FIFO, null)); + return list; + } +} diff --git a/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscArray.java b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscArray.java new file mode 100644 index 00000000..e0be2bd0 --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscArray.java @@ -0,0 +1,30 @@ +package org.jctools.queues.atomic; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpqSanityTest; +import org.jctools.queues.MpqSanityTestMpscArray; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class AtomicMpqSanityTestMpscArray extends MpqSanityTestMpscArray +{ + public AtomicMpqSanityTestMpscArray(ConcurrentQueueSpec spec, MessagePassingQueue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeAtomic(0, 1, 1, Ordering.FIFO, null));// MPSC size 1 + list.add(makeAtomic(0, 1, SIZE, Ordering.FIFO, null));// MPSC size SIZE + return list; + } +} diff --git a/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscChunked.java b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscChunked.java new file mode 100644 index 00000000..440bc3cd --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscChunked.java @@ -0,0 +1,32 @@ +package org.jctools.queues.atomic; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpqSanityTest; +import org.jctools.queues.MpqSanityTestMpscChunked; +import org.jctools.queues.MpscChunkedArrayQueue; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class AtomicMpqSanityTestMpscChunked extends MpqSanityTestMpscChunked +{ + public AtomicMpqSanityTestMpscChunked(ConcurrentQueueSpec spec, MessagePassingQueue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeAtomic(0, 1, 4, Ordering.FIFO, new MpscChunkedArrayQueue<>(2, 4)));// MPSC size 1 + list.add(makeAtomic(0, 1, SIZE, Ordering.FIFO, new MpscChunkedArrayQueue<>(8, SIZE)));// MPSC size SIZE + return list; + } +} diff --git a/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscCompound.java b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscCompound.java new file mode 100644 index 00000000..3221cab7 --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscCompound.java @@ -0,0 +1,32 @@ +package org.jctools.queues.atomic; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpqSanityTest; +import org.jctools.queues.MpqSanityTestMpscCompound; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +import static org.jctools.util.PortableJvmInfo.CPUs; + +@RunWith(Parameterized.class) +public class AtomicMpqSanityTestMpscCompound extends MpqSanityTestMpscCompound +{ + public AtomicMpqSanityTestMpscCompound(ConcurrentQueueSpec spec, MessagePassingQueue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeAtomic(0, 1, CPUs, Ordering.NONE, null));// MPSC size 1 + list.add(makeAtomic(0, 1, SIZE, Ordering.NONE, null));// MPSC size SIZE + return list; + } +} diff --git a/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscGrowable.java b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscGrowable.java new file mode 100644 index 00000000..5b5c71a0 --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscGrowable.java @@ -0,0 +1,32 @@ +package org.jctools.queues.atomic; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpqSanityTest; +import org.jctools.queues.MpqSanityTestMpscGrowable; +import org.jctools.queues.MpscGrowableArrayQueue; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class AtomicMpqSanityTestMpscGrowable extends MpqSanityTestMpscGrowable +{ + public AtomicMpqSanityTestMpscGrowable(ConcurrentQueueSpec spec, MessagePassingQueue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeAtomic(0, 1, 4, Ordering.FIFO, new MpscGrowableArrayQueue<>(2, 4)));// MPSC size 1 + list.add(makeAtomic(0, 1, SIZE, Ordering.FIFO, new MpscGrowableArrayQueue<>(8, SIZE)));// MPSC size SIZE + return list; + } + +} diff --git a/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscLinked.java b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscLinked.java new file mode 100644 index 00000000..038e91d0 --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscLinked.java @@ -0,0 +1,29 @@ +package org.jctools.queues.atomic; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpqSanityTest; +import org.jctools.queues.MpqSanityTestMpscLinked; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class AtomicMpqSanityTestMpscLinked extends MpqSanityTestMpscLinked +{ + public AtomicMpqSanityTestMpscLinked(ConcurrentQueueSpec spec, MessagePassingQueue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeAtomic(0, 1, 0, Ordering.FIFO, null));// unbounded MPSC + return list; + } +} diff --git a/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscUnbounded.java b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscUnbounded.java new file mode 100644 index 00000000..dda25e65 --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestMpscUnbounded.java @@ -0,0 +1,32 @@ +package org.jctools.queues.atomic; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpqSanityTest; +import org.jctools.queues.MpqSanityTestMpscUnbounded; +import org.jctools.queues.MpscUnboundedArrayQueue; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class AtomicMpqSanityTestMpscUnbounded extends MpqSanityTestMpscUnbounded +{ + public AtomicMpqSanityTestMpscUnbounded(ConcurrentQueueSpec spec, MessagePassingQueue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeAtomic(0, 1, 0, Ordering.FIFO, new MpscUnboundedArrayQueue<>(2))); + list.add(makeAtomic(0, 1, 0, Ordering.FIFO, new MpscUnboundedArrayQueue<>(64))); + return list; + } + +} diff --git a/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpmcArray.java b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpmcArray.java new file mode 100644 index 00000000..c25963c8 --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpmcArray.java @@ -0,0 +1,30 @@ +package org.jctools.queues.atomic; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpqSanityTest; +import org.jctools.queues.MpqSanityTestSpmcArray; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class AtomicMpqSanityTestSpmcArray extends MpqSanityTestSpmcArray +{ + public AtomicMpqSanityTestSpmcArray(ConcurrentQueueSpec spec, MessagePassingQueue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeAtomic(1, 0, 1, Ordering.FIFO, null));// SPMC size 1 + list.add(makeAtomic(1, 0, SIZE, Ordering.FIFO, null));// SPMC size SIZE + return list; + } +} diff --git a/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpscArray.java b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpscArray.java new file mode 100644 index 00000000..71091756 --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpscArray.java @@ -0,0 +1,30 @@ +package org.jctools.queues.atomic; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpqSanityTest; +import org.jctools.queues.MpqSanityTestSpscArray; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class AtomicMpqSanityTestSpscArray extends MpqSanityTestSpscArray +{ + public AtomicMpqSanityTestSpscArray(ConcurrentQueueSpec spec, MessagePassingQueue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeAtomic(1, 1, 4, Ordering.FIFO, null));// SPSC size 4 + list.add(makeAtomic(1, 1, SIZE, Ordering.FIFO, null));// SPSC size SIZE + return list; + } +} diff --git a/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpscChunked.java b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpscChunked.java new file mode 100644 index 00000000..948c8fca --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpscChunked.java @@ -0,0 +1,32 @@ +package org.jctools.queues.atomic; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpqSanityTest; +import org.jctools.queues.MpqSanityTestSpscChunked; +import org.jctools.queues.SpscChunkedArrayQueue; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class AtomicMpqSanityTestSpscChunked extends MpqSanityTestSpscChunked +{ + public AtomicMpqSanityTestSpscChunked(ConcurrentQueueSpec spec, MessagePassingQueue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeAtomic(1, 1, 16, Ordering.FIFO, new SpscChunkedArrayQueue<>(8, 16)));// MPSC size 1 + list.add(makeAtomic(1, 1, SIZE, Ordering.FIFO, new SpscChunkedArrayQueue<>(8, SIZE)));// MPSC size SIZE + return list; + } +} diff --git a/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpscGrowable.java b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpscGrowable.java new file mode 100644 index 00000000..e05bfddd --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpscGrowable.java @@ -0,0 +1,33 @@ +package org.jctools.queues.atomic; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpqSanityTest; +import org.jctools.queues.MpqSanityTestSpscGrowable; +import org.jctools.queues.SpscGrowableArrayQueue; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class AtomicMpqSanityTestSpscGrowable extends MpqSanityTestSpscGrowable +{ + + public AtomicMpqSanityTestSpscGrowable(ConcurrentQueueSpec spec, MessagePassingQueue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeAtomic(1, 1, 16, Ordering.FIFO, new SpscGrowableArrayQueue<>(8, 16))); + list.add(makeAtomic(1, 1, SIZE, Ordering.FIFO, new SpscGrowableArrayQueue<>(8, SIZE))); + return list; + } + +} diff --git a/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpscLinked.java b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpscLinked.java new file mode 100644 index 00000000..d68380b7 --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpscLinked.java @@ -0,0 +1,29 @@ +package org.jctools.queues.atomic; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpqSanityTest; +import org.jctools.queues.MpqSanityTestSpscLinked; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class AtomicMpqSanityTestSpscLinked extends MpqSanityTestSpscLinked +{ + public AtomicMpqSanityTestSpscLinked(ConcurrentQueueSpec spec, MessagePassingQueue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeAtomic(1, 1, 0, Ordering.FIFO, null));// unbounded SPSC + return list; + } +} diff --git a/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpscUnbounded.java b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpscUnbounded.java new file mode 100644 index 00000000..8b66b56a --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/atomic/AtomicMpqSanityTestSpscUnbounded.java @@ -0,0 +1,32 @@ +package org.jctools.queues.atomic; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpqSanityTest; +import org.jctools.queues.MpqSanityTestSpscUnbounded; +import org.jctools.queues.SpscUnboundedArrayQueue; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class AtomicMpqSanityTestSpscUnbounded extends MpqSanityTestSpscUnbounded +{ + + public AtomicMpqSanityTestSpscUnbounded(ConcurrentQueueSpec spec, MessagePassingQueue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeAtomic(1, 1, 0, Ordering.FIFO, new SpscUnboundedArrayQueue<>(2))); + list.add(makeAtomic(1, 1, 0, Ordering.FIFO, new SpscUnboundedArrayQueue<>(64))); + return list; + } +} diff --git a/jctools-core/src/test/java/org/jctools/queues/atomic/MpscAtomicArrayQueueOfferWithThresholdTest.java b/jctools-core/src/test/java/org/jctools/queues/atomic/MpscAtomicArrayQueueOfferWithThresholdTest.java new file mode 100644 index 00000000..7aabffed --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/atomic/MpscAtomicArrayQueueOfferWithThresholdTest.java @@ -0,0 +1,39 @@ +package org.jctools.queues.atomic; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class MpscAtomicArrayQueueOfferWithThresholdTest +{ + + private MpscAtomicArrayQueue queue; + + @Before + public void setUp() throws Exception + { + this.queue = new MpscAtomicArrayQueue(16); + } + + @Test + public void testOfferWithThreshold() + { + int i; + for (i = 0; i < 8; ++i) + { + //Offers succeed because current size is below the HWM. + Assert.assertTrue(this.queue.offerIfBelowThreshold(i, 8)); + } + //Not anymore, our offer got rejected. + Assert.assertFalse(this.queue.offerIfBelowThreshold(i, 8)); + Assert.assertFalse(this.queue.offerIfBelowThreshold(i, 7)); + Assert.assertFalse(this.queue.offerIfBelowThreshold(i, 1)); + Assert.assertFalse(this.queue.offerIfBelowThreshold(i, 0)); + + //Also, the threshold is dynamic and different levels can be set for + //different task priorities. + Assert.assertTrue(this.queue.offerIfBelowThreshold(i, 9)); + Assert.assertTrue(this.queue.offerIfBelowThreshold(i, 16)); + } + +}