diff --git a/README.md b/README.md index e0288dc..8d9ed5d 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,7 @@ make test - Provided HTTP healtcheck will be transferred to Consul. - Labels with `tag` value will be converted to Consul tags, `marathon` tag is added by default (e.g, `labels: ["public":"tag", "varnish":"tag", "env": "test"]` → `tags: ["public", "varnish", "marathon"]`). +- To schedule Marathon-consul sync one of two options has to be set: `sync-leader` (run sync only on specified node [leadership](https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-leader) or `sync-force` (run always). ### Options @@ -73,9 +74,11 @@ marathon-protocol | http | Marathon protocol (http or http marathon-username | | Marathon username for basic auth metrics-interval | 30s | Metrics reporting [interval](https://golang.org/pkg/time/#Duration) **Note:** While using file configuration intervals should be provided in *nanoseconds* metrics-location | | Graphite URL (used when metrics-target is set to graphite) -metrics-prefix | default | Metrics prefix (default is resolved to . +metrics-prefix | default | Metrics prefix (resolved to `hostname>.` by default) metrics-target | stdout | Metrics destination stdout or graphite sync-interval | 15m0s | Marathon-consul sync [interval](https://golang.org/pkg/time/#Duration) **Note:** While using file configuration intervals should be provided in *nanoseconds* +sync-leader | | Marathon instance for which leadership the Marathon-consul sync is enabled (see [Marathon REST API](https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-leader)) +sync-force | false | Force leadership-independent Marathon-consul sync ### Adding New Root Certificate Authorities diff --git a/config/config.go b/config/config.go index a418b40..330c1f2 100644 --- a/config/config.go +++ b/config/config.go @@ -68,6 +68,8 @@ func (config *Config) parseFlags() { // Sync flag.DurationVar(&config.Sync.Interval, "sync-interval", 15*time.Minute, "Marathon-consul sync interval") + flag.BoolVar(&config.Sync.Force, "sync-force", false, "Force leadership-independent Marathon-consul sync") + flag.StringVar(&config.Sync.Leader, "sync-leader", "", "Marathon instance for which leadership the Marathon-consul sync is enabled") // Marathon flag.StringVar(&config.Marathon.Location, "marathon-location", "localhost:8080", "Marathon URL") diff --git a/main.go b/main.go index e22d326..0dd8f54 100644 --- a/main.go +++ b/main.go @@ -33,8 +33,8 @@ func main() { log.Fatal(err.Error()) } - sync := sync.New(remote, service) - go sync.StartSyncServicesJob(config.Sync.Interval) + sync := sync.New(config.Sync, remote, service) + go sync.StartSyncServicesJob() // set up routes http.HandleFunc("/health", web.HealthHandler) diff --git a/marathon/marathon.go b/marathon/marathon.go index 804db3c..72c58e1 100644 --- a/marathon/marathon.go +++ b/marathon/marathon.go @@ -2,6 +2,7 @@ package marathon import ( "crypto/tls" + "encoding/json" "fmt" log "github.com/Sirupsen/logrus" "github.com/allegro/marathon-consul/apps" @@ -18,6 +19,7 @@ type Marathoner interface { Apps() ([]*apps.App, error) App(tasks.AppId) (*apps.App, error) Tasks(tasks.AppId) ([]*tasks.Task, error) + Leader() (string, error) } type Marathon struct { @@ -27,6 +29,10 @@ type Marathon struct { transport http.RoundTripper } +type LeaderResponse struct { + Leader string `json:"leader"` +} + func New(config Config) (*Marathon, error) { var auth *url.Userinfo if len(config.Username) == 0 && len(config.Password) == 0 { @@ -83,6 +89,20 @@ func (m Marathon) Tasks(app tasks.AppId) ([]*tasks.Task, error) { return tasks.ParseTasks(body) } +func (m Marathon) Leader() (string, error) { + log.WithField("Location", m.Location).Debug("Asking Marathon for leader") + + body, err := m.get(m.url("/v2/leader")) + if err != nil { + return "", err + } + + leaderResponse := &LeaderResponse{} + err = json.Unmarshal(body, leaderResponse) + + return leaderResponse.Leader, err +} + func (m Marathon) get(url string) ([]byte, error) { client := m.getClient() request, err := http.NewRequest("GET", url, nil) diff --git a/marathon/marathon_stub.go b/marathon/marathon_stub.go index 228feaf..fc5d76a 100644 --- a/marathon/marathon_stub.go +++ b/marathon/marathon_stub.go @@ -10,6 +10,7 @@ type MarathonerStub struct { AppsStub []*apps.App AppStub map[tasks.AppId]*apps.App TasksStub map[tasks.AppId][]*tasks.Task + leader string } func (m MarathonerStub) Apps() ([]*apps.App, error) { @@ -32,6 +33,16 @@ func (m MarathonerStub) Tasks(appId tasks.AppId) ([]*tasks.Task, error) { } } +func (m MarathonerStub) Leader() (string, error) { + return m.leader, nil +} + +func MarathonerStubWithLeaderForApps(leader string, args ...*apps.App) *MarathonerStub { + stub := MarathonerStubForApps(args...) + stub.leader = leader + return stub +} + func MarathonerStubForApps(args ...*apps.App) *MarathonerStub { appsMap := make(map[tasks.AppId]*apps.App) tasksMap := make(map[tasks.AppId][]*tasks.Task) @@ -50,5 +61,6 @@ func MarathonerStubForApps(args ...*apps.App) *MarathonerStub { AppsStub: args, AppStub: appsMap, TasksStub: tasksMap, + leader: "localhost:8080", } } diff --git a/marathon/marathon_stub_test.go b/marathon/marathon_stub_test.go index c351a3d..c103f96 100644 --- a/marathon/marathon_stub_test.go +++ b/marathon/marathon_stub_test.go @@ -10,7 +10,11 @@ import ( func TestMarathonStub(t *testing.T) { t.Parallel() // given - m := marathon.MarathonerStubForApps(utils.ConsulApp("/test/app", 3)) + m := marathon.MarathonerStubWithLeaderForApps("some.host:1234", utils.ConsulApp("/test/app", 3)) + // when + leader, _ := m.Leader() + // then + assert.Equal(t, "some.host:1234", leader) // when apps, _ := m.Apps() // then @@ -33,5 +37,4 @@ func TestMarathonStub(t *testing.T) { // then assert.Error(t, errOnNotExistingTasks) assert.Nil(t, notExistingTasks) - } diff --git a/marathon/marathon_test.go b/marathon/marathon_test.go index 3be6a7d..151d1dc 100644 --- a/marathon/marathon_test.go +++ b/marathon/marathon_test.go @@ -277,6 +277,65 @@ func TestUrl_WithAuth(t *testing.T) { assert.Equal(t, "http://peter:parker@localhost:8080/v2/apps", m.url("/v2/apps")) } +func TestLeader_SuccessfulResponse(t *testing.T) { + t.Parallel() + + // given + server, transport := stubServer("/v2/leader", `{"leader": "some.leader.host:8081"}`) + defer server.Close() + url, _ := url.Parse(server.URL) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m.transport = transport + + // when + leader, err := m.Leader() + + //then + assert.NoError(t, err) + assert.Equal(t, "some.leader.host:8081", leader) +} + +func TestLeader_ErrorOnMalformedJsonResponse(t *testing.T) { + t.Parallel() + + // given + server, transport := stubServer("/v2/leader", "{") + defer server.Close() + url, _ := url.Parse(server.URL) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m.transport = transport + + // when + leader, err := m.Leader() + + //then + assert.Error(t, err) + assert.Empty(t, leader) +} + +func TestLeader_RetryOnFailingResponse(t *testing.T) { + t.Parallel() + + // given + calls := 0 + server, transport := mockServer(func(w http.ResponseWriter, r *http.Request) { + calls++ + w.WriteHeader(500) + }) + defer server.Close() + url, _ := url.Parse(server.URL) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m.transport = transport + + // when + leader, err := m.Leader() + + //then + assert.Error(t, err) + assert.Equal(t, 3, calls) + assert.Empty(t, leader) +} + // http://keighl.com/post/mocking-http-responses-in-golang/ func stubServer(uri string, body string) (*httptest.Server, *http.Transport) { return mockServer(func(w http.ResponseWriter, r *http.Request) { diff --git a/sync/config.go b/sync/config.go index 4cc82af..f4cbe49 100644 --- a/sync/config.go +++ b/sync/config.go @@ -4,4 +4,6 @@ import "time" type Config struct { Interval time.Duration + Leader string + Force bool } diff --git a/sync/sync.go b/sync/sync.go index 7786d6d..b8da5c9 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -12,17 +12,28 @@ import ( ) type Sync struct { + config Config marathon marathon.Marathoner service service.ConsulServices } -func New(marathon marathon.Marathoner, service service.ConsulServices) *Sync { - return &Sync{marathon, service} +func New(config Config, marathon marathon.Marathoner, service service.ConsulServices) *Sync { + return &Sync{config, marathon, service} } -func (s *Sync) StartSyncServicesJob(interval time.Duration) *time.Ticker { - log.WithField("Interval", interval).Info("Marathon-consul sync job started") - ticker := time.NewTicker(interval) +func (s *Sync) StartSyncServicesJob() *time.Ticker { + if s.config.Leader == "" && !s.config.Force { + log.Info("Marathon-consul sync disabled") + return nil + } + + log.WithFields(log.Fields{ + "Interval": s.config.Interval, + "Leader": s.config.Leader, + "Force": s.config.Force, + }).Info("Marathon-consul sync job started") + + ticker := time.NewTicker(s.config.Interval) go func() { s.SyncServices() for { @@ -42,6 +53,9 @@ func (s *Sync) SyncServices() error { } func (s *Sync) syncServices() error { + if check, err := s.shouldPerformSync(); !check { + return err + } log.Info("Syncing services started") apps, err := s.marathon.Apps() @@ -63,6 +77,23 @@ func (s *Sync) syncServices() error { return nil } +func (s *Sync) shouldPerformSync() (bool, error) { + if s.config.Force { + log.Debug("Forcing sync") + return true, nil + } + leader, err := s.marathon.Leader() + if err != nil { + log.WithError(err).Error("Could not check instance leadership") + return false, err + } + if leader != s.config.Leader { + log.Info("Marathon instance has no leadership, skipping sync") + return false, nil + } + return true, nil +} + func (s *Sync) registerMarathonApps(apps []*apps.App) { for _, app := range apps { diff --git a/sync/sync_bench_test.go b/sync/sync_bench_test.go index 2387c47..9e8b256 100644 --- a/sync/sync_bench_test.go +++ b/sync/sync_bench_test.go @@ -39,7 +39,7 @@ func BenchmarkDeregisterConsulServicesThatAreNotInMarathonApps100x100(b *testing func bench(b *testing.B, appsCount, instancesCount int) { apps := marathonApps(appsCount, instancesCount) instances := consulInstances(appsCount, instancesCount) - sync := New(nil, consul.NewConsulStub()) + sync := New(Config{}, nil, consul.NewConsulStub()) b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/sync/sync_test.go b/sync/sync_test.go index 2b15610..53b45cf 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -14,16 +14,16 @@ import ( "time" ) -func TestSyncJob(t *testing.T) { +func TestSyncJob_ShouldSyncOnLeadership(t *testing.T) { t.Parallel() // given app := ConsulApp("app1", 1) marathon := marathon.MarathonerStubForApps(app) services := newConsulServicesMock() - sync := New(marathon, services) + sync := New(Config{Interval: 10 * time.Millisecond, Leader: "localhost:8080"}, marathon, services) // when - ticker := sync.StartSyncServicesJob(10 * time.Millisecond) + ticker := sync.StartSyncServicesJob() // then select { @@ -33,6 +33,54 @@ func TestSyncJob(t *testing.T) { } } +func TestSyncServices_ShouldNotSyncOnNoForceNorLeaderSpecified(t *testing.T) { + t.Parallel() + // given + app := ConsulApp("app1", 1) + marathon := marathon.MarathonerStubWithLeaderForApps("localhost:8080", app) + services := newConsulServicesMock() + sync := New(Config{}, marathon, services) + + // when + ticker := sync.StartSyncServicesJob() + + // then + assert.Nil(t, ticker) + assert.Zero(t, services.RegistrationsCount(app.Tasks[0].ID.String())) +} + +func TestSyncServices_ShouldNotSyncOnNoLeadership(t *testing.T) { + t.Parallel() + // given + app := ConsulApp("app1", 1) + marathon := marathon.MarathonerStubWithLeaderForApps("leader:8080", app) + services := newConsulServicesMock() + sync := New(Config{Leader: "different.node:8090"}, marathon, services) + + // when + err := sync.SyncServices() + + // then + assert.NoError(t, err) + assert.Zero(t, services.RegistrationsCount(app.Tasks[0].ID.String())) +} + +func TestSyncServices_ShouldSyncOnForceWithoutLeadership(t *testing.T) { + t.Parallel() + // given + app := ConsulApp("app1", 1) + marathon := marathon.MarathonerStubWithLeaderForApps("leader:8080", app) + services := newConsulServicesMock() + sync := New(Config{Leader: "different.node:8090", Force: true}, marathon, services) + + // when + err := sync.SyncServices() + + // then + assert.NoError(t, err) + assert.Equal(t, 1, services.RegistrationsCount(app.Tasks[0].ID.String())) +} + type ConsulServicesMock struct { registrations map[string]int } @@ -61,6 +109,7 @@ func (c *ConsulServicesMock) Deregister(serviceId tasks.Id, agent string) error } func TestSyncAppsFromMarathonToConsul(t *testing.T) { + t.Parallel() // given marathoner := marathon.MarathonerStubForApps( ConsulApp("app1", 2), @@ -69,7 +118,7 @@ func TestSyncAppsFromMarathonToConsul(t *testing.T) { ) consul := consul.NewConsulStub() - marathonSync := New(marathoner, consul) + marathonSync := newSync(marathoner, consul) // when marathonSync.SyncServices() @@ -83,20 +132,21 @@ func TestSyncAppsFromMarathonToConsul(t *testing.T) { } func TestRemoveInvalidServicesFromConsul(t *testing.T) { + t.Parallel() // given marathoner := marathon.MarathonerStubForApps( ConsulApp("app1-invalid", 1), ConsulApp("app2", 1), ) consul := consul.NewConsulStub() - marathonSync := New(marathoner, consul) + marathonSync := New(Config{}, marathoner, consul) marathonSync.SyncServices() // when marathoner = marathon.MarathonerStubForApps( ConsulApp("app2", 1), ) - marathonSync = New(marathoner, consul) + marathonSync = newSync(marathoner, consul) marathonSync.SyncServices() // then @@ -106,6 +156,7 @@ func TestRemoveInvalidServicesFromConsul(t *testing.T) { } func TestSyncOnlyHealthyServices(t *testing.T) { + t.Parallel() // given marathoner := marathon.MarathonerStubForApps( ConsulApp("app1", 1), @@ -113,7 +164,7 @@ func TestSyncOnlyHealthyServices(t *testing.T) { ConsulAppWithUnhealthyInstances("app3-all-unhealthy", 2, 2), ) consul := consul.NewConsulStub() - marathonSync := New(marathoner, consul) + marathonSync := newSync(marathoner, consul) // when marathonSync.SyncServices() @@ -127,11 +178,12 @@ func TestSyncOnlyHealthyServices(t *testing.T) { } func TestSync_WithRegisteringProblems(t *testing.T) { + t.Parallel() // given marathon := marathon.MarathonerStubForApps(ConsulApp("/test/app", 3)) consul := consul.NewConsulStub() consul.ErrorServices["/test/app.1"] = fmt.Errorf("Problem on registration") - sync := New(marathon, consul) + sync := newSync(marathon, consul) // when err := sync.SyncServices() services, _ := consul.GetAllServices() @@ -141,6 +193,7 @@ func TestSync_WithRegisteringProblems(t *testing.T) { } func TestSync_WithDeregisteringProblems(t *testing.T) { + t.Parallel() // given marathon := marathon.MarathonerStubForApps() consulStub := consul.NewConsulStub() @@ -149,7 +202,7 @@ func TestSync_WithDeregisteringProblems(t *testing.T) { consulStub.Register(consul.MarathonTaskToConsulService(task, notMarathonApp.HealthChecks, notMarathonApp.Labels)) consulStub.ErrorServices[task.ID] = fmt.Errorf("Problem on deregistration") } - sync := New(marathon, consulStub) + sync := newSync(marathon, consulStub) // when err := sync.SyncServices() services, _ := consulStub.GetAllServices() @@ -159,9 +212,10 @@ func TestSync_WithDeregisteringProblems(t *testing.T) { } func TestSync_WithMarathonProblems(t *testing.T) { + t.Parallel() // given marathon := errorMarathon{} - sync := New(marathon, nil) + sync := newSync(marathon, nil) // when err := sync.SyncServices() // then @@ -169,16 +223,21 @@ func TestSync_WithMarathonProblems(t *testing.T) { } func TestSync_WithConsulProblems(t *testing.T) { + t.Parallel() // given marathon := marathon.MarathonerStubForApps() consul := errorConsul{} - sync := New(marathon, consul) + sync := newSync(marathon, consul) // when err := sync.SyncServices() // then assert.Error(t, err) } +func newSync(marathon marathon.Marathoner, service consul.ConsulServices) *Sync { + return New(Config{Leader: "localhost:8080"}, marathon, service) +} + type errorMarathon struct { } @@ -194,6 +253,10 @@ func (m errorMarathon) Tasks(appId tasks.AppId) ([]*tasks.Task, error) { return nil, fmt.Errorf("Error") } +func (m errorMarathon) Leader() (string, error) { + return "", fmt.Errorf("Error") +} + type errorConsul struct { }