Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

add BlockedHealthBalncingPolicy #648

Open
wants to merge 1 commit into from

1 participant

@fbwotjq

confirm #647

in 1.0.3 version. When many request come to hecotor pool . and more than one node have problem(doesn't send reponse but node is alive).

1 . application did't have socket connection
and i try this commnad ==> netstat -na | grep | wc - l ==> result is 0

2 . Thread used hector pool will be blocked ... and if application or cassandra didn't recover status. many thread will be Time_wait status.

3 . and hector pool status : blocked : 400, active: 0 : idle: 0

this is thread dump. all most thread are ...

"[CASSANDRA_JOB_WORKER-2]thread-91" prio=10 tid=0x0000000041e2a000 nid=0x7e01 waiting on condition [0x00007f56f6c3b000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)

  • parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025) at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:340) at me.prettyprint.cassandra.connection.ConcurrentHClientPool.waitForConnection(ConcurrentHClientPool.java:114) at me.prettyprint.cassandra.connection.ConcurrentHClientPool.borrowClient(ConcurrentHClientPool.java:82) at me.prettyprint.cassandra.connection.HConnectionManager.operateWithFailover(HConnectionManager.java:238) at me.prettyprint.cassandra.service.KeyspaceServiceImpl.operateWithFailover(KeyspaceServiceImpl.java:131) at me.prettyprint.cassandra.service.KeyspaceServiceImpl.getSlice(KeyspaceServiceImpl.java:289) at me.prettyprint.cassandra.model.thrift.ThriftSliceQuery$1.doInKeyspace(ThriftSliceQuery.java:53) at me.prettyprint.cassandra.model.thrift.ThriftSliceQuery$1.doInKeyspace(ThriftSliceQuery.java:49) at me.prettyprint.cassandra.model.KeyspaceOperationCallback.doInKeyspaceAndMeasure(KeyspaceOperationCallback.java:20) at me.prettyprint.cassandra.model.ExecutingKeyspace.doExecute(ExecutingKeyspace.java:85) at me.prettyprint.cassandra.model.thrift.ThriftSliceQuery.execute(ThriftSliceQuery.java:48) at com.xxx.xxx.xxxxxx.cassandra.dao.xxxxxxxxx.xxxxxxxxxxxxxxxxxxDaoCassandra.select(xxxxxxxxxxxxxxxxxxDaoCassandra.java:181) at com.xxx.xxx.xxxxxxx.xxxxxxxxxxxxxxxxxxxxx.xxxxxxxxxxxxxxxxxxxxxx(xxxxxxxxxxxxxxxxxx.java:449) at com.xxx.xxx.xxxxx.xxxxxxxxxxxxx.xxxxxxxxxxxxxxxxx(xxxxxxxxxxxxxxxx.java:463) at com.xxx.xxx.xxxxx.xxxxxxxxxxxxx.access$3400(xxxxxxxxxxxxxxxx.java:97) at com.xxx.xxx.xxxxx.xxxxxxxxxxxxx$xxxxxxxxxxxxxxxxxxxx.run(QueueManager.java:1506) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) "[CASSANDRA_JOB_WORKER-2]thread-90" prio=10 tid=0x0000000041e2a000 nid=0x7e01 waiting on condition [0x00007f56f6c3b000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method)
  • parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025) at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:340) at me.prettyprint.cassandra.connection.ConcurrentHClientPool.waitForConnection(ConcurrentHClientPool.java:114) at me.prettyprint.cassandra.connection.ConcurrentHClientPool.borrowClient(ConcurrentHClientPool.java:82) at me.prettyprint.cassandra.connection.HConnectionManager.operateWithFailover(HConnectionManager.java:238) at me.prettyprint.cassandra.service.KeyspaceServiceImpl.operateWithFailover(KeyspaceServiceImpl.java:131) at me.prettyprint.cassandra.service.KeyspaceServiceImpl.getSlice(KeyspaceServiceImpl.java:289) at me.prettyprint.cassandra.model.thrift.ThriftSliceQuery$1.doInKeyspace(ThriftSliceQuery.java:53) at me.prettyprint.cassandra.model.thrift.ThriftSliceQuery$1.doInKeyspace(ThriftSliceQuery.java:49) at me.prettyprint.cassandra.model.KeyspaceOperationCallback.doInKeyspaceAndMeasure(KeyspaceOperationCallback.java:20) at me.prettyprint.cassandra.model.ExecutingKeyspace.doExecute(ExecutingKeyspace.java:85) at me.prettyprint.cassandra.model.thrift.ThriftSliceQuery.execute(ThriftSliceQuery.java:48) at com.xxx.xxx.xxxxxx.cassandra.dao.xxxxxxxxx.xxxxxxxxxxxxxxxxxxDaoCassandra.select(xxxxxxxxxxxxxxxxxxDaoCassandra.java:181) at com.xxx.xxx.xxxxxxx.xxxxxxxxxxxxxxxxxxxxx.xxxxxxxxxxxxxxxxxxxxxx(xxxxxxxxxxxxxxxxxx.java:449) at com.xxx.xxx.xxxxx.xxxxxxxxxxxxx.xxxxxxxxxxxxxxxxx(xxxxxxxxxxxxxxxx.java:463) at com.xxx.xxx.xxxxx.xxxxxxxxxxxxx.access$3400(xxxxxxxxxxxxxxxx.java:97) at com.xxx.xxx.xxxxx.xxxxxxxxxxxxx$xxxxxxxxxxxxxxxxxxxx.run(QueueManager.java:1506) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) "[CASSANDRA_JOB_WORKER-2]thread-89" prio=10 tid=0x0000000041e2a000 nid=0x7e01 waiting on condition [0x00007f56f6c3b000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method)
  • parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025) at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:340) at me.prettyprint.cassandra.connection.ConcurrentHClientPool.waitForConnection(ConcurrentHClientPool.java:114) at me.prettyprint.cassandra.connection.ConcurrentHClientPool.borrowClient(ConcurrentHClientPool.java:82) at me.prettyprint.cassandra.connection.HConnectionManager.operateWithFailover(HConnectionManager.java:238) at me.prettyprint.cassandra.service.KeyspaceServiceImpl.operateWithFailover(KeyspaceServiceImpl.java:131) at me.prettyprint.cassandra.service.KeyspaceServiceImpl.getSlice(KeyspaceServiceImpl.java:289) at me.prettyprint.cassandra.model.thrift.ThriftSliceQuery$1.doInKeyspace(ThriftSliceQuery.java:53) at me.prettyprint.cassandra.model.thrift.ThriftSliceQuery$1.doInKeyspace(ThriftSliceQuery.java:49) at me.prettyprint.cassandra.model.KeyspaceOperationCallback.doInKeyspaceAndMeasure(KeyspaceOperationCallback.java:20) at me.prettyprint.cassandra.model.ExecutingKeyspace.doExecute(ExecutingKeyspace.java:85) at me.prettyprint.cassandra.model.thrift.ThriftSliceQuery.execute(ThriftSliceQuery.java:48) at com.xxx.xxx.xxxxxx.cassandra.dao.xxxxxxxxx.xxxxxxxxxxxxxxxxxxDaoCassandra.select(xxxxxxxxxxxxxxxxxxDaoCassandra.java:181) at com.xxx.xxx.xxxxxxx.xxxxxxxxxxxxxxxxxxxxx.xxxxxxxxxxxxxxxxxxxxxx(xxxxxxxxxxxxxxxxxx.java:449) at com.xxx.xxx.xxxxx.xxxxxxxxxxxxx.xxxxxxxxxxxxxxxxx(xxxxxxxxxxxxxxxx.java:463) at com.xxx.xxx.xxxxx.xxxxxxxxxxxxx.access$3400(xxxxxxxxxxxxxxxx.java:97) at com.xxx.xxx.xxxxx.xxxxxxxxxxxxx$xxxxxxxxxxxxxxxxxxxx.run(QueueManager.java:1506) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) "[CASSANDRA_JOB_WORKER-2]thread-87" prio=10 tid=0x0000000041e2a000 nid=0x7e01 waiting on condition [0x00007f56f6c3b000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method)
  • parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025) at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:340) at me.prettyprint.cassandra.connection.ConcurrentHClientPool.waitForConnection(ConcurrentHClientPool.java:114) at me.prettyprint.cassandra.connection.ConcurrentHClientPool.borrowClient(ConcurrentHClientPool.java:82) at me.prettyprint.cassandra.connection.HConnectionManager.operateWithFailover(HConnectionManager.java:238) at me.prettyprint.cassandra.service.KeyspaceServiceImpl.operateWithFailover(KeyspaceServiceImpl.java:131) at me.prettyprint.cassandra.service.KeyspaceServiceImpl.getSlice(KeyspaceServiceImpl.java:289) at me.prettyprint.cassandra.model.thrift.ThriftSliceQuery$1.doInKeyspace(ThriftSliceQuery.java:53) at me.prettyprint.cassandra.model.thrift.ThriftSliceQuery$1.doInKeyspace(ThriftSliceQuery.java:49) at me.prettyprint.cassandra.model.KeyspaceOperationCallback.doInKeyspaceAndMeasure(KeyspaceOperationCallback.java:20) at me.prettyprint.cassandra.model.ExecutingKeyspace.doExecute(ExecutingKeyspace.java:85) at me.prettyprint.cassandra.model.thrift.ThriftSliceQuery.execute(ThriftSliceQuery.java:48) at com.xxx.xxx.xxxxxx.cassandra.dao.xxxxxxxxx.xxxxxxxxxxxxxxxxxxDaoCassandra.select(xxxxxxxxxxxxxxxxxxDaoCassandra.java:181) at com.xxx.xxx.xxxxxxx.xxxxxxxxxxxxxxxxxxxxx.xxxxxxxxxxxxxxxxxxxxxx(xxxxxxxxxxxxxxxxxx.java:449) at com.xxx.xxx.xxxxx.xxxxxxxxxxxxx.xxxxxxxxxxxxxxxxx(xxxxxxxxxxxxxxxx.java:463) at com.xxx.xxx.xxxxx.xxxxxxxxxxxxx.access$3400(xxxxxxxxxxxxxxxx.java:97) at com.xxx.xxx.xxxxx.xxxxxxxxxxxxx$xxxxxxxxxxxxxxxxxxxx.run(QueueManager.java:1506) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) "[CASSANDRA_JOB_WORKER-2]thread-86" prio=10 tid=0x0000000041e2a000 nid=0x7e01 waiting on condition [0x00007f56f6c3b000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method)
  • parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025) at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:340) at me.prettyprint.cassandra.connection.ConcurrentHClientPool.waitForConnection(ConcurrentHClientPool.java:114) at me.prettyprint.cassandra.connection.ConcurrentHClientPool.borrowClient(ConcurrentHClientPool.java:82) at me.prettyprint.cassandra.connection.HConnectionManager.operateWithFailover(HConnectionManager.java:238) at me.prettyprint.cassandra.service.KeyspaceServiceImpl.operateWithFailover(KeyspaceServiceImpl.java:131) at me.prettyprint.cassandra.service.KeyspaceServiceImpl.getSlice(KeyspaceServiceImpl.java:289) at me.prettyprint.cassandra.model.thrift.ThriftSliceQuery$1.doInKeyspace(ThriftSliceQuery.java:53) at me.prettyprint.cassandra.model.thrift.ThriftSliceQuery$1.doInKeyspace(ThriftSliceQuery.java:49) at me.prettyprint.cassandra.model.KeyspaceOperationCallback.doInKeyspaceAndMeasure(KeyspaceOperationCallback.java:20) at me.prettyprint.cassandra.model.ExecutingKeyspace.doExecute(ExecutingKeyspace.java:85) at me.prettyprint.cassandra.model.thrift.ThriftSliceQuery.execute(ThriftSliceQuery.java:48) at com.xxx.xxx.xxxxxx.cassandra.dao.xxxxxxxxx.xxxxxxxxxxxxxxxxxxDaoCassandra.select(xxxxxxxxxxxxxxxxxxDaoCassandra.java:181) at com.xxx.xxx.xxxxxxx.xxxxxxxxxxxxxxxxxxxxx.xxxxxxxxxxxxxxxxxxxxxx(xxxxxxxxxxxxxxxxxx.java:449) at com.xxx.xxx.xxxxx.xxxxxxxxxxxxx.xxxxxxxxxxxxxxxxx(xxxxxxxxxxxxxxxx.java:463) at com.xxx.xxx.xxxxx.xxxxxxxxxxxxx.access$3400(xxxxxxxxxxxxxxxx.java:97) at com.xxx.xxx.xxxxx.xxxxxxxxxxxxx$xxxxxxxxxxxxxxxxxxxx.run(QueueManager.java:1506) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662)

so. i guess that my application lose connection socket. but it didn't recover connection in many reason.
so i think. how about make new routing policy. standard is Blocked/client count

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jan 15, 2014
  1. @fbwotjq

    #647 code

    fbwotjq authored
This page is out of date. Refresh to see the latest.
View
72 core/src/main/java/me/prettyprint/cassandra/connection/BlockedHealthBalncingPolicy
@@ -0,0 +1,72 @@
+package me.prettyprint.cassandra.connection;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import me.prettyprint.cassandra.connection.factory.HClientFactory;
+import me.prettyprint.cassandra.service.CassandraHost;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class BlockedHealthBalncingPolicy implements LoadBalancingPolicy {
+
+ private static final long serialVersionUID = 4216966823555634515L;
+ private static final Logger log = LoggerFactory.getLogger(BlockedHealthBalncingPolicy.class);
+ private static final int ZERO_VALUE = 0;
+
+ private final class FilteringCompare implements Comparator<HClientPool> {
+
+ public int compare(HClientPool o1, HClientPool o2) {
+
+ if ( log.isDebugEnabled() ) {
+ log.debug("comparing 1: {} / count {} / Blocked {} / availableClientQueue {} with 2: {} / count {} / Blocked {} / availableClientQueue {} ",
+ new Object[]{o1.getCassandraHost(), o1.getNumActive(), o1.getNumBlockedThreads(), o1.getNumIdle(),
+ o2.getCassandraHost(), o2.getNumActive(), o2.getNumBlockedThreads(), o2.getNumIdle()});
+ }
+
+ if(o2.getNumIdle() == ZERO_VALUE || o2.getNumIdle() == ZERO_VALUE){
+ log.warn("finding 0 hector pool {}:{} /{}:{}",
+ new Object[]{o1.getCassandraHost(), o1.getNumIdle(), o2.getCassandraHost(), o2.getNumIdle()});
+ }
+
+ if(o1.getNumBlockedThreads() != o2.getNumBlockedThreads()){
+ return o1.getNumBlockedThreads() - o2.getNumBlockedThreads();
+ } else {
+ return o2.getNumIdle() - o1.getNumIdle();
+ }
+ }
+ }
+
+ public HClientPool getPool(Collection<HClientPool> pools, Set<CassandraHost> excludeHosts) {
+
+ List<HClientPool> hClientPoolList = Lists.newArrayList(pools);
+ Collections.shuffle(hClientPoolList);
+ Collections.sort(hClientPoolList, new FilteringCompare());
+
+ Iterator<HClientPool> iterator = hClientPoolList.iterator();
+ HClientPool concurrentHClientPool = iterator.next();
+
+ if ( excludeHosts != null && excludeHosts.size() > 0 ) {
+ while (iterator.hasNext()) {
+ if ( !excludeHosts.contains(concurrentHClientPool.getCassandraHost()) ) {
+ break;
+ }
+ concurrentHClientPool = (ConcurrentHClientPool) iterator.next();
+ }
+ }
+
+ return concurrentHClientPool;
+ }
+
+ public HClientPool createConnection(HClientFactory clientFactory, CassandraHost host) {
+ return new ConcurrentHClientPool(clientFactory, host);
+ }
+
+}
Something went wrong with that request. Please try again.