forked from dolthub/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rpc_server.go
87 lines (72 loc) · 2.53 KB
/
rpc_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package tabletmanager
import (
"fmt"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/tb"
"github.com/youtube/vitess/go/vt/callinfo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"golang.org/x/net/context"
)
// This file contains the RPC method helpers for the tablet manager.
//
// Utility functions for RPC service
//
// lock is used at the beginning of an RPC call, to lock the
// action mutex. It returns ctx.Err() if <-ctx.Done() after the lock.
func (agent *ActionAgent) lock(ctx context.Context) error {
agent.actionMutex.Lock()
// After we take the lock (which could take a long time), we
// check the client is still here.
select {
case <-ctx.Done():
agent.actionMutex.Unlock()
return ctx.Err()
default:
return nil
}
}
// unlock is the symetrical action to lock.
func (agent *ActionAgent) unlock() {
agent.actionMutex.Unlock()
}
// HandleRPCPanic is part of the RPCAgent interface.
func (agent *ActionAgent) HandleRPCPanic(ctx context.Context, name string, args, reply interface{}, verbose bool, err *error) {
// panic handling
if x := recover(); x != nil {
log.Errorf("TabletManager.%v(%v) on %v panic: %v\n%s", name, args, topoproto.TabletAliasString(agent.TabletAlias), x, tb.Stack(4))
*err = fmt.Errorf("caught panic during %v: %v", name, x)
return
}
// quick check for fast path
if !verbose && *err == nil {
return
}
// we gotta log something, get the source
from := ""
ci, ok := callinfo.FromContext(ctx)
if ok {
from = ci.Text()
}
if *err != nil {
// error case
log.Warningf("TabletManager.%v(%v)(on %v from %v) error: %v", name, args, topoproto.TabletAliasString(agent.TabletAlias), from, (*err).Error())
*err = fmt.Errorf("TabletManager.%v on %v error: %v", name, topoproto.TabletAliasString(agent.TabletAlias), *err)
} else {
// success case
log.Infof("TabletManager.%v(%v)(on %v from %v): %#v", name, args, topoproto.TabletAliasString(agent.TabletAlias), from, reply)
}
}
//
// RegisterQueryService is used to delay registration of RPC servers until we have all the objects.
type RegisterQueryService func(*ActionAgent)
// RegisterQueryServices is a list of functions to call when the delayed registration is triggered.
var RegisterQueryServices []RegisterQueryService
// registerQueryService will register all the instances.
func (agent *ActionAgent) registerQueryService() {
for _, f := range RegisterQueryServices {
f(agent)
}
}