Skip to content

Commit

Permalink
Fixed event handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Krawczyk committed Jan 8, 2016
1 parent e25dd50 commit a7457ad
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 49 deletions.
6 changes: 3 additions & 3 deletions consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

type ConsulServices interface {
GetAllServices() ([]*consulapi.CatalogService, error)
GetServices(name string) ([]*consulapi.CatalogService, error)
GetServices(name tasks.AppId) ([]*consulapi.CatalogService, error)
Register(service *consulapi.AgentServiceRegistration) error
Deregister(serviceId tasks.Id, agentAddress string) error
}
Expand All @@ -24,7 +24,7 @@ func New(config ConsulConfig) *Consul {
}
}

func (c *Consul) GetServices(name string) ([]*consulapi.CatalogService, error) {
func (c *Consul) GetServices(name tasks.AppId) ([]*consulapi.CatalogService, error) {
agent, err := c.agents.GetAnyAgent()
if err != nil {
return nil, err
Expand All @@ -39,7 +39,7 @@ func (c *Consul) GetServices(name string) ([]*consulapi.CatalogService, error) {
dcAwareQuery := &consulapi.QueryOptions{
Datacenter: dc,
}
services, _, err := agent.Catalog().Service(name, "marathon", dcAwareQuery)
services, _, err := agent.Catalog().Service(name.ConsulServiceName(), "marathon", dcAwareQuery)
if err != nil {
return nil, err
}
Expand Down
7 changes: 5 additions & 2 deletions consul/consul_stub.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consul

import (
"fmt"
"github.com/allegro/marathon-consul/tasks"
consulapi "github.com/hashicorp/consul/api"
)
Expand Down Expand Up @@ -32,10 +33,10 @@ func (c ConsulStub) GetAllServices() ([]*consulapi.CatalogService, error) {
return catalog, nil
}

func (c ConsulStub) GetServices(name string) ([]*consulapi.CatalogService, error) {
func (c ConsulStub) GetServices(name tasks.AppId) ([]*consulapi.CatalogService, error) {
var catalog []*consulapi.CatalogService
for _, s := range c.services {
if s.Name == name {
if s.Name == name.ConsulServiceName() && contains(s.Tags, "marathon") {
catalog = append(catalog, &consulapi.CatalogService{
Address: s.Address,
ServiceAddress: s.Address,
Expand All @@ -61,8 +62,10 @@ func (c *ConsulStub) Register(service *consulapi.AgentServiceRegistration) error

func (c *ConsulStub) Deregister(serviceId tasks.Id, agent string) error {
if err, ok := c.ErrorServices[serviceId]; ok {
fmt.Println("delete err")
return err
} else {
fmt.Println("delete")
delete(c.services, serviceId)
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func newConsulServicesMock() *ConsulServicesMock {
}
}

func (c *ConsulServicesMock) GetServices(name string) ([]*consulapi.CatalogService, error) {
func (c *ConsulServicesMock) GetServices(name tasks.AppId) ([]*consulapi.CatalogService, error) {
return nil, nil
}

Expand Down Expand Up @@ -315,7 +315,7 @@ func (m errorMarathon) Leader() (string, error) {
type errorConsul struct {
}

func (c errorConsul) GetServices(name string) ([]*consulapi.CatalogService, error) {
func (c errorConsul) GetServices(name tasks.AppId) ([]*consulapi.CatalogService, error) {
return nil, fmt.Errorf("Error occured")
}

Expand Down
50 changes: 23 additions & 27 deletions web/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,43 +75,35 @@ func (fh *EventHandler) handleTerminationEvent(w http.ResponseWriter, body []byt
appId := event.Apps()[0].ID
log.WithField("Id", appId).Info("Got TerminationEvent")

app, err := fh.marathon.App(appId)
services, err := fh.service.GetServices(appId)
if err != nil {
log.WithField("Id", appId).WithError(err).Error("There was a problem obtaining app info")
log.WithField("Id", appId).WithError(err).Error("There was a problem getting Consul services")
fh.handleError(err, w)
return
}

if !app.IsConsulApp() {
err = fmt.Errorf("%s is not consul app. Missing consul:true label", app.ID)
log.WithField("Id", app.ID).WithError(err).Debug("Skipping app deregistration from Consul")
fh.handleBadRequest(err, w)
return
}

tasks, err := fh.marathon.Tasks(app.ID)
if err != nil {
log.WithField("Id", app.ID).WithError(err).Error("There was a problem obtaining tasks for app")
fh.handleError(err, w)
return
if len(services) == 0 {
log.WithField("Id", appId).Info("No matching Consul services found")
}

errors := []error{}
for _, task := range tasks {
err = fh.service.Deregister(task.ID, task.Host)

for _, service := range services {
err = fh.service.Deregister(tasks.Id(service.ServiceID), service.Address)
if err != nil {
log.WithField("Id", task.ID).WithError(err).Error("There was a problem deregistering task")
log.WithField("Id", service.ServiceID).WithError(err).Error("There was a problem deregistering task")
errors = append(errors, err)
}
}

if len(errors) != 0 {
errMessage := fmt.Sprintf("%d errors occured deregistering %d services:", len(errors), len(tasks))
errMessage := fmt.Sprintf("%d errors occured deregistering %d services:", len(errors), len(services))
for i, err := range errors {
errMessage = fmt.Sprintf("%s\n%d: %s", errMessage, i, err.Error())
}
err = fmt.Errorf(errMessage)
fh.handleError(err, w)
log.WithError(err).WithField("Id", app.ID).Error("There were problems processing request")
log.WithError(err).WithField("Id", appId).Error("There were problems processing request")
} else {
w.WriteHeader(200)
fmt.Fprintln(w, "OK")
Expand Down Expand Up @@ -209,21 +201,25 @@ func (fh *EventHandler) handleStatusEvent(w http.ResponseWriter, body []byte) {

switch task.TaskStatus {
case "TASK_FINISHED", "TASK_FAILED", "TASK_KILLED", "TASK_LOST":
app, err := fh.marathon.App(task.AppID)
services, err := fh.service.GetServices(task.AppID)
if err != nil {
log.WithField("Id", task.AppID).WithError(err).Error("There was a problem obtaining app info")
log.WithField("Id", task.AppID).WithError(err).Error("There was a problem getting Consul services")
fh.handleError(err, w)
return
}

if !app.IsConsulApp() {
err = fmt.Errorf("%s is not consul app. Missing consul:true label", app.ID)
log.WithField("Id", task.ID).WithError(err).Debug("Not handling task event")
fh.handleBadRequest(err, w)
return
if len(services) == 0 {
log.WithField("Id", task.AppID).Info("No matching Consul services found")
}

fh.service.Deregister(task.ID, task.Host)
for _, service := range services {
if service.ServiceID == task.ID.String() {
err = fh.service.Deregister(tasks.Id(service.ServiceID), service.Address)
if err != nil {
log.WithField("Id", service.ServiceID).WithError(err).Error("There was a problem deregistering task")
}
}
}
w.WriteHeader(200)
fmt.Fprintln(w, "OK")
default:
Expand Down

0 comments on commit a7457ad

Please sign in to comment.