Skip to content

Commit

Permalink
custom service name
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Krawczyk committed Jan 19, 2016
1 parent 04d84c6 commit 2e0e5ac
Show file tree
Hide file tree
Showing 26 changed files with 853 additions and 455 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ make test
## Usage

- Consul Agents should be available at every Mesos Slave, tasks will be registered at hosts their run on.
- Only tasks which are labeled as `consul:true` will be registered in Consul.
- Only tasks which are labeled as `consul` will be registered in Consul. By default the registered service name is equal to Marathon's application name.
A different name can be provided as the label's value, e.g. `consul:customName`. As an exception of the rule, for backward compatibility with the `0.3.x` branch, a value of `true` is resolved to the default name.
- Only services with tag specified by `consul-tag` property will be maintained. This tag is automatically added during registration. **Important**: it should be unique for every Marathon cluster connected to Consul.
- At least one HTTP healthcheck should be defined for a task. The task is registered when Marathon marks it's as alive.
- Provided HTTP healtcheck will be transferred to Consul.
Expand Down
33 changes: 28 additions & 5 deletions apps/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package apps

import (
"encoding/json"
"github.com/allegro/marathon-consul/tasks"
"strings"
)

type HealthCheck struct {
Expand All @@ -26,13 +26,36 @@ type AppsResponse struct {
type App struct {
Labels map[string]string `json:"labels"`
HealthChecks []HealthCheck `json:"healthChecks"`
ID tasks.AppId `json:"id"`
Tasks []tasks.Task `json:"tasks"`
ID AppId `json:"id"`
Tasks []Task `json:"tasks"`
}

// Marathon Application Id (aka PathId)
// Usually in the form of /rootGroup/subGroup/subSubGroup/name
// allowed characters: lowercase letters, digits, hyphens, slash
type AppId string

func (id AppId) String() string {
return string(id)
}

func (id AppId) ConsulServiceName() string {
return strings.Replace(strings.Trim(strings.TrimSpace(id.String()), "/"), "/", ".", -1)
}

func (app *App) IsConsulApp() bool {
value, ok := app.Labels["consul"]
return ok && value == "true"
_, ok := app.Labels["consul"]
return ok
}

func (app *App) ConsulServiceName() string {
if value, ok := app.Labels["consul"]; ok && value != "true" {
value = AppId(value).ConsulServiceName()
if value != "" {
return value
}
}
return app.ID.ConsulServiceName()
}

func ParseApps(jsonBlob []byte) ([]*App, error) {
Expand Down
91 changes: 80 additions & 11 deletions apps/app_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package apps

import (
"github.com/allegro/marathon-consul/tasks"
"github.com/stretchr/testify/assert"
"io/ioutil"
"testing"
Expand All @@ -26,15 +25,15 @@ func TestParseApps(t *testing.T) {
},
},
ID: "/bridged-webapp",
Tasks: []tasks.Task{
tasks.Task{
Tasks: []Task{
Task{
ID: "test.47de43bd-1a81-11e5-bdb6-e6cb6734eaf8",
AppID: "/test",
Host: "192.168.2.114",
Ports: []int{31315},
HealthCheckResults: []tasks.HealthCheckResult{tasks.HealthCheckResult{Alive: true}},
HealthCheckResults: []HealthCheckResult{HealthCheckResult{Alive: true}},
},
tasks.Task{
Task{
ID: "test.4453212c-1a81-11e5-bdb6-e6cb6734eaf8",
AppID: "/test",
Host: "192.168.2.114",
Expand Down Expand Up @@ -63,24 +62,24 @@ func TestParseApp(t *testing.T) {
TimeoutSeconds: 10,
MaxConsecutiveFailures: 3}},
ID: "/myapp",
Tasks: []tasks.Task{tasks.Task{
Tasks: []Task{Task{
ID: "myapp.cc49ccc1-9812-11e5-a06e-56847afe9799",
AppID: "/myapp",
Host: "10.141.141.10",
Ports: []int{31678,
31679,
31680,
31681},
HealthCheckResults: []tasks.HealthCheckResult{tasks.HealthCheckResult{Alive: true}}},
tasks.Task{
HealthCheckResults: []HealthCheckResult{HealthCheckResult{Alive: true}}},
Task{
ID: "myapp.c8b449f0-9812-11e5-a06e-56847afe9799",
AppID: "/myapp",
Host: "10.141.141.10",
Ports: []int{31307,
31308,
31309,
31310},
HealthCheckResults: []tasks.HealthCheckResult{tasks.HealthCheckResult{Alive: true}}}}}
HealthCheckResults: []HealthCheckResult{HealthCheckResult{Alive: true}}}}}

app, err := ParseApp(appBlob)
assert.NoError(t, err)
Expand All @@ -100,11 +99,11 @@ func TestConsulApp(t *testing.T) {

// when
app = &App{
Labels: map[string]string{"consul": "false", "marathon": "true"},
Labels: map[string]string{"consul": "someName", "marathon": "true"},
}

// then
assert.False(t, app.IsConsulApp())
assert.True(t, app.IsConsulApp())

// when
app = &App{
Expand All @@ -114,3 +113,73 @@ func TestConsulApp(t *testing.T) {
// then
assert.False(t, app.IsConsulApp())
}

func TestAppId_String(t *testing.T) {
t.Parallel()
assert.Equal(t, "appId", AppId("appId").String())
}

func TestAppId_ConsulServiceName(t *testing.T) {
t.Parallel()

// given
id := AppId("/rootGroup/subGroup/subSubGroup/name")

// when
serviceName := id.ConsulServiceName()

// then
assert.Equal(t, "rootGroup.subGroup.subSubGroup.name", serviceName)
}

func TestConsulServiceName_BackwardCompatibilityForConsulTrue(t *testing.T) {
t.Parallel()

// when
app := &App{
ID: "/someApp",
Labels: map[string]string{"consul": "true"},
}

// then
assert.Equal(t, "someApp", app.ConsulServiceName())
}

func TestConsulServiceName_EmptyConsulLabelValue(t *testing.T) {
t.Parallel()

// when
app := &App{
ID: "/someApp/other",
Labels: map[string]string{"consul": ""},
}

// then
assert.Equal(t, "someApp.other", app.ConsulServiceName())
}

func TestConsulServiceName_CustomName(t *testing.T) {
t.Parallel()

// when
app := &App{
ID: "/someApp/other",
Labels: map[string]string{"consul": "otherName"},
}

// then
assert.Equal(t, "otherName", app.ConsulServiceName())
}

func TestConsulServiceName_CustomNameEscapingNotAllowedChars(t *testing.T) {
t.Parallel()

// when
app := &App{
ID: "/someApp/other",
Labels: map[string]string{"consul": "/otherName/something"},
}

// then
assert.Equal(t, "otherName.something", app.ConsulServiceName())
}
12 changes: 10 additions & 2 deletions tasks/task.go → apps/task.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
package tasks
package apps

import (
"encoding/json"
)

type Task struct {
ID Id `json:"id"`
ID TaskId `json:"id"`
TaskStatus string `json:"taskStatus"`
AppID AppId `json:"appId"`
Host string `json:"host"`
Ports []int `json:"ports"`
HealthCheckResults []HealthCheckResult `json:"healthCheckResults"`
}

// Marathon Task Id
// Usually in the form of AppId.uuid with '/' replaced with '_'
type TaskId string

func (id TaskId) String() string {
return string(id)
}

type HealthCheckResult struct {
Alive bool `json:"alive"`
}
Expand Down
7 changes: 6 additions & 1 deletion tasks/task_test.go → apps/task_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tasks
package apps

import (
"encoding/json"
Expand Down Expand Up @@ -102,3 +102,8 @@ func TestIsHealthy(t *testing.T) {
// then
assert.True(t, task.IsHealthy())
}

func TestId_String(t *testing.T) {
t.Parallel()
assert.Equal(t, "id", TaskId("id").String())
}
File renamed without changes.
29 changes: 14 additions & 15 deletions consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/allegro/marathon-consul/apps"
"github.com/allegro/marathon-consul/metrics"
"github.com/allegro/marathon-consul/tasks"
consulapi "github.com/hashicorp/consul/api"
"net/url"
"strconv"
)

type ConsulServices interface {
GetAllServices() ([]*consulapi.CatalogService, error)
GetServices(name tasks.AppId) ([]*consulapi.CatalogService, error)
Register(task *tasks.Task, app *apps.App) error
Deregister(serviceId tasks.Id, agentAddress string) error
GetServices(name string) ([]*consulapi.CatalogService, error)
Register(task *apps.Task, app *apps.App) error
Deregister(serviceId apps.TaskId, agentAddress string) error
GetAgent(agentAddress string) (*consulapi.Client, error)
}

Expand All @@ -31,7 +30,7 @@ func New(config ConsulConfig) *Consul {
}
}

func (c *Consul) GetServices(name tasks.AppId) ([]*consulapi.CatalogService, error) {
func (c *Consul) GetServices(name string) ([]*consulapi.CatalogService, error) {
agent, err := c.agents.GetAnyAgent()
if err != nil {
return nil, err
Expand All @@ -46,7 +45,7 @@ func (c *Consul) GetServices(name tasks.AppId) ([]*consulapi.CatalogService, err
dcAwareQuery := &consulapi.QueryOptions{
Datacenter: dc,
}
services, _, err := agent.Catalog().Service(name.ConsulServiceName(), c.config.Tag, dcAwareQuery)
services, _, err := agent.Catalog().Service(name, c.config.Tag, dcAwareQuery)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -96,9 +95,9 @@ func contains(slice []string, search string) bool {
return false
}

func (c *Consul) Register(task *tasks.Task, app *apps.App) error {
func (c *Consul) Register(task *apps.Task, app *apps.App) error {
var err error
service := c.marathonTaskToConsulService(task, app.HealthChecks, app.Labels)
service := c.marathonTaskToConsulService(task, app)
metrics.Time("consul.register", func() { err = c.register(service) })
if err != nil {
metrics.Mark("consul.register.error")
Expand Down Expand Up @@ -129,7 +128,7 @@ func (c *Consul) register(service *consulapi.AgentServiceRegistration) error {
return err
}

func (c *Consul) Deregister(serviceId tasks.Id, agentAddress string) error {
func (c *Consul) Deregister(serviceId apps.TaskId, agentAddress string) error {
var err error
metrics.Time("consul.deregister", func() { err = c.deregister(serviceId, agentAddress) })
if err != nil {
Expand All @@ -140,7 +139,7 @@ func (c *Consul) Deregister(serviceId tasks.Id, agentAddress string) error {
return err
}

func (c *Consul) deregister(serviceId tasks.Id, agentAddress string) error {
func (c *Consul) deregister(serviceId apps.TaskId, agentAddress string) error {
agent, err := c.agents.GetAgent(agentAddress)
if err != nil {
return err
Expand All @@ -159,18 +158,18 @@ func (c *Consul) GetAgent(agentAddress string) (*consulapi.Client, error) {
return c.agents.GetAgent(agentAddress)
}

func (c *Consul) marathonTaskToConsulService(task *tasks.Task, healthChecks []apps.HealthCheck, labels map[string]string) *consulapi.AgentServiceRegistration {
func (c *Consul) marathonTaskToConsulService(task *apps.Task, app *apps.App) *consulapi.AgentServiceRegistration {
return &consulapi.AgentServiceRegistration{
ID: task.ID.String(),
Name: task.AppID.ConsulServiceName(),
Name: app.ConsulServiceName(),
Port: task.Ports[0],
Address: task.Host,
Tags: c.marathonLabelsToConsulTags(labels),
Check: c.marathonToConsulCheck(task, healthChecks),
Tags: c.marathonLabelsToConsulTags(app.Labels),
Check: c.marathonToConsulCheck(task, app.HealthChecks),
}
}

func (c *Consul) marathonToConsulCheck(task *tasks.Task, healthChecks []apps.HealthCheck) *consulapi.AgentServiceCheck {
func (c *Consul) marathonToConsulCheck(task *apps.Task, healthChecks []apps.HealthCheck) *consulapi.AgentServiceCheck {
// TODO: Handle all types of checks
for _, check := range healthChecks {
if check.Protocol == "HTTP" {
Expand Down

0 comments on commit 2e0e5ac

Please sign in to comment.