Skip to content

Commit

Permalink
Add support for startup probe (#808)
Browse files Browse the repository at this point in the history
Co-authored-by: hanpengfei01 <hanpengfei01@baidu.com>
  • Loading branch information
hannatao and hanpengfei01 committed Nov 22, 2023
1 parent b0dd868 commit 57214a7
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 35 deletions.
20 changes: 10 additions & 10 deletions ami/native/prober/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,22 @@ const (

// Prober helps to check the liveness/readiness/startup of a container.
type prober struct {
// probe types needs different httpprobe instances so they don't
// probe types needs different httpProbe instances, so they don't
// share a connection pool which can cause collisions to the
// same host:port and transient failures. See #49740.
livenessHTTP HTTPProber
tcp TCPProber
http HTTPProber
tcp TCPProber
}

func newProber() *prober {
const followNonLocalRedirects = false
return &prober{
livenessHTTP: NewHTTPProber(followNonLocalRedirects),
tcp: NewTCPProber(),
http: NewHTTPProber(followNonLocalRedirects),
tcp: NewTCPProber(),
}
}

func (pb *prober) probe(appName string, p *v1.Probe) (ProbeResult, error) {
func (pb *prober) probe(appName *probeKey, p *v1.Probe) (ProbeResult, error) {
var err error
var output string
result := Unknown
Expand All @@ -73,7 +73,7 @@ func (pb *prober) probe(appName string, p *v1.Probe) (ProbeResult, error) {
return Success, nil
}

func (pb *prober) runProbe(appName string, p *v1.Probe) (ProbeResult, string, error) {
func (pb *prober) runProbe(key *probeKey, p *v1.Probe) (ProbeResult, string, error) {
timeout := time.Duration(p.TimeoutSeconds) * time.Second
if p.HTTPGet != nil {
scheme := strings.ToLower(string(p.HTTPGet.Scheme))
Expand All @@ -87,9 +87,9 @@ func (pb *prober) runProbe(appName string, p *v1.Probe) (ProbeResult, string, er
}
path := p.HTTPGet.Path
log.L().Debug("HTTP-Probe Host", log.Any("host", host), log.Any("port", port), log.Any("path", path))
url := formatURL(scheme, host, port.IntValue(), path)
u := formatURL(scheme, host, port.IntValue(), path)
headers := buildHeader(p.HTTPGet.HTTPHeaders)
return pb.livenessHTTP.Probe(url, headers, timeout)
return pb.http.Probe(u, headers, timeout)
}
if p.TCPSocket != nil {
port := p.HTTPGet.Port
Expand All @@ -103,7 +103,7 @@ func (pb *prober) runProbe(appName string, p *v1.Probe) (ProbeResult, string, er
log.L().Debug("TCP-Probe Host", log.Any("host", host), log.Any("port", port))
return pb.tcp.Probe(host, port.IntValue(), timeout)
}
return Unknown, "", fmt.Errorf("missing probe handler for %s", appName)
return Unknown, "", fmt.Errorf("missing probe handler for %v", key)
}

// formatURL formats a URL from args. For testability.
Expand Down
38 changes: 27 additions & 11 deletions ami/native/prober/prober_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,39 +22,47 @@ type Manager interface {
}

type manager struct {
workers map[string]*worker
workers map[probeKey]*worker
workerLock sync.RWMutex
start time.Time
log *log.Logger
// prober executes the probe actions.
prober *prober
store *bolthold.Store
// count when collecting process status, if count >=maxProbeRetries, stop worker
status map[string]int
status map[probeKey]int
}

func NewManager(store *bolthold.Store) Manager {
return &manager{
workers: make(map[string]*worker),
workers: make(map[probeKey]*worker),
start: clock.RealClock{}.Now(),
prober: newProber(),
store: store,
status: make(map[string]int),
status: make(map[probeKey]int),
log: log.With(log.Any("native", "probe")),
}
}

func (m *manager) AddApp(svc service.Service, app *v1.Application) {
if app == nil || len(app.Services) == 0 || app.Services[0].LivenessProbe == nil {
if app == nil || len(app.Services) == 0 {
return
}
var p probeType
if app.Services[0].LivenessProbe != nil {
p = liveness
} else if app.Services[0].StartupProbe != nil {
p = startup
} else {
return
}
m.workerLock.Lock()
defer m.workerLock.Unlock()
key := utils.MakeKey(v1.KindApplication, app.Name, app.Version)
key := probeKey{Name: app.Name, Version: app.Version, ProbeType: p}
if _, ok := m.workers[key]; ok {
return
}
w := newWorker(m, svc, app)
w := newWorker(m, svc, p, app)
m.workers[key] = w
m.status[key] = 0
m.log.Debug("add app", log.Any("app", key))
Expand All @@ -67,12 +75,17 @@ func (m *manager) RemoveApp(app *v1.AppInfo) {
}
m.workerLock.Lock()
defer m.workerLock.Unlock()
key := utils.MakeKey(v1.KindApplication, app.Name, app.Version)
if w, ok := m.workers[key]; ok {
livenessKey := probeKey{Name: app.Name, Version: app.Version, ProbeType: liveness}
if w, ok := m.workers[livenessKey]; ok {
w.stop()
}
startupKey := probeKey{Name: app.Name, Version: app.Version, ProbeType: startup}
if w, ok := m.workers[startupKey]; ok {
w.stop()
}
}

// CheckAndStart This is used for restarting baetyl-core, due to applying apps will not be called.
func (m *manager) CheckAndStart(svc service.Service, info *v1.AppInfo) {
if strings.HasPrefix(info.Name, v1.BaetylCore) || strings.HasPrefix(info.Name, v1.BaetylInit) {
return
Expand All @@ -87,11 +100,14 @@ func (m *manager) CheckAndStart(svc service.Service, info *v1.AppInfo) {
m.AddApp(svc, app)
}

// CleanupApps removes the workers when collecting the process status.
// Only if count >=maxProbeRetries, stop worker
func (m *manager) CleanupApps(apps map[string]bool) {
m.workerLock.Lock()
defer m.workerLock.Unlock()
for key, w := range m.workers {
if _, ok := apps[key]; !ok {
k := utils.MakeKey(v1.KindApplication, key.Name, key.Version)
if _, ok := apps[k]; !ok {
m.status[key]++
if m.status[key] >= maxProbeRetries {
m.log.Debug("remove app", log.Any("key", key), log.Any("apps", apps))
Expand All @@ -105,7 +121,7 @@ func (m *manager) CleanupApps(apps map[string]bool) {
}

// Called by the worker after exiting.
func (m *manager) removeWorker(name string) {
func (m *manager) removeWorker(name probeKey) {
m.workerLock.Lock()
defer m.workerLock.Unlock()
delete(m.workers, name)
Expand Down
46 changes: 35 additions & 11 deletions ami/native/prober/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,28 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/utils/clock"
)

// Type of probe (liveness, readiness or startup)
type probeType int

"github.com/baetyl/baetyl/v2/utils"
const (
liveness probeType = iota
startup
)

type probeKey struct {
Name string
Version string
ProbeType probeType
}

type worker struct {
// Channel for stopping the probe.
stopCh chan struct{}
// Describes the probe configuration (read-only)
spec *v1.Probe
spec *v1.Probe
probeType probeType
// The process to probe
svc service.Service
app *specV1.Application
Expand All @@ -31,13 +44,14 @@ type worker struct {
resultRun int
}

func newWorker(m *manager, svc service.Service, app *specV1.Application) *worker {
func newWorker(m *manager, svc service.Service, probeType probeType, app *specV1.Application) *worker {
return &worker{
stopCh: make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
spec: app.Services[0].LivenessProbe, // native only support one service
app: app,
svc: svc,
probeManager: m,
probeType: probeType,
log: m.log,
startedAt: clock.RealClock{}.Now(),
}
Expand All @@ -56,7 +70,7 @@ func (w *worker) run() {
defer func() {
// Clean up.
probeTicker.Stop()
key := utils.MakeKey(specV1.KindApplication, w.app.Name, w.app.Version)
key := probeKey{Name: w.app.Name, Version: w.app.Version, ProbeType: w.probeType}
w.probeManager.removeWorker(key)
}()
probeLoop:
Expand Down Expand Up @@ -84,16 +98,26 @@ func (w *worker) doProbe() (keepGoing bool) {
defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging)
defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })

key := utils.MakeKey(specV1.KindApplication, w.app.Name, w.app.Version)
key := &probeKey{Name: w.app.Name, Version: w.app.Version, ProbeType: w.probeType}
status, ok := w.svc.Status()
if ok != nil || status == service.StatusUnknown {
w.log.Debug("No status for process", log.Any("app", key))
if ok != nil {
w.log.Debug("No status for process", log.Any("key", key))
return true
}
if status == service.StatusStopped {
w.log.Debug("Process is terminated, exiting probe worker", log.Any("app", key))
w.log.Debug("Process is terminated, exiting probe worker", log.Any("key", key))
return false
}
// Stop probing for liveness until process has started.
if w.probeType == liveness && status == service.StatusUnknown {
w.log.Debug("No status for process", log.Any("key", key))
return true
}
// Stop probing for startup once process has started.
// we keep it running to make sure it will work for restarted process.
if w.probeType == startup && status == service.StatusRunning {
return true
}
// Probe disabled for InitialDelaySeconds.
if int32(time.Since(w.startedAt).Seconds()) < w.spec.InitialDelaySeconds {
return true
Expand All @@ -116,12 +140,12 @@ func (w *worker) doProbe() (keepGoing bool) {
return true
}
if result == Failure {
// The process fails a liveness check, it will need to be restarted.
// The process fails a check, it will need to be restarted.
// Stop probing and restart the process.
w.log.Warn("Process failed liveness probe, restarting", log.Any("app", key))
w.log.Warn("Process failed probe, restarting", log.Any("key", key))
err = w.svc.Restart()
if err != nil {
w.log.Error("Failed to restart process", log.Any("app", key), log.Error(err))
w.log.Error("Failed to restart process", log.Any("key", key), log.Error(err))
}
w.resultRun = 0
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.18
replace github.com/kardianos/service => github.com/baetyl/service v1.2.3-0.20221205070704-85cb455aa3a3

require (
github.com/baetyl/baetyl-go/v2 v2.2.4-0.20231114062225-6e5e78d5f19a
github.com/baetyl/baetyl-go/v2 v2.2.4-0.20231121032505-22cbbc571abd
github.com/golang/mock v1.6.0
github.com/gorilla/websocket v1.4.2
github.com/imdario/mergo v0.3.13
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHG
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535 h1:4daAzAu0S6Vi7/lbWECcX0j45yZReDZ56BQsrVBOEEY=
github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535/go.mod h1:oGkLhpf+kjZl6xBf758TQhh5XrAeiJv/7FRz/2spLIg=
github.com/baetyl/baetyl-go/v2 v2.2.4-0.20231114062225-6e5e78d5f19a h1:2nXCO/u+mpQ2PkECRGtf+FjP2xLMdGZBP4IMkFLVcTo=
github.com/baetyl/baetyl-go/v2 v2.2.4-0.20231114062225-6e5e78d5f19a/go.mod h1:tZo5qNUjoPcTBDBF61Ubdg92SAzFAHhtguYi6iRlblY=
github.com/baetyl/baetyl-go/v2 v2.2.4-0.20231121032505-22cbbc571abd h1:gMcbRlLHqWRhq3TvneZrwnwngkbscf0LOCeL1J2G074=
github.com/baetyl/baetyl-go/v2 v2.2.4-0.20231121032505-22cbbc571abd/go.mod h1:tZo5qNUjoPcTBDBF61Ubdg92SAzFAHhtguYi6iRlblY=
github.com/baetyl/service v1.2.3-0.20221205070704-85cb455aa3a3 h1:vVNBMGCt01dwNwF2wtctC4uUbfgW6eXUG9ZicajIfAs=
github.com/baetyl/service v1.2.3-0.20221205070704-85cb455aa3a3/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
Expand Down

0 comments on commit 57214a7

Please sign in to comment.