Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Minor refactor.
  • Loading branch information
brettwooldridge committed Jun 11, 2015
1 parent cb50434 commit 1bb1f60
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 45 deletions.
49 changes: 11 additions & 38 deletions src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java
Expand Up @@ -26,7 +26,6 @@
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;


import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -56,12 +55,11 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
{ {
private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class); private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class);


private final AbstractQueuedLongSynchronizer synchronizer; private final QueuedSequenceSynchronizer synchronizer;
private final CopyOnWriteArrayList<T> sharedList; private final CopyOnWriteArrayList<T> sharedList;
private final boolean weakThreadLocals; private final boolean weakThreadLocals;


private final ThreadLocal<List> threadList; private final ThreadLocal<List> threadList;
private final Sequence sequence;
private final IBagStateListener listener; private final IBagStateListener listener;
private volatile boolean closed; private volatile boolean closed;


Expand All @@ -77,8 +75,7 @@ public ConcurrentBag(IBagStateListener listener)
this.weakThreadLocals = useWeakThreadLocals(); this.weakThreadLocals = useWeakThreadLocals();


this.sharedList = new CopyOnWriteArrayList<>(); this.sharedList = new CopyOnWriteArrayList<>();
this.synchronizer = new Synchronizer(); this.synchronizer = new QueuedSequenceSynchronizer();
this.sequence = Sequence.Factory.create();
if (weakThreadLocals) { if (weakThreadLocals) {
this.threadList = new ThreadLocal<>(); this.threadList = new ThreadLocal<>();
} }
Expand Down Expand Up @@ -126,27 +123,27 @@ public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedExcepti
Future<Boolean> addItemFuture = null; Future<Boolean> addItemFuture = null;
final long startScan = System.nanoTime(); final long startScan = System.nanoTime();
final long originTimeout = timeout; final long originTimeout = timeout;
long startSeq = 0; // 0 intentionally causes tryAcquireSharedNanos() to fall-thru in the first iteration long startSeq;
try { try {
while (timeout > 1000L && synchronizer.tryAcquireSharedNanos(startSeq, timeout)) { do {
do { do {
startSeq = sequence.get(); startSeq = synchronizer.getSequence();
for (final T bagEntry : sharedList) { for (final T bagEntry : sharedList) {
if (bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { if (bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry; return bagEntry;
} }
} }
} while (startSeq < sequence.get()); } while (startSeq < synchronizer.getSequence());


if (addItemFuture == null || addItemFuture.isDone()) { if (addItemFuture == null || addItemFuture.isDone()) {
addItemFuture = listener.addBagItem(); addItemFuture = listener.addBagItem();
} }


timeout = originTimeout - (System.nanoTime() - startScan); timeout = originTimeout - (System.nanoTime() - startScan);
} } while (timeout > 1000L && synchronizer.waitUntilThresholdExceeded(startSeq, timeout));
} }
finally { finally {
synchronizer.releaseShared(1); synchronizer.increment();
} }


return null; return null;
Expand All @@ -170,8 +167,7 @@ public void requite(final T bagEntry)
threadLocalList.add((weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry)); threadLocalList.add((weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry));
} }


sequence.increment(); synchronizer.increment();
synchronizer.releaseShared(1);
} }
else { else {
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry); LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
Expand All @@ -191,8 +187,7 @@ public void add(final T bagEntry)
} }


sharedList.add(bagEntry); sharedList.add(bagEntry);
sequence.increment(); synchronizer.increment();
synchronizer.releaseShared(1);
} }


/** /**
Expand Down Expand Up @@ -287,9 +282,8 @@ public boolean reserve(final T bagEntry)
*/ */
public void unreserve(final T bagEntry) public void unreserve(final T bagEntry)
{ {
sequence.increment();
if (bagEntry.state().compareAndSet(STATE_RESERVED, STATE_NOT_IN_USE)) { if (bagEntry.state().compareAndSet(STATE_RESERVED, STATE_NOT_IN_USE)) {
synchronizer.releaseShared(1); synchronizer.increment();
} }
else { else {
LOGGER.warn("Attempt to relinquish an object to the bag that was not reserved: {}", bagEntry); LOGGER.warn("Attempt to relinquish an object to the bag that was not reserved: {}", bagEntry);
Expand Down Expand Up @@ -361,25 +355,4 @@ private boolean useWeakThreadLocals()
return true; return true;
} }
} }

/**
* Our private synchronizer that handles notify/wait type semantics.
*/
private final class Synchronizer extends AbstractQueuedLongSynchronizer
{
private static final long serialVersionUID = 104753538004341218L;

@Override
protected long tryAcquireShared(final long seq)
{
return sequence.get() - (seq + 1) < 0 ? -1L : 0L;
}

/** {@inheritDoc} */
@Override
protected boolean tryReleaseShared(final long unreliableSequence)
{
return true;
}
}
} }
133 changes: 133 additions & 0 deletions src/main/java/com/zaxxer/hikari/util/QueuedSequenceSynchronizer.java
@@ -0,0 +1,133 @@
/*
* Copyright (C) 2015 Brett Wooldridge
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.zaxxer.hikari.util;

import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;

/**
* A specialized "lock" class that permits resource tracking through the
* use of a monotonically-increasing long sequence. When a shared resource
* becomes available the {@link #increment()} method should be called.
* <p>
* A thread wishing to acquire a resource should obtain the current sequence
* from the {@link #getSequence()} method before calling {@link #waitUntilThresholdExceeded(long, long)}
* with that sequence. Upon receiving a <code>true</code> result from
* {@link #waitUntilThresholdExceeded(long, long)}, the current sequence
* should again be obtained from the {@link #getSequence()} method, and an
* attempt to acquire the resource should be made. If the shared resource cannot
* be acquired, the thread should again call {@link #waitUntilThresholdExceeded(long, long)}
* with the previously obtained sequence.
* <p>
* When running on Java 8 and above, this class leverages the fact that when {@link LongAdder}
* is monotonically increasing, and only {@link LongAdder#increment()} and {@link LongAdder#sum()}
* are used, it can be relied on to be Sequentially Consistent.
*
* @see <a href="http://en.wikipedia.org/wiki/Sequential_consistency">Java Spec</a>
* @author Brett Wooldridge
*/
public final class QueuedSequenceSynchronizer
{
private final Sequence sequence;
private final Synchronizer synchronizer;

/**
* Default constructor
*/
public QueuedSequenceSynchronizer()
{
this.synchronizer = new Synchronizer();
this.sequence = Sequence.Factory.create();
}

/**
* Increment the sequence by the specified number, which must be greater than zero.
*
* @param delta the number to increase the sequence by, greater than 0
*/
public void increment()
{
synchronizer.releaseShared(1);
}

/**
* Get the current sequence.
*
* @return the current sequence
*/
public long getSequence()
{
return sequence.get();
}

/**
* Block the current thread until the current sequence exceeds the specified threshold, or
* until the specified timeout is reached.
*
* @param threshold the threshold the sequence must reach before this thread becomes unblocked
* @param nanosTimeout a nanosecond timeout specifying the maximum time to wait
* @return true if the threshold was reached, false if the wait timed out
* @throws InterruptedException if the thread is interrupted while waiting
*/
public boolean waitUntilThresholdExceeded(long threshold, long nanosTimeout) throws InterruptedException
{
return synchronizer.tryAcquireSharedNanos(threshold, nanosTimeout);
}

/**
* Queries whether any threads are waiting to for the sequence to reach a particular threshold.
*
* @return true if there may be other threads waiting for a sequence threshold to be reached
*/
public boolean hasQueuedThreads()
{
return synchronizer.hasQueuedThreads();
}

/**
* Returns an estimate of the number of threads waiting for a sequence threshold to be reached. The
* value is only an estimate because the number of threads may change dynamically while this method
* traverses internal data structures. This method is designed for use in monitoring system state,
* not for synchronization control.
*
* @return the estimated number of threads waiting for a sequence threshold to be reached
*/
public int getQueueLength()
{
return synchronizer.getQueueLength();
}

private final class Synchronizer extends AbstractQueuedLongSynchronizer
{
private static final long serialVersionUID = 104753538004341218L;

/** {@inheritDoc} */
@Override
protected long tryAcquireShared(final long seq)
{
return sequence.get() - (seq + 1) < 0 ? -1L : 0L;
}

/** {@inheritDoc} */
@Override
protected boolean tryReleaseShared(final long unused)
{
sequence.increment();
return true;
}
}
}
12 changes: 5 additions & 7 deletions src/main/java/com/zaxxer/hikari/util/Sequence.java
Expand Up @@ -19,7 +19,6 @@
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;



/** /**
* A monotonically increasing long sequence. * A monotonically increasing long sequence.
* *
Expand All @@ -28,13 +27,15 @@
public interface Sequence public interface Sequence
{ {
/** /**
* Increment the sequence. * Adds the given value to the current sequence. If delta is negative,
* the Sequential Consistency of this Sequence cannot be guarenteed.
*
* @param delta the value to add
*/ */
void increment(); void increment();


/** /**
* Get the current sequence (imprecise on Java 8, but it doesn't matter for * Get the current sequence.
* our purposes).
* *
* @return the current sequence. * @return the current sequence.
*/ */
Expand All @@ -50,18 +51,15 @@ public static Sequence create()
{ {
class Java7Sequence extends AtomicLong implements Sequence { class Java7Sequence extends AtomicLong implements Sequence {
Java7Sequence() { Java7Sequence() {
super(1);
} }


@Override
public void increment() { public void increment() {
this.incrementAndGet(); this.incrementAndGet();
} }
} }


class Java8Sequence extends LongAdder implements Sequence { class Java8Sequence extends LongAdder implements Sequence {
public Java8Sequence() { public Java8Sequence() {
this.increment();
} }


@Override @Override
Expand Down

0 comments on commit 1bb1f60

Please sign in to comment.