Skip to content

Commit

Permalink
[ISSUE #1112] feat: optimize producer send async (#1111)
Browse files Browse the repository at this point in the history
* feat: optimize producer send async

* fix: fix mq override bug
  • Loading branch information
twz915 committed Dec 4, 2023
1 parent 7ae83c4 commit 7ffb599
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
25 changes: 13 additions & 12 deletions internal/remote/remote_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,24 @@ func (c *remotingClient) InvokeSync(ctx context.Context, addr string, request *R

// InvokeAsync send request without blocking, just return immediately.
func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, callback func(*ResponseFuture)) error {
conn, err := c.connect(ctx, addr)
if err != nil {
return err
}

resp := NewResponseFuture(ctx, request.Opaque, callback)
c.responseTable.Store(resp.Opaque, resp)

err = c.sendRequest(ctx, conn, request)
if err != nil {
c.responseTable.Delete(request.Opaque)
return err
}

go primitive.WithRecover(func() {
defer resp.executeInvokeCallback()
defer c.responseTable.Delete(request.Opaque)

conn, err := c.connect(ctx, addr)
if err != nil {
resp.Err = err
return
}
err = c.sendRequest(ctx, conn, request)
if err != nil {
resp.Err = err
return
}
c.receiveAsync(resp)
c.responseTable.Delete(request.Opaque)
})

return nil
Expand Down
2 changes: 1 addition & 1 deletion producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
if mq != nil {
lastBrokerName = mq.BrokerName
}
mq := p.selectMessageQueue(msg, lastBrokerName)
mq = p.selectMessageQueue(msg, lastBrokerName)
if mq == nil {
err = fmt.Errorf("the topic=%s route info not found", msg.Topic)
continue
Expand Down

0 comments on commit 7ffb599

Please sign in to comment.