Skip to content

Commit

Permalink
Fixed issue 179.
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Mar 17, 2012
1 parent 06df5f8 commit e154442
Showing 1 changed file with 21 additions and 3 deletions.
24 changes: 21 additions & 3 deletions src/main/java/net/rubyeye/xmemcached/impl/MemcachedHandler.java
Expand Up @@ -14,6 +14,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -59,6 +61,7 @@ public class MemcachedHandler extends HandlerAdapter {
private final MemcachedClient client;
private static final Logger log = LoggerFactory
.getLogger(MemcachedHandler.class);
private final ScheduledExecutorService scheduleExecutorService;

/**
* On receive message from memcached server
Expand Down Expand Up @@ -150,8 +153,12 @@ public final void onSessionClosed(Session session) {
*/
@Override
public void onSessionIdle(Session session) {
//checkHeartBeat(session);
}

private void checkHeartBeat(Session session) {
if (this.enableHeartBeat) {
log.debug("Session (%s) is idle,send heartbeat", session
log.debug("Check session (%s) is alive,send heartbeat", session
.getRemoteSocketAddress() == null ? "unknown" : SystemUtils
.getRawAddress(session.getRemoteSocketAddress())
+ ":" + session.getRemoteSocketAddress().getPort());
Expand All @@ -172,7 +179,6 @@ public void onSessionIdle(Session session) {
versionCommand, session));
}
}

}

private static final String HEART_BEAT_FAIL_COUNT_ATTR = "heartBeatFailCount";
Expand Down Expand Up @@ -250,21 +256,33 @@ protected void reconnect(MemcachedTCPSession session) {

public void stop() {
this.heartBeatThreadPool.shutdown();
this.scheduleExecutorService.shutdown();
}

final long HEARTBEAT_PERIOD=Long.parseLong(System.getProperty("xmemcached.heartbeat.period", "5000"));

public void start() {
int serverSize = this.client.getAvaliableServers().size();
this.heartBeatThreadPool = Executors
.newFixedThreadPool(serverSize == 0 ? Runtime.getRuntime()
.availableProcessors() : serverSize);
this.scheduleExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
//check heartbeat
for(Session session:client.getConnector().getSessionSet()){
checkHeartBeat(session);
}

}
}, HEARTBEAT_PERIOD, HEARTBEAT_PERIOD, TimeUnit.MILLISECONDS);
}

public MemcachedHandler(MemcachedClient client) {
super();
this.client = client;
this.listener = new AuthMemcachedConnectListener();
this.statisticsHandler = new StatisticsHandler();

this.scheduleExecutorService=Executors.newSingleThreadScheduledExecutor();
}

}

0 comments on commit e154442

Please sign in to comment.