Skip to content

Commit

Permalink
Merge 1597914 into 1e638a6
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Jan 13, 2019
2 parents 1e638a6 + 1597914 commit f96dc2d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 32 deletions.
Expand Up @@ -3,32 +3,27 @@
import org.jctools.queues.spec.ConcurrentQueueSpec;

import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

public final class McParkTakeStrategy<E> implements TakeStrategy<E>
{
private final ReentrantLock lock = new ReentrantLock();
private final Condition cond = lock.newCondition();

private int waiters = 0;
private static final AtomicLongFieldUpdater<McParkTakeStrategy> WAITERS_UPDATER = AtomicLongFieldUpdater.newUpdater(McParkTakeStrategy.class, "waiters");
private volatile long waiters = 0;
private final Object obj = new Object();

@Override
public void signal()
{
ReentrantLock l = lock;
l.lock();
try
if (waiters > 0)
{
if (waiters>0)
synchronized (obj)
{
cond.signal();
if (waiters > 0)
{
obj.notify();
}
}
}
finally
{
l.unlock();
}
}

@Override
Expand All @@ -40,20 +35,14 @@ public E waitPoll(Queue<E> q) throws InterruptedException
return e;
}

ReentrantLock l = lock;
l.lock();
try
WAITERS_UPDATER.incrementAndGet(this);
synchronized (obj)
{
while((e = q.poll())==null)
while ((e = q.poll()) == null)
{
waiters++;
cond.await();
waiters--;
obj.wait();
}
}
finally
{
l.unlock();
WAITERS_UPDATER.decrementAndGet(this);
}

return e;
Expand Down
Expand Up @@ -3,21 +3,25 @@
import org.jctools.queues.spec.ConcurrentQueueSpec;

import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.LockSupport;

public final class ScParkTakeStrategy<E> implements TakeStrategy<E> {

private static final AtomicReferenceFieldUpdater<ScParkTakeStrategy, Thread> WAITING_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ScParkTakeStrategy.class, Thread.class, "waiting");

public volatile int storeFence = 0;

private AtomicReference<Thread> t = new AtomicReference<Thread>(null);
private volatile Thread waiting = null;

@Override
public void signal() {
public void signal()
{
// Make sure the offer is visible before unpark
storeFence = 1; // store load barrier

LockSupport.unpark(t.get()); // t.get() load barrier
LockSupport.unpark(waiting); // t.get() load barrier
}

@Override
Expand All @@ -28,7 +32,7 @@ public E waitPoll(Queue<E> q) throws InterruptedException {
}

Thread currentThread = Thread.currentThread();
t.set(currentThread);
waiting = currentThread;

while ((e = q.poll()) == null) {
LockSupport.park();
Expand All @@ -38,7 +42,7 @@ public E waitPoll(Queue<E> q) throws InterruptedException {
}
}

t.lazySet(null);
WAITING_UPDATER.lazySet(this, null);

return e;
}
Expand Down

0 comments on commit f96dc2d

Please sign in to comment.