Skip to content

Commit

Permalink
[feat] safely producterCtx
Browse files Browse the repository at this point in the history
  • Loading branch information
aireet committed Jul 26, 2022
1 parent 4fe4a77 commit f8c1bbd
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
5 changes: 4 additions & 1 deletion producer/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor
return fmt.Errorf("GetOrNewRocketMQClient faild")
}
beginT := time.Now()
producerCtx, ok := primitive.GetProducerCtx(ctx)
if !ok {
return fmt.Errorf("ProducerCtx Not Exist")
}
err := next(ctx, req, reply)

producerCtx, _ := primitive.GetProducerCtx(ctx)
if producerCtx.Message.Topic == dispatcher.GetTraceTopicName() {
return err
}
Expand Down
10 changes: 8 additions & 2 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,10 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
err error
)

var producerCtx *primitive.ProducerCtx
var (
producerCtx *primitive.ProducerCtx
ok bool
)
for retryCount := 0; retryCount < retryTime; retryCount++ {
mq := p.selectMessageQueue(msg)
if mq == nil {
Expand All @@ -315,7 +318,10 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
}

if p.interceptor != nil {
producerCtx, _ = primitive.GetProducerCtx(ctx)
producerCtx, ok = primitive.GetProducerCtx(ctx)
if !ok {
return fmt.Errorf("ProducerCtx Not Exist")
}
producerCtx.BrokerAddr = addr
producerCtx.MQ = *mq
}
Expand Down

0 comments on commit f8c1bbd

Please sign in to comment.