Skip to content

Commit

Permalink
Fixes #78 | Process requests async
Browse files Browse the repository at this point in the history
From now all events will be processed in background.
500 and 400 response codes should not appear for end
user. Instead they will get 202 Accepted.
  • Loading branch information
janisz committed Apr 22, 2016
1 parent dc62017 commit c2e06f1
Show file tree
Hide file tree
Showing 13 changed files with 585 additions and 434 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ consul-token | | The Consul ACL token
consul-tag | `marathon` | Common tag name added to every service registered in Consul, should be unique for every Marathon-cluster connected to Consul
consul-timeout | `3s` | Time limit for requests made by the Consul HTTP client. A Timeout of zero means no timeout
listen | `:4000` | Accept connections at this address
events-queue-size | `1000` | Size of events queue
log-level | `info` | Log level: panic, fatal, error, warn, info, or debug
log-format | `text` | Log format: JSON, text
log-file | | Save logs to file (e.g.: `/var/log/marathon-consul.log`). If empty logs are published to STDERR
Expand All @@ -132,9 +133,10 @@ metrics-location | | Graphite URL (used when metrics
metrics-prefix | `default` | Metrics prefix (resolved to `<hostname>.<app_name>` by default)
metrics-target | `stdout` | Metrics destination `stdout` or `graphite` (empty string disables metrics)
sync-enabled | `true` | Enable Marathon-consul scheduled sync
sync-force | `false` | Force leadership-independent Marathon-consul sync (run always)
sync-interval | `15m0s` | Marathon-consul sync [interval](https://golang.org/pkg/time/#Duration) **Note:** While using file configuration intervals should be provided in *nanoseconds*
sync-leader | `<hostname>:8080` | Marathon cluster-wide node name (defaults to `<hostname>:8080`), the sync will run only if the node is the current [Marathon-leader](https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-leader)
sync-force | `false` | Force leadership-independent Marathon-consul sync (run always)
workers-pool-size |:`10` | Number of concurrent workers processing events


### Endpoints
Expand Down
9 changes: 5 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ import (
"github.com/allegro/marathon-consul/marathon"
"github.com/allegro/marathon-consul/metrics"
"github.com/allegro/marathon-consul/sync"
"github.com/allegro/marathon-consul/web"
flag "github.com/ogier/pflag"
)

type Config struct {
Consul consul.ConsulConfig
Web struct {
Listen string
}
Consul consul.ConsulConfig
Web web.Config
Sync sync.Config
Marathon marathon.Config
Metrics metrics.Config
Expand Down Expand Up @@ -75,6 +74,8 @@ func (config *Config) parseFlags() {

// Web
flag.StringVar(&config.Web.Listen, "listen", ":4000", "accept connections at this address")
flag.IntVar(&config.Web.QueueSize, "events-queue-size", 1000, "size of events queue")
flag.IntVar(&config.Web.WorkersCount, "workers-pool-size", 10, "number of concurrent workers processing events")

// Sync
flag.BoolVar(&config.Sync.Enabled, "sync-enabled", true, "Enable Marathon-consul scheduled sync")
Expand Down
7 changes: 6 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/allegro/marathon-consul/marathon"
"github.com/allegro/marathon-consul/metrics"
"github.com/allegro/marathon-consul/sync"
"github.com/allegro/marathon-consul/web"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -99,7 +100,11 @@ func TestConfig_ShouldBeMergedWithFileDefaultsAndFlags(t *testing.T) {
RequestRetries: 5,
AgentFailuresTolerance: 3,
},
Web: struct{ Listen string }{Listen: ":4000"},
Web: web.Config{
Listen: ":4000",
QueueSize: 1000,
WorkersCount: 10,
},
Sync: sync.Config{
Interval: 15 * time.Minute,
Enabled: true,
Expand Down
4 changes: 3 additions & 1 deletion debian/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
"RequestRetries": 5
},
"Web": {
"Listen": ":4000"
"Listen": ":4000",
"QueueSize": 1000,
"WorkersCount": 10
},
"Sync": {
"Enabled": true,
Expand Down
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ func main() {
sync := sync.New(config.Sync, remote, service)
go sync.StartSyncServicesJob()

handler, stop := web.NewHandler(config.Web, remote, service)
defer stop()

// set up routes
http.HandleFunc("/health", web.HealthHandler)
http.HandleFunc("/events", web.NewEventHandler(service, remote).Handle)
http.HandleFunc("/events", handler)

log.WithField("Port", config.Web.Listen).Info("Listening")
log.Fatal(http.ListenAndServe(config.Web.Listen, nil))
Expand Down
25 changes: 17 additions & 8 deletions marathon/marathon_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,45 @@ import (
)

type MarathonerStub struct {
AppsStub []*apps.App
AppStub map[apps.AppId]*apps.App
TasksStub map[apps.AppId][]*apps.Task
leader string
AppsStub []*apps.App
AppStub map[apps.AppId]*apps.App
TasksStub map[apps.AppId][]*apps.Task
leader string
interactions bool
}

func (m MarathonerStub) ConsulApps() ([]*apps.App, error) {
func (m *MarathonerStub) ConsulApps() ([]*apps.App, error) {
m.interactions = true
return m.AppsStub, nil
}

func (m MarathonerStub) App(id apps.AppId) (*apps.App, error) {
func (m *MarathonerStub) App(id apps.AppId) (*apps.App, error) {
m.interactions = true
if app, ok := m.AppStub[id]; ok {
return app, nil
} else {
return nil, fmt.Errorf("app not found")
}
}

func (m MarathonerStub) Tasks(appId apps.AppId) ([]*apps.Task, error) {
func (m *MarathonerStub) Tasks(appId apps.AppId) ([]*apps.Task, error) {
m.interactions = true
if app, ok := m.TasksStub[appId]; ok {
return app, nil
} else {
return nil, fmt.Errorf("app not found")
}
}

func (m MarathonerStub) Leader() (string, error) {
func (m *MarathonerStub) Leader() (string, error) {
m.interactions = true
return m.leader, nil
}

func (m MarathonerStub) Interactions() bool {
return m.interactions
}

func MarathonerStubWithLeaderForApps(leader string, args ...*apps.App) *MarathonerStub {
stub := MarathonerStubForApps(args...)
stub.leader = leader
Expand Down
3 changes: 3 additions & 0 deletions marathon/marathon_stub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ func TestMarathonStub(t *testing.T) {
t.Parallel()
// given
m := marathon.MarathonerStubWithLeaderForApps("some.host:1234", utils.ConsulApp("/test/app", 3))
// then
assert.False(t, m.Interactions())
// when
leader, _ := m.Leader()
// then
assert.True(t, m.Interactions())
assert.Equal(t, "some.host:1234", leader)
// when
apps, _ := m.ConsulApps()
Expand Down
7 changes: 7 additions & 0 deletions web/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package web

type Config struct {
Listen string
QueueSize int
WorkersCount int
}
Loading

0 comments on commit c2e06f1

Please sign in to comment.