Skip to content

Commit

Permalink
Configurable service name separator
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Krawczyk committed Jun 20, 2016
1 parent 6ed7737 commit 6175714
Show file tree
Hide file tree
Showing 15 changed files with 137 additions and 122 deletions.
71 changes: 35 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,42 +104,41 @@ curl -X POST 'http://marathon.service.consul:8080/v2/eventSubscriptions?callback

### Options

Argument | Default | Description
----------------------------|-----------------|------------------------------------------------------
config-file | | Path to a JSON file to read configuration from. Note: Will override options set earlier on the command line
consul-auth | `false` | Use Consul with authentication
consul-auth-password | | The basic authentication password
consul-auth-username | | The basic authentication username
consul-get-services-retry | `3` | Number of retries on failure when performing requests to Consul. Each retry uses different cached agent
consul-max-agent-failures | `3` | Max number of consecutive request failures for agent before removal from cache
consul-port | `8500` | Consul port
consul-ssl | `false` | Use HTTPS when talking to Consul
consul-ssl-ca-cert | | Path to a CA certificate file, containing one or more CA certificates to use to validate the certificate sent by the Consul server to us
consul-ssl-cert | | Path to an SSL client certificate to use to authenticate to the Consul server
consul-ssl-verify | `true` | Verify certificates when connecting via SSL
consul-tag | `marathon` | Common tag name added to every service registered in Consul, should be unique for every Marathon-cluster connected to Consul
consul-timeout | `3s` | Time limit for requests made by the Consul HTTP client. A Timeout of zero means no timeout
consul-token | | The Consul ACL token
events-queue-size | `1000` | Size of events queue
listen | `:4000` | Accept connections at this address
log-file | | Save logs to file (e.g.: `/var/log/marathon-consul.log`). If empty logs are published to STDERR
log-format | `text` | Log format: JSON, text
log-level | `info` | Log level: panic, fatal, error, warn, info, or debug
marathon-location | `localhost:8080`| Marathon URL
marathon-password | | Marathon password for basic auth
marathon-protocol | `http` | Marathon protocol (http or https)
marathon-ssl-verify | `true` | Verify certificates when connecting via SSL
marathon-timeout | `30s` | Time limit for requests made by the Marathon HTTP client. A Timeout of zero means no timeout
marathon-username | | Marathon username for basic auth
metrics-interval | `30s` | Metrics reporting interval
metrics-location | | Graphite URL (used when metrics-target is set to graphite)
metrics-prefix | `default` | Metrics prefix (default is resolved to <hostname>.<app_name>
metrics-target | `stdout` | Metrics destination stdout or graphite (empty string disables metrics)
sync-enabled | `true` | Enable Marathon-consul scheduled sync
sync-force | `false` | Force leadership-independent Marathon-consul sync (run always)
sync-interval | `15m0s` | Marathon-consul sync interval
sync-leader | | Marathon cluster-wide node name (defaults to <hostname>:8080), the sync will run only if the specified node is the current Marathon-leader
workers-pool-size | `10` | Number of concurrent workers processing events
Argument | Default | Description
-----------------------|-----------------------|------------------------------------------------------
config-file | | Path to a JSON file to read configuration from. **Note:** Will override options set earlier on the command line. See [example](debian/config.json).
consul-auth | `false` | Use Consul with authentication
consul-auth-password | | The basic authentication password
consul-auth-username | | The basic authentication username
consul-name-separator | `.` | Separator used to create default service name for Consul
consul-port | `8500` | Consul port
consul-ssl | `false` | Use HTTPS when talking to Consul
consul-ssl-ca-cert | | Path to a CA certificate file, containing one or more CA certificates to use to validate the certificate sent by the Consul server to us
consul-ssl-cert | | Path to an SSL client certificate to use to authenticate to the Consul server
consul-ssl-verify | `true` | Verify certificates when connecting via SSL
consul-token | | The Consul ACL token
consul-tag | `marathon` | Common tag name added to every service registered in Consul, should be unique for every Marathon-cluster connected to Consul
consul-timeout | `3s` | Time limit for requests made by the Consul HTTP client. A Timeout of zero means no timeout
events-queue-size | `1000` | Size of events queue
listen | `:4000` | Accept connections at this address
log-level | `info` | Log level: panic, fatal, error, warn, info, or debug
log-format | `text` | Log format: JSON, text
log-file | | Save logs to file (e.g.: `/var/log/marathon-consul.log`). If empty logs are published to STDERR
marathon-location | `localhost:8080` | Marathon URL
marathon-password | | Marathon password for basic auth
marathon-protocol | `http` | Marathon protocol (http or https)
marathon-username | | Marathon username for basic auth
marathon-timeout | `30s` | Time limit for requests made by the Marathon HTTP client. A Timeout of zero means no timeout
metrics-interval | `30s` | Metrics reporting [interval](https://golang.org/pkg/time/#Duration) **Note:** While using file configuration intervals should be provided in *nanoseconds*
metrics-location | | Graphite URL (used when metrics-target is set to graphite)
metrics-prefix | `default` | Metrics prefix (resolved to `<hostname>.<app_name>` by default)
metrics-target | `stdout` | Metrics destination `stdout` or `graphite` (empty string disables metrics)
sync-enabled | `true` | Enable Marathon-consul scheduled sync
sync-force | `false` | Force leadership-independent Marathon-consul sync (run always)
sync-interval | `15m0s` | Marathon-consul sync [interval](https://golang.org/pkg/time/#Duration) **Note:** While using file configuration intervals should be provided in *nanoseconds*
sync-leader | `<hostname>:8080` | Marathon cluster-wide node name (defaults to `<hostname>:8080`), the sync will run only if the node is the current [Marathon-leader](https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-leader)
workers-pool-size | `10` | Number of concurrent workers processing events


### Endpoints

Expand Down
16 changes: 4 additions & 12 deletions apps/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package apps

import (
"encoding/json"
"strings"
)

// Only Marathon apps with this label will be registered in Consul
Expand Down Expand Up @@ -45,23 +44,16 @@ func (id AppId) String() string {
return string(id)
}

func (id AppId) ConsulServiceName() string {
return strings.Replace(strings.Trim(strings.TrimSpace(id.String()), "/"), "/", ".", -1)
}

func (app *App) IsConsulApp() bool {
_, ok := app.Labels[MARATHON_CONSUL_LABEL]
return ok
}

func (app *App) ConsulServiceName() string {
if value, ok := app.Labels[MARATHON_CONSUL_LABEL]; ok && value != "true" {
value = AppId(value).ConsulServiceName()
if value != "" {
return value
}
func (app *App) ConsulName() string {
if value, ok := app.Labels[MARATHON_CONSUL_LABEL]; ok && value != "true" && value != "" {
return value
}
return app.ID.ConsulServiceName()
return app.ID.String()
}

func ParseApps(jsonBlob []byte) ([]*App, error) {
Expand Down
65 changes: 0 additions & 65 deletions apps/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,68 +119,3 @@ func TestAppId_String(t *testing.T) {
t.Parallel()
assert.Equal(t, "appId", AppId("appId").String())
}

func TestAppId_ConsulServiceName(t *testing.T) {
t.Parallel()

// given
id := AppId("/rootGroup/subGroup/subSubGroup/name")

// when
serviceName := id.ConsulServiceName()

// then
assert.Equal(t, "rootGroup.subGroup.subSubGroup.name", serviceName)
}

func TestConsulServiceName_BackwardCompatibilityForConsulTrue(t *testing.T) {
t.Parallel()

// when
app := &App{
ID: "/someApp",
Labels: map[string]string{"consul": "true"},
}

// then
assert.Equal(t, "someApp", app.ConsulServiceName())
}

func TestConsulServiceName_EmptyConsulLabelValue(t *testing.T) {
t.Parallel()

// when
app := &App{
ID: "/someApp/other",
Labels: map[string]string{"consul": ""},
}

// then
assert.Equal(t, "someApp.other", app.ConsulServiceName())
}

func TestConsulServiceName_CustomName(t *testing.T) {
t.Parallel()

// when
app := &App{
ID: "/someApp/other",
Labels: map[string]string{"consul": "otherName"},
}

// then
assert.Equal(t, "otherName", app.ConsulServiceName())
}

func TestConsulServiceName_CustomNameEscapingNotAllowedChars(t *testing.T) {
t.Parallel()

// when
app := &App{
ID: "/someApp/other",
Labels: map[string]string{"consul": "/otherName/something"},
}

// then
assert.Equal(t, "otherName.something", app.ConsulServiceName())
}
7 changes: 4 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,12 @@ func (config *Config) parseFlags() {
flag.DurationVar(&config.Consul.Timeout, "consul-timeout", 3*time.Second, "Time limit for requests made by the Consul HTTP client. A Timeout of zero means no timeout")
flag.Uint32Var(&config.Consul.AgentFailuresTolerance, "consul-max-agent-failures", 3, "Max number of consecutive request failures for agent before removal from cache")
flag.Uint32Var(&config.Consul.RequestRetries, "consul-get-services-retry", 3, "Number of retries on failure when performing requests to Consul. Each retry uses different cached agent")
flag.StringVar(&config.Consul.ConsulNameSeparator, "consul-name-separator", ".", "Separator used to create default service name for Consul")

// Web
flag.StringVar(&config.Web.Listen, "listen", ":4000", "Accept connections at this address")
flag.IntVar(&config.Web.QueueSize, "events-queue-size", 1000, "Size of events queue")
flag.IntVar(&config.Web.WorkersCount, "workers-pool-size", 10, "Number of concurrent workers processing events")
flag.StringVar(&config.Web.Listen, "listen", ":4000", "accept connections at this address")
flag.IntVar(&config.Web.QueueSize, "events-queue-size", 1000, "size of events queue")
flag.IntVar(&config.Web.WorkersCount, "workers-pool-size", 10, "number of concurrent workers processing events")

// Sync
flag.BoolVar(&config.Sync.Enabled, "sync-enabled", true, "Enable Marathon-consul scheduled sync")
Expand Down
1 change: 1 addition & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func TestConfig_ShouldBeMergedWithFileDefaultsAndFlags(t *testing.T) {
Timeout: 3 * time.Second,
RequestRetries: 5,
AgentFailuresTolerance: 3,
ConsulNameSeparator: ".",
},
Web: web.Config{
Listen: ":4000",
Expand Down
1 change: 1 addition & 0 deletions consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type ConsulConfig struct {
Timeout time.Duration
RequestRetries uint32
AgentFailuresTolerance uint32
ConsulNameSeparator string
}

type Auth struct {
Expand Down
7 changes: 6 additions & 1 deletion consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type ConsulServices interface {
Register(task *apps.Task, app *apps.App) error
Deregister(serviceId apps.TaskId, agentAddress string) error
GetAgent(agentAddress string) (*consulapi.Client, error)
ServiceName(app *apps.App) string
}

type Consul struct {
Expand Down Expand Up @@ -191,6 +192,10 @@ func (c *Consul) GetAgent(agentAddress string) (*consulapi.Client, error) {
return c.agents.GetAgent(agentAddress)
}

func (c *Consul) ServiceName(app *apps.App) string {
return strings.Replace(strings.Trim(strings.TrimSpace(app.ConsulName()), "/"), "/", c.config.ConsulNameSeparator, -1)
}

func (c *Consul) marathonTaskToConsulService(task *apps.Task, app *apps.App) (*consulapi.AgentServiceRegistration, error) {
IP, err := utils.HostToIPv4(task.Host)
if err != nil {
Expand All @@ -200,7 +205,7 @@ func (c *Consul) marathonTaskToConsulService(task *apps.Task, app *apps.App) (*c

return &consulapi.AgentServiceRegistration{
ID: task.ID.String(),
Name: app.ConsulServiceName(),
Name: c.ServiceName(app),
Port: task.Ports[0],
Address: serviceAddress,
Tags: c.marathonLabelsToConsulTags(app.Labels),
Expand Down
6 changes: 5 additions & 1 deletion consul/consul_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewConsulStubWithTag(tag string) *ConsulStub {
services: make(map[apps.TaskId]*consulapi.AgentServiceRegistration),
ErrorServices: make(map[apps.TaskId]error),
ErrorGetServices: make(map[string]error),
consul: New(ConsulConfig{Tag: tag}),
consul: New(ConsulConfig{Tag: tag, ConsulNameSeparator: "."}),
}
}

Expand Down Expand Up @@ -73,6 +73,10 @@ func (c *ConsulStub) Register(task *apps.Task, app *apps.App) error {
}
}

func (c *ConsulStub) ServiceName(app *apps.App) string {
return c.consul.ServiceName(app)
}

func (c *ConsulStub) Deregister(serviceId apps.TaskId, agent string) error {
if err, ok := c.ErrorServices[serviceId]; ok {
return err
Expand Down
67 changes: 67 additions & 0 deletions consul/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,3 +646,70 @@ func TestMarathonTaskToConsulServiceMapping_NotResolvableTaskHost(t *testing.T)
// then
assert.Error(t, err)
}

func TestServiceName_WithSerarator(t *testing.T) {
t.Parallel()

// given
app := &apps.App{
ID: "/rootGroup/subGroup/subSubGroup/name",
}
consul := New(ConsulConfig{ConsulNameSeparator: "."})

// when
serviceName := consul.ServiceName(app)

// then
assert.Equal(t, "rootGroup.subGroup.subSubGroup.name", serviceName)
}

func TestServiceName_WithEmptyConsulLabel(t *testing.T) {
t.Parallel()

// given
app := &apps.App{
ID: "/rootGroup/subGroup/subSubGroup/name",
Labels: map[string]string{"consul": ""},
}
consul := New(ConsulConfig{ConsulNameSeparator: "-"})

// when
serviceName := consul.ServiceName(app)

// then
assert.Equal(t, "rootGroup-subGroup-subSubGroup-name", serviceName)
}

func TestServiceName_WithConsulLabelSetToTrue(t *testing.T) {
t.Parallel()

// given
app := &apps.App{
ID: "/rootGroup/subGroup/subSubGroup/name",
Labels: map[string]string{"consul": "true"},
}
consul := New(ConsulConfig{ConsulNameSeparator: "-"})

// when
serviceName := consul.ServiceName(app)

// then
assert.Equal(t, "rootGroup-subGroup-subSubGroup-name", serviceName)
}

func TestServiceName_WithCustomConsulLabelEscapingChars(t *testing.T) {
t.Parallel()

// given
app := &apps.App{
ID: "/rootGroup/subGroup/subSubGroup/name",
Labels: map[string]string{"consul": "/some-other/name"},
}
consul := New(ConsulConfig{ConsulNameSeparator: "-"})

// when
serviceName := consul.ServiceName(app)

// then
assert.Equal(t, "some-other-name", serviceName)
}
5 changes: 3 additions & 2 deletions consul/consul_test_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ func ConsulClientAtServer(server *testutil.TestServer) *Consul {

func consulClientAtAddress(host string, port int) *Consul {
config := ConsulConfig{
Timeout: 10 * time.Millisecond,
Port: fmt.Sprintf("%d", port),
Timeout: 10 * time.Millisecond,
Port: fmt.Sprintf("%d", port),
ConsulNameSeparator: ".",
}
consul := New(config)
// initialize the agents cache with a single client pointing at provided location
Expand Down
1 change: 1 addition & 0 deletions debian/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"Username": "",
"Password": ""
},
"ConsulNameSeparator": ".",
"Port": "8500",
"SslEnabled": false,
"SslVerify": true,
Expand Down
2 changes: 1 addition & 1 deletion events/deployment_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (d *DeploymentEvent) RenamedConsulApps() []*apps.App {
targetMap := d.appsMap(target)
for id, originalApp := range originalMap {
targetApp, ok := targetMap[id]
if !ok || originalApp.ConsulServiceName() != targetApp.ConsulServiceName() {
if !ok || originalApp.ConsulName() != targetApp.ConsulName() {
renamedApps = append(renamedApps, originalApp)
}
}
Expand Down
4 changes: 4 additions & 0 deletions sync/error_consul_stub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func (c errorConsul) Deregister(serviceId apps.TaskId, agent string) error {
return fmt.Errorf("Error occured")
}

func (c errorConsul) ServiceName(app *apps.App) string {
return ""
}

func (c errorConsul) GetAgent(agent string) (*consulapi.Client, error) {
return nil, fmt.Errorf("Error occured")
}
4 changes: 4 additions & 0 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ func (c *ConsulServicesMock) RegistrationsCount(instanceId string) int {
return c.registrations[instanceId]
}

func (c *ConsulServicesMock) ServiceName(app *apps.App) string {
return ""
}

func (c *ConsulServicesMock) Deregister(serviceId apps.TaskId, agent string) error {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion web/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (fh *eventHandler) handleDeploymentStepSuccess(body []byte) error {
func (fh *eventHandler) deregisterAllAppServices(app *apps.App) []error {

errors := []error{}
serviceName := app.ConsulServiceName()
serviceName := fh.service.ServiceName(app)

log.WithField("AppId", app.ID).WithField("ServiceName", serviceName).Info("Deregistering all services")

Expand Down

0 comments on commit 6175714

Please sign in to comment.