Skip to content
This repository has been archived by the owner on Jun 14, 2018. It is now read-only.

DO NOT MERGE - Remove vulcan based routing #18

Closed
wants to merge 15 commits into from
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@ Incremental counters both for neo and elasticsearch are configured.
* `go install`

## Running in a cluster
* `$GOPATH/bin/concept-ingester --services-list="people-rw-neo4j-blue,organisations-rw-neo4j-blue" --port="8081" --vulcan_addr="http://localhost:8080" --consumer_group_id="TestConcepts" --consumer_autocommit_enable=true --topic="Concept" --consumer_offset="smallest" --consumer_queue_id="kafka" --throttle=10`
* `$GOPATH/bin/concept-ingester --services-list="http://people-rw-neo4j-blue:8080,http://organisations-rw-neo4j-blue:8080" --port="8081" --kafka-proxy-address="http://localhost:8080" --consumer-group-id="TestConcepts" --consumer-autocommit-enable=true --topic="Concept" --consumer-offset="smallest" --consumer-queue-id="kafka" --throttle=10`

Some comments about configuration parameters:
* --vulcan_addr the vulcan address, host and port
* --services-list comma separated list of neo4j writers - do not append a port for running in the cluster
* --elastic-service elasticsearch writer name
* --topic, --consumer_group_id, --consumer_autocommit_enable, --consumer_offset, --consumer_queue_id see the message-queue-gonsumer library
* --kafka-proxy-address the kafka proxy address, host and port
* --services-list comma separated list of neo4j writers authorities, each pair should be provided as host:port
* --elastic-service-address elasticsearch writer address, host and port
* --topic, --consumer-group-id, --consumer-autocommit-enable, --consumer-offset, --consumer-queue-id see the message-queue-gonsumer library

## Healthchecks
* Check connectivity [http://localhost:8080/__health](http://localhost:8080/__health)
* Good to Go: [http://localhost:8080/__gtg](http://localhost:8080/__gtg)

## Examples
### How to run locally not in the cluster
`concept-ingester --services-list="alphaville-series-rw-neo4j:8092" --port="8089" --vulcan_addr="http://localhost:8082" --consumer_group_id="alphaville-series" --topic="Concept" --consumer_offset="smallest" --consumer_queue_id="kafka"
`concept-ingester --services-list="http://localhost:8092" --port="8089" --kafka-proxy-address="http://localhost:8082" --consumer-group-id="alphaville-series" --topic="Concept" --consumer-offset="smallest" --consumer-queue-id="kafka"
`
To run locally against local writers, specify the port each of these writers are running on (the writer address will be resolved to localhost:[specified-port]). Override the vulcan-addr parameter to point to the host:port of a kafka-rest-proxy.
To run locally against local writers just use localhost and the port each writer is running on. Override the kafka-proxy-address parameter to point to the host:port of a kafka-rest-proxy.
179 changes: 105 additions & 74 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,23 @@ import (
standardLog "log"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

"github.com/Financial-Times/http-handlers-go/httphandlers"
queueConsumer "github.com/Financial-Times/message-queue-gonsumer/consumer"
status "github.com/Financial-Times/service-status-go/httphandlers"
log "github.com/Sirupsen/logrus"
"github.com/asaskevich/govalidator"
"github.com/cyberdelia/go-metrics-graphite"
"github.com/gorilla/mux"
"github.com/jawher/mow.cli"
"github.com/rcrowley/go-metrics"
"github.com/Financial-Times/http-handlers-go/httphandlers"
log "github.com/Sirupsen/logrus"
)

func main() {
Expand All @@ -30,44 +32,43 @@ func main() {

services := app.String(cli.StringOpt{
Name: "services-list",
Value: "services",
Desc: "neo4j writer services",
Desc: "A comma separated list of neo4j writer service addresses",
EnvVar: "SERVICES",
})
elasticService := app.String(cli.StringOpt{
Name: "elastic-service",
Desc: "elasticsearch writer service",
EnvVar: "ELASTICSEARCH_WRITER",
elasticServiceAddress := app.String(cli.StringOpt{
Name: "elastic-service-address",
Desc: "elasticsearch writer service address",
EnvVar: "ELASTICSEARCH_WRITER_ADDRESS",
})
port := app.String(cli.StringOpt{
Name: "port",
Value: "8080",
Desc: "Port to listen on",
EnvVar: "PORT",
})
vulcanAddr := app.String(cli.StringOpt{
Name: "vulcan_addr",
Value: "https://vulcan-address",
Desc: "Vulcan address for routing requests",
EnvVar: "VULCAN_ADDR",
kafkaProxyAddress := app.String(cli.StringOpt{
Name: "kafka-proxy-address",
Value: "http://kafka-proxy:8082",
Desc: "Kafka proxy address",
EnvVar: "KAFKA_PROXY_ADDRESS",
})
consumerGroupID := app.String(cli.StringOpt{
Name: "consumer_group_id",
Name: "consumer-group-id",
Value: "ConceptIngesterGroup",
Desc: "Kafka group id used for message consuming.",
Desc: "Kafka group id used for message consuming",
EnvVar: "GROUP_ID",
})
consumerQueue := app.String(cli.StringOpt{
Name: "consumer_queue_id",
Name: "consumer-queue-id",
Value: "",
Desc: "The kafka queue id",
EnvVar: "QUEUE_ID",
})
graphiteTCPAddress := app.String(cli.StringOpt{
graphiteTCPAuthority := app.String(cli.StringOpt{
Name: "graphite-tcp-address",
Value: "",
Desc: "Graphite TCP address, e.g. graphite.ft.com:2003. Leave as default if you do NOT want to output to graphite (e.g. if running locally)",
EnvVar: "GRAPHITE_TCP_ADDRESS",
Desc: "Graphite TCP authority, e.g. graphite.ft.com:2003. Leave as default if you do NOT want to output to graphite (e.g. if running locally)",
EnvVar: "GRAPHITE_TCP_AUTHORITY",
})
graphitePrefix := app.String(cli.StringOpt{
Name: "graphite-prefix",
Expand All @@ -82,14 +83,14 @@ func main() {
EnvVar: "LOG_METRICS",
})
consumerOffset := app.String(cli.StringOpt{
Name: "consumer_offset",
Name: "consumer-offset",
Value: "",
Desc: "Kafka read offset.",
Desc: "Kafka read offset",
EnvVar: "OFFSET"})
consumerAutoCommitEnable := app.Bool(cli.BoolOpt{
Name: "consumer_autocommit_enable",
Name: "consumer-autocommit-enable",
Value: true,
Desc: "Enable autocommit for small messages.",
Desc: "Enable autocommit for small messages",
EnvVar: "COMMIT_ENABLE"})
topic := app.String(cli.StringOpt{
Name: "topic",
Expand Down Expand Up @@ -117,7 +118,7 @@ func main() {
}

consumerConfig := queueConsumer.QueueConfig{
Addrs: strings.Split(*vulcanAddr, ","),
Addrs: []string{*kafkaProxyAddress},
Group: *consumerGroupID,
Queue: *consumerQueue,
Topic: *topic,
Expand All @@ -126,26 +127,26 @@ func main() {
ConcurrentProcessing: true,
}

writerMappings := createWriterMappings(*services, *vulcanAddr)

var elasticsearchWriterBasicMapping string
var elasticsearchWriterBulkMapping string
if *elasticService != "" {
elasticsearchWriterBasicMapping = resolveWriterURL(*elasticService, *vulcanAddr)
if elasticsearchWriterBasicMapping != "" {
elasticsearchWriterBulkMapping = elasticsearchWriterBasicMapping + "/bulk"
}
log.Infof("Using writer url: %s for service: %s", elasticsearchWriterBasicMapping, *elasticService)
writerMappings, err := createWriterMappings(*services)
if err != nil {
log.Fatal(err)
}
elasticsearchWriterBasicMapping, elasticsearchWriterBulkMapping, err := createElasticsearchWriterMappings(*elasticServiceAddress)
if err != nil {
log.Fatal(err)
}

ing := ingesterService{
baseURLMappings: writerMappings,
elasticWriterURL: elasticsearchWriterBulkMapping,
ticker: time.NewTicker(time.Second / time.Duration(*throttle)),
client: httpClient,
baseURLMappings: writerMappings,
elasticWriterAddress: elasticsearchWriterBulkMapping,
ticker: time.NewTicker(time.Second / time.Duration(*throttle)),
client: httpClient,
}

outputMetricsIfRequired(*graphiteTCPAddress, *graphitePrefix, *logMetrics)
err = outputMetricsIfRequired(*graphiteTCPAuthority, *graphitePrefix, *logMetrics)
if err != nil {
log.Fatal(err)
}

consumer := queueConsumer.NewConsumer(consumerConfig, ing.readMessage, httpClient)

Expand Down Expand Up @@ -173,32 +174,57 @@ func main() {
app.Run(os.Args)
}

func createWriterMappings(services string, vulcanAddr string) map[string]string {
func createElasticsearchWriterMappings(elasticServiceAddress string) (elasticsearchWriterBasicMapping string, elasticsearchWriterBulkMapping string, err error) {
if elasticServiceAddress == "" {
return
}
host, err := extractAddressHost(elasticServiceAddress)
if err != nil {
return "", "", err
}
elasticsearchWriterBasicMapping = elasticServiceAddress
elasticsearchWriterBulkMapping = elasticServiceAddress + "/bulk"
log.Infof("Using writer address: %s for service: %s", elasticsearchWriterBasicMapping, host)
return
}

func createWriterMappings(services string) (map[string]string, error) {
writerMappings := make(map[string]string)
serviceSlice := strings.Split(services, ",")
for _, service := range serviceSlice {
writerURL := resolveWriterURL(service, vulcanAddr)
writerMappings[service] = writerURL
log.Infof("Using writer url: %s for service: %s", writerURL, service)
servicesSlice := strings.Split(services, ",")
for _, serviceAddress := range servicesSlice {
host, err := extractAddressHost(serviceAddress)
if err != nil {
return nil, err
}
writerMappings[host] = serviceAddress
log.Infof("Using writer address: %s for service: %s", serviceAddress, host)
}
return writerMappings
return writerMappings, nil
}
func resolveWriterURL(service string, vulcanAddr string) string {
wr := strings.Split(service, ":")
if len(wr) > 1 {
return "http://localhost:" + wr[1]

func extractAddressHost(address string) (string, error) {
if !govalidator.IsURL(address) {
return "", fmt.Errorf("Address '%s' is not a valid URL", address)
}
return vulcanAddr + "/__" + service
validURL, err := url.Parse(address)
if err != nil {
return "", fmt.Errorf("Failed to parse address %s: %s", address, err)
}
authoritySlice := strings.Split(validURL.Host, ":")
if len(authoritySlice) != 2 {
return "", fmt.Errorf("Address '%s' is invalid. Example of an expected value 'http://localhost:8080'", address)
}
return authoritySlice[0], nil
}

func runServer(consumer queueConsumer.MessageConsumer, baseURLMappings map[string]string, elasticsearchWriter string, port string, client *http.Client) {
func runServer(consumer queueConsumer.MessageConsumer, baseURLMappings map[string]string, elasticsearchWriterAddress string, port string, client *http.Client) {
var includeElasticsearchWriter bool
if elasticsearchWriter != "" {
if elasticsearchWriterAddress != "" {
includeElasticsearchWriter = true
}
eWC := &ElasticsearchWriterConfig{
includeElasticsearchWriter: includeElasticsearchWriter,
elasticsearchWriterUrl: elasticsearchWriter,
elasticsearchWriterUrl: elasticsearchWriterAddress,
}
r := router(NewHealthCheck(consumer, getBaseURLs(baseURLMappings), eWC, client))

Expand Down Expand Up @@ -240,10 +266,10 @@ func router(hc *HealthCheck) http.Handler {
}

// ingesterService routes consumed concept messages to the configured
// neo4j writers and, optionally, to the elasticsearch writer.
type ingesterService struct {
	baseURLMappings      map[string]string // writer host name -> full service address
	elasticWriterAddress string            // bulk endpoint of the elasticsearch writer; empty disables it
	client               *http.Client
	ticker               *time.Ticker // paces outgoing writes according to --throttle
}

func (ing ingesterService) readMessage(msg queueConsumer.Message) {
Expand All @@ -255,26 +281,28 @@ func (ing ingesterService) readMessage(msg queueConsumer.Message) {
}

func (ing ingesterService) processMessage(msg queueConsumer.Message) error {
ingestionType, uuid, transactionID := extractMessageTypeAndId(msg.Headers)
ingestionType, uuid, transactionID := extractMessageTypeAndID(msg.Headers)

writerUrl, err := resolveWriter(ingestionType, ing.baseURLMappings)
writerAddress, err := resolveWriter(ingestionType, ing.baseURLMappings)
if err != nil {
failureMeter := metrics.GetOrRegisterMeter(ingestionType+"-FAILURE", metrics.DefaultRegistry)
failureMeter.Mark(1)
log.Infof("Incremented failure count, new count=%d for meter=%s", failureMeter.Count(), ingestionType+"-FAILURE")
return err
}

err = sendToWriter(ingestionType, msg.Body, uuid, transactionID, writerUrl, ing.client)
log.Infof("Processing message to service: %v", writerAddress)
err = sendToWriter(ingestionType, msg.Body, uuid, transactionID, writerAddress, ing.client)

if err != nil {
failureMeter := metrics.GetOrRegisterMeter(ingestionType+"-FAILURE", metrics.DefaultRegistry)
failureMeter.Mark(1)
log.Infof("Incremented failure count, new count=%d for meter=%s", failureMeter.Count(), ingestionType+"-FAILURE")
return err
}

if ing.elasticWriterURL != "" {
err = sendToWriter(ingestionType, msg.Body, uuid, transactionID, ing.elasticWriterURL, ing.client)
if ing.elasticWriterAddress != "" {
err = sendToWriter(ingestionType, msg.Body, uuid, transactionID, ing.elasticWriterAddress, ing.client)
if err != nil {
failureMeter := metrics.GetOrRegisterMeter(ingestionType+"-elasticsearch-FAILURE", metrics.DefaultRegistry)
failureMeter.Mark(1)
Expand All @@ -289,11 +317,12 @@ func (ing ingesterService) processMessage(msg queueConsumer.Message) error {

}

// extractMessageTypeAndID pulls the ingestion type, the concept UUID and
// the transaction ID out of the kafka message headers. Missing headers
// yield empty strings (map zero value).
func extractMessageTypeAndID(headers map[string]string) (string, string, string) {
	return headers["Message-Type"], headers["Message-Id"], headers["X-Request-Id"]
}

func sendToWriter(ingestionType string, msgBody string, uuid string, transactionID string, elasticWriter string, client *http.Client) error {
log.Infof("Sending message to writer: %v", elasticWriter)

request, reqURL, err := createWriteRequest(ingestionType, strings.NewReader(msgBody), uuid, elasticWriter)
if err != nil {
Expand Down Expand Up @@ -324,22 +353,22 @@ func sendToWriter(ingestionType string, msgBody string, uuid string, transaction
}

// resolveWriter finds the writer address whose service (host) name contains
// the ingestion type taken from the message headers. When several services
// match, the last one encountered wins (map iteration order is random).
// Returns an error when no configured writer matches the type.
func resolveWriter(ingestionType string, URLMappings map[string]string) (string, error) {
	var writerAddress string
	for service, URL := range URLMappings {
		if strings.Contains(service, ingestionType) {
			writerAddress = URL
		}
	}
	if writerAddress == "" {
		return "", fmt.Errorf("No configured writer for concept: %v", ingestionType)
	}
	return writerAddress, nil
}

func createWriteRequest(ingestionType string, msgBody io.Reader, uuid string, writerURL string) (*http.Request, string, error) {
func createWriteRequest(ingestionType string, msgBody io.Reader, uuid string, writerAddress string) (*http.Request, string, error) {

reqURL := writerURL + "/" + ingestionType + "/" + uuid
reqURL := writerAddress + "/" + ingestionType + "/" + uuid

request, err := http.NewRequest("PUT", reqURL, msgBody)
if err != nil {
Expand All @@ -353,13 +382,15 @@ func readBody(resp *http.Response) {
resp.Body.Close()
}

func outputMetricsIfRequired(graphiteTCPAddress string, graphitePrefix string, logMetrics bool) {
if graphiteTCPAddress != "" {
addr, _ := net.ResolveTCPAddr("tcp", graphiteTCPAddress)
func outputMetricsIfRequired(graphiteTCPAuthority string, graphitePrefix string, logMetrics bool) error {
if graphiteTCPAuthority != "" {
addr, _ := net.ResolveTCPAddr("tcp", graphiteTCPAuthority)
go graphite.Graphite(metrics.DefaultRegistry, 5*time.Second, graphitePrefix, addr)
}
if logMetrics { //useful locally
if logMetrics {
//useful locally
//messy use of the 'standard' log package here as this method takes the log struct, not an interface, so can't use logrus.Logger
go metrics.Log(metrics.DefaultRegistry, 60*time.Second, standardLog.New(os.Stdout, "metrics", standardLog.Lmicroseconds))
}
return nil
}
Loading