Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable Consul tag name #30

Merged
merged 1 commit into from
Jan 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,12 @@ make test

## Usage

- Consul Agents should be available at every Mesos Slave, tasks will be registered at host their run on.
- Consul Agents should be available at every Mesos Slave, tasks will be registered at hosts their run on.
- Only tasks which are labeled as `consul:true` will be registered in Consul.
- Only services with tag `marathon` will be maintained. This tag is automatically added during registration.
- Only services with tag specified by `consul-tag` property will be maintained. This tag is automatically added during registration. **Important**: it should be unique for every Marathon cluster connected to Consul.
- At least one HTTP healthcheck should be defined for a task. The task is registered when Marathon marks it's as alive.
- Provided HTTP healtcheck will be transferred to Consul.
- Labels with `tag` value will be converted to Consul tags, `marathon` tag is added by default
(e.g, `labels: ["public":"tag", "varnish":"tag", "env": "test"]` → `tags: ["public", "varnish", "marathon"]`).
- Labels with `tag` value will be converted to Consul tags, e.g. (note: `consul-tag` is set to `marathon`) `labels: ["public":"tag", "varnish":"tag", "env": "test"]` → `tags: ["public", "varnish", "marathon"]`.
- The scheduled Marathon-consul sync may run in two modes:
- Only on node that is the current [Marathon-leader](https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-leader), `sync-leader` parameter should be set to `hostname:port` the current node appears in the Marathon cluster.
This mode is **enabled by default** and the `sync-leader` property is set to the hostname resolved by OS.
Expand All @@ -69,6 +68,7 @@ consul-ssl-ca-cert | | Path to a CA certificate file,
consul-ssl-cert | | Path to an SSL client certificate to use to authenticate to the Consul server
consul-ssl-verify | `true` | Verify certificates when connecting via SSL
consul-token | | The Consul ACL token
consul-tag | `marathon` | Common tag name added to every service registered in Consul, should be unique for every Marathon-cluster connected to Consul
listen | `:4000` | Accept connections at this address
log-level | `info` | Log level: panic, fatal, error, warn, info, or debug
log-format | `text` | Log format: JSON, text
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (config *Config) parseFlags() {
flag.StringVar(&config.Consul.SslCert, "consul-ssl-cert", "", "Path to an SSL client certificate to use to authenticate to the Consul server")
flag.StringVar(&config.Consul.SslCaCert, "consul-ssl-ca-cert", "", "Path to a CA certificate file, containing one or more CA certificates to use to validate the certificate sent by the Consul server to us")
flag.StringVar(&config.Consul.Token, "consul-token", "", "The Consul ACL token")
flag.StringVar(&config.Consul.Tag, "consul-tag", "marathon", "Common tag name added to every service registered in Consul, should be unique for every Marathon-cluster connected to Consul")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add this field in sample config?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debian/config.json? it's added already


// Web
flag.StringVar(&config.Web.Listen, "listen", ":4000", "accept connections at this address")
Expand Down
12 changes: 9 additions & 3 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,15 @@ func TestConfig_ShouldBeMergedWithFileDefaultsAndFlags(t *testing.T) {
SslVerify: true,
SslCert: "",
SslCaCert: "",
Token: ""},
Web: struct{ Listen string }{Listen: ":4000"},
Sync: sync.Config{Interval: 15 * time.Minute, Enabled: true},
Token: "",
Tag: "marathon"},
Web: struct{ Listen string }{Listen: ":4000"},
Sync: sync.Config{
Interval: 15 * time.Minute,
Enabled: true,
Leader: "",
Force: false,
},
Marathon: marathon.Config{Location: "localhost:8080",
Protocol: "http",
Username: "",
Expand Down
1 change: 1 addition & 0 deletions consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type ConsulConfig struct {
SslCert string
SslCaCert string
Token string
Tag string
}

type Auth struct {
Expand Down
56 changes: 51 additions & 5 deletions consul/consul.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
package consul

import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/allegro/marathon-consul/apps"
"github.com/allegro/marathon-consul/metrics"
"github.com/allegro/marathon-consul/tasks"
consulapi "github.com/hashicorp/consul/api"
"net/url"
"strconv"
)

type ConsulServices interface {
GetAllServices() ([]*consulapi.CatalogService, error)
GetServices(name tasks.AppId) ([]*consulapi.CatalogService, error)
Register(service *consulapi.AgentServiceRegistration) error
Register(task *tasks.Task, app *apps.App) error
Deregister(serviceId tasks.Id, agentAddress string) error
GetAgent(agentAddress string) (*consulapi.Client, error)
}

type Consul struct {
agents Agents
config ConsulConfig
}

func New(config ConsulConfig) *Consul {
return &Consul{
agents: NewAgents(&config),
config: config,
}
}

Expand All @@ -40,7 +46,7 @@ func (c *Consul) GetServices(name tasks.AppId) ([]*consulapi.CatalogService, err
dcAwareQuery := &consulapi.QueryOptions{
Datacenter: dc,
}
services, _, err := agent.Catalog().Service(name.ConsulServiceName(), "marathon", dcAwareQuery)
services, _, err := agent.Catalog().Service(name.ConsulServiceName(), c.config.Tag, dcAwareQuery)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -69,8 +75,8 @@ func (c *Consul) GetAllServices() ([]*consulapi.CatalogService, error) {
return nil, err
}
for service, tags := range services {
if contains(tags, "marathon") {
serviceInstances, _, err := agent.Catalog().Service(service, "marathon", dcAwareQuery)
if contains(tags, c.config.Tag) {
serviceInstances, _, err := agent.Catalog().Service(service, c.config.Tag, dcAwareQuery)
if err != nil {
return nil, err
}
Expand All @@ -90,8 +96,9 @@ func contains(slice []string, search string) bool {
return false
}

func (c *Consul) Register(service *consulapi.AgentServiceRegistration) error {
func (c *Consul) Register(task *tasks.Task, app *apps.App) error {
var err error
service := c.marathonTaskToConsulService(task, app.HealthChecks, app.Labels)
metrics.Time("consul.register", func() { err = c.register(service) })
if err != nil {
metrics.Mark("consul.register.error")
Expand Down Expand Up @@ -151,3 +158,42 @@ func (c *Consul) deregister(serviceId tasks.Id, agentAddress string) error {
func (c *Consul) GetAgent(agentAddress string) (*consulapi.Client, error) {
return c.agents.GetAgent(agentAddress)
}

func (c *Consul) marathonTaskToConsulService(task *tasks.Task, healthChecks []apps.HealthCheck, labels map[string]string) *consulapi.AgentServiceRegistration {
return &consulapi.AgentServiceRegistration{
ID: task.ID.String(),
Name: task.AppID.ConsulServiceName(),
Port: task.Ports[0],
Address: task.Host,
Tags: c.marathonLabelsToConsulTags(labels),
Check: c.marathonToConsulCheck(task, healthChecks),
}
}

func (c *Consul) marathonToConsulCheck(task *tasks.Task, healthChecks []apps.HealthCheck) *consulapi.AgentServiceCheck {
// TODO: Handle all types of checks
for _, check := range healthChecks {
if check.Protocol == "HTTP" {
return &consulapi.AgentServiceCheck{
HTTP: (&url.URL{
Scheme: "http",
Host: task.Host + ":" + strconv.Itoa(task.Ports[check.PortIndex]),
Path: check.Path,
}).String(),
Interval: fmt.Sprintf("%ds", check.IntervalSeconds),
Timeout: fmt.Sprintf("%ds", check.TimeoutSeconds),
}
}
}
return nil
}

func (c *Consul) marathonLabelsToConsulTags(labels map[string]string) []string {
tags := []string{c.config.Tag}
for key, value := range labels {
if value == "tag" {
tags = append(tags, key)
}
}
return tags
}
16 changes: 11 additions & 5 deletions consul/consul_stub.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package consul

import (
"github.com/allegro/marathon-consul/apps"
"github.com/allegro/marathon-consul/tasks"
consulapi "github.com/hashicorp/consul/api"
)

type ConsulStub struct {
services map[tasks.Id]*consulapi.AgentServiceRegistration
ErrorServices map[tasks.Id]error
consul *Consul
}

func NewConsulStub() *ConsulStub {
return NewConsulStubWithTag("marathon")
}

func NewConsulStubWithTag(tag string) *ConsulStub {
return &ConsulStub{
services: make(map[tasks.Id]*consulapi.AgentServiceRegistration),
ErrorServices: make(map[tasks.Id]error),
consul: New(ConsulConfig{Tag: tag}),
}
}

Expand All @@ -35,7 +42,7 @@ func (c ConsulStub) GetAllServices() ([]*consulapi.CatalogService, error) {
func (c ConsulStub) GetServices(name tasks.AppId) ([]*consulapi.CatalogService, error) {
var catalog []*consulapi.CatalogService
for _, s := range c.services {
if s.Name == name.ConsulServiceName() && contains(s.Tags, "marathon") {
if s.Name == name.ConsulServiceName() && contains(s.Tags, c.consul.config.Tag) {
catalog = append(catalog, &consulapi.CatalogService{
Address: s.Address,
ServiceAddress: s.Address,
Expand All @@ -49,12 +56,11 @@ func (c ConsulStub) GetServices(name tasks.AppId) ([]*consulapi.CatalogService,
return catalog, nil
}

func (c *ConsulStub) Register(service *consulapi.AgentServiceRegistration) error {
taskId := tasks.Id(service.ID)
if err, ok := c.ErrorServices[taskId]; ok {
func (c *ConsulStub) Register(task *tasks.Task, app *apps.App) error {
if err, ok := c.ErrorServices[task.ID]; ok {
return err
} else {
c.services[taskId] = service
c.services[task.ID] = c.consul.marathonTaskToConsulService(task, app.HealthChecks, app.Labels)
return nil
}
}
Expand Down
31 changes: 10 additions & 21 deletions consul/consul_stub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package consul

import (
"fmt"
"github.com/allegro/marathon-consul/apps"
"github.com/allegro/marathon-consul/utils"
"github.com/stretchr/testify/assert"
"testing"
Expand All @@ -11,37 +10,27 @@ import (
func TestConsulStub(t *testing.T) {
t.Parallel()
// given
labels := map[string]string{
"consul": "true",
"public": "tag",
}
healthChecks := []apps.HealthCheck{
apps.HealthCheck{
Path: "/",
Protocol: "HTTP",
PortIndex: 0,
IntervalSeconds: 60,
TimeoutSeconds: 20,
MaxConsecutiveFailures: 3,
},
}
consul := NewConsulStub()
app := utils.ConsulApp("test", 3)
stubError := fmt.Errorf("Some error")
services, err := consul.GetAllServices()
assert.NoError(t, err)
testServices, err := consul.GetServices("test")
assert.NoError(t, err)

// then
assert.Empty(t, services)
assert.Empty(t, testServices)
assert.NoError(t, err)

// when
for _, task := range app.Tasks {
consul.Register(MarathonTaskToConsulService(task, app.HealthChecks, app.Labels))
err = consul.Register(&task, app)
assert.NoError(t, err)
}
services, _ = consul.GetAllServices()
testServices, _ = consul.GetServices("test")
services, err = consul.GetAllServices()
assert.NoError(t, err)
testServices, err = consul.GetServices("test")
assert.NoError(t, err)

// then
assert.Len(t, services, 3)
Expand All @@ -68,7 +57,7 @@ func TestConsulStub(t *testing.T) {
assert.Equal(t, stubError, err)

// when
err = consul.Register(MarathonTaskToConsulService(app.Tasks[0], healthChecks, labels))
err = consul.Register(&app.Tasks[0], app)

// then
assert.Equal(t, stubError, err)
Expand All @@ -83,7 +72,7 @@ func TestConsulStub(t *testing.T) {
// when
app = utils.ConsulApp("other", 2)
for _, task := range app.Tasks {
consul.Register(MarathonTaskToConsulService(task, app.HealthChecks, app.Labels))
consul.Register(&task, app)
}
services, _ = consul.GetAllServices()
testServices, _ = consul.GetServices("test")
Expand Down
Loading