Skip to content
Permalink
Browse files

Merge pull request #227 from franz1981/improved_blocking_mp

Improved ParkTakeStrategy
  • Loading branch information...
nitsanw committed Apr 6, 2019
2 parents 58a08a1 + 1597914 commit 49e23552621135ebddfc61d8faa88d7566b4c682
@@ -3,32 +3,27 @@
import org.jctools.queues.spec.ConcurrentQueueSpec;

import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

public final class McParkTakeStrategy<E> implements TakeStrategy<E>
{
private final ReentrantLock lock = new ReentrantLock();
private final Condition cond = lock.newCondition();

private int waiters = 0;
private static final AtomicLongFieldUpdater<McParkTakeStrategy> WAITERS_UPDATER = AtomicLongFieldUpdater.newUpdater(McParkTakeStrategy.class, "waiters");
private volatile long waiters = 0;
private final Object obj = new Object();

@Override
public void signal()
{
ReentrantLock l = lock;
l.lock();
try
if (waiters > 0)
{
if (waiters>0)
synchronized (obj)
{
cond.signal();
if (waiters > 0)
{
obj.notify();
}
}
}
finally
{
l.unlock();
}
}

@Override
@@ -40,20 +35,14 @@ public E waitPoll(Queue<E> q) throws InterruptedException
return e;
}

ReentrantLock l = lock;
l.lock();
try
WAITERS_UPDATER.incrementAndGet(this);
synchronized (obj)
{
while((e = q.poll())==null)
while ((e = q.poll()) == null)
{
waiters++;
cond.await();
waiters--;
obj.wait();
}
}
finally
{
l.unlock();
WAITERS_UPDATER.decrementAndGet(this);
}

return e;
@@ -3,21 +3,25 @@
import org.jctools.queues.spec.ConcurrentQueueSpec;

import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.LockSupport;

public final class ScParkTakeStrategy<E> implements TakeStrategy<E> {

private static final AtomicReferenceFieldUpdater<ScParkTakeStrategy, Thread> WAITING_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ScParkTakeStrategy.class, Thread.class, "waiting");

public volatile int storeFence = 0;

private AtomicReference<Thread> t = new AtomicReference<Thread>(null);
private volatile Thread waiting = null;

@Override
public void signal() {
public void signal()
{
// Make sure the offer is visible before unpark
storeFence = 1; // store load barrier

LockSupport.unpark(t.get()); // t.get() load barrier
LockSupport.unpark(waiting); // t.get() load barrier
}

@Override
@@ -28,7 +32,7 @@ public E waitPoll(Queue<E> q) throws InterruptedException {
}

Thread currentThread = Thread.currentThread();
t.set(currentThread);
waiting = currentThread;

while ((e = q.poll()) == null) {
LockSupport.park();
@@ -38,7 +42,7 @@ public E waitPoll(Queue<E> q) throws InterruptedException {
}
}

t.lazySet(null);
WAITING_UPDATER.lazySet(this, null);

return e;
}

0 comments on commit 49e2355

Please sign in to comment.
You can’t perform that action at this time.