This repository has been archived by the owner on Sep 4, 2021. It is now read-only.

controller: Add in-batches deploy strategy #4567

Merged
merged 3 commits on Dec 30, 2019
43 changes: 40 additions & 3 deletions cli/deployment.go
@@ -4,7 +4,7 @@ import (
"fmt"
"strconv"

"github.com/flynn/flynn/controller/client"
controller "github.com/flynn/flynn/controller/client"
ct "github.com/flynn/flynn/controller/types"
"github.com/flynn/go-docopt"
)
@@ -13,13 +13,16 @@ func init() {
register("deployment", runDeployments, `
usage: flynn deployment
flynn deployment timeout [<timeout>]
flynn deployment batch-size [<size>]

Manage app deployments
Manage app deployments.

Commands:
With no arguments, shows a list of deployments

timeout gets or sets the number of seconds to wait for each job to start when deploying
timeout gets or sets the number of seconds to wait for each job to start when deploying

batch-size gets or sets the batch size for deployments using the in-batches strategy

Examples:

@@ -34,6 +37,11 @@ Examples:

$ flynn deployment timeout
150

$ flynn deployment batch-size 3

$ flynn deployment batch-size
3
`)
}

@@ -43,6 +51,11 @@ func runDeployments(args *docopt.Args, client controller.Client) error {
return runSetDeployTimeout(args, client)
}
return runGetDeployTimeout(args, client)
} else if args.Bool["batch-size"] {
if args.String["<size>"] != "" {
return runSetDeployBatchSize(args, client)
}
return runGetDeployBatchSize(args, client)
}

deployments, err := client.DeploymentList(mustApp())
@@ -79,3 +92,27 @@ func runSetDeployTimeout(args *docopt.Args, client controller.Client) error {
DeployTimeout: int32(timeout),
})
}

func runGetDeployBatchSize(args *docopt.Args, client controller.Client) error {
app, err := client.GetApp(mustApp())
if err != nil {
return err
}
batchSize := app.DeployBatchSize()
if batchSize == nil {
fmt.Println("not set")
} else {
fmt.Println(*batchSize)
}
return nil
}

func runSetDeployBatchSize(args *docopt.Args, client controller.Client) error {
batchSize, err := strconv.Atoi(args.String["<size>"])
if err != nil {
return fmt.Errorf("error parsing batch-size %q: %s", args.String["<size>"], err)
}
app := &ct.App{ID: mustApp()}
app.SetDeployBatchSize(batchSize)
return client.UpdateApp(app)
}
5 changes: 3 additions & 2 deletions controller/data/deployment.go
@@ -84,6 +84,7 @@ func (r *DeploymentRepo) Add(appID, releaseID string) (*ct.Deployment, error) {
Processes: oldFormation.Processes,
Tags: oldFormation.Tags,
DeployTimeout: app.DeployTimeout,
BatchSize: app.DeployBatchSize(),
}
if oldRelease != nil {
d.OldReleaseID = oldRelease.ID
@@ -110,7 +111,7 @@ func (r *DeploymentRepo) Add(appID, releaseID string) (*ct.Deployment, error) {
if d.ID == "" {
d.ID = random.UUID()
}
if err := tx.QueryRow("deployment_insert", d.ID, d.AppID, oldReleaseID, d.NewReleaseID, d.Strategy, d.Processes, d.Tags, d.DeployTimeout).Scan(&d.CreatedAt); err != nil {
if err := tx.QueryRow("deployment_insert", d.ID, d.AppID, oldReleaseID, d.NewReleaseID, d.Strategy, d.Processes, d.Tags, d.DeployTimeout, d.BatchSize).Scan(&d.CreatedAt); err != nil {
tx.Rollback()
if postgres.IsUniquenessError(err, "isolate_deploys") {
return nil, ct.ValidationError{Message: "Cannot create deploy, there is already one in progress for this app."}
@@ -178,7 +179,7 @@ func scanDeployment(s postgres.Scanner) (*ct.Deployment, error) {
d := &ct.Deployment{}
var oldReleaseID *string
var status *string
err := s.Scan(&d.ID, &d.AppID, &oldReleaseID, &d.NewReleaseID, &d.Strategy, &status, &d.Processes, &d.Tags, &d.DeployTimeout, &d.CreatedAt, &d.FinishedAt)
err := s.Scan(&d.ID, &d.AppID, &oldReleaseID, &d.NewReleaseID, &d.Strategy, &status, &d.Processes, &d.Tags, &d.DeployTimeout, &d.BatchSize, &d.CreatedAt, &d.FinishedAt)
if err == pgx.ErrNoRows {
err = ErrNotFound
}
8 changes: 4 additions & 4 deletions controller/data/queries.go
@@ -206,8 +206,8 @@ SELECT COUNT(*) FROM (
WHERE deleted_at IS NULL
) AS l WHERE l.layer_id = $1`
deploymentInsertQuery = `
INSERT INTO deployments (deployment_id, app_id, old_release_id, new_release_id, strategy, processes, tags, deploy_timeout)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING created_at`
INSERT INTO deployments (deployment_id, app_id, old_release_id, new_release_id, strategy, processes, tags, deploy_timeout, batch_size)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING created_at`
deploymentUpdateFinishedAtQuery = `
UPDATE deployments SET finished_at = $2 WHERE deployment_id = $1`
deploymentUpdateFinishedAtNowQuery = `
@@ -218,7 +218,7 @@ DELETE FROM deployments WHERE deployment_id = $1`
WITH deployment_events AS (SELECT * FROM events WHERE object_type = 'deployment')
SELECT d.deployment_id, d.app_id, d.old_release_id, d.new_release_id,
strategy, e1.data->>'status' AS status,
processes, tags, deploy_timeout, d.created_at, d.finished_at
processes, tags, deploy_timeout, batch_size, d.created_at, d.finished_at
FROM deployments d
LEFT JOIN deployment_events e1
ON d.deployment_id = e1.object_id::uuid
@@ -229,7 +229,7 @@ WHERE e2.created_at IS NULL AND d.deployment_id = $1`
WITH deployment_events AS (SELECT * FROM events WHERE object_type = 'deployment')
SELECT d.deployment_id, d.app_id, d.old_release_id, d.new_release_id,
strategy, e1.data->>'status' AS status,
processes, tags, deploy_timeout, d.created_at, d.finished_at
processes, tags, deploy_timeout, batch_size, d.created_at, d.finished_at
FROM deployments d
LEFT JOIN deployment_events e1
ON d.deployment_id = e1.object_id::uuid
4 changes: 4 additions & 0 deletions controller/data/schema.go
@@ -543,6 +543,10 @@ $$ LANGUAGE plpgsql`,
migrations.Add(34,
`INSERT INTO event_types (name) VALUES ('scale_request_cancelation')`,
)
migrations.Add(35,
`INSERT INTO deployment_strategies (name) VALUES ('in-batches')`,
`ALTER TABLE deployments ADD COLUMN batch_size integer`,
)
}

func MigrateDB(db *postgres.DB) error {
24 changes: 24 additions & 0 deletions controller/types/types.go
@@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
@@ -75,6 +76,28 @@ func (a *App) Critical() bool {
return ok && v == "true"
}

// DeployBatchSize returns the batch size to use when deploying using the
// in-batches deployment strategy
func (a *App) DeployBatchSize() *int {
v, ok := a.Meta["flynn-deploy-batch-size"]
if !ok {
return nil
}
if i, err := strconv.Atoi(v); err == nil {
return &i
}
return nil
}

// SetDeployBatchSize sets the batch size to use when deploying using the
// in-batches deployment strategy
func (a *App) SetDeployBatchSize(size int) {
if a.Meta == nil {
a.Meta = make(map[string]string)
}
a.Meta["flynn-deploy-batch-size"] = strconv.Itoa(size)
}

type Release struct {
ID string `json:"id,omitempty"`
AppID string `json:"app_id,omitempty"`
@@ -365,6 +388,7 @@ type Deployment struct {
Processes map[string]int `json:"processes,omitempty"`
Tags map[string]map[string]string `json:"tags,omitempty"`
DeployTimeout int32 `json:"deploy_timeout,omitempty"`
BatchSize *int `json:"batch_size,omitempty"`
CreatedAt *time.Time `json:"created_at,omitempty"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
}
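
As context for the new App helpers above, here is a minimal standalone sketch (not part of the diff) of how the batch size round-trips through the app's meta map; the empty ct.App value is purely illustrative:

package main

import (
	"fmt"

	ct "github.com/flynn/flynn/controller/types"
)

func main() {
	// SetDeployBatchSize stores the value as a string under the
	// "flynn-deploy-batch-size" meta key; DeployBatchSize parses it back.
	app := &ct.App{}
	app.SetDeployBatchSize(3)

	if size := app.DeployBatchSize(); size != nil {
		fmt.Println("batch size:", *size) // batch size: 3
	} else {
		fmt.Println("batch size not set")
	}
}
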
53 changes: 53 additions & 0 deletions controller/worker/deployment/in_batches.go
@@ -0,0 +1,53 @@
package deployment

import (
"fmt"
"sort"
"time"

"github.com/flynn/flynn/pkg/attempt"
"github.com/flynn/flynn/pkg/cluster"
)

func (d *DeployJob) deployInBatches() error {
log := d.logger.New("fn", "deployInBatches")
log.Info("starting in-batches deployment")

batchSize := d.BatchSize
if batchSize == nil {
log.Info("batch size not set, using number of hosts")
hostCount := 0
hostAttempts := attempt.Strategy{
Total: 10 * time.Second,
Delay: 100 * time.Millisecond,
}
if err := hostAttempts.Run(func() error {
hosts, err := cluster.NewClient().Hosts()
if err != nil {
return fmt.Errorf("error determining number of hosts: %s", err)
}
hostCount = len(hosts)
return nil
}); err != nil {
return err
}
batchSize = &hostCount
}

processTypes := make([]string, 0, len(d.Processes))
for typ := range d.Processes {
processTypes = append(processTypes, typ)
}
sort.Sort(sort.StringSlice(processTypes))

log.Info("scaling in batches", "size", *batchSize)

for _, typ := range processTypes {
if err := d.scaleUpDownInBatches(typ, *batchSize, log); err != nil {
return err
}
}

log.Info("finished in-batches deployment")
return nil
}
44 changes: 33 additions & 11 deletions controller/worker/deployment/job.go
@@ -4,9 +4,9 @@ import (
"fmt"
"time"

"github.com/flynn/flynn/controller/client"
controller "github.com/flynn/flynn/controller/client"
ct "github.com/flynn/flynn/controller/types"
"github.com/flynn/flynn/controller/worker/types"
worker "github.com/flynn/flynn/controller/worker/types"
"github.com/inconshreveable/log15"
)

@@ -33,6 +33,8 @@ func (d *DeployJob) Perform() error {
deployFunc = d.deployOneByOne
case "one-down-one-up":
deployFunc = d.deployOneDownOneUp
case "in-batches":
deployFunc = d.deployInBatches
case "all-at-once":
deployFunc = d.deployAllAtOnce
case "sirenia":
@@ -158,12 +160,16 @@ func (d *DeployJob) logJobEvent(job *ct.Job) error {
}

func (d *DeployJob) scaleOneByOne(typ string, log log15.Logger) error {
for i := 0; i < d.Processes[typ]; i++ {
if err := d.scaleNewFormationUpByOne(typ, log); err != nil {
return d.scaleUpDownInBatches(typ, 1, log)
}

func (d *DeployJob) scaleUpDownInBatches(typ string, batchCount int, log log15.Logger) error {
for i := 0; i < d.Processes[typ]; i += batchCount {
if err := d.scaleNewFormationUp(typ, batchCount, log); err != nil {
return err
}

if err := d.scaleOldFormationDownByOne(typ, log); err != nil {
if err := d.scaleOldFormationDown(typ, batchCount, log); err != nil {
return err
}
}
@@ -183,6 +189,10 @@ func (d *DeployJob) scaleOneDownOneUp(typ string, log log15.Logger) error {
}

func (d *DeployJob) scaleNewFormationUpByOne(typ string, log log15.Logger) error {
return d.scaleNewFormationUp(typ, 1, log)
}

func (d *DeployJob) scaleNewFormationUp(typ string, count int, log log15.Logger) error {
// only scale new processes which still exist
if _, ok := d.newRelease.Processes[typ]; !ok {
return nil
@@ -191,24 +201,36 @@ func (d *DeployJob) scaleNewFormationUpByOne(typ string, log log15.Logger) error
if d.newFormation.Processes[typ] == d.Processes[typ] {
return nil
}
log.Info("scaling new formation up by one", "release.id", d.NewReleaseID, "job.type", typ)
d.newFormation.Processes[typ]++
log.Info("scaling new formation up", "release.id", d.NewReleaseID, "job.type", typ, "count", count)
d.newFormation.Processes[typ] += count
// don't scale higher than d.Processes
if d.newFormation.Processes[typ] > d.Processes[typ] {
d.newFormation.Processes[typ] = d.Processes[typ]
}
if err := d.scaleNewRelease(); err != nil {
log.Error("error scaling new formation up by one", "release.id", d.NewReleaseID, "job.type", typ, "err", err)
log.Error("error scaling new formation up", "release.id", d.NewReleaseID, "job.type", typ, "count", count, "err", err)
return err
}
return nil
}

func (d *DeployJob) scaleOldFormationDownByOne(typ string, log log15.Logger) error {
return d.scaleOldFormationDown(typ, 1, log)
}

func (d *DeployJob) scaleOldFormationDown(typ string, count int, log log15.Logger) error {
// don't scale lower than zero
if d.oldFormation.Processes[typ] == 0 {
return nil
}
log.Info("scaling old formation down by one", "release.id", d.OldReleaseID, "job.type", typ)
d.oldFormation.Processes[typ]--
log.Info("scaling old formation down", "release.id", d.OldReleaseID, "job.type", typ, "count", count)
d.oldFormation.Processes[typ] -= count
// don't scale lower than zero
if d.oldFormation.Processes[typ] < 0 {
d.oldFormation.Processes[typ] = 0
}
if err := d.scaleOldRelease(true); err != nil {
log.Error("error scaling old formation down by one", "release.id", d.OldReleaseID, "job.type", typ, "err", err)
log.Error("error scaling old formation down", "release.id", d.OldReleaseID, "job.type", typ, "count", count, "err", err)
return err
}
return nil
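
To make the batching arithmetic above concrete: scaleUpDownInBatches advances in steps of the batch size, and the scale helpers clamp at the desired count and at zero, so a process count that is not a multiple of the batch size simply gets a smaller final batch. Below is a simplified standalone model of that loop (an illustrative sketch, not the worker code):

package main

import "fmt"

// Simplified model of the batching in scaleUpDownInBatches: scale the new
// formation up and the old formation down by the batch size each iteration,
// clamping at the desired count and at zero.
func main() {
	desired, batch := 5, 2
	newCount, oldCount := 0, desired

	for i := 0; i < desired; i += batch {
		newCount += batch
		if newCount > desired { // don't scale higher than desired
			newCount = desired
		}
		oldCount -= batch
		if oldCount < 0 { // don't scale lower than zero
			oldCount = 0
		}
		fmt.Printf("after batch: new=%d old=%d\n", newCount, oldCount)
	}
	// Output:
	// after batch: new=2 old=3
	// after batch: new=4 old=1
	// after batch: new=5 old=0
}
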
2 changes: 1 addition & 1 deletion schema/controller/common.json
@@ -43,7 +43,7 @@
},
"strategy": {
"type": "string",
"enum": ["all-at-once", "one-by-one", "sirenia", "discoverd-meta", "one-down-one-up"]
"enum": ["all-at-once", "one-by-one", "sirenia", "discoverd-meta", "one-down-one-up", "in-batches"]
},
"meta": {
"description": "client-specified metadata",
4 changes: 4 additions & 0 deletions schema/controller/deployment.json
@@ -45,6 +45,10 @@
"deploy_timeout": {
"$ref": "/schema/controller/common#/definitions/deploy_timeout"
},
"batch_size": {
"description": "batch size for in-batches deployments",
"type": "integer"
},
"created_at": {
"$ref": "/schema/controller/common#/definitions/created_at"
},