From 43a30dfaddefbbe0a76ec49024234785b1e5c9ab Mon Sep 17 00:00:00 2001 From: xuegangjie <631126217@qq.com> Date: Fri, 22 Jul 2022 20:03:00 +0800 Subject: [PATCH 1/2] [feat] /safely producer ctx --- primitive/ctx.go | 5 +++-- producer/interceptor.go | 2 +- producer/producer.go | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/primitive/ctx.go b/primitive/ctx.go index 936b54c7..878dbced 100644 --- a/primitive/ctx.go +++ b/primitive/ctx.go @@ -168,6 +168,7 @@ func WithProducerCtx(ctx context.Context, c *ProducerCtx) context.Context { return context.WithValue(ctx, producerCtx, c) } -func GetProducerCtx(ctx context.Context) *ProducerCtx { - return ctx.Value(producerCtx).(*ProducerCtx) +func GetProducerCtx(ctx context.Context) (*ProducerCtx, bool) { + c, exist := ctx.Value(producerCtx).(*ProducerCtx) + return c, exist } diff --git a/producer/interceptor.go b/producer/interceptor.go index 71eb8e70..0da1d4dc 100644 --- a/producer/interceptor.go +++ b/producer/interceptor.go @@ -54,7 +54,7 @@ func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor beginT := time.Now() err := next(ctx, req, reply) - producerCtx := primitive.GetProducerCtx(ctx) + producerCtx, _ := primitive.GetProducerCtx(ctx) if producerCtx.Message.Topic == dispatcher.GetTraceTopicName() { return err } diff --git a/producer/producer.go b/producer/producer.go index 3ead53fd..b8c5850a 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -306,7 +306,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, } if p.interceptor != nil { - producerCtx = primitive.GetProducerCtx(ctx) + producerCtx, _ = primitive.GetProducerCtx(ctx) producerCtx.BrokerAddr = addr producerCtx.MQ = *mq } From f8c1bbd6b6ba6f4af6276405131974a0768fa2e7 Mon Sep 17 00:00:00 2001 From: xuegangjie <631126217@qq.com> Date: Tue, 26 Jul 2022 21:08:32 +0800 Subject: [PATCH 2/2] [feat] safely producterCtx --- producer/interceptor.go | 5 ++++- producer/producer.go | 10 ++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/producer/interceptor.go b/producer/interceptor.go index 0da1d4dc..f77b9c69 100644 --- a/producer/interceptor.go +++ b/producer/interceptor.go @@ -52,9 +52,12 @@ func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor return fmt.Errorf("GetOrNewRocketMQClient faild") } beginT := time.Now() + producerCtx, ok := primitive.GetProducerCtx(ctx) + if !ok { + return fmt.Errorf("ProducerCtx Not Exist") + } err := next(ctx, req, reply) - producerCtx, _ := primitive.GetProducerCtx(ctx) if producerCtx.Message.Topic == dispatcher.GetTraceTopicName() { return err } diff --git a/producer/producer.go b/producer/producer.go index 4000803b..3a710039 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -301,7 +301,10 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, err error ) - var producerCtx *primitive.ProducerCtx + var ( + producerCtx *primitive.ProducerCtx + ok bool + ) for retryCount := 0; retryCount < retryTime; retryCount++ { mq := p.selectMessageQueue(msg) if mq == nil { @@ -315,7 +318,10 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, } if p.interceptor != nil { - producerCtx, _ = primitive.GetProducerCtx(ctx) + producerCtx, ok = primitive.GetProducerCtx(ctx) + if !ok { + return fmt.Errorf("ProducerCtx Not Exist") + } producerCtx.BrokerAddr = addr producerCtx.MQ = *mq }