Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 491][rpc_client]fix: timeout guarantee for RequestOnCnx #492

Merged
merged 1 commit into from
Jun 2, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 25 additions & 19 deletions pulsar/internal/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"errors"
"net"
"net/url"
"sync"
"sync/atomic"
"time"

Expand All @@ -31,6 +30,16 @@ import (
"github.com/gogo/protobuf/proto"
)

var (
// ErrRequestTimeOut happens when request not finished in given requestTimeout.
ErrRequestTimeOut = errors.New("request timed out")
)

type result struct {
*RPCResult
error
}

type RPCResult struct {
Response *pb.BaseCommand
Cnx Connection
Expand Down Expand Up @@ -121,14 +130,10 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
return nil, err
}

type Res struct {
*RPCResult
error
}
ch := make(chan Res, 10)
ch := make(chan result, 1)

cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
ch <- Res{&RPCResult{
ch <- result{&RPCResult{
Cnx: cnx,
Response: response,
}, err}
Expand All @@ -139,29 +144,30 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
case res := <-ch:
return res.RPCResult, res.error
case <-time.After(c.requestTimeout):
return nil, errors.New("request timed out")
return nil, ErrRequestTimeOut
}
}

func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
c.metrics.RPCRequestCount.Inc()
wg := sync.WaitGroup{}
wg.Add(1)

rpcResult := &RPCResult{
Cnx: cnx,
}
ch := make(chan result, 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we consider adjusting the size of the channel buffer appropriately? It looks like 10 is a good choice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 maybe enough for this because there will be only one result sends to ch . Set it to 1 to prevent block happens on the sender side.


var rpcErr error
cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
rpcResult.Response = response
rpcErr = err
wg.Done()
ch <- result{&RPCResult{
Cnx: cnx,
Response: response,
}, err}
close(ch)
})

wg.Wait()
return rpcResult, rpcErr
select {
case res := <-ch:
return res.RPCResult, res.error
case <-time.After(c.requestTimeout):
return nil, ErrRequestTimeOut
}
}

func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error {
Expand Down