Skip to content

Commit

Permalink
Move sync.Leader to marathon module
Browse files Browse the repository at this point in the history
  • Loading branch information
tomez committed Mar 22, 2017
1 parent b1e8dd8 commit 7f975a0
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 95 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ marathon-protocol | `http` | Marathon protocol (http or https
marathon-ssl-verify | `true` | Verify certificates when connecting via SSL
marathon-timeout | `30s` | Time limit for requests made by the Marathon HTTP client. A Timeout of zero means no timeout
marathon-username | | Marathon username for basic auth
marathon-leader | | Marathon cluster-wide node name (defaults to <hostname>:8080), the some leader specific calls will be made only if the specified node is the current Marathon-leader")
metrics-interval | `30s` | Metrics reporting interval
metrics-location | | Graphite URL (used when metrics-target is set to graphite)
metrics-prefix | `default` | Metrics prefix (default is resolved to <hostname>.<app_name>
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ func (config *Config) parseFlags() {
// Sync
flag.BoolVar(&config.Sync.Enabled, "sync-enabled", true, "Enable Marathon-consul scheduled sync")
flag.DurationVar(&config.Sync.Interval.Duration, "sync-interval", 15*time.Minute, "Marathon-consul sync interval")
flag.StringVar(&config.Sync.Leader, "sync-leader", "", "Marathon cluster-wide node name (defaults to <hostname>:8080), the sync will run only if the specified node is the current Marathon-leader")
flag.BoolVar(&config.Sync.Force, "sync-force", false, "Force leadership-independent Marathon-consul sync (run always)")

// Marathon
flag.StringVar(&config.Marathon.Location, "marathon-location", "localhost:8080", "Marathon URL")
flag.StringVar(&config.Marathon.Protocol, "marathon-protocol", "http", "Marathon protocol (http or https)")
flag.StringVar(&config.Marathon.Username, "marathon-username", "", "Marathon username for basic auth")
flag.StringVar(&config.Marathon.Password, "marathon-password", "", "Marathon password for basic auth")
flag.StringVar(&config.Marathon.Leader, "marathon-leader", "", "Marathon cluster-wide node name (defaults to <hostname>:8080), the some leader specific calls will be made only if the specified node is the current Marathon-leader")
flag.BoolVar(&config.Marathon.VerifySsl, "marathon-ssl-verify", true, "Verify certificates when connecting via SSL")
flag.DurationVar(&config.Marathon.Timeout.Duration, "marathon-timeout", 30*time.Second, "Time limit for requests made by the Marathon HTTP client. A Timeout of zero means no timeout")

Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func main() {

consulInstance := consul.New(config.Consul)
// TODO(tz) - move Leader from sync module to highest level config, access like config.Leader
remote, err := marathon.New(config.Marathon, config.Sync.Leader)
remote, err := marathon.New(config.Marathon)
if err != nil {
log.Fatal(err.Error())
}
Expand Down
1 change: 1 addition & 0 deletions marathon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type Config struct {
Protocol string
Username string
Password string
Leader string
VerifySsl bool
Timeout time.Interval
}
28 changes: 23 additions & 5 deletions marathon/marathon.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
"time"

Expand All @@ -15,13 +16,15 @@ import (
"github.com/allegro/marathon-consul/metrics"
)

var hostname = os.Hostname

type Marathoner interface {
ConsulApps() ([]*apps.App, error)
App(apps.AppID) (*apps.App, error)
Tasks(apps.AppID) ([]apps.Task, error)
Leader() (string, error)
EventStream([]string, int, int) (*Streamer, error)
AmILeader() (bool, error)
IsLeader() (bool, error)
}

type Marathon struct {
Expand All @@ -36,7 +39,7 @@ type LeaderResponse struct {
Leader string `json:"leader"`
}

func New(config Config, leader string) (*Marathon, error) {
func New(config Config) (*Marathon, error) {
var auth *url.Userinfo
if len(config.Username) == 0 && len(config.Password) == 0 {
auth = nil
Expand All @@ -53,8 +56,8 @@ func New(config Config, leader string) (*Marathon, error) {
return &Marathon{
Location: config.Location,
Protocol: config.Protocol,
MyLeader: config.Leader,
Auth: auth,
MyLeader: leader,
client: &http.Client{
Transport: transport,
Timeout: config.Timeout.Duration,
Expand Down Expand Up @@ -142,7 +145,7 @@ func (m Marathon) leaderPoll() error {
retries := 5
i := 0
for range pollTicker {
leading, err := m.AmILeader()
leading, err := m.IsLeader()
if err != nil {
if i >= retries {
return fmt.Errorf("Failed to get a leader after %d retries", i)
Expand Down Expand Up @@ -229,7 +232,22 @@ func (m Marathon) urlWithQuery(path string, params params) string {
return marathon.String()
}

func (m Marathon) AmILeader() (bool, error) {
func (m *Marathon) IsLeader() (bool, error) {
if m.MyLeader == "" {
if err := m.resolveHostname(); err != nil {
return false, fmt.Errorf("Could not resolve hostname: %v", err)
}
}
leader, err := m.Leader()
return m.MyLeader == leader, err
}

func (m *Marathon) resolveHostname() error {
hostname, err := hostname()
if err != nil {
return err
}
m.MyLeader = fmt.Sprintf("%s:8080", hostname)
log.WithField("Leader", m.MyLeader).Info("Marathon Leader mode set to resolved hostname")
return nil
}
9 changes: 6 additions & 3 deletions marathon/marathon_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type MarathonerStub struct {
AppsStub []*apps.App
AppStub map[apps.AppID]*apps.App
TasksStub map[apps.AppID][]apps.Task
MyLeader string
leader string
interactionsMu sync.RWMutex
interactions bool
Expand Down Expand Up @@ -46,8 +47,8 @@ func (m *MarathonerStub) EventStream([]string, int, int) (*Streamer, error) {
return &Streamer{}, nil
}

func (m *MarathonerStub) AmILeader() (bool, error) {
return false, nil
func (m *MarathonerStub) IsLeader() (bool, error) {
return m.leader == m.MyLeader, nil
}

func (m *MarathonerStub) Interactions() bool {
Expand All @@ -62,9 +63,10 @@ func (m *MarathonerStub) noteInteraction() {
m.interactions = true
}

func MarathonerStubWithLeaderForApps(leader string, args ...*apps.App) *MarathonerStub {
func MarathonerStubWithLeaderForApps(leader, myLeader string, args ...*apps.App) *MarathonerStub {
stub := MarathonerStubForApps(args...)
stub.leader = leader
stub.MyLeader = myLeader
return stub
}

Expand All @@ -86,6 +88,7 @@ func MarathonerStubForApps(args ...*apps.App) *MarathonerStub {
AppsStub: args,
AppStub: appsMap,
TasksStub: tasksMap,
MyLeader: "localhost:8080",
leader: "localhost:8080",
}
}
2 changes: 1 addition & 1 deletion marathon/marathon_stub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func TestMarathonStub(t *testing.T) {
t.Parallel()
// given
m := marathon.MarathonerStubWithLeaderForApps("some.host:1234", utils.ConsulApp("/test/app", 3))
m := marathon.MarathonerStubWithLeaderForApps("some.host:1234", "some.host:1234", utils.ConsulApp("/test/app", 3))
// then
assert.False(t, m.Interactions())
// when
Expand Down

0 comments on commit 7f975a0

Please sign in to comment.