Skip to content
This repository has been archived by the owner on Jun 14, 2018. It is now read-only.

Commit

Permalink
Address env variables now use the pattern 'protocol://host:port'
Browse files Browse the repository at this point in the history
  • Loading branch information
Mihai Moisa committed Feb 23, 2017
1 parent e139483 commit cf190b3
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 92 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ Incremental counters both for neo and elasticsearch are configured.
* `go install`

## Running in a cluster
* `$GOPATH/bin/concept-ingester --service-authorities-list="people-rw-neo4j-blue:8080,organisations-rw-neo4j-blue:8080" --port="8081" --kafka-proxy-authority="http://localhost:8080" --consumer-group-id="TestConcepts" --consumer-autocommit-enable=true --topic="Concept" --consumer-offset="smallest" --consumer-queue-id="kafka" --throttle=10`
* `$GOPATH/bin/concept-ingester --service-authorities-list="http://people-rw-neo4j-blue:8080,organisations-rw-neo4j-blue:8080" --port="8081" --kafka-proxy-address="http://localhost:8080" --consumer-group-id="TestConcepts" --consumer-autocommit-enable=true --topic="Concept" --consumer-offset="smallest" --consumer-queue-id="kafka" --throttle=10`

Some comments about configuration parameters:
* --kafka-proxy-authority the kafka proxy authority, host and port
* --kafka-proxy-address the kafka proxy address, host and port
* --service-authorities comma separated list of neo4j writers authorities, each pair should be provided as host:port
* --elastic-service-authority elasticsearch writer authority, host and port
* --elastic-service-address elasticsearch writer address, host and port
* --topic, --consumer-group-id, --consumer-autocommit-enable, --consumer-offset, --consumer-queue-id see the message-queue-gonsumer library

## Healthchecks
Expand All @@ -28,6 +28,6 @@ Some comments about configuration parameters:

##Examples:
### How to run locally not in the cluster
`concept-ingester --service-authorities-list="localhost:8092" --port="8089" --kafka-proxy-authority="http://localhost:8082" --consumer-group-id="alphaville-series" --topic="Concept" --consumer-offset="smallest" --consumer-queue-id="kafka"
`concept-ingester --service-authorities-list="http://localhost:8092" --port="8089" --kafka-proxy-address="http://localhost:8082" --consumer-group-id="alphaville-series" --topic="Concept" --consumer-offset="smallest" --consumer-queue-id="kafka"
`
To run locally against local writers just use localhost and the port each writer is running on. Override the kafka-authority parameter to point to the host:port of a kafka-rest-proxy.
To run locally against local writers just use localhost and the port each writer is running on. Override the kafka-address parameter to point to the host:port of a kafka-rest-proxy.
125 changes: 69 additions & 56 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"os"
"os/signal"
"regexp"
"strings"
"sync"
"syscall"
Expand All @@ -25,6 +26,9 @@ import (
"github.com/rcrowley/go-metrics"
)

var validHttpAddress = regexp.MustCompile(`^(?P<protocol>http):\/\/(?P<host>[^:\/\s]+):(?P<port>[\d]{1,5})$`)
var validTcpAddress = regexp.MustCompile(`^(?P<protocol>tcp):\/\/(?P<host>[^:\/\s]+):(?P<port>[\d]{1,5})$`)

var httpClient = http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: 128,
Expand All @@ -39,32 +43,32 @@ func main() {
log.SetLevel(log.InfoLevel)
app := cli.App("concept-ingester", "A microservice that consumes concept messages from Kafka and routes them to the appropriate writer")

serviceAuthorities := app.String(cli.StringOpt{
Name: "service-authorities",
Desc: "A comma separated list of neo4j writer service authorities",
EnvVar: "SERVICE_AUTHORITIES",
services := app.String(cli.StringOpt{
Name: "services",
Desc: "A comma separated list of neo4j writer service addresses",
EnvVar: "SERVICES",
})
elasticServiceAuthority := app.String(cli.StringOpt{
Name: "elastic-service-authority",
Desc: "elasticsearch writer service authority.",
EnvVar: "ELASTICSEARCH_WRITER_AUTHORITY",
elasticServiceAddress := app.String(cli.StringOpt{
Name: "elastic-service-address",
Desc: "elasticsearch writer service address",
EnvVar: "ELASTICSEARCH_WRITER_ADDRESS",
})
port := app.String(cli.StringOpt{
Name: "port",
Value: "8080",
Desc: "Port to listen on",
EnvVar: "PORT",
})
kafkaProxyAuthority := app.String(cli.StringOpt{
Name: "kafka-proxy-authority",
Value: "kafka:8082",
Desc: "Kafka proxy authority",
EnvVar: "KAFKA_PROXY_AUTHORITY",
kafkaProxyAddress := app.String(cli.StringOpt{
Name: "kafka-proxy-address",
Value: "http://kafka-proxy:8082",
Desc: "Kafka proxy address",
EnvVar: "KAFKA_PROXY_ADDRESS",
})
consumerGroupID := app.String(cli.StringOpt{
Name: "consumer-group-id",
Value: "ConceptIngesterGroup",
Desc: "Kafka group id used for message consuming.",
Desc: "Kafka group id used for message consuming",
EnvVar: "GROUP_ID",
})
consumerQueue := app.String(cli.StringOpt{
Expand All @@ -73,11 +77,11 @@ func main() {
Desc: "The kafka queue id",
EnvVar: "QUEUE_ID",
})
graphiteTCPAuthority := app.String(cli.StringOpt{
Name: "graphite-tcp-authority",
graphiteTCPAddress := app.String(cli.StringOpt{
Name: "graphite-tcp-address",
Value: "",
Desc: "Graphite TCP authority, e.g. graphite.ft.com:2003. Leave as default if you do NOT want to output to graphite (e.g. if running locally)",
EnvVar: "GRAPHITE_TCP_AUTHORITY",
Desc: "Graphite TCP adress, e.g. tcp://graphite.ft.com:2003. Leave as default if you do NOT want to output to graphite (e.g. if running locally)",
EnvVar: "GRAPHITE_TCP_ADDRESS",
})
graphitePrefix := app.String(cli.StringOpt{
Name: "graphite-prefix",
Expand All @@ -94,12 +98,12 @@ func main() {
consumerOffset := app.String(cli.StringOpt{
Name: "consumer-offset",
Value: "",
Desc: "Kafka read offset.",
Desc: "Kafka read offset",
EnvVar: "OFFSET"})
consumerAutoCommitEnable := app.Bool(cli.BoolOpt{
Name: "consumer-autocommit-enable",
Value: true,
Desc: "Enable autocommit for small messages.",
Desc: "Enable autocommit for small messages",
EnvVar: "COMMIT_ENABLE"})
topic := app.String(cli.StringOpt{
Name: "topic",
Expand All @@ -113,9 +117,8 @@ func main() {
EnvVar: "THROTTLE"})

app.Action = func() {
kafkaProxyURL := resolveURL(*kafkaProxyAuthority)
consumerConfig := queueConsumer.QueueConfig{
Addrs: []string{kafkaProxyURL},
Addrs: []string{*kafkaProxyAddress},
Group: *consumerGroupID,
Queue: *consumerQueue,
Topic: *topic,
Expand All @@ -124,11 +127,11 @@ func main() {
ConcurrentProcessing: true,
}

writerMappings, err := createWriterMappings(*serviceAuthorities)
writerMappings, err := createWriterMappings(*services)
if err != nil {
log.Fatal(err)
}
elasticsearchWriterBasicMapping, elasticsearchWriterBulkMapping, err := createElasticsearchWriterMappings(*elasticServiceAuthority)
elasticsearchWriterBasicMapping, elasticsearchWriterBulkMapping, err := createElasticsearchWriterMappings(*elasticServiceAddress)
if err != nil {
log.Fatal(err)
}
Expand All @@ -139,7 +142,10 @@ func main() {
ticker: time.NewTicker(time.Second / time.Duration(*throttle)),
}

outputMetricsIfRequired(*graphiteTCPAuthority, *graphitePrefix, *logMetrics)
err = outputMetricsIfRequired(*graphiteTCPAddress, *graphitePrefix, *logMetrics)
if err != nil {
log.Fatal(err)
}

consumer := queueConsumer.NewConsumer(consumerConfig, ing.readMessage, httpClient)

Expand All @@ -151,7 +157,7 @@ func main() {
wg.Done()
}()

go runServer(ing.baseURLMappings, elasticsearchWriterBasicMapping, *port, kafkaProxyURL, *topic)
go runServer(ing.baseURLMappings, elasticsearchWriterBasicMapping, *port, *kafkaProxyAddress, *topic)

ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -167,46 +173,47 @@ func main() {
app.Run(os.Args)
}

func createElasticsearchWriterMappings(elasticServiceAuthority string) (elasticsearchWriterBasicMapping string, elasticsearchWriterBulkMapping string, err error) {
if elasticServiceAuthority == "" {
func createElasticsearchWriterMappings(elasticServiceAddress string) (elasticsearchWriterBasicMapping string, elasticsearchWriterBulkMapping string, err error) {
if elasticServiceAddress == "" {
return
}

elasticServiceAuthoritySlice := strings.Split(elasticServiceAuthority, ":")
if len(elasticServiceAuthoritySlice) != 2 {
err = fmt.Errorf("Authority '%s' is invalid. Example of a valid authority: host:port", elasticServiceAuthority)
return "", "", err
}
if elasticServiceAuthoritySlice[0] == "" || elasticServiceAuthoritySlice[1] == "" {
err = fmt.Errorf("Authority '%s' is invalid. Example of a valid authority: host:port", elasticServiceAuthority)
components, err := extractAddressComponents(validHttpAddress, elasticServiceAddress)
if err != nil {
return "", "", err
}
elasticsearchWriterBasicMapping = resolveURL(elasticServiceAuthority)
elasticsearchWriterBulkMapping = elasticsearchWriterBasicMapping + "/bulk"
log.Infof("Using writer url: %s for service: %s", elasticsearchWriterBasicMapping, elasticServiceAuthoritySlice[0])
elasticsearchWriterBasicMapping = elasticServiceAddress
elasticsearchWriterBulkMapping = elasticServiceAddress + "/bulk"
log.Infof("Using writer url: %s for service: %s", elasticsearchWriterBasicMapping, components["host"])
return
}

func createWriterMappings(authorities string) (map[string]string, error) {
func createWriterMappings(services string) (map[string]string, error) {
writerMappings := make(map[string]string)
authoritiesSlice := strings.Split(authorities, ",")
for _, authority := range authoritiesSlice {
serviceSlice := strings.Split(authority, ":")
if len(serviceSlice) != 2 {
return nil, fmt.Errorf("Authority '%s' is invalid. Example of a valid authority: host:port", authority)
}
if serviceSlice[0] == "" || serviceSlice[1] == "" {
return nil, fmt.Errorf("Authority '%s' is invalid. Example of a valid authority: host:port", authority)
servicesSlice := strings.Split(services, ",")
for _, serviceAddress := range servicesSlice {
components, err := extractAddressComponents(validHttpAddress, serviceAddress)
if err != nil {
return nil, err
}
writerURL := resolveURL(authority)
writerMappings[serviceSlice[0]] = writerURL
log.Infof("Using writer url: %s for service: %s", writerURL, serviceSlice[0])
writerMappings[components["host"]] = serviceAddress
log.Infof("Using writer url: %s for service: %s", serviceAddress, components["host"])
}
return writerMappings, nil
}

func resolveURL(authority string) string {
return "http://" + authority
func extractAddressComponents(expression *regexp.Regexp, address string) (map[string]string, error) {
match := expression.FindStringSubmatch(address)
if match == nil {
return nil, fmt.Errorf("Address '%s' is invalid. Example of a valid address: http://host:8080", address)
}
components := make(map[string]string)
for index, name := range expression.SubexpNames() {
if index == 0 || name == "" {
continue
}
components[name] = match[index]
}
return components, nil
}

func runServer(baseURLMappings map[string]string, elasticsearchWriterURL string, port string, kafkaProxyURL string, topic string) {
Expand Down Expand Up @@ -365,13 +372,19 @@ func readBody(resp *http.Response) {
resp.Body.Close()
}

func outputMetricsIfRequired(graphiteTCPAuthority string, graphitePrefix string, logMetrics bool) {
if graphiteTCPAuthority != "" {
addr, _ := net.ResolveTCPAddr("tcp", graphiteTCPAuthority)
func outputMetricsIfRequired(graphiteTCPAddress string, graphitePrefix string, logMetrics bool) error {
if graphiteTCPAddress != "" {
components, err := extractAddressComponents(validTcpAddress, graphiteTCPAddress)
if err != nil {
return err
}
graphiteAuthority := components["host"] + ":" + components["port"]
addr, _ := net.ResolveTCPAddr("tcp", graphiteAuthority)
go graphite.Graphite(metrics.DefaultRegistry, 5*time.Second, graphitePrefix, addr)
}
if logMetrics { //useful locally
//messy use of the 'standard' log package here as this method takes the log struct, not an interface, so can't use logrus.Logger
go metrics.Log(metrics.DefaultRegistry, 60*time.Second, standardLog.New(os.Stdout, "metrics", standardLog.Lmicroseconds))
}
return nil
}
Loading

0 comments on commit cf190b3

Please sign in to comment.