diff --git a/ami/native/prober/prober.go b/ami/native/prober/prober.go index 13bb67ca..f4cc14b4 100644 --- a/ami/native/prober/prober.go +++ b/ami/native/prober/prober.go @@ -33,22 +33,22 @@ const ( // Prober helps to check the liveness/readiness/startup of a container. type prober struct { - // probe types needs different httpprobe instances so they don't + // probe types needs different httpProbe instances, so they don't // share a connection pool which can cause collisions to the // same host:port and transient failures. See #49740. - livenessHTTP HTTPProber - tcp TCPProber + http HTTPProber + tcp TCPProber } func newProber() *prober { const followNonLocalRedirects = false return &prober{ - livenessHTTP: NewHTTPProber(followNonLocalRedirects), - tcp: NewTCPProber(), + http: NewHTTPProber(followNonLocalRedirects), + tcp: NewTCPProber(), } } -func (pb *prober) probe(appName string, p *v1.Probe) (ProbeResult, error) { +func (pb *prober) probe(appName *probeKey, p *v1.Probe) (ProbeResult, error) { var err error var output string result := Unknown @@ -73,7 +73,7 @@ func (pb *prober) probe(appName string, p *v1.Probe) (ProbeResult, error) { return Success, nil } -func (pb *prober) runProbe(appName string, p *v1.Probe) (ProbeResult, string, error) { +func (pb *prober) runProbe(key *probeKey, p *v1.Probe) (ProbeResult, string, error) { timeout := time.Duration(p.TimeoutSeconds) * time.Second if p.HTTPGet != nil { scheme := strings.ToLower(string(p.HTTPGet.Scheme)) @@ -87,9 +87,9 @@ func (pb *prober) runProbe(appName string, p *v1.Probe) (ProbeResult, string, er } path := p.HTTPGet.Path log.L().Debug("HTTP-Probe Host", log.Any("host", host), log.Any("port", port), log.Any("path", path)) - url := formatURL(scheme, host, port.IntValue(), path) + u := formatURL(scheme, host, port.IntValue(), path) headers := buildHeader(p.HTTPGet.HTTPHeaders) - return pb.livenessHTTP.Probe(url, headers, timeout) + return pb.http.Probe(u, headers, timeout) } if p.TCPSocket != nil { port := 
p.HTTPGet.Port @@ -103,7 +103,7 @@ func (pb *prober) runProbe(appName string, p *v1.Probe) (ProbeResult, string, er log.L().Debug("TCP-Probe Host", log.Any("host", host), log.Any("port", port)) return pb.tcp.Probe(host, port.IntValue(), timeout) } - return Unknown, "", fmt.Errorf("missing probe handler for %s", appName) + return Unknown, "", fmt.Errorf("missing probe handler for %v", key) } // formatURL formats a URL from args. For testability. diff --git a/ami/native/prober/prober_manager.go b/ami/native/prober/prober_manager.go index c9058293..b8bfdf28 100644 --- a/ami/native/prober/prober_manager.go +++ b/ami/native/prober/prober_manager.go @@ -22,7 +22,7 @@ type Manager interface { } type manager struct { - workers map[string]*worker + workers map[probeKey]*worker workerLock sync.RWMutex start time.Time log *log.Logger @@ -30,31 +30,39 @@ type manager struct { prober *prober store *bolthold.Store // count when collecting process status, if count >=maxProbeRetries, stop worker - status map[string]int + status map[probeKey]int } func NewManager(store *bolthold.Store) Manager { return &manager{ - workers: make(map[string]*worker), + workers: make(map[probeKey]*worker), start: clock.RealClock{}.Now(), prober: newProber(), store: store, - status: make(map[string]int), + status: make(map[probeKey]int), log: log.With(log.Any("native", "probe")), } } func (m *manager) AddApp(svc service.Service, app *v1.Application) { - if app == nil || len(app.Services) == 0 || app.Services[0].LivenessProbe == nil { + if app == nil || len(app.Services) == 0 { + return + } + var p probeType + if app.Services[0].LivenessProbe != nil { + p = liveness + } else if app.Services[0].StartupProbe != nil { + p = startup + } else { return } m.workerLock.Lock() defer m.workerLock.Unlock() - key := utils.MakeKey(v1.KindApplication, app.Name, app.Version) + key := probeKey{Name: app.Name, Version: app.Version, ProbeType: p} if _, ok := m.workers[key]; ok { return } - w := newWorker(m, svc, app) + w 
:= newWorker(m, svc, p, app) m.workers[key] = w m.status[key] = 0 m.log.Debug("add app", log.Any("app", key)) @@ -67,12 +75,17 @@ func (m *manager) RemoveApp(app *v1.AppInfo) { } m.workerLock.Lock() defer m.workerLock.Unlock() - key := utils.MakeKey(v1.KindApplication, app.Name, app.Version) - if w, ok := m.workers[key]; ok { + livenessKey := probeKey{Name: app.Name, Version: app.Version, ProbeType: liveness} + if w, ok := m.workers[livenessKey]; ok { + w.stop() + } + startupKey := probeKey{Name: app.Name, Version: app.Version, ProbeType: startup} + if w, ok := m.workers[startupKey]; ok { w.stop() } } +// CheckAndStart is used when restarting baetyl-core, because applying apps will not be called in that case. func (m *manager) CheckAndStart(svc service.Service, info *v1.AppInfo) { if strings.HasPrefix(info.Name, v1.BaetylCore) || strings.HasPrefix(info.Name, v1.BaetylInit) { return @@ -87,11 +100,14 @@ func (m *manager) CheckAndStart(svc service.Service, info *v1.AppInfo) { m.AddApp(svc, app) } +// CleanupApps removes the workers when collecting the process status. +// A worker is stopped only if its count >= maxProbeRetries. func (m *manager) CleanupApps(apps map[string]bool) { m.workerLock.Lock() defer m.workerLock.Unlock() for key, w := range m.workers { - if _, ok := apps[key]; !ok { + k := utils.MakeKey(v1.KindApplication, key.Name, key.Version) + if _, ok := apps[k]; !ok { m.status[key]++ if m.status[key] >= maxProbeRetries { m.log.Debug("remove app", log.Any("key", key), log.Any("apps", apps)) @@ -105,7 +121,7 @@ func (m *manager) CleanupApps(apps map[string]bool) { } // Called by the worker after exiting.
-func (m *manager) removeWorker(name string) { +func (m *manager) removeWorker(name probeKey) { m.workerLock.Lock() defer m.workerLock.Unlock() delete(m.workers, name) diff --git a/ami/native/prober/worker.go b/ami/native/prober/worker.go index b64a9684..c6ecdab9 100644 --- a/ami/native/prober/worker.go +++ b/ami/native/prober/worker.go @@ -10,15 +10,28 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/utils/clock" +) + +// probeType is the type of probe (liveness or startup). +type probeType int - "github.com/baetyl/baetyl/v2/utils" +const ( + liveness probeType = iota + startup ) +type probeKey struct { + Name string + Version string + ProbeType probeType +} + type worker struct { // Channel for stopping the probe. stopCh chan struct{} // Describes the probe configuration (read-only) - spec *v1.Probe + spec *v1.Probe + probeType probeType // The process to probe svc service.Service app *specV1.Application @@ -31,13 +44,14 @@ type worker struct { resultRun int } -func newWorker(m *manager, svc service.Service, app *specV1.Application) *worker { +func newWorker(m *manager, svc service.Service, probeType probeType, app *specV1.Application) *worker { return &worker{ stopCh: make(chan struct{}, 1), // Buffer so stop() can be non-blocking. spec: app.Services[0].LivenessProbe, // native only support one service app: app, svc: svc, probeManager: m, + probeType: probeType, log: m.log, startedAt: clock.RealClock{}.Now(), } @@ -56,7 +70,7 @@ func (w *worker) run() { defer func() { // Clean up.
probeTicker.Stop() - key := utils.MakeKey(specV1.KindApplication, w.app.Name, w.app.Version) + key := probeKey{Name: w.app.Name, Version: w.app.Version, ProbeType: w.probeType} w.probeManager.removeWorker(key) }() probeLoop: @@ -84,16 +98,26 @@ func (w *worker) doProbe() (keepGoing bool) { defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging) defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true }) - key := utils.MakeKey(specV1.KindApplication, w.app.Name, w.app.Version) + key := &probeKey{Name: w.app.Name, Version: w.app.Version, ProbeType: w.probeType} status, ok := w.svc.Status() - if ok != nil || status == service.StatusUnknown { - w.log.Debug("No status for process", log.Any("app", key)) + if ok != nil { + w.log.Debug("No status for process", log.Any("key", key)) return true } if status == service.StatusStopped { - w.log.Debug("Process is terminated, exiting probe worker", log.Any("app", key)) + w.log.Debug("Process is terminated, exiting probe worker", log.Any("key", key)) return false } + // Skip liveness probing until the process has started (status is still unknown). + if w.probeType == liveness && status == service.StatusUnknown { + w.log.Debug("No status for process", log.Any("key", key)) + return true + } + // Skip startup probing once the process is running. + // The worker keeps running so that it still works for a restarted process. + if w.probeType == startup && status == service.StatusRunning { + return true + } // Probe disabled for InitialDelaySeconds. if int32(time.Since(w.startedAt).Seconds()) < w.spec.InitialDelaySeconds { return true @@ -116,12 +140,12 @@ func (w *worker) doProbe() (keepGoing bool) { return true } if result == Failure { - // The process fails a liveness check, it will need to be restarted. + // The process failed a check, so it needs to be restarted. // Stop probing and restart the process.
- w.log.Warn("Process failed liveness probe, restarting", log.Any("app", key)) + w.log.Warn("Process failed probe, restarting", log.Any("key", key)) err = w.svc.Restart() if err != nil { - w.log.Error("Failed to restart process", log.Any("app", key), log.Error(err)) + w.log.Error("Failed to restart process", log.Any("key", key), log.Error(err)) } w.resultRun = 0 } diff --git a/go.mod b/go.mod index 96781aac..c83ecf86 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.18 replace github.com/kardianos/service => github.com/baetyl/service v1.2.3-0.20221205070704-85cb455aa3a3 require ( - github.com/baetyl/baetyl-go/v2 v2.2.4-0.20231114062225-6e5e78d5f19a + github.com/baetyl/baetyl-go/v2 v2.2.4-0.20231121032505-22cbbc571abd github.com/golang/mock v1.6.0 github.com/gorilla/websocket v1.4.2 github.com/imdario/mergo v0.3.13 diff --git a/go.sum b/go.sum index 6d699826..754d1abb 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHG github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535 h1:4daAzAu0S6Vi7/lbWECcX0j45yZReDZ56BQsrVBOEEY= github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535/go.mod h1:oGkLhpf+kjZl6xBf758TQhh5XrAeiJv/7FRz/2spLIg= -github.com/baetyl/baetyl-go/v2 v2.2.4-0.20231114062225-6e5e78d5f19a h1:2nXCO/u+mpQ2PkECRGtf+FjP2xLMdGZBP4IMkFLVcTo= -github.com/baetyl/baetyl-go/v2 v2.2.4-0.20231114062225-6e5e78d5f19a/go.mod h1:tZo5qNUjoPcTBDBF61Ubdg92SAzFAHhtguYi6iRlblY= +github.com/baetyl/baetyl-go/v2 v2.2.4-0.20231121032505-22cbbc571abd h1:gMcbRlLHqWRhq3TvneZrwnwngkbscf0LOCeL1J2G074= +github.com/baetyl/baetyl-go/v2 v2.2.4-0.20231121032505-22cbbc571abd/go.mod h1:tZo5qNUjoPcTBDBF61Ubdg92SAzFAHhtguYi6iRlblY= github.com/baetyl/service v1.2.3-0.20221205070704-85cb455aa3a3 h1:vVNBMGCt01dwNwF2wtctC4uUbfgW6eXUG9ZicajIfAs= github.com/baetyl/service 
v1.2.3-0.20221205070704-85cb455aa3a3/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=