From 7f03f99960ac31245f447ae2928a2e770b4982da Mon Sep 17 00:00:00 2001
From: Wang Xu
Date: Sun, 9 Apr 2017 00:51:52 +0800
Subject: [PATCH] Ensure the vm is running when vm operations are called

Part of the fix for hyperhq/hyperd#598 (pod creation rollback). Changes
include:

- change context lock from Mutex to RWMutex
- move ctx.current access inside the lock
- ensure vm operations have a running-status check

Signed-off-by: Wang Xu
---
(A standalone sketch of the locking pattern appears after the diff.)

 hypervisor/context.go   | 58 +++++++++++++++++++----
 hypervisor/vm.go        | 100 ++++++----------------------------------
 hypervisor/vm_states.go | 16 ++++++++
 3 files changed, 82 insertions(+), 92 deletions(-)

diff --git a/hypervisor/context.go b/hypervisor/context.go
index 017c1d16..3e71262d 100644
--- a/hypervisor/context.go
+++ b/hypervisor/context.go
@@ -70,7 +70,7 @@ type VmContext struct {
 
 	logPrefix string
 
-	lock      sync.Mutex //protect update of context
+	lock      sync.RWMutex //protect update of context
 	idLock    sync.Mutex
 	pauseLock sync.Mutex
 	closeOnce sync.Once
@@ -157,8 +157,8 @@ func InitContext(id string, hub chan VmEvent, client chan *types.VmResponse, dc
 // no handler associated with the context. VmEvent handling happens in a
 // separate goroutine, so this is thread-safe and asynchronous.
 func (ctx *VmContext) SendVmEvent(ev VmEvent) error {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
+	ctx.lock.RLock()
+	defer ctx.lock.RUnlock()
 
 	if ctx.handler == nil {
 		return fmt.Errorf("VmContext(%s): event handler already shutdown.", ctx.Id)
@@ -202,8 +202,8 @@ func (ctx *VmContext) NextPciAddr() int {
 }
 
 func (ctx *VmContext) LookupExecBySession(session uint64) string {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
+	ctx.lock.RLock()
+	defer ctx.lock.RUnlock()
 
 	for id, exec := range ctx.vmExec {
 		if exec.Process.Stdio == session {
@@ -223,8 +223,8 @@ func (ctx *VmContext) DeleteExec(id string) {
 }
 
 func (ctx *VmContext) LookupBySession(session uint64) string {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
+	ctx.lock.RLock()
+	defer ctx.lock.RUnlock()
 
 	for id, c := range ctx.containers {
 		if c.process.Stdio == session {
@@ -281,6 +281,14 @@ func (ctx *VmContext) Become(handler stateHandler, desc string) {
 	ctx.Log(DEBUG, "state change from %s to '%s'", orig, desc)
 }
 
+func (ctx *VmContext) IsRunning() bool {
+	var running bool
+	ctx.lock.RLock()
+	running = ctx.current == StateRunning
+	ctx.lock.RUnlock()
+	return running
+}
+
 // User API
 func (ctx *VmContext) SetNetworkEnvironment(net *api.SandboxConfig) {
 	ctx.lock.Lock()
@@ -298,6 +306,12 @@ func (ctx *VmContext) AddInterface(inf *api.InterfaceDescription, result chan ap
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "add interface %s during %v", inf.Id, ctx.current)
+		result <- NewNotReadyError(ctx.Id)
+		return
+	}
+
 	ctx.networks.addInterface(inf, result)
 }
 
@@ -305,6 +319,12 @@ func (ctx *VmContext) RemoveInterface(id string, result chan api.Result) {
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "remove interface %s during %v", id, ctx.current)
+		result <- api.NewResultBase(id, true, "pod not running")
+		return
+	}
+
 	ctx.networks.removeInterface(id, result)
 }
 
@@ -328,6 +348,12 @@ func (ctx *VmContext) AddContainer(c *api.ContainerDescription, result chan api.
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "add container %s during %v", c.Id, ctx.current)
+		result <- NewNotReadyError(ctx.Id)
+		return
+	}
+
 	if ctx.LogLevel(TRACE) {
 		ctx.Log(TRACE, "add container %#v", c)
 	}
@@ -393,6 +419,12 @@ func (ctx *VmContext) RemoveContainer(id string, result chan<- api.Result) {
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "remove container %s during %v", id, ctx.current)
+		result <- api.NewResultBase(id, true, "pod not running")
+		return
+	}
+
 	cc, ok := ctx.containers[id]
 	if !ok {
 		ctx.Log(WARNING, "container %s not exist", id)
@@ -417,6 +449,12 @@ func (ctx *VmContext) AddVolume(vol *api.VolumeDescription, result chan api.Resu
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "add volume %s during %v", vol.Name, ctx.current)
+		result <- NewNotReadyError(ctx.Id)
+		return
+	}
+
 	if _, ok := ctx.volumes[vol.Name]; ok {
 		estr := fmt.Sprintf("duplicate volume %s", vol.Name)
 		ctx.Log(WARNING, estr)
@@ -441,6 +479,12 @@ func (ctx *VmContext) RemoveVolume(name string, result chan<- api.Result) {
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "remove volume %s during %v", name, ctx.current)
+		result <- api.NewResultBase(name, true, "pod not running")
+		return
+	}
+
 	disk, ok := ctx.volumes[name]
 	if !ok {
 		ctx.Log(WARNING, "volume %s not exist", name)
diff --git a/hypervisor/vm.go b/hypervisor/vm.go
index 0740f1d6..67ebd326 100644
--- a/hypervisor/vm.go
+++ b/hypervisor/vm.go
@@ -143,7 +143,7 @@ func (vm *Vm) WaitResponse(match matchResponse, timeout int) chan error {
 }
 
 func (vm *Vm) ReleaseVm() error {
-	if vm.ctx.current != StateRunning {
+	if !vm.ctx.IsRunning() {
 		return nil
 	}
 
@@ -226,58 +226,6 @@ func (vm *Vm) WaitProcess(isContainer bool, ids []string, timeout int) <-chan *a
 	return result
 }
 
-//func (vm *Vm) handlePodEvent(mypod *PodStatus) {
-//	glog.V(1).Infof("hyperHandlePodEvent pod %s, vm %s", mypod.Id, vm.Id)
-//
-//	Status, err := vm.GetResponseChan()
-//	if err != nil {
-//		return
-//	}
-//	defer vm.ReleaseResponseChan(Status)
-//
-//	exit := false
-//	mypod.Wg.Add(1)
-//	for {
-//		Response, ok := <-Status
-//		if !ok {
-//			break
-//		}
-//
-//		switch Response.Code {
-//		case types.E_CONTAINER_FINISHED:
-//			ps, ok := Response.Data.(*types.ProcessFinished)
-//			if ok {
-//				mypod.SetOneContainerStatus(ps.Id, ps.Code)
-//				close(ps.Ack)
-//			}
-//		case types.E_EXEC_FINISHED:
-//			ps, ok := Response.Data.(*types.ProcessFinished)
-//			if ok {
-//				mypod.SetExecStatus(ps.Id, ps.Code)
-//				close(ps.Ack)
-//			}
-//		case types.E_VM_SHUTDOWN: // vm exited, sucessful or not
-//			if mypod.Status == types.S_POD_RUNNING { // not received finished pod before
-//				mypod.Status = types.S_POD_FAILED
-//				mypod.FinishedAt = time.Now().Format("2006-01-02T15:04:05Z")
-//				mypod.SetContainerStatus(types.S_POD_FAILED)
-//			}
-//			mypod.Vm = ""
-//			exit = true
-//		}
-//
-//		if mypod.Handler != nil {
-//			mypod.Handler.Handle(Response, mypod.Handler.Data, mypod, vm)
-//		}
-//
-//		if exit {
-//			vm.clients = nil
-//			break
-//		}
-//	}
-//	mypod.Wg.Done()
-//}
-
 func (vm *Vm) InitSandbox(config *api.SandboxConfig) {
 	vm.ctx.SetNetworkEnvironment(config)
 	vm.ctx.startPod()
@@ -299,7 +247,7 @@ func (vm *Vm) WaitInit() api.Result {
 }
 
 func (vm *Vm) Shutdown() api.Result {
-	if vm.ctx.current != StateRunning {
+	if !vm.ctx.IsRunning() {
 		return api.NewResultBase(vm.Id, false, "not in running state")
 	}
 
@@ -352,10 +300,6 @@ func (vm *Vm) AddRoute() error {
 }
 
 func (vm *Vm) AddNic(info *api.InterfaceDescription) error {
-	if vm.ctx.current != StateRunning {
-		return NewNotReadyError(vm.Id)
-	}
-
 	client := make(chan api.Result, 1)
 	vm.ctx.AddInterface(info, client)
 
@@ -375,10 +319,6 @@ func (vm *Vm) AddNic(info *api.InterfaceDescription) error {
 }
 
 func (vm *Vm) DeleteNic(id string) error {
-	if vm.ctx.current != StateRunning {
-		return NewNotReadyError(vm.Id)
-	}
-
 	client := make(chan api.Result, 1)
 	vm.ctx.RemoveInterface(id, client)
 
@@ -403,7 +343,7 @@ func (vm *Vm) SetCpus(cpus int) error {
 		return nil
 	}
 
-	if vm.ctx.current != StateRunning {
+	if !vm.ctx.IsRunning() {
 		return NewNotReadyError(vm.Id)
 	}
 
@@ -420,7 +360,7 @@ func (vm *Vm) AddMem(totalMem int) error {
 	}
 
 	size := totalMem - vm.Mem
-	if vm.ctx.current != StateRunning {
+	if !vm.ctx.IsRunning() {
 		return NewNotReadyError(vm.Id)
 	}
 
@@ -525,6 +465,10 @@ func (vm *Vm) Exec(container, execId, cmd string, terminal bool, tty *TtyIO) err
 }
 
 func (vm *Vm) AddProcess(container, execId string, terminal bool, args []string, env []string, workdir string, tty *TtyIO) error {
+	if !vm.ctx.IsRunning() {
+		return NewNotReadyError(vm.Id)
+	}
+
 	envs := []hyperstartapi.EnvironmentVar{}
 
 	for _, v := range env {
@@ -553,21 +497,12 @@ func (vm *Vm) AddProcess(container, execId string, terminal bool, args []string,
 }
 
 func (vm *Vm) AddVolume(vol *api.VolumeDescription) api.Result {
-	if vm.ctx.current != StateRunning {
-		vm.Log(ERROR, "VM is not ready for insert volume %#v", vol)
-		return NewNotReadyError(vm.Id)
-	}
-
 	result := make(chan api.Result, 1)
 	vm.ctx.AddVolume(vol, result)
 	return <-result
 }
 
 func (vm *Vm) AddContainer(c *api.ContainerDescription) api.Result {
-	if vm.ctx.current != StateRunning {
-		return NewNotReadyError(vm.Id)
-	}
-
 	result := make(chan api.Result, 1)
 	vm.ctx.AddContainer(c, result)
 	return <-result
@@ -631,10 +566,6 @@ func (vm *Vm) batchWaitResult(names []string, op waitResultOp) (bool, map[string
 }
 
 func (vm *Vm) StartContainer(id string) error {
-	if vm.ctx.current != StateRunning {
-		return NewNotReadyError(vm.Id)
-	}
-
 	err := vm.ctx.newContainer(id)
 	if err != nil {
 		return fmt.Errorf("Create new container failed: %v", err)
@@ -652,10 +583,6 @@ func (vm *Vm) Tty(containerId, execId string, row, column int) error {
 }
 
 func (vm *Vm) Attach(tty *TtyIO, container string) error {
-	if vm.ctx.current != StateRunning {
-		return NewNotReadyError(vm.Id)
-	}
-
 	cmd := &AttachCommand{
 		Streams:   tty,
 		Container: container,
@@ -666,7 +593,7 @@ func (vm *Vm) Attach(tty *TtyIO, container string) error {
 
 func (vm *Vm) Stats() *types.PodStats {
 	ctx := vm.ctx
-	if ctx.current != StateRunning {
+	if !vm.ctx.IsRunning() {
 		vm.ctx.Log(WARNING, "could not get stats from non-running pod")
 		return nil
 	}
@@ -681,7 +608,7 @@ func (vm *Vm) Stats() *types.PodStats {
 
 func (vm *Vm) Pause(pause bool) error {
 	ctx := vm.ctx
-	if ctx.current != StateRunning {
+	if !vm.ctx.IsRunning() {
 		return NewNotReadyError(vm.Id)
 	}
 
@@ -713,10 +640,13 @@ func (vm *Vm) Pause(pause bool) error {
 
 func (vm *Vm) Save(path string) error {
 	ctx := vm.ctx
 
+	if !vm.ctx.IsRunning() {
+		return NewNotReadyError(vm.Id)
+	}
 	ctx.pauseLock.Lock()
 	defer ctx.pauseLock.Unlock()
 
-	if ctx.current != StateRunning || ctx.PauseState != PauseStatePaused {
+	if ctx.PauseState != PauseStatePaused {
 		return NewNotReadyError(vm.Id)
 	}
 
@@ -726,7 +656,7 @@ func (vm *Vm) Save(path string) error {
 func (vm *Vm) GetIPAddrs() []string {
 	ips := []string{}
 
-	if vm.ctx.current != StateRunning {
+	if !vm.ctx.IsRunning() {
 		vm.Log(ERROR, "get pod ip failed: %v", NewNotReadyError(vm.Id))
 		return ips
 	}
diff --git a/hypervisor/vm_states.go b/hypervisor/vm_states.go
index 13a32d92..908441ad 100644
--- a/hypervisor/vm_states.go
+++ b/hypervisor/vm_states.go
@@ -30,6 +30,11 @@ func (ctx *VmContext) newContainer(id string) error {
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "start container %s during %v", id, ctx.current)
+		return NewNotReadyError(ctx.Id)
+	}
+
 	c, ok := ctx.containers[id]
 	if ok {
 		ctx.Log(TRACE, "start sending INIT_NEWCONTAINER")
@@ -48,6 +53,12 @@ func (ctx *VmContext) restoreContainer(id string) (alive bool, err error) {
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
+
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "restore container %s during %v", id, ctx.current)
+		return false, NewNotReadyError(ctx.Id)
+	}
+
 	c, ok := ctx.containers[id]
 	if !ok {
 		return false, fmt.Errorf("try to associate a container not exist in sandbox")
@@ -77,6 +88,11 @@ func (ctx *VmContext) attachCmd(cmd *AttachCommand) error {
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "attach container %s during %v", cmd.Container, ctx.current)
+		return NewNotReadyError(ctx.Id)
+	}
+
 	c, ok := ctx.containers[cmd.Container]
 	if !ok {
 		estr := fmt.Sprintf("cannot find container %s to attach", cmd.Container)
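
--
A minimal standalone sketch of the locking pattern the patch introduces,
for illustration only: sandbox, addResource and the state values are
hypothetical stand-ins for VmContext, the guarded Add*/Remove* operations
and ctx.current, not code from the hypervisor package. Status checks take
the read lock; operations re-check the state under the write lock and
fail fast, sending exactly one result on the channel.

	package main

	import (
		"fmt"
		"sync"
	)

	type state int

	const (
		stateBooting state = iota
		stateRunning
		stateTerminating
	)

	type sandbox struct {
		lock    sync.RWMutex // RWMutex lets status checks run concurrently
		current state
	}

	// isRunning mirrors VmContext.IsRunning: a read lock suffices for a
	// status check, so concurrent readers do not block one another.
	func (s *sandbox) isRunning() bool {
		s.lock.RLock()
		defer s.lock.RUnlock()
		return s.current == stateRunning
	}

	// addResource mirrors the shape of the guarded operations above: the
	// state is re-checked under the write lock, so a shutdown cannot slip
	// in between a caller's isRunning() check and the mutation, and the
	// early return guarantees exactly one result is sent on the channel.
	func (s *sandbox) addResource(id string, result chan error) {
		s.lock.Lock()
		defer s.lock.Unlock()

		if s.current != stateRunning {
			result <- fmt.Errorf("sandbox not ready for %s", id)
			return
		}
		// ... mutate the sandbox here ...
		result <- nil
	}

	func main() {
		s := &sandbox{current: stateRunning}
		result := make(chan error, 1)

		s.addResource("vol1", result)
		fmt.Println(<-result) // <nil>

		s.lock.Lock()
		s.current = stateTerminating
		s.lock.Unlock()

		s.addResource("vol2", result)
		fmt.Println(<-result) // sandbox not ready for vol2
	}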