Skip to content

Commit

Permalink
kafka check
Browse files Browse the repository at this point in the history
  • Loading branch information
peterschubert committed Apr 19, 2017
1 parent b6056be commit f79f79a
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 17 deletions.
30 changes: 19 additions & 11 deletions content_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/gorilla/mux"
"github.com/kr/pretty"
"github.com/rcrowley/go-metrics"
"net"
"net/http"
"os"
"os/signal"
Expand Down Expand Up @@ -58,27 +59,22 @@ func (indexer *contentIndexer) start(appSystemCode string, indexName string, por
}()

go func() {
indexer.serveAdminEndpoints(appSystemCode, port)
indexer.serveAdminEndpoints(appSystemCode, port, queueConfig)
}()
}

func (indexer *contentIndexer) serveAdminEndpoints(appSystemCode string, port string) {
healthService := newHealthService(indexer.esServiceInstance)
func (indexer *contentIndexer) serveAdminEndpoints(appSystemCode string, port string, queueConfig consumer.QueueConfig) {
healthService := newHealthService(indexer.esServiceInstance, queueConfig.Topic, queueConfig.Addrs[0])
var monitoringRouter http.Handler = mux.NewRouter()
monitoringRouter = httphandlers.TransactionAwareRequestLoggingHandler(log.StandardLogger(), monitoringRouter)
monitoringRouter = httphandlers.HTTPMetricsHandler(metrics.DefaultRegistry, monitoringRouter)

serveMux := http.NewServeMux()

checks := []health.Check{healthService.connectivityHealthyCheck(),
healthService.clusterIsHealthyCheck(),
healthService.schemaHealthyCheck()}

//todo add Kafka check
hc := health.HealthCheck{SystemCode: appSystemCode, Name: appSystemCode, Description: "Content Read Writer for Elasticsearch", Checks: checks}
hc := health.HealthCheck{SystemCode: appSystemCode, Name: appSystemCode, Description: "Content Read Writer for Elasticsearch", Checks: healthService.checks}
serveMux.HandleFunc("/__health", health.Handler(hc))
serveMux.HandleFunc("/__health-details", healthService.HealthDetails)
serveMux.HandleFunc(status.GTGPath, healthService.GoodToGo)
serveMux.HandleFunc(status.GTGPath, status.NewGoodToGoHandler(healthService.gtgCheck))
serveMux.HandleFunc(status.BuildInfoPath, status.BuildInfoHandler)

serveMux.Handle("/", monitoringRouter)
Expand All @@ -89,7 +85,19 @@ func (indexer *contentIndexer) serveAdminEndpoints(appSystemCode string, port st
}

func (indexer *contentIndexer) startMessageConsumer(config consumer.QueueConfig) {
messageConsumer := consumer.NewConsumer(config, indexer.handleMessage, &http.Client{})
client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConnsPerHost: 20,
TLSHandshakeTimeout: 3 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}
messageConsumer := consumer.NewConsumer(config, indexer.handleMessage, client)
log.Printf("[Startup] Consumer: %# v", pretty.Formatter(messageConsumer))

var consumerWaitGroup sync.WaitGroup
Expand Down
128 changes: 122 additions & 6 deletions healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,49 @@ import (
"encoding/json"
"fmt"
health "github.com/Financial-Times/go-fthealth/v1_1"
"github.com/Financial-Times/service-status-go/gtg"
log "github.com/Sirupsen/logrus"
"io"
"io/ioutil"
"net"
"net/http"
"time"
)

// ResponseOK Successful healthcheck response
const ResponseOK = "OK"

// healthService bundles the dependencies and the list of checks used by the
// health and good-to-go endpoints.
type healthService struct {
// Elasticsearch health dependency handed in through newHealthService.
// NOTE(review): presumably backs the cluster/connectivity/schema checks — confirm.
esHealthService esHealthServiceI
// Name of the queue topic that the topic check looks for.
topic string
// Base address of kafka-proxy; "/__kafka-rest-proxy/topics" is appended to it.
proxyAddress string
// HTTP client used for the request that lists the kafka-proxy topics.
httpClient *http.Client
// Checks assembled by newHealthService and iterated by gtgCheck.
checks []health.Check
}

func newHealthService(esHealthService esHealthServiceI) *healthService {
return &healthService{esHealthService: esHealthService}
// newHealthService builds a healthService for the given Elasticsearch health
// dependency, queue topic and kafka-proxy address, and registers the four
// checks it exposes: cluster health, connectivity, schema and topic presence.
//
// The HTTP client used for the topic check carries an overall request timeout
// so that a kafka-proxy which accepts the connection but never answers cannot
// block a health or good-to-go request indefinitely.
func newHealthService(esHealthService esHealthServiceI, topic string, proxyAddress string) *healthService {
	// Upper bound for one complete kafka-proxy request (connect, send, read body).
	const requestTimeout = 10 * time.Second

	service := &healthService{
		esHealthService: esHealthService,
		topic:           topic,
		proxyAddress:    proxyAddress,
		httpClient: &http.Client{
			Timeout: requestTimeout,
			Transport: &http.Transport{
				Proxy: http.ProxyFromEnvironment,
				DialContext: (&net.Dialer{
					Timeout:   30 * time.Second,
					KeepAlive: 30 * time.Second,
				}).DialContext,
				MaxIdleConnsPerHost:   20,
				TLSHandshakeTimeout:   3 * time.Second,
				ExpectContinueTimeout: 1 * time.Second,
			},
		},
	}

	// The checks are method values bound to service, so they can only be
	// created once the struct itself exists.
	service.checks = []health.Check{
		service.clusterIsHealthyCheck(),
		service.connectivityHealthyCheck(),
		service.schemaHealthyCheck(),
		service.topicHealthcheck(),
	}
	return service
}

func (service *healthService) clusterIsHealthyCheck() health.Check {
Expand Down Expand Up @@ -80,11 +113,94 @@ func (service *healthService) schemaChecker() (string, error) {
}
}

//GoodToGo returns a 503 if the healthcheck fails - suitable for use from varnish to check availability of a node
func (service *healthService) GoodToGo(writer http.ResponseWriter, req *http.Request) {
if _, err := service.healthChecker(); err != nil {
writer.WriteHeader(http.StatusServiceUnavailable)
// topicHealthcheck returns the severity-1 check definition that verifies
// kafka-proxy connectivity and the presence of the configured topic. The
// check's name embeds service.topic and its checker is
// checkIfCombinedPublicationTopicIsPresent.
func (service *healthService) topicHealthcheck() health.Check {
	checkName := fmt.Sprintf("Check kafka-proxy connectivity and %s topic", service.topic)

	var check health.Check
	check.Name = checkName
	check.Severity = 1
	check.BusinessImpact = "CombinedPostPublication messages can't be read from the queue. Indexing for search won't work."
	check.TechnicalSummary = "Messages couldn't be read from the queue. Check if kafka-proxy is reachable and topic is present."
	check.PanicGuide = "https://dewey.ft.com/content-rw-elasticsearch.html"
	check.Checker = service.checkIfCombinedPublicationTopicIsPresent
	return check
}

// checkIfCombinedPublicationTopicIsPresent is the Checker for the topic
// health check: it looks up service.topic through checkIfTopicIsPresent.
// The message is always ResponseOK; callers must rely on the returned error
// to tell success from failure.
func (service *healthService) checkIfCombinedPublicationTopicIsPresent() (string, error) {
	err := service.checkIfTopicIsPresent(service.topic)
	return ResponseOK, err
}

// checkIfTopicIsPresent fetches the topic list from kafka-proxy and reports
// whether searchedTopic is part of it.
//
// It returns nil when the topic is listed. It returns an error when the HTTP
// request fails, when the response body is not a JSON array of strings, or
// when the topic is missing from the list. Request and parsing failures are
// also logged at error level before being returned.
func (service *healthService) checkIfTopicIsPresent(searchedTopic string) error {
	topicsURL := service.proxyAddress + "/__kafka-rest-proxy/topics"

	payload, _, reqErr := executeHTTPRequest(topicsURL, service.httpClient)
	if reqErr != nil {
		log.Errorf("Healthcheck: %v", reqErr.Error())
		return reqErr
	}

	// The endpoint is expected to answer with a flat JSON array of topic names.
	var available []string
	if parseErr := json.Unmarshal(payload, &available); parseErr != nil {
		log.Errorf("Connection could be established to kafka-proxy, but a parsing error occurred and topic could not be found. %v", parseErr.Error())
		return parseErr
	}

	for i := range available {
		if available[i] == searchedTopic {
			return nil
		}
	}

	return fmt.Errorf("Connection could be established to kafka-proxy, but topic %s was not found", searchedTopic)
}

// executeHTTPRequest performs a GET on urlStr with httpClient and returns the
// response body together with the HTTP status code.
//
// Returns:
//   - b:      the full response body, or nil on any failure.
//   - status: the HTTP status code of the response, or -1 when no response
//     was obtained (request could not be built or the round trip failed).
//   - err:    non-nil when the request could not be created or executed, when
//     the status is not 200 OK, or when the body could not be read.
//
// The response body is always drained and closed via cleanUp before returning.
func executeHTTPRequest(urlStr string, httpClient *http.Client) (b []byte, status int, err error) {
	req, err := http.NewRequest("GET", urlStr, nil)
	if err != nil {
		return nil, -1, fmt.Errorf("Error creating requests for url=%s, error=%v", urlStr, err)
	}

	resp, err := httpClient.Do(req)
	if err != nil {
		// On a failed round trip resp is normally nil, so its status must not
		// be read unconditionally (doing so panics with a nil dereference).
		failedStatus := -1
		if resp != nil {
			failedStatus = resp.StatusCode
		}
		return nil, failedStatus, fmt.Errorf("Error executing requests for url=%s, error=%v", urlStr, err)
	}

	defer cleanUp(resp)

	if resp.StatusCode != http.StatusOK {
		return nil, resp.StatusCode, fmt.Errorf("Connecting to %s was not successful. Status: %d", urlStr, resp.StatusCode)
	}

	b, err = ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, http.StatusOK, fmt.Errorf("Could not parse payload from response for url=%s, error=%v", urlStr, err)
	}

	return b, http.StatusOK, nil
}

// cleanUp discards whatever is left unread in resp.Body and then closes it.
// Failures of either step are logged as warnings and otherwise ignored.
func cleanUp(resp *http.Response) {
	// Drain first, then close; each failure is only logged.
	if _, drainErr := io.Copy(ioutil.Discard, resp.Body); drainErr != nil {
		log.Warningf("[%v]", drainErr)
	}

	if closeErr := resp.Body.Close(); closeErr != nil {
		log.Warningf("[%v]", closeErr)
	}
}

// gtgCheck runs every registered check in order and reports the service as
// not good-to-go as soon as one of them fails, using that check's error text
// as the status message. When all checks pass the status is good-to-go.
func (service *healthService) gtgCheck() gtg.Status {
	for i := range service.checks {
		_, err := service.checks[i].Checker()
		if err == nil {
			continue
		}
		// The first failure decides the result; later checks are not run.
		return gtg.Status{GoodToGo: false, Message: err.Error()}
	}
	return gtg.Status{GoodToGo: true}
}

//HealthDetails returns the response from elasticsearch service /__health endpoint - describing the cluster health
Expand Down

0 comments on commit f79f79a

Please sign in to comment.