Skip to content
This repository has been archived by the owner on Sep 4, 2021. It is now read-only.

Commit

Permalink
Merge pull request #2537 from flynn/deployer-fix-release-events
Browse files Browse the repository at this point in the history
worker: Watch job events per release when deploying
  • Loading branch information
lmars committed Mar 7, 2016
2 parents d810dbd + 915851d commit 585acf6
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 75 deletions.
3 changes: 1 addition & 2 deletions controller/worker/deployment/context.go
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/flynn/flynn/controller/client"
ct "github.com/flynn/flynn/controller/types"
"github.com/flynn/flynn/controller/worker/types"
"github.com/flynn/flynn/discoverd/client"
"github.com/flynn/flynn/pkg/attempt"
"github.com/flynn/flynn/pkg/postgres"
)
Expand Down Expand Up @@ -97,7 +96,7 @@ func (c *context) HandleDeployment(job *que.Job) (e error) {
client: c.client,
deployEvents: events,
serviceNames: make(map[string]string),
serviceEvents: make(chan *discoverd.Event),
jobEvents: make(map[string]chan *JobEvent),
useJobEvents: make(map[string]struct{}),
logger: c.logger,
oldReleaseState: make(map[string]int, len(deployment.Processes)),
Expand Down
210 changes: 144 additions & 66 deletions controller/worker/deployment/job.go
Expand Up @@ -3,6 +3,7 @@ package deployment
import (
"errors"
"fmt"
"sync"
"time"

"github.com/flynn/flynn/Godeps/_workspace/src/gopkg.in/inconshreveable/log15.v2"
Expand All @@ -18,13 +19,38 @@ type jobIDState struct {
state ct.JobState
}

type JobEventType int

const (
JobEventTypeDiscoverd JobEventType = iota
JobEventTypeController
JobEventTypeError
)

// JobEvent is a wrapper around either a discoverd service event, a controller
// job event or a stream error, and helps the deploy to have a separate
// channel per release when waiting for job events
type JobEvent struct {
Type JobEventType
DiscoverdEvent *discoverd.Event
JobEvent *ct.Job
Error error
}

type DeployJob struct {
*ct.Deployment
client *controller.Client
deployEvents chan<- ct.DeploymentEvent
jobEvents chan *ct.Job
client *controller.Client
deployEvents chan<- ct.DeploymentEvent

// jobEvents is a map of release IDs to channels which receive job
// events for that particular release, and is used rather than just
// one channel for all events so that events received whilst waiting
// for one release to scale aren't dropped if they are for another
// release which may be scaled in the future
jobEvents map[string]chan *JobEvent
jobEventsMtx sync.Mutex

serviceNames map[string]string
serviceEvents chan *discoverd.Event
serviceMeta *discoverd.ServiceMeta
useJobEvents map[string]struct{}
logger log15.Logger
Expand All @@ -36,6 +62,38 @@ type DeployJob struct {
stop chan struct{}
}

// ReleaseJobEvents lazily creates and returns a channel of job events for the
// given release
func (d *DeployJob) ReleaseJobEvents(releaseID string) chan *JobEvent {
d.jobEventsMtx.Lock()
defer d.jobEventsMtx.Unlock()
if ch, ok := d.jobEvents[releaseID]; ok {
return ch
}
// give the channel a buffer so events for a release not being waited
// on do not block events for releases being waited on
ch := make(chan *JobEvent, 100)
d.jobEvents[releaseID] = ch
return ch
}

// JobEventErr sends the given error on job event channels for all releases.
//
// It does not close the channels because there are multiple publishers which
// could break (i.e. controller job events and discoverd service events), but
// everything will ultimately be closed when this error is received and the
// deferred stream closes kick in.
func (d *DeployJob) JobEventErr(err error) {
d.jobEventsMtx.Lock()
defer d.jobEventsMtx.Unlock()
for _, ch := range d.jobEvents {
ch <- &JobEvent{
Type: JobEventTypeError,
Error: err,
}
}
}

func (d *DeployJob) isOmni(typ string) bool {
_, ok := d.omni[typ]
return ok
Expand Down Expand Up @@ -132,25 +190,50 @@ func (d *DeployJob) Perform() error {
for {
event, ok := <-events
if !ok {
// if this happens, it means defer cleanup is in progress

// TODO: this could also happen if the stream connection
// dropped. handle that case
// this usually means deferred cleanup is in progress, but send an error
// in case the deploy is still waiting for an event which will now not come.
d.JobEventErr(errors.New("unexpected close of service event stream"))
return
}
d.serviceEvents <- event
if event.Instance == nil {
continue
}
if id, ok := event.Instance.Meta["FLYNN_APP_ID"]; !ok || id != d.AppID {
continue
}
releaseID, ok := event.Instance.Meta["FLYNN_RELEASE_ID"]
if !ok {
continue
}
d.ReleaseJobEvents(releaseID) <- &JobEvent{
Type: JobEventTypeDiscoverd,
DiscoverdEvent: event,
}
}
}()
}

log.Info("getting job event stream")
d.jobEvents = make(chan *ct.Job)
stream, err := d.client.StreamJobEvents(d.AppID, d.jobEvents)
jobEvents := make(chan *ct.Job)
stream, err := d.client.StreamJobEvents(d.AppID, jobEvents)
if err != nil {
log.Error("error getting job event stream", "err", err)
return err
}
defer stream.Close()
go func() {
for {
event, ok := <-jobEvents
if !ok {
d.JobEventErr(errors.New("unexpected close of job event stream"))
return
}
d.ReleaseJobEvents(event.ReleaseID) <- &JobEvent{
Type: JobEventTypeController,
JobEvent: event,
}
}
}()

log.Info("getting current jobs")
jobs, err := d.client.JobList(d.AppID)
Expand Down Expand Up @@ -213,69 +296,64 @@ func (d *DeployJob) waitForJobEvents(releaseID string, expected ct.JobEvents, lo
}
}

jobEvents := d.ReleaseJobEvents(releaseID)
for {
select {
case <-d.stop:
return worker.ErrStopped
case event := <-d.serviceEvents:
if !event.Kind.Any(discoverd.EventKindUp, discoverd.EventKindUpdate) {
continue
}
if id, ok := event.Instance.Meta["FLYNN_APP_ID"]; !ok || id != d.AppID {
continue
}
if id, ok := event.Instance.Meta["FLYNN_RELEASE_ID"]; !ok || id != releaseID {
continue
}
typ, ok := event.Instance.Meta["FLYNN_PROCESS_TYPE"]
if !ok {
continue
}
if _, ok := d.useJobEvents[typ]; ok {
continue
}
jobID, ok := event.Instance.Meta["FLYNN_JOB_ID"]
if !ok {
continue
}
log.Info("got service event", "job.id", jobID, "job.type", typ, "job.state", event.Kind)
handleEvent(jobID, typ, ct.JobStateUp)
if expected.Equals(actual) {
return nil
}
case event, ok := <-d.jobEvents:
if !ok {
return errors.New("unexpected close of job event stream")
}
if event.ReleaseID != releaseID {
continue
}

// if service discovery is being used for the job's type, ignore up events and fail
// the deployment if we get a down event when waiting for the job to come up.
if _, ok := d.useJobEvents[event.Type]; !ok {
if event.State == ct.JobStateUp {
case e := <-jobEvents:
switch e.Type {
case JobEventTypeDiscoverd:
event := e.DiscoverdEvent
if !event.Kind.Any(discoverd.EventKindUp, discoverd.EventKindUpdate) {
continue
}
typ, ok := event.Instance.Meta["FLYNN_PROCESS_TYPE"]
if !ok {
continue
}
if expected[event.Type][ct.JobStateUp] > 0 && event.IsDown() {
handleEvent(event.ID, event.Type, ct.JobStateDown)
return fmt.Errorf("%s process type failed to start, got %s job event", event.Type, event.State)
if _, ok := d.useJobEvents[typ]; ok {
continue
}
jobID, ok := event.Instance.Meta["FLYNN_JOB_ID"]
if !ok {
continue
}
log.Info("got service event", "job.id", jobID, "job.type", typ, "job.state", event.Kind)
handleEvent(jobID, typ, ct.JobStateUp)
if expected.Equals(actual) {
return nil
}
case JobEventTypeController:
event := e.JobEvent
// if service discovery is being used for the job's type, ignore up events and fail
// the deployment if we get a down event when waiting for the job to come up.
if _, ok := d.useJobEvents[event.Type]; !ok {
if event.State == ct.JobStateUp {
continue
}
if expected[event.Type][ct.JobStateUp] > 0 && event.IsDown() {
handleEvent(event.ID, event.Type, ct.JobStateDown)
return fmt.Errorf("%s process type failed to start, got %s job event", event.Type, event.State)
}
}
}

log.Info("got job event", "job.id", event.ID, "job.type", event.Type, "job.state", event.State)
if event.State == ct.JobStateStarting {
continue
}
if _, ok := actual[event.Type]; !ok {
actual[event.Type] = make(map[ct.JobState]int)
}
handleEvent(event.ID, event.Type, event.State)
if event.HostError != nil {
return fmt.Errorf("deployer: %s job failed to start: %s", event.Type, *event.HostError)
}
if expected.Equals(actual) {
return nil
log.Info("got job event", "job.id", event.ID, "job.type", event.Type, "job.state", event.State)
if event.State == ct.JobStateStarting {
continue
}
if _, ok := actual[event.Type]; !ok {
actual[event.Type] = make(map[ct.JobState]int)
}
handleEvent(event.ID, event.Type, event.State)
if event.HostError != nil {
return fmt.Errorf("deployer: %s job failed to start: %s", event.Type, *event.HostError)
}
if expected.Equals(actual) {
return nil
}
case JobEventTypeError:
return e.Error
}
case <-time.After(60 * time.Second):
return fmt.Errorf("timed out waiting for job events: %v", expected)
Expand Down
34 changes: 27 additions & 7 deletions controller/worker/deployment/postgres.go
Expand Up @@ -71,9 +71,17 @@ func (d *DeployJob) deployPostgres() (err error) {
return err
}
log.Info("waiting for postgres to stop")
jobEvents := d.ReleaseJobEvents(d.OldReleaseID)
for {
select {
case event := <-d.serviceEvents:
case e := <-jobEvents:
if e.Type == JobEventTypeError {
return e.Error
}
if e.Type != JobEventTypeDiscoverd {
continue
}
event := e.DiscoverdEvent
if event.Kind == discoverd.EventKindDown && event.Instance.ID == inst.ID {
d.deployEvents <- ct.DeploymentEvent{
ReleaseID: d.OldReleaseID,
Expand Down Expand Up @@ -108,13 +116,20 @@ func (d *DeployJob) deployPostgres() (err error) {
}
log.Info("waiting for new instance to come up")
var inst *discoverd.Instance
jobEvents := d.ReleaseJobEvents(d.NewReleaseID)
loop:
for {
select {
case event := <-d.serviceEvents:
case e := <-jobEvents:
if e.Type == JobEventTypeError {
return nil, e.Error
}
if e.Type != JobEventTypeDiscoverd {
continue
}
event := e.DiscoverdEvent
if event.Kind == discoverd.EventKindUp &&
event.Instance.Meta != nil &&
event.Instance.Meta["FLYNN_RELEASE_ID"] == d.NewReleaseID &&
event.Instance.Meta["FLYNN_PROCESS_TYPE"] == "postgres" {
inst = event.Instance
break loop
Expand Down Expand Up @@ -220,15 +235,20 @@ func (d *DeployJob) deployPostgres() (err error) {

log.Info(fmt.Sprintf("waiting for %d job down events", d.Processes["postgres"]))
actual := 0
jobEvents := d.ReleaseJobEvents(d.OldReleaseID)
loop:
for {
select {
case event, ok := <-d.jobEvents:
if !ok {
return loggedErr("unexpected close of job event stream")
case e := <-jobEvents:
if e.Type == JobEventTypeError {
return loggedErr(e.Error.Error())
}
if e.Type != JobEventTypeController {
continue
}
event := e.JobEvent
log.Info("got job event", "job_id", event.ID, "type", event.Type, "state", event.State)
if event.State == ct.JobStateDown && event.Type == "postgres" && event.ReleaseID == d.OldReleaseID {
if event.State == ct.JobStateDown && event.Type == "postgres" {
actual++
if actual == d.Processes["postgres"] {
break loop
Expand Down

0 comments on commit 585acf6

Please sign in to comment.