experimental proxy with long poll cancel and tiny fixes
hrissan committed May 3, 2024
1 parent 8fdf61b commit 8fe752c
Showing 5 changed files with 79 additions and 44 deletions.
21 changes: 13 additions & 8 deletions internal/aggregator/ingress_proxy.go
@@ -39,7 +39,8 @@ type clientPool struct {
}

type longpollClient struct {
queryID int64
client *rpc.Client
cc rpc.CallbackContext
requestLen int
}

@@ -153,12 +154,12 @@ func keyFromHctx(hctx *rpc.HandlerContext, resultTag int32) data_model.Key {
}
}

func (ls *longpollShard) callback(queryID int64, resp *rpc.Response, err error, userData any) {
func (ls *longpollShard) callback(client *rpc.Client, queryID int64, resp *rpc.Response, err error, userData any) {
hctx := userData.(*rpc.HandlerContext)
ls.mu.Lock()
defer ls.mu.Unlock()
lpc, ok := ls.clientList[hctx]
if !ok || lpc.queryID != queryID {
if !ok || lpc.client != client || lpc.cc.QueryID() != queryID {
// server already cancelled longpoll call
// or hctx was cancelled and reused by server before client response arrived
// since we have no client cancellation, we rely on the fact that client query IDs do not repeat often
@@ -182,10 +183,13 @@ func (ls *longpollShard) CancelHijack(hctx *rpc.HandlerContext) {
ls.mu.Lock()
defer ls.mu.Unlock()
if lpc, ok := ls.clientList[hctx]; ok {
key := keyFromHctx(hctx, format.TagValueIDRPCRequestsStatusErrCancel)
ls.proxy.sh2.AddValueCounter(key, float64(lpc.requestLen), 1, nil)
delete(ls.clientList, hctx)
// same order of locks between client and ls.mu as below
if lpc.client.CancelDoCallback(lpc.cc) {
key := keyFromHctx(hctx, format.TagValueIDRPCRequestsStatusErrCancel)
ls.proxy.sh2.AddValueCounter(key, float64(lpc.requestLen), 1, nil)
} // otherwise callback was/will be called
}
delete(ls.clientList, hctx)
}

func (proxy *IngressProxy) syncHandler(ctx context.Context, hctx *rpc.HandlerContext) error {
@@ -221,10 +225,11 @@ func (proxy *IngressProxy) syncHandlerImpl(ctx context.Context, hctx *rpc.Handle
ls := proxy.longpollShards[lockShardID]
ls.mu.Lock() // to avoid race with longpoll cancellation, all code below must run under lock
defer ls.mu.Unlock()
if err := client.DoCallback(ctx, proxy.config.Network, address, req, ls.callback, hctx); err != nil {
cc, err := client.DoCallback(ctx, proxy.config.Network, address, req, ls.callback, hctx)
if err != nil {
return format.TagValueIDRPCRequestsStatusErrLocal, err
}
ls.clientList[hctx] = longpollClient{queryID: req.QueryID(), requestLen: requestLen}
ls.clientList[hctx] = longpollClient{client: client, cc: cc, requestLen: requestLen}
return 0, hctx.HijackResponse(ls)
}

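For orientation, the register/cancel flow above condenses to roughly the following sketch. It is a simplified illustration, not the proxy code itself: metrics, response delivery and error tags are omitted, and the struct layout and import path are assumptions.

```go
// Simplified sketch of the longpoll register/cancel pattern (assumed import path;
// metrics and response delivery omitted). The real code additionally tracks requestLen.
package sketch

import (
	"context"
	"log"
	"sync"

	"github.com/vkcom/statshouse/internal/vkgo/rpc"
)

type longpollClient struct {
	client *rpc.Client
	cc     rpc.CallbackContext
}

type longpollShard struct {
	mu         sync.Mutex
	clientList map[*rpc.HandlerContext]longpollClient
}

// callback is invoked by the rpc client when the upstream response (or error) arrives.
func (ls *longpollShard) callback(client *rpc.Client, queryID int64, resp *rpc.Response, err error, userData any) {
	hctx := userData.(*rpc.HandlerContext)
	ls.mu.Lock()
	defer ls.mu.Unlock()
	lpc, ok := ls.clientList[hctx]
	if !ok || lpc.client != client || lpc.cc.QueryID() != queryID {
		return // hctx was cancelled or reused; this callback is stale
	}
	delete(ls.clientList, hctx)
	// copy resp/err into hctx and send the hijacked response (omitted)
}

// register issues the upstream call and remembers how to cancel it.
// Holding ls.mu across DoCallback prevents a race with an early callback.
func (ls *longpollShard) register(ctx context.Context, client *rpc.Client, network, address string,
	req *rpc.Request, hctx *rpc.HandlerContext) error {
	ls.mu.Lock()
	defer ls.mu.Unlock()
	cc, err := client.DoCallback(ctx, network, address, req, ls.callback, hctx)
	if err != nil {
		return err
	}
	ls.clientList[hctx] = longpollClient{client: client, cc: cc}
	return nil
}

// CancelHijack is called by the server when the hijacked hctx is cancelled.
func (ls *longpollShard) CancelHijack(hctx *rpc.HandlerContext) {
	ls.mu.Lock()
	defer ls.mu.Unlock()
	if lpc, ok := ls.clientList[hctx]; ok && lpc.client.CancelDoCallback(lpc.cc) {
		// cancelled before the callback was scheduled; the real code records a cancel metric here.
		// If CancelDoCallback returns false, the callback was already called or is about to be called.
		log.Printf("longpoll call %d cancelled", lpc.cc.QueryID())
	}
	delete(ls.clientList, hctx)
}
```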
58 changes: 42 additions & 16 deletions internal/vkgo/rpc/client.go
@@ -11,6 +11,7 @@ import (
"errors"
"fmt"
"log"
"math"
"sync"
"time"

@@ -85,7 +86,9 @@ type Response struct {

type Client struct {
// Place atomics first to ensure proper alignment, see https://pkg.go.dev/sync/atomic#pkg-note-BUG
lastQueryID atomic.Int64 // use an atomic counter instead of pure random to guarantee no ID reuse
// use an atomic counter instead of pure random to guarantee no ID reuse
// queryID should be per connection, but in our API the user fills the Request before the connection is established/known, so we use a per-client counter
lastQueryID atomic.Uint64

opts ClientOptions

@@ -121,7 +124,7 @@ func NewClient(options ...ClientOptionsFunc) *Client {
opts: opts,
conns: map[NetAddr]*clientConn{},
}
c.lastQueryID.Store(rand.Int63() / 2) // We like positive query IDs. Dividing by 2 makes wrapping very rare
c.lastQueryID.Store(rand.Uint64())

return c
}
@@ -158,30 +161,50 @@ func (c *Client) Logf(format string, args ...any) {
c.opts.Logf(format, args...)
}

// Do supports only "tcp4" and "unix" networks
// Do supports only "tcp", "tcp4", "tcp6" and "unix" networks
func (c *Client) Do(ctx context.Context, network string, address string, req *Request) (*Response, error) {
pc, cctx, err := c.setupCall(ctx, NetAddr{network, address}, req, nil, nil, nil)
if err != nil {
return nil, err
}
select {
case <-ctx.Done():
pc.cancelCall(cctx, nil) // do not unblock, reuse normally
_ = pc.cancelCall(cctx.queryID, nil) // do not unblock, reuse normally
return nil, ctx.Err()
case r := <-cctx.result: // got ownership of cctx
defer c.putCallContext(cctx)
return r.resp, r.err
}
}

// Experimental API, can change any moment
type ClientCallback func(queryID int64, resp *Response, err error, userData any)
// Experimental API, can change at any moment. For high-performance clients, like the ingress proxy
type ClientCallback func(client *Client, queryID int64, resp *Response, err error, userData any)

type CallbackContext struct {
pc *clientConn
queryID int64
}

func (cc CallbackContext) QueryID() int64 { return cc.queryID } // so the caller does not have to remember the queryID separately

// Either error is returned immediately, or ClientCallback will be called in the future.
// There is no cancellation logic, you must remember the QueryID and compare it for now to avoid inconsistency
func (c *Client) DoCallback(ctx context.Context, network string, address string, req *Request, cb ClientCallback, userData any) error {
_, _, err := c.setupCall(ctx, NetAddr{network, address}, req, nil, cb, userData)
return err
// We add an explicit userData parameter because for many users it avoids allocating a closure to capture userData in cb
func (c *Client) DoCallback(ctx context.Context, network string, address string, req *Request, cb ClientCallback, userData any) (CallbackContext, error) {
queryID := req.QueryID() // must not access cctx or req after setupCall, because the callback can already have been called and cctx reused
pc, _, err := c.setupCall(ctx, NetAddr{network, address}, req, nil, cb, userData)
return CallbackContext{
pc: pc,
queryID: queryID,
}, err
}

// The callback will never be called twice, but you should be ready for it even after you call Cancel,
// because the callback could already be in the process of being called.
// The best approach is to remember the call's queryID (which is unique per Client) in your per-call data structure and compare it in the callback.
// if cancelled == true, the call was cancelled before the callback was scheduled or called
// if cancelled == false, the call result was already delivered or is being delivered
func (c *Client) CancelDoCallback(cc CallbackContext) (cancelled bool) {
return cc.pc.cancelCall(cc.queryID, nil)
}

// Starts if it needs to
@@ -317,9 +340,12 @@ func (c *Client) getLoad(address NetAddr) int {

func (c *Client) GetRequest() *Request {
req := c.getRequest()
req.queryID = c.lastQueryID.Inc()
for req.queryID == 0 { // so users can user QueryID as a flag
req.queryID = c.lastQueryID.Inc()
for {
req.queryID = int64(c.lastQueryID.Inc() & math.MaxInt64)
// We like positive query IDs, but not 0, so users can use QueryID as a flag
if req.queryID != 0 {
break
}
}
return req
}
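As a small aside on the new GetRequest loop: the counter is now a uint64, and masking with math.MaxInt64 clears the top bit, so the resulting int64 is never negative and the loop only has to skip 0. A minimal illustration with arbitrary values:

```go
// Minimal illustration of the sign-bit masking used in GetRequest.
package main

import (
	"fmt"
	"math"
)

func main() {
	raw := uint64(0xFFFFFFFFFFFFFFFF)             // a counter value with the top bit set
	masked := int64(raw & math.MaxInt64)          // top bit cleared -> always non-negative
	fmt.Println(masked)                           // 9223372036854775807
	fmt.Println(int64(uint64(0) & math.MaxInt64)) // 0: the only value the loop skips
}
```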
@@ -362,14 +388,14 @@ func (c *Client) PutResponse(resp *Response) {
c.responsePool.Put(resp)
}

func (pc *clientConn) getCallContext() *callContext {
v := pc.client.callCtxPool.Get()
func (c *Client) getCallContext() *callContext {
v := c.callCtxPool.Get()
if v != nil {
return v.(*callContext)
}
cctx := &callContext{
singleResult: make(chan *callContext, 1),
hookState: pc.client.opts.Hooks(),
hookState: c.opts.Hooks(),
}
cctx.result = cctx.singleResult
return cctx
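The last hunk moves getCallContext from clientConn to Client. The shape is the usual pool-backed reuse pattern; the sketch below assumes callCtxPool behaves like a sync.Pool and that putCallContext resets fields before pooling, neither of which is shown in this diff.

```go
// Generic sketch of pool-backed callContext reuse; types and reset logic are assumptions.
package sketch

import "sync"

type callContext struct {
	queryID      int64
	singleResult chan *callContext
	result       chan *callContext
}

type client struct {
	callCtxPool sync.Pool
}

func (c *client) getCallContext() *callContext {
	if v := c.callCtxPool.Get(); v != nil {
		return v.(*callContext) // reuse a context released by putCallContext
	}
	// pool empty: allocate a fresh context with its own single-result channel
	cctx := &callContext{singleResult: make(chan *callContext, 1)}
	cctx.result = cctx.singleResult
	return cctx
}

func (c *client) putCallContext(cctx *callContext) {
	cctx.queryID = 0
	cctx.result = cctx.singleResult // restore the default channel before pooling
	c.callCtxPool.Put(cctx)
}
```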
2 changes: 1 addition & 1 deletion internal/vkgo/rpc/client_cctx.go
@@ -146,5 +146,5 @@ func (cctx *callContext) deliverResult(c *Client) {
err := cctx.err
userData := cctx.userData
c.putCallContext(cctx)
cb(queryID, resp, err, userData)
cb(c, queryID, resp, err, userData)
}
40 changes: 22 additions & 18 deletions internal/vkgo/rpc/client_conn.go
@@ -55,7 +55,7 @@ func (pc *clientConn) close() error {

// if multiResult is used for many requests, it must contain enough space so that no receiver is blocked
func (pc *clientConn) setupCallLocked(req *Request, deadline time.Time, multiResult chan *callContext, cb ClientCallback, userData any) (*callContext, error) {
cctx := pc.getCallContext()
cctx := pc.client.getCallContext()
cctx.queryID = req.QueryID()
if multiResult != nil {
cctx.result = multiResult // overrides single-result channel
@@ -92,40 +92,40 @@ func (pc *clientConn) setupCallLocked(req *Request, deadline time.Time, multiRes
return cctx, nil
}

func (pc *clientConn) cancelCall(cctx *callContext, unblockWaitersError error) {
shouldRelease, shouldSignal := pc.cancelCallImpl(cctx)
if shouldRelease {
func (pc *clientConn) cancelCall(queryID int64, deliverError error) (cancelled bool) {
cctx, shouldSignal := pc.cancelCallImpl(queryID)
if cctx != nil {
// exclusive ownership of cctx by this function
if unblockWaitersError != nil {
cctx.err = unblockWaitersError
if deliverError != nil {
cctx.err = deliverError
cctx.deliverResult(pc.client)
} else {
pc.client.putCallContext(cctx)
}
}
if shouldSignal {
if shouldSignal { // write tl.RpcCancelReq, Signal is outside lock for efficiency
pc.writeQCond.Signal()
}
return cctx != nil
}

func (pc *clientConn) cancelCallImpl(cctx *callContext) (shouldRelease bool, shouldSignal bool) {
func (pc *clientConn) cancelCallImpl(queryID int64) (shouldReleaseCctx *callContext, shouldSignal bool) {
pc.mu.Lock()
defer pc.mu.Unlock()
queryID := cctx.queryID
_, ok := pc.calls[queryID]
cctx, ok := pc.calls[queryID]
if !ok {
return false, false
return nil, false
}
delete(pc.calls, queryID)
if !cctx.sent {
cctx.stale = true // exclusive ownership of cctx by writeQ now, will be released
return false, false
return nil, false
}
if pc.conn != nil && pc.conn.FlagCancelReq() {
pc.writeQ = append(pc.writeQ, writeReq{cancelQueryID: queryID})
return true, true
return cctx, true
}
return true, false
return cctx, false
}

func (pc *clientConn) finishCall(queryID int64) *callContext {
@@ -180,9 +185,13 @@ func (pc *clientConn) massCancelRequestsLocked() (finishedCalls []*callContext)
continue
}
delete(pc.calls, queryID)
finishedCalls = append(finishedCalls, cctx)
// we do some allocations here, but after disconnect we allocate anyway
// we cannot call callbacks under any lock, otherwise deadlock
if cctx.cb == nil { // code moved from deliverResult to avoid allocation for common case of not using callbacks
cctx.result <- cctx
} else {
// we do some allocations here, but after disconnect we allocate anyway
// we cannot call callbacks under any lock, otherwise deadlock
finishedCalls = append(finishedCalls, cctx)
}
}
return
}
@@ -361,7 +365,7 @@ func (pc *clientConn) sendLoop(conn *PacketConn) error {
return nil
}
if writeBuiltin {
err := pc.conn.WritePacketBuiltinNoFlushUnlocked(pc.client.opts.PacketTimeout)
err := conn.WritePacketBuiltinNoFlushUnlocked(pc.client.opts.PacketTimeout)
if err != nil {
if !commonConnCloseError(err) {
pc.client.opts.Logf("rpc: failed to send ping/pong to %v, disconnecting: %v", conn.remoteAddr, err)
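The change to massCancelRequestsLocked splits delivery: plain channel results are sent while still holding the lock, while calls with callbacks are collected and invoked only after the lock is released. A generic sketch of that shape, with simplified placeholder types:

```go
// Channel results are delivered under the lock (the channels are buffered), but callbacks are
// collected and run only after the lock is released, so a callback taking its own locks cannot
// deadlock against ours. Names here are placeholders, not the real rpc internals.
package sketch

import "sync"

type call struct {
	cb     func(err error)
	result chan error // buffered with capacity 1, so the send below never blocks
}

type conn struct {
	mu    sync.Mutex
	calls map[int64]*call
}

func (c *conn) failAll(err error) {
	var withCallbacks []*call
	c.mu.Lock()
	for id, cl := range c.calls {
		delete(c.calls, id)
		if cl.cb == nil {
			cl.result <- err // channel delivery is fine under the lock, no extra allocation
		} else {
			withCallbacks = append(withCallbacks, cl) // defer callback until after unlock
		}
	}
	c.mu.Unlock()
	for _, cl := range withCallbacks {
		cl.cb(err) // outside the lock: the callback may itself lock other structures
	}
}
```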
2 changes: 1 addition & 1 deletion internal/vkgo/rpc/clientmulti.go
@@ -69,7 +69,7 @@ func (m *Multi) Close() {
close(m.closeCh)

for _, cs := range m.calls {
cs.pc.cancelCall(cs.cctx, errMultiClosed) // Will release all waiting below
_ = cs.pc.cancelCall(cs.cctx.queryID, errMultiClosed) // Will release all waiting below
}
for queryID := range m.results {
delete(m.results, queryID)
