
Draft of Blocking Queue #46

Closed · wants to merge 27 commits

Conversation

@georges-gomes (Author)

Hi Nitsan,

This is not for merge - just to open the discussion about efficient BlockingQueue implementations.

The SpscArrayQueueBlocking here is hand-crafted by inheritance from SpscArrayQueue, but ultimately I would prefer to have the BlockingQueue implementations dynamically generated on demand by the QueueFactory.

Let me know your thoughts before I push it too far.

Cheers
Georges

@georges-gomes changed the title from "First draft of Blocking Queue" to "Draft of Blocking Queue" on Jan 14, 2015
@ChristianWulf

Good job so far. Here are a few hints:

  • The following code should be instantiated only once, at construction, and cached in a final field (see the sketch below):

        new SupplierJDK6<E>()
        {
            @Override
            public E get()
            {
                return poll();
            }
        }

  • All take strategies should be declared final
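
In other words, something like this (a sketch only; the enclosing queue class, the take() wiring and the takeStrategy field are assumptions, not code from this PR):

    // Sketch: allocate the supplier once and reuse it on every take(),
    // instead of allocating a fresh anonymous instance per call.
    private final SupplierJDK6<E> pollSupplier = new SupplierJDK6<E>()
    {
        @Override
        public E get()
        {
            return poll();
        }
    };

    public E take() throws InterruptedException
    {
        return takeStrategy.waitFor(pollSupplier); // takeStrategy is a final field (hint 2)
    }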

@nitsanw commented Jan 14, 2015

I'm with you on codegen. What we want is a class Q extended by a generic blocking strategy, giving us some sort of mixin of strategy and lock-free queue. We can try to do that with @RichardWarburton's templates.
I think we should start simpler, with a generic wrapper which takes a strategy and a queue, delegates all the queue methods, and adds the blocking queue methods. Once we've worked out correctness and the blocking strategies, we can use the templates to generate the queues.
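
A rough sketch of that wrapper, to make the shape concrete (illustrative only - the TakeStrategy interface here is an assumption, not this PR's API):

    import java.util.AbstractQueue;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.Queue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Sketch of the wrapper idea: delegate Queue methods to any JCTools
    // queue, add the blocking methods on top of a pluggable strategy.
    interface TakeStrategy<E>
    {
        E waitPoll(Queue<E> q) throws InterruptedException;
        void signal();
    }

    class BlockingWrapper<E> extends AbstractQueue<E> implements BlockingQueue<E>
    {
        private final Queue<E> q;
        private final TakeStrategy<E> takeStrategy;

        BlockingWrapper(Queue<E> q, TakeStrategy<E> takeStrategy)
        {
            this.q = q;
            this.takeStrategy = takeStrategy;
        }

        // Plain Queue methods delegate straight through
        @Override public E poll() { return q.poll(); }
        @Override public E peek() { return q.peek(); }
        @Override public int size() { return q.size(); }
        @Override public Iterator<E> iterator() { return q.iterator(); }

        @Override public boolean offer(E e)
        {
            boolean offered = q.offer(e);
            if (offered)
            {
                takeStrategy.signal(); // wake a blocked consumer, if any
            }
            return offered;
        }

        // The blocking additions
        @Override public E take() throws InterruptedException
        {
            return takeStrategy.waitPoll(q);
        }

        @Override public void put(E e) throws InterruptedException
        {
            while (!offer(e)) // crude put strategy: yield until there is room
            {
                if (Thread.interrupted())
                {
                    throw new InterruptedException();
                }
                Thread.yield();
            }
        }

        // Timed offer/poll and drainTo elided from this sketch
        @Override public boolean offer(E e, long timeout, TimeUnit unit) { throw new UnsupportedOperationException(); }
        @Override public E poll(long timeout, TimeUnit unit) { throw new UnsupportedOperationException(); }
        @Override public int remainingCapacity() { return Integer.MAX_VALUE; }
        @Override public int drainTo(Collection<? super E> c) { throw new UnsupportedOperationException(); }
        @Override public int drainTo(Collection<? super E> c, int maxElements) { throw new UnsupportedOperationException(); }
    }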

Review comment on this diff hunk:

    @Override
    public E waitFor(SupplierJDK6<E> supplier)
    {
        while(true) // Should introduce safepoints

Thread.yield() should introduce the safepoint, no?
In any case this loop is just as correct written naturally:

    E e;
    while ((e = supplier.get()) == null)
        Thread.yield();
    return e;

@nitsanw commented Jan 15, 2015

Thinking out loud: we have 2 threads, and here's a simplified view of what's happening:

    P: {
        offer(e);                   // this is an effective StoreStore
        dummyStoreLoad;             // the store to the queue can't get re-ordered with the load of parkedThread
        unpark(parkedThread.get());
    }
    C: {
        if ((e = poll()) != null) return e;       // an optimization; for correctness we can skip it
        parkedThread.set(Thread.currentThread()); // StoreLoad: the loads after parkedThread is set can't move up
        while ((e = poll()) == null)
            park();
        parkedThread.set(null);
    }

So the question:
"Can a put to the queue fail to unpark the consumer?"
can be simplified to the following (v, u are volatile fields):

    P { v = 1; if (u == 1) unpark(C); }
    C { u = 1; if (v == 0) park(C); u = 0; }

I think it should work, but having a bad week in terms of sleep, so not trusting myself much.
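
For reference, a compilable sketch of that handshake (class and method names are assumed; it mirrors the pseudocode above):

    import java.util.Queue;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.LockSupport;

    // Sketch of the P/C blocks above; parkedThread follows the pseudocode.
    class ConsumerPark<E>
    {
        final AtomicReference<Thread> parkedThread = new AtomicReference<Thread>();

        E awaitNonEmpty(Queue<E> q) throws InterruptedException
        {
            E e = q.poll();
            if (e != null)
            {
                return e; // fast path; skippable as far as correctness goes
            }
            parkedThread.set(Thread.currentThread()); // volatile store; the loads below can't move up
            while ((e = q.poll()) == null)
            {
                LockSupport.park();
                if (Thread.interrupted())
                {
                    parkedThread.lazySet(null);
                    throw new InterruptedException();
                }
            }
            parkedThread.lazySet(null);
            return e;
        }

        // Producer side, after a successful offer: the load of parkedThread
        // must not float above the offer's stores - hence the StoreLoad question.
        // unpark(null) is a no-op, so no null check is needed.
        void signal()
        {
            LockSupport.unpark(parkedThread.get());
        }
    }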

@georges-gomes (Author)

That's exactly it.


@georges-gomes (Author)

@nitsanw could you point me to the entry points of @RichardWarburton's templates?
Thanks

@nitsanw commented Jan 16, 2015

Have a look at this commit from @RichardWarburton: d8d9a4c

Thanks to the cool Template and SimpleCompiler of @RichardWarburton!
@georges-gomes (Author)

The Template and SimpleCompiler classes are pretty cool. Thanks @RichardWarburton!
This is what I made today in a few hours.
Blocking queues on demand.

Next step: mix in Take and Put strategies?

Your feedback is welcome.

Cheers
GG

@nitsanw commented Jan 18, 2015

Thanks for all the hard work, this is looking pretty good.
Have you run any benchmarks to compare with ArrayBlockingQueue?

@georges-gomes (Author)

I have done some benchmarking internally, and this form is lighter, probably just because of the way it signals.
The producer-to-consumer wakeup latency is also better.
I have a few JMH benches that I will port to this JCTools implementation...
Always better to see these things live, and sometimes on more than one machine :)
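
(For illustration, a minimal JMH pairing of offer and take could look like the sketch below - purely hypothetical, not the benches mentioned above:)

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import org.openjdk.jmh.annotations.*;

    // Hypothetical JMH sketch: one producer and one consumer paired in a
    // group; swap the queue in setup() to compare implementations.
    @State(Scope.Group)
    public class OfferTakeBench
    {
        BlockingQueue<Integer> q;

        @Setup
        public void setup()
        {
            q = new ArrayBlockingQueue<Integer>(1024);
        }

        @Benchmark
        @Group("pair")
        @GroupThreads(1)
        public boolean offer()
        {
            return q.offer(1);
        }

        @Benchmark
        @Group("pair")
        @GroupThreads(1)
        public Integer take() throws InterruptedException
        {
            return q.take();
        }
    }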

@ChristianWulf

I do not see the benefits of the dynamic creation. Why don't you generate the blocking implementations as Java files? Moreover, a blocking implementation is currently generated anew every time, even if it has already been generated before.

@nitsanw commented Jan 19, 2015

@ChristianWulf dynamic creation can help in minimising the size of the deployed jar, especially as more parameters are added to the generated flavours. It can also offer some interesting optimisation options that are otherwise not easy to achieve (e.g. several padding configurations for the same queue).
I agree that the generated classes can and should be cached to avoid bloat and allow compiled code to be reused.
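
A possible shape for such a cache (a sketch under assumptions - the real QueueFactory key and generator are not shown here):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Sketch: cache generated classes per spec key so repeated requests
    // reuse the same Class, and therefore the same JIT-compiled code.
    final class GeneratedQueueCache
    {
        interface Generator
        {
            Class<?> generate(String specKey);
        }

        private static final ConcurrentMap<String, Class<?>> CACHE =
                new ConcurrentHashMap<String, Class<?>>();

        static Class<?> classFor(String specKey, Generator gen)
        {
            Class<?> clazz = CACHE.get(specKey);
            if (clazz == null)
            {
                clazz = gen.generate(specKey); // compile once...
                Class<?> raced = CACHE.putIfAbsent(specKey, clazz);
                if (raced != null)
                {
                    clazz = raced; // ...and lose gracefully on a race
                }
            }
            return clazz;
        }
    }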

@RichardWarburton

Caching should be evaluated on a case-by-case basis. Specifically, generating a fresh class per instance can give the JIT compiler optimisation opportunities that wouldn't be possible otherwise. Obviously it also has a potential tradeoff in terms of additional memory consumption and JIT overhead. So the choice between generating new classes and reusing cached ones is something that end users looking for optimal performance might want control over.

@georges-gomes (Author)

Should I just add a cache layer in front of it?


Conflicts:
	jctools-core/pom.xml
	jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue8.java
@daschl commented Feb 18, 2015

Great conversations going on, and since @nitsanw pointed me to this ticket I'd like to chime in with my use case - maybe you can see whether it fits? :)

I'm currently using the disruptor in an "MPSC" setup, but it only allows you to configure the producer side (so it's set to multi). I'm using it for two specific things:

  • Providing fast backpressure feedback to the producers if they outpace the consumer
  • Batching semantics. Since downstream I'm writing to IO in Netty, I only flush() when I get the "end of batch" signal from the disruptor in my consumer.

Backpressure can easily be covered anyway, but do the batching semantics make sense in that approach? Also, I guess making the waiting strategies configurable is a must(?) since I care about latency, but I also can't spin-wait because this runs in a customer environment alongside other services.

Looking forward to help / test / whatever makes sense for you folks :)

@nitsanw commented Feb 18, 2015

I'm not a fan of the batch-end approach in the disruptor, as it takes the following form:

      final long availableSequence = sequenceBarrier.waitFor(nextSequence);
      while (nextSequence <= availableSequence)
      {
          event = dataProvider.get(nextSequence);
          eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
          nextSequence++;
      }
      sequence.set(availableSequence);

This leads to:

  • The consumer sequence is updated infrequently, which means producers can stall despite plenty of room in the buffer
  • The end-of-batch signal does not mean no further elements are available, just that no further elements were visible when you started this batch
  • The win from updating the consumer index infrequently is very small. This is not true from the producer POV if the consumer sequence is queried on each offer, but if the value is cached (as it is in JCTools) then I don't really see the point.

I would suggest you pick a batch size you are happy with (because flushing every X messages makes sense) and use the stronger guarantee of poll() == null as a further trigger.

@daschl commented Feb 18, 2015

@nitsanw makes sense... I just wonder what the right batch size is; maybe it could also be latency driven (flush after N µs, or when something was written before and poll == null).

@nitsanw commented Feb 18, 2015

You are describing a mini Nagle's algorithm:

  • batch ends by time since the first message in the batch
  • batch ends by size of batch

For sending over the network in less latency-sensitive environments it makes sense to wait for more messages before sending (as Nagle's does).
This is interesting, but not within the scope of JCTools at the moment.
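
(A sketch of that time-or-size batching loop, for illustration only - every name here is assumed, nothing is JCTools API:)

    import java.util.Queue;
    import java.util.concurrent.TimeUnit;

    // Sketch: flush when the batch is big enough OR old enough,
    // falling back to the stronger poll() == null trigger.
    final class NagleishDrain<E>
    {
        interface BatchSink<T>
        {
            void accept(T e);
            void flush();
        }

        private final Queue<E> q;
        private final int maxBatch;
        private final long maxBatchAgeNanos;

        NagleishDrain(Queue<E> q, int maxBatch, long maxBatchAgeMicros)
        {
            this.q = q;
            this.maxBatch = maxBatch;
            this.maxBatchAgeNanos = TimeUnit.MICROSECONDS.toNanos(maxBatchAgeMicros);
        }

        void drainOneBatch(BatchSink<E> sink)
        {
            E e;
            while ((e = q.poll()) == null)
            {
                Thread.yield(); // wait for the first element of the batch
            }
            final long start = System.nanoTime();
            int count = 0;
            do
            {
                sink.accept(e);
                count++;
            }
            while (count < maxBatch
                && System.nanoTime() - start < maxBatchAgeNanos
                && (e = q.poll()) != null);
            sink.flush(); // batch ended by emptiness, size, or age
        }
    }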

@daschl commented Feb 18, 2015

yes, thanks - let's not pollute this topic here with it :)

@ChristianWulf

What about a SleepTakeStrategy where the consumer goes to bed for a configurable amount of time if the queue is empty?

@nitsanw commented Feb 19, 2015

Seems like a specialization of the Park one?


@ChristianWulf

Looking at the park method: yes, you're right! I didn't know.

    public native void park(boolean isAbsolute, long time);

@nitsanw commented Feb 19, 2015

In particular, it seems redundant for a notification-based implementation to wake up periodically, as the interface does not allow the thread to 'escape' from the take/put methods.

@georges-gomes (Author)

Hi @daschl,

Due to the relaxed single-consumer constraint, the Park wait strategy in this pull request is very fast to wake up - faster than the Object.notify or Condition.signal/signalAll you will find in an ArrayBlockingQueue. I needed something fast to wake up, and we have benchmarked this wake-up time carefully (bench not provided here - but it's a single sleep -> round trip time on the queue).

The backpressure is pretty simple, as you said: just look for offer() returning false.

For the batching:

Let's create a batching interface:

    interface BatchingConsumer<E>
    {
        void startBatch();
        void newElement(E e);
        void endBatch();
    }

In the consuming thread you do something like this:

    public void run()
    {
        while (true)
        {
            // wait for a first element
            E e = queue.take();

            batchConsumer.startBatch();
            batchConsumer.newElement(e);

            // Spin with some yield
            for (int i = 0; i < 100; i++)
            {
                e = queue.poll();
                if (e != null)
                    batchConsumer.newElement(e);
                else
                    Thread.yield();
            }

            batchConsumer.endBatch();
        }
    }

This makes sure you don't have more than 100 elements in one batch.
The Thread.yield() creates some time to wait for data - no crazy spinning, and no calculating timeouts with System.nanoTime() all over the place.

But I don't like to wait for data. I would rather do:

    public void run()
    {
        while (true)
        {
            // wait for a first element
            E e = queue.take();

            batchConsumer.startBatch();
            batchConsumer.newElement(e);

            // Batch
            for (int i = 0; i < 100; i++)
            {
                e = queue.poll();
                if (e != null)
                    batchConsumer.newElement(e);
                else
                    break;
            }

            batchConsumer.endBatch();
        }
    }

Taking full advantage of the thread parking efficiency.

With some check for QueueSpec support
@daschl commented Feb 23, 2015

@georges-gomes very interesting, thanks for sharing. Now I just need to find the time to rip out the disruptor and see what I can get out of this :)

@ChristianWulf

I found a bug in the waitPoll method. Although it is declared to throw an InterruptedException, it never does, since park() does not throw this exception. Thus, I added the if-statement after parking:

    @Override
    public E waitPoll(final Queue<E> q) throws InterruptedException
    {
        E e = q.poll();
        if (e != null)
        {
            return e;
        }

        t.set(Thread.currentThread());

        while ((e = q.poll()) == null)
        {
            LockSupport.park();
            if (Thread.currentThread().isInterrupted())
            {
                throw new InterruptedException("Interrupted while waiting for the queue to become non-empty.");
            }
        }

        t.lazySet(null);

        return e;
    }

@georges-gomes (Author)

Yes, you are right! I will add that shortly.


@nitsanw commented Mar 13, 2015

Second review in progress, will comment as I go...

  1. PutStrategy.backoffOffer should throw InterruptedException, as dictated by the BlockingQueue API, and the implementation should check Thread.interrupted() and throw accordingly (see the sketch below).
  2. Instead of put/take strategy packages, maybe put both under one package called blocking?
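
(For point 1, an interruptible backoff could look like this sketch - the strategy's signature is an assumption:)

    import java.util.Queue;

    // Sketch: an offer backoff that honours interruption, as the
    // BlockingQueue API requires for put(). Signature is assumed.
    final class YieldBackoffPutStrategy<E>
    {
        public void backoffOffer(Queue<E> q, E e) throws InterruptedException
        {
            while (!q.offer(e))
            {
                if (Thread.interrupted())
                {
                    throw new InterruptedException("Interrupted while waiting for the queue to become non-full.");
                }
                Thread.yield();
            }
        }
    }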

@nitsanw commented Mar 13, 2015

Just a note on the MCParkTakeStrategy: Condition.await generates garbage. I hit a case yesterday where replacing a LinkedBlockingQueue with an ArrayBlockingQueue was generating more garbage because of that...

@nitsanw commented Mar 16, 2015

Semi-relevant discussion on park/unpark happens-before rules:
http://jsr166-concurrency.10961.n7.nabble.com/unpark-park-memory-visibility-td11812.html
I don't think it helps us any, because the problem is that we want to prevent the 't' field read from re-ordering with the offer:

    offer(e);
    // need a StoreLoad barrier because:
    unpark(t); // <-- volatile read of t, can float up!

@georges-gomes (Author)

Interesting!

So there is potentially an issue on MCParkWaitStrat then.

SPParkWaitStrat is good with the volatile write, right?

GG


@georges-gomes (Author)

I mean SCParkWaitStrat


@georges-gomes (Author)

Hi Nitsan, I pushed fixes for your comments.

Regarding the signaling:

## MCParkTakeStrategy
We are good because of the lock - ordering is safe.
(I still need to figure out how to remove the Condition if it generates garbage... Do you recommend we go for wait/notify?)

    @Override
    public void signal()
    {
        ReentrantLock l = lock;
        l.lock();
        try
        {
            if (waiters>0)
            {
                cond.signal();
            }
        }
        finally
        {
            l.unlock();
        }
    }

## SCParkTakeStrategy
We are good with the volatile write and the t.get() volatile read - this will create a StoreLoad barrier, right?

    public volatile int storeFence = 0;

    @Override
    public void signal()
    {
        // Make sure the offer is visible before unpark
        storeFence = 1; // store barrier

        LockSupport.unpark(t.get()); // t.get() load barrier
    }

Let me know if you think something is not safe.

Cheers
GG

@ChristianWulf

According to http://shipilev.net/blog/2014/on-the-fence-with-dependencies/, it depends on the JVM implementation whether a write to or a read from a volatile variable inserts a StoreLoad barrier.

Since you use both in SCParkTakeStrategy, this strategy should work correctly for all JVM implementations.

@nitsanw commented Apr 18, 2015

Ok, I'm going to merge this fucker :-)
But I'll be moving it to experimental and playing with it a bit before unleashing onto the core.

@nitsanw commented Apr 18, 2015

merged!

@nitsanw closed this Apr 18, 2015
@georges-gomes (Author)

May the force be with you :)
At your service if you need additional hands for some changes.
Cheers
GG

