Skip to content
This repository has been archived by the owner on Sep 4, 2021. It is now read-only.

controller: Convert the deployer to a generic worker #1532

Merged
merged 1 commit into from May 14, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions bootstrap/manifest_template.json
Expand Up @@ -99,8 +99,8 @@
"service": "flynn-controller-scheduler",
"resurrect": true
},
"deployer": {
"cmd": ["deployer"]
"worker": {
"cmd": ["worker"]
}
}
},
Expand Down Expand Up @@ -150,7 +150,7 @@
"app_step": "controller-inception",
"processes": {
"scheduler": 1,
"deployer": 2,
"worker": 2,
"web": 2
}
},
Expand Down
2 changes: 1 addition & 1 deletion controller/Dockerfile
Expand Up @@ -2,7 +2,7 @@ FROM flynn/busybox

ADD bin/flynn-controller /bin/flynn-controller
ADD bin/flynn-scheduler /bin/flynn-scheduler
ADD bin/flynn-deployer /bin/flynn-deployer
ADD bin/flynn-worker /bin/flynn-worker
ADD start.sh /bin/start-flynn-controller
ADD bin/jsonschema /etc/flynn-controller/jsonschema

Expand Down
2 changes: 1 addition & 1 deletion controller/Tupfile
@@ -1,7 +1,7 @@
include_rules
: |> !go |> bin/flynn-controller
: |> !go ./scheduler |> bin/flynn-scheduler
: |> !go ./deployer |> bin/flynn-deployer
: |> !go ./worker |> bin/flynn-worker
: foreach $(ROOT)/schema/*.json |> !cp |> bin/jsonschema/%g.json
: foreach $(ROOT)/schema/controller/*.json |> !cp |> bin/jsonschema/controller/%g.json
: foreach $(ROOT)/schema/router/*.json |> !cp |> bin/jsonschema/router/%g.json
Expand Down
4 changes: 2 additions & 2 deletions controller/start.sh
Expand Up @@ -3,9 +3,9 @@
case $1 in
controller) exec /bin/flynn-controller ;;
scheduler) exec /bin/flynn-scheduler ;;
deployer) exec /bin/flynn-deployer ;;
worker) exec /bin/flynn-worker ;;
*)
echo "Usage: $0 {controller|scheduler|deployer}"
echo "Usage: $0 {controller|scheduler|worker}"
exit 2
;;
esac
@@ -1,9 +1,9 @@
package strategy
package deployment

import ct "github.com/flynn/flynn/controller/types"

func allAtOnce(d *Deploy) error {
log := d.logger.New("fn", "allAtOnce")
func (d *DeployJob) deployAllAtOnce() error {
log := d.logger.New("fn", "deployAllAtOnce")
log.Info("starting all-at-once deployment")

expected := make(jobEvents)
Expand Down
@@ -1,78 +1,31 @@
package main
package deployment

import (
"encoding/json"
"os"
"time"

"github.com/flynn/flynn/Godeps/_workspace/src/github.com/flynn/que-go"
"github.com/flynn/flynn/Godeps/_workspace/src/github.com/jackc/pgx"
"github.com/flynn/flynn/Godeps/_workspace/src/gopkg.in/inconshreveable/log15.v2"
"github.com/flynn/flynn/controller/client"
"github.com/flynn/flynn/controller/deployer/strategies"
ct "github.com/flynn/flynn/controller/types"
"github.com/flynn/flynn/discoverd/client"
"github.com/flynn/flynn/pkg/attempt"
"github.com/flynn/flynn/pkg/postgres"
"github.com/flynn/flynn/pkg/shutdown"
)

type context struct {
db *postgres.DB
client *controller.Client
logger log15.Logger
}

const workerCount = 10

var logger = log15.New("app", "deployer")

func main() {
log := logger.New("fn", "main")

log.Info("creating controller client")
client, err := controller.NewClient("", os.Getenv("AUTH_KEY"))
if err != nil {
log.Error("error creating controller client", "err", err)
shutdown.Fatal()
}

log.Info("connecting to postgres")
db := postgres.Wait("", "")

log.Info("creating postgres connection pool")
pgxpool, err := pgx.NewConnPool(pgx.ConnPoolConfig{
ConnConfig: pgx.ConnConfig{
Host: os.Getenv("PGHOST"),
User: os.Getenv("PGUSER"),
Password: os.Getenv("PGPASSWORD"),
Database: os.Getenv("PGDATABASE"),
},
AfterConnect: que.PrepareStatements,
MaxConnections: workerCount,
})
if err != nil {
log.Error("error creating postgres connection pool", "err", err)
shutdown.Fatal()
}
shutdown.BeforeExit(func() { pgxpool.Close() })

ctx := context{db: db, client: client}
workers := que.NewWorkerPool(
que.NewClient(pgxpool),
que.WorkMap{"deployment": ctx.HandleJob},
workerCount,
)
workers.Interval = 5 * time.Second

log.Info("starting workers", "count", workerCount, "interval", workers.Interval)
go workers.Start()
shutdown.BeforeExit(func() { workers.Shutdown() })

<-make(chan bool) // block and keep running
func JobHandler(db *postgres.DB, client *controller.Client, logger log15.Logger) func(*que.Job) error {
return (&context{db, client, logger}).HandleDeployment
}

func (c *context) HandleJob(job *que.Job) (e error) {
log := logger.New("fn", "HandleJob")
log.Info("handling job", "id", job.ID, "error_count", job.ErrorCount)
func (c *context) HandleDeployment(job *que.Job) (e error) {
log := c.logger.New("fn", "HandleDeployment")
log.Info("handling deployment", "job_id", job.ID, "error_count", job.ErrorCount)

var args ct.DeployID
if err := json.Unmarshal(job.Args, &args); err != nil {
Expand Down Expand Up @@ -122,7 +75,7 @@ func (c *context) HandleJob(job *que.Job) (e error) {
// rollback failed deploy
if e != nil {
errMsg := e.Error()
if !strategy.IsSkipRollback(e) {
if !IsSkipRollback(e) {
log.Warn("rolling back deployment due to error", "err", e)
e = c.rollback(log, deployment, f)
}
Expand All @@ -133,8 +86,22 @@ func (c *context) HandleJob(job *que.Job) (e error) {
}
}
}()

j := &DeployJob{
Deployment: deployment,
client: c.client,
deployEvents: events,
serviceEvents: make(chan *discoverd.Event),
useJobEvents: make(map[string]struct{}),
logger: c.logger,
oldReleaseState: make(map[string]int, len(deployment.Processes)),
newReleaseState: make(map[string]int, len(deployment.Processes)),
knownJobStates: make(map[jobIDState]struct{}),
omni: make(map[string]struct{}),
}

log.Info("performing deployment")
if err := strategy.Perform(deployment, c.client, events, logger); err != nil {
if err := j.Perform(); err != nil {
log.Error("error performing deployment", "err", err)
return err
}
Expand Down
24 changes: 24 additions & 0 deletions controller/worker/deployment/errors.go
@@ -0,0 +1,24 @@
package deployment

import "fmt"

type ErrSkipRollback struct {
Err string
}

func (e ErrSkipRollback) Error() string {
return e.Err
}

func IsSkipRollback(err error) bool {
_, ok := err.(ErrSkipRollback)
return ok
}

type UnknownStrategyError struct {
Strategy string
}

func (e UnknownStrategyError) Error() string {
return fmt.Sprintf("deployment: unknown strategy %q", e.Strategy)
}