Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
The list of events is accessed by multiple threads so use a thread safe Deque implementation.
Fix a Javadoc comment
Simplify the code a little

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1703151 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
markt-asf committed Sep 15, 2015
1 parent db6a416 commit e1c3c37
Showing 1 changed file with 23 additions and 12 deletions.
35 changes: 23 additions & 12 deletions java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
Expand Up @@ -27,9 +27,10 @@
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.catalina.tribes.io.ObjectReader;
Expand All @@ -56,7 +57,7 @@ public class NioReceiver extends ReceiverBase implements Runnable {
private ServerSocketChannel serverChannel = null;
private DatagramChannel datagramChannel = null;

protected final LinkedList<Runnable> events = new LinkedList<>();
protected final Deque<Runnable> events = new ConcurrentLinkedDeque<>();

public NioReceiver() {
}
Expand All @@ -68,8 +69,10 @@ public void stop() {
}

/**
* start cluster receiver
* @throws IOException
* Start cluster receiver.
*
* @throws IOException If the receiver fails to start
*
* @see org.apache.catalina.tribes.ChannelReceiver#start()
*/
@Override
Expand Down Expand Up @@ -141,25 +144,33 @@ private void configureDatagraChannel() throws IOException {

public void addEvent(Runnable event) {
Selector selector = this.selector.get();
if ( selector != null ) {
if (selector != null) {
synchronized (events) {
events.add(event);
}
if ( log.isTraceEnabled() ) log.trace("Adding event to selector:"+event);
if ( isListening() ) selector.wakeup();
if (log.isTraceEnabled()) {
log.trace("Adding event to selector:" + event);
}
if (isListening()) {
selector.wakeup();
}
}
}

public void events() {
if ( events.size() == 0 ) return;
if (events.size() == 0) {
return;
}
synchronized (events) {
Runnable r = null;
while ( (events.size() > 0) && (r = events.removeFirst()) != null ) {
while ((r = events.pollFirst()) != null ) {
try {
if ( log.isTraceEnabled() ) log.trace("Processing event in selector:"+r);
if (log.isTraceEnabled()) {
log.trace("Processing event in selector:" + r);
}
r.run();
} catch ( Exception x ) {
log.error("",x);
} catch (Exception x) {
log.error("", x);
}
}
events.clear();
Expand Down

0 comments on commit e1c3c37

Please sign in to comment.