Skip to content

Commit

Permalink
Merge pull request #1284 from benjchristensen/util-with-mpsc
Browse files Browse the repository at this point in the history
Manual merge of Lock-free, MPSC-queue based
  • Loading branch information
benjchristensen committed May 29, 2014
2 parents 798fa7e + 5b5d99f commit f5b02fa
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 0 deletions.
129 changes: 129 additions & 0 deletions rxjava-core/src/main/java/rx/internal/util/MpscPaddedQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.util;

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
* A multiple-producer single consumer queue implementation with padded reference
* to tail to avoid cache-line thrashing.
* Based on Netty's <a href='https://github.com/netty/netty/blob/master/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java'>MpscQueue implementation</a> but using AtomicReferenceFieldUpdater
* instead of Unsafe.
* @param <E> the element type
*/
public final class MpscPaddedQueue<E> extends AtomicReference<MpscPaddedQueue.Node<E>> {
@SuppressWarnings(value = "rawtypes")
static final AtomicReferenceFieldUpdater<PaddedNode, Node> TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PaddedNode.class, Node.class, "tail");
/** */
private static final long serialVersionUID = 1L;
/** The padded tail reference. */
final PaddedNode<E> tail;
/**
* Initializes the empty queue.
*/
public MpscPaddedQueue() {
Node<E> first = new Node<E>(null);
tail = new PaddedNode<E>();
tail.tail = first;
set(first);
}
/**
* Offer a new value.
* @param v the value to offer
*/
public void offer(E v) {
Node<E> n = new Node<E>(v);
getAndSet(n).set(n);
}

/**
* @return Poll a value from the head of the queue or return null if the queue is empty.
*/
public E poll() {
Node<E> n = peekNode();
if (n == null) {
return null;
}
E v = n.value;
n.value = null; // do not retain this value as the node still stays in the queue
TAIL_UPDATER.lazySet(tail, n);
return v;
}
/**
* Check if there is a node available without changing anything.
*/
private Node<E> peekNode() {
for (;;) {
@SuppressWarnings(value = "unchecked")
Node<E> t = TAIL_UPDATER.get(tail);
Node<E> n = t.get();
if (n != null || get() == t) {
return n;
}
}
}
/**
* Clears the queue.
*/
public void clear() {
for (;;) {
if (poll() == null) {
break;
}
}
}
/** Class that contains a Node reference padded around to fit a typical cache line. */
static final class PaddedNode<E> {
/** Padding, public to prevent optimizing it away. */
public int p1;
volatile Node<E> tail;
/** Padding, public to prevent optimizing it away. */
public long p2;
/** Padding, public to prevent optimizing it away. */
public long p3;
/** Padding, public to prevent optimizing it away. */
public long p4;
/** Padding, public to prevent optimizing it away. */
public long p5;
/** Padding, public to prevent optimizing it away. */
public long p6;
}
/**
* Regular node with value and reference to the next node.
*/
static final class Node<E> {

E value;
@SuppressWarnings(value = "rawtypes")
static final AtomicReferenceFieldUpdater<Node, Node> TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "tail");
volatile Node<E> tail;

public Node(E value) {
this.value = value;
}

public void set(Node<E> newTail) {
TAIL_UPDATER.lazySet(this, newTail);
}

@SuppressWarnings(value = "unchecked")
public Node<E> get() {
return TAIL_UPDATER.get(this);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.util;

import java.util.concurrent.atomic.AtomicInteger;

/**
* An AtomicInteger with extra fields to pad it out to fit a typical cache line.
*/
public final class PaddedAtomicInteger extends AtomicInteger {
private static final long serialVersionUID = 1L;
/** Padding, public to prevent optimizing it away. */
public int p1;
/** Padding, public to prevent optimizing it away. */
public int p2;
/** Padding, public to prevent optimizing it away. */
public int p3;
/** Padding, public to prevent optimizing it away. */
public int p4;
/** Padding, public to prevent optimizing it away. */
public int p5;
/** Padding, public to prevent optimizing it away. */
public int p6;
/** Padding, public to prevent optimizing it away. */
public int p7;
/** Padding, public to prevent optimizing it away. */
public int p8;
/** Padding, public to prevent optimizing it away. */
public int p9;
/** Padding, public to prevent optimizing it away. */
public int p10;
/** Padding, public to prevent optimizing it away. */
public int p11;
/** Padding, public to prevent optimizing it away. */
public int p12;
/** Padding, public to prevent optimizing it away. */
public int p13;
/** @return prevents optimizing away the fields, most likely. */
public int noopt() {
return p1 + p2 + p3 + p4 + p5 + p6 + p7 + p8 + p9 + p10 + p11 + p12 + p13;
}

}
3 changes: 3 additions & 0 deletions rxjava-core/src/main/java/rx/internal/util/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This `rx.internal.*` package is for internal use only. Any code here can change at any time and is not considered part of the public API, even if the classes are `public` so as to be used from other packages within `rx.*`.

If you depend on these classes, your code may break in any future RxJava release, even if it's just a patch release (major.minor.patch).

0 comments on commit f5b02fa

Please sign in to comment.