Duplicated transaction checks when the queue of AbstractTransactionalMessageCheckListener.executorService is full #1519

Closed
areyouok opened this issue Oct 11, 2019 · 2 comments

@areyouok areyouok commented Oct 11, 2019

master1        | java.util.concurrent.RejectedExecutionException: Task org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener$2@1e179a79 rejected from java.util.concurrent.ThreadPoolExecutor@bb1ccc7[Running, pool size = 5, active threads = 4, queued tasks = 2000, completed tasks = 30247]
master1        | 	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
master1        | 	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
master1        | 	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
master1        | 	at org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener.resolveHalfMsg(AbstractTransactionalMessageCheckListener.java:80)
master1        | 	at org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl.check(TransactionalMessageServiceImpl.java:223)
master1        | 	at org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService.onWaitEnd(TransactionalMessageCheckService.java:55)
master1        | 	at org.apache.rocketmq.common.ServiceThread.waitForRunning(ServiceThread.java:144)
master1        | 	at org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService.run(TransactionalMessageCheckService.java:44)
master1        | 	at java.lang.Thread.run(Thread.java:748)

The exception above is thrown because the queue of the ExecutorService (created in AbstractTransactionalMessageCheckListener) is full:

private static ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("Transaction-msg-check-thread");
            return thread;
        }
    });

When the exception occurs, the newOffset computed in TransactionalMessageServiceImpl.check is not written back to the ConsumeQueue, so the next check round starts from the old offset again and many duplicated checks occur.
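The rejection itself can be reproduced outside the broker. The following is a minimal sketch, not RocketMQ code: the class name `RejectionDemo` and the method `submitUntilRejected` are made up for illustration. It builds a ThreadPoolExecutor with a bounded ArrayBlockingQueue and the default AbortPolicy, blocks every task, and counts how many submissions are accepted before execute() throws RejectedExecutionException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RocketMQ.
public class RejectionDemo {

    // Submits blocking tasks until the pool rejects one; returns how many were accepted.
    static int submitUntilRejected(int core, int max, int queueCapacity) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity));
        int accepted = 0;
        try {
            while (true) {
                // Each task waits on the latch, so nothing completes while we submit.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // Default AbortPolicy: thrown once all threads are busy and the queue is full.
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
            try {
                pool.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // 2 threads + 3 queue slots are filled before the first rejection.
        System.out.println(submitUntilRejected(1, 2, 3)); // prints 5
    }
}
```

With the parameters from the issue (core 2, max 5, queue capacity 2000), the same reasoning gives 2005 accepted tasks before a rejection when no task finishes in the meantime.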

@duhenglucky duhenglucky commented Oct 18, 2019

@areyouok To avoid losing check requests, the new offset is intentionally not updated when a task is rejected.
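The trade-off described here can be illustrated with a small sketch. This is an assumption-laden illustration, not the broker's implementation and not necessarily what the eventual fix does: `OffsetSketch`, `checkRound`, and the `trySubmit` callback are hypothetical names. The idea shown is that the resume offset never moves past a message whose check was rejected, so that message is retried in the next round instead of being lost.

```java
import java.util.function.LongPredicate;

// Hypothetical sketch, not RocketMQ code.
public class OffsetSketch {

    /**
     * Walks offsets [start, end) and tries to submit a check for each one.
     * trySubmit returns false when the executor rejected the task.
     * Returns the offset the next round should resume from.
     */
    static long checkRound(long start, long end, LongPredicate trySubmit) {
        long next = start;
        while (next < end) {
            if (!trySubmit.test(next)) {
                break; // rejected: stay on this offset so the check is retried later
            }
            next++; // accepted: safe to move past this message
        }
        return next;
    }

    public static void main(String[] args) {
        // Assumption for the demo: the executor has room for only 3 checks this round.
        int[] room = {3};
        long resume = checkRound(100, 110, offset -> room[0]-- > 0);
        System.out.println(resume); // prints 103
    }
}
```

In this sketch only the rejected message and those after it are revisited. If instead the exception aborts the whole round and the offset stays at its starting value, messages whose checks were already accepted are checked again, which matches the duplication reported in this issue.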

@areyouok areyouok commented Oct 18, 2019

This PR includes the fix: #1544

duhenglucky added a commit that referenced this issue Oct 31, 2019
[ISSUE #1519]Optimise performance/stability of  transaction message