Skip to content

Commit

Permalink
Marathon leadership-aware sync
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Krawczyk committed Jan 8, 2016
1 parent 6e038eb commit 671465b
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 30 deletions.
24 changes: 15 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ make test
- Provided HTTP healthcheck will be transferred to Consul.
- Labels with `tag` value will be converted to Consul tags, `marathon` tag is added by default
(e.g., `labels: ["public":"tag", "varnish":"tag", "env": "test"]` → `tags: ["public", "varnish", "marathon"]`).
- The scheduled Marathon-consul sync may run in two modes:
- Only on the node that is the current [Marathon-leader](https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-leader): set the `sync-leader` parameter to the `hostname:port` under which the current node appears in the Marathon cluster.
Note that `sync-leader` and `marathon-location` serve different purposes: `sync-leader` is used for node leadership detection (it should be set to the cluster-wide node name), while `marathon-location` is used for connecting to Marathon (it may be set to `localhost`).
- On every node: set the `sync-force` parameter to `true`.

### Options

Expand All @@ -64,18 +68,20 @@ consul-ssl-ca-cert | | Path to a CA certificate file,
consul-ssl-cert | | Path to an SSL client certificate to use to authenticate to the Consul server
consul-ssl-verify | `true` | Verify certificates when connecting via SSL
consul-token | | The Consul ACL token
listen | :4000 | Accept connections at this address
log-level | info | Log level: panic, fatal, error, warn, info, or debug
log-format | text | Log format: JSON, text
marathon-location | localhost:8080 | Marathon URL
listen | `:4000` | Accept connections at this address
log-level | `info` | Log level: panic, fatal, error, warn, info, or debug
log-format | `text` | Log format: JSON, text
marathon-location | `localhost:8080` | Marathon URL
marathon-password | | Marathon password for basic auth
marathon-protocol | http | Marathon protocol (http or https)
marathon-protocol | `http` | Marathon protocol (http or https)
marathon-username | | Marathon username for basic auth
metrics-interval | 30s | Metrics reporting [interval](https://golang.org/pkg/time/#Duration) **Note:** While using file configuration intervals should be provided in *nanoseconds*
metrics-interval | `30s` | Metrics reporting [interval](https://golang.org/pkg/time/#Duration) **Note:** While using file configuration intervals should be provided in *nanoseconds*
metrics-location | | Graphite URL (used when metrics-target is set to graphite)
metrics-prefix | default | Metrics prefix (default is resolved to <hostname>.<app_name>
metrics-target | stdout | Metrics destination stdout or graphite
sync-interval | 15m0s | Marathon-consul sync [interval](https://golang.org/pkg/time/#Duration) **Note:** While using file configuration intervals should be provided in *nanoseconds*
metrics-prefix | `default` | Metrics prefix (resolved to `<hostname>.<app_name>` by default)
metrics-target | `stdout` | Metrics destination stdout or graphite
sync-interval | `15m0s` | Marathon-consul sync [interval](https://golang.org/pkg/time/#Duration) **Note:** While using file configuration intervals should be provided in *nanoseconds*
sync-leader | `<hostname>:8080` | Marathon node name (`hostname:port`), if set the sync will run only if the node is the current [Marathon-leader](https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-leader)
sync-force | `false` | Force leadership-independent Marathon-consul sync (run always)


### Adding New Root Certificate Authorities
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func (config *Config) parseFlags() {

// Sync
flag.DurationVar(&config.Sync.Interval, "sync-interval", 15*time.Minute, "Marathon-consul sync interval")
flag.BoolVar(&config.Sync.Force, "sync-force", false, "Force leadership-independent Marathon-consul sync")
flag.StringVar(&config.Sync.Leader, "sync-leader", "", "Marathon instance for which leadership the Marathon-consul sync is enabled")

// Marathon
flag.StringVar(&config.Marathon.Location, "marathon-location", "localhost:8080", "Marathon URL")
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func main() {
log.Fatal(err.Error())
}

sync := sync.New(remote, service)
go sync.StartSyncServicesJob(config.Sync.Interval)
sync := sync.New(config.Sync, remote, service)
go sync.StartSyncServicesJob()

// set up routes
http.HandleFunc("/health", web.HealthHandler)
Expand Down
20 changes: 20 additions & 0 deletions marathon/marathon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package marathon

import (
"crypto/tls"
"encoding/json"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/allegro/marathon-consul/apps"
Expand All @@ -18,6 +19,7 @@ type Marathoner interface {
Apps() ([]*apps.App, error)
App(tasks.AppId) (*apps.App, error)
Tasks(tasks.AppId) ([]*tasks.Task, error)
Leader() (string, error)
}

type Marathon struct {
Expand All @@ -27,6 +29,10 @@ type Marathon struct {
transport http.RoundTripper
}

// LeaderResponse is the JSON document that the body of Marathon's
// /v2/leader response is decoded into.
type LeaderResponse struct {
// Leader is read from the "leader" key of the response.
Leader string `json:"leader"`
}

func New(config Config) (*Marathon, error) {
var auth *url.Userinfo
if len(config.Username) == 0 && len(config.Password) == 0 {
Expand Down Expand Up @@ -83,6 +89,20 @@ func (m Marathon) Tasks(app tasks.AppId) ([]*tasks.Task, error) {
return tasks.ParseTasks(body)
}

// Leader asks Marathon's /v2/leader endpoint for the current leader and
// returns the value of the "leader" field of the JSON response.
//
// An error is returned when the request fails or when the response body
// cannot be decoded; in both cases the returned leader is the empty string.
func (m Marathon) Leader() (string, error) {
	log.WithField("Location", m.Location).Debug("Asking Marathon for leader")

	body, err := m.get(m.url("/v2/leader"))
	if err != nil {
		return "", err
	}

	leaderResponse := &LeaderResponse{}
	// json.Unmarshal can report an error after it has already filled in
	// some fields, so never hand a partially decoded leader to the caller.
	if err := json.Unmarshal(body, leaderResponse); err != nil {
		return "", err
	}

	return leaderResponse.Leader, nil
}

func (m Marathon) get(url string) ([]byte, error) {
client := m.getClient()
request, err := http.NewRequest("GET", url, nil)
Expand Down
12 changes: 12 additions & 0 deletions marathon/marathon_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type MarathonerStub struct {
AppsStub []*apps.App
AppStub map[tasks.AppId]*apps.App
TasksStub map[tasks.AppId][]*tasks.Task
leader string
}

func (m MarathonerStub) Apps() ([]*apps.App, error) {
Expand All @@ -32,6 +33,16 @@ func (m MarathonerStub) Tasks(appId tasks.AppId) ([]*tasks.Task, error) {
}
}

// Leader returns the leader name stored in the stub; the error is always nil.
func (m MarathonerStub) Leader() (string, error) {
return m.leader, nil
}

// MarathonerStubWithLeaderForApps builds a stub for the given apps via
// MarathonerStubForApps and then overrides the leader it reports.
func MarathonerStubWithLeaderForApps(leader string, args ...*apps.App) *MarathonerStub {
	m := MarathonerStubForApps(args...)
	m.leader = leader
	return m
}

func MarathonerStubForApps(args ...*apps.App) *MarathonerStub {
appsMap := make(map[tasks.AppId]*apps.App)
tasksMap := make(map[tasks.AppId][]*tasks.Task)
Expand All @@ -50,5 +61,6 @@ func MarathonerStubForApps(args ...*apps.App) *MarathonerStub {
AppsStub: args,
AppStub: appsMap,
TasksStub: tasksMap,
leader: "localhost:8080",
}
}
7 changes: 5 additions & 2 deletions marathon/marathon_stub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ import (
func TestMarathonStub(t *testing.T) {
t.Parallel()
// given
m := marathon.MarathonerStubForApps(utils.ConsulApp("/test/app", 3))
m := marathon.MarathonerStubWithLeaderForApps("some.host:1234", utils.ConsulApp("/test/app", 3))
// when
leader, _ := m.Leader()
// then
assert.Equal(t, "some.host:1234", leader)
// when
apps, _ := m.Apps()
// then
Expand All @@ -33,5 +37,4 @@ func TestMarathonStub(t *testing.T) {
// then
assert.Error(t, errOnNotExistingTasks)
assert.Nil(t, notExistingTasks)

}
59 changes: 59 additions & 0 deletions marathon/marathon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,65 @@ func TestUrl_WithAuth(t *testing.T) {
assert.Equal(t, "http://peter:parker@localhost:8080/v2/apps", m.url("/v2/apps"))
}

// TestLeader_SuccessfulResponse checks that Leader returns the "leader"
// value from a well-formed /v2/leader response without an error.
func TestLeader_SuccessfulResponse(t *testing.T) {
	t.Parallel()

	// given: a stub server answering /v2/leader with a valid document
	server, transport := stubServer("/v2/leader", `{"leader": "some.leader.host:8081"}`)
	defer server.Close()
	serverURL, _ := url.Parse(server.URL)
	client, _ := New(Config{Location: serverURL.Host, Protocol: "HTTP"})
	client.transport = transport

	// when
	got, err := client.Leader()

	// then
	assert.NoError(t, err)
	assert.Equal(t, "some.leader.host:8081", got)
}

// TestLeader_ErrorOnMalformedJsonResponse checks that Leader reports an
// error and an empty leader when the response body is not valid JSON.
func TestLeader_ErrorOnMalformedJsonResponse(t *testing.T) {
	t.Parallel()

	// given: a stub server answering /v2/leader with a truncated document
	server, transport := stubServer("/v2/leader", "{")
	defer server.Close()
	serverURL, _ := url.Parse(server.URL)
	client, _ := New(Config{Location: serverURL.Host, Protocol: "HTTP"})
	client.transport = transport

	// when
	got, err := client.Leader()

	// then
	assert.Error(t, err)
	assert.Empty(t, got)
}

// TestLeader_RetryOnFailingResponse checks that, when every request is
// answered with HTTP 500, Leader returns an error and an empty leader and the
// server has been called three times.
func TestLeader_RetryOnFailingResponse(t *testing.T) {
	t.Parallel()

	// given: a server that counts requests and always answers 500
	requestCount := 0
	alwaysFail := func(w http.ResponseWriter, r *http.Request) {
		requestCount++
		w.WriteHeader(500)
	}
	server, transport := mockServer(alwaysFail)
	defer server.Close()
	serverURL, _ := url.Parse(server.URL)
	client, _ := New(Config{Location: serverURL.Host, Protocol: "HTTP"})
	client.transport = transport

	// when
	got, err := client.Leader()

	// then
	assert.Error(t, err)
	assert.Equal(t, 3, requestCount)
	assert.Empty(t, got)
}

// http://keighl.com/post/mocking-http-responses-in-golang/
func stubServer(uri string, body string) (*httptest.Server, *http.Transport) {
return mockServer(func(w http.ResponseWriter, r *http.Request) {
Expand Down
2 changes: 2 additions & 0 deletions sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ import "time"

// Config holds the settings of the Marathon-consul sync job.
type Config struct {
// Interval is the period of the ticker that drives the scheduled sync.
Interval time.Duration
// Leader is the Marathon node name compared against the leader reported
// by Marathon; the sync is skipped when they differ and Force is not set.
Leader string
// Force makes the sync run without checking Marathon leadership.
Force bool
}
41 changes: 36 additions & 5 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,28 @@ import (
)

// Sync bundles the sync settings with the Marathon and Consul services
// clients used by the sync job.
type Sync struct {
// config carries the interval, leader name and force flag of the job.
config Config
// marathon is the Marathon client queried for apps and the current leader.
marathon marathon.Marathoner
// service is the Consul services client.
service service.ConsulServices
}

func New(marathon marathon.Marathoner, service service.ConsulServices) *Sync {
return &Sync{marathon, service}
// New creates a Sync that uses the given configuration, Marathon client and
// Consul services client.
func New(config Config, marathon marathon.Marathoner, service service.ConsulServices) *Sync {
	return &Sync{
		config:   config,
		marathon: marathon,
		service:  service,
	}
}

func (s *Sync) StartSyncServicesJob(interval time.Duration) *time.Ticker {
log.WithField("Interval", interval).Info("Marathon-consul sync job started")
ticker := time.NewTicker(interval)
func (s *Sync) StartSyncServicesJob() *time.Ticker {
if s.config.Leader == "" && !s.config.Force {
log.Info("Marathon-consul sync disabled")
return nil
}

log.WithFields(log.Fields{
"Interval": s.config.Interval,
"Leader": s.config.Leader,
"Force": s.config.Force,
}).Info("Marathon-consul sync job started")

ticker := time.NewTicker(s.config.Interval)
go func() {
s.SyncServices()
for {
Expand All @@ -42,6 +53,9 @@ func (s *Sync) SyncServices() error {
}

func (s *Sync) syncServices() error {
if check, err := s.shouldPerformSync(); !check {
return err
}
log.Info("Syncing services started")

apps, err := s.marathon.Apps()
Expand All @@ -63,6 +77,23 @@ func (s *Sync) syncServices() error {
return nil
}

// shouldPerformSync reports whether a sync run should proceed.
//
// It returns true when the sync is forced, or when the leader reported by
// Marathon equals the configured leader name. If the leader cannot be
// determined, it returns false together with the underlying error.
func (s *Sync) shouldPerformSync() (bool, error) {
	if s.config.Force {
		log.Debug("Forcing sync")
		return true, nil
	}

	currentLeader, err := s.marathon.Leader()
	switch {
	case err != nil:
		log.WithError(err).Error("Could not check instance leadership")
		return false, err
	case currentLeader != s.config.Leader:
		log.Info("Marathon instance has no leadership, skipping sync")
		return false, nil
	default:
		return true, nil
	}
}

func (s *Sync) registerMarathonApps(apps []*apps.App) {

for _, app := range apps {
Expand Down
2 changes: 1 addition & 1 deletion sync/sync_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func BenchmarkDeregisterConsulServicesThatAreNotInMarathonApps100x100(b *testing
func bench(b *testing.B, appsCount, instancesCount int) {
apps := marathonApps(appsCount, instancesCount)
instances := consulInstances(appsCount, instancesCount)
sync := New(nil, consul.NewConsulStub())
sync := New(Config{}, nil, consul.NewConsulStub())

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down

0 comments on commit 671465b

Please sign in to comment.