-
Notifications
You must be signed in to change notification settings - Fork 827
[JAV-127]retry的场景在自己的线程池执行 #41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
dab8cf1
a1caf7d
e10f247
2180800
f848ffc
9b6a8ad
45fcb7e
124253b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,10 @@ | |
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ThreadFactory; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
@@ -53,6 +57,15 @@ | |
public class LoadbalanceHandler extends AbstractHandler { | ||
private static final Logger LOGGER = LoggerFactory.getLogger(LoadbalanceHandler.class); | ||
|
||
private static final ExecutorService RETRY_POOL = Executors.newCachedThreadPool(new ThreadFactory() { | ||
private AtomicInteger count = new AtomicInteger(0); | ||
|
||
@Override | ||
public Thread newThread(Runnable r) { | ||
return new Thread(r, "retry-pool-thread-" + count.getAndIncrement()); | ||
} | ||
}); | ||
|
||
// 会给每个Microservice创建一个handler实例,因此这里的key为transportName,保证每个通道使用一个负载均衡策略 | ||
private volatile Map<String, LoadBalancer> loadBalancerMap = new ConcurrentHashMap<>(); | ||
|
||
|
@@ -181,7 +194,9 @@ private void sendWithRetry(Invocation invocation, AsyncResponse asyncResp, | |
newExecutor = new Executor() { | ||
@Override | ||
public void execute(Runnable command) { | ||
command.run(); // retry的场景,对于同步调用, 需要在网络线程中进行。同步调用的主线程已经被挂起,无法再主线程中进行重试。 | ||
// retry的场景,对于同步调用, 同步调用的主线程已经被挂起,无法再主线程中进行重试; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you reproduce this issue in a test and ensure it's fixed in the new solution? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's quite easy to reproduce according to the issue discretion, and I have test it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. start a producer to listen to both rpc/highway, and a rest api that throws an exception. start a consumer with load balance retry enabled, and call this method There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. great, could you please add an automated test to ensure it won't be broken by changes in the future? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added |
||
// 重试也不能在网络线程(event-loop)中进行,未被保护的阻塞操作会导致网络线程挂起 | ||
RETRY_POOL.submit(command); | ||
} | ||
}; | ||
invocation.setResponseExecutor(newExecutor); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It could be better to use fix size thread pool to avoid the resource leak.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This Is not a resource leak stuff, but a performance stuff I think. And I.think it's better to use a cached pool to reuse in high volume request and when no requests thread will be destroyed automatically