Skip to content

Commit

Permalink
Fixes #78 | Async events handler
Browse files Browse the repository at this point in the history
  • Loading branch information
janisz committed Apr 25, 2016
1 parent 6df975d commit e0f2b8c
Show file tree
Hide file tree
Showing 13 changed files with 491 additions and 302 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ consul-ssl-verify | `true` | Verify certificates when connec
consul-token | | The Consul ACL token
consul-tag | `marathon` | Common tag name added to every service registered in Consul, should be unique for every Marathon-cluster connected to Consul
consul-timeout | `3s` | Time limit for requests made by the Consul HTTP client. A Timeout of zero means no timeout
events-queue-size | `1000` | Size of events queue
listen | `:4000` | Accept connections at this address
log-level | `info` | Log level: panic, fatal, error, warn, info, or debug
log-format | `text` | Log format: JSON, text
Expand All @@ -132,9 +133,10 @@ metrics-location | | Graphite URL (used when metrics
metrics-prefix | `default` | Metrics prefix (resolved to `<hostname>.<app_name>` by default)
metrics-target | `stdout` | Metrics destination `stdout` or `graphite` (empty string disables metrics)
sync-enabled | `true` | Enable Marathon-consul scheduled sync
sync-force | `false` | Force leadership-independent Marathon-consul sync (run always)
sync-interval | `15m0s` | Marathon-consul sync [interval](https://golang.org/pkg/time/#Duration) **Note:** While using file configuration intervals should be provided in *nanoseconds*
sync-leader | `<hostname>:8080` | Marathon cluster-wide node name (defaults to `<hostname>:8080`), the sync will run only if the node is the current [Marathon-leader](https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-leader)
sync-force | `false` | Force leadership-independent Marathon-consul sync (run always)
workers-pool-size | `10` | Number of concurrent workers processing events


### Endpoints
Expand Down
9 changes: 5 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ import (
"github.com/allegro/marathon-consul/marathon"
"github.com/allegro/marathon-consul/metrics"
"github.com/allegro/marathon-consul/sync"
"github.com/allegro/marathon-consul/web"
flag "github.com/ogier/pflag"
)

type Config struct {
Consul consul.ConsulConfig
Web struct {
Listen string
}
Consul consul.ConsulConfig
Web web.Config
Sync sync.Config
Marathon marathon.Config
Metrics metrics.Config
Expand Down Expand Up @@ -75,6 +74,8 @@ func (config *Config) parseFlags() {

// Web
flag.StringVar(&config.Web.Listen, "listen", ":4000", "accept connections at this address")
flag.IntVar(&config.Web.QueueSize, "events-queue-size", 1000, "size of events queue")
flag.IntVar(&config.Web.WorkersCount, "workers-pool-size", 10, "number of concurrent workers processing events")

// Sync
flag.BoolVar(&config.Sync.Enabled, "sync-enabled", true, "Enable Marathon-consul scheduled sync")
Expand Down
7 changes: 6 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/allegro/marathon-consul/marathon"
"github.com/allegro/marathon-consul/metrics"
"github.com/allegro/marathon-consul/sync"
"github.com/allegro/marathon-consul/web"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -99,7 +100,11 @@ func TestConfig_ShouldBeMergedWithFileDefaultsAndFlags(t *testing.T) {
RequestRetries: 5,
AgentFailuresTolerance: 3,
},
Web: struct{ Listen string }{Listen: ":4000"},
Web: web.Config{
Listen: ":4000",
QueueSize: 1000,
WorkersCount: 10,
},
Sync: sync.Config{
Interval: 15 * time.Minute,
Enabled: true,
Expand Down
4 changes: 3 additions & 1 deletion debian/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
"RequestRetries": 5
},
"Web": {
"Listen": ":4000"
"Listen": ":4000",
"QueueSize": 1000,
"WorkersCount": 10
},
"Sync": {
"Enabled": true,
Expand Down
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ func main() {
sync := sync.New(config.Sync, remote, service)
go sync.StartSyncServicesJob()

handler, stop := web.NewHandler(config.Web, remote, service)
defer stop()

// set up routes
http.HandleFunc("/health", web.HealthHandler)
http.HandleFunc("/events", web.NewEventHandler(service, remote).Handle)
http.HandleFunc("/events", handler)

log.WithField("Port", config.Web.Listen).Info("Listening")
log.Fatal(http.ListenAndServe(config.Web.Listen, nil))
Expand Down
25 changes: 17 additions & 8 deletions marathon/marathon_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,45 @@ import (
)

type MarathonerStub struct {
AppsStub []*apps.App
AppStub map[apps.AppId]*apps.App
TasksStub map[apps.AppId][]*apps.Task
leader string
AppsStub []*apps.App
AppStub map[apps.AppId]*apps.App
TasksStub map[apps.AppId][]*apps.Task
leader string
interactions bool
}

func (m MarathonerStub) ConsulApps() ([]*apps.App, error) {
func (m *MarathonerStub) ConsulApps() ([]*apps.App, error) {
m.interactions = true
return m.AppsStub, nil
}

func (m MarathonerStub) App(id apps.AppId) (*apps.App, error) {
func (m *MarathonerStub) App(id apps.AppId) (*apps.App, error) {
m.interactions = true
if app, ok := m.AppStub[id]; ok {
return app, nil
} else {
return nil, fmt.Errorf("app not found")
}
}

func (m MarathonerStub) Tasks(appId apps.AppId) ([]*apps.Task, error) {
func (m *MarathonerStub) Tasks(appId apps.AppId) ([]*apps.Task, error) {
m.interactions = true
if app, ok := m.TasksStub[appId]; ok {
return app, nil
} else {
return nil, fmt.Errorf("app not found")
}
}

func (m MarathonerStub) Leader() (string, error) {
func (m *MarathonerStub) Leader() (string, error) {
m.interactions = true
return m.leader, nil
}

func (m MarathonerStub) Interactions() bool {
return m.interactions
}

func MarathonerStubWithLeaderForApps(leader string, args ...*apps.App) *MarathonerStub {
stub := MarathonerStubForApps(args...)
stub.leader = leader
Expand Down
3 changes: 3 additions & 0 deletions marathon/marathon_stub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ func TestMarathonStub(t *testing.T) {
t.Parallel()
// given
m := marathon.MarathonerStubWithLeaderForApps("some.host:1234", utils.ConsulApp("/test/app", 3))
// then
assert.False(t, m.Interactions())
// when
leader, _ := m.Leader()
// then
assert.True(t, m.Interactions())
assert.Equal(t, "some.host:1234", leader)
// when
apps, _ := m.ConsulApps()
Expand Down
2 changes: 1 addition & 1 deletion utils/apps.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package utils

import (
"fmt"
"strings"

"github.com/allegro/marathon-consul/apps"
"strings"
)

func ConsulApp(name string, instances int) *apps.App {
Expand Down
7 changes: 7 additions & 0 deletions web/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package web

type Config struct {
Listen string
QueueSize int
WorkersCount int
}

0 comments on commit e0f2b8c

Please sign in to comment.