Skip to content

Commit

Permalink
deps: update x/tools and gopls to a3568bac
Browse files Browse the repository at this point in the history
* internal/lsp: disable unimported completions for snippet tests a3568bac
* internal/telemetry/event: fix error/value key type tag formatting 066fd139
* internal/lsp: make sure that `gofmt -s` analyses don't modify AST 1fd97665
* internal/lsp: make event directly implement TagMap 903869a8
* internal/telemetry: add an example of using the logging behaviour ff0df582
* internal/lsp: use one context throughout completion ee2abff5
* internal/lsp: rewrite the rpc debug page 7db14c95
* internal/jsonrpc2: dont add any handlers by default ccaaa5c2
* internal/jsonrpc2: remove the legacy interface 17cc17e0
* internal/jsonrpc2: break Run up into composable handlers 3eebf4bf
* internal/jsonrpc2: remove request state 5c4bdbc0
  • Loading branch information
myitcv committed Apr 7, 2020
1 parent 281246e commit d076099
Show file tree
Hide file tree
Showing 25 changed files with 437 additions and 458 deletions.
86 changes: 60 additions & 26 deletions cmd/govim/internal/golang_org_x_tools/jsonrpc2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ package jsonrpc2
import (
"context"
"fmt"
"sync"

"github.com/govim/govim/cmd/govim/internal/golang_org_x_tools/telemetry/event"
)

// Handler is invoked to handle incoming requests.
Expand All @@ -15,31 +18,6 @@ import (
// The handler should return ErrNotHandled if it could not handle the request.
type Handler func(context.Context, *Request) error

// LegacyHooks is a temporary measure during migration from the old Handler
// interface to the new HandleFunc.
// The intent is to delete this interface in a later cl.
type LegacyHooks interface {
// Deliver is invoked to handle incoming requests.
// If the request returns false from IsNotify then the Handler must eventually
// call Reply on the Conn with the supplied request.
// Handlers are called synchronously, they should pass the work off to a go
// routine if they are going to take a long time.
// If Deliver returns true all subsequent handlers will be invoked with
// delivered set to true, and should not attempt to deliver the message.
Deliver(ctx context.Context, r *Request, delivered bool) bool

// Cancel is invoked for cancelled outgoing requests.
// It is okay to use the connection to send notifications, but the context will
// be in the cancelled state, so you must do it with the background context
// instead.
// If Cancel returns true all subsequent handlers will be invoked with
// cancelled set to true, and should not attempt to cancel the message.
Cancel(ctx context.Context, conn *Conn, id ID, cancelled bool) bool

// Request is called near the start of processing any request.
Request(ctx context.Context, conn *Conn, direction Direction, r *WireRequest) context.Context
}

// Direction is used to indicate to a logger whether the logged message was being
// sent or received.
type Direction bool
Expand Down Expand Up @@ -74,9 +52,65 @@ func MethodNotFound(ctx context.Context, r *Request) error {
func MustReply(handler Handler) Handler {
return func(ctx context.Context, req *Request) error {
err := handler(ctx, req)
if req.state < requestReplied {
if req.done != nil {
panic(fmt.Errorf("request %q was never replied to", req.Method))
}
return err
}
}

// CancelHandler returns a handler that supports cancellation, and a canceller
// that can be used to trigger canceling in progress requests.
func CancelHandler(handler Handler) (Handler, Canceller) {
var mu sync.Mutex
handling := make(map[ID]context.CancelFunc)
wrapped := func(ctx context.Context, req *Request) error {
if req.ID != nil {
cancelCtx, cancel := context.WithCancel(ctx)
ctx = cancelCtx
mu.Lock()
handling[*req.ID] = cancel
mu.Unlock()
req.OnReply(func() {
mu.Lock()
delete(handling, *req.ID)
mu.Unlock()
})
}
return handler(ctx, req)
}
return wrapped, func(id ID) {
mu.Lock()
cancel, found := handling[id]
mu.Unlock()
if found {
cancel()
}
}
}

// AsyncHandler returns a handler that processes each request goes in its own
// goroutine.
// The handler returns immediately, without the request being processed.
// Each request then waits for the previous request to finish before it starts.
// This allows the stream to unblock at the cost of unbounded goroutines
// all stalled on the previous one.
func AsyncHandler(handler Handler) Handler {
nextRequest := make(chan struct{})
close(nextRequest)
return func(ctx context.Context, req *Request) error {
waitForPrevious := nextRequest
nextRequest = make(chan struct{})
unlockNext := nextRequest
req.OnReply(func() { close(unlockNext) })
_, queueDone := event.StartSpan(ctx, "queued")
go func() {
<-waitForPrevious
queueDone()
if err := handler(ctx, req); err != nil {
event.Error(ctx, "jsonrpc2 async message delivery failed", err)
}
}()
return nil
}
}
161 changes: 56 additions & 105 deletions cmd/govim/internal/golang_org_x_tools/jsonrpc2/jsonrpc2.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,30 @@ const (
// Conn is bidirectional; it does not have a designated server or client end.
type Conn struct {
seq int64 // must only be accessed using atomic operations
LegacyHooks LegacyHooks
stream Stream
pendingMu sync.Mutex // protects the pending map
pending map[ID]chan *WireResponse
handlingMu sync.Mutex // protects the handling map
handling map[ID]*Request
onCancelled CallCanceller
}

type requestState int

const (
requestWaiting = requestState(iota)
requestSerial
requestParallel
requestReplied
requestDone
)

// Request is sent to a server to represent a Call or Notify operaton.
type Request struct {
conn *Conn
cancel context.CancelFunc
state requestState
nextRequest chan struct{}
conn *Conn
// done holds set of callbacks added by OnReply, and is set back to nil if
// Reply has been called.
done []func()

// The Wire values of the request.
WireRequest
}

// Canceller is the type for a function that can cancel an in progress request.
type Canceller func(id ID)

// CallCanceller is the type for a callback when an outgoing request is
// has it's context cancelled.
type CallCanceller func(context.Context, *Conn, ID)

type constError string

func (e constError) Error() string { return string(e) }
Expand All @@ -76,25 +71,17 @@ func NewErrorf(code int64, format string, args ...interface{}) *Error {
// You must call Run for the connection to be active.
func NewConn(s Stream) *Conn {
conn := &Conn{
stream: s,
pending: make(map[ID]chan *WireResponse),
handling: make(map[ID]*Request),
stream: s,
pending: make(map[ID]chan *WireResponse),
}
return conn
}

// Cancel cancels a pending Call on the server side.
// The call is identified by its id.
// JSON RPC 2 does not specify a cancel message, so cancellation support is not
// directly wired in. This method allows a higher level protocol to choose how
// to propagate the cancel.
func (c *Conn) Cancel(id ID) {
c.handlingMu.Lock()
handling, found := c.handling[id]
c.handlingMu.Unlock()
if found {
handling.cancel()
}
// OnCancelled sets the callback used when an outgoing call request has
// it's context cancelled when still in progress.
// Only the last callback registered is used.
func (c *Conn) OnCancelled(cancelled CallCanceller) {
c.onCancelled = cancelled
}

// Notify is called to send a notification request over the connection.
Expand All @@ -113,9 +100,6 @@ func (c *Conn) Notify(ctx context.Context, method string, params interface{}) (e
if err != nil {
return fmt.Errorf("marshalling notify request: %v", err)
}
if c.LegacyHooks != nil {
ctx = c.LegacyHooks.Request(ctx, c, Send, request)
}
ctx, done := event.StartSpan(ctx, request.Method,
tag.Method.Of(request.Method),
tag.RPCDirection.Of(tag.Outbound),
Expand All @@ -128,7 +112,7 @@ func (c *Conn) Notify(ctx context.Context, method string, params interface{}) (e

event.Record(ctx, tag.Started.Of(1))
n, err := c.stream.Write(ctx, data)
event.Record(ctx, tag.ReceivedBytes.Of(n))
event.Record(ctx, tag.SentBytes.Of(n))
return err
}

Expand All @@ -152,9 +136,6 @@ func (c *Conn) Call(ctx context.Context, method string, params, result interface
if err != nil {
return fmt.Errorf("marshalling call request: %v", err)
}
if c.LegacyHooks != nil {
ctx = c.LegacyHooks.Request(ctx, c, Send, request)
}
ctx, done := event.StartSpan(ctx, request.Method,
tag.Method.Of(request.Method),
tag.RPCDirection.Of(tag.Outbound),
Expand All @@ -180,7 +161,7 @@ func (c *Conn) Call(ctx context.Context, method string, params, result interface
}()
// now we are ready to send
n, err := c.stream.Write(ctx, data)
event.Record(ctx, tag.ReceivedBytes.Of(n))
event.Record(ctx, tag.SentBytes.Of(n))
if err != nil {
// sending failed, we will never get a response, so don't leave it pending
return err
Expand All @@ -201,8 +182,8 @@ func (c *Conn) Call(ctx context.Context, method string, params, result interface
return nil
case <-ctx.Done():
// Allow the handler to propagate the cancel.
if c.LegacyHooks != nil {
c.LegacyHooks.Cancel(ctx, c, id, false)
if c.onCancelled != nil {
c.onCancelled(ctx, c, id)
}
return ctx.Err()
}
Expand All @@ -222,16 +203,17 @@ func (r *Request) IsNotify() bool {
// This will mark the request as done, triggering any done
// handlers
func (r *Request) Reply(ctx context.Context, result interface{}, err error) error {
if r.state >= requestReplied {
if r.done == nil {
return fmt.Errorf("reply invoked more than once")
}

if r.state < requestParallel {
r.state = requestParallel
close(r.nextRequest)
}
r.state = requestReplied
recordStatus(ctx, nil)
defer func() {
recordStatus(ctx, err)
for i := len(r.done); i > 0; i-- {
r.done[i-1]()
}
r.done = nil
}()

if r.IsNotify() {
return nil
Expand All @@ -257,7 +239,7 @@ func (r *Request) Reply(ctx context.Context, result interface{}, err error) erro
return err
}
n, err := r.conn.stream.Write(ctx, data)
event.Record(ctx, tag.ReceivedBytes.Of(n))
event.Record(ctx, tag.SentBytes.Of(n))

if err != nil {
// TODO(iancottrell): if a stream write fails, we really need to shut down
Expand All @@ -267,17 +249,15 @@ func (r *Request) Reply(ctx context.Context, result interface{}, err error) erro
return nil
}

func setHandling(r *Request, active bool) {
if r.ID == nil {
return
}
r.conn.handlingMu.Lock()
defer r.conn.handlingMu.Unlock()
if active {
r.conn.handling[*r.ID] = r
} else {
delete(r.conn.handling, *r.ID)
}
// OnReply adds a done callback to the request.
// All added callbacks are invoked during the one required call to Reply, and
// then dropped.
// It is an error to call this after Reply.
// This call is not safe for concurrent use, but should only be invoked by
// handlers and in general only one handler should be working on a request
// at any time.
func (r *Request) OnReply(do func()) {
r.done = append(r.done, do)
}

// combined has all the fields of both Request and Response.
Expand All @@ -296,12 +276,6 @@ type combined struct {
// It must be called exactly once for each Conn.
// It returns only when the reader is closed or there is an error in the stream.
func (c *Conn) Run(runCtx context.Context, handler Handler) error {
handler = MustReply(handler)
// we need to make the next request "lock" in an unlocked state to allow
// the first incoming request to proceed. All later requests are unlocked
// by the preceding request going to parallel mode.
nextRequest := make(chan struct{})
close(nextRequest)
for {
// get the data for a message
data, n, err := c.stream.Read(runCtx)
Expand All @@ -321,53 +295,30 @@ func (c *Conn) Run(runCtx context.Context, handler Handler) error {
switch {
case msg.Method != "":
// If method is set it must be a request.
reqCtx, cancelReq := context.WithCancel(runCtx)
thisRequest := nextRequest
nextRequest = make(chan struct{})
reqCtx, spanDone := event.StartSpan(runCtx, msg.Method,
tag.Method.Of(msg.Method),
tag.RPCDirection.Of(tag.Inbound),
tag.RPCID.Of(msg.ID.String()),
)
event.Record(reqCtx,
tag.Started.Of(1),
tag.ReceivedBytes.Of(n))

req := &Request{
conn: c,
cancel: cancelReq,
nextRequest: nextRequest,
conn: c,
WireRequest: WireRequest{
VersionTag: msg.VersionTag,
Method: msg.Method,
Params: msg.Params,
ID: msg.ID,
},
}
if c.LegacyHooks != nil {
reqCtx = c.LegacyHooks.Request(reqCtx, c, Receive, &req.WireRequest)
req.OnReply(func() { spanDone() })

if err := handler(reqCtx, req); err != nil {
// delivery failed, not much we can do
event.Error(reqCtx, "jsonrpc2 message delivery failed", err)
}
reqCtx, done := event.StartSpan(reqCtx, req.WireRequest.Method,
tag.Method.Of(req.WireRequest.Method),
tag.RPCDirection.Of(tag.Inbound),
tag.RPCID.Of(req.WireRequest.ID.String()),
)
event.Record(reqCtx,
tag.Started.Of(1),
tag.SentBytes.Of(n))
setHandling(req, true)
_, queueDone := event.StartSpan(reqCtx, "queued")
go func() {
<-thisRequest
queueDone()
req.state = requestSerial
defer func() {
setHandling(req, false)
done()
cancelReq()
}()
if c.LegacyHooks != nil {
if c.LegacyHooks.Deliver(reqCtx, req, false) {
return
}
}
err := handler(reqCtx, req)
if err != nil {
// delivery failed, not much we can do
event.Error(reqCtx, "jsonrpc2 message delivery failed", err)
}
}()
case msg.ID != nil:
// If method is not set, this should be a response, in which case we must
// have an id to send the response back to the caller.
Expand Down
Loading

0 comments on commit d076099

Please sign in to comment.