Skip to content

Commit

Permalink
chore: add health check to apisix-admin and make the leader election …
Browse files Browse the repository at this point in the history
…recyclable (#499)

Co-authored-by: Liu Peng <vslene@gmail.com>
  • Loading branch information
tokers and slene committed May 26, 2021
1 parent 77a06cc commit 582c4b3
Show file tree
Hide file tree
Showing 13 changed files with 235 additions and 99 deletions.
6 changes: 3 additions & 3 deletions pkg/apisix/apisix.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Cluster interface {
String() string
// HasSynced checks whether all resources in the APISIX cluster are synced to cache.
HasSynced(context.Context) error
// HealthCheck checks apisix cluster health in realtime.
HealthCheck(context.Context) error
}

// Route is the specific client interface to take over the create, update,
Expand Down Expand Up @@ -122,6 +124,7 @@ type apisix struct {
func NewClient() (APISIX, error) {
	// The cluster registry is allocated eagerly so that later insertions
	// never have to deal with a nil map.
	return &apisix{
		nonExistentCluster: newNonExistentCluster(),
		clusters:           make(map[string]Cluster),
	}, nil
}
Expand Down Expand Up @@ -160,9 +163,6 @@ func (c *apisix) AddCluster(co *ClusterOptions) error {
if err != nil {
return err
}
if c.clusters == nil {
c.clusters = make(map[string]Cluster)
}
c.clusters[co.Name] = cluster
return nil
}
Expand Down
122 changes: 95 additions & 27 deletions pkg/apisix/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
"sync/atomic"
"time"
Expand All @@ -37,8 +39,7 @@ import (
const (
_defaultTimeout = 5 * time.Second

_cacheNotSync = iota
_cacheSyncing
_cacheSyncing = iota
_cacheSynced
)

Expand All @@ -50,6 +51,19 @@ var (
ErrDuplicatedCluster = errors.New("duplicated cluster")

_errReadOnClosedResBody = errors.New("http: read on closed response body")

// Default shared transport for apisix client
_defaultTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 3 * time.Second,
}).Dial,
DialContext: (&net.Dialer{
Timeout: 3 * time.Second,
}).DialContext,
ResponseHeaderTimeout: 30 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
)

// ClusterOptions contains parameters to customize APISIX client.
Expand All @@ -63,6 +77,7 @@ type ClusterOptions struct {
type cluster struct {
name string
baseURL string
baseURLHost string
adminKey string
cli *http.Client
cacheState int32
Expand All @@ -86,19 +101,21 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
}
o.BaseURL = strings.TrimSuffix(o.BaseURL, "/")

u, err := url.Parse(o.BaseURL)
if err != nil {
return nil, err
}

c := &cluster{
name: o.Name,
baseURL: o.BaseURL,
adminKey: o.AdminKey,
name: o.Name,
baseURL: o.BaseURL,
baseURLHost: u.Host,
adminKey: o.AdminKey,
cli: &http.Client{
Timeout: o.Timeout,
Transport: &http.Transport{
ResponseHeaderTimeout: o.Timeout,
ExpectContinueTimeout: o.Timeout,
},
Timeout: o.Timeout,
Transport: _defaultTransport,
},
cache: nil,
cacheState: _cacheNotSync,
cacheState: _cacheSyncing, // default state
cacheSynced: make(chan struct{}),
}
c.route = newRouteClient(c)
Expand All @@ -108,6 +125,11 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
c.globalRules = newGlobalRuleClient(c)
c.consumer = newConsumerClient(c)

c.cache, err = cache.NewMemDBCache()
if err != nil {
return nil, err
}

go c.syncCache()

return c, nil
Expand All @@ -116,9 +138,6 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
func (c *cluster) syncCache() {
log.Infow("syncing cache", zap.String("cluster", c.name))
now := time.Now()
if !atomic.CompareAndSwapInt32(&c.cacheState, _cacheNotSync, _cacheSyncing) {
panic("dubious state when sync cache")
}
defer func() {
if c.cacheSyncErr == nil {
log.Infow("cache synced",
Expand All @@ -134,13 +153,20 @@ func (c *cluster) syncCache() {
}()

backoff := wait.Backoff{
Duration: time.Second,
Factor: 2,
Steps: 6,
}
err := wait.ExponentialBackoff(backoff, c.syncCacheOnce)
Duration: 2 * time.Second,
Factor: 1,
Steps: 5,
}
var lastSyncErr error
err := wait.ExponentialBackoff(backoff, func() (done bool, _ error) {
// syncCacheOnce never returns (false, nil), so when the backoff
// gives up, lastSyncErr holds the real failure and is safe to use.
done, lastSyncErr = c.syncCacheOnce()
return
})
if err != nil {
c.cacheSyncErr = err
// if ErrWaitTimeout then set lastSyncErr
c.cacheSyncErr = lastSyncErr
}
close(c.cacheSynced)

Expand All @@ -150,12 +176,6 @@ func (c *cluster) syncCache() {
}

func (c *cluster) syncCacheOnce() (bool, error) {
dbcache, err := cache.NewMemDBCache()
if err != nil {
return false, err
}
c.cache = dbcache

routes, err := c.route.List(context.TODO())
if err != nil {
log.Errorf("failed to list route in APISIX: %s", err)
Expand Down Expand Up @@ -306,6 +326,54 @@ func (c *cluster) GlobalRule() GlobalRule {
return c.globalRules
}

// HealthCheck implements Cluster.HealthCheck method.
//
// A previously recorded cacheSyncErr is returned immediately. While the
// cache state is still _cacheSyncing no probe is made and nil is returned.
// Otherwise the cluster is probed with up to three attempts at a constant
// five second interval; the error returned is whatever the backoff helper
// reports once the attempts are exhausted or ctx ends.
func (c *cluster) HealthCheck(ctx context.Context) (err error) {
	switch {
	case c.cacheSyncErr != nil:
		return c.cacheSyncErr
	case atomic.LoadInt32(&c.cacheState) == _cacheSyncing:
		return nil
	}

	// probeErr keeps the outcome of the most recent probe so that it is
	// still available after the backoff helper has given up.
	var probeErr error
	probe := func() (bool, error) {
		probeErr = c.healthCheck(ctx)
		if probeErr == nil {
			return true, nil
		}
		log.Warnf("failed to check health for cluster %s: %s, will retry", c.name, probeErr)
		// Report "not done, no error" so the backoff helper tries again.
		return false, nil
	}

	// Retry three times in a row, and exit if all of them fail.
	retry := wait.Backoff{
		Steps:    3,
		Factor:   1,
		Duration: 5 * time.Second,
	}
	if err = wait.ExponentialBackoffWithContext(ctx, retry, probe); err != nil {
		// Remember the last probe failure; later calls return it right away.
		c.cacheSyncErr = probeErr
	}
	return err
}

// healthCheck probes the cluster once by opening a TCP connection to
// c.baseURLHost and closing it again right away.
//
// It returns the dial error when the connection cannot be established
// (the dial is bounded by a three second timeout and by ctx), and nil
// otherwise. A failure to close the probe connection is only logged and
// does not make the probe fail.
//
// NOTE(review): the dial address is baseURLHost, which is taken from the
// parsed base URL's Host; if the base URL carries no explicit port the dial
// presumably fails for lack of a port — confirm that a port is always
// configured.
func (c *cluster) healthCheck(ctx context.Context) (err error) {
	// tcp socket probe
	d := net.Dialer{Timeout: 3 * time.Second}
	conn, err := d.DialContext(ctx, "tcp", c.baseURLHost)
	if err != nil {
		return err
	}
	if closeErr := conn.Close(); closeErr != nil {
		// Log the close error itself; the dial error is nil at this point.
		log.Warnw("failed to close tcp probe connection",
			zap.Error(closeErr),
			zap.String("cluster", c.name),
		)
	}
	return nil
}

func (c *cluster) applyAuth(req *http.Request) {
if c.adminKey != "" {
req.Header.Set("X-API-Key", c.adminKey)
Expand Down
4 changes: 4 additions & 0 deletions pkg/apisix/nonexistentclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ func (nc *nonExistentCluster) HasSynced(_ context.Context) error {
return nil
}

// HealthCheck implements Cluster.HealthCheck method. It ignores the
// context, performs no probe and always returns nil.
func (nc *nonExistentCluster) HealthCheck(_ context.Context) error {
return nil
}

// String implements Cluster.String method. It returns the fixed
// description "non-existent cluster".
func (nc *nonExistentCluster) String() string {
return "non-existent cluster"
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingress/apisix_cluster_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func (c *Controller) newApisixClusterConfigController() *apisixClusterConfigCont
func (c *apisixClusterConfigController) run(ctx context.Context) {
log.Info("ApisixClusterConfig controller started")
defer log.Info("ApisixClusterConfig controller exited")
defer c.workqueue.ShutDown()

if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixClusterConfigInformer.HasSynced); !ok {
log.Error("cache sync failed")
return
Expand All @@ -62,7 +64,6 @@ func (c *apisixClusterConfigController) run(ctx context.Context) {
go c.runWorker(ctx)
}
<-ctx.Done()
c.workqueue.ShutDown()
}

func (c *apisixClusterConfigController) runWorker(ctx context.Context) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingress/apisix_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func (c *Controller) newApisixRouteController() *apisixRouteController {
func (c *apisixRouteController) run(ctx context.Context) {
log.Info("ApisixRoute controller started")
defer log.Info("ApisixRoute controller exited")
defer c.workqueue.ShutDown()

ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixRouteInformer.HasSynced)
if !ok {
log.Error("cache sync failed")
Expand All @@ -66,7 +68,6 @@ func (c *apisixRouteController) run(ctx context.Context) {
go c.runWorker(ctx)
}
<-ctx.Done()
c.workqueue.ShutDown()
}

func (c *apisixRouteController) runWorker(ctx context.Context) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingress/apisix_tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func (c *Controller) newApisixTlsController() *apisixTlsController {
func (c *apisixTlsController) run(ctx context.Context) {
log.Info("ApisixTls controller started")
defer log.Info("ApisixTls controller exited")
defer c.workqueue.ShutDown()

if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixTlsInformer.HasSynced, c.controller.secretInformer.HasSynced); !ok {
log.Errorf("informers sync failed")
return
Expand All @@ -66,7 +68,6 @@ func (c *apisixTlsController) run(ctx context.Context) {
}

<-ctx.Done()
c.workqueue.ShutDown()
}

func (c *apisixTlsController) runWorker(ctx context.Context) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingress/apisix_upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func (c *Controller) newApisixUpstreamController() *apisixUpstreamController {
func (c *apisixUpstreamController) run(ctx context.Context) {
log.Info("ApisixUpstream controller started")
defer log.Info("ApisixUpstream controller exited")
defer c.workqueue.ShutDown()

if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixUpstreamInformer.HasSynced, c.controller.svcInformer.HasSynced); !ok {
log.Error("cache sync failed")
return
Expand All @@ -66,7 +68,6 @@ func (c *apisixUpstreamController) run(ctx context.Context) {
}

<-ctx.Done()
c.workqueue.ShutDown()
}

func (c *apisixUpstreamController) runWorker(ctx context.Context) {
Expand Down
Loading

0 comments on commit 582c4b3

Please sign in to comment.