Skip to content

Commit

Permalink
Implement and expose ready state.
Browse files Browse the repository at this point in the history
During deployments (only), we mark applications as ready depending on
the outcome of potentially configured readiness checks.

The change goes along with some major refactoring, primarily focused
towards retrieving all necessary state from a single Marathon API
request thanks to the 'embed=apps.{tasks,deployments,readiness}' query
parameter. This is necessary in order to retrieve a single, consistent
app/task state not skewed by two API requests send away at slightly
different offsets.

Additionally, we stop considering tasks as ready which have not yet
reached the TASK_RUNNING state since still staging or otherwise
non-ready tasks are bound to fail if taken into load-balancing rotation
prematurely.
  • Loading branch information
timoreimann committed May 23, 2017
1 parent 69ec469 commit 767b0d1
Show file tree
Hide file tree
Showing 2 changed files with 502 additions and 113 deletions.
258 changes: 146 additions & 112 deletions services/marathon/marathon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,26 @@ package marathon

import (
"encoding/json"
"github.com/QubitProducts/bamboo/configuration"
"io/ioutil"
"log"
"net/http"
"sort"
"strings"

"github.com/QubitProducts/bamboo/configuration"
)

const taskStateRunning = "TASK_RUNNING"

// Describes an app process running
type Task struct {
Id string
Host string
Port int
Ports []int
Alive bool
State string
Ready bool
}

// A health check on the application
Expand All @@ -36,6 +42,7 @@ type App struct {
HealthCheckPath string
HealthCheckProtocol string
HealthChecks []HealthCheck
ReadinessCheckPath string
Tasks []Task
ServicePort int
ServicePorts []int
Expand All @@ -58,12 +65,6 @@ func (slice AppList) Swap(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
}

type marathonTaskList []marathonTask

type marathonTasks struct {
Tasks marathonTaskList `json:"tasks"`
}

type HealthCheckResult struct {
Alive bool
}
Expand All @@ -74,12 +75,15 @@ type marathonTask struct {
Host string
Ports []int
ServicePorts []int
State string
StartedAt string
StagedAt string
Version string
HealthCheckResults []HealthCheckResult
}

type marathonTaskList []marathonTask

func (slice marathonTaskList) Len() int {
return len(slice)
}
Expand All @@ -97,11 +101,15 @@ type marathonApps struct {
}

type marathonApp struct {
Id string `json:"id"`
HealthChecks []marathonHealthCheck `json:"healthChecks"`
Ports []int `json:"ports"`
Env map[string]string `json:"env"`
Labels map[string]string `json:"labels"`
Id string `json:"id"`
HealthChecks []marathonHealthCheck `json:"healthChecks"`
Ports []int `json:"ports"`
Env map[string]string `json:"env"`
Labels map[string]string `json:"labels"`
Deployments []deployment `json:"deployments"`
Tasks marathonTaskList `json:"tasks"`
ReadinessChecks []marathonReadinessCheck `json:"readinessChecks"`
ReadinessCheckResults []readinessCheckResult `json:"readinessCheckResults"`
}

type marathonHealthCheck struct {
Expand All @@ -110,110 +118,92 @@ type marathonHealthCheck struct {
PortIndex int `json:"portIndex"`
}

func fetchMarathonApps(endpoint string, conf *configuration.Configuration) (map[string]marathonApp, error) {
client := &http.Client{}
req, _ := http.NewRequest("GET", endpoint+"/v2/apps", nil)
req.Header.Add("Accept", "application/json")
req.Header.Add("Content-Type", "application/json")
if len(conf.Marathon.User) > 0 && len(conf.Marathon.Password) > 0 {
req.SetBasicAuth(conf.Marathon.User, conf.Marathon.Password)
}
response, err := client.Do(req)
type marathonReadinessCheck struct {
Path string `json:"path"`
}

if err != nil {
return nil, err
}
type deployment struct {
ID string `json:"id"`
}

defer response.Body.Close()
var appResponse marathonApps
type readinessCheckResult struct {
TaskID string `json:"taskId"`
Ready bool `json:"ready"`
}

contents, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, err
}
/*
Apps returns a struct that describes Marathon current app and their
sub tasks information.
err = json.Unmarshal(contents, &appResponse)
if err != nil {
return nil, err
}
Parameters:
endpoint: Marathon HTTP endpoint, e.g. http://localhost:8080
*/
func FetchApps(maraconf configuration.Marathon, conf *configuration.Configuration) (AppList, error) {
var marathonApps []marathonApp
var err error

dataById := map[string]marathonApp{}
// Try all configured endpoints until one succeeds or we exhaust the list,
// whichever comes first.
for _, url := range maraconf.Endpoints() {
marathonApps, err = fetchMarathonApps(url, conf)
if err == nil {
for _, marathonApp := range marathonApps {
sort.Sort(marathonApp.Tasks)
}
apps := createApps(marathonApps)
sort.Sort(apps)
return apps, nil
}
}
// return last error
return nil, err
}

for _, appConfig := range appResponse.Apps {
dataById[appConfig.Id] = appConfig
func fetchMarathonApps(endpoint string, conf *configuration.Configuration) ([]marathonApp, error) {
var appResponse marathonApps
if err := parseJSON(endpoint+"/v2/apps?embed=app.tasks&embed=app.deployments&embed=app.readiness", conf, &appResponse); err != nil {
return nil, err
}

return dataById, nil
return appResponse.Apps, nil
}

func fetchTasks(endpoint string, conf *configuration.Configuration) (map[string]marathonTaskList, error) {
func parseJSON(url string, conf *configuration.Configuration, out interface{}) error {
client := &http.Client{}
req, _ := http.NewRequest("GET", endpoint+"/v2/tasks", nil)
req, _ := http.NewRequest("GET", url, nil)
req.Header.Add("Accept", "application/json")
req.Header.Add("Content-Type", "application/json")
if len(conf.Marathon.User) > 0 && len(conf.Marathon.Password) > 0 {
req.SetBasicAuth(conf.Marathon.User, conf.Marathon.Password)
}
response, err := client.Do(req)

var tasks marathonTasks

response, err := client.Do(req)
if err != nil {
return nil, err
return err
}

contents, err := ioutil.ReadAll(response.Body)
defer response.Body.Close()
if err != nil {
return nil, err
}

err = json.Unmarshal(contents, &tasks)
contents, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, err
}

taskList := tasks.Tasks
sort.Sort(taskList)

tasksById := map[string]marathonTaskList{}
for _, task := range taskList {
if tasksById[task.AppId] == nil {
tasksById[task.AppId] = marathonTaskList{}
}
tasksById[task.AppId] = append(tasksById[task.AppId], task)
return err
}

for _, task_list := range tasksById {
sort.Sort(task_list)
err = json.Unmarshal(contents, &out)
if err != nil {
return err
}

return tasksById, nil
return nil
}

func calculateTaskHealth(healthCheckResults []HealthCheckResult, healthChecks []marathonHealthCheck) bool {
//If we don't even have health check results for every health check, don't count the task as healthy
if len(healthChecks) > len(healthCheckResults) {
return false
}
for _, healthCheck := range healthCheckResults {
if !healthCheck.Alive {
return false
}
}
return true
}

func createApps(tasksById map[string]marathonTaskList, marathonApps map[string]marathonApp) AppList {
func createApps(marathonApps []marathonApp) AppList {
apps := AppList{}

for appId, mApp := range marathonApps {

for _, mApp := range marathonApps {
appId := mApp.Id
// Try to handle old app id format without slashes
appPath := appId
if !strings.HasPrefix(appId, "/") {
appPath = "/" + appId
}
appPath := "/" + strings.TrimPrefix(mApp.Id, "/")

// build App from marathonApp
app := App{
Expand All @@ -222,6 +212,7 @@ func createApps(tasksById map[string]marathonTaskList, marathonApps map[string]m
EscapedId: strings.Replace(appId, "/", "::", -1),
HealthCheckPath: parseHealthCheckPath(mApp.HealthChecks),
HealthCheckProtocol: parseHealthCheckProtocol(mApp.HealthChecks),
ReadinessCheckPath: parseReadinessCheckPath(mApp.ReadinessChecks),
Env: mApp.Env,
Labels: mApp.Labels,
SplitId: strings.Split(appId, "/"),
Expand All @@ -244,14 +235,16 @@ func createApps(tasksById map[string]marathonTaskList, marathonApps map[string]m

// build Tasks for this App
tasks := []Task{}
for _, mTask := range tasksById[appId] {
for _, mTask := range mApp.Tasks {
if len(mTask.Ports) > 0 {
t := Task{
Id: mTask.Id,
Host: mTask.Host,
Port: mTask.Ports[0],
Ports: mTask.Ports,
Alive: calculateTaskHealth(mTask.HealthCheckResults, mApp.HealthChecks),
State: mTask.State,
Ready: calculateReadiness(mTask, mApp),
}
tasks = append(tasks, t)
}
Expand Down Expand Up @@ -297,41 +290,82 @@ func parseHealthCheckProtocol(checks []marathonHealthCheck) string {
return ""
}

/*
Apps returns a struct that describes Marathon current app and their
sub tasks information.
Parameters:
endpoint: Marathon HTTP endpoint, e.g. http://localhost:8080
*/
func FetchApps(maraconf configuration.Marathon, conf *configuration.Configuration) (AppList, error) {
func parseReadinessCheckPath(checks []marathonReadinessCheck) string {
if len(checks) > 0 {
return checks[0].Path
}

var applist AppList
var err error
return ""
}

// try all configured endpoints until one succeeds
for _, url := range maraconf.Endpoints() {
applist, err = _fetchApps(url, conf)
if err == nil {
return applist, err
func calculateTaskHealth(healthCheckResults []HealthCheckResult, healthChecks []marathonHealthCheck) bool {
// If we don't even have health check results for every health check, don't
// count the task as healthy.
if len(healthChecks) > len(healthCheckResults) {
return false
}
for _, healthCheck := range healthCheckResults {
if !healthCheck.Alive {
return false
}
}
// return last error
return nil, err
return true
}

func _fetchApps(url string, conf *configuration.Configuration) (AppList, error) {
tasks, err := fetchTasks(url, conf)
if err != nil {
return nil, err
func calculateReadiness(task marathonTask, maraApp marathonApp) bool {
switch {
case task.State != taskStateRunning:
// By definition, a task not running cannot be ready.
log.Printf("task %s app %s: ready = false [task state %s != required state %s]", task.Id, maraApp.Id, task.State, taskStateRunning)
return false

case len(maraApp.Deployments) == 0:
// We only care about readiness during deployments; post-deployment readiness
// should be covered by a separate HAProxy health check definition.
log.Printf("task %s app %s: ready = true [no deployment ongoing]", task.Id, maraApp.Id)
return true

case len(maraApp.ReadinessChecks) == 0:
// Applications without configured readiness checks are always considered
// ready.
log.Printf("task %s app %s: ready = true [no readiness checks on app]", task.Id, maraApp.Id)
return true
}

marathonApps, err := fetchMarathonApps(url, conf)
if err != nil {
return nil, err
// Loop through all readiness check results and return the results for
// matching task IDs.
for _, readinessCheckResult := range maraApp.ReadinessCheckResults {
if readinessCheckResult.TaskID == task.Id {
log.Printf("task %s app %s: ready = %t [evaluating readiness check ready state]", task.Id, maraApp.Id, readinessCheckResult.Ready)
return readinessCheckResult.Ready
}
}

apps := createApps(tasks, marathonApps)
sort.Sort(apps)
return apps, nil
// There's a corner case sometimes hit where the first new task of a
// deployment goes from TASK_STAGING to TASK_RUNNING without a corresponding
// health check result being included in the API response. This only happens
// in a very short (yet unlucky) time frame and does not repeat for subsequent
// tasks of the same deployment.
// We identify this situation by checking that we are looking at a part of the
// deployment representing a new task (i.e., it has the most recent version
// timestamp while other timestamps exist as well). If that's the case, we
// err on the side of caution and mark it as non-ready.
versions := map[string]bool{}
var maxVersion string
for _, task := range maraApp.Tasks {
versions[task.Version] = true
if maxVersion == "" || maxVersion < task.Version {
maxVersion = task.Version
}
}
if len(versions) > 1 && task.Version == maxVersion {
log.Printf("task %s app %s: ready = false [new task with version %s not included in readiness check results yet]", task.Id, maraApp.Id, maxVersion)
return false
}

// Finally, we can be certain this task is not part of the deployment (i.e.,
// it's an old task that's going to transition into the TASK_KILLING and/or
// TASK_KILLED state as new tasks' readiness checks gradually turn green.)
log.Printf("task %s app %s: ready = true [task not involved in deployment]", task.Id, maraApp.Id)
return true
}
Loading

0 comments on commit 767b0d1

Please sign in to comment.