From 8fe752c359c7c8318d4d6340a44cb275fb8630ab Mon Sep 17 00:00:00 2001
From: Grigory Buteyko
Date: Fri, 3 May 2024 22:50:00 +0300
Subject: [PATCH] experimental proxy with long poll cancel and tiny fixes

---
 internal/aggregator/ingress_proxy.go | 21 ++++++----
 internal/vkgo/rpc/client.go          | 58 ++++++++++++++++++++--------
 internal/vkgo/rpc/client_cctx.go     |  2 +-
 internal/vkgo/rpc/client_conn.go     | 40 ++++++++++---------
 internal/vkgo/rpc/clientmulti.go     |  2 +-
 5 files changed, 79 insertions(+), 44 deletions(-)

diff --git a/internal/aggregator/ingress_proxy.go b/internal/aggregator/ingress_proxy.go
index e6dc17a29..db9609dc6 100644
--- a/internal/aggregator/ingress_proxy.go
+++ b/internal/aggregator/ingress_proxy.go
@@ -39,7 +39,8 @@ type clientPool struct {
 }
 
 type longpollClient struct {
-	queryID    int64
+	client     *rpc.Client
+	cc         rpc.CallbackContext
 	requestLen int
 }
 
@@ -153,12 +154,12 @@ func keyFromHctx(hctx *rpc.HandlerContext, resultTag int32) data_model.Key {
 	}
 }
 
-func (ls *longpollShard) callback(queryID int64, resp *rpc.Response, err error, userData any) {
+func (ls *longpollShard) callback(client *rpc.Client, queryID int64, resp *rpc.Response, err error, userData any) {
 	hctx := userData.(*rpc.HandlerContext)
 	ls.mu.Lock()
 	defer ls.mu.Unlock()
 	lpc, ok := ls.clientList[hctx]
-	if !ok || lpc.queryID != queryID {
+	if !ok || lpc.client != client || lpc.cc.QueryID() != queryID {
 		// server already cancelled longpoll call
 		// or hctx was cancelled and reused by server before client response arrived
 		// since we have no client cancellation, we rely on fact that client queryId does not repeat often
@@ -182,10 +183,13 @@ func (ls *longpollShard) CancelHijack(hctx *rpc.HandlerContext) {
 	ls.mu.Lock()
 	defer ls.mu.Unlock()
 	if lpc, ok := ls.clientList[hctx]; ok {
-		key := keyFromHctx(hctx, format.TagValueIDRPCRequestsStatusErrCancel)
-		ls.proxy.sh2.AddValueCounter(key, float64(lpc.requestLen), 1, nil)
+		delete(ls.clientList, hctx)
+		// same order of locks between client and ls.mu as below
+		if lpc.client.CancelDoCallback(lpc.cc) {
+			key := keyFromHctx(hctx, format.TagValueIDRPCRequestsStatusErrCancel)
+			ls.proxy.sh2.AddValueCounter(key, float64(lpc.requestLen), 1, nil)
+		} // otherwise callback was/will be called
 	}
-	delete(ls.clientList, hctx)
 }
 
 func (proxy *IngressProxy) syncHandler(ctx context.Context, hctx *rpc.HandlerContext) error {
@@ -221,10 +225,11 @@ func (proxy *IngressProxy) syncHandlerImpl(ctx context.Context, hctx *rpc.Handle
 	ls := proxy.longpollShards[lockShardID]
 	ls.mu.Lock() // to avoid race with longpoll cancellation, all code below must run under lock
 	defer ls.mu.Unlock()
-	if err := client.DoCallback(ctx, proxy.config.Network, address, req, ls.callback, hctx); err != nil {
+	cc, err := client.DoCallback(ctx, proxy.config.Network, address, req, ls.callback, hctx)
+	if err != nil {
 		return format.TagValueIDRPCRequestsStatusErrLocal, err
 	}
-	ls.clientList[hctx] = longpollClient{queryID: req.QueryID(), requestLen: requestLen}
+	ls.clientList[hctx] = longpollClient{client: client, cc: cc, requestLen: requestLen}
 	return 0, hctx.HijackResponse(ls)
 }
 
diff --git a/internal/vkgo/rpc/client.go b/internal/vkgo/rpc/client.go
index f2c963078..f7d865317 100644
--- a/internal/vkgo/rpc/client.go
+++ b/internal/vkgo/rpc/client.go
@@ -11,6 +11,7 @@ import (
 	"errors"
 	"fmt"
 	"log"
+	"math"
 	"sync"
 	"time"
 
@@ -85,7 +86,9 @@ type Response struct {
 
 type Client struct {
 	// Place atomics first to ensure proper alignment, see https://pkg.go.dev/sync/atomic#pkg-note-BUG
-	lastQueryID atomic.Int64 // use an atomic counter instead of pure random to guarantee no ID reuse
+	// use an atomic counter instead of pure random to guarantee no ID reuse
+	// queryID should be per connection, but in our API the user fills the Request before the connection is established/known, so we keep it per client
+	lastQueryID atomic.Uint64
 
 	opts ClientOptions
 
@@ -121,7 +124,7 @@ func NewClient(options ...ClientOptionsFunc) *Client {
 		opts:  opts,
 		conns: map[NetAddr]*clientConn{},
 	}
-	c.lastQueryID.Store(rand.Int63() / 2) // We like positive query IDs. Dividing by 2 makes wrapping very rare
+	c.lastQueryID.Store(rand.Uint64())
 	return c
 }
 
@@ -158,7 +161,7 @@ func (c *Client) Logf(format string, args ...any) {
 	c.opts.Logf(format, args...)
 }
 
-// Do supports only "tcp4" and "unix" networks
+// Do supports only "tcp", "tcp4", "tcp6" and "unix" networks
 func (c *Client) Do(ctx context.Context, network string, address string, req *Request) (*Response, error) {
 	pc, cctx, err := c.setupCall(ctx, NetAddr{network, address}, req, nil, nil, nil)
 	if err != nil {
@@ -166,7 +169,7 @@ func (c *Client) Do(ctx context.Context, network string, address string, req *Re
 	}
 	select {
 	case <-ctx.Done():
-		pc.cancelCall(cctx, nil) // do not unblock, reuse normally
+		_ = pc.cancelCall(cctx.queryID, nil) // do not unblock, reuse normally
 		return nil, ctx.Err()
 	case r := <-cctx.result: // got ownership of cctx
 		defer c.putCallContext(cctx)
@@ -174,14 +177,34 @@ func (c *Client) Do(ctx context.Context, network string, address string, req *Re
 	}
 }
 
-// Experimental API, can change any moment
-type ClientCallback func(queryID int64, resp *Response, err error, userData any)
+// Experimental API, can change at any moment. For high-performance clients, like the ingress proxy
+type ClientCallback func(client *Client, queryID int64, resp *Response, err error, userData any)
+
+type CallbackContext struct {
+	pc      *clientConn
+	queryID int64
+}
+
+func (cc CallbackContext) QueryID() int64 { return cc.queryID } // so the caller does not have to remember the queryID separately
 
 // Either error is returned immediately, or ClientCallback will be called in the future.
-// There is no cancellation logic, you must remember QueryID and compare ir for now to avoid inconsistency
-func (c *Client) DoCallback(ctx context.Context, network string, address string, req *Request, cb ClientCallback, userData any) error {
-	_, _, err := c.setupCall(ctx, NetAddr{network, address}, req, nil, cb, userData)
-	return err
+// We add explicit userData, because for many users it avoids allocating a lambda to capture userData in cb
+func (c *Client) DoCallback(ctx context.Context, network string, address string, req *Request, cb ClientCallback, userData any) (CallbackContext, error) {
+	queryID := req.QueryID() // must not access cctx or req after setupCall, because the callback can already have been called and cctx reused
+	pc, _, err := c.setupCall(ctx, NetAddr{network, address}, req, nil, cb, userData)
+	return CallbackContext{
+		pc:      pc,
+		queryID: queryID,
+	}, err
+}
+
+// Callback will never be called twice, but you should be ready for the callback even after you call Cancel.
+// This is because the callback could already be in the process of being called.
+// The best idea is to remember the queryID (which is unique per Client) of the call in your per-call data structure and compare it in the callback.
+// if cancelled == true, the call was cancelled before the callback was scheduled or called
+// if cancelled == false, the call result was already delivered or is being delivered
+func (c *Client) CancelDoCallback(cc CallbackContext) (cancelled bool) {
+	return cc.pc.cancelCall(cc.queryID, nil)
 }
 
 // Starts if it needs to
@@ -317,9 +340,12 @@ func (c *Client) getLoad(address NetAddr) int {
 
 func (c *Client) GetRequest() *Request {
 	req := c.getRequest()
-	req.queryID = c.lastQueryID.Inc()
-	for req.queryID == 0 { // so users can user QueryID as a flag
-		req.queryID = c.lastQueryID.Inc()
+	for {
+		req.queryID = int64(c.lastQueryID.Inc() & math.MaxInt64)
+		// We like positive query IDs, but not 0, so users can use QueryID as a flag
+		if req.queryID != 0 {
+			break
+		}
 	}
 	return req
 }
@@ -362,14 +388,14 @@ func (c *Client) PutResponse(resp *Response) {
 	c.responsePool.Put(resp)
 }
 
-func (pc *clientConn) getCallContext() *callContext {
-	v := pc.client.callCtxPool.Get()
+func (c *Client) getCallContext() *callContext {
+	v := c.callCtxPool.Get()
 	if v != nil {
 		return v.(*callContext)
 	}
 	cctx := &callContext{
 		singleResult: make(chan *callContext, 1),
-		hookState:    pc.client.opts.Hooks(),
+		hookState:    c.opts.Hooks(),
 	}
 	cctx.result = cctx.singleResult
 	return cctx
diff --git a/internal/vkgo/rpc/client_cctx.go b/internal/vkgo/rpc/client_cctx.go
index e28ce519c..e5deee8e9 100644
--- a/internal/vkgo/rpc/client_cctx.go
+++ b/internal/vkgo/rpc/client_cctx.go
@@ -146,5 +146,5 @@ func (cctx *callContext) deliverResult(c *Client) {
 	err := cctx.err
 	userData := cctx.userData
 	c.putCallContext(cctx)
-	cb(queryID, resp, err, userData)
+	cb(c, queryID, resp, err, userData)
 }
diff --git a/internal/vkgo/rpc/client_conn.go b/internal/vkgo/rpc/client_conn.go
index 6adcd84ec..ae121e7e6 100644
--- a/internal/vkgo/rpc/client_conn.go
+++ b/internal/vkgo/rpc/client_conn.go
@@ -55,7 +55,7 @@ func (pc *clientConn) close() error {
 
 // if multiResult is used for many requests, it must contain enough space so that no receiver is blocked
 func (pc *clientConn) setupCallLocked(req *Request, deadline time.Time, multiResult chan *callContext, cb ClientCallback, userData any) (*callContext, error) {
-	cctx := pc.getCallContext()
+	cctx := pc.client.getCallContext()
 	cctx.queryID = req.QueryID()
 	if multiResult != nil {
 		cctx.result = multiResult // overrides single-result channel
@@ -92,40 +92,40 @@ func (pc *clientConn) setupCallLocked(req *Request, deadline time.Time, multiRes
 	return cctx, nil
 }
 
-func (pc *clientConn) cancelCall(cctx *callContext, unblockWaitersError error) {
-	shouldRelease, shouldSignal := pc.cancelCallImpl(cctx)
-	if shouldRelease {
+func (pc *clientConn) cancelCall(queryID int64, deliverError error) (cancelled bool) {
+	cctx, shouldSignal := pc.cancelCallImpl(queryID)
+	if cctx != nil {
 		// exclusive ownership of cctx by this function
-		if unblockWaitersError != nil {
-			cctx.err = unblockWaitersError
+		if deliverError != nil {
+			cctx.err = deliverError
 			cctx.deliverResult(pc.client)
 		} else {
 			pc.client.putCallContext(cctx)
 		}
 	}
-	if shouldSignal {
+	if shouldSignal { // write tl.RpcCancelReq, Signal is outside lock for efficiency
 		pc.writeQCond.Signal()
 	}
+	return cctx != nil
 }
 
-func (pc *clientConn) cancelCallImpl(cctx *callContext) (shouldRelease bool, shouldSignal bool) {
+func (pc *clientConn) cancelCallImpl(queryID int64) (shouldReleaseCctx *callContext, shouldSignal bool) {
 	pc.mu.Lock()
 	defer pc.mu.Unlock()
-	queryID := cctx.queryID
-	_, ok := pc.calls[queryID]
+	cctx, ok := pc.calls[queryID]
 	if !ok {
-		return false, false
+		return nil, false
 	}
 	delete(pc.calls, queryID)
 	if !cctx.sent {
 		cctx.stale = true // exclusive ownership of cctx by writeQ now, will be released
-		return false, false
+		return nil, false
 	}
 	if pc.conn != nil && pc.conn.FlagCancelReq() {
 		pc.writeQ = append(pc.writeQ, writeReq{cancelQueryID: queryID})
-		return true, true
+		return cctx, true
 	}
-	return true, false
+	return cctx, false
 }
 
 func (pc *clientConn) finishCall(queryID int64) *callContext {
@@ -180,9 +180,13 @@ func (pc *clientConn) massCancelRequestsLocked() (finishedCalls []*callContext)
 			continue
 		}
 		delete(pc.calls, queryID)
-		finishedCalls = append(finishedCalls, cctx)
-		// we do some allocations here, but after disconnect we allocate anyway
-		// we cannot call callbacks under any lock, otherwise deadlock
+		if cctx.cb == nil { // code moved from deliverResult to avoid allocation for common case of not using callbacks
+			cctx.result <- cctx
+		} else {
+			// we do some allocations here, but after disconnect we allocate anyway
+			// we cannot call callbacks under any lock, otherwise deadlock
+			finishedCalls = append(finishedCalls, cctx)
+		}
 	}
 	return
 }
@@ -361,7 +365,7 @@ func (pc *clientConn) sendLoop(conn *PacketConn) error {
 		return nil
 	}
 	if writeBuiltin {
-		err := pc.conn.WritePacketBuiltinNoFlushUnlocked(pc.client.opts.PacketTimeout)
+		err := conn.WritePacketBuiltinNoFlushUnlocked(pc.client.opts.PacketTimeout)
 		if err != nil {
 			if !commonConnCloseError(err) {
 				pc.client.opts.Logf("rpc: failed to send ping/pong to %v, disconnecting: %v", conn.remoteAddr, err)
diff --git a/internal/vkgo/rpc/clientmulti.go b/internal/vkgo/rpc/clientmulti.go
index e110785f0..e415dda7b 100644
--- a/internal/vkgo/rpc/clientmulti.go
+++ b/internal/vkgo/rpc/clientmulti.go
@@ -69,7 +69,7 @@ func (m *Multi) Close() {
 	close(m.closeCh)
 
 	for _, cs := range m.calls {
-		cs.pc.cancelCall(cs.cctx, errMultiClosed) // Will release all waiting below
+		_ = cs.pc.cancelCall(cs.cctx.queryID, errMultiClosed) // Will release all waiting below
 	}
 	for queryID := range m.results {
 		delete(m.results, queryID)
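
Usage note (not part of the patch): a minimal sketch of how code outside the ingress proxy might use the new DoCallback / CancelDoCallback API, mirroring the longpollShard pattern above. The import path, the address, and the way the request body is filled are assumptions for illustration; only calls visible in this diff (NewClient, GetRequest, DoCallback, CancelDoCallback, CallbackContext.QueryID) are relied upon.

package main

import (
	"context"
	"log"
	"sync"

	"github.com/vkcom/statshouse/internal/vkgo/rpc" // assumed import path for the rpc package patched above
)

// pendingCall remembers everything needed to cancel or validate a callback later.
type pendingCall struct {
	client *rpc.Client
	cc     rpc.CallbackContext
}

type caller struct {
	mu      sync.Mutex
	pending map[int64]pendingCall // keyed by queryID, which is unique per Client
}

// callback matches the new ClientCallback signature. It must tolerate firing even
// after CancelDoCallback returned false, so it re-checks the pending map first.
func (c *caller) callback(client *rpc.Client, queryID int64, resp *rpc.Response, err error, userData any) {
	c.mu.Lock()
	defer c.mu.Unlock()
	p, ok := c.pending[queryID]
	if !ok || p.client != client {
		return // already cancelled, or this queryID belongs to a different client
	}
	delete(c.pending, queryID)
	if err != nil {
		log.Printf("call %d failed: %v", queryID, err)
		return
	}
	_ = resp // ... consume resp here ...
}

// start issues a request; all bookkeeping runs under c.mu so the callback,
// which may fire on another goroutine right away, cannot race with map insertion.
func (c *caller) start(ctx context.Context, client *rpc.Client, network, address string) (int64, error) {
	req := client.GetRequest()
	// ... fill the request body here ...
	c.mu.Lock()
	defer c.mu.Unlock()
	cc, err := client.DoCallback(ctx, network, address, req, c.callback, nil)
	if err != nil {
		return 0, err
	}
	c.pending[cc.QueryID()] = pendingCall{client: client, cc: cc}
	return cc.QueryID(), nil
}

// cancel mirrors longpollShard.CancelHijack: if CancelDoCallback reports true the
// callback will never run for this call; otherwise the result was/will be delivered.
func (c *caller) cancel(queryID int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	p, ok := c.pending[queryID]
	if !ok {
		return
	}
	delete(c.pending, queryID)
	if p.client.CancelDoCallback(p.cc) {
		log.Printf("call %d cancelled before its callback was scheduled", queryID)
	}
}

func main() {
	client := rpc.NewClient()
	c := &caller{pending: map[int64]pendingCall{}}
	queryID, err := c.start(context.Background(), client, "tcp4", "127.0.0.1:13337") // hypothetical address
	if err != nil {
		log.Fatal(err)
	}
	c.cancel(queryID)
}

The map is updated under the same mutex the callback takes; the patch relies on the same pattern in syncHandlerImpl, where DoCallback is called with ls.mu held.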
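
A second standalone sketch of the query-ID scheme used by the patched Client.GetRequest: an atomic counter is masked to a non-negative int64 and 0 is skipped, so a zero QueryID can serve as a "not set" flag. The patch itself uses the client's atomic wrapper with Inc(); the standard library's sync/atomic.Uint64 with Add(1) is substituted here to keep the sketch self-contained.

package main

import (
	"fmt"
	"math"
	"math/rand"
	"sync/atomic"
)

// nextQueryID mirrors the idea in the patched GetRequest: mask the counter to a
// non-negative int64 and skip 0, so callers can use QueryID == 0 as a flag.
func nextQueryID(counter *atomic.Uint64) int64 {
	for {
		id := int64(counter.Add(1) & math.MaxInt64)
		if id != 0 {
			return id
		}
	}
}

func main() {
	var counter atomic.Uint64
	counter.Store(rand.Uint64()) // random starting point, as NewClient now does
	for i := 0; i < 3; i++ {
		fmt.Println(nextQueryID(&counter)) // always positive, practically never reused
	}
}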