diff --git a/content_indexer.go b/content_indexer.go index 579cd0e..805f867 100644 --- a/content_indexer.go +++ b/content_indexer.go @@ -10,6 +10,7 @@ import ( "github.com/gorilla/mux" "github.com/kr/pretty" "github.com/rcrowley/go-metrics" + "net" "net/http" "os" "os/signal" @@ -58,27 +59,22 @@ func (indexer *contentIndexer) start(appSystemCode string, indexName string, por }() go func() { - indexer.serveAdminEndpoints(appSystemCode, port) + indexer.serveAdminEndpoints(appSystemCode, port, queueConfig) }() } -func (indexer *contentIndexer) serveAdminEndpoints(appSystemCode string, port string) { - healthService := newHealthService(indexer.esServiceInstance) +func (indexer *contentIndexer) serveAdminEndpoints(appSystemCode string, port string, queueConfig consumer.QueueConfig) { + healthService := newHealthService(indexer.esServiceInstance, queueConfig.Topic, queueConfig.Addrs[0]) var monitoringRouter http.Handler = mux.NewRouter() monitoringRouter = httphandlers.TransactionAwareRequestLoggingHandler(log.StandardLogger(), monitoringRouter) monitoringRouter = httphandlers.HTTPMetricsHandler(metrics.DefaultRegistry, monitoringRouter) serveMux := http.NewServeMux() - checks := []health.Check{healthService.connectivityHealthyCheck(), - healthService.clusterIsHealthyCheck(), - healthService.schemaHealthyCheck()} - - //todo add Kafka check - hc := health.HealthCheck{SystemCode: appSystemCode, Name: appSystemCode, Description: "Content Read Writer for Elasticsearch", Checks: checks} + hc := health.HealthCheck{SystemCode: appSystemCode, Name: appSystemCode, Description: "Content Read Writer for Elasticsearch", Checks: healthService.checks} serveMux.HandleFunc("/__health", health.Handler(hc)) serveMux.HandleFunc("/__health-details", healthService.HealthDetails) - serveMux.HandleFunc(status.GTGPath, healthService.GoodToGo) + serveMux.HandleFunc(status.GTGPath, status.NewGoodToGoHandler(healthService.gtgCheck)) serveMux.HandleFunc(status.BuildInfoPath, status.BuildInfoHandler) serveMux.Handle("/", monitoringRouter) @@ -89,7 +85,19 @@ func (indexer *contentIndexer) serveAdminEndpoints(appSystemCode string, port st } func (indexer *contentIndexer) startMessageConsumer(config consumer.QueueConfig) { - messageConsumer := consumer.NewConsumer(config, indexer.handleMessage, &http.Client{}) + client := &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + MaxIdleConnsPerHost: 20, + TLSHandshakeTimeout: 3 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + }, + } + messageConsumer := consumer.NewConsumer(config, indexer.handleMessage, client) log.Printf("[Startup] Consumer: %# v", pretty.Formatter(messageConsumer)) var consumerWaitGroup sync.WaitGroup diff --git a/healthcheck.go b/healthcheck.go index 8362c51..290820a 100644 --- a/healthcheck.go +++ b/healthcheck.go @@ -4,16 +4,49 @@ import ( "encoding/json" "fmt" health "github.com/Financial-Times/go-fthealth/v1_1" + "github.com/Financial-Times/service-status-go/gtg" log "github.com/Sirupsen/logrus" + "io" + "io/ioutil" + "net" "net/http" + "time" ) +// ResponseOK Successful healthcheck response +const ResponseOK = "OK" + type healthService struct { esHealthService esHealthServiceI + topic string + proxyAddress string + httpClient *http.Client + checks []health.Check } -func newHealthService(esHealthService esHealthServiceI) *healthService { - return &healthService{esHealthService: esHealthService} +func newHealthService(esHealthService esHealthServiceI, topic string, proxyAddress string) *healthService { + service := &healthService{ + esHealthService: esHealthService, + topic: topic, + proxyAddress: proxyAddress, + httpClient: &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + MaxIdleConnsPerHost: 20, + TLSHandshakeTimeout: 3 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + }, + }} + service.checks = []health.Check{ + service.clusterIsHealthyCheck(), + service.connectivityHealthyCheck(), + service.schemaHealthyCheck(), + service.topicHealthcheck()} + return service } func (service *healthService) clusterIsHealthyCheck() health.Check { @@ -80,11 +113,94 @@ func (service *healthService) schemaChecker() (string, error) { } } -//GoodToGo returns a 503 if the healthcheck fails - suitable for use from varnish to check availability of a node -func (service *healthService) GoodToGo(writer http.ResponseWriter, req *http.Request) { - if _, err := service.healthChecker(); err != nil { - writer.WriteHeader(http.StatusServiceUnavailable) +func (service *healthService) topicHealthcheck() health.Check { + return health.Check{ + BusinessImpact: "CombinedPostPublication messages can't be read from the queue. Indexing for search won't work.", + Name: fmt.Sprintf("Check kafka-proxy connectivity and %s topic", service.topic), + PanicGuide: "https://dewey.ft.com/content-rw-elasticsearch.html", + Severity: 1, + TechnicalSummary: "Messages couldn't be read from the queue. Check if kafka-proxy is reachable and topic is present.", + Checker: service.checkIfCombinedPublicationTopicIsPresent, + } +} + +func (service *healthService) checkIfCombinedPublicationTopicIsPresent() (string, error) { + return ResponseOK, service.checkIfTopicIsPresent(service.topic) +} + +func (service *healthService) checkIfTopicIsPresent(searchedTopic string) error { + + urlStr := service.proxyAddress + "/__kafka-rest-proxy/topics" + + body, _, err := executeHTTPRequest(urlStr, service.httpClient) + if err != nil { + log.Errorf("Healthcheck: %v", err.Error()) + return err + } + + var topics []string + + err = json.Unmarshal(body, &topics) + if err != nil { + log.Errorf("Connection could be established to kafka-proxy, but a parsing error occurred and topic could not be found. %v", err.Error()) + return err + } + + for _, topic := range topics { + if topic == searchedTopic { + return nil + } + } + + return fmt.Errorf("Connection could be established to kafka-proxy, but topic %s was not found", searchedTopic) +} + +func executeHTTPRequest(urlStr string, httpClient *http.Client) (b []byte, status int, err error) { + + req, err := http.NewRequest("GET", urlStr, nil) + if err != nil { + return nil, -1, fmt.Errorf("Error creating requests for url=%s, error=%v", urlStr, err) + } + + resp, err := httpClient.Do(req) + if err != nil { + return nil, resp.StatusCode, fmt.Errorf("Error executing requests for url=%s, error=%v", urlStr, err) + } + + defer cleanUp(resp) + + if resp.StatusCode != http.StatusOK { + return nil, resp.StatusCode, fmt.Errorf("Connecting to %s was not successful. Status: %d", urlStr, resp.StatusCode) + } + + b, err = ioutil.ReadAll(resp.Body) + if err != nil { + return nil, http.StatusOK, fmt.Errorf("Could not parse payload from response for url=%s, error=%v", urlStr, err) + } + + return b, http.StatusOK, err +} + +func cleanUp(resp *http.Response) { + + _, err := io.Copy(ioutil.Discard, resp.Body) + if err != nil { + log.Warningf("[%v]", err) + } + + err = resp.Body.Close() + if err != nil { + log.Warningf("[%v]", err) + } +} + +func (service *healthService) gtgCheck() gtg.Status { + for _, check := range service.checks { + if _, err := check.Checker(); err != nil { + return gtg.Status{GoodToGo: false, Message: err.Error()} + } } + return gtg.Status{GoodToGo: true} } //HealthDetails returns the response from elasticsearch service /__health endpoint - describing the cluster health