-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Updated the consumer and producer libraries.
Reworked the health and gtg endpoints.
- Loading branch information
Mihai Moisa
committed
Jun 16, 2017
1 parent
efd87cb
commit 8ca9eaa
Showing
7 changed files
with
200 additions
and
209 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,100 +1,90 @@ | ||
package main | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
health "github.com/Financial-Times/go-fthealth/v1_1" | ||
"net/http" | ||
"time" | ||
|
||
fthealth "github.com/Financial-Times/go-fthealth/v1_1" | ||
"github.com/Financial-Times/message-queue-go-producer/producer" | ||
"github.com/Financial-Times/message-queue-gonsumer/consumer" | ||
"github.com/Financial-Times/service-status-go/gtg" | ||
"net/http" | ||
) | ||
|
||
const healthPath = "/__health" | ||
|
||
type healthService struct { | ||
config *healthConfig | ||
checks []health.Check | ||
} | ||
const requestTimeout = 4500 | ||
|
||
type healthConfig struct { | ||
type HealthCheck struct { | ||
consumer consumer.MessageConsumer | ||
producer producer.MessageProducer | ||
appSystemCode string | ||
appName string | ||
port string | ||
httpCl *http.Client | ||
consumerConf consumer.QueueConfig | ||
producerConf producer.MessageProducerConfig | ||
panicGuide string | ||
} | ||
|
||
func newHealthService(config *healthConfig) *healthService { | ||
service := &healthService{config: config} | ||
service.checks = []health.Check{ | ||
service.queueCheck(), | ||
func newHealthCheck(producerConf *producer.MessageProducerConfig, consumerConf *consumer.QueueConfig, appName, appSystemCode, panicGuide string) *HealthCheck { | ||
httpClient := &http.Client{Timeout: requestTimeout * time.Millisecond} | ||
p := producer.NewMessageProducerWithHTTPClient(*producerConf, httpClient) | ||
c := consumer.NewConsumer(*consumerConf, func(m consumer.Message) {}, httpClient) | ||
return &HealthCheck{ | ||
consumer: c, | ||
producer: p, | ||
appName: appName, | ||
appSystemCode: appSystemCode, | ||
panicGuide: panicGuide, | ||
} | ||
return service | ||
} | ||
|
||
func (service *healthService) queueCheck() health.Check { | ||
return health.Check{ | ||
ID: "message-queue-proxy-reachable", | ||
BusinessImpact: "Related content from published Next videos will not be processed, clients will not see them within content.", | ||
Name: "Message Queue Proxy Reachable", | ||
PanicGuide: service.config.panicGuide, | ||
Severity: 1, | ||
TechnicalSummary: "Message queue proxy is not reachable/healthy", | ||
Checker: service.checkAggregateMessageQueueProxiesReachable, | ||
func (h *HealthCheck) Health() func(w http.ResponseWriter, r *http.Request) { | ||
checks := []fthealth.Check{h.readQueueCheck(), h.writeQueueCheck()} | ||
hc := fthealth.HealthCheck{ | ||
SystemCode: h.appSystemCode, | ||
Name: h.appName, | ||
Description: serviceDescription, | ||
Checks: checks, | ||
} | ||
return fthealth.Handler(hc) | ||
} | ||
|
||
func (service *healthService) checkAggregateMessageQueueProxiesReachable() (string, error) { | ||
var errMsg string | ||
|
||
err := service.checkMessageQueueProxyReachable(service.config.producerConf.Addr, service.config.producerConf.Topic, service.config.producerConf.Authorization, service.config.producerConf.Queue) | ||
if err != nil { | ||
return err.Error(), fmt.Errorf("Health check for queue address %s, topic %s failed. Error: %s", service.config.producerConf.Addr, service.config.producerConf.Topic, err.Error()) | ||
func (h *HealthCheck) readQueueCheck() fthealth.Check { | ||
return fthealth.Check{ | ||
ID: "read-message-queue-proxy-reachable", | ||
Name: "Read Message Queue Proxy Reachable", | ||
Severity: 1, | ||
BusinessImpact: "Related content from published Next videos will not be processed, clients will not see them within content.", | ||
TechnicalSummary: "Read message queue proxy is not reachable/healthy", | ||
PanicGuide: h.panicGuide, | ||
Checker: h.consumer.ConnectivityCheck, | ||
} | ||
} | ||
|
||
for i := 0; i < len(service.config.consumerConf.Addrs); i++ { | ||
err := service.checkMessageQueueProxyReachable(service.config.consumerConf.Addrs[i], service.config.consumerConf.Topic, service.config.consumerConf.AuthorizationKey, service.config.consumerConf.Queue) | ||
if err == nil { | ||
return "Ok", nil | ||
} | ||
errMsg = errMsg + fmt.Sprintf("Health check for queue address %s, topic %s failed. Error: %s", service.config.consumerConf.Addrs[i], service.config.consumerConf.Topic, err.Error()) | ||
func (h *HealthCheck) writeQueueCheck() fthealth.Check { | ||
return fthealth.Check{ | ||
ID: "write-message-queue-proxy-reachable", | ||
Name: "Write Message Queue Proxy Reachable", | ||
Severity: 1, | ||
BusinessImpact: "Related content from published Next videos will not be processed, clients will not see them within content.", | ||
TechnicalSummary: "Write message queue proxy is not reachable/healthy", | ||
PanicGuide: h.panicGuide, | ||
Checker: h.producer.ConnectivityCheck, | ||
} | ||
return errMsg, errors.New(errMsg) | ||
} | ||
|
||
func (service *healthService) checkMessageQueueProxyReachable(address string, topic string, authKey string, queue string) error { | ||
req, err := http.NewRequest("GET", address+"/topics", nil) | ||
if err != nil { | ||
logger.messageEvent(topic, fmt.Sprintf("Could not connect to proxy: %v", err.Error())) | ||
return err | ||
func (h *HealthCheck) GTG() gtg.Status { | ||
consumerCheck := func() gtg.Status { | ||
return gtgCheck(h.consumer.ConnectivityCheck) | ||
} | ||
if len(authKey) > 0 { | ||
req.Header.Add("Authorization", authKey) | ||
producerCheck := func() gtg.Status { | ||
return gtgCheck(h.producer.ConnectivityCheck) | ||
} | ||
if len(queue) > 0 { | ||
req.Host = queue | ||
} | ||
resp, err := service.config.httpCl.Do(req) | ||
if err != nil { | ||
logger.messageEvent(topic, fmt.Sprintf("Could not connect to proxy: %v", err.Error())) | ||
return err | ||
} | ||
defer func() { _ = resp.Body.Close() }() | ||
if resp.StatusCode != http.StatusOK { | ||
errMsg := fmt.Sprintf("Proxy returned status: %d", resp.StatusCode) | ||
return errors.New(errMsg) | ||
} | ||
return nil | ||
|
||
return gtg.FailFastParallelCheck([]gtg.StatusChecker{ | ||
consumerCheck, | ||
producerCheck, | ||
})() | ||
} | ||
|
||
func (service *healthService) gtgCheck() gtg.Status { | ||
for _, check := range service.checks { | ||
if _, err := check.Checker(); err != nil { | ||
return gtg.Status{GoodToGo: false, Message: err.Error()} | ||
} | ||
func gtgCheck(handler func() (string, error)) gtg.Status { | ||
if _, err := handler(); err != nil { | ||
return gtg.Status{GoodToGo: false, Message: err.Error()} | ||
} | ||
return gtg.Status{GoodToGo: true} | ||
} |
Oops, something went wrong.