From 15979144e06a5a812e72d010f854f078d1302304 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Sun, 13 Jan 2019 21:06:22 +0100 Subject: [PATCH] Improved ParkTakeStrategy It includes 2 improvements for park take strategies: - single-consumer: uses field updater to avoid AtomicReference instance - multi-consumer: uses intrinsic locks to avoid garbage on contention and avoided locking while checking waiting threads --- .../queues/blocking/McParkTakeStrategy.java | 41 +++++++------------ .../queues/blocking/ScParkTakeStrategy.java | 16 +++++--- 2 files changed, 25 insertions(+), 32 deletions(-) diff --git a/jctools-experimental/src/main/java/org/jctools/queues/blocking/McParkTakeStrategy.java b/jctools-experimental/src/main/java/org/jctools/queues/blocking/McParkTakeStrategy.java index 6bcd3fbc..78443008 100644 --- a/jctools-experimental/src/main/java/org/jctools/queues/blocking/McParkTakeStrategy.java +++ b/jctools-experimental/src/main/java/org/jctools/queues/blocking/McParkTakeStrategy.java @@ -3,32 +3,27 @@ import org.jctools.queues.spec.ConcurrentQueueSpec; import java.util.Queue; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; public final class McParkTakeStrategy implements TakeStrategy { - private final ReentrantLock lock = new ReentrantLock(); - private final Condition cond = lock.newCondition(); - - private int waiters = 0; + private static final AtomicLongFieldUpdater WAITERS_UPDATER = AtomicLongFieldUpdater.newUpdater(McParkTakeStrategy.class, "waiters"); + private volatile long waiters = 0; + private final Object obj = new Object(); @Override public void signal() { - ReentrantLock l = lock; - l.lock(); - try + if (waiters > 0) { - if (waiters>0) + synchronized (obj) { - cond.signal(); + if (waiters > 0) + { + obj.notify(); + } } } - finally - { - l.unlock(); - } } @Override @@ -40,20 +35,14 @@ public E waitPoll(Queue q) throws InterruptedException return e; } - ReentrantLock l = lock; - l.lock(); - try + WAITERS_UPDATER.incrementAndGet(this); + synchronized (obj) { - while((e = q.poll())==null) + while ((e = q.poll()) == null) { - waiters++; - cond.await(); - waiters--; + obj.wait(); } - } - finally - { - l.unlock(); + WAITERS_UPDATER.decrementAndGet(this); } return e; diff --git a/jctools-experimental/src/main/java/org/jctools/queues/blocking/ScParkTakeStrategy.java b/jctools-experimental/src/main/java/org/jctools/queues/blocking/ScParkTakeStrategy.java index 8fd3290e..0b7c037c 100644 --- a/jctools-experimental/src/main/java/org/jctools/queues/blocking/ScParkTakeStrategy.java +++ b/jctools-experimental/src/main/java/org/jctools/queues/blocking/ScParkTakeStrategy.java @@ -3,21 +3,25 @@ import org.jctools.queues.spec.ConcurrentQueueSpec; import java.util.Queue; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.LockSupport; public final class ScParkTakeStrategy implements TakeStrategy { + private static final AtomicReferenceFieldUpdater WAITING_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(ScParkTakeStrategy.class, Thread.class, "waiting"); + public volatile int storeFence = 0; - private AtomicReference t = new AtomicReference(null); + private volatile Thread waiting = null; @Override - public void signal() { + public void signal() + { // Make sure the offer is visible before unpark storeFence = 1; // store load barrier - LockSupport.unpark(t.get()); // t.get() load barrier + LockSupport.unpark(waiting); // t.get() load barrier } @Override @@ -28,7 +32,7 @@ public E waitPoll(Queue q) throws InterruptedException { } Thread currentThread = Thread.currentThread(); - t.set(currentThread); + waiting = currentThread; while ((e = q.poll()) == null) { LockSupport.park(); @@ -38,7 +42,7 @@ public E waitPoll(Queue q) throws InterruptedException { } } - t.lazySet(null); + WAITING_UPDATER.lazySet(this, null); return e; }