Skip to content

Commit

Permalink
service-port placeholder, marathon tag
Browse files Browse the repository at this point in the history
  • Loading branch information
Oskar Jagodzinski committed Aug 28, 2019
1 parent 7c10503 commit 0e0f9bc
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 10 deletions.
7 changes: 6 additions & 1 deletion mesos/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ type label struct {
Value string
}

type labels struct {
Labels []label
}

type port struct {
Number int
Protocol string
Labels []label
Name string
Labels labels
}

type ports struct {
Expand Down
52 changes: 45 additions & 7 deletions mesos/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import (
"errors"
"fmt"
"os"
"strings"

"github.com/allegro/consul-registration-hook/consul"
)

const consulLabelKey = "consul"
const (
consulLabelKey = "consul"
consulTagValue = "tag"
portPlaceholder = "{port:%s}"
)

// ServiceProvider is responsible for providing services that should be registered
// in Consul discovery service.
Expand Down Expand Up @@ -41,24 +46,30 @@ func (p *ServiceProvider) buildServices(t task) ([]consul.ServiceInstance, error
}

var services []consul.ServiceInstance
var tags []string
var globalTags []string

for _, label := range t.Labels {
if label.Value == "tag" {
tags = append(tags, label.Key)
if label.Value == consulTagValue {
globalTags = append(globalTags, label.Key)
}
}

marathonTaskTag := fmt.Sprintf("marathon-task:%s", t.ID)
globalTags = append(globalTags, marathonTaskTag)
tagPlaceholders := getPlaceholders(t.Discovery.Ports.Ports)

// TODO(medzin): add check conversion after MESOS-8780 is completed
// See: https://issues.apache.org/jira/browse/MESOS-8780
for _, port := range t.Discovery.Ports.Ports {
if consulServiceName := p.getConsulServiceName(port.Labels); consulServiceName != "" {
if consulServiceName := p.getConsulServiceName(port.Labels.Labels); consulServiceName != "" {
portTags := p.getPortLabels(port.Labels.Labels, tagPlaceholders)

service := consul.ServiceInstance{
ID: fmt.Sprintf("%s_%d", hostname, port.Number),
Name: consulServiceName,
Host: hostname,
Port: port.Number,
Tags: tags,
Tags: append(portTags, globalTags...),
}
services = append(services, service)
}
Expand All @@ -72,7 +83,7 @@ func (p *ServiceProvider) buildServices(t task) ([]consul.ServiceInstance, error
Name: consulServiceName,
Host: hostname,
Port: port,
Tags: tags,
Tags: globalTags,
}
services = append(services, service)
}
Expand All @@ -81,6 +92,21 @@ func (p *ServiceProvider) buildServices(t task) ([]consul.ServiceInstance, error
return services, nil
}

func (p *ServiceProvider) getPortLabels(labels []label, tagPlaceholders map[string]string) []string {
var portLabels []string
for _, label := range labels {
if label.Value == consulTagValue {
for placeholder, replacement := range tagPlaceholders {
if strings.Contains(label.Key, placeholder) {
label.Key = strings.Replace(label.Key, placeholder, replacement, -1)
}
}
portLabels = append(portLabels, label.Key)
}
}
return portLabels
}

func (p *ServiceProvider) client() agentClient {
if p.agentClient != nil {
return p.agentClient
Expand Down Expand Up @@ -134,3 +160,15 @@ func (p *ServiceProvider) getTaskFromState(state state) (task, error) {

return task{}, errors.New("no task in executor")
}

func getPlaceholders(ports []port) map[string]string {
placeholders := map[string]string{}
for _, port := range ports {
name := port.Name
if name != "" {
placeholder := fmt.Sprintf(portPlaceholder, name)
placeholders[placeholder] = fmt.Sprint(port.Number)
}
}
return placeholders
}
92 changes: 90 additions & 2 deletions mesos/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestIfReturnsServicesToRegisterBasedOnPortLabels(t *testing.T) {
Labels: []label{label{Key: "consul", Value: "invalid-name"}},
Discovery: discovery{Ports: ports{Ports: []port{port{
Number: 1234,
Labels: []label{label{Key: "consul", Value: "valid-name"}},
Labels: labels{Labels: []label{label{Key: "consul", Value: "valid-name"}}},
}}}},
}},
}},
Expand All @@ -85,6 +85,93 @@ func TestIfReturnsServicesToRegisterBasedOnPortLabels(t *testing.T) {
assert.Equal(t, 1234, serviceInstances[0].Port)
}

func TestIfReturnsServicesPortPlaceholders(t *testing.T) {
os.Setenv("MESOS_EXECUTOR_ID", "executor_id")
os.Setenv("MESOS_FRAMEWORK_ID", "framework_id")
os.Setenv("HOST", "hostname")
defer os.Unsetenv("MESOS_EXECUTOR_ID")
defer os.Unsetenv("MESOS_FRAMEWORK_ID")
defer os.Unsetenv("HOST")

s := state{Frameworks: []framework{framework{
ID: "framework_id",
Executors: []executor{executor{
ID: "executor_id",
Tasks: []task{task{
Labels: []label{label{Key: "consul", Value: "invalid-name"}},
Discovery: discovery{Ports: ports{Ports: []port{port{
Number: 1234,
Labels: labels{Labels: []label{label{Key: "consul", Value: "valid-name"}, label{Key: "service-port:{port:service}", Value: "tag"}}},
}, port{
Number: 4556,
Name: "service",
}}}},
}},
}},
}}}

agentClient := &mockAgentClient{}
agentClient.On("state").Return(s, nil)

serviceProvider := ServiceProvider{
agentClient: agentClient,
}

serviceInstances, err := serviceProvider.Get(context.Background())

require.NoError(t, err)
require.NotEmpty(t, serviceInstances)
assert.Equal(t, "hostname_1234", serviceInstances[0].ID)
assert.Equal(t, "valid-name", serviceInstances[0].Name)
assert.Equal(t, "hostname", serviceInstances[0].Host)
assert.Equal(t, 1234, serviceInstances[0].Port)
assert.Contains(t, serviceInstances[0].Tags, "service-port:4557")
}

func TestIfReturnsServicesMarthonTaskTag(t *testing.T) {
os.Setenv("MESOS_EXECUTOR_ID", "executor_id")
os.Setenv("MESOS_FRAMEWORK_ID", "framework_id")
os.Setenv("HOST", "hostname")
defer os.Unsetenv("MESOS_EXECUTOR_ID")
defer os.Unsetenv("MESOS_FRAMEWORK_ID")
defer os.Unsetenv("HOST")

s := state{Frameworks: []framework{framework{
ID: "framework_id",
Executors: []executor{executor{
ID: "executor_id",
Tasks: []task{task{
ID: "executor_id_inside_task",
Labels: []label{label{Key: "consul", Value: "invalid-name"}},
Discovery: discovery{Ports: ports{Ports: []port{port{
Number: 1234,
Labels: labels{Labels: []label{label{Key: "consul", Value: "valid-name"}, label{Key: "service-port:{port:service}", Value: "tag"}}},
}, port{
Number: 4556,
Name: "service",
}}}},
}},
}},
}}}

agentClient := &mockAgentClient{}
agentClient.On("state").Return(s, nil)

serviceProvider := ServiceProvider{
agentClient: agentClient,
}

serviceInstances, err := serviceProvider.Get(context.Background())

require.NoError(t, err)
require.NotEmpty(t, serviceInstances)
assert.Equal(t, "hostname_1234", serviceInstances[0].ID)
assert.Equal(t, "valid-name", serviceInstances[0].Name)
assert.Equal(t, "hostname", serviceInstances[0].Host)
assert.Equal(t, 1234, serviceInstances[0].Port)
assert.Contains(t, serviceInstances[0].Tags, "marathon-task:executor_id_inside_task")
}

func TestIfNotPanicsWithEmptyPorts(t *testing.T) {
os.Setenv("MESOS_EXECUTOR_ID", "executor_id")
os.Setenv("MESOS_FRAMEWORK_ID", "framework_id")
Expand Down Expand Up @@ -161,6 +248,7 @@ func TestIfConvertsMesosLabelsToConsulTags(t *testing.T) {
Executors: []executor{executor{
ID: "executor_id",
Tasks: []task{task{
ID: "executor_id_inside_task",
Labels: []label{
label{Key: "consul", Value: "name"},
label{Key: "tag1", Value: "tag"},
Expand All @@ -182,7 +270,7 @@ func TestIfConvertsMesosLabelsToConsulTags(t *testing.T) {

require.NoError(t, err)
require.NotEmpty(t, serviceInstances)
assert.Equal(t, []string{"tag1", "tag2"}, serviceInstances[0].Tags)
assert.Equal(t, []string{"tag1", "tag2", "marathon-task:executor_id_inside_task"}, serviceInstances[0].Tags)
}

type mockAgentClient struct {
Expand Down

0 comments on commit 0e0f9bc

Please sign in to comment.