Skip to content

Commit

Permalink
Fixing deployment events handling for apps in groups
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Krawczyk committed Jan 28, 2016
1 parent c0bc167 commit 29e29ab
Show file tree
Hide file tree
Showing 4 changed files with 295 additions and 46 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ Endpoint | Description
`/health` | healthcheck - returns `OK`
`/events` | event sink - returns `OK` if all keys are set in an event, error message otherwise

### Known limitations

The following section describes known limitations with `marathon-consul`.

* Every marathon application needs to have a unique service name in Consul.
* In Marathon when a deployment changing the application's service name (by changing its `labels`) is being stopped, it changes app's configuration anyway.
This means we loose the link between the app and the services registered with the old name in Consul.
Later on, if another deployment takes place, new services are registered with a new name, the old ones are not being deregistered though.
A scheduled sync is required to wipe them out.

## Code

This project is based on
Expand Down
127 changes: 101 additions & 26 deletions events/deployment_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package events
import (
"encoding/json"
"github.com/allegro/marathon-consul/apps"
"strings"
)

type DeploymentEvent struct {
Expand All @@ -17,7 +18,9 @@ type Plan struct {
}

type Deployments struct {
Apps []*apps.App `json:"apps"`
Id string `json:"id"`
Apps []*apps.App `json:"apps"`
Groups []*Deployments `json:"groups"`
}

type CurrentStep struct {
Expand All @@ -29,29 +32,39 @@ type Action struct {
AppId apps.AppId `json:"app"`
}

func (d *DeploymentEvent) OriginalApps() []*apps.App {
func (d *Deployments) groups() []*Deployments {
if d.Groups != nil {
return d.Groups
}
return []*Deployments{}
}

func (d *Deployments) apps() []*apps.App {
if d.Apps != nil {
return d.Apps
}
return []*apps.App{}
}

func (d *DeploymentEvent) originalDeployments() *Deployments {
if d.Plan != nil {
if d.Plan.Original != nil {
if d.Plan.Original.Apps != nil {
return d.Plan.Original.Apps
}
return d.Plan.Original
}
}
return []*apps.App{}
return &Deployments{}
}

func (d *DeploymentEvent) TargetApps() []*apps.App {
func (d *DeploymentEvent) targetDeployments() *Deployments {
if d.Plan != nil {
if d.Plan.Target != nil {
if d.Plan.Target.Apps != nil {
return d.Plan.Target.Apps
}
return d.Plan.Target
}
}
return []*apps.App{}
return &Deployments{}
}

func (d *DeploymentEvent) Actions() []*Action {
func (d *DeploymentEvent) actions() []*Action {
if d.CurrentStep != nil {
if d.CurrentStep.Actions != nil {
return d.CurrentStep.Actions
Expand All @@ -61,19 +74,19 @@ func (d *DeploymentEvent) Actions() []*Action {
}

func (d *DeploymentEvent) StoppedConsulApps() []*apps.App {
return d.consulAppsWithActionPerformed(d.OriginalApps(), "StopApplication")
return d.consulAppsWithActionPerformed(d.originalDeployments(), "StopApplication")
}

func (d *DeploymentEvent) RestartedConsulApps() []*apps.App {
return d.consulAppsWithActionPerformed(d.OriginalApps(), "RestartApplication")
return d.consulAppsWithActionPerformed(d.originalDeployments(), "RestartApplication")
}

func (d *DeploymentEvent) RenamedConsulApps() []*apps.App {
original := d.consulAppsWithActionPerformed(d.OriginalApps(), "RestartApplication")
original := d.consulAppsWithActionPerformed(d.originalDeployments(), "RestartApplication")
renamedApps := []*apps.App{}
if len(original) > 0 {

target := d.consulAppsWithActionPerformed(d.TargetApps(), "RestartApplication")
target := d.consulAppsWithActionPerformed(d.targetDeployments(), "RestartApplication")
originalMap := d.appsMap(original)
targetMap := d.appsMap(target)
for id, originalApp := range originalMap {
Expand All @@ -94,27 +107,89 @@ func (d *DeploymentEvent) appsMap(applications []*apps.App) map[apps.AppId]*apps
return result
}

func (d *DeploymentEvent) consulAppsWithActionPerformed(allApps []*apps.App, actionType string) []*apps.App {
foundApps := []*apps.App{}
foundAppIdSet := make(map[apps.AppId]struct{})
func (d *DeploymentEvent) consulAppsWithActionPerformed(deployments *Deployments, actionType string) []*apps.App {
appIds := make(map[apps.AppId]struct{})
var exists struct{}

for _, action := range d.Actions() {
for _, action := range d.actions() {
if action.Type == actionType {
foundAppIdSet[action.AppId] = exists
appIds[action.AppId] = exists
}
}
return d.filterConsulApps(d.findAppsInDeploymentsGroup(appIds, deployments))
}

if len(foundAppIdSet) > 0 {
for _, app := range allApps {
if _, ok := foundAppIdSet[app.ID]; ok && app.IsConsulApp() {
foundApps = append(foundApps, app)
}
func (d *DeploymentEvent) filterConsulApps(allApps []*apps.App) []*apps.App {
filtered := []*apps.App{}
for _, app := range allApps {
if app.IsConsulApp() {
filtered = append(filtered, app)
}
}
return filtered
}

func (d *DeploymentEvent) findAppsInDeploymentsGroup(appIds map[apps.AppId]struct{}, deployment *Deployments) []*apps.App {
foundApps := []*apps.App{}
filteredAppIds := deployment.filterCurrentGroupAppIds(appIds)

foundInCurrentGroup := d.findAppsInCurrentDeploymentGroupApps(filteredAppIds, deployment)
for _, app := range foundInCurrentGroup {
foundApps = append(foundApps, app)
}

foundInChildGroups := d.findAppsInDeploymentChildGroups(filteredAppIds, deployment)
for _, app := range foundInChildGroups {
foundApps = append(foundApps, app)
}
return foundApps
}

func (d *DeploymentEvent) findAppsInCurrentDeploymentGroupApps(appIds map[apps.AppId]struct{}, deployment *Deployments) []*apps.App {
foundApps := []*apps.App{}
searchForCount := len(appIds)

for _, app := range deployment.apps() {
if searchForCount < 1 {
break
}
if _, ok := appIds[app.ID]; ok {
foundApps = append(foundApps, app)
searchForCount--
}
}
return foundApps
}

func (d *DeploymentEvent) findAppsInDeploymentChildGroups(appIds map[apps.AppId]struct{}, deployment *Deployments) []*apps.App {
foundApps := []*apps.App{}
searchForCount := len(appIds)

for _, group := range deployment.groups() {
if searchForCount < 1 {
break
}
foundInChildGroup := d.findAppsInDeploymentsGroup(appIds, group)
for _, app := range foundInChildGroup {
foundApps = append(foundApps, app)
}
searchForCount -= len(foundInChildGroup)
}
return foundApps
}

func (d *Deployments) filterCurrentGroupAppIds(appIds map[apps.AppId]struct{}) map[apps.AppId]struct{} {
filteredAppIds := make(map[apps.AppId]struct{})
var exists struct{}

for appId, _ := range appIds {
if strings.HasPrefix(appId.String(), d.Id) {
filteredAppIds[appId] = exists
}
}
return filteredAppIds
}

func ParseDeploymentEvent(jsonBlob []byte) (*DeploymentEvent, error) {
deploymentInfo := &DeploymentEvent{}
err := json.Unmarshal(jsonBlob, deploymentInfo)
Expand Down
56 changes: 36 additions & 20 deletions events/deployment_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@ func TestParseDeploymentInfo(t *testing.T) {

// when
deploymentInfo, err := ParseDeploymentEvent(blob)
action := deploymentInfo.Actions()[0]
app := deploymentInfo.OriginalApps()[0]
action := deploymentInfo.actions()[0]
app := deploymentInfo.originalDeployments().apps()[0]

// then
assert.NoError(t, err)
assert.Equal(t, "StopApplication", action.Type)
assert.Equal(t, "/internalName", action.AppId.String())
assert.Equal(t, "/internalName", app.ID.String())
assert.Equal(t, "consulName", app.Labels["consul"])
assert.Equal(t, app, deploymentInfo.StoppedConsulApps()[0])
}

func TestParseDeploymentStepSuccess(t *testing.T) {
Expand All @@ -33,40 +32,57 @@ func TestParseDeploymentStepSuccess(t *testing.T) {

// when
deploymentInfo, err := ParseDeploymentEvent(blob)
action := deploymentInfo.Actions()[0]
app := deploymentInfo.OriginalApps()[1]
action := deploymentInfo.actions()[0]
app := deploymentInfo.RestartedConsulApps()[0]

// then
assert.NoError(t, err)
assert.Equal(t, "RestartApplication", action.Type)
assert.Equal(t, "/a", action.AppId.String())
assert.Equal(t, "/a", app.ID.String())
assert.Equal(t, "b", app.Labels["consul"])
assert.Equal(t, app, deploymentInfo.RestartedConsulApps()[0])
}

func TestOriginalApps_OnEmpty(t *testing.T) {
func TestParseDeploymentStepSuccessWithGroups(t *testing.T) {
t.Parallel()
// given
blob, _ := ioutil.ReadFile("deployment_step_success_with_groups.json")

// when
deploymentInfo, err := ParseDeploymentEvent(blob)
action := deploymentInfo.actions()[0]
app := deploymentInfo.RestartedConsulApps()[0]

// then
assert.NoError(t, err)
assert.Equal(t, "RestartApplication", action.Type)
assert.Equal(t, "/com.example/things/something", action.AppId.String())
assert.Equal(t, "/com.example/things/something", app.ID.String())
assert.Equal(t, "something", app.Labels["consul"])
}

func TestOriginalDeployments_OnEmpty(t *testing.T) {
t.Parallel()
// given
deploymentInfo := &DeploymentEvent{}

// when
apps := deploymentInfo.OriginalApps()
deployments := deploymentInfo.originalDeployments()

// then
assert.Len(t, apps, 0)
assert.NotNil(t, deployments)
}

func TestTargetApps_OnEmpty(t *testing.T) {
func TestTargetDeployments_OnEmpty(t *testing.T) {
t.Parallel()
// given
deploymentInfo := &DeploymentEvent{}

// when
apps := deploymentInfo.TargetApps()
deployments := deploymentInfo.targetDeployments()

// then
assert.Len(t, apps, 0)
assert.NotNil(t, deployments)
}

func TestActions_OnEmpty(t *testing.T) {
Expand All @@ -75,7 +91,7 @@ func TestActions_OnEmpty(t *testing.T) {
deploymentInfo := &DeploymentEvent{}

// when
actions := deploymentInfo.Actions()
actions := deploymentInfo.actions()

// then
assert.Len(t, actions, 0)
Expand Down Expand Up @@ -110,8 +126,8 @@ func TestStoppedConsulApps(t *testing.T) {

// then
assert.Len(t, stoppedApps, 2)
assert.Contains(t, stoppedApps, deploymentInfo.OriginalApps()[1])
assert.Contains(t, stoppedApps, deploymentInfo.OriginalApps()[2])
assert.Contains(t, stoppedApps, deploymentInfo.originalDeployments().apps()[1])
assert.Contains(t, stoppedApps, deploymentInfo.originalDeployments().apps()[2])
}

func TestRestartedConsulApps(t *testing.T) {
Expand Down Expand Up @@ -143,8 +159,8 @@ func TestRestartedConsulApps(t *testing.T) {

// then
assert.Len(t, restartedApps, 2)
assert.Contains(t, restartedApps, deploymentInfo.OriginalApps()[1])
assert.Contains(t, restartedApps, deploymentInfo.OriginalApps()[2])
assert.Contains(t, restartedApps, deploymentInfo.originalDeployments().apps()[1])
assert.Contains(t, restartedApps, deploymentInfo.originalDeployments().apps()[2])
}

func TestStoppedConsulApps_NoStoppedApps(t *testing.T) {
Expand Down Expand Up @@ -218,7 +234,7 @@ func TestRenamedConsulApps(t *testing.T) {

// then
assert.Len(t, renamedApps, 1)
assert.Contains(t, renamedApps, deploymentInfo.OriginalApps()[2])
assert.Contains(t, renamedApps, deploymentInfo.originalDeployments().apps()[2])
}

func TestRenamedConsulApps_OnEmptyPlan(t *testing.T) {
Expand Down Expand Up @@ -305,7 +321,7 @@ func TestRenamedConsulApps_OnCustomNameAdded(t *testing.T) {

// then
assert.Len(t, renamedApps, 1)
assert.Contains(t, renamedApps, deploymentInfo.OriginalApps()[0])
assert.Contains(t, renamedApps, deploymentInfo.originalDeployments().apps()[0])
}

func TestRenamedConsulApps_OnCustomNameAddedToNonConsulApp(t *testing.T) {
Expand Down Expand Up @@ -372,5 +388,5 @@ func TestRenamedConsulApps_OnConsulLabelRemoved(t *testing.T) {

// then
assert.Len(t, renamedApps, 1)
assert.Contains(t, renamedApps, deploymentInfo.OriginalApps()[0])
assert.Contains(t, renamedApps, deploymentInfo.originalDeployments().apps()[0])
}

0 comments on commit 29e29ab

Please sign in to comment.