forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 1
/
instance.go
202 lines (174 loc) · 6.57 KB
/
instance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
// Copyright 2015, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package worker
import (
"errors"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/tb"
"github.com/youtube/vitess/go/vt/logutil"
vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vterrors"
"github.com/youtube/vitess/go/vt/wrangler"
"golang.org/x/net/context"
)
// Instance encapsulate the execution state of vtworker.
type Instance struct {
// Default wrangler for all operations.
// Users can specify their own in RunCommand() e.g. the gRPC server does this.
wr *wrangler.Wrangler
// mutex is protecting all the following variables
// 3 states here:
// - no job ever ran (or reset was run): currentWorker is nil,
// currentContext/currentCancelFunc is nil, lastRunError is nil
// - one worker running: currentWorker is set,
// currentContext/currentCancelFunc is set, lastRunError is nil
// - (at least) one worker already ran, none is running atm:
// currentWorker is set, currentContext is nil, lastRunError
// has the error returned by the worker.
currentWorkerMutex sync.Mutex
currentWorker Worker
currentMemoryLogger *logutil.MemoryLogger
currentContext context.Context
currentCancelFunc context.CancelFunc
lastRunError error
lastRunStopTime time.Time
// backgroundContext is context.Background() from main() which has to be plumbed through.
backgroundContext context.Context
topoServer topo.Server
cell string
commandDisplayInterval time.Duration
}
// NewInstance creates a new Instance.
func NewInstance(ctx context.Context, ts topo.Server, cell string, commandDisplayInterval time.Duration) *Instance {
wi := &Instance{backgroundContext: ctx, topoServer: ts, cell: cell, commandDisplayInterval: commandDisplayInterval}
// Note: setAndStartWorker() also adds a MemoryLogger for the webserver.
wi.wr = wi.CreateWrangler(logutil.NewConsoleLogger())
return wi
}
// CreateWrangler creates a new wrangler using the instance specific configuration.
func (wi *Instance) CreateWrangler(logger logutil.Logger) *wrangler.Wrangler {
return wrangler.New(logger, wi.topoServer, tmclient.NewTabletManagerClient())
}
// setAndStartWorker will set the current worker.
// We always log to both memory logger (for display on the web) and
// console logger (for records / display of command line worker).
func (wi *Instance) setAndStartWorker(wrk Worker, wr *wrangler.Wrangler) (chan struct{}, error) {
wi.currentWorkerMutex.Lock()
defer wi.currentWorkerMutex.Unlock()
if wi.currentContext != nil {
return nil, vterrors.FromError(vtrpcpb.ErrorCode_TRANSIENT_ERROR,
fmt.Errorf("A worker job is already in progress: %v", wi.currentWorker))
}
if wi.currentWorker != nil {
// During the grace period, we answer with a retryable error.
const gracePeriod = 1 * time.Minute
gracePeriodEnd := time.Now().Add(gracePeriod)
if wi.lastRunStopTime.Before(gracePeriodEnd) {
return nil, vterrors.FromError(vtrpcpb.ErrorCode_TRANSIENT_ERROR,
fmt.Errorf("A worker job was recently stopped (%f seconds ago): %v",
time.Now().Sub(wi.lastRunStopTime).Seconds(),
wi.currentWorker))
}
// QUERY_NOT_SERVED = FailedPrecondition => manual resolution required.
return nil, vterrors.FromError(vtrpcpb.ErrorCode_QUERY_NOT_SERVED,
fmt.Errorf("The worker job was stopped %.1f minutes ago, but not reset. You have to reset it manually. Job: %v",
time.Now().Sub(wi.lastRunStopTime).Minutes(),
wi.currentWorker))
}
wi.currentWorker = wrk
wi.currentMemoryLogger = logutil.NewMemoryLogger()
wi.currentContext, wi.currentCancelFunc = context.WithCancel(wi.backgroundContext)
wi.lastRunError = nil
wi.lastRunStopTime = time.Unix(0, 0)
done := make(chan struct{})
wranglerLogger := wr.Logger()
if wr == wi.wr {
// If it's the default wrangler, do not reuse its logger because it may have been set before.
// Resuing it would result into an endless recursion.
wranglerLogger = logutil.NewConsoleLogger()
}
wr.SetLogger(logutil.NewTeeLogger(wi.currentMemoryLogger, wranglerLogger))
// one go function runs the worker, changes state when done
go func() {
log.Infof("Starting worker...")
var err error
// Catch all panics and always save the execution state at the end.
defer func() {
// The recovery code is a copy of servenv.HandlePanic().
if x := recover(); x != nil {
log.Errorf("uncaught vtworker panic: %v\n%s", x, tb.Stack(4))
err = fmt.Errorf("uncaught vtworker panic: %v", x)
}
wi.currentWorkerMutex.Lock()
wi.currentContext = nil
wi.currentCancelFunc = nil
wi.lastRunError = err
wi.lastRunStopTime = time.Now()
wi.currentWorkerMutex.Unlock()
close(done)
}()
// run will take a long time
err = wrk.Run(wi.currentContext)
}()
return done, nil
}
// InstallSignalHandlers installs signal handler which exit vtworker gracefully.
func (wi *Instance) InstallSignalHandlers() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
go func() {
for s := range sigChan {
// We got a signal, notify our modules.
// Use an extra function to properly unlock using defer.
func() {
wi.currentWorkerMutex.Lock()
defer wi.currentWorkerMutex.Unlock()
if wi.currentCancelFunc != nil {
log.Infof("Trying to cancel current worker after receiving signal: %v", s)
wi.currentCancelFunc()
} else {
log.Infof("Shutting down idle worker after receiving signal: %v", s)
os.Exit(0)
}
}()
}
}()
}
// Reset resets the state of a finished worker. It returns an error if the worker is still running.
func (wi *Instance) Reset() error {
wi.currentWorkerMutex.Lock()
defer wi.currentWorkerMutex.Unlock()
if wi.currentWorker == nil {
return nil
}
// check the worker is really done
if wi.currentContext == nil {
wi.currentWorker = nil
wi.currentMemoryLogger = nil
return nil
}
return errors.New("worker still executing")
}
// Cancel calls the cancel function of the current vtworker job.
// It returns true, if a job was running. False otherwise.
// NOTE: Cancel won't reset the state as well. Use Reset() to do so.
func (wi *Instance) Cancel() bool {
wi.currentWorkerMutex.Lock()
if wi.currentWorker == nil || wi.currentCancelFunc == nil {
wi.currentWorkerMutex.Unlock()
return false
}
cancel := wi.currentCancelFunc
wi.currentWorkerMutex.Unlock()
cancel()
return true
}