Skip to content

Commit

Permalink
fix(pkg/healthsrv): do the 3 health checks concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron Schlesinger committed Feb 10, 2016
1 parent d1db2f9 commit 89c3a29
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 36 deletions.
18 changes: 18 additions & 0 deletions pkg/healthsrv/buckets_lister.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,21 @@ type errBucketLister struct {
func (e errBucketLister) ListBuckets(*s3.ListBucketsInput) (*s3.ListBucketsOutput, error) {
return nil, e.err
}

// listBuckets calls bl.ListBuckets(...) and sends the results back on the various given channels. This func is intended to be run in a goroutine and communicates via the channels it's passed.
//
// On success, it passes the bucket output on succCh, and on failure, it passes the error on errCh. At most one of {succCh, errCh} will be sent on. If stopCh is closed, no pending or future sends will occur.
func listBuckets(bl BucketLister, succCh chan<- *s3.ListBucketsOutput, errCh chan<- error, stopCh <-chan struct{}) {
lbOut, err := bl.ListBuckets(&s3.ListBucketsInput{})
if err != nil {
select {
case errCh <- err:
case <-stopCh:
}
return
}
select {
case succCh <- lbOut:
case <-stopCh:
}
}
25 changes: 25 additions & 0 deletions pkg/healthsrv/circuit_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package healthsrv

import (
"fmt"

"github.com/deis/builder/pkg/sshd"
)

// circuitState determines whether circ.State() == sshd.ClosedState, and sends the results back on the various given channels. This func is intended to be run in a goroutine and communicates via the channels it's passed.
//
// If the circuit is closed, it passes an empty struct back on succCh. On failure, it sends an error back on errCh. At most one of {succCh, errCh} will be sent on. If stopCh is closed, no pending or future sends will occur.
func circuitState(circ *sshd.Circuit, succCh chan<- struct{}, errCh chan<- error, stopCh <-chan struct{}) {
// There's a race between the boolean eval and the HTTP error returned (the circuit could close between the two). This function should be polled to avoid that problem. If it's being used in a k8s probe, then you're fine because k8s will repeat the health probe and effectively re-evaluate the boolean
if circ.State() != sshd.ClosedState {
select {
case errCh <- fmt.Errorf("SSH Server is not yet started"):
case <-stopCh:
}
return
}
select {
case succCh <- struct{}{}:
case <-stopCh:
}
}
105 changes: 69 additions & 36 deletions pkg/healthsrv/healthz_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import (
s3 "github.com/aws/aws-sdk-go/service/s3"
"github.com/deis/builder/pkg/gitreceive/log"
"github.com/deis/builder/pkg/sshd"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/api"
)

type healthZRespBucket struct {
Expand All @@ -31,43 +30,77 @@ type healthZResp struct {
SSHServerStarted bool `json:"ssh_server_started"`
}

func marshalHealthZResp(w http.ResponseWriter, rsp healthZResp) {
if err := json.NewEncoder(w).Encode(rsp); err != nil {
str := fmt.Sprintf("Error encoding JSON (%s)", err)
http.Error(w, str, http.StatusInternalServerError)
return
}
}

func healthZHandler(nsLister NamespaceLister, bLister BucketLister, serverCircuit *sshd.Circuit) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// There's a race between the boolean eval and the HTTP error returned (the server could start up between the two), but k8s will repeat the health probe request and effectively re-evaluate the boolean. The result is that the server may not start until the next probe in those cases
if serverCircuit.State() != sshd.ClosedState {
str := fmt.Sprintf("SSH Server is not yet started")
log.Err(str)
http.Error(w, str, http.StatusServiceUnavailable)
return
}
lbOut, err := bLister.ListBuckets(&s3.ListBucketsInput{})
if err != nil {
str := fmt.Sprintf("Error listing buckets (%s)", err)
log.Err(str)
http.Error(w, str, http.StatusServiceUnavailable)
return
}
var rsp healthZResp
for _, buck := range lbOut.Buckets {
rsp.S3Buckets = append(rsp.S3Buckets, convertBucket(buck))
}
stopCh := make(chan struct{})

nsList, err := nsLister.List(labels.Everything(), fields.Everything())
if err != nil {
str := fmt.Sprintf("Error listing namespaces (%s)", err)
log.Err(str)
http.Error(w, str, http.StatusServiceUnavailable)
return
}
for _, ns := range nsList.Items {
rsp.Namespaces = append(rsp.Namespaces, ns.Name)
}
rsp.SSHServerStarted = true
if err := json.NewEncoder(w).Encode(rsp); err != nil {
str := fmt.Sprintf("Error encoding JSON (%s)", err)
http.Error(w, str, http.StatusInternalServerError)
return
serverStateCh := make(chan struct{})
serverStateErrCh := make(chan error)
go circuitState(serverCircuit, serverStateCh, serverStateErrCh, stopCh)

listBucketsCh := make(chan *s3.ListBucketsOutput)
listBucketsErrCh := make(chan error)
go listBuckets(bLister, listBucketsCh, listBucketsErrCh, stopCh)

namespaceListerCh := make(chan *api.NamespaceList)
namespaceListerErrCh := make(chan error)
go listNamespaces(nsLister, namespaceListerCh, namespaceListerErrCh, stopCh)

var rsp healthZResp
serverState, bucketState, namespaceState := false, false, false
for {
select {
case err := <-serverStateErrCh:
log.Err(err.Error())
http.Error(w, err.Error(), http.StatusServiceUnavailable)
close(stopCh)
return
case err := <-listBucketsErrCh:
str := fmt.Sprintf("Error listing buckets (%s)", err)
log.Err(str)
http.Error(w, str, http.StatusServiceUnavailable)
close(stopCh)
return
case err := <-namespaceListerErrCh:
str := fmt.Sprintf("Error listing namespaces (%s)", err)
log.Err(str)
http.Error(w, str, http.StatusServiceUnavailable)
close(stopCh)
return
case <-serverStateCh:
serverState = true
rsp.SSHServerStarted = true
if serverState && bucketState && namespaceState {
marshalHealthZResp(w, rsp)
return
}
case lbOut := <-listBucketsCh:
bucketState = true
for _, buck := range lbOut.Buckets {
rsp.S3Buckets = append(rsp.S3Buckets, convertBucket(buck))
}
if serverState && bucketState && namespaceState {
marshalHealthZResp(w, rsp)
return
}
case nsList := <-namespaceListerCh:
namespaceState = true
for _, ns := range nsList.Items {
rsp.Namespaces = append(rsp.Namespaces, ns.Name)
}
if serverState && bucketState && namespaceState {
marshalHealthZResp(w, rsp)
return
}
}
}
// TODO: check server is running
})
}
19 changes: 19 additions & 0 deletions pkg/healthsrv/namespace_lister.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,22 @@ type errNamespaceLister struct {
func (e errNamespaceLister) List(labels.Selector, fields.Selector) (*api.NamespaceList, error) {
return nil, e.err
}

// listNamespaces calls nl.List(...) and sends the results back on the various given channels. This func is intended to be run in a goroutine and communicates via the channels it's passed.
//
// On success, it passes the namespace list on succCh, and on failure, it passes the error on errCh. At most one of {succCh, errCh} will be sent on. If stopCh is closed, no pending or future sends will occur.
func listNamespaces(nl NamespaceLister, succCh chan<- *api.NamespaceList, errCh chan<- error, stopCh <-chan struct{}) {
nsList, err := nl.List(labels.Everything(), fields.Everything())
if err != nil {
select {
case errCh <- err:
case <-stopCh:
}
return
}
select {
case succCh <- nsList:
case <-stopCh:
return
}
}

0 comments on commit 89c3a29

Please sign in to comment.