Consumer hits 100% CPU when receiving messages #23

Closed
klboke opened this issue Sep 18, 2019 · 7 comments
Comments

@klboke (Contributor) commented Sep 18, 2019

OS: Windows 10
Runtime: JDK 8
IDE: IDEA 2019.1.3 x64
Initial analysis points to the FetchTaskWorker busy-spinning, which drives the CPU to 100%. It reproduces every time the consumer receives messages sent by the Producer.

   private class FetchTaskWorker implements Runnable {
        @Override
        public void run() {
            StringBuilder sBuilder = new StringBuilder(256);
            final Long curThreadId = Thread.currentThread().getId();
            fetchWorkerStatusMap.put(curThreadId, 0);
            while (!isShutdown()) {
                PartitionSelectResult partSelectResult = null;
                fetchWorkerStatusMap.put(curThreadId, 0);
                try {
                    if (isShutdown()) {
                        break;
                    }
                    fetchWorkerStatusMap.put(curThreadId, 1);
                    MessageFetchManager.this.pushConsumer.allowConsumeWait();
                    partSelectResult = MessageFetchManager.this.pushConsumer.getBaseConsumer().pushSelectPartition();
                    if (partSelectResult == null) {
                        continue;
                    }
                    Partition partition = partSelectResult.getPartition();
                    long usedToken = partSelectResult.getUsedToken();
                    boolean isLastConsumed = partSelectResult.isLastPackConsumed();
                    if (isShutdown()) {
                        MessageFetchManager.this.pushConsumer
                                .getBaseConsumer()
                                .pushReqReleasePartiton(partition.getPartitionKey(), usedToken, isLastConsumed);
                        partSelectResult = null;
                        break;
                    }
                    if (MessageFetchManager.this.pushConsumer.isConsumePaused()) {
                        boolean result = partSelectResult.isLastPackConsumed();
                        if (result) {
                            result =
                                    MessageFetchManager.this.pushConsumer
                                            .getBaseConsumer().flushLastRequest(partition);
                        }
                        MessageFetchManager.this.pushConsumer
                                .getBaseConsumer().pushReqReleasePartiton(partition.getPartitionKey(),
                                usedToken, result);
                        partSelectResult = null;
                        continue;
                    }
                } catch (Throwable e) {
                    if (partSelectResult != null) {
                        MessageFetchManager.this.pushConsumer
                                .getBaseConsumer()
                                .pushReqReleasePartiton(partSelectResult.getPartition().getPartitionKey(),
                                        partSelectResult.getUsedToken(), false);
                    }
                    sBuilder.delete(0, sBuilder.length());
                    logger.warn(sBuilder.append("Thread {} has been interrupted 3.")
                            .append(Thread.currentThread().getName()).toString());
                    sBuilder.delete(0, sBuilder.length());
                }
                fetchWorkerStatusMap.put(curThreadId, 2);
                MessageFetchManager.this.pushConsumer.processRequest(partSelectResult, sBuilder);
            }
            fetchWorkerStatusMap.remove(curThreadId);
        }
    }

[attached screenshot]

@lizhiboo (Contributor)

Could you provide the thread stacks captured while the CPU is at 100%? You can use: jstack pid > thread.txt

@klboke (Contributor, Author) commented Sep 18, 2019

"tube_single_netty_worker-9@1246" prio=5 tid=0x19 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll0(WindowsSelectorImpl.java:-1)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll(WindowsSelectorImpl.java:296)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$400(WindowsSelectorImpl.java:278)
	  at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:159)
	  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	  - locked <0x655> (a sun.nio.ch.WindowsSelectorImpl)
	  - locked <0x656> (a java.util.Collections$UnmodifiableSet)
	  - locked <0x657> (a sun.nio.ch.Util$2)
	  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	  at org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:415)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
	  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
	  at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
	  at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	  at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:745)

"tube_single_netty_worker-8@1245" prio=5 tid=0x18 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll0(WindowsSelectorImpl.java:-1)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll(WindowsSelectorImpl.java:296)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$400(WindowsSelectorImpl.java:278)
	  at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:159)
	  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	  - locked <0x658> (a sun.nio.ch.WindowsSelectorImpl)
	  - locked <0x659> (a java.util.Collections$UnmodifiableSet)
	  - locked <0x65a> (a sun.nio.ch.Util$2)
	  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	  at org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:415)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
	  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
	  at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
	  at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	  at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:745)

"tube_single_netty_worker-7@1244" prio=5 tid=0x17 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll0(WindowsSelectorImpl.java:-1)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll(WindowsSelectorImpl.java:296)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$400(WindowsSelectorImpl.java:278)
	  at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:159)
	  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	  - locked <0x65b> (a sun.nio.ch.WindowsSelectorImpl)
	  - locked <0x65c> (a java.util.Collections$UnmodifiableSet)
	  - locked <0x65d> (a sun.nio.ch.Util$2)
	  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	  at org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:415)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
	  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
	  at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
	  at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	  at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:745)

"tube_single_netty_worker-6@1243" prio=5 tid=0x16 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll0(WindowsSelectorImpl.java:-1)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll(WindowsSelectorImpl.java:296)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$400(WindowsSelectorImpl.java:278)
	  at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:159)
	  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	  - locked <0x65e> (a sun.nio.ch.WindowsSelectorImpl)
	  - locked <0x65f> (a java.util.Collections$UnmodifiableSet)
	  - locked <0x660> (a sun.nio.ch.Util$2)
	  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	  at org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:415)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
	  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
	  at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
	  at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	  at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:745)

"tube_single_netty_worker-5@1242" prio=5 tid=0x15 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll0(WindowsSelectorImpl.java:-1)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll(WindowsSelectorImpl.java:296)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$400(WindowsSelectorImpl.java:278)
	  at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:159)
	  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	  - locked <0x661> (a sun.nio.ch.WindowsSelectorImpl)
	  - locked <0x662> (a java.util.Collections$UnmodifiableSet)
	  - locked <0x663> (a sun.nio.ch.Util$2)
	  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	  at org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:415)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
	  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
	  at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
	  at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	  at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:745)

"tube_single_netty_worker-4@1241" prio=5 tid=0x14 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll0(WindowsSelectorImpl.java:-1)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll(WindowsSelectorImpl.java:296)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$400(WindowsSelectorImpl.java:278)
	  at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:159)
	  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	  - locked <0x664> (a sun.nio.ch.WindowsSelectorImpl)
	  - locked <0x665> (a java.util.Collections$UnmodifiableSet)
	  - locked <0x666> (a sun.nio.ch.Util$2)
	  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	  at org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:415)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
	  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
	  at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
	  at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	  at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:745)

"tube_single_netty_worker-3@1240" prio=5 tid=0x13 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll0(WindowsSelectorImpl.java:-1)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll(WindowsSelectorImpl.java:296)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$400(WindowsSelectorImpl.java:278)
	  at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:159)
	  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	  - locked <0x667> (a sun.nio.ch.WindowsSelectorImpl)
	  - locked <0x668> (a java.util.Collections$UnmodifiableSet)
	  - locked <0x669> (a sun.nio.ch.Util$2)
	  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	  at org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:415)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
	  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
	  at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
	  at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	  at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:745)

"tube_single_netty_worker-2@1239" prio=5 tid=0x12 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll0(WindowsSelectorImpl.java:-1)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll(WindowsSelectorImpl.java:296)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$400(WindowsSelectorImpl.java:278)
	  at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:159)
	  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	  - locked <0x66a> (a sun.nio.ch.WindowsSelectorImpl)
	  - locked <0x66b> (a java.util.Collections$UnmodifiableSet)
	  - locked <0x66c> (a sun.nio.ch.Util$2)
	  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	  at org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:415)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
	  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
	  at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
	  at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	  at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:745)

"tube_single_netty_worker-1@1232" prio=5 tid=0x11 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll0(WindowsSelectorImpl.java:-1)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll(WindowsSelectorImpl.java:296)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$400(WindowsSelectorImpl.java:278)
	  at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:159)
	  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	  - locked <0x66d> (a sun.nio.ch.WindowsSelectorImpl)
	  - locked <0x66e> (a java.util.Collections$UnmodifiableSet)
	  - locked <0x66f> (a sun.nio.ch.Util$2)
	  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	  at org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:415)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
	  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
	  at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
	  at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	  at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:745)

"New I/O boss #10@1252" prio=5 tid=0x1b nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll0(WindowsSelectorImpl.java:-1)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll(WindowsSelectorImpl.java:296)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$400(WindowsSelectorImpl.java:278)
	  at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:159)
	  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	  - locked <0x652> (a sun.nio.ch.WindowsSelectorImpl)
	  - locked <0x653> (a java.util.Collections$UnmodifiableSet)
	  - locked <0x654> (a sun.nio.ch.Util$2)
	  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	  at org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:415)
	  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
	  at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
	  at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	  at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:745)

"Master-Heartbeat-Thread-kl_192.168.7.77-12548-1568806990526-1-Push-3.8.0@1604" prio=10 tid=0x2c nid=NA waiting
  java.lang.Thread.State: WAITING
	  at sun.misc.Unsafe.park(Unsafe.java:-1)
	  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	  at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
	  at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
	  at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:745)

"pool-6-thread-3@1396" prio=5 tid=0x20 nid=NA waiting
  java.lang.Thread.State: WAITING
	  at sun.misc.Unsafe.park(Unsafe.java:-1)
	  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	  at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
	  at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:745)

"pool-6-thread-2@1395" prio=5 tid=0x1f nid=NA waiting
  java.lang.Thread.State: WAITING
	  at sun.misc.Unsafe.park(Unsafe.java:-1)
	  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	  at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
	  at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:745)

"pool-6-thread-1@1375" prio=5 tid=0x1e nid=NA waiting
  java.lang.Thread.State: WAITING
	  at sun.misc.Unsafe.park(Unsafe.java:-1)
	  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	  at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
	  at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:745)

"Rebalance-Thread-kl_192.168.7.77-12548-1568806990526-1-Push-3.8.0@1607" prio=10 tid=0x23 nid=NA waiting
  java.lang.Thread.State: WAITING
	  at sun.misc.Unsafe.park(Unsafe.java:-1)
	  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	  at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
	  at com.tencent.tubemq.client.consumer.BaseMessageConsumer$2.run(BaseMessageConsumer.java:170)
	  at java.lang.Thread.run(Thread.java:745)

"Fetch_Worker_kl-7@1522" prio=5 tid=0x2b nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at com.tencent.tubemq.client.consumer.RmtDataCache.pushSelect(RmtDataCache.java:228)
	  at com.tencent.tubemq.client.consumer.BaseMessageConsumer.pushSelectPartition(BaseMessageConsumer.java:702)
	  at com.tencent.tubemq.client.consumer.MessageFetchManager$FetchTaskWorker.run(MessageFetchManager.java:180)
	  at java.lang.Thread.run(Thread.java:745)

"Fetch_Worker_kl-6@1521" prio=5 tid=0x2a nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at com.tencent.tubemq.client.consumer.RmtDataCache.pushSelect(RmtDataCache.java:228)
	  at com.tencent.tubemq.client.consumer.BaseMessageConsumer.pushSelectPartition(BaseMessageConsumer.java:702)
	  at com.tencent.tubemq.client.consumer.MessageFetchManager$FetchTaskWorker.run(MessageFetchManager.java:180)
	  at java.lang.Thread.run(Thread.java:745)

"Fetch_Worker_kl-4@1518" prio=5 tid=0x28 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at com.tencent.tubemq.client.consumer.RmtDataCache.pushSelect(RmtDataCache.java:242)
	  at com.tencent.tubemq.client.consumer.BaseMessageConsumer.pushSelectPartition(BaseMessageConsumer.java:702)
	  at com.tencent.tubemq.client.consumer.MessageFetchManager$FetchTaskWorker.run(MessageFetchManager.java:180)
	  at java.lang.Thread.run(Thread.java:745)

"Fetch_Worker_kl-3@1519" prio=5 tid=0x27 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at com.tencent.tubemq.client.consumer.RmtDataCache.pushSelect(RmtDataCache.java:228)
	  at com.tencent.tubemq.client.consumer.BaseMessageConsumer.pushSelectPartition(BaseMessageConsumer.java:702)
	  at com.tencent.tubemq.client.consumer.MessageFetchManager$FetchTaskWorker.run(MessageFetchManager.java:180)
	  at java.lang.Thread.run(Thread.java:745)

"Fetch_Worker_kl-5@1520" prio=5 tid=0x29 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at com.tencent.tubemq.client.consumer.MessageFetchManager$FetchTaskWorker.run(MessageFetchManager.java:175)
	  at java.lang.Thread.run(Thread.java:745)

"Fetch_Worker_kl-2@1517" prio=5 tid=0x26 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at com.tencent.tubemq.client.consumer.MessageFetchManager$FetchTaskWorker.run(MessageFetchManager.java:178)
	  at java.lang.Thread.run(Thread.java:745)

"Fetch_Worker_kl-1@1516" prio=5 tid=0x25 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at com.tencent.tubemq.client.consumer.MessageFetchManager$FetchTaskWorker.run(MessageFetchManager.java:171)
	  at java.lang.Thread.run(Thread.java:745)

"Fetch_Worker_kl-0@1515" prio=5 tid=0x24 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at com.tencent.tubemq.client.consumer.MessageFetchManager$FetchTaskWorker.run(MessageFetchManager.java:171)
	  at java.lang.Thread.run(Thread.java:745)

"Hashed wheel timer #4@1706" prio=5 tid=0x22 nid=NA sleeping
  java.lang.Thread.State: TIMED_WAITING
	  at java.lang.Thread.sleep(Thread.java:-1)
	  at org.jboss.netty.util.HashedWheelTimer$Worker.waitForNextTick(HashedWheelTimer.java:483)
	  at org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:392)
	  at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	  at java.lang.Thread.run(Thread.java:745)

"Broker-Heartbeat-Thread-kl_192.168.7.77-12548-1568806990526-1-Push-3.8.0@1606" prio=10 tid=0x2d nid=NA sleeping
  java.lang.Thread.State: TIMED_WAITING
	  at java.lang.Thread.sleep(Thread.java:-1)
	  at com.tencent.tubemq.client.consumer.BaseMessageConsumer$HeartTask2BrokerWorker.run(BaseMessageConsumer.java:1626)
	  at java.lang.Thread.run(Thread.java:745)

"Hashed wheel timer #2@1387" prio=5 tid=0x1a nid=NA sleeping
  java.lang.Thread.State: TIMED_WAITING
	  at java.lang.Thread.sleep(Thread.java:-1)
	  at org.jboss.netty.util.HashedWheelTimer$Worker.waitForNextTick(HashedWheelTimer.java:483)
	  at org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:392)
	  at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	  at java.lang.Thread.run(Thread.java:745)

"Hashed wheel timer #1@1367" prio=5 tid=0x10 nid=NA sleeping
  java.lang.Thread.State: TIMED_WAITING
	  at java.lang.Thread.sleep(Thread.java:-1)
	  at org.jboss.netty.util.HashedWheelTimer$Worker.waitForNextTick(HashedWheelTimer.java:483)
	  at org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:392)
	  at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	  at java.lang.Thread.run(Thread.java:745)

"rpcFactory-Thread-0@1276" prio=5 tid=0x1c nid=NA sleeping
  java.lang.Thread.State: TIMED_WAITING
	  at java.lang.Thread.sleep(Thread.java:-1)
	  at com.tencent.tubemq.corerpc.RpcServiceFactory$ConnectionManager.run(RpcServiceFactory.java:544)

"Finalizer@1616" daemon prio=8 tid=0x3 nid=NA waiting
  java.lang.Thread.State: WAITING
	  at java.lang.Object.wait(Object.java:-1)
	  at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
	  at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
	  at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler@1617" daemon prio=10 tid=0x2 nid=NA waiting
  java.lang.Thread.State: WAITING
	  at java.lang.Object.wait(Object.java:-1)
	  at java.lang.Object.wait(Object.java:502)
	  at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
	  at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"DestroyJavaVM@1610" prio=5 tid=0x2e nid=NA runnable
  java.lang.Thread.State: RUNNABLE

"Attach Listener@1614" daemon prio=5 tid=0x5 nid=NA runnable
  java.lang.Thread.State: RUNNABLE

"Signal Dispatcher@1615" daemon prio=9 tid=0x4 nid=NA runnable
  java.lang.Thread.State: RUNNABLE


@lizhiboo (Contributor)

I tested this locally and found that most of the thread stacks belong to threads in the netty package. Netty 3 has this problem on Windows, so for now we recommend running in a Linux environment. There is a plan to upgrade to Netty 4 later.

@klboke (Contributor, Author) commented Sep 19, 2019

I don't think this is a netty problem. TubeMQ's consumer creates 8 FetchTaskWorker threads by default to fetch messages. When I manually set consumerConfig.setPushFetchThreadCnt(3), the 100% CPU usage clearly improved (a configuration sketch is included at the end of this comment). I also found that the FetchTaskWorker threads request messages from the Broker in what is almost a busy loop; when nothing is available the Broker responds with 404, which ultimately falls into the case TErrCodeConstants.NOT_FOUND branch of the following logic:

switch (msgRspB2C.getErrCode()) {
                case TErrCodeConstants.SUCCESS: {
                    int msgSize = 0;
                    // Convert the message payload data
                    List<Message> tmpMessageList =
                            DataConverterUtil.convertMessage(topic, msgRspB2C.getMessagesList());
                    boolean isEscLimit =
                            (msgRspB2C.hasEscFlowCtrl() && msgRspB2C.getEscFlowCtrl());
                    // Filter the message based on its content
                    // Calculate the message size and do some flow control
                    boolean needFilter = false;
                    Set<String> topicFilterSet = null;
                    TopicProcessor topicProcessor = consumeSubInfo.getTopicProcesser(topic);
                    if (topicProcessor != null) {
                        topicFilterSet = topicProcessor.getFilterConds();
                        if (topicFilterSet != null && !topicFilterSet.isEmpty()) {
                            needFilter = true;
                        }
                    }
                    List<Message> messageList = new ArrayList<Message>();
                    for (Message message : tmpMessageList) {
                        if (message == null) {
                            continue;
                        }
                        if (needFilter && (TStringUtils.isBlank(message.getMsgType())
                                || !topicFilterSet.contains(message.getMsgType()))) {
                            continue;
                        }
                        messageList.add(message);
                        msgSize += message.getData().length;
                    }
                    // Set the process result of current stage. Process the result based on the response
                    long dataDltVal = msgRspB2C.hasCurrDataDlt()
                            ? msgRspB2C.getCurrDataDlt() : -1;
                    long currOffset = msgRspB2C.hasCurrOffset()
                            ? msgRspB2C.getCurrOffset() : TBaseConstants.META_VALUE_UNDEFINED;
                    boolean isRequireSlow =
                            (msgRspB2C.hasRequireSlow() && msgRspB2C.getRequireSlow());
                    rmtDataCache
                            .setPartitionContextInfo(partitionKey, currOffset, 1,
                                    msgRspB2C.getErrCode(), isEscLimit, msgSize, 0,
                                    dataDltVal, isRequireSlow);
                    taskContext.setSuccessProcessResult(currOffset,
                            strBuffer.append(partitionKey).append(TokenConstants.ATTR_SEP)
                                    .append(taskContext.getUsedToken()).toString(), messageList);
                    strBuffer.delete(0, strBuffer.length());
                    break;
                }
                case TErrCodeConstants.NOT_FOUND:
                case TErrCodeConstants.FORBIDDEN:
                case TErrCodeConstants.MOVED: {
                    // Slow down the request based on the limitation configuration when meet these errors
                    long limitDlt = consumerConfig.getMsgNotFoundWaitPeriodMs();
                    if (msgRspB2C.getErrCode() == TErrCodeConstants.FORBIDDEN) {
                        limitDlt = 2000;
                    } else if (msgRspB2C.getErrCode() == TErrCodeConstants.MOVED) {
                        limitDlt = 200;
                    }
                    rmtDataCache.errRspRelease(partitionKey, topic,
                            taskContext.getUsedToken(), false, -1,
                            0, msgRspB2C.getErrCode(), false, 0,
                            limitDlt, isFilterConsume(topic), -1);
                    taskContext.setFailProcessResult(msgRspB2C.getErrCode(), msgRspB2C.getErrMsg());
                    break;
                }
                case TErrCodeConstants.HB_NO_NODE:
                case TErrCodeConstants.CERTIFICATE_FAILURE:
                case TErrCodeConstants.DUPLICATE_PARTITION: {
                    // Release the partitions when meeting these error codes
                    removePartition(partition);
                    taskContext.setFailProcessResult(msgRspB2C.getErrCode(), msgRspB2C.getErrMsg());
                    break;
                }
                case TErrCodeConstants.SERVER_CONSUME_SPEED_LIMIT: {
                    // Process with server side speed limit
                    long defDltTime =
                            msgRspB2C.hasMinLimitTime()
                                    ? msgRspB2C.getMinLimitTime() : consumerConfig.getMsgNotFoundWaitPeriodMs();
                    rmtDataCache.errRspRelease(partitionKey, topic,
                            taskContext.getUsedToken(), false, -1,
                            0, msgRspB2C.getErrCode(), false, 0,
                            defDltTime, isFilterConsume(topic), -1);
                    taskContext.setFailProcessResult(msgRspB2C.getErrCode(), msgRspB2C.getErrMsg());
                    break;
                }
                default: {
                    // Unknown error
                    rmtDataCache.errRspRelease(partitionKey, topic,
                            taskContext.getUsedToken(), false, -1,
                            0, msgRspB2C.getErrCode(), false, 0,
                            300, isFilterConsume(topic), -1);
                    taskContext.setFailProcessResult(msgRspB2C.getErrCode(), msgRspB2C.getErrMsg());
                    break;
                }
            }

Judging from the comments in this code, it is supposed to slow the request rate down according to the limit configuration; getMsgNotFoundWaitPeriodMs defaults to 200 ms, but the code below never blocks. The final errRspRelease() call likewise keeps writing tasks into the HashedWheelTimer in what is practically a busy loop, e.g.:

    public void errRspRelease(String partitionKey, String topicName,
                              long usedToken, boolean isLastPackConsumed,
                              long currOffset, int reqProcType, int errCode,
                              boolean isEscLimit, int msgSize, long limitDlt,
                              boolean isFilterConsume, long curDataDlt) {
        PartitionExt partitionExt = this.partitionMap.get(partitionKey);
        if (partitionExt != null) {
            if (!indexPartition.contains(partitionKey) && !isTimeWait(partitionKey)) {
                Long oldUsedToken = partitionUsedMap.get(partitionKey);
                if (oldUsedToken != null && oldUsedToken == usedToken) {
                    if (currOffset >= 0) {
                        partitionOffsetMap.put(partitionKey, currOffset);
                    }
                    partitionUsedMap.remove(partitionKey);
                    partitionExt.setLastPackConsumed(isLastPackConsumed);
                    long waitDlt = partitionExt.procConsumeResult(isFilterConsume, reqProcType,
                                    errCode, msgSize, isEscLimit, limitDlt, curDataDlt, false);
                    if (waitDlt > 10) {
                        timeouts.put(partitionKey, timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
                    } else {
                        try {
                            indexPartition.put(partitionKey);
                        } catch (Throwable e) {
                            //
                        }
                    }
                }
            }
        }
    }

So the 100% CPU symptom may ultimately come down to netty's HashedWheelTimer.
I'm not sure whether that is actually the case; clarification would be appreciated.
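
As noted above, here is a minimal sketch of the thread-count workaround. Only setPushFetchThreadCnt(3) comes from this comment; the import path and the wrapper method are assumptions and may not match the actual client API.

    import com.tencent.tubemq.client.config.ConsumerConfig;

    public final class FetchWorkerTuning {

        private FetchWorkerTuning() {
        }

        // Hedged sketch: lower the number of FetchTaskWorker threads so fewer
        // workers spin when no messages are available. The value 3 is the one
        // tried above; 8 workers were observed with the default configuration.
        // The import path is an assumption and may differ in the real client.
        public static void reduceFetchThreads(ConsumerConfig consumerConfig) {
            consumerConfig.setPushFetchThreadCnt(3);
        }
    }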

@klboke (Contributor, Author) commented Sep 19, 2019

Below is the log output from a print statement I added just before executing timeouts.put(partitionKey, timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS)); a hedged reconstruction of that statement follows the excerpt:

[WARN ] 2019-09-19 18:27:41,551 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 1
[WARN ] 2019-09-19 18:27:41,551 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 0
[WARN ] 2019-09-19 18:27:41,551 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 0
[WARN ] 2019-09-19 18:27:41,849 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 1
[WARN ] 2019-09-19 18:27:41,849 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 1
[WARN ] 2019-09-19 18:27:41,849 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 0
[WARN ] 2019-09-19 18:27:42,154 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 0
[WARN ] 2019-09-19 18:27:42,154 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 0
[WARN ] 2019-09-19 18:27:42,154 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 0
[WARN ] 2019-09-19 18:27:42,459 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 1
[WARN ] 2019-09-19 18:27:42,459 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 2
[WARN ] 2019-09-19 18:27:42,459 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 1
[WARN ] 2019-09-19 18:27:42,767 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 0
[WARN ] 2019-09-19 18:27:42,767 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 0
[WARN ] 2019-09-19 18:27:42,767 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 0
[WARN ] 2019-09-19 18:27:43,062 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 2
[WARN ] 2019-09-19 18:27:43,062 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 0
[WARN ] 2019-09-19 18:27:43,062 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 0
[WARN ] 2019-09-19 18:27:43,354 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 0
[WARN ] 2019-09-19 18:27:43,354 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
add TimeoutTask waitTime for 1
[WARN ] 2019-09-19 18:27:43,354 method:com.tencent.tubemq.client.consumer.RmtDataCache.errRspRelease(RmtDataCache.java:359)
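
A hedged reconstruction of the temporary logging that produced the excerpt above; the message text is inferred from the output, and its exact placement inside RmtDataCache.errRspRelease is assumed rather than confirmed:

    // Hypothetical reconstruction of the debug line added in errRspRelease,
    // after waitDlt is computed and before the timer/indexPartition branch.
    // Message wording inferred from the "add TimeoutTask waitTime for N" output.
    logger.warn("add TimeoutTask waitTime for " + waitDlt);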

@lizhiboo (Contributor) commented Sep 19, 2019

It is indeed the FetchTaskWorker threads driving the CPU to 100%, and the problem is in RmtDataCache.java:
String key = indexPartition.poll(); should be changed to indexPartition.take(); so that when indexPartition is empty the thread blocks until a timeout task produces partition information back into indexPartition (see the sketch after the code below).

    public PartitionSelectResult pushSelect() {
        do {
            if (this.isClosed.get()) {
                break;
            }
            if (!partitionMap.isEmpty()) {
                break;
            }
            ThreadUtils.sleep(200);
        } while (true);
        if (this.isClosed.get()) {
            return null;
        }
        waitCont.incrementAndGet();
        try {
            rebProcessWait();
            if (this.isClosed.get()) {
                return null;
            }
            String key = indexPartition.take();
            if (key == null) {
                return null;
            }
            PartitionExt partitionExt = partitionMap.get(key);
            if (partitionExt == null) {
                return null;
            }
            long curTime = System.currentTimeMillis();
            Long newTime = partitionUsedMap.putIfAbsent(key, curTime);
            if (newTime != null) {
                return null;
            }
            return new PartitionSelectResult(partitionExt,
                    curTime, partitionExt.getAndResetLastPackConsumed());
        } catch (Throwable e1) {
            return null;
        } finally {
            waitCont.decrementAndGet();
        }
    }
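
For context, a minimal self-contained sketch of why poll() spins while take() blocks on a LinkedBlockingQueue; the element type and the key string used here are made up for illustration and are not the real RmtDataCache types:

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public final class PollVsTakeDemo {

        public static void main(String[] args) throws InterruptedException {
            LinkedBlockingQueue<String> indexPartition = new LinkedBlockingQueue<>();

            // poll() returns null immediately on an empty queue, so a loop built
            // around it keeps spinning and burns CPU until a key finally arrives.
            String spinning = indexPartition.poll();
            System.out.println("poll() on empty queue returned: " + spinning);

            // take() parks the calling thread until an element is available,
            // which is what the proposed fix relies on.
            new Thread(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(200);             // simulate a timeout task firing
                    indexPartition.put("topic#broker#partition"); // hypothetical partition key
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();

            String blocked = indexPartition.take(); // waits ~200 ms without busy-looping
            System.out.println("take() returned: " + blocked);
        }
    }

With take(), an idle FetchTaskWorker waits inside the queue instead of re-entering pushSelect immediately.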

@gosonzhang (Collaborator)

Thanks @klboke
