diff --git a/hyperstart/libhyperstart/hyperstart.go b/hyperstart/libhyperstart/hyperstart.go index 2c4c85ff..44de7fa9 100644 --- a/hyperstart/libhyperstart/hyperstart.go +++ b/hyperstart/libhyperstart/hyperstart.go @@ -12,11 +12,11 @@ type Hyperstart interface { LastStreamSeq() uint64 APIVersion() (uint32, error) - ProcessAsyncEvents() (<-chan hyperstartapi.ProcessAsyncEvent, error) NewContainer(c *hyperstartapi.Container) (io.WriteCloser, io.ReadCloser, io.ReadCloser, error) RestoreContainer(c *hyperstartapi.Container) (io.WriteCloser, io.ReadCloser, io.ReadCloser, error) AddProcess(container string, p *hyperstartapi.Process) (io.WriteCloser, io.ReadCloser, io.ReadCloser, error) SignalProcess(container, process string, signal syscall.Signal) error + WaitProcess(container, process string) int TtyWinResize(container, process string, row, col uint16) error StartSandbox(pod *hyperstartapi.Pod) error diff --git a/hyperstart/libhyperstart/json.go b/hyperstart/libhyperstart/json.go index 389b1c6a..210355a1 100644 --- a/hyperstart/libhyperstart/json.go +++ b/hyperstart/libhyperstart/json.go @@ -30,19 +30,21 @@ type pState struct { stdinPipe streamIn stdoutPipe io.ReadCloser stderrPipe io.ReadCloser + exitStatus *int + outClosed bool + waitChan chan int } type jsonBasedHyperstart struct { sync.RWMutex - logPrefix string - vmAPIVersion uint32 - closed bool - lastStreamSeq uint64 - procs map[pKey]*pState - streamOuts map[uint64]streamOut - ctlChan chan *hyperstartCmd - streamChan chan *hyperstartapi.TtyMessage - processAsyncEvents chan hyperstartapi.ProcessAsyncEvent + logPrefix string + vmAPIVersion uint32 + closed bool + lastStreamSeq uint64 + procs map[pKey]*pState + streamOuts map[uint64]streamOut + ctlChan chan *hyperstartCmd + streamChan chan *hyperstartapi.TtyMessage } type hyperstartCmd struct { @@ -56,13 +58,12 @@ type hyperstartCmd struct { func NewJsonBasedHyperstart(id, ctlSock, streamSock string, lastStreamSeq uint64, waitReady bool) Hyperstart { h := &jsonBasedHyperstart{ - logPrefix: fmt.Sprintf("SB[%s] ", id), - procs: make(map[pKey]*pState), - lastStreamSeq: lastStreamSeq, - streamOuts: make(map[uint64]streamOut), - ctlChan: make(chan *hyperstartCmd, 128), - streamChan: make(chan *hyperstartapi.TtyMessage, 128), - processAsyncEvents: make(chan hyperstartapi.ProcessAsyncEvent, 16), + logPrefix: fmt.Sprintf("SB[%s] ", id), + procs: make(map[pKey]*pState), + lastStreamSeq: lastStreamSeq, + streamOuts: make(map[uint64]streamOut), + ctlChan: make(chan *hyperstartCmd, 128), + streamChan: make(chan *hyperstartapi.TtyMessage, 128), } go handleStreamSock(h, streamSock) go handleCtlSock(h, ctlSock, waitReady) @@ -86,17 +87,18 @@ func (h *jsonBasedHyperstart) Close() { defer h.Unlock() if !h.closed { h.Log(TRACE, "close jsonBasedHyperstart") - for pk := range h.procs { - h.processAsyncEvents <- hyperstartapi.ProcessAsyncEvent{Container: pk.c, Process: pk.p, Event: "finished", Status: 255} - } - h.procs = make(map[pKey]*pState) for _, out := range h.streamOuts { out.Close() } h.streamOuts = make(map[uint64]streamOut) + for pk, ps := range h.procs { + ps.outClosed = true + ps.exitStatus = makeExitStatus(255) + h.handleWaitProcess(pk, ps) + } + h.procs = make(map[pKey]*pState) close(h.ctlChan) close(h.streamChan) - close(h.processAsyncEvents) for cmd := range h.ctlChan { if cmd.Code != hyperstartapi.INIT_ACK && cmd.Code != hyperstartapi.INIT_ERROR { cmd.result <- fmt.Errorf("hyperstart closed") @@ -106,10 +108,6 @@ func (h *jsonBasedHyperstart) Close() { } } -func (h *jsonBasedHyperstart) ProcessAsyncEvents() (<-chan hyperstartapi.ProcessAsyncEvent, error) { - return h.processAsyncEvents, nil -} - func (h *jsonBasedHyperstart) LastStreamSeq() uint64 { return h.lastStreamSeq } @@ -459,13 +457,22 @@ func handleStreamFromHyperstart(h *jsonBasedHyperstart, conn io.Reader) { } } +func makeExitStatus(status int) *int { return &status } + +func (h *jsonBasedHyperstart) handleWaitProcess(pk pKey, ps *pState) { + if ps.exitStatus != nil && ps.waitChan != nil && ps.outClosed { + delete(h.procs, pk) + ps.waitChan <- *ps.exitStatus + } +} + func (h *jsonBasedHyperstart) sendProcessAsyncEvent(pae hyperstartapi.ProcessAsyncEvent) { h.Lock() defer h.Unlock() pk := pKey{c: pae.Container, p: pae.Process} - if _, ok := h.procs[pk]; ok { - delete(h.procs, pk) - h.processAsyncEvents <- pae + if ps, ok := h.procs[pk]; ok { + ps.exitStatus = makeExitStatus(pae.Status) + h.handleWaitProcess(pk, ps) } } @@ -474,8 +481,8 @@ func (h *jsonBasedHyperstart) sendProcessAsyncEvent4242(stdioSeq uint64, code ui defer h.Unlock() for pk, ps := range h.procs { if ps.stdioSeq == stdioSeq { - delete(h.procs, pk) - h.processAsyncEvents <- hyperstartapi.ProcessAsyncEvent{Container: pk.c, Process: pk.p, Event: "finished", Status: int(code)} + ps.exitStatus = makeExitStatus(int(code)) + h.handleWaitProcess(pk, ps) } } } @@ -487,8 +494,17 @@ func (h *jsonBasedHyperstart) removeStreamOut(seq uint64) { // doesn't send eof of the stderr back, so we also remove stderr seq here if out, ok := h.streamOuts[seq]; ok { delete(h.streamOuts, seq) - if seq == out.ps.stdioSeq && out.ps.stderrSeq > 0 { - delete(h.streamOuts, out.ps.stderrSeq) + if seq == out.ps.stdioSeq { + if out.ps.stderrSeq > 0 { + h.streamOuts[out.ps.stderrSeq].Close() + delete(h.streamOuts, out.ps.stderrSeq) + } + for pk, ps := range h.procs { + if ps.stdioSeq == seq { + ps.outClosed = true + h.handleWaitProcess(pk, ps) + } + } } } } @@ -752,6 +768,26 @@ func (h *jsonBasedHyperstart) SignalProcess(container, process string, signal sy }) } +// wait the process until exit. like waitpid() +// the state is saved until someone calls WaitProcess() if the process exited earlier +// the non-first call of WaitProcess() after process started MAY fail to find the process if the process exited earlier +func (h *jsonBasedHyperstart) WaitProcess(container, process string) int { + h.Lock() + pk := pKey{c: container, p: process} + if ps, ok := h.procs[pk]; ok { + if ps.waitChan == nil { + ps.waitChan = make(chan int, 1) + h.handleWaitProcess(pk, ps) + } + h.Unlock() + status := <-ps.waitChan + ps.waitChan <- status + return status + } + h.Unlock() + return -1 +} + func (h *jsonBasedHyperstart) StartSandbox(pod *hyperstartapi.Pod) error { return h.hyperstartCommand(hyperstartapi.INIT_STARTPOD, pod) } diff --git a/hypervisor/context.go b/hypervisor/context.go index 260106ba..1304f9c8 100644 --- a/hypervisor/context.go +++ b/hypervisor/context.go @@ -236,31 +236,6 @@ func (ctx *VmContext) LookupBySession(session uint64) string { return "" } -func (ctx *VmContext) handleProcessAsyncEvent(pae *hyperstartapi.ProcessAsyncEvent) { - if pae.Event != "finished" { - return - } - if pae.Process == "init" { - ctx.reportProcessFinished(types.E_CONTAINER_FINISHED, &types.ProcessFinished{ - Id: pae.Container, Code: uint8(pae.Status), Ack: make(chan bool, 1), - }) - ctx.lock.Lock() - if c, ok := ctx.containers[pae.Container]; ok { - c.Log(TRACE, "container finished, unset iostream pipes") - c.stdinPipe = nil - c.stdoutPipe = nil - c.stderrPipe = nil - c.tty = nil - } - ctx.lock.Unlock() - } else { - ctx.DeleteExec(pae.Process) - ctx.reportProcessFinished(types.E_EXEC_FINISHED, &types.ProcessFinished{ - Id: pae.Process, Code: uint8(pae.Status), Ack: make(chan bool, 1), - }) - } -} - func (ctx *VmContext) Close() { ctx.closeOnce.Do(func() { ctx.Log(INFO, "VmContext Close()") diff --git a/hypervisor/hypervisor.go b/hypervisor/hypervisor.go index dbc8183e..97eb6926 100644 --- a/hypervisor/hypervisor.go +++ b/hypervisor/hypervisor.go @@ -31,18 +31,6 @@ func (ctx *VmContext) loop() { ctx.Log(DEBUG, "main event loop exiting") } -func (ctx *VmContext) handlePAEs() { - ch, err := ctx.hyperstart.ProcessAsyncEvents() - if err == nil { - for e := range ch { - ctx.handleProcessAsyncEvent(&e) - } - } - ctx.hyperstart.Close() - ctx.Log(ERROR, "hyperstart stopped") - ctx.Hub <- &Interrupted{Reason: "hyperstart stopped"} -} - func (ctx *VmContext) watchHyperstart(sendReadyEvent bool) { timeout := time.AfterFunc(30*time.Second, func() { if ctx.PauseState == PauseStateUnpaused { @@ -92,7 +80,6 @@ func (ctx *VmContext) Launch() { } go ctx.loop() - go ctx.handlePAEs() } func VmAssociate(vmId string, hub chan VmEvent, client chan *types.VmResponse, pack []byte) (*VmContext, error) { @@ -130,7 +117,6 @@ func VmAssociate(vmId string, hub chan VmEvent, client chan *types.VmResponse, p //} go context.watchHyperstart(false) - go context.handlePAEs() go context.loop() return context, nil } diff --git a/hypervisor/vm.go b/hypervisor/vm.go index 67ebd326..6bc4b1ef 100644 --- a/hypervisor/vm.go +++ b/hypervisor/vm.go @@ -493,6 +493,13 @@ func (vm *Vm) AddProcess(container, execId string, terminal bool, args []string, } go streamCopy(tty, stdinPipe, stdoutPipe, stderrPipe) + go func() { + status := vm.ctx.hyperstart.WaitProcess(container, execId) + vm.ctx.DeleteExec(execId) + vm.ctx.reportProcessFinished(types.E_EXEC_FINISHED, &types.ProcessFinished{ + Id: execId, Code: uint8(status), Ack: make(chan bool, 1), + }) + }() return nil } diff --git a/hypervisor/vm_states.go b/hypervisor/vm_states.go index 908441ad..cc62350d 100644 --- a/hypervisor/vm_states.go +++ b/hypervisor/vm_states.go @@ -9,6 +9,7 @@ import ( "time" "github.com/hyperhq/hypercontainer-utils/hlog" + "github.com/hyperhq/runv/hypervisor/types" ) // states @@ -44,6 +45,21 @@ func (ctx *VmContext) newContainer(id string) error { go streamCopy(c.tty, c.stdinPipe, c.stdoutPipe, c.stderrPipe) } ctx.Log(TRACE, "sent INIT_NEWCONTAINER") + go func() { + status := ctx.hyperstart.WaitProcess(id, "init") + ctx.reportProcessFinished(types.E_CONTAINER_FINISHED, &types.ProcessFinished{ + Id: id, Code: uint8(status), Ack: make(chan bool, 1), + }) + ctx.lock.Lock() + if c, ok := ctx.containers[id]; ok { + c.Log(TRACE, "container finished, unset iostream pipes") + c.stdinPipe = nil + c.stdoutPipe = nil + c.stderrPipe = nil + c.tty = nil + } + ctx.lock.Unlock() + }() return err } else { return fmt.Errorf("container %s not exist", id) @@ -72,6 +88,21 @@ func (ctx *VmContext) restoreContainer(id string) (alive bool, err error) { } return false, nil } + go func() { + status := ctx.hyperstart.WaitProcess(id, "init") + ctx.reportProcessFinished(types.E_CONTAINER_FINISHED, &types.ProcessFinished{ + Id: id, Code: uint8(status), Ack: make(chan bool, 1), + }) + ctx.lock.Lock() + if c, ok := ctx.containers[id]; ok { + c.Log(TRACE, "container finished, unset iostream pipes") + c.stdinPipe = nil + c.stdoutPipe = nil + c.stderrPipe = nil + c.tty = nil + } + ctx.lock.Unlock() + }() return true, nil }