Skip to content

Commit

Permalink
MessageDispatch15Interceptor was used to add Java 5 features to Messa…
Browse files Browse the repository at this point in the history
…geDispatchInterceptor. Since the minimum Java version is now >=5 (and has been since Tomcat 6) there is no need for the separate implementation.

Merge the Java 5 features into MessageDispatchInterceptor and deprecate MessageDispatch15Interceptor

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1723368 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
markt-asf committed Jan 6, 2016
1 parent 2f7a894 commit 1adbf79
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 169 deletions.
4 changes: 2 additions & 2 deletions java/org/apache/catalina/ha/tcp/SimpleTcpCluster.java
Expand Up @@ -52,7 +52,7 @@
import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener; import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.group.GroupChannel; import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector; import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
import org.apache.catalina.util.LifecycleMBeanBase; import org.apache.catalina.util.LifecycleMBeanBase;
import org.apache.juli.logging.Log; import org.apache.juli.logging.Log;
Expand Down Expand Up @@ -564,7 +564,7 @@ protected void checkDefaults() {
if ( clusterDeployer != null ) clusterDeployer.setCluster(this); if ( clusterDeployer != null ) clusterDeployer.setCluster(this);
if ( channel == null ) channel = new GroupChannel(); if ( channel == null ) channel = new GroupChannel();
if ( channel instanceof GroupChannel && !((GroupChannel)channel).getInterceptors().hasNext()) { if ( channel instanceof GroupChannel && !((GroupChannel)channel).getInterceptors().hasNext()) {
channel.addInterceptor(new MessageDispatch15Interceptor()); channel.addInterceptor(new MessageDispatchInterceptor());
channel.addInterceptor(new TcpFailureDetector()); channel.addInterceptor(new TcpFailureDetector());
} }
if (heartbeatBackgroundEnabled) channel.setHeartbeat(false); if (heartbeatBackgroundEnabled) channel.setHeartbeat(false);
Expand Down
4 changes: 2 additions & 2 deletions java/org/apache/catalina/tribes/group/GroupChannel.java
Expand Up @@ -38,7 +38,7 @@
import org.apache.catalina.tribes.MembershipService; import org.apache.catalina.tribes.MembershipService;
import org.apache.catalina.tribes.RemoteProcessException; import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.UniqueId; import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
import org.apache.catalina.tribes.io.BufferPool; import org.apache.catalina.tribes.io.BufferPool;
import org.apache.catalina.tribes.io.ChannelData; import org.apache.catalina.tribes.io.ChannelData;
import org.apache.catalina.tribes.io.XByteBuffer; import org.apache.catalina.tribes.io.XByteBuffer;
Expand Down Expand Up @@ -372,7 +372,7 @@ public void memberDisappeared(Member member) {
protected synchronized void setupDefaultStack() throws ChannelException { protected synchronized void setupDefaultStack() throws ChannelException {
if (getFirstInterceptor() != null && if (getFirstInterceptor() != null &&
((getFirstInterceptor().getNext() instanceof ChannelCoordinator))) { ((getFirstInterceptor().getNext() instanceof ChannelCoordinator))) {
addInterceptor(new MessageDispatch15Interceptor()); addInterceptor(new MessageDispatchInterceptor());
} }
} }


Expand Down
Expand Up @@ -16,106 +16,13 @@
*/ */
package org.apache.catalina.tribes.group.interceptors; package org.apache.catalina.tribes.group.interceptors;


import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.transport.bio.util.LinkObject;
import org.apache.catalina.tribes.util.ExecutorFactory;
import org.apache.catalina.tribes.util.TcclThreadFactory;

/** /**
* * @deprecated Originally provided an optional implementation that used Java 5+
* Same implementation as the MessageDispatchInterceptor * features. Now the minimum Java version is >=5, those features
* except it uses an atomic long for the currentSize calculation * have been added to {@link MessageDispatchInterceptor} which
* and uses a thread pool for message sending. * should be used instead. This class will be removed in Tomcat
* * 9.0.x onwards.
* @version 1.0
*/ */

@Deprecated
public class MessageDispatch15Interceptor extends MessageDispatchInterceptor { public class MessageDispatch15Interceptor extends MessageDispatchInterceptor {

protected final AtomicLong currentSize = new AtomicLong(0);
protected ExecutorService executor = null;
protected int maxThreads = 10;
protected int maxSpareThreads = 2;
protected long keepAliveTime = 5000;

@Override
public long getCurrentSize() {
return currentSize.get();
}

@Override
public long addAndGetCurrentSize(long inc) {
return currentSize.addAndGet(inc);
}

@Override
public long setAndGetCurrentSize(long value) {
currentSize.set(value);
return value;
}

@Override
public boolean addToQueue(ChannelMessage msg, Member[] destination, InterceptorPayload payload) {
final LinkObject obj = new LinkObject(msg,destination,payload);
Runnable r = new Runnable() {
@Override
public void run() {
sendAsyncData(obj);
}
};
executor.execute(r);
return true;
}

@Override
public LinkObject removeFromQueue() {
return null; //not used, thread pool contains its own queue.
}

@Override
public void startQueue() {
if ( run ) return;
executor = ExecutorFactory.newThreadPool(maxSpareThreads, maxThreads,
keepAliveTime, TimeUnit.MILLISECONDS,
new TcclThreadFactory("MessageDispatch15Interceptor.MessageDispatchThread"));
run = true;
}

@Override
public void stopQueue() {
run = false;
executor.shutdownNow();
setAndGetCurrentSize(0);
}

public long getKeepAliveTime() {
return keepAliveTime;
}

public int getMaxSpareThreads() {
return maxSpareThreads;
}

public int getMaxThreads() {
return maxThreads;
}

public void setKeepAliveTime(long keepAliveTime) {
this.keepAliveTime = keepAliveTime;
}

public void setMaxSpareThreads(int maxSpareThreads) {
this.maxSpareThreads = maxSpareThreads;
}

public void setMaxThreads(int maxThreads) {
this.maxThreads = maxThreads;
}

} }

0 comments on commit 1adbf79

Please sign in to comment.