XADD MultiProducer SingleConsumer Array Queue #195

Merged
merged 1 commit into JCTools:master on Sep 27, 2017

Conversation

@franz1981
Contributor

franz1981 commented Sep 14, 2017

The mechanics of this queue are similar to how Aeron's Publication (aka the LogBuffer) works, but using only 2 partitions and without background cleaning of the consumed elements.
When used by a single producer, offer and poll have the same semantics as MpscArrayQueue; otherwise there is a chance that more than the configured capacity will be written (without losing elements), exactly as with Aeron's LogBuffer.
Benchmarks show (throughput-wise) performance similar to MpscArrayQueue with a single producer and better scalability otherwise (mainly due to XADD).
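
As a minimal illustration of the claim step only (hypothetical names, not the code in this PR, and omitting cycle rotation and backoff), a producer claims a slot on the active cycle with a single getAndAdd rather than a CAS retry loop:

    import java.util.concurrent.atomic.AtomicLong;

    final class XaddClaimSketch
    {
        // claim counter of the active cycle
        private final AtomicLong producerClaim = new AtomicLong();
        private final int cycleLength;

        XaddClaimSketch(int cycleLength)
        {
            this.cycleLength = cycleLength;
        }

        /** Returns the claimed slot on the active cycle, or -1 if the cycle is exhausted. */
        int claimSlot()
        {
            // getAndIncrement maps to XADD on x86: one wait-free claim, no retry loop
            final long claim = producerClaim.getAndIncrement();
            if (claim < cycleLength)
            {
                return (int) claim; // the element can be published at this index
            }
            // cycle exhausted: in the real queue the producer that claims the first
            // position past the end rotates to the other cycle, late claimers back off
            return -1;
        }
    }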

@franz1981

Contributor

franz1981 commented Sep 14, 2017

@nitsanw The name Relaxed comes from the backpressure model, which becomes more relaxed as more producers are involved.

@nitsanw

Will continue reading later.

jctools-core/src/main/java/org/jctools/queues/MpscRelaxedArrayQueue.java
    @Override
    public boolean isEmpty()
    {
        return size() == 0;
    }

@nitsanw

nitsanw Sep 19, 2017

Contributor

Should be:

       final long producerPosition = producerPosition(
            cycleId(producerCycleClaim, this.cycleIdBitShift),
            positionOnCycle(producerCycleClaim, this.positionOnCycleMask),
            this.cycleLengthLog2);
        return (producerPosition == consumerPosition);

?

@franz1981

franz1981 Sep 19, 2017

Contributor

good point!!!

@franz1981

franz1981 Sep 19, 2017

Contributor

Implementing it like this in the atomic version of the queue:

    @Override
    public boolean isEmpty()
    {
        final long consumerPosition = lvConsumerPosition();
        final int activeCycleIndex = lvActiveCycleIndex();
        final long producerCycleClaim = AtomicLongArrayAccess.lvValue(this.producerCycleClaim, activeCycleIndex);
        final long producerPosition = producerPosition(
            cycleId(producerCycleClaim, this.cycleIdBitShift),
            positionOnCycle(producerCycleClaim, this.positionOnCycleMask),
            this.cycleLengthLog2);
        return (producerPosition == consumerPosition);
    }

It causes a failure in the sanity test:

java.lang.AssertionError: Observed no element in non-empty queue 
Expected :0
Actual   :350


	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.failNotEquals(Assert.java:834)
	at org.junit.Assert.assertEquals(Assert.java:645)
	at org.jctools.queues.QueueSanityTest.testPollAfterIsEmpty(QueueSanityTest.java:361)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runners.Suite.runChild(Suite.java:128)
	at org.junit.runners.Suite.runChild(Suite.java:27)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

@franz1981

franz1981 Sep 19, 2017

Contributor

@nitsanw
Turning it into this seems to fix that:

    @Override
    public boolean isEmpty()
    {
        final AtomicLongArray producerCycleClaims = this.producerCycleClaim;
        final int cycleLength = this.cycleLength;
        final int positionOnCycleMask = this.positionOnCycleMask;
        final int cycleIdBitShift = this.cycleIdBitShift;
        final int cycleLengthLog2 = this.cycleLengthLog2;
        final long consumerPosition = lvConsumerPosition();
        int activeCycleIndex;
        long producerCycleClaim;
        int positionOnCycle;
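        // re-read the active cycle while the observed claim's position falls past
        // the end of the cycle, i.e. while a cycle rotation appears to be in flight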
        do
        {
            activeCycleIndex = lvActiveCycleIndex();
            producerCycleClaim = AtomicLongArrayAccess.lvValue(producerCycleClaims, activeCycleIndex);
        }
        while ((positionOnCycle = positionOnCycle(producerCycleClaim, positionOnCycleMask)) > cycleLength);

        final long producerPosition = producerPosition(
            cycleId(producerCycleClaim, cycleIdBitShift),
            positionOnCycle,
            cycleLengthLog2);
        return (producerPosition == consumerPosition);
    }

Does that make sense?
But TBH I'm not seeing such a big improvement vs. the original implementation...

@nitsanw

Contributor

nitsanw commented Sep 19, 2017

Thanks, this is a valuable port, I'll read carefully.
I'm not sure about the naming, but this can be worked out later.

@franz1981

Contributor

franz1981 commented Sep 19, 2017

@nitsanw

I'm not sure about the naming

I'm always so bad at naming things, argh

@nitsanw

Contributor

nitsanw commented Sep 19, 2017

@mjpt777 AFAIK this is your algorithm, and you are a passionate namer, do you have a suggestion for a name to reflect the algorithm?
MpscDoubleBufferedQueue? MpscThompsonQueue? MpscLogBufferQueue?

@franz1981

Contributor

franz1981 commented Sep 19, 2017

@mjpt777 @nitsanw

MpscThompsonQueue

💯 I'm not aware of any other queue (or ring buffer) with similar mechanics, so it is well deserved 👍

@franz1981

Contributor

franz1981 commented Sep 19, 2017

@mjpt777 @nitsanw I've added a second commit with the producer claim fields inlined into the queue class.
If it is implemented in a safe/correct way I will squash it into the first commit.

@franz1981

Contributor

franz1981 commented Sep 20, 2017

I've modified the benchmarks of https://github.com/real-logic/benchmarks to run the ManyToOneConcurrentArrayQueueBenchmark benchmark (burst length = 100) against MpscRelaxedArrayQueue/MpscArrayQueue using relaxedPoll (to match the LogBuffer consume semantics), and this is what I'm getting:

Benchmark                                                             (burstLength)    Mode     Cnt       Score     Error  Units
AeronIpcBenchmark.test1Producer                                                 100  sample  116035    5602.900 ± 126.159  ns/op
AeronIpcBenchmark.test1Producer:test1Producer·p0.99                             100  sample           24160.000            ns/op
AeronIpcBenchmark.test2Producers                                                100  sample  219839   11341.917 ±  66.733  ns/op
AeronIpcBenchmark.test2Producers:test2Producers·p0.99                           100  sample           38848.000            ns/op
AeronIpcBenchmark.test3Producers                                                100  sample  247221   15645.017 ± 232.384  ns/op
AeronIpcBenchmark.test3Producers:test3Producers·p0.99                           100  sample           49472.000            ns/op

MpscArrayQueueBenchmark.test1Producer                                           100  sample  161740    3906.104 ±  16.878  ns/op
MpscArrayQueueBenchmark.test1Producer:test1Producer·p0.99                       100  sample           17216.000            ns/op
MpscArrayQueueBenchmark.test2Producers                                          100  sample  223930   11185.586 ±  23.686  ns/op
MpscArrayQueueBenchmark.test2Producers:test2Producers·p0.99                     100  sample           27776.000            ns/op
MpscArrayQueueBenchmark.test3Producers                                          100  sample  310557   24080.455 ±  18.060  ns/op
MpscArrayQueueBenchmark.test3Producers:test3Producers·p0.99                     100  sample           30144.000            ns/op

MpscRelaxedArrayQueueBenchmark.test1Producer                                    100  sample  139179    4527.424 ±  18.276  ns/op
MpscRelaxedArrayQueueBenchmark.test1Producer:test1Producer·p0.99                100  sample           19430.400            ns/op
MpscRelaxedArrayQueueBenchmark.test2Producers                                   100  sample  282486    8875.034 ±  14.771  ns/op
MpscRelaxedArrayQueueBenchmark.test2Producers:test2Producers·p0.99              100  sample           23104.000            ns/op
MpscRelaxedArrayQueueBenchmark.test3Producers                                   100  sample  324322   11579.367 ±  10.287  ns/op
MpscRelaxedArrayQueueBenchmark.test3Producers:test3Producers·p0.99              100  sample           12704.000            ns/op

P.S.: the LogBuffer writes much more than the queues, but it doesn't incur any write barrier while clearing/offering the messages of the buffer: this makes the experiment far from fair, but it is good enough to show whether the benefits of using XADD are similar to those of the original algorithm.

While using the Queue::poll semantics:

MpscRelaxedArrayQueueBenchmark.test1Producer                                    100  sample   98782    6383.736 ±  26.957  ns/op
MpscRelaxedArrayQueueBenchmark.test1Producer:test1Producer·p0.99                100  sample           22405.440            ns/op
MpscRelaxedArrayQueueBenchmark.test2Producers                                   100  sample  262446    9558.282 ±  18.908  ns/op
MpscRelaxedArrayQueueBenchmark.test2Producers:test2Producers·p0.99              100  sample           25664.000            ns/op
MpscRelaxedArrayQueueBenchmark.test3Producers                                   100  sample  311647   12116.275 ±  78.396  ns/op
MpscRelaxedArrayQueueBenchmark.test3Producers:test3Producers·p0.99              100  sample           20864.000            ns/op

The spin loop on Queue::poll, defined here:

    private E pollMaybeEmpty(E[] buffer, final long offset, final long consumerPosition)
    {
        [...evilish code...]
        else
        {
            E e;
            while ((e = lvElement(buffer, offset)) == null)
            {

            }
            [...the q has received a counterpunch after this!]
            [...other evilish code...]
        }
    }

seems to be what hurts the latency of a burst of offers, probably due to a near-empty thrashing issue as mentioned in several posts by @nitsanw (e.g. http://psy-lob-saw.blogspot.it/2013/11/spsc-iv-look-at-bqueue.html): just by adding a Blackhole::consumeCPU on the consumer side (we would be doing real work in a real-world example), it simply disappears.
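
For reference, a hypothetical JMH sketch of where such a consumer-side Blackhole::consumeCPU call would sit (class name, group layout and token count are made up for illustration; this is not the benchmark code from real-logic/benchmarks, and a placeholder queue stands in for MpscRelaxedArrayQueue):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.Group;
    import org.openjdk.jmh.annotations.GroupThreads;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.State;
    import org.openjdk.jmh.infra.Blackhole;

    @State(Scope.Group)
    public class ConsumerWorkSketch
    {
        // placeholder queue; the real benchmark would plug in MpscRelaxedArrayQueue
        final Queue<Integer> queue = new ConcurrentLinkedQueue<>();

        @Benchmark
        @Group("burst")
        @GroupThreads(3)
        public void produce()
        {
            queue.offer(1);
        }

        @Benchmark
        @Group("burst")
        @GroupThreads(1)
        public void consume()
        {
            // busy-spin until an element arrives
            while (queue.poll() == null)
            {
            }
            // simulate per-message consumer work so the consumer is not always polling
            // right behind the producers (the near-empty scenario described above)
            Blackhole.consumeCPU(50);
        }
    }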

@franz1981

Contributor

franz1981 commented Sep 22, 2017

I've written a long-running test with multiple producers and sometimes it gets stuck with only a few producers able to make progress; it happens more often when a small (~128) capacity is configured. I'm trying to figure out what's happening (probably a math error on my side while offering).

@franz1981

Contributor

franz1981 commented Sep 22, 2017

I've fixed (and already squashed) the math error: the original heuristic, built to allow long-lived queues with a small capacity, wasn't good enough to cope with the concurrent backoff/overclaiming during a cycle rotation. I've replaced it with another one that seems able to extend the queue's lifecycle without exposing it to overclaims that could change the claimed cycleId.
If it makes sense I could add a check on the maximum claimed position like this:
https://github.com/real-logic/aeron/blob/5ead930a6bd145acaf235a2905fbcbac19c89bae/aeron-client/src/main/java/io/aeron/Publication.java#L587
but throwing an exception when it happens.
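
A minimal sketch of what such a guard could look like (illustrative names, neither this PR's nor Aeron's code), assuming the maximum representable claimed position is known from the claim encoding:

    // Hypothetical guard: fail fast instead of letting a claim silently wrap the cycleId.
    private static void checkMaxClaimedPosition(final long claimedPosition, final long maxPossiblePosition)
    {
        if (claimedPosition > maxPossiblePosition)
        {
            throw new IllegalStateException(
                "claimed position " + claimedPosition
                    + " exceeds the max possible position " + maxPossiblePosition);
        }
    }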

@franz1981

Contributor

franz1981 commented Sep 23, 2017

I'm going to fix the claim initialisation on cycle rotation due to this: real-logic/aeron#403

@franz1981

Contributor

franz1981 commented Sep 24, 2017

@mjpt777 I've just added a fix to address the same issue referenced in real-logic/aeron@d55b9b8.
Now the tests with multiple producer threads (128 on a 4-core machine) no longer fail or corrupt unconsumed data.

@franz1981

Contributor

franz1981 commented Sep 25, 2017

There is another issue, related to real-logic/aeron@8735ef4, that needs to be ported too.

@franz1981

Contributor

franz1981 commented Sep 25, 2017

I've added a commit to address it in a way similar to what Aeron's LogBuffer does.

@franz1981

Contributor

franz1981 commented Sep 25, 2017

I've added a commit to detect exhaustion of:

  • position on cycle: mainly due to overclaiming (i.e. small capacity and a large number of concurrent producers)
  • claim id: mainly time-related (i.e. small capacity and high-throughput, long-running producers)

nitsanw merged commit c848836 into JCTools:master on Sep 27, 2017

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
@nitsanw

Contributor

nitsanw commented Sep 27, 2017

Many thanks @franz1981. I've got some style tweaks and bits, but overall it looks good.
