Skip to content

Commit

Permalink
Add support for liveness in native mode (#806)
Browse files Browse the repository at this point in the history
Co-authored-by: hanpengfei01 <hanpengfei01@baidu.com>
  • Loading branch information
hannatao and hanpengfei01 committed Nov 16, 2023
1 parent bb135b9 commit e6e836d
Show file tree
Hide file tree
Showing 16 changed files with 639 additions and 72 deletions.
10 changes: 2 additions & 8 deletions ami/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/baetyl/baetyl/v2/ami"
"github.com/baetyl/baetyl/v2/config"
"github.com/baetyl/baetyl/v2/utils"
)

type kubeImpl struct {
Expand Down Expand Up @@ -93,13 +94,6 @@ func (k *kubeImpl) ApplyApp(ns string, app specv1.Application, cfgs map[string]s
return nil
}

func makeKey(kind specv1.Kind, name, ver string) string {
if name == "" || ver == "" {
return ""
}
return string(kind) + "-" + name + "-" + ver
}

func (k *kubeImpl) DeleteApp(ns string, app specv1.AppInfo) error {
if ns == context.EdgeNamespace() {
err := k.DeleteHelm(app.Name)
Expand All @@ -109,7 +103,7 @@ func (k *kubeImpl) DeleteApp(ns string, app specv1.AppInfo) error {
}
}
delApp := new(specv1.Application)
key := makeKey(specv1.KindApplication, app.Name, app.Version)
key := utils.MakeKey(specv1.KindApplication, app.Name, app.Version)
err := k.store.Get(key, delApp)
if err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion ami/kube/kube_yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes/scheme"

"github.com/baetyl/baetyl/v2/utils"
)

const (
Expand Down Expand Up @@ -66,7 +68,7 @@ func (k *kubeImpl) DeleteYaml(app *specv1.Application) error {
cfgs := make(map[string]specv1.Configuration)
for _, v := range app.Volumes {
if cfg := v.VolumeSource.Config; cfg != nil {
key := makeKey(specv1.KindConfiguration, cfg.Name, cfg.Version)
key := utils.MakeKey(specv1.KindConfiguration, cfg.Name, cfg.Version)
if key == "" {
return errors.Errorf("failed to get config name: (%s) version: (%s)", cfg.Name, cfg.Version)
}
Expand Down
18 changes: 15 additions & 3 deletions ami/native/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ import (
"gopkg.in/yaml.v2"

"github.com/baetyl/baetyl/v2/ami"
"github.com/baetyl/baetyl/v2/ami/native/prober"
"github.com/baetyl/baetyl/v2/config"
"github.com/baetyl/baetyl/v2/program"
utilsV2 "github.com/baetyl/baetyl/v2/utils"
)

var (
Expand Down Expand Up @@ -67,10 +69,12 @@ type nativeImpl struct {
hostPathLib string
mapping *native.ServiceMapping
portAllocator *native.PortAllocator
probeManager prober.Manager
store *bh.Store
log *log.Logger
}

func newNativeImpl(cfg config.AmiConfig, _ *bh.Store) (ami.AMI, error) {
func newNativeImpl(cfg config.AmiConfig, store *bh.Store) (ami.AMI, error) {
hostPathLib, err := v2context.HostPathLib()
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -89,6 +93,8 @@ func newNativeImpl(cfg config.AmiConfig, _ *bh.Store) (ami.AMI, error) {
hostPathLib: hostPathLib,
mapping: mapping,
portAllocator: portAllocator,
probeManager: prober.NewManager(store),
store: store,
log: log.With(log.Any("ami", "native")),
}, nil
}
Expand Down Expand Up @@ -352,18 +358,19 @@ func (impl *nativeImpl) ApplyApp(ns string, app v1.Application, configs map[stri
return errors.Trace(err)
}
}
impl.probeManager.AddApp(svc, &app)
}

if len(ports) > 0 {
if app.Type == v1.AppTypeContainer {
err := impl.mapping.SetServicePorts(s.Name, ports)
err = impl.mapping.SetServicePorts(s.Name, ports)
if err != nil {
return errors.Trace(err)
}
impl.log.Debug("set applied service ports in mapping files", log.Any("applied service", s.Name), log.Any("ports", ports))
} else {
// native function use app name
err := impl.mapping.SetServicePorts(app.Name, ports)
err = impl.mapping.SetServicePorts(app.Name, ports)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -437,6 +444,7 @@ func (impl *nativeImpl) DeleteApp(ns string, app v1.AppInfo) error {
}
}
}
impl.probeManager.RemoveApp(&app)
if err = svc.Stop(); err != nil {
impl.log.Warn("failed to stop old app", log.Error(err))
}
Expand Down Expand Up @@ -495,6 +503,7 @@ func (impl *nativeImpl) StatsApps(ns string) ([]v1.AppStats, error) {
if err != nil {
return nil, errors.Trace(err)
}
apps := make(map[string]bool)
for _, appFile := range appFiles {
if !appFile.IsDir() {
continue
Expand Down Expand Up @@ -567,6 +576,8 @@ func (impl *nativeImpl) StatsApps(ns string) ([]v1.AppStats, error) {
curAppStats.InstanceStats = curInsStats
continue
}
apps[utilsV2.MakeKey(v1.KindApplication, curAppName, curAppVer)] = true
impl.probeManager.CheckAndStart(svc, &v1.AppInfo{Name: curAppName, Version: curAppVer})

status, err := svc.Status()
if err != nil || status != service.StatusRunning {
Expand Down Expand Up @@ -621,6 +632,7 @@ func (impl *nativeImpl) StatsApps(ns string) ([]v1.AppStats, error) {
}
}
}
impl.probeManager.CleanupApps(apps)
return stats, nil
}

Expand Down
138 changes: 138 additions & 0 deletions ami/native/prober/http_probe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package prober

import (
"crypto/tls"
"fmt"
"net/http"
"net/url"
"time"

"github.com/baetyl/baetyl-go/v2/errors"
"github.com/baetyl/baetyl-go/v2/log"
"k8s.io/component-base/version"
utilio "k8s.io/utils/io"
)

const (
maxRespBodyLength = 10 * 1 << 10 // 10KB
)

// NewHTTPProber creates Prober that will skip TLS verification while probing.
// followNonLocalRedirects configures whether the prober should follow redirects to a different hostname.
//
// If disabled, redirects to other hosts will trigger a warning result.
func NewHTTPProber(followNonLocalRedirects bool) HTTPProber {
tlsConfig := &tls.Config{InsecureSkipVerify: true}
return NewWithTLSConfig(tlsConfig, followNonLocalRedirects)
}

// NewWithTLSConfig takes tls config as parameter.
// followNonLocalRedirects configures whether the prober should follow redirects to a different hostname.
//
// If disabled, redirects to other hosts will trigger a warning result.
func NewWithTLSConfig(config *tls.Config, followNonLocalRedirects bool) HTTPProber {
// We do not want the probe use node's local proxy set.
transport := &http.Transport{
TLSClientConfig: config,
DisableKeepAlives: true,
Proxy: http.ProxyURL(nil),
}
return httpProber{transport, followNonLocalRedirects}
}

// HTTPProber is an interface that defines the Probe function for doing HTTP readiness/liveness checks.
type HTTPProber interface {
Probe(url *url.URL, headers http.Header, timeout time.Duration) (ProbeResult, string, error)
}

type httpProber struct {
transport *http.Transport
followNonLocalRedirects bool
}

// Probe returns a ProbeRunner capable of running an HTTP check.
func (pr httpProber) Probe(url *url.URL, headers http.Header, timeout time.Duration) (ProbeResult, string, error) {
pr.transport.DisableCompression = true // removes Accept-Encoding header
client := &http.Client{
Timeout: timeout,
Transport: pr.transport,
CheckRedirect: redirectChecker(pr.followNonLocalRedirects),
}
return DoHTTPProbe(url, headers, client)
}

// GetHTTPInterface is an interface for making HTTP requests, that returns a response and error.
type GetHTTPInterface interface {
Do(req *http.Request) (*http.Response, error)
}

// DoHTTPProbe checks if a GET request to the url succeeds.
// If the HTTP response code is successful (i.e. 400 > code >= 200), it returns Success.
// If the HTTP response code is unsuccessful or HTTP communication fails, it returns Failure.
// This is exported because some other packages may want to do direct HTTP probes.
func DoHTTPProbe(url *url.URL, headers http.Header, client GetHTTPInterface) (ProbeResult, string, error) {
req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
// Convert errors into failures to catch timeouts.
return Failure, err.Error(), nil
}
if headers == nil {
headers = http.Header{}
}
if _, ok := headers["User-Agent"]; !ok {
// explicitly set User-Agent so it's not set to default Go value
v := version.Get()
headers.Set("User-Agent", fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor))
}
if _, ok := headers["Accept"]; !ok {
// Accept header was not defined. accept all
headers.Set("Accept", "*/*")
} else if headers.Get("Accept") == "" {
// Accept header was overridden but is empty. removing
headers.Del("Accept")
}
req.Header = headers
req.Host = headers.Get("Host")
res, err := client.Do(req)
if err != nil {
// Convert errors into failures to catch timeouts.
return Failure, err.Error(), nil
}
defer res.Body.Close()
b, err := utilio.ReadAtMost(res.Body, maxRespBodyLength)
if err != nil {
if err == utilio.ErrLimitReached {
log.L().Debug(fmt.Sprintf("Non fatal body truncation for %s, Response: %v", url.String(), *res))
} else {
return Failure, "", err
}
}
body := string(b)
if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusBadRequest {
if res.StatusCode >= http.StatusMultipleChoices { // Redirect
log.L().Warn(fmt.Sprintf("Probe succeeded for %s, Response: %v", url.String(), *res))
return Warning, body, nil
}
log.L().Debug(fmt.Sprintf("Probe succeeded for %s, Response: %v", url.String(), *res))
return Success, body, nil
}
log.L().Warn(fmt.Sprintf("Probe failed for %s with request headers %v, response body: %v", url.String(), headers, body))
return Failure, fmt.Sprintf("HTTP probe failed with statuscode: %d", res.StatusCode), nil
}

func redirectChecker(followNonLocalRedirects bool) func(*http.Request, []*http.Request) error {
if followNonLocalRedirects {
return nil // Use the default http client checker.
}

return func(req *http.Request, via []*http.Request) error {
if req.URL.Hostname() != via[0].URL.Hostname() {
return http.ErrUseLastResponse
}
// Default behavior: stop after 10 redirects.
if len(via) >= 10 {
return errors.New("stopped after 10 redirects")
}
return nil
}
}
Loading

0 comments on commit e6e836d

Please sign in to comment.