Skip to content
This repository has been archived by the owner on Jun 14, 2018. It is now read-only.

Commit

Permalink
Merge pull request #16 from Financial-Times/ingest-concepts-into-elas…
Browse files Browse the repository at this point in the history
…ticsearch

Ingest concepts into elasticsearch
  • Loading branch information
BerniVarga committed Dec 13, 2016
2 parents bb8be64 + 5a71ee4 commit 94e329c
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 43 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Concept Ingester

__An API for reading concepts off of the kafka queue and sending them to the appropriate writer__
[![Circle CI](https://circleci.com/gh/Financial-Times/concept-ingester.svg?style=shield)](https://circleci.com/gh/Financial-Times/concept-ingester)[![Go Report Card](https://goreportcard.com/badge/github.com/Financial-Times/concept-ingester)](https://goreportcard.com/report/github.com/Financial-Times/concept-ingester) [![Coverage Status](https://coveralls.io/repos/github/Financial-Times/concept-ingester/badge.svg)](https://coveralls.io/github/Financial-Times/concept-ingester)

__An API for reading concepts off of the kafka queue and sending them to the appropriate neo4j writer__

An elasticsearch writer can also be configured. If so, after a successful ingestion into neo4j, the ingester will ingest the concept into elasticsearch too.
Counters are incremented for both neo4j and elasticsearch ingestion.

## Installation

Expand All @@ -14,6 +19,7 @@ __An API for reading concepts off of the kafka queue and sending them to the app
Some comments about configuration parameters:
* --vulcan_addr the vulcan address, host and port
* --services-list comma separated list of neo4j writers - do not append a port for running in the cluster
* --elastic-service elasticsearch writer name
* --topic, --consumer_group_id, --consumer_autocommit_enable, --consumer_offset, --consumer_queue_id see the message-queue-gonsumer library

## Healthchecks
Expand Down
16 changes: 16 additions & 0 deletions circle.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# CircleCI configuration: install reporting tools, run the Go tests with the
# race detector, and publish the coverage profile via goveralls.
dependencies:
pre:
# Coverage and JUnit-report helpers fetched before the build.
- go get github.com/axw/gocov/gocov; go get github.com/matm/gocov-html; go get -u github.com/jstemmer/go-junit-report
test:
pre:
# goveralls is invoked in the post step below.
- go get github.com/mattn/goveralls
override:
- mkdir -p $CIRCLE_TEST_REPORTS/golang
# Run the tests and pipe the verbose output through go-junit-report to produce junit.xml.
- go test -race -v ./... | go-junit-report > $CIRCLE_TEST_REPORTS/golang/junit.xml
# Run the tests again, this time writing a coverage profile into the artifacts directory.
- go test -v -cover -race -coverprofile=$CIRCLE_ARTIFACTS/coverage.out ./...
# Strip the first line of every *.out profile (presumably its "mode:" header) so a
# single header can be written at the top of the combined file below.
- cd $CIRCLE_ARTIFACTS && sed -i '1d' *.out
- |
echo "mode: atomic" > $CIRCLE_ARTIFACTS/overall-coverage.result
- cd $CIRCLE_ARTIFACTS && cat *.out >> overall-coverage.result
post:
# Upload the combined profile; the token is read from the COVERALLS_TOKEN environment variable.
- goveralls -coverprofile=$CIRCLE_ARTIFACTS/overall-coverage.result -service=circle-ci -repotoken=$COVERALLS_TOKEN
54 changes: 43 additions & 11 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
)

type httpHandlers struct {
baseURLMappings map[string]string
vulcanAddr string
topic string
baseURLMappings map[string]string
elasticsearchWriterUrl string
vulcanAddr string
topic string
}

func (hh *httpHandlers) kafkaProxyHealthCheck() v1a.Check {
Expand All @@ -39,6 +40,17 @@ func (hh *httpHandlers) writerHealthCheck() v1a.Check {
}
}

// elasticHealthCheck builds the health check entry for the elasticsearch
// concept writer. The probe itself is hh.checkCanConnectToElasticsearchWriter.
func (hh *httpHandlers) elasticHealthCheck() v1a.Check {
	check := v1a.Check{
		Name:             "Check connectivity to concept-rw-elasticsearch",
		Severity:         1,
		BusinessImpact:   "Unable to connect to elasticsearch concept writer",
		TechnicalSummary: `Cannot connect to elasticsearch concept writer. If this check fails, check that the configured writer returns a healthy gtg`,
		PanicGuide:       "https://sites.google.com/a/ft.com/universal-publishing/ops-guides/concept-ingestion",
		Checker:          hh.checkCanConnectToElasticsearchWriter,
	}
	return check
}

func (hh *httpHandlers) checkCanConnectToKafkaProxy() (string, error) {
_, err := checkProxyConnection(hh.vulcanAddr)
if err != nil {
Expand All @@ -48,13 +60,21 @@ func (hh *httpHandlers) checkCanConnectToKafkaProxy() (string, error) {
}

func (hh *httpHandlers) checkCanConnectToWriters() (string, error) {
err := checkWriterAvailability(hh.baseURLMappings)
err := checkWritersAvailability(hh.baseURLMappings)
if err != nil {
return fmt.Sprintf("Healthcheck: Writer not available: %v", err.Error()), err
}
return "", nil
}

// checkCanConnectToElasticsearchWriter probes the configured elasticsearch
// writer URL with checkWriterAvailability. It returns an empty message and a
// nil error when the writer is available; otherwise it returns a description
// of the failure together with the underlying error.
func (hh *httpHandlers) checkCanConnectToElasticsearchWriter() (string, error) {
	if err := checkWriterAvailability(hh.elasticsearchWriterUrl); err != nil {
		message := fmt.Sprintf("Healthcheck: Elasticsearch Concept Writer not available: %v", err.Error())
		return message, err
	}
	return "", nil
}

func checkProxyConnection(vulcanAddr string) (body []byte, err error) {
//check if proxy is running and topic is present
req, err := http.NewRequest("GET", vulcanAddr+"/topics", nil)
Expand Down Expand Up @@ -97,22 +117,34 @@ func (hh *httpHandlers) goodToGo(writer http.ResponseWriter, req *http.Request)
writer.WriteHeader(http.StatusServiceUnavailable)
return
}
if _, err := hh.checkCanConnectToWriters(); err != nil {
writer.WriteHeader(http.StatusServiceUnavailable)
return
}
}

func checkWriterAvailability(baseURLMapping map[string]string) error {
func checkWritersAvailability(baseURLMapping map[string]string) error {
for _, baseURL := range baseURLMapping {
resp, err := http.Get(baseURL + "/__gtg")
err := checkWriterAvailability(baseURL)
if err != nil {
return fmt.Errorf("Error calling writer at %s : %v", baseURL+"/__gtg", err)
}
resp.Body.Close()
if resp != nil && resp.StatusCode != http.StatusOK {
return fmt.Errorf("Writer %v returned status %d", baseURL+"/__gtg", resp.StatusCode)
return err
}
}
return nil
}

// checkWriterAvailability reports whether the writer at baseURL is good to go.
//
// It issues a GET request to baseURL + "/__gtg" and returns nil only when the
// request succeeds and the writer answers with 200 OK. Any transport error or
// other status code is returned as an error naming the probed URL.
func checkWriterAvailability(baseURL string) error {
	gtgURL := baseURL + "/__gtg"

	resp, err := http.Get(gtgURL)
	if err != nil {
		return fmt.Errorf("Error calling writer at %s : %v", gtgURL, err)
	}
	// http.Get returns a non-nil response whenever err is nil, so no nil
	// check is needed; defer the close so it happens on every return path.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("Writer %v returned status %d", gtgURL, resp.StatusCode)
	}
	return nil
}

// buildInfoHandler - This is a stop gap and will be added to when we can define what we should display here
func (hh *httpHandlers) buildInfoHandler(w http.ResponseWriter, req *http.Request) {
fmt.Fprintf(w, "build-info")
Expand Down
104 changes: 77 additions & 27 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,16 @@ import (
standardLog "log"
"net/http"
"os"

"strings"

queueConsumer "github.com/Financial-Times/message-queue-gonsumer/consumer"

"io"
"io/ioutil"
"os/signal"
"sync"
"syscall"
"time"

"net"

"fmt"

"github.com/Financial-Times/go-fthealth/v1a"
"github.com/Financial-Times/http-handlers-go/httphandlers"
status "github.com/Financial-Times/service-status-go/httphandlers"
Expand Down Expand Up @@ -47,9 +41,14 @@ func main() {
services := app.String(cli.StringOpt{
Name: "services-list",
Value: "services",
Desc: "writer services",
Desc: "neo4j writer services",
EnvVar: "SERVICES",
})
elasticService := app.String(cli.StringOpt{
Name: "elastic-service",
Desc: "elasticsearch writer service",
EnvVar: "ELASTICSEARCH_WRITER",
})
port := app.String(cli.StringOpt{
Name: "port",
Value: "8080",
Expand Down Expand Up @@ -125,9 +124,21 @@ func main() {
}

writerMappings := createWriterMappings(*services, *vulcanAddr)

var elasticsearchWriterBasicMapping string
var elasticsearchWriterBulkMapping string
if *elasticService != "" {
elasticsearchWriterBasicMapping = resolveWriterURL(*elasticService, *vulcanAddr)
if elasticsearchWriterBasicMapping != "" {
elasticsearchWriterBulkMapping = elasticsearchWriterBasicMapping + "/bulk"
}
log.Infof("Using writer url: %s for service: %s", elasticsearchWriterBasicMapping, *elasticService)
}

ing := ingesterService{
baseURLMappings: writerMappings,
ticker: time.NewTicker(time.Second / time.Duration(*throttle)),
baseURLMappings: writerMappings,
elasticWriterURL: elasticsearchWriterBulkMapping,
ticker: time.NewTicker(time.Second / time.Duration(*throttle)),
}

outputMetricsIfRequired(*graphiteTCPAddress, *graphitePrefix, *logMetrics)
Expand All @@ -142,7 +153,7 @@ func main() {
wg.Done()
}()

go runServer(ing.baseURLMappings, *port, *vulcanAddr, *topic)
go runServer(ing.baseURLMappings, elasticsearchWriterBasicMapping, *port, *vulcanAddr, *topic)

ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
Expand Down Expand Up @@ -176,11 +187,16 @@ func resolveWriterURL(service string, vulcanAddr string) string {
return vulcanAddr + "/__" + service
}

func runServer(baseURLMappings map[string]string, port string, vulcanAddr string, topic string) {
func runServer(baseURLMappings map[string]string, elasticsearchWriter string, port string, vulcanAddr string, topic string) {

httpHandlers := httpHandlers{baseURLMappings, vulcanAddr, topic}
httpHandlers := httpHandlers{baseURLMappings, elasticsearchWriter, vulcanAddr, topic}
var r http.Handler
if elasticsearchWriter != "" {
r = router(httpHandlers, true)
} else {
r = router(httpHandlers, false)
}

r := router(httpHandlers)
// The following endpoints should not be monitored or logged (varnish calls one of these every second, depending on config)
// The top one of these build info endpoints feels more correct, but the lower one matches what we have in Dropwizard,
// so it's what apps expect currently same as ping, the content of build-info needs more definition
Expand All @@ -197,9 +213,16 @@ func runServer(baseURLMappings map[string]string, port string, vulcanAddr string
}
}

func router(hh httpHandlers) http.Handler {
func router(hh httpHandlers, includeElasticsearchWriter bool) http.Handler {

servicesRouter := mux.NewRouter()
servicesRouter.HandleFunc("/__health", v1a.Handler("ConceptIngester Healthchecks", "Checks for accessing writer", hh.kafkaProxyHealthCheck(), hh.writerHealthCheck()))
// The health endpoint always reports on the kafka proxy and on the writers
// in baseURLMappings. The second entry must be writerHealthCheck: listing
// kafkaProxyHealthCheck twice silently drops the writer check.
var checks = []v1a.Check{hh.kafkaProxyHealthCheck(), hh.writerHealthCheck()}

// The elasticsearch writer is optional, so its check is only registered
// when the caller says one is configured.
if includeElasticsearchWriter {
	checks = append(checks, hh.elasticHealthCheck())
}

servicesRouter.HandleFunc("/__health", v1a.Handler("ConceptIngester Healthchecks", "Checks for accessing writer", checks...))
servicesRouter.HandleFunc("/__gtg", hh.goodToGo)

var monitoringRouter http.Handler = servicesRouter
Expand All @@ -210,9 +233,10 @@ func router(hh httpHandlers) http.Handler {
}

type ingesterService struct {
baseURLMappings map[string]string
client http.Client
ticker *time.Ticker
baseURLMappings map[string]string
elasticWriterURL string
client http.Client
ticker *time.Ticker
}

func (ing ingesterService) readMessage(msg queueConsumer.Message) {
Expand All @@ -225,13 +249,33 @@ func (ing ingesterService) readMessage(msg queueConsumer.Message) {

func (ing ingesterService) processMessage(msg queueConsumer.Message) error {
ingestionType, uuid := extractMessageTypeAndId(msg.Headers)
err := sendToWriter(ingestionType, strings.NewReader(msg.Body), uuid, ing.baseURLMappings)

writerUrl, err := resolveWriter(ingestionType, ing.baseURLMappings)
if err != nil {
failureMeter := metrics.GetOrRegisterMeter(ingestionType+"-FAILURE", metrics.DefaultRegistry)
failureMeter.Mark(1)
log.Infof("Incremented failure count, new count=%d for meter=%s", failureMeter.Count(), ingestionType+"-FAILURE")
return err
}

err = sendToWriter(ingestionType, msg.Body, uuid, writerUrl)
if err != nil {
failureMeter := metrics.GetOrRegisterMeter(ingestionType+"-FAILURE", metrics.DefaultRegistry)
failureMeter.Mark(1)
log.Infof("Incremented failure count, new count=%d for meter=%s", failureMeter.Count(), ingestionType+"-FAILURE")
return err
}

if ing.elasticWriterURL != "" {
err = sendToWriter(ingestionType, msg.Body, uuid, ing.elasticWriterURL)
if err != nil {
failureMeter := metrics.GetOrRegisterMeter(ingestionType+"-elasticsearch-FAILURE", metrics.DefaultRegistry)
failureMeter.Mark(1)
log.Infof("Incremented failure count, new count=%d for meter=%s", failureMeter.Count(), ingestionType+"-elasticsearch-FAILURE")
return err
}
}

successMeter := metrics.GetOrRegisterMeter(ingestionType+"-SUCCESS", metrics.DefaultRegistry)
successMeter.Mark(1)
return nil
Expand All @@ -242,39 +286,45 @@ func extractMessageTypeAndId(headers map[string]string) (string, string) {
return headers["Message-Type"], headers["Message-Id"]
}

func sendToWriter(ingestionType string, msgBody io.Reader, uuid string, URLMappings map[string]string) error {
request, reqURL, err := resolveWriterAndCreateRequest(ingestionType, msgBody, uuid, URLMappings)
request.ContentLength = -1
// sendToWriter PUTs msgBody to a writer service and reports whether the
// writer accepted it.
//
// ingestionType and uuid are passed to createWriteRequest to build the request
// URL. elasticWriter is the base URL of the target writer; despite its name,
// processMessage also passes the neo4j writer URL through this parameter.
//
// It returns nil when the writer answers 200 OK. It returns an error when the
// request cannot be created, when the HTTP call fails, or when the writer
// answers with any other status (the response body is included in the error).
func sendToWriter(ingestionType string, msgBody string, uuid string, elasticWriter string) error {
	request, reqURL, err := createWriteRequest(ingestionType, strings.NewReader(msgBody), uuid, elasticWriter)
	if err != nil {
		// Stop here: the request must not be used after a failed creation
		// (dereferencing a nil *http.Request would panic).
		log.Errorf("Cannot create write request: [%v]", err)
		return fmt.Errorf("concept=[%s] uuid=[%s] error=[%v]", ingestionType, uuid, err)
	}
	// -1 marks the content length as unknown.
	request.ContentLength = -1

	resp, reqErr := httpClient.Do(request)
	if reqErr != nil {
		return fmt.Errorf("reqURL=[%s] concept=[%s] uuid=[%s] error=[%v]", reqURL, ingestionType, uuid, reqErr)
	}

	if resp.StatusCode == http.StatusOK {
		readBody(resp)
		return nil
	}

	defer resp.Body.Close()
	errorMessage, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		// Best effort: the status code is still reported below.
		log.Errorf("Cannot read error body: [%v]", err)
	}

	return fmt.Errorf("reqURL=[%s] status=[%d] uuid=[%s] error=[%v] body=[%s]", reqURL, resp.StatusCode, uuid, reqErr, string(errorMessage))
}

func resolveWriterAndCreateRequest(ingestionType string, msgBody io.Reader, uuid string, URLMappings map[string]string) (*http.Request, string, error) {
// resolveWriter returns the writer URL configured for ingestionType.
//
// URLMappings maps service names to writer URLs. A service matches when its
// name contains ingestionType. An error is returned when ingestionType is
// empty or when no matching service has a non-empty URL.
func resolveWriter(ingestionType string, URLMappings map[string]string) (string, error) {
	// strings.Contains(s, "") is true for every s, so an empty type would
	// otherwise be routed to an arbitrary writer.
	if ingestionType == "" {
		return "", fmt.Errorf("No configured writer for concept: %v", ingestionType)
	}

	var writerURL string
	for service, URL := range URLMappings {
		if strings.Contains(service, ingestionType) {
			writerURL = URL
		}
	}
	if writerURL == "" {
		return "", fmt.Errorf("No configured writer for concept: %v", ingestionType)
	}

	return writerURL, nil
}

func createWriteRequest(ingestionType string, msgBody io.Reader, uuid string, writerURL string) (*http.Request, string, error) {

reqURL := writerURL + "/" + ingestionType + "/" + uuid

request, err := http.NewRequest("PUT", reqURL, msgBody)
Expand Down
Loading

0 comments on commit 94e329c

Please sign in to comment.