Skip to content
This repository has been archived by the owner on Jun 14, 2018. It is now read-only.

Commit

Permalink
Simplified validation process for env variables that contain an address
Browse files Browse the repository at this point in the history
  • Loading branch information
Mihai Moisa committed Feb 23, 2017
1 parent c93c033 commit 15ac4ec
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 71 deletions.
75 changes: 42 additions & 33 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
standardLog "log"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"regexp"
"strings"
"sync"
"syscall"
Expand All @@ -20,15 +20,13 @@ import (
queueConsumer "github.com/Financial-Times/message-queue-gonsumer/consumer"
status "github.com/Financial-Times/service-status-go/httphandlers"
log "github.com/Sirupsen/logrus"
"github.com/asaskevich/govalidator"
graphite "github.com/cyberdelia/go-metrics-graphite"
"github.com/gorilla/mux"
"github.com/jawher/mow.cli"
"github.com/rcrowley/go-metrics"
)

var validHTTPAddress = regexp.MustCompile(`^(?P<protocol>http):\/\/(?P<host>[^:\/\s]+):(?P<port>[\d]{1,5})$`)
var validTCPAddress = regexp.MustCompile(`^(?P<protocol>tcp):\/\/(?P<host>[^:\/\s]+):(?P<port>[\d]{1,5})$`)

var httpClient = http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: 128,
Expand All @@ -39,6 +37,21 @@ var httpClient = http.Client{
},
}

type urlAuthority struct {
host string
port string
}

func (ua *urlAuthority) toString() string {
if ua.host != "" && ua.port != "" {
return ua.host + ":" + ua.port
}
if ua.host != "" {
return ua.host
}
return ua.port
}

func main() {
log.SetLevel(log.InfoLevel)
app := cli.App("concept-ingester", "A microservice that consumes concept messages from Kafka and routes them to the appropriate writer")
Expand Down Expand Up @@ -77,11 +90,11 @@ func main() {
Desc: "The kafka queue id",
EnvVar: "QUEUE_ID",
})
graphiteTCPAddress := app.String(cli.StringOpt{
graphiteTCPAuthority := app.String(cli.StringOpt{
Name: "graphite-tcp-address",
Value: "",
Desc: "Graphite TCP adress, e.g. tcp://graphite.ft.com:2003. Leave as default if you do NOT want to output to graphite (e.g. if running locally)",
EnvVar: "GRAPHITE_TCP_ADDRESS",
Desc: "Graphite TCP authority, e.g. graphite.ft.com:2003. Leave as default if you do NOT want to output to graphite (e.g. if running locally)",
EnvVar: "GRAPHITE_TCP_AUTHORITY",
})
graphitePrefix := app.String(cli.StringOpt{
Name: "graphite-prefix",
Expand Down Expand Up @@ -142,7 +155,7 @@ func main() {
ticker: time.NewTicker(time.Second / time.Duration(*throttle)),
}

err = outputMetricsIfRequired(*graphiteTCPAddress, *graphitePrefix, *logMetrics)
err = outputMetricsIfRequired(*graphiteTCPAuthority, *graphitePrefix, *logMetrics)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -177,43 +190,43 @@ func createElasticsearchWriterMappings(elasticServiceAddress string) (elasticsea
if elasticServiceAddress == "" {
return
}
components, err := extractAddressComponents(validHTTPAddress, elasticServiceAddress)
urlAuthority, err := extractURLAuthority(elasticServiceAddress)
if err != nil {
return "", "", err
}
elasticsearchWriterBasicMapping = elasticServiceAddress
elasticsearchWriterBulkMapping = elasticServiceAddress + "/bulk"
log.Infof("Using writer address: %s for service: %s", elasticsearchWriterBasicMapping, components["host"])
log.Infof("Using writer address: %s for service: %s", elasticsearchWriterBasicMapping, urlAuthority.host)
return
}

func createWriterMappings(services string) (map[string]string, error) {
writerMappings := make(map[string]string)
servicesSlice := strings.Split(services, ",")
for _, serviceAddress := range servicesSlice {
components, err := extractAddressComponents(validHTTPAddress, serviceAddress)
urlAuthority, err := extractURLAuthority(serviceAddress)
if err != nil {
return nil, err
}
writerMappings[components["host"]] = serviceAddress
log.Infof("Using writer address: %s for service: %s", serviceAddress, components["host"])
writerMappings[urlAuthority.host] = serviceAddress
log.Infof("Using writer address: %s for service: %s", serviceAddress, urlAuthority.host)
}
return writerMappings, nil
}

func extractAddressComponents(expression *regexp.Regexp, address string) (map[string]string, error) {
match := expression.FindStringSubmatch(address)
if match == nil {
return nil, fmt.Errorf("Address '%s' is invalid. Example of a valid address: http://host:8080", address)
func extractURLAuthority(address string) (*urlAuthority, error) {
if !govalidator.IsURL(address) {
return nil, fmt.Errorf("Address '%s' is not a valid URL", address)
}
components := make(map[string]string)
for index, name := range expression.SubexpNames() {
if index == 0 || name == "" {
continue
}
components[name] = match[index]
validURL, err := url.Parse(address)
if err != nil {
return nil, fmt.Errorf("Failed to parse address %s: %s", address, err)
}
authoritySlice := strings.Split(validURL.Host, ":")
if len(authoritySlice) != 2 {
return nil, fmt.Errorf("Address '%s' is invalid. Example of an expected value 'http://localhost:8080'", address)
}
return components, nil
return &urlAuthority{authoritySlice[0], authoritySlice[1]}, nil
}

func runServer(baseURLMappings map[string]string, elasticsearchWriterAddress string, port string, kafkaProxyAddress string, topic string) {
Expand Down Expand Up @@ -372,17 +385,13 @@ func readBody(resp *http.Response) {
resp.Body.Close()
}

func outputMetricsIfRequired(graphiteTCPAddress string, graphitePrefix string, logMetrics bool) error {
if graphiteTCPAddress != "" {
components, err := extractAddressComponents(validTCPAddress, graphiteTCPAddress)
if err != nil {
return err
}
graphiteAuthority := components["host"] + ":" + components["port"]
addr, _ := net.ResolveTCPAddr("tcp", graphiteAuthority)
func outputMetricsIfRequired(graphiteTCPAuthority string, graphitePrefix string, logMetrics bool) error {
if graphiteTCPAuthority != "" {
addr, _ := net.ResolveTCPAddr("tcp", graphiteTCPAuthority)
go graphite.Graphite(metrics.DefaultRegistry, 5*time.Second, graphitePrefix, addr)
}
if logMetrics { //useful locally
if logMetrics {
//useful locally
//messy use of the 'standard' log package here as this method takes the log struct, not an interface, so can't use logrus.Logger
go metrics.Log(metrics.DefaultRegistry, 60*time.Second, standardLog.New(os.Stdout, "metrics", standardLog.Lmicroseconds))
}
Expand Down
68 changes: 30 additions & 38 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"net/http"
"net/http/httptest"
"regexp"
"strings"
"testing"

Expand All @@ -24,8 +23,6 @@ var uuid = "5e0ad5e5-c3d4-387d-9875-ec15501808e5"
var validMessageTypeOrganisations = "organisations"
var invalidMessageType = "animals"

var validExpression = regexp.MustCompile(`^(?P<protocol>http):\/\/(?P<host>[^:\/\s]+):(?P<port>[\d]{1,5})$`)

func TestMessageProcessingHappyPathIncrementsSuccessMeter(t *testing.T) {
// Test server that always responds with 200 code
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -171,7 +168,7 @@ func TestElasticsearchWriterMappingsCreationWithEmptyAddress(t *testing.T) {
}

func TestUnsuccessfulElasticsearchWriterMappingsCreation(t *testing.T) {
_, _, err := createElasticsearchWriterMappings("http://invalidAddress:8080/invalid")
_, _, err := createElasticsearchWriterMappings("http//invalidAddress:8080")
assertion := assert.New(t)
assertion.Error(err)
}
Expand All @@ -184,59 +181,44 @@ func TestSuccessfulWriterMappingsCreation(t *testing.T) {
}

func TestUnsuccessfulWriterMappingsCreation(t *testing.T) {
_, err := createWriterMappings("http://invalidAddress:8080/invalid")
_, err := createWriterMappings("http:/invalidAddress:8080")
assertion := assert.New(t)
assertion.Error(err)
}

func TestSuccessfulAddressComponentsExtraction(t *testing.T) {
func TestSuccessfulURLAuthorityExtraction(t *testing.T) {
testCases := []struct {
address string
expectedComponents []string
address string
expectedAuthority urlAuthority
}{
{"http://localhost:1", []string{"http", "localhost", "1"}},
{"http://localhost:12", []string{"http", "localhost", "12"}},
{"http://localhost:123", []string{"http", "localhost", "123"}},
{"http://localhost:1234", []string{"http", "localhost", "1234"}},
{"http://localhost:12345", []string{"http", "localhost", "12345"}},
{"http://com.ft.address:8080", []string{"http", "com.ft.address", "8080"}},
{"http://localhost:8080", urlAuthority{host: "localhost", port: "8080"}},
{"https://localhost:8080", urlAuthority{host: "localhost", port: "8080"}},
{"tcp://localhost:8080", urlAuthority{host: "localhost", port: "8080"}},
}

assertion := assert.New(t)
for _, tc := range testCases {
components, err := extractAddressComponents(validExpression, tc.address)
authority, err := extractURLAuthority(tc.address)
assertion.NoError(err)
assertion.True(components["protocol"] == tc.expectedComponents[0], "Components must contain 'protocol' key equal to:"+tc.expectedComponents[0])
assertion.True(components["host"] == tc.expectedComponents[1], "Components must contain 'host' key equal to:"+tc.expectedComponents[1])
assertion.True(components["port"] == tc.expectedComponents[2], "Components must contain 'port' key equal to:"+tc.expectedComponents[2])
assertion.Equal(tc.expectedAuthority.host, authority.host)
}
}

func TestUnsuccessfulAddressComponentsExtraction(t *testing.T) {
func TestUnsuccessfulURLAuthorityExtraction(t *testing.T) {
testCases := []struct {
address string
}{
{"tcp://localhost:8080"},
{"http:localhost:8080"},
{"http//localhost:8080"},
{"http:/localhost:8080"},
{"http://:8080"},
{"http://localhost"},
{"http://localhost:"},
{"http://localhost:8080/"},
{"http://localhost:8080/path"},
{"http://:"},
{"://localhost:8080"},
{"//localhost:8080"},
{"localhost:8080"},
{":"},
{"http://localhost::8080"},
{"http:://localhost:8080"},
{"http://localhost:8080:"},
{"http:/localhost:8080"},
{"http:localhost:8080"},
{"http:localhost:123456"},
}

assertion := assert.New(t)
for _, tc := range testCases {
_, err := extractAddressComponents(validExpression, tc.address)
_, err := extractURLAuthority(tc.address)
assertion.Error(err)
}
}
Expand Down Expand Up @@ -266,11 +248,21 @@ func TestErrorIsThrownWhenIngestionTypeMatchesNoWriters(t *testing.T) {
assertion.Error(err, "No configured writer for concept: "+invalidMessageType)
}

func TestOutputMetricsIfRequiredWithInvalidAddress(t *testing.T) {
invalidProtocolAddress := "http://localhost:8080"
err := outputMetricsIfRequired(invalidProtocolAddress, "graphite-prefix", false)
func TestUrlAuthorityToString(t *testing.T) {
testCases := []struct {
authority urlAuthority
expectedAuthorityString string
}{
{urlAuthority{host: "localhost", port: "8080"}, "localhost:8080"},
{urlAuthority{host: "localhost"}, "localhost"},
{urlAuthority{port: "8080"}, "8080"},
}

assertion := assert.New(t)
assertion.Error(err)
for _, tc := range testCases {
authorityString := tc.authority.toString()
assertion.Equal(tc.expectedAuthorityString, authorityString)
}
}

func createMessage(messageID string, messageType string) queueConsumer.Message {
Expand Down

0 comments on commit 15ac4ec

Please sign in to comment.