Skip to content
This repository has been archived by the owner on Sep 4, 2021. It is now read-only.

Commit

Permalink
host: Only resurrect jobs if they are not running
Browse files Browse the repository at this point in the history
Signed-off-by: Lewis Marshall <lewis@lmars.net>
  • Loading branch information
lmars committed Jul 8, 2016
1 parent 3c78d53 commit c7d703d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
13 changes: 9 additions & 4 deletions host/libcontainer_backend.go
Expand Up @@ -742,6 +742,12 @@ func (c *Container) watch(ready chan<- error, buffer host.LogBuffer) error {
log := c.l.logger.New("fn", "watch", "job.id", c.job.ID)
log.Info("start watching container")

readyErr := func(err error) {
if ready != nil {
ready <- err
}
}

defer func() {
c.container.Destroy()
c.l.containersMtx.Lock()
Expand Down Expand Up @@ -773,12 +779,10 @@ func (c *Container) watch(ready chan<- error, buffer host.LogBuffer) error {
break
}
}
if ready != nil {
ready <- err
}
if err != nil {
log.Error("error connecting to container", "err", err)
c.l.state.SetStatusFailed(c.job.ID, errors.New("failed to connect to container"))
readyErr(err)
return err
}
defer c.Client.Close()
Expand All @@ -787,6 +791,8 @@ func (c *Container) watch(ready chan<- error, buffer host.LogBuffer) error {
c.l.containers[c.job.ID] = c
c.l.containersMtx.Unlock()

readyErr(nil)

if !c.job.Config.DisableLog && !c.job.Config.TTY {
if err := c.followLogs(log, buffer); err != nil {
return err
Expand Down Expand Up @@ -1194,7 +1200,6 @@ func (l *LibcontainerBackend) UnmarshalState(jobs map[string]*host.ActiveJob, jo
container.cleanup()
continue
}
l.containers[j.Job.ID] = container
}
return nil
}
Expand Down
26 changes: 15 additions & 11 deletions host/state.go
Expand Up @@ -92,17 +92,6 @@ func (s *State) Restore(backend Backend, buffers host.LogBuffers) (func(), error
return err
}

if err := persistentBucket.ForEach(func(k, v []byte) error {
for _, job := range s.jobs {
if job.Job.ID == string(v) {
resurrect = append(resurrect, job.Job)
}
}
return nil
}); err != nil {
return err
}

// hand opaque blobs back to backend so it can do its restore
backendJobsBlobs := make(map[string][]byte)
if err := backendJobsBucket.ForEach(func(k, v []byte) error {
Expand All @@ -116,12 +105,27 @@ func (s *State) Restore(backend Backend, buffers host.LogBuffers) (func(), error
return err
}

// resurrect any persistent jobs which are not running
if err := persistentBucket.ForEach(func(k, v []byte) error {
for _, job := range s.jobs {
if job.Job.ID == string(v) && !backend.JobExists(job.Job.ID) {
resurrect = append(resurrect, job.Job)
}
}
return nil
}); err != nil {
return err
}

return nil
}); err != nil && err != io.EOF {
return nil, fmt.Errorf("could not restore from host persistence db: %s", err)
}

return func() {
if len(resurrect) == 0 {
return
}
var wg sync.WaitGroup
wg.Add(len(resurrect))
for _, job := range resurrect {
Expand Down

0 comments on commit c7d703d

Please sign in to comment.