AbstractNodeQueue suffers from nepotism #19216

Closed
bantonsson opened this Issue Dec 17, 2015 · 28 comments


bantonsson commented Dec 17, 2015

The next references in AbstractNodeQueue are never cleared, which leads to promoted dead nodes pulling newly dead nodes into the old generation of the garbage collector ("nepotism").
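
For illustration, here is a minimal sketch of the shape of such a fix in a Vyukov-style pollNode. The helper names (consumerNode, setConsumerNode, setNext) are made up for the sketch; this is not the actual patch:

// Illustrative only; hypothetical helper names, not Akka's actual code.
public final Node<T> pollNode() {
    final Node<T> stub = consumerNode();   // current consumer-side stub
    final Node<T> next = stub.next();
    if (next == null) return null;         // empty (or a producer mid-link)
    stub.value = next.value;               // hand the element over to the old stub
    setConsumerNode(next);                 // next becomes the new stub
    stub.setNext(null);                    // the fix: a dead stub promoted to the
                                           // old gen must not keep the younger
                                           // chain reachable through next
    return stub;
}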

@bantonsson bantonsson self-assigned this Dec 17, 2015

@bantonsson bantonsson added this to the 2.3.x milestone Dec 17, 2015

bantonsson added a commit to bantonsson/akka that referenced this issue Dec 17, 2015

rkuhn added a commit that referenced this issue Dec 17, 2015

bantonsson added a commit that referenced this issue Dec 17, 2015

@rkuhn rkuhn closed this Dec 17, 2015

bantonsson added a commit to bantonsson/akka that referenced this issue Dec 18, 2015

bantonsson commented Dec 18, 2015

Values suffer from the same issue as nodes. Added another small fix.
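
In the terms of the illustrative sketch above, this follow-up amounts to one more line in pollNode, right after the element hand-off:

stub.value = next.value;   // hand the element over, as before
next.value = null;         // the new stub must not retain the element either, or a
                           // promoted stub drags the (possibly large) message into old gen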

guidomedina commented Mar 4, 2016

I haven't run into this issue because I replaced my default mailbox implementation with:

https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java

I can see that AbstractNodeQueue was originally based on old code from that project, but it has evolved since, so you might want to take a look there.

Note: I know it is bounded in this case, but the AbstractNodeQueue source is the same.

Just my two cents, HTH.
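
For reference, a rough sketch of what such a replacement can look like, wiring a JCTools queue in as an Akka mailbox. This is an illustration against the Akka 2.4 MailboxType SPI, not guidomedina's actual code; the class name and the choice of MpscLinkedAtomicQueue here are assumptions:

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Envelope;
import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;
import akka.dispatch.UnboundedMessageQueueSemantics;
import com.typesafe.config.Config;
import org.jctools.queues.atomic.MpscLinkedAtomicQueue;
import scala.Option;

// Sketch of a JCTools-backed unbounded mailbox; illustrative, not tested.
public class JCToolsMailbox implements MailboxType {

    public static class Queue implements MessageQueue, UnboundedMessageQueueSemantics {
        private final MpscLinkedAtomicQueue<Envelope> queue = new MpscLinkedAtomicQueue<>();

        @Override public void enqueue(ActorRef receiver, Envelope handle) { queue.offer(handle); }
        @Override public Envelope dequeue() { return queue.poll(); }
        @Override public int numberOfMessages() { return queue.size(); }
        @Override public boolean hasMessages() { return !queue.isEmpty(); }
        @Override public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
            // drain any remaining messages to dead letters on actor termination
            for (Envelope e = dequeue(); e != null; e = dequeue())
                deadLetters.enqueue(owner, e);
        }
    }

    // constructor signature required by Akka for configured mailbox types
    public JCToolsMailbox(ActorSystem.Settings settings, Config config) {}

    @Override
    public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
        return new Queue();
    }
}

The mailbox would then be selected through the usual mailbox configuration; the exact MessageQueue contract is worth checking against the Akka docs before relying on a sketch like this.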

MQ-EL commented Mar 10, 2016

Hey @guidomedina. I wrote three test programs to compare the ANQ with the nepotism bug, the ANQ with the nepotism fix, and the JCTools version. The result is that the JCTools version has the same problem as the unfixed ANQ. Apart from the nepotism problem, I didn't see any difference from the old ANQ.

nitsanw commented Mar 10, 2016

@MQ-EL can you please share your test?

MQ-EL commented Mar 10, 2016

MpscLinkedAtomicQueue (JCTools version)

jvm args:

 -Xmx2g -Xms2g -XX:NewRatio=1 -XX:SurvivorRatio=3 -Xloggc:"E:\mpsc.log"  -XX:+PrintGCDetails -XX:+PrintGCDateStamps 

test code:

package org.jctools.queues.atomic

object UnboundedMPSCJCToolImpl extends App {

  type TestQueue = MpscLinkedAtomicQueue[TestValue]

  val allNr = 1 to 1000000 * 2
  val cycle = allNr.length / 2
  val batchNr = 0 to 10
  val nq = new TestQueue()
  nq.offer(TestValue(-1))
  var firstNode = nq.lpConsumerNode.lvNext() // keep a reference so the node is dragged into the old gen

  new Thread(new Runnable {
    override def run(): Unit = {
      allNr foreach { i =>
        batchNr.foreach(i => nq.offer(TestValue(i)))
        batchNr.foreach(_ => nq.poll())
        if (i > cycle && firstNode != null) {
          firstNode = null
          println("Release first node :-)")
        }
      }
      println("over!!")
    }
  }).start()
}

case class TestValue(i: Long) {
  val array = (1 to 200).toArray
}

GC log: mpsc.txt (open with GCViewer for details)

Rough memory picture: [image: the leftmost bar is tenured, the middle is eden, the rightmost is survivor]

nitsanw commented Mar 10, 2016

@MQ-EL not sure how this is a valid test, because stealing an internal field ref and keeping it around is not valid usage...

MQ-EL commented Mar 10, 2016

Old ANQ version

JVM args are the same as above.

code:

package akka.dispatch.actor.nodequeue

import akka.dispatch.ANQBeforeFix // this is the old version, copied from Akka 2.4.0

object UnboundedMPSCTest extends App {

//  type TestQueue = AbstractNodeQueue[Value] // akka 2.4.2, version with the bug already fixed
//  type TestNode = AbstractNodeQueue.Node[Value] // akka 2.4.2, version with the bug already fixed

  type TestQueue = ANQBeforeFix[Value] // akka 2.4.0, version with the bug
  type TestNode = ANQBeforeFix.Node[Value] // akka 2.4.0, version with the bug

  val allNr = 1 to 1000000 * 2
  val cycle = allNr.length / 2
  val nq = new TestQueue() {}
  val batchNr = 0 to 10
  var firstNode = new TestNode(Value(-1)) // keep a reference so the node is dragged into the old gen
  nq.addNode(firstNode)

  new Thread(new Runnable {
    override def run(): Unit = {
      allNr foreach { i =>
        batchNr.foreach(i => nq.add(Value(i)))
        batchNr.foreach(_ => nq.poll())
        if (i > cycle && firstNode != null) {
          firstNode = null
          println("Release first node :-)")
        }
      }
      println("over!!")
    }
  }).start()
}

case class Value(i: Long) {
  val array = (1 to 200).toArray
}

GC log: mpsc.txt

Rough picture, just for a quick comparison: [image]

MQ-EL commented Mar 10, 2016

This is a simulation of a real environment: eden has a fixed size, and the application may create many new objects, which can promote the node objects to the old generation when eden overflows. So this phenomenon does not always happen in a real environment; it depends on whether the application code creates many new objects at the same time, e.g. an actor that creates some new messages for every message it receives.

nitsanw commented Mar 10, 2016

I also think this is now broken after your fix:

public final int count() {
    int count = 0;
    for (Node<T> n = peekNode(); n != null && count < Integer.MAX_VALUE; n = n.next())
      ++count;
    return count;
}

If a non-consumer thread calls count, it can land on a node whose next reference was just nulled by the consumer, and will see a broken (truncated) count.

nitsanw commented Mar 10, 2016

You can force promotion using System.gc() and see if the promoted but no longer referenced nodes cause an issue for young collections.

MQ-EL commented Mar 10, 2016

Fixed ANQ version:

The code is the same as above (just comment/uncomment the type definitions).

GC log: mpsc.txt

Picture: [image]

bantonsson commented Mar 10, 2016

@nitsanw AFAICS the count is only broken if the non-consumer gets overtaken by the consumer consuming the very node the non-consumer is counting, and then, what does a count of a concurrently dequeued queue really mean?

nitsanw commented Mar 10, 2016

less than it did before...

rkuhn commented Mar 10, 2016

Not sure I agree: if pulling things out of the queue is faster than counting, then the count means nothing regardless. In fact I tend to think that count is not a reasonable operation to offer on such a queue at all.

rkuhn commented Mar 10, 2016

Oh, random thought: count might not actually ever terminate if queue throughput matches counting speed, and the value is meaningless for any scenario where the speeds are not vastly different.

nitsanw commented Mar 10, 2016

JCTools' size would do this:

public final int size() {
    LinkedQueueNode<E> chaserNode = lvConsumerNode();
    final LinkedQueueNode<E> producerNode = lvProducerNode();
    int size = 0;
    // must chase the nodes all the way to the producer node, but there's no need to chase a moving target
    while (chaserNode != producerNode && size < Integer.MAX_VALUE) {
        LinkedQueueNode<E> next;
        while ((next = chaserNode.lvNext()) == null);
        chaserNode = next;
        size++;
    }
    return size;
}

It also correctly handles a producer-induced bubble in the queue (a node whose next reference is not yet visible), which you handle correctly for poll but not here.

nitsanw commented Mar 10, 2016

it's fine to say "we expect little from this effort at estimating size" BTW, I have no issue with it.

nitsanw commented Mar 10, 2016

@MQ-EL I have verified the issue without resorting to artificially keeping the tail alive:

public static void main(String[] args) throws InterruptedException {
    SpscLinkedQueue<Integer> q = new SpscLinkedQueue<>();
    q.offer(1);
    // force the surviving node into the old generation before consuming it
    System.gc();
    Thread.sleep(100);
    System.gc();
    Thread.sleep(100);
    System.gc();
    q.poll();

    // churn the queue: the promoted dead node keeps dragging freshly dead
    // nodes into the old generation via its uncleared next reference
    while (true) {
        for (int i = 0; i < 4096; i++) {
            q.offer(1);
            q.poll();
        }
        Thread.sleep(100);
    }
}

Examining it with VisualGC or the GC log files confirms that the problem is real.

nitsanw commented Mar 10, 2016

OK, reading through the CLQ solution to this issue, I think I will take their lead. Essentially they set next to the node itself (because a null next now has a different meaning). In size they recognize the issue with count above and skip back to head if they identify the race condition:

/* Following code from ConcurrentLinkedQueue in JDK8u51 */
public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

final Node<E> succ(Node<E> p) {
    Node<E> next = p.next;
    return (p == next) ? head : next;
}
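
A minimal, self-contained sketch of that self-link trick applied to a Vyukov-style MPSC queue; illustrative names only, this is neither the actual Akka nor the actual JCTools code:

import java.util.concurrent.atomic.AtomicReference;

final class SelfLinkMpscQueue<T> {

    static final class Node<E> extends AtomicReference<Node<E>> {
        E value;
        Node(E value) { this.value = value; }
    }

    private final AtomicReference<Node<T>> producerNode;
    private Node<T> consumerNode; // accessed by the single consumer thread only

    SelfLinkMpscQueue() {
        Node<T> stub = new Node<>(null);
        producerNode = new AtomicReference<>(stub);
        consumerNode = stub;
    }

    public void offer(T value) {
        Node<T> node = new Node<>(value);
        // getAndSet serializes the producers; the window between the swap and
        // the link below is the "producer bubble" consumers must tolerate
        producerNode.getAndSet(node).lazySet(node);
    }

    public T poll() {
        Node<T> next = consumerNode.get();
        if (next == null) return null;       // empty, or a producer mid-link
        T value = next.value;
        next.value = null;                   // next becomes the new stub
        consumerNode.lazySet(consumerNode);  // self-link the consumed stub: once
                                             // promoted and dead it can only
                                             // reference itself, never a young
                                             // node; traversals treat p == p.next
                                             // as "restart from the head"
        consumerNode = next;
        return value;
    }
}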
nitsanw commented Mar 10, 2016

FYI: Fixed on core JCTools queues with JCTools/JCTools@40bfd7d

Will follow up similarly for the atomic ones

@MQ-EL

This comment has been minimized.

Copy link
Contributor

MQ-EL commented Mar 11, 2016

@nitsanw 👍 , your test is more clear than my,.
AFAICS, set next to itself insteadof null is a good solution to fix size() endless loop bug in #19949 . WDYT @bantonsson @nitsanw

MQ-EL commented Mar 11, 2016

It may be a good idea to use JCTools in my production environment instead of the old one. Thanks @nitsanw :)

guidomedina commented Mar 15, 2016

FYI:

JCTools 1.2 released:

  • Added MpscChunkedArrayQueue, an MPSC bounded queue aiming to replace current usage of MpscLinkedQueue in use cases where low footprint AND low GC churn are desirable. This is achieved through the use of smaller buffers which are then linked to either bigger buffers or same-sized buffers as queue size demands.
  • Fixed a GC nepotism issue in the linked queues. This is not a bug but an observable generational-GC side effect causing false promotion of linked nodes because of a reference from a promoted dead node. See the discussion here: #19216
  • Fixed an inconsistently handled exception on offering null elements. This was a bug in MpmcArrayQueue.
  • Formatting and refactoring
MQ-EL commented Mar 17, 2016

Hi @guidomedina, @nitsanw: when I peek at the source code of the MPSC queues in JCTools, does the S mean the same single thread all the time, rather than a single thread at a time? Does the queue do anything to protect the visibility of consumerNode? In the Akka world the need is "one at a time": the actor will consume from the queue on a different thread at a different time. If I'm missing something, please tell me :)

nitsanw commented Mar 17, 2016

@MQ-EL You can have a single consumer thread, or you can safely publish the queue to another thread and use the new thread as consumer. This is generally required for correctness in any case, since you need to enforce the one consumer at a time requirement.
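
To make "safely publish" concrete, here is a small illustration (mine, not from JCTools): Thread.start() establishes a happens-before edge, so the newly started thread can take over as the single consumer:

import org.jctools.queues.atomic.MpscLinkedAtomicQueue;

public class HandOffExample {
    public static void main(String[] args) {
        final MpscLinkedAtomicQueue<String> q = new MpscLinkedAtomicQueue<>();
        q.offer("hello"); // enqueued before the hand-off

        // Thread.start() happens-before everything in run(): all writes made
        // to the queue so far are visible to the new consumer thread
        Thread consumer = new Thread(() -> {
            String msg;
            while ((msg = q.poll()) == null) { /* empty, or a producer mid-link */ }
            System.out.println(msg);
        });
        consumer.start();
        // this thread may keep offering, but must never poll from now on
    }
}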

MQ-EL commented Mar 17, 2016

Unfortunately, in Akka the producer publishes the queue to the consumer (e.g. one actor tells another), so AFAICS unsafe publication exists there.

nitsanw commented Mar 17, 2016

@MQ-EL IIRC the 'actors' are submitted to a ForkJoin pool to do their processing; that is safe publication.
Put it another way: how do you know that there are no two concurrent consumers?

MQ-EL commented Mar 17, 2016

@nitsanw

  final val Scheduled = 2 // Deliberately without type ascription to make it a compile-time constant
...
@inline
  final def currentStatus: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)

Each actor's mailbox has a status flag that describes whether the mailbox is in the pool; the producer will load-store or load-load it.

But the receiver does not read this flag, so no fence protects it.

However, when the messages handled by the receiving actor reach the throughput limit, it resubmits itself to the pool, acting as a publisher; I think that is then a "safe publish" of the consumer node.
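
If that reasoning holds, the happens-before chain would look roughly like this sketch; this is an illustration of the pattern under discussion, not Akka's actual Mailbox code, and all names are made up:

import java.util.concurrent.atomic.AtomicInteger;

// Illustration of publication via a status-field CAS; not Akka's Mailbox.
class MailboxStatusSketch {
    static final int IDLE = 0, SCHEDULED = 2;

    final AtomicInteger status = new AtomicInteger(IDLE);
    Object consumerNode; // plain field, guarded by the status protocol

    // run by the thread that just finished processing the mailbox
    void finishProcessing(Object newConsumerNode) {
        consumerNode = newConsumerNode; // plain write...
        status.set(IDLE);               // ...published by this volatile write
    }

    // run by any producer (or by the actor resubmitting itself): a successful
    // CAS reads the volatile status written above, so everything the previous
    // processing thread wrote, including consumerNode, happens-before the
    // processing run this scheduling triggers
    boolean tryScheduleForExecution() {
        return status.compareAndSet(IDLE, SCHEDULED);
    }

    // run by the pool thread that picks the mailbox up
    void processMailbox() {
        if (status.get() == SCHEDULED) { // volatile read completes the chain
            Object node = consumerNode;  // now safe to read
            // ... drain the message queue starting from node ...
        }
    }
}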

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment