Skip to content
This repository was archived by the owner on Feb 8, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hyperstart/libhyperstart/hyperstart.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ type Hyperstart interface {
LastStreamSeq() uint64

APIVersion() (uint32, error)
ProcessAsyncEvents() (<-chan hyperstartapi.ProcessAsyncEvent, error)
NewContainer(c *hyperstartapi.Container) (io.WriteCloser, io.ReadCloser, io.ReadCloser, error)
RestoreContainer(c *hyperstartapi.Container) (io.WriteCloser, io.ReadCloser, io.ReadCloser, error)
AddProcess(container string, p *hyperstartapi.Process) (io.WriteCloser, io.ReadCloser, io.ReadCloser, error)
SignalProcess(container, process string, signal syscall.Signal) error
WaitProcess(container, process string) int
TtyWinResize(container, process string, row, col uint16) error

StartSandbox(pod *hyperstartapi.Pod) error
Expand Down
100 changes: 68 additions & 32 deletions hyperstart/libhyperstart/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,21 @@ type pState struct {
stdinPipe streamIn
stdoutPipe io.ReadCloser
stderrPipe io.ReadCloser
exitStatus *int
outClosed bool
waitChan chan int
}

type jsonBasedHyperstart struct {
sync.RWMutex
logPrefix string
vmAPIVersion uint32
closed bool
lastStreamSeq uint64
procs map[pKey]*pState
streamOuts map[uint64]streamOut
ctlChan chan *hyperstartCmd
streamChan chan *hyperstartapi.TtyMessage
processAsyncEvents chan hyperstartapi.ProcessAsyncEvent
logPrefix string
vmAPIVersion uint32
closed bool
lastStreamSeq uint64
procs map[pKey]*pState
streamOuts map[uint64]streamOut
ctlChan chan *hyperstartCmd
streamChan chan *hyperstartapi.TtyMessage
}

type hyperstartCmd struct {
Expand All @@ -56,13 +58,12 @@ type hyperstartCmd struct {

func NewJsonBasedHyperstart(id, ctlSock, streamSock string, lastStreamSeq uint64, waitReady bool) Hyperstart {
h := &jsonBasedHyperstart{
logPrefix: fmt.Sprintf("SB[%s] ", id),
procs: make(map[pKey]*pState),
lastStreamSeq: lastStreamSeq,
streamOuts: make(map[uint64]streamOut),
ctlChan: make(chan *hyperstartCmd, 128),
streamChan: make(chan *hyperstartapi.TtyMessage, 128),
processAsyncEvents: make(chan hyperstartapi.ProcessAsyncEvent, 16),
logPrefix: fmt.Sprintf("SB[%s] ", id),
procs: make(map[pKey]*pState),
lastStreamSeq: lastStreamSeq,
streamOuts: make(map[uint64]streamOut),
ctlChan: make(chan *hyperstartCmd, 128),
streamChan: make(chan *hyperstartapi.TtyMessage, 128),
}
go handleStreamSock(h, streamSock)
go handleCtlSock(h, ctlSock, waitReady)
Expand All @@ -86,17 +87,18 @@ func (h *jsonBasedHyperstart) Close() {
defer h.Unlock()
if !h.closed {
h.Log(TRACE, "close jsonBasedHyperstart")
for pk := range h.procs {
h.processAsyncEvents <- hyperstartapi.ProcessAsyncEvent{Container: pk.c, Process: pk.p, Event: "finished", Status: 255}
}
h.procs = make(map[pKey]*pState)
for _, out := range h.streamOuts {
out.Close()
}
h.streamOuts = make(map[uint64]streamOut)
for pk, ps := range h.procs {
ps.outClosed = true
ps.exitStatus = makeExitStatus(255)
h.handleWaitProcess(pk, ps)
}
h.procs = make(map[pKey]*pState)
close(h.ctlChan)
close(h.streamChan)
close(h.processAsyncEvents)
for cmd := range h.ctlChan {
if cmd.Code != hyperstartapi.INIT_ACK && cmd.Code != hyperstartapi.INIT_ERROR {
cmd.result <- fmt.Errorf("hyperstart closed")
Expand All @@ -106,10 +108,6 @@ func (h *jsonBasedHyperstart) Close() {
}
}

func (h *jsonBasedHyperstart) ProcessAsyncEvents() (<-chan hyperstartapi.ProcessAsyncEvent, error) {
return h.processAsyncEvents, nil
}

func (h *jsonBasedHyperstart) LastStreamSeq() uint64 {
return h.lastStreamSeq
}
Expand Down Expand Up @@ -459,13 +457,22 @@ func handleStreamFromHyperstart(h *jsonBasedHyperstart, conn io.Reader) {
}
}

func makeExitStatus(status int) *int { return &status }

func (h *jsonBasedHyperstart) handleWaitProcess(pk pKey, ps *pState) {
if ps.exitStatus != nil && ps.waitChan != nil && ps.outClosed {
delete(h.procs, pk)
ps.waitChan <- *ps.exitStatus
}
}

func (h *jsonBasedHyperstart) sendProcessAsyncEvent(pae hyperstartapi.ProcessAsyncEvent) {
h.Lock()
defer h.Unlock()
pk := pKey{c: pae.Container, p: pae.Process}
if _, ok := h.procs[pk]; ok {
delete(h.procs, pk)
h.processAsyncEvents <- pae
if ps, ok := h.procs[pk]; ok {
ps.exitStatus = makeExitStatus(pae.Status)
h.handleWaitProcess(pk, ps)
}
}

Expand All @@ -474,8 +481,8 @@ func (h *jsonBasedHyperstart) sendProcessAsyncEvent4242(stdioSeq uint64, code ui
defer h.Unlock()
for pk, ps := range h.procs {
if ps.stdioSeq == stdioSeq {
delete(h.procs, pk)
h.processAsyncEvents <- hyperstartapi.ProcessAsyncEvent{Container: pk.c, Process: pk.p, Event: "finished", Status: int(code)}
ps.exitStatus = makeExitStatus(int(code))
h.handleWaitProcess(pk, ps)
}
}
}
Expand All @@ -487,8 +494,17 @@ func (h *jsonBasedHyperstart) removeStreamOut(seq uint64) {
// doesn't send eof of the stderr back, so we also remove stderr seq here
if out, ok := h.streamOuts[seq]; ok {
delete(h.streamOuts, seq)
if seq == out.ps.stdioSeq && out.ps.stderrSeq > 0 {
delete(h.streamOuts, out.ps.stderrSeq)
if seq == out.ps.stdioSeq {
if out.ps.stderrSeq > 0 {
h.streamOuts[out.ps.stderrSeq].Close()
delete(h.streamOuts, out.ps.stderrSeq)
}
for pk, ps := range h.procs {
if ps.stdioSeq == seq {
ps.outClosed = true
h.handleWaitProcess(pk, ps)
}
}
}
}
}
Expand Down Expand Up @@ -752,6 +768,26 @@ func (h *jsonBasedHyperstart) SignalProcess(container, process string, signal sy
})
}

// wait the process until exit. like waitpid()
// the state is saved until someone calls WaitProcess() if the process exited earlier
// the non-first call of WaitProcess() after process started MAY fail to find the process if the process exited earlier
func (h *jsonBasedHyperstart) WaitProcess(container, process string) int {
h.Lock()
pk := pKey{c: container, p: process}
if ps, ok := h.procs[pk]; ok {
if ps.waitChan == nil {
ps.waitChan = make(chan int, 1)
h.handleWaitProcess(pk, ps)
}
h.Unlock()
status := <-ps.waitChan
ps.waitChan <- status
return status
}
h.Unlock()
return -1
}

func (h *jsonBasedHyperstart) StartSandbox(pod *hyperstartapi.Pod) error {
return h.hyperstartCommand(hyperstartapi.INIT_STARTPOD, pod)
}
Expand Down
25 changes: 0 additions & 25 deletions hypervisor/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,31 +236,6 @@ func (ctx *VmContext) LookupBySession(session uint64) string {
return ""
}

func (ctx *VmContext) handleProcessAsyncEvent(pae *hyperstartapi.ProcessAsyncEvent) {
if pae.Event != "finished" {
return
}
if pae.Process == "init" {
ctx.reportProcessFinished(types.E_CONTAINER_FINISHED, &types.ProcessFinished{
Id: pae.Container, Code: uint8(pae.Status), Ack: make(chan bool, 1),
})
ctx.lock.Lock()
if c, ok := ctx.containers[pae.Container]; ok {
c.Log(TRACE, "container finished, unset iostream pipes")
c.stdinPipe = nil
c.stdoutPipe = nil
c.stderrPipe = nil
c.tty = nil
}
ctx.lock.Unlock()
} else {
ctx.DeleteExec(pae.Process)
ctx.reportProcessFinished(types.E_EXEC_FINISHED, &types.ProcessFinished{
Id: pae.Process, Code: uint8(pae.Status), Ack: make(chan bool, 1),
})
}
}

func (ctx *VmContext) Close() {
ctx.closeOnce.Do(func() {
ctx.Log(INFO, "VmContext Close()")
Expand Down
14 changes: 0 additions & 14 deletions hypervisor/hypervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,6 @@ func (ctx *VmContext) loop() {
ctx.Log(DEBUG, "main event loop exiting")
}

func (ctx *VmContext) handlePAEs() {
ch, err := ctx.hyperstart.ProcessAsyncEvents()
if err == nil {
for e := range ch {
ctx.handleProcessAsyncEvent(&e)
}
}
ctx.hyperstart.Close()
ctx.Log(ERROR, "hyperstart stopped")
ctx.Hub <- &Interrupted{Reason: "hyperstart stopped"}
}

func (ctx *VmContext) watchHyperstart(sendReadyEvent bool) {
timeout := time.AfterFunc(30*time.Second, func() {
if ctx.PauseState == PauseStateUnpaused {
Expand Down Expand Up @@ -92,7 +80,6 @@ func (ctx *VmContext) Launch() {
}

go ctx.loop()
go ctx.handlePAEs()
}

func VmAssociate(vmId string, hub chan VmEvent, client chan *types.VmResponse, pack []byte) (*VmContext, error) {
Expand Down Expand Up @@ -130,7 +117,6 @@ func VmAssociate(vmId string, hub chan VmEvent, client chan *types.VmResponse, p
//}

go context.watchHyperstart(false)
go context.handlePAEs()
go context.loop()
return context, nil
}
Expand Down
7 changes: 7 additions & 0 deletions hypervisor/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,13 @@ func (vm *Vm) AddProcess(container, execId string, terminal bool, args []string,
}

go streamCopy(tty, stdinPipe, stdoutPipe, stderrPipe)
go func() {
status := vm.ctx.hyperstart.WaitProcess(container, execId)
vm.ctx.DeleteExec(execId)
vm.ctx.reportProcessFinished(types.E_EXEC_FINISHED, &types.ProcessFinished{
Id: execId, Code: uint8(status), Ack: make(chan bool, 1),
})
}()
return nil
}

Expand Down
31 changes: 31 additions & 0 deletions hypervisor/vm_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/hyperhq/hypercontainer-utils/hlog"
"github.com/hyperhq/runv/hypervisor/types"
)

// states
Expand Down Expand Up @@ -44,6 +45,21 @@ func (ctx *VmContext) newContainer(id string) error {
go streamCopy(c.tty, c.stdinPipe, c.stdoutPipe, c.stderrPipe)
}
ctx.Log(TRACE, "sent INIT_NEWCONTAINER")
go func() {
status := ctx.hyperstart.WaitProcess(id, "init")
ctx.reportProcessFinished(types.E_CONTAINER_FINISHED, &types.ProcessFinished{
Id: id, Code: uint8(status), Ack: make(chan bool, 1),
})
ctx.lock.Lock()
if c, ok := ctx.containers[id]; ok {
c.Log(TRACE, "container finished, unset iostream pipes")
c.stdinPipe = nil
c.stdoutPipe = nil
c.stderrPipe = nil
c.tty = nil
}
ctx.lock.Unlock()
}()
return err
} else {
return fmt.Errorf("container %s not exist", id)
Expand Down Expand Up @@ -72,6 +88,21 @@ func (ctx *VmContext) restoreContainer(id string) (alive bool, err error) {
}
return false, nil
}
go func() {
status := ctx.hyperstart.WaitProcess(id, "init")
ctx.reportProcessFinished(types.E_CONTAINER_FINISHED, &types.ProcessFinished{
Id: id, Code: uint8(status), Ack: make(chan bool, 1),
})
ctx.lock.Lock()
if c, ok := ctx.containers[id]; ok {
c.Log(TRACE, "container finished, unset iostream pipes")
c.stdinPipe = nil
c.stdoutPipe = nil
c.stderrPipe = nil
c.tty = nil
}
ctx.lock.Unlock()
}()
return true, nil
}

Expand Down