From 047c4c6849e4ec983a3f2fb5345e9bc470d9aadf Mon Sep 17 00:00:00 2001 From: Lewis Marshall Date: Wed, 13 May 2015 15:19:53 +0100 Subject: [PATCH] controller: Convert the deployer to a generic worker Signed-off-by: Lewis Marshall --- bootstrap/manifest_template.json | 6 +- controller/Dockerfile | 2 +- controller/Tupfile | 2 +- controller/start.sh | 4 +- .../deployment/all_at_once.go} | 6 +- .../main.go => worker/deployment/context.go} | 81 ++++---------- controller/worker/deployment/errors.go | 24 ++++ .../main.go => worker/deployment/job.go} | 105 ++++++------------ .../deployment/one_by_one.go} | 6 +- .../deployment}/postgres.go | 8 +- controller/worker/main.go | 64 +++++++++++ test/test_scheduler.go | 2 +- 12 files changed, 165 insertions(+), 145 deletions(-) rename controller/{deployer/strategies/all-at-once.go => worker/deployment/all_at_once.go} (94%) rename controller/{deployer/main.go => worker/deployment/context.go} (69%) create mode 100644 controller/worker/deployment/errors.go rename controller/{deployer/strategies/main.go => worker/deployment/job.go} (74%) rename controller/{deployer/strategies/one-by-one.go => worker/deployment/one_by_one.go} (95%) rename controller/{deployer/strategies => worker/deployment}/postgres.go (97%) create mode 100644 controller/worker/main.go diff --git a/bootstrap/manifest_template.json b/bootstrap/manifest_template.json index f34ef14415..11992c83d1 100644 --- a/bootstrap/manifest_template.json +++ b/bootstrap/manifest_template.json @@ -99,8 +99,8 @@ "service": "flynn-controller-scheduler", "resurrect": true }, - "deployer": { - "cmd": ["deployer"] + "worker": { + "cmd": ["worker"] } } }, @@ -150,7 +150,7 @@ "app_step": "controller-inception", "processes": { "scheduler": 1, - "deployer": 2, + "worker": 2, "web": 2 } }, diff --git a/controller/Dockerfile b/controller/Dockerfile index a833a02314..f92a4009bf 100644 --- a/controller/Dockerfile +++ b/controller/Dockerfile @@ -2,7 +2,7 @@ FROM flynn/busybox ADD bin/flynn-controller /bin/flynn-controller ADD bin/flynn-scheduler /bin/flynn-scheduler -ADD bin/flynn-deployer /bin/flynn-deployer +ADD bin/flynn-worker /bin/flynn-worker ADD start.sh /bin/start-flynn-controller ADD bin/jsonschema /etc/flynn-controller/jsonschema diff --git a/controller/Tupfile b/controller/Tupfile index 00956a183a..9a18247ecd 100644 --- a/controller/Tupfile +++ b/controller/Tupfile @@ -1,7 +1,7 @@ include_rules : |> !go |> bin/flynn-controller : |> !go ./scheduler |> bin/flynn-scheduler -: |> !go ./deployer |> bin/flynn-deployer +: |> !go ./worker |> bin/flynn-worker : foreach $(ROOT)/schema/*.json |> !cp |> bin/jsonschema/%g.json : foreach $(ROOT)/schema/controller/*.json |> !cp |> bin/jsonschema/controller/%g.json : foreach $(ROOT)/schema/router/*.json |> !cp |> bin/jsonschema/router/%g.json diff --git a/controller/start.sh b/controller/start.sh index 03ca74fd92..3e47b85791 100755 --- a/controller/start.sh +++ b/controller/start.sh @@ -3,9 +3,9 @@ case $1 in controller) exec /bin/flynn-controller ;; scheduler) exec /bin/flynn-scheduler ;; - deployer) exec /bin/flynn-deployer ;; + worker) exec /bin/flynn-worker ;; *) - echo "Usage: $0 {controller|scheduler|deployer}" + echo "Usage: $0 {controller|scheduler|worker}" exit 2 ;; esac diff --git a/controller/deployer/strategies/all-at-once.go b/controller/worker/deployment/all_at_once.go similarity index 94% rename from controller/deployer/strategies/all-at-once.go rename to controller/worker/deployment/all_at_once.go index a6ae4c94d0..c4957e53c1 100644 --- a/controller/deployer/strategies/all-at-once.go +++ b/controller/worker/deployment/all_at_once.go @@ -1,9 +1,9 @@ -package strategy +package deployment import ct "github.com/flynn/flynn/controller/types" -func allAtOnce(d *Deploy) error { - log := d.logger.New("fn", "allAtOnce") +func (d *DeployJob) deployAllAtOnce() error { + log := d.logger.New("fn", "deployAllAtOnce") log.Info("starting all-at-once deployment") expected := make(jobEvents) diff --git a/controller/deployer/main.go b/controller/worker/deployment/context.go similarity index 69% rename from controller/deployer/main.go rename to controller/worker/deployment/context.go index 2f383ccf66..6df6cd25c4 100644 --- a/controller/deployer/main.go +++ b/controller/worker/deployment/context.go @@ -1,78 +1,31 @@ -package main +package deployment import ( "encoding/json" - "os" "time" "github.com/flynn/flynn/Godeps/_workspace/src/github.com/flynn/que-go" - "github.com/flynn/flynn/Godeps/_workspace/src/github.com/jackc/pgx" "github.com/flynn/flynn/Godeps/_workspace/src/gopkg.in/inconshreveable/log15.v2" "github.com/flynn/flynn/controller/client" - "github.com/flynn/flynn/controller/deployer/strategies" ct "github.com/flynn/flynn/controller/types" + "github.com/flynn/flynn/discoverd/client" "github.com/flynn/flynn/pkg/attempt" "github.com/flynn/flynn/pkg/postgres" - "github.com/flynn/flynn/pkg/shutdown" ) type context struct { db *postgres.DB client *controller.Client + logger log15.Logger } -const workerCount = 10 - -var logger = log15.New("app", "deployer") - -func main() { - log := logger.New("fn", "main") - - log.Info("creating controller client") - client, err := controller.NewClient("", os.Getenv("AUTH_KEY")) - if err != nil { - log.Error("error creating controller client", "err", err) - shutdown.Fatal() - } - - log.Info("connecting to postgres") - db := postgres.Wait("", "") - - log.Info("creating postgres connection pool") - pgxpool, err := pgx.NewConnPool(pgx.ConnPoolConfig{ - ConnConfig: pgx.ConnConfig{ - Host: os.Getenv("PGHOST"), - User: os.Getenv("PGUSER"), - Password: os.Getenv("PGPASSWORD"), - Database: os.Getenv("PGDATABASE"), - }, - AfterConnect: que.PrepareStatements, - MaxConnections: workerCount, - }) - if err != nil { - log.Error("error creating postgres connection pool", "err", err) - shutdown.Fatal() - } - shutdown.BeforeExit(func() { pgxpool.Close() }) - - ctx := context{db: db, client: client} - workers := que.NewWorkerPool( - que.NewClient(pgxpool), - que.WorkMap{"deployment": ctx.HandleJob}, - workerCount, - ) - workers.Interval = 5 * time.Second - - log.Info("starting workers", "count", workerCount, "interval", workers.Interval) - go workers.Start() - shutdown.BeforeExit(func() { workers.Shutdown() }) - - <-make(chan bool) // block and keep running +func JobHandler(db *postgres.DB, client *controller.Client, logger log15.Logger) func(*que.Job) error { + return (&context{db, client, logger}).HandleDeployment } -func (c *context) HandleJob(job *que.Job) (e error) { - log := logger.New("fn", "HandleJob") - log.Info("handling job", "id", job.ID, "error_count", job.ErrorCount) +func (c *context) HandleDeployment(job *que.Job) (e error) { + log := c.logger.New("fn", "HandleDeployment") + log.Info("handling deployment", "job_id", job.ID, "error_count", job.ErrorCount) var args ct.DeployID if err := json.Unmarshal(job.Args, &args); err != nil { @@ -122,7 +75,7 @@ func (c *context) HandleJob(job *que.Job) (e error) { // rollback failed deploy if e != nil { errMsg := e.Error() - if !strategy.IsSkipRollback(e) { + if !IsSkipRollback(e) { log.Warn("rolling back deployment due to error", "err", e) e = c.rollback(log, deployment, f) } @@ -133,8 +86,22 @@ func (c *context) HandleJob(job *que.Job) (e error) { } } }() + + j := &DeployJob{ + Deployment: deployment, + client: c.client, + deployEvents: events, + serviceEvents: make(chan *discoverd.Event), + useJobEvents: make(map[string]struct{}), + logger: c.logger, + oldReleaseState: make(map[string]int, len(deployment.Processes)), + newReleaseState: make(map[string]int, len(deployment.Processes)), + knownJobStates: make(map[jobIDState]struct{}), + omni: make(map[string]struct{}), + } + log.Info("performing deployment") - if err := strategy.Perform(deployment, c.client, events, logger); err != nil { + if err := j.Perform(); err != nil { log.Error("error performing deployment", "err", err) return err } diff --git a/controller/worker/deployment/errors.go b/controller/worker/deployment/errors.go new file mode 100644 index 0000000000..1880622bb2 --- /dev/null +++ b/controller/worker/deployment/errors.go @@ -0,0 +1,24 @@ +package deployment + +import "fmt" + +type ErrSkipRollback struct { + Err string +} + +func (e ErrSkipRollback) Error() string { + return e.Err +} + +func IsSkipRollback(err error) bool { + _, ok := err.(ErrSkipRollback) + return ok +} + +type UnknownStrategyError struct { + Strategy string +} + +func (e UnknownStrategyError) Error() string { + return fmt.Sprintf("deployment: unknown strategy %q", e.Strategy) +} diff --git a/controller/deployer/strategies/main.go b/controller/worker/deployment/job.go similarity index 74% rename from controller/deployer/strategies/main.go rename to controller/worker/deployment/job.go index 0416b5e42d..95903fa574 100644 --- a/controller/deployer/strategies/main.go +++ b/controller/worker/deployment/job.go @@ -1,4 +1,4 @@ -package strategy +package deployment import ( "errors" @@ -12,32 +12,11 @@ import ( "github.com/flynn/flynn/pkg/cluster" ) -type ErrSkipRollback struct { - Err string -} - -func (e ErrSkipRollback) Error() string { - return e.Err -} - -func IsSkipRollback(err error) bool { - _, ok := err.(ErrSkipRollback) - return ok -} - -type UnknownStrategyError struct { - Strategy string -} - -func (e UnknownStrategyError) Error() string { - return fmt.Sprintf("deployer: unknown strategy %q", e.Strategy) -} - type jobIDState struct { jobID, state string } -type Deploy struct { +type DeployJob struct { *ct.Deployment client *controller.Client deployEvents chan<- ct.DeploymentEvent @@ -53,43 +32,29 @@ type Deploy struct { hostCount int } -func (d *Deploy) isOmni(typ string) bool { +func (d *DeployJob) isOmni(typ string) bool { _, ok := d.omni[typ] return ok } -type PerformFunc func(d *Deploy) error - -var performFuncs = map[string]PerformFunc{ - "all-at-once": allAtOnce, - "one-by-one": oneByOne, - "postgres": postgres, -} - -func Perform(d *ct.Deployment, client *controller.Client, deployEvents chan<- ct.DeploymentEvent, logger log15.Logger) error { - log := logger.New("fn", "Perform", "deployment_id", d.ID, "app_id", d.AppID) +func (d *DeployJob) Perform() error { + log := d.logger.New("fn", "Perform", "deployment_id", d.ID, "app_id", d.AppID) log.Info("validating deployment strategy") - performFunc, ok := performFuncs[d.Strategy] - if !ok { + var deployFunc func() error + switch d.Strategy { + case "one-by-one": + deployFunc = d.deployOneByOne + case "all-at-once": + deployFunc = d.deployAllAtOnce + case "postgres": + deployFunc = d.deployPostgres + default: err := UnknownStrategyError{d.Strategy} log.Error("error validating deployment strategy", "err", err) return err } - deploy := &Deploy{ - Deployment: d, - client: client, - deployEvents: deployEvents, - serviceEvents: make(chan *discoverd.Event), - useJobEvents: make(map[string]struct{}), - logger: logger.New("deployment_id", d.ID, "app_id", d.AppID), - oldReleaseState: make(map[string]int, len(d.Processes)), - newReleaseState: make(map[string]int, len(d.Processes)), - knownJobStates: make(map[jobIDState]struct{}), - omni: make(map[string]struct{}), - } - log.Info("determining cluster size") c, err := cluster.NewClient() if err != nil { @@ -101,21 +66,21 @@ func Perform(d *ct.Deployment, client *controller.Client, deployEvents chan<- ct log.Error("error listing cluster hosts", "err", err) return err } - deploy.hostCount = len(hosts) + d.hostCount = len(hosts) log.Info("determining release services and deployment state") - release, err := client.GetRelease(d.NewReleaseID) + release, err := d.client.GetRelease(d.NewReleaseID) if err != nil { log.Error("error getting new release", "release_id", d.NewReleaseID, "err", err) return err } for typ, proc := range release.Processes { if proc.Omni { - deploy.omni[typ] = struct{}{} + d.omni[typ] = struct{}{} } if proc.Service == "" { log.Info(fmt.Sprintf("using job events for %s process type, no service defined", typ)) - deploy.useJobEvents[typ] = struct{}{} + d.useJobEvents[typ] = struct{}{} continue } @@ -140,7 +105,7 @@ func Perform(d *ct.Deployment, client *controller.Client, deployEvents chan<- ct case discoverd.EventKindCurrent: break outer case discoverd.EventKindServiceMeta: - deploy.serviceMeta = event.ServiceMeta + d.serviceMeta = event.ServiceMeta case discoverd.EventKindUp: releaseID, ok := event.Instance.Meta["FLYNN_RELEASE_ID"] if !ok { @@ -148,9 +113,9 @@ func Perform(d *ct.Deployment, client *controller.Client, deployEvents chan<- ct } switch releaseID { case d.OldReleaseID: - deploy.oldReleaseState[typ]++ + d.oldReleaseState[typ]++ case d.NewReleaseID: - deploy.newReleaseState[typ]++ + d.newReleaseState[typ]++ } } case <-time.After(5 * time.Second): @@ -168,15 +133,15 @@ func Perform(d *ct.Deployment, client *controller.Client, deployEvents chan<- ct // dropped. handle that case return } - deploy.serviceEvents <- event + d.serviceEvents <- event } }() } - if len(deploy.useJobEvents) > 0 { + if len(d.useJobEvents) > 0 { log.Info("getting job event stream") - deploy.jobEvents = make(chan *ct.JobEvent) - stream, err := client.StreamJobEvents(d.AppID, deploy.jobEvents) + d.jobEvents = make(chan *ct.JobEvent) + stream, err := d.client.StreamJobEvents(d.AppID, d.jobEvents) if err != nil { log.Error("error getting job event stream", "err", err) return err @@ -184,7 +149,7 @@ func Perform(d *ct.Deployment, client *controller.Client, deployEvents chan<- ct defer stream.Close() log.Info("getting current jobs") - jobs, err := client.JobList(d.AppID) + jobs, err := d.client.JobList(d.AppID) if err != nil { log.Error("error getting current jobs", "err", err) return err @@ -193,30 +158,30 @@ func Perform(d *ct.Deployment, client *controller.Client, deployEvents chan<- ct if job.State != "up" { continue } - if _, ok := deploy.useJobEvents[job.Type]; !ok { + if _, ok := d.useJobEvents[job.Type]; !ok { continue } // track the jobs so we can drop any events received between // connecting the job stream and getting the list of jobs - deploy.knownJobStates[jobIDState{job.ID, "up"}] = struct{}{} + d.knownJobStates[jobIDState{job.ID, "up"}] = struct{}{} switch job.ReleaseID { case d.OldReleaseID: - deploy.oldReleaseState[job.Type]++ + d.oldReleaseState[job.Type]++ case d.NewReleaseID: - deploy.newReleaseState[job.Type]++ + d.newReleaseState[job.Type]++ } } } log.Info( "determined deployment state", - "original", deploy.Processes, - "old_release", deploy.oldReleaseState, - "new_release", deploy.newReleaseState, + "original", d.Processes, + "old_release", d.oldReleaseState, + "new_release", d.newReleaseState, ) - return performFunc(deploy) + return deployFunc() } type jobEvents map[string]map[string]int @@ -246,7 +211,7 @@ func (j jobEvents) Equals(other jobEvents) bool { return true } -func (d *Deploy) waitForJobEvents(releaseID string, expected jobEvents, log log15.Logger) error { +func (d *DeployJob) waitForJobEvents(releaseID string, expected jobEvents, log log15.Logger) error { actual := make(jobEvents) handleEvent := func(jobID, typ, state string) { diff --git a/controller/deployer/strategies/one-by-one.go b/controller/worker/deployment/one_by_one.go similarity index 95% rename from controller/deployer/strategies/one-by-one.go rename to controller/worker/deployment/one_by_one.go index 4279f27717..2f96f7d584 100644 --- a/controller/deployer/strategies/one-by-one.go +++ b/controller/worker/deployment/one_by_one.go @@ -1,4 +1,4 @@ -package strategy +package deployment import ( "fmt" @@ -6,8 +6,8 @@ import ( ct "github.com/flynn/flynn/controller/types" ) -func oneByOne(d *Deploy) error { - log := d.logger.New("fn", "oneByOne") +func (d *DeployJob) deployOneByOne() error { + log := d.logger.New("fn", "deployOneByOne") log.Info("starting one-by-one deployment") oldScale := make(map[string]int, len(d.oldReleaseState)) diff --git a/controller/deployer/strategies/postgres.go b/controller/worker/deployment/postgres.go similarity index 97% rename from controller/deployer/strategies/postgres.go rename to controller/worker/deployment/postgres.go index c71d7ab15b..aa8c58b99f 100644 --- a/controller/deployer/strategies/postgres.go +++ b/controller/worker/deployment/postgres.go @@ -1,4 +1,4 @@ -package strategy +package deployment import ( "encoding/json" @@ -12,8 +12,8 @@ import ( "github.com/flynn/flynn/discoverd/client" ) -func postgres(d *Deploy) (err error) { - log := d.logger.New("fn", "postgres") +func (d *DeployJob) deployPostgres() (err error) { + log := d.logger.New("fn", "deployPostgres") log.Info("starting postgres deployment") defer func() { @@ -236,5 +236,5 @@ loop: } // do a one-by-one deploy for the other process types - return oneByOne(d) + return d.deployOneByOne() } diff --git a/controller/worker/main.go b/controller/worker/main.go new file mode 100644 index 0000000000..683762d818 --- /dev/null +++ b/controller/worker/main.go @@ -0,0 +1,64 @@ +package main + +import ( + "os" + "time" + + "github.com/flynn/flynn/Godeps/_workspace/src/github.com/flynn/que-go" + "github.com/flynn/flynn/Godeps/_workspace/src/github.com/jackc/pgx" + "github.com/flynn/flynn/Godeps/_workspace/src/gopkg.in/inconshreveable/log15.v2" + "github.com/flynn/flynn/controller/client" + "github.com/flynn/flynn/controller/worker/deployment" + "github.com/flynn/flynn/pkg/postgres" + "github.com/flynn/flynn/pkg/shutdown" +) + +const workerCount = 10 + +var logger = log15.New("app", "worker") + +func main() { + log := logger.New("fn", "main") + + log.Info("creating controller client") + client, err := controller.NewClient("", os.Getenv("AUTH_KEY")) + if err != nil { + log.Error("error creating controller client", "err", err) + shutdown.Fatal() + } + + log.Info("connecting to postgres") + db := postgres.Wait("", "") + + log.Info("creating postgres connection pool") + pgxpool, err := pgx.NewConnPool(pgx.ConnPoolConfig{ + ConnConfig: pgx.ConnConfig{ + Host: os.Getenv("PGHOST"), + User: os.Getenv("PGUSER"), + Password: os.Getenv("PGPASSWORD"), + Database: os.Getenv("PGDATABASE"), + }, + AfterConnect: que.PrepareStatements, + MaxConnections: workerCount, + }) + if err != nil { + log.Error("error creating postgres connection pool", "err", err) + shutdown.Fatal() + } + shutdown.BeforeExit(func() { pgxpool.Close() }) + + workers := que.NewWorkerPool( + que.NewClient(pgxpool), + que.WorkMap{ + "deployment": deployment.JobHandler(db, client, logger), + }, + workerCount, + ) + workers.Interval = 5 * time.Second + + log.Info("starting workers", "count", workerCount, "interval", workers.Interval) + workers.Start() + shutdown.BeforeExit(func() { workers.Shutdown() }) + + select {} // block and keep running +} diff --git a/test/test_scheduler.go b/test/test_scheduler.go index d703b9cbb1..490cb9ace1 100644 --- a/test/test_scheduler.go +++ b/test/test_scheduler.go @@ -414,7 +414,7 @@ loop: } expected := map[string]map[string]int{release.ID: { "web": 2, - "deployer": 2, + "worker": 2, "scheduler": testCluster.Size(), }} t.Assert(actual, c.DeepEquals, expected)