diff --git a/host/libcontainer_backend.go b/host/libcontainer_backend.go index 49e76fda35..2b6530ef74 100644 --- a/host/libcontainer_backend.go +++ b/host/libcontainer_backend.go @@ -742,6 +742,12 @@ func (c *Container) watch(ready chan<- error, buffer host.LogBuffer) error { log := c.l.logger.New("fn", "watch", "job.id", c.job.ID) log.Info("start watching container") + readyErr := func(err error) { + if ready != nil { + ready <- err + } + } + defer func() { c.container.Destroy() c.l.containersMtx.Lock() @@ -773,12 +779,10 @@ func (c *Container) watch(ready chan<- error, buffer host.LogBuffer) error { break } } - if ready != nil { - ready <- err - } if err != nil { log.Error("error connecting to container", "err", err) c.l.state.SetStatusFailed(c.job.ID, errors.New("failed to connect to container")) + readyErr(err) return err } defer c.Client.Close() @@ -787,6 +791,8 @@ func (c *Container) watch(ready chan<- error, buffer host.LogBuffer) error { c.l.containers[c.job.ID] = c c.l.containersMtx.Unlock() + readyErr(nil) + if !c.job.Config.DisableLog && !c.job.Config.TTY { if err := c.followLogs(log, buffer); err != nil { return err @@ -1194,7 +1200,6 @@ func (l *LibcontainerBackend) UnmarshalState(jobs map[string]*host.ActiveJob, jo container.cleanup() continue } - l.containers[j.Job.ID] = container } return nil } diff --git a/host/state.go b/host/state.go index 5af7d0a3e6..867e62d535 100644 --- a/host/state.go +++ b/host/state.go @@ -92,17 +92,6 @@ func (s *State) Restore(backend Backend, buffers host.LogBuffers) (func(), error return err } - if err := persistentBucket.ForEach(func(k, v []byte) error { - for _, job := range s.jobs { - if job.Job.ID == string(v) { - resurrect = append(resurrect, job.Job) - } - } - return nil - }); err != nil { - return err - } - // hand opaque blobs back to backend so it can do its restore backendJobsBlobs := make(map[string][]byte) if err := backendJobsBucket.ForEach(func(k, v []byte) error { @@ -116,12 +105,27 @@ func (s *State) Restore(backend Backend, buffers host.LogBuffers) (func(), error return err } + // resurrect any persistent jobs which are not running + if err := persistentBucket.ForEach(func(k, v []byte) error { + for _, job := range s.jobs { + if job.Job.ID == string(v) && !backend.JobExists(job.Job.ID) { + resurrect = append(resurrect, job.Job) + } + } + return nil + }); err != nil { + return err + } + return nil }); err != nil && err != io.EOF { return nil, fmt.Errorf("could not restore from host persistence db: %s", err) } return func() { + if len(resurrect) == 0 { + return + } var wg sync.WaitGroup wg.Add(len(resurrect)) for _, job := range resurrect {