Skip to content
This repository has been archived by the owner on Sep 4, 2021. It is now read-only.

Commit

Permalink
Merge pull request #1532 from flynn/generic-worker
Browse files Browse the repository at this point in the history
controller: Convert the deployer to a generic worker
  • Loading branch information
lmars committed May 14, 2015
2 parents e11df3b + 047c4c6 commit 53a1dfe
Show file tree
Hide file tree
Showing 12 changed files with 165 additions and 145 deletions.
6 changes: 3 additions & 3 deletions bootstrap/manifest_template.json
Expand Up @@ -99,8 +99,8 @@
"service": "flynn-controller-scheduler",
"resurrect": true
},
"deployer": {
"cmd": ["deployer"]
"worker": {
"cmd": ["worker"]
}
}
},
Expand Down Expand Up @@ -150,7 +150,7 @@
"app_step": "controller-inception",
"processes": {
"scheduler": 1,
"deployer": 2,
"worker": 2,
"web": 2
}
},
Expand Down
2 changes: 1 addition & 1 deletion controller/Dockerfile
Expand Up @@ -2,7 +2,7 @@ FROM flynn/busybox

ADD bin/flynn-controller /bin/flynn-controller
ADD bin/flynn-scheduler /bin/flynn-scheduler
ADD bin/flynn-deployer /bin/flynn-deployer
ADD bin/flynn-worker /bin/flynn-worker
ADD start.sh /bin/start-flynn-controller
ADD bin/jsonschema /etc/flynn-controller/jsonschema

Expand Down
2 changes: 1 addition & 1 deletion controller/Tupfile
@@ -1,7 +1,7 @@
include_rules
: |> !go |> bin/flynn-controller
: |> !go ./scheduler |> bin/flynn-scheduler
: |> !go ./deployer |> bin/flynn-deployer
: |> !go ./worker |> bin/flynn-worker
: foreach $(ROOT)/schema/*.json |> !cp |> bin/jsonschema/%g.json
: foreach $(ROOT)/schema/controller/*.json |> !cp |> bin/jsonschema/controller/%g.json
: foreach $(ROOT)/schema/router/*.json |> !cp |> bin/jsonschema/router/%g.json
Expand Down
4 changes: 2 additions & 2 deletions controller/start.sh
Expand Up @@ -3,9 +3,9 @@
case $1 in
controller) exec /bin/flynn-controller ;;
scheduler) exec /bin/flynn-scheduler ;;
deployer) exec /bin/flynn-deployer ;;
worker) exec /bin/flynn-worker ;;
*)
echo "Usage: $0 {controller|scheduler|deployer}"
echo "Usage: $0 {controller|scheduler|worker}"
exit 2
;;
esac
@@ -1,9 +1,9 @@
package strategy
package deployment

import ct "github.com/flynn/flynn/controller/types"

func allAtOnce(d *Deploy) error {
log := d.logger.New("fn", "allAtOnce")
func (d *DeployJob) deployAllAtOnce() error {
log := d.logger.New("fn", "deployAllAtOnce")
log.Info("starting all-at-once deployment")

expected := make(jobEvents)
Expand Down
@@ -1,78 +1,31 @@
package main
package deployment

import (
"encoding/json"
"os"
"time"

"github.com/flynn/flynn/Godeps/_workspace/src/github.com/flynn/que-go"
"github.com/flynn/flynn/Godeps/_workspace/src/github.com/jackc/pgx"
"github.com/flynn/flynn/Godeps/_workspace/src/gopkg.in/inconshreveable/log15.v2"
"github.com/flynn/flynn/controller/client"
"github.com/flynn/flynn/controller/deployer/strategies"
ct "github.com/flynn/flynn/controller/types"
"github.com/flynn/flynn/discoverd/client"
"github.com/flynn/flynn/pkg/attempt"
"github.com/flynn/flynn/pkg/postgres"
"github.com/flynn/flynn/pkg/shutdown"
)

type context struct {
db *postgres.DB
client *controller.Client
logger log15.Logger
}

const workerCount = 10

var logger = log15.New("app", "deployer")

func main() {
log := logger.New("fn", "main")

log.Info("creating controller client")
client, err := controller.NewClient("", os.Getenv("AUTH_KEY"))
if err != nil {
log.Error("error creating controller client", "err", err)
shutdown.Fatal()
}

log.Info("connecting to postgres")
db := postgres.Wait("", "")

log.Info("creating postgres connection pool")
pgxpool, err := pgx.NewConnPool(pgx.ConnPoolConfig{
ConnConfig: pgx.ConnConfig{
Host: os.Getenv("PGHOST"),
User: os.Getenv("PGUSER"),
Password: os.Getenv("PGPASSWORD"),
Database: os.Getenv("PGDATABASE"),
},
AfterConnect: que.PrepareStatements,
MaxConnections: workerCount,
})
if err != nil {
log.Error("error creating postgres connection pool", "err", err)
shutdown.Fatal()
}
shutdown.BeforeExit(func() { pgxpool.Close() })

ctx := context{db: db, client: client}
workers := que.NewWorkerPool(
que.NewClient(pgxpool),
que.WorkMap{"deployment": ctx.HandleJob},
workerCount,
)
workers.Interval = 5 * time.Second

log.Info("starting workers", "count", workerCount, "interval", workers.Interval)
go workers.Start()
shutdown.BeforeExit(func() { workers.Shutdown() })

<-make(chan bool) // block and keep running
func JobHandler(db *postgres.DB, client *controller.Client, logger log15.Logger) func(*que.Job) error {
return (&context{db, client, logger}).HandleDeployment
}

func (c *context) HandleJob(job *que.Job) (e error) {
log := logger.New("fn", "HandleJob")
log.Info("handling job", "id", job.ID, "error_count", job.ErrorCount)
func (c *context) HandleDeployment(job *que.Job) (e error) {
log := c.logger.New("fn", "HandleDeployment")
log.Info("handling deployment", "job_id", job.ID, "error_count", job.ErrorCount)

var args ct.DeployID
if err := json.Unmarshal(job.Args, &args); err != nil {
Expand Down Expand Up @@ -122,7 +75,7 @@ func (c *context) HandleJob(job *que.Job) (e error) {
// rollback failed deploy
if e != nil {
errMsg := e.Error()
if !strategy.IsSkipRollback(e) {
if !IsSkipRollback(e) {
log.Warn("rolling back deployment due to error", "err", e)
e = c.rollback(log, deployment, f)
}
Expand All @@ -133,8 +86,22 @@ func (c *context) HandleJob(job *que.Job) (e error) {
}
}
}()

j := &DeployJob{
Deployment: deployment,
client: c.client,
deployEvents: events,
serviceEvents: make(chan *discoverd.Event),
useJobEvents: make(map[string]struct{}),
logger: c.logger,
oldReleaseState: make(map[string]int, len(deployment.Processes)),
newReleaseState: make(map[string]int, len(deployment.Processes)),
knownJobStates: make(map[jobIDState]struct{}),
omni: make(map[string]struct{}),
}

log.Info("performing deployment")
if err := strategy.Perform(deployment, c.client, events, logger); err != nil {
if err := j.Perform(); err != nil {
log.Error("error performing deployment", "err", err)
return err
}
Expand Down
24 changes: 24 additions & 0 deletions controller/worker/deployment/errors.go
@@ -0,0 +1,24 @@
package deployment

import "fmt"

type ErrSkipRollback struct {
Err string
}

func (e ErrSkipRollback) Error() string {
return e.Err
}

func IsSkipRollback(err error) bool {
_, ok := err.(ErrSkipRollback)
return ok
}

type UnknownStrategyError struct {
Strategy string
}

func (e UnknownStrategyError) Error() string {
return fmt.Sprintf("deployment: unknown strategy %q", e.Strategy)
}

0 comments on commit 53a1dfe

Please sign in to comment.