How to use TTL in a distributed tracing system scenario #53
So the behavior you observed in your test is the intended functionality. I've adjusted the sample code below; take a look, does this make it clear? @wyzssw

```java
ExecutorService service = Executors.newFixedThreadPool(1);
ExecutorService executorService = TtlExecutors.getTtlExecutorService(service);

// Through a `TransmittableThreadLocal` variable, each thread reads/writes
// its own thread's copy of the value. So don't name it "parent"; it's
// usually named by purpose. Also note it is a static variable.
static final TransmittableThreadLocal<String> storeTestContext = new TransmittableThreadLocal<String>();

// Usage
public void testMethod() throws InterruptedException {
    storeTestContext.set("value-set-in-parent");
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            // Inherited from the thread that runs testMethod
            System.out.println("Before set in Runnable: " + storeTestContext.get());
            storeTestContext.set("asf");
            // Setting a new value does not affect the testMethod thread
            System.out.println("After set in Runnable: " + storeTestContext.get());
        }
    });
    Thread.sleep(1000);
    String value = storeTestContext.get();
    System.out.println("In testMethod: " + value);
}
```

Output:
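Given TTL's capture-at-submit / replay-at-run semantics, the expected output of `testMethod()` would be:

```
Before set in Runnable: value-set-in-parent
After set in Runnable: asf
In testMethod: value-set-in-parent
```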
Could you first describe your requirement, the problem you are trying to solve? Then we can look for a suitable solution. If the scenario/requirement is reasonable, a feature could be considered: setting the current thread's context from another thread.
I'm building a distributed tracing system. For example, for one RPC call, the framework has to record a traceId and spanId for the call chain (similar to Taobao EagleEye's …). Within the same …, the child thread generates …; the parent thread, via …, … Because …
Now I understand your scenario; a distributed tracing system is an important foundational system 👍

Here is a diagram of the server calls you described. The TraceId is the same throughout; the SpanId is in parentheses:

```
Client A(ROOT) +---> Server B(1) +---> Server C(1.1) +---> Server Y(1.1.1)
               |                 |                   |
               |                 |                   \---> Server Z(1.1.2)
               |                 \---> Server D(1.2)
               |
               \---> Server E(2) +---> Server F(2.1)
                                 |
                                 \---> Server G(2.2)
```

Using a global ConcurrentMap to record, per TraceId, which SpanIds have been handed out to downstream servers is simple and direct; no need for … (a sketch of this bookkeeping follows at the end of this comment). The diagram above describes things at the server level; calls between servers are RPC calls, so the TraceId/SpanId can be propagated through the RPC call's context.

For the call Server C(1.1) ------> Server Z(1.1.2) above, the method chain inside Server C can look like this:

```
MethodC1 =====> MethodC2 =*=*=> MethodC3 ------> MethodZ1
     direct call       async call:
                       e.g. MethodC2 submits a Runnable to a thread pool,
                       and MethodC3 is invoked inside the Runnable
```

How do you currently solve the scenario above? @wyzssw

PS: At this server's own level, the …
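A minimal sketch of the global-ConcurrentMap bookkeeping described above; `SpanIdAllocator`, its method names, and the spanId format are illustrative assumptions, not the participants' actual code:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class SpanIdAllocator {
    // traceId -> counter of child SpanIds already handed out on this server
    private static final ConcurrentMap<String, AtomicInteger> traceId2SpanCounter =
            new ConcurrentHashMap<String, AtomicInteger>();

    // Allocate the next child spanId under baseSpanId, e.g. "1.1" -> "1.1.3"
    public static String nextChildSpanId(String traceId, String baseSpanId) {
        AtomicInteger counter = traceId2SpanCounter.get(traceId);
        if (counter == null) {
            AtomicInteger created = new AtomicInteger(0);
            AtomicInteger existing = traceId2SpanCounter.putIfAbsent(traceId, created);
            counter = (existing != null) ? existing : created;
        }
        return baseSpanId + "." + counter.incrementAndGet();
    }

    // Must be called when the trace finishes on this server, otherwise
    // entries accumulate (the memory-leak concern raised below)
    public static void clear(String traceId) {
        traceId2SpanCounter.remove(traceId);
    }
}
```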
Thanks for the reply. Yes, it's the same as your idea; for propagating the traceId across threads, what I use is exactly …
Regarding "using a global ConcurrentMap to record, per TraceId, the SpanId usage of downstream servers": when a call completes, the TraceId's entry must be removed from the map to avoid a memory leak, since TraceIds keep accumulating over time. How do you solve this? Also, regarding "the only intrusion is that business code needs to use …" …
You've just reminded me: I hadn't considered oneway RPC requests. I only considered the case where the parent thread waits for all child threads' results, doing a put on server receive and a remove on server send each time. Let me think about a better approach; or if you have a good solution, please don't hesitate to share it.
You're welcome :)

Regarding "adding an agent via JVM arguments: business teams will probably resist it, since they don't know which classes the agent modifies and will worry about performance and safety":

The work above is a one-time effort. Compared with the alternative, which is opaque to the business and relies on every consumer guaranteeing this in their own application code, if it can be done I personally feel the investment is worth it. The precondition, of course, is that "you can take over how the container is run" 😄
Our backend services don't use Tomcat right now; only the API layer uses Tomcat. Each backend service is a standalone thrift-server. To add the agent, we would have to deploy multithread.context-1.1.0-SNAPSHOT.jar to the production servers in advance, or put it in the project's lib directory, which adds extra work to releasing, deploying, migrating, and upgrading.
The solution I came up with is reference counting on the traceId. See … for details. Run output:

```
DEBUG: Increase reference counter(1) for traceId traceId-111 in thread main
DEBUG: Increase reference counter(2) for traceId traceId-111 in thread main
Do Rpc invocation to server server 2 with {traceId=traceId-111, spanId=baseSpanId-1.1.1}
Do Rpc invocation to server server 3 with {traceId=traceId-111, spanId=baseSpanId-1.1.3}
Do Rpc invocation to server server 1 with {traceId=traceId-111, spanId=baseSpanId-1.1.2}
DEBUG: Decrease reference counter(1) for traceId traceId-111 in thread Executors
DEBUG: Decrease reference counter(0) for traceId traceId-111 in thread main
DEBUG: Clear traceId2LeafSpanIdInfo for traceId traceId-111 in thread main
```

PS: Just posting code to save time, no prose write-up attached~ 😸
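A minimal sketch of the reference-counting idea. Only the log format and the `traceId2LeafSpanIdInfo` name come from the output above; `TraceIdRefCounter` and its internals are illustrative assumptions:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class TraceIdRefCounter {
    private static final ConcurrentMap<String, AtomicInteger> refCounters =
            new ConcurrentHashMap<String, AtomicInteger>();
    private static final ConcurrentMap<String, Object> traceId2LeafSpanIdInfo =
            new ConcurrentHashMap<String, Object>();

    // Called when a task carrying this traceId is created/submitted
    public static void increaseRefCounter(String traceId) {
        AtomicInteger counter = refCounters.get(traceId);
        if (counter == null) {
            AtomicInteger created = new AtomicInteger(0);
            AtomicInteger existing = refCounters.putIfAbsent(traceId, created);
            counter = (existing != null) ? existing : created;
        }
        int count = counter.incrementAndGet();
        System.out.printf("DEBUG: Increase reference counter(%d) for traceId %s in thread %s%n",
                count, traceId, Thread.currentThread().getName());
    }

    // Called when such a task finishes; calls must be balanced with the
    // increases above. The last one out clears the map entries.
    public static void decreaseRefCounter(String traceId) {
        int count = refCounters.get(traceId).decrementAndGet();
        System.out.printf("DEBUG: Decrease reference counter(%d) for traceId %s in thread %s%n",
                count, traceId, Thread.currentThread().getName());
        if (count == 0) {
            refCounters.remove(traceId);
            traceId2LeafSpanIdInfo.remove(traceId);
            System.out.printf("DEBUG: Clear traceId2LeafSpanIdInfo for traceId %s in thread %s%n",
                    traceId, Thread.currentThread().getName());
        }
    }
}
```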
Of course, if you want the implementation to stay simple, have the user pass the context across thread pools themselves. A sketch implementation:

```java
final Object traceContext = TraceContext.borrowTraceContext(); // increments the counter
executor.submit(new Runnable() {
    public void run() {
        TraceContext.returnAndSetTraceContext(traceContext); // decrements the counter
        // biz code
        // When execution ends, the context in the ThreadLocal is NOT cleaned up!
        // But it is only one uncleared context per thread; the thread count is
        // bounded and a context usually takes little memory, so this is acceptable.
    }
});
```

This makes things a bit more cumbersome for users, but completely avoids … Still, the logic below is needed:
Fantastic! With the help of …

```java
@Override
protected void afterExecute() {
    decreaseSpanIdRefCounter();
}
```

I cloned the distributed-tracer-support branch and it ran successfully. Could you publish the new version to the Maven repository? In … my …
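For reference, the JDK's `ThreadPoolExecutor` exposes exactly this kind of hook. A minimal sketch of per-task cleanup there; the trace-cleanup call is a placeholder for the hypothetical helper above:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TraceCleanupExecutor extends ThreadPoolExecutor {
    public TraceCleanupExecutor(int poolSize) {
        super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Runs in the worker thread after every task, whether it completed
        // normally or threw: a natural point to release per-task trace state,
        // e.g. TraceIdRefCounter.decreaseRefCounter(...) from the sketch above.
    }
}
```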
Yes, the distributed-tracer-support branch has the commit that fixes this problem.

I agree with "the trace framework only exposes the TraceIdContext.getTraceId() method to its users"!

PS, a recommendation: … 👍
👍 Thanks a lot for the pointers; I'll correct it in my code. Learned a lot again. I've been using your show-busy-java-threads.sh all along, by the way 😄
Fixed "no afterExecute callback #54", released 1.2.1, and published it to Maven Central. As for the "reference counting" solution discussed earlier, in … using … see the demo for details. The run output is as follows:

```
......
Do Rpc invocation to server server 1 with {traceId=traceId_XXXYYY39, spanId=1.1.1}
Finished Rpc call traceId_XXXYYY39 with span LeafSpanIdInfo{current=3}.
Do Rpc invocation to server server 3 with {traceId=traceId_XXXYYY38, spanId=1.1.2}
Do Rpc invocation to server server 1 with {traceId=traceId_XXXYYY40, spanId=1.1.1}
Finished Rpc call traceId_XXXYYY40 with span LeafSpanIdInfo{current=3}.
Do Rpc invocation to server server 3 with {traceId=traceId_XXXYYY39, spanId=1.1.2}
Do Rpc invocation to server server 2 with {traceId=traceId_XXXYYY33, spanId=1.1.3}
Do Rpc invocation to server server 3 with {traceId=traceId_XXXYYY41, spanId=1.1.2}
Do Rpc invocation to server server 1 with {traceId=traceId_XXXYYY41, spanId=1.1.1}
Finished Rpc call traceId_XXXYYY41 with span LeafSpanIdInfo{current=3}.
Do Rpc invocation to server server 3 with {traceId=traceId_XXXYYY40, spanId=1.1.2}
Do Rpc invocation to server server 1 with {traceId=traceId_XXXYYY42, spanId=1.1.1}
Finished Rpc call traceId_XXXYYY42 with span LeafSpanIdInfo{current=3}.
Do Rpc invocation to server server 2 with {traceId=traceId_XXXYYY34, spanId=1.1.3}
Do Rpc invocation to server server 3 with {traceId=traceId_XXXYYY42, spanId=1.1.2}
Do Rpc invocation to server server 3 with {traceId=traceId_XXXYYY43, spanId=1.1.2}
Do Rpc invocation to server server 1 with {traceId=traceId_XXXYYY43, spanId=1.1.1}
Finished Rpc call traceId_XXXYYY43 with span LeafSpanIdInfo{current=3}.
DEBUG: Remove traceId traceId_XXXYYY19 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY12 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY5 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY2 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY13 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY40 in thread main by cause COLLECTED: null
Do Rpc invocation to server server 2 with {traceId=traceId_XXXYYY42, spanId=1.1.3}
DEBUG: Remove traceId traceId_XXXYYY27 in thread main by cause COLLECTED: null
Do Rpc invocation to server server 2 with {traceId=traceId_XXXYYY43, spanId=1.1.3}
DEBUG: Remove traceId traceId_XXXYYY10 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY21 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY32 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY29 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY35 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY8 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY36 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY18 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY26 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY39 in thread main by cause COLLECTED: null
Do Rpc invocation to server server 2 with {traceId=traceId_XXXYYY44, spanId=1.1.1}
Do Rpc invocation to server server 3 with {traceId=traceId_XXXYYY44, spanId=1.1.3}
Do Rpc invocation to server server 1 with {traceId=traceId_XXXYYY44, spanId=1.1.2}
Finished Rpc call traceId_XXXYYY44 with span LeafSpanIdInfo{current=4}.
Do Rpc invocation to server server 1 with {traceId=traceId_XXXYYY45, spanId=1.1.1}
Finished Rpc call traceId_XXXYYY45 with span LeafSpanIdInfo{current=4}.
Do Rpc invocation to server server 3 with {traceId=traceId_XXXYYY45, spanId=1.1.3}
......
```

# The old demo is at …
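The `by cause COLLECTED: null` lines read like removal notifications from a weak-reference cache. A minimal sketch, assuming Guava's `CacheBuilder` (an assumption about the demo's mechanics, not confirmed by this thread); with weak values, a GC'd entry is evicted with cause `COLLECTED` and its value is already `null`, matching the log:

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

public class TraceIdCacheSketch {
    static final Cache<String, Object> traceId2LeafSpanIdInfo = CacheBuilder.newBuilder()
            .weakValues() // once no thread holds the trace context, GC reclaims it
            .removalListener(new RemovalListener<String, Object>() {
                @Override
                public void onRemoval(RemovalNotification<String, Object> notification) {
                    System.out.printf("DEBUG: Remove traceId %s in thread %s by cause %s: %s%n",
                            notification.getKey(), Thread.currentThread().getName(),
                            notification.getCause(), notification.getValue());
                }
            })
            .build();
}
```

Guava piggybacks removal notifications on subsequent cache operations (or an explicit `cleanUp()` call), which would explain why the DEBUG lines surface in the main thread interleaved with later invocations.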
Thanks for the release. I had previously considered using …

```java
new Thread(new Runnable() {
    @Override
    public void run() {
        syncMethod_ByNewThread(); // breakpoint here
    }
}, "Thread-by-new").start();
```
The problem is fixed. The solution: make … See commit 199e7de for details.

PS: The run output is as follows:

```
......
Do Rpc invocation to server server 3 with {traceId=traceId_XXXYYY38, spanId=1.1.3}
Do Rpc invocation to server server 2 with {traceId=traceId_XXXYYY39, spanId=1.1.1}
Do Rpc invocation to server server 3 with {traceId=traceId_XXXYYY39, spanId=1.1.3}
Do Rpc invocation to server server 1 with {traceId=traceId_XXXYYY39, spanId=1.1.2}
Finished Rpc call traceId_XXXYYY39 with span LeafSpanIdInfo{current=4}.
Do Rpc invocation to server server 2 with {traceId=traceId_XXXYYY40, spanId=1.1.1}
Do Rpc invocation to server server 3 with {traceId=traceId_XXXYYY40, spanId=1.1.3}
Do Rpc invocation to server server 1 with {traceId=traceId_XXXYYY40, spanId=1.1.2}
Finished Rpc call traceId_XXXYYY40 with span LeafSpanIdInfo{current=4}.
Do Rpc invocation to server server 2 with {traceId=traceId_XXXYYY41, spanId=1.1.1}
Do Rpc invocation to server server 3 with {traceId=traceId_XXXYYY41, spanId=1.1.3}
Do Rpc invocation to server server 1 with {traceId=traceId_XXXYYY41, spanId=1.1.2}
Finished Rpc call traceId_XXXYYY41 with span LeafSpanIdInfo{current=4}.
Do Rpc invocation to server server 2 with {traceId=traceId_XXXYYY42, spanId=1.1.1}
Do Rpc invocation to server server 1 with {traceId=traceId_XXXYYY42, spanId=1.1.2}
Finished Rpc call traceId_XXXYYY42 with span LeafSpanIdInfo{current=4}.
Do Rpc invocation to server server 3 with {traceId=traceId_XXXYYY42, spanId=1.1.3}
DEBUG: Remove traceId traceId_XXXYYY38 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY28 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY14 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY41 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY11 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY3 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY33 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY20 in thread main by cause COLLECTED: null
DEBUG: Remove traceId traceId_XXXYYY25 in thread main by cause COLLECTED: null
......
```
Regarding "making …" …
Now that you've explained it, I understand too. …

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.threadpool.TtlExecutors;
import com.google.common.base.Optional;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(10);
        ExecutorService executorService = TtlExecutors.getTtlExecutorService(service);
        final TransmittableThreadLocal<Optional<AtomicLong>> parent =
                new TransmittableThreadLocal<Optional<AtomicLong>>();
        parent.set(Optional.of(new AtomicLong(0)));
        System.out.println("Parent-start at spanId " + parent.get().get().incrementAndGet());

        // Fill up the pool threads first
        for (int i = 0; i < 10; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {}
            });
        }

        // Submit the tasks that perform the RPC calls
        for (int i = 0; i < 30; i++) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    Optional<AtomicLong> spanIncr = parent.get();
                    if (spanIncr.isPresent()) {
                        // Print the spanId needed by the child thread
                        System.out.println("Sub start at spanId " + spanIncr.get().incrementAndGet());
                    }
                }
            });
        }

        // Print the spanId at parent exit
        System.out.println("Parent-end at spanId " + parent.get().get().incrementAndGet());

        // Shut down the pool so the JVM can exit
        service.shutdown();
    }
}
```

Output: …

Regarding "to make the performance test more likely to expose problems, when testing, make …" …
The default is a shallow copy; you can change that by overriding … PS: …
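A minimal sketch of switching to a deep copy, assuming the override point is `TransmittableThreadLocal`'s protected `copy` method (check the exact signature in the version you use):

```java
import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.ttl.TransmittableThreadLocal;

public class DeepCopyContext {
    static final TransmittableThreadLocal<AtomicLong> spanCounter =
            new TransmittableThreadLocal<AtomicLong>() {
                @Override
                protected AtomicLong copy(AtomicLong parentValue) {
                    // Deep-copy the value when it is transmitted to a task, so
                    // the child's increments no longer touch the parent's counter.
                    return parentValue == null ? null : new AtomicLong(parentValue.get());
                }
            };
}
```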
Heh, sure enough, it also works with the Optional removed 👍
Thanks for the feedback; it uncovered a bug, and along the way we also thoroughly worked through … in the distributed tracing scenario. A real pleasure 😋 Feel free to reach out with any future questions~ 👏
Yeah, talking with an expert like you, I've gained a lot. Let's keep the conversation going 👏
Via …, it can be confirmed that before the main thread finished, … were 100% reclaimed.
👍 cool

Learning from this.

This made me jump out of my seat.

Learned a lot.

Learned a lot.

Learned a lot.

Learned a lot.

Learned a lot; my respects. Only by pursuing technical excellence can one keep improving.

Learned a lot.

Learned a lot.

good
It's 2024 now; I wonder whether anyone is still following this thread. Learned something new again!
Sure can 💕😄 @ling0900
@oldratlee Boss, in the code from your guidance reply, shouldn't there be a small fix here 🤔: testMethod.get() ----> storeTestContext.get()
@ling0900 👍👌 Fixed~
A child thread cannot modify the parent thread's ThreadLocal; I wonder whether supporting this was considered in the design.
It prints …