chore: add health check to apisix-admin and make the leader election recyclable #499

Merged
merged 13 commits on May 26, 2021
6 changes: 3 additions & 3 deletions pkg/apisix/apisix.go
@@ -50,6 +50,8 @@ type Cluster interface {
String() string
// HasSynced checks whether all resources in the APISIX cluster are synced to the cache.
HasSynced(context.Context) error
// HealthCheck checks the APISIX cluster health in real time.
HealthCheck(context.Context) error
}

// Route is the specific client interface to take over the create, update,
@@ -122,6 +124,7 @@ type apisix struct {
func NewClient() (APISIX, error) {
cli := &apisix{
nonExistentCluster: newNonExistentCluster(),
clusters: make(map[string]Cluster),
}
return cli, nil
}
@@ -160,9 +163,6 @@ func (c *apisix) AddCluster(co *ClusterOptions) error {
if err != nil {
return err
}
if c.clusters == nil {
c.clusters = make(map[string]Cluster)
}
c.clusters[co.Name] = cluster
return nil
}
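As context for the new interface method (not part of this diff), here is a minimal sketch of how a caller might expose Cluster.HealthCheck through an HTTP readiness endpoint; the handler name, timeout, and wiring are assumptions, not code from this PR:

package main

import (
    "context"
    "net/http"
    "time"

    "github.com/apache/apisix-ingress-controller/pkg/apisix"
)

// healthzHandler answers 503 whenever the APISIX admin endpoint probe fails.
func healthzHandler(cluster apisix.Cluster) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // Bound the probe so a hung admin endpoint cannot stall the readiness check.
        ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
        defer cancel()
        if err := cluster.HealthCheck(ctx); err != nil {
            http.Error(w, "apisix cluster unhealthy: "+err.Error(), http.StatusServiceUnavailable)
            return
        }
        w.WriteHeader(http.StatusOK)
    }
}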
122 changes: 95 additions & 27 deletions pkg/apisix/cluster.go
@@ -21,7 +21,9 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
"sync/atomic"
"time"
@@ -37,8 +39,7 @@ import (
const (
_defaultTimeout = 5 * time.Second

_cacheNotSync = iota
_cacheSyncing
_cacheSyncing = iota
_cacheSynced
)

@@ -50,6 +51,19 @@ var (
ErrDuplicatedCluster = errors.New("duplicated cluster")

_errReadOnClosedResBody = errors.New("http: read on closed response body")

// Default shared transport for apisix client
_defaultTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 3 * time.Second,
}).Dial,
DialContext: (&net.Dialer{
Timeout: 3 * time.Second,
}).DialContext,
ResponseHeaderTimeout: 30 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
)

// ClusterOptions contains parameters to customize APISIX client.
@@ -63,6 +77,7 @@ type ClusterOptions struct {
type cluster struct {
name string
baseURL string
baseURLHost string
adminKey string
cli *http.Client
cacheState int32
@@ -86,19 +101,21 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
}
o.BaseURL = strings.TrimSuffix(o.BaseURL, "/")

u, err := url.Parse(o.BaseURL)
if err != nil {
return nil, err
}

c := &cluster{
name: o.Name,
baseURL: o.BaseURL,
adminKey: o.AdminKey,
name: o.Name,
baseURL: o.BaseURL,
baseURLHost: u.Host,
adminKey: o.AdminKey,
cli: &http.Client{
Timeout: o.Timeout,
Transport: &http.Transport{
ResponseHeaderTimeout: o.Timeout,
ExpectContinueTimeout: o.Timeout,
},
Timeout: o.Timeout,
Transport: _defaultTransport,
},
cache: nil,
cacheState: _cacheNotSync,
cacheState: _cacheSyncing, // default state
cacheSynced: make(chan struct{}),
}
c.route = newRouteClient(c)
@@ -108,6 +125,11 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
c.globalRules = newGlobalRuleClient(c)
c.consumer = newConsumerClient(c)

c.cache, err = cache.NewMemDBCache()
if err != nil {
return nil, err
}

go c.syncCache()

return c, nil
@@ -116,9 +138,6 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
func (c *cluster) syncCache() {
log.Infow("syncing cache", zap.String("cluster", c.name))
now := time.Now()
if !atomic.CompareAndSwapInt32(&c.cacheState, _cacheNotSync, _cacheSyncing) {
panic("dubious state when sync cache")
}
defer func() {
if c.cacheSyncErr == nil {
log.Infow("cache synced",
@@ -134,13 +153,20 @@ func (c *cluster) syncCache() {
}()

backoff := wait.Backoff{
Duration: time.Second,
Factor: 2,
Steps: 6,
}
err := wait.ExponentialBackoff(backoff, c.syncCacheOnce)
Duration: 2 * time.Second,
Factor: 1,
Steps: 5,
}
var lastSyncErr error
err := wait.ExponentialBackoff(backoff, func() (done bool, _ error) {
// syncCacheOnce never returns (false, nil), so when the backoff gives up,
// lastSyncErr always records the real cause of the failure.
done, lastSyncErr = c.syncCacheOnce()
return
})
if err != nil {
c.cacheSyncErr = err
// err can only be ErrWaitTimeout here; record the real sync error instead.
c.cacheSyncErr = lastSyncErr
}
close(c.cacheSynced)

@@ -150,12 +176,6 @@ func (c *cluster) syncCache() {
}

func (c *cluster) syncCacheOnce() (bool, error) {
dbcache, err := cache.NewMemDBCache()
if err != nil {
return false, err
}
c.cache = dbcache

routes, err := c.route.List(context.TODO())
if err != nil {
log.Errorf("failed to list route in APISIX: %s", err)
@@ -306,6 +326,54 @@ func (c *cluster) GlobalRule() GlobalRule {
return c.globalRules
}

// HealthCheck implements the Cluster.HealthCheck method.
func (c *cluster) HealthCheck(ctx context.Context) (err error) {
if c.cacheSyncErr != nil {
err = c.cacheSyncErr
return
}
if atomic.LoadInt32(&c.cacheState) == _cacheSyncing {
return
}

// Retry three times in a row, and exit if all of them fail.
backoff := wait.Backoff{
Duration: 5 * time.Second,
Factor: 1,
Steps: 3,
}
var lastCheckErr error
err = wait.ExponentialBackoffWithContext(ctx, backoff, func() (done bool, _ error) {
if lastCheckErr = c.healthCheck(ctx); lastCheckErr != nil {
log.Warnf("failed to check health for cluster %s: %s, will retry", c.name, lastCheckErr)
return
}
done = true
return
})
if err != nil {
// err can only be ErrWaitTimeout here; record the real health check error instead.
c.cacheSyncErr = lastCheckErr
}
return err
}

func (c *cluster) healthCheck(ctx context.Context) (err error) {
// tcp socket probe
d := net.Dialer{Timeout: 3 * time.Second}
conn, err := d.DialContext(ctx, "tcp", c.baseURLHost)
if err != nil {
return err
}
if er := conn.Close(); er != nil {
log.Warnw("failed to close tcp probe connection",
zap.Error(er),
zap.String("cluster", c.name),
)
}
return
}

func (c *cluster) applyAuth(req *http.Request) {
if c.adminKey != "" {
req.Header.Set("X-API-Key", c.adminKey)
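The lastSyncErr / lastCheckErr bookkeeping above is needed because wait.ExponentialBackoff only reports wait.ErrWaitTimeout once its retries are exhausted, which would otherwise hide the underlying failure. Here is a self-contained sketch of the same pattern, with a made-up probe function standing in for the real sync or health check:

package main

import (
    "errors"
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

// probe stands in for an operation that may fail transiently.
func probe() error {
    return errors.New("connection refused")
}

func main() {
    backoff := wait.Backoff{Duration: time.Second, Factor: 1, Steps: 3}

    var lastErr error
    err := wait.ExponentialBackoff(backoff, func() (bool, error) {
        // Return a nil error so the backoff keeps retrying, and remember
        // the real failure so it can be reported once retries run out.
        if lastErr = probe(); lastErr != nil {
            return false, nil
        }
        return true, nil
    })
    if err != nil {
        // err is wait.ErrWaitTimeout here; lastErr holds the actual cause.
        fmt.Printf("probe failed: %v (last error: %v)\n", err, lastErr)
    }
}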
4 changes: 4 additions & 0 deletions pkg/apisix/nonexistentclient.go
@@ -180,6 +180,10 @@ func (nc *nonExistentCluster) HasSynced(_ context.Context) error {
return nil
}

func (nc *nonExistentCluster) HealthCheck(_ context.Context) error {
return nil
}

func (nc *nonExistentCluster) String() string {
return "non-existent cluster"
}
3 changes: 2 additions & 1 deletion pkg/ingress/apisix_cluster_config.go
@@ -54,6 +54,8 @@ func (c *Controller) newApisixClusterConfigController() *apisixClusterConfigCont
func (c *apisixClusterConfigController) run(ctx context.Context) {
log.Info("ApisixClusterConfig controller started")
defer log.Info("ApisixClusterConfig controller exited")
defer c.workqueue.ShutDown()

if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixClusterConfigInformer.HasSynced); !ok {
log.Error("cache sync failed")
return
@@ -62,7 +64,6 @@ func (c *apisixClusterConfigController) run(ctx context.Context) {
go c.runWorker(ctx)
}
<-ctx.Done()
c.workqueue.ShutDown()
}

func (c *apisixClusterConfigController) runWorker(ctx context.Context) {
3 changes: 2 additions & 1 deletion pkg/ingress/apisix_route.go
@@ -56,6 +56,8 @@ func (c *Controller) newApisixRouteController() *apisixRouteController {
func (c *apisixRouteController) run(ctx context.Context) {
log.Info("ApisixRoute controller started")
defer log.Info("ApisixRoute controller exited")
defer c.workqueue.ShutDown()

ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixRouteInformer.HasSynced)
if !ok {
log.Error("cache sync failed")
@@ -66,7 +68,6 @@ func (c *apisixRouteController) run(ctx context.Context) {
go c.runWorker(ctx)
}
<-ctx.Done()
c.workqueue.ShutDown()
}

func (c *apisixRouteController) runWorker(ctx context.Context) {
3 changes: 2 additions & 1 deletion pkg/ingress/apisix_tls.go
@@ -57,6 +57,8 @@ func (c *Controller) newApisixTlsController() *apisixTlsController {
func (c *apisixTlsController) run(ctx context.Context) {
log.Info("ApisixTls controller started")
defer log.Info("ApisixTls controller exited")
defer c.workqueue.ShutDown()

if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixTlsInformer.HasSynced, c.controller.secretInformer.HasSynced); !ok {
log.Errorf("informers sync failed")
return
@@ -66,7 +68,6 @@ func (c *apisixTlsController) run(ctx context.Context) {
}

<-ctx.Done()
c.workqueue.ShutDown()
}

func (c *apisixTlsController) runWorker(ctx context.Context) {
3 changes: 2 additions & 1 deletion pkg/ingress/apisix_upstream.go
@@ -57,6 +57,8 @@ func (c *Controller) newApisixUpstreamController() *apisixUpstreamController {
func (c *apisixUpstreamController) run(ctx context.Context) {
log.Info("ApisixUpstream controller started")
defer log.Info("ApisixUpstream controller exited")
defer c.workqueue.ShutDown()

if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixUpstreamInformer.HasSynced, c.controller.svcInformer.HasSynced); !ok {
log.Error("cache sync failed")
return
@@ -66,7 +68,6 @@ func (c *apisixUpstreamController) run(ctx context.Context) {
}

<-ctx.Done()
c.workqueue.ShutDown()
}

func (c *apisixUpstreamController) runWorker(ctx context.Context) {
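The four controller changes above share one pattern: moving workqueue.ShutDown into a defer guarantees the queue is closed on every exit path, including the early return after a failed cache sync, so worker goroutines blocked in Get are released instead of hanging. A minimal sketch of that pattern, with an illustrative queue and item rather than the controllers' real types:

package main

import (
    "context"
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func run(ctx context.Context, queue workqueue.Interface) {
    // Deferring ShutDown closes the queue on every return path, so a
    // blocked Get in the worker goroutine returns quit == true.
    defer queue.ShutDown()

    go func() {
        for {
            item, quit := queue.Get()
            if quit {
                return
            }
            fmt.Println("processing", item)
            queue.Done(item)
        }
    }()

    <-ctx.Done()
}

func main() {
    queue := workqueue.New()
    queue.Add("demo-item")

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    run(ctx, queue)
}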