52 changes: 45 additions & 7 deletions hypervisor/context.go
@@ -70,7 +70,7 @@ type VmContext struct {

     logPrefix string

-    lock      sync.Mutex //protect update of context
+    lock      sync.RWMutex //protect update of context
     idLock    sync.Mutex
     pauseLock sync.Mutex
     closeOnce sync.Once
@@ -157,8 +157,8 @@ func InitContext(id string, hub chan VmEvent, client chan *types.VmResponse, dc
 // no handler associated with the context. VmEvent handling happens in a
 // separate goroutine, so this is thread-safe and asynchronous.
 func (ctx *VmContext) SendVmEvent(ev VmEvent) error {
-    ctx.lock.Lock()
-    defer ctx.lock.Unlock()
+    ctx.lock.RLock()
+    defer ctx.lock.RUnlock()

     if ctx.handler == nil {
         return fmt.Errorf("VmContext(%s): event handler already shutdown.", ctx.Id)
@@ -202,8 +202,8 @@ func (ctx *VmContext) NextPciAddr() int {
 }

 func (ctx *VmContext) LookupExecBySession(session uint64) string {
-    ctx.lock.Lock()
-    defer ctx.lock.Unlock()
+    ctx.lock.RLock()
+    defer ctx.lock.RUnlock()

     for id, exec := range ctx.vmExec {
         if exec.Process.Stdio == session {
@@ -223,8 +223,8 @@ func (ctx *VmContext) DeleteExec(id string) {
 }

 func (ctx *VmContext) LookupBySession(session uint64) string {
-    ctx.lock.Lock()
-    defer ctx.lock.Unlock()
+    ctx.lock.RLock()
+    defer ctx.lock.RUnlock()

     for id, c := range ctx.containers {
         if c.process.Stdio == session {
@@ -281,6 +281,14 @@ func (ctx *VmContext) Become(handler stateHandler, desc string) {
     ctx.Log(DEBUG, "state change from %s to '%s'", orig, desc)
 }

+func (ctx *VmContext) IsRunning() bool {
+    var running bool
+    ctx.lock.RLock()
+    running = ctx.current == StateRunning
+    ctx.lock.RUnlock()
+    return running
+}
+
 // User API
 func (ctx *VmContext) SetNetworkEnvironment(net *api.SandboxConfig) {
     ctx.lock.Lock()
@@ -298,13 +306,23 @@ func (ctx *VmContext) AddInterface(inf *api.InterfaceDescription, result chan ap
     ctx.lock.Lock()
     defer ctx.lock.Unlock()

+    if ctx.current != StateRunning {
+        ctx.Log(DEBUG, "add interface %s during %v", inf.Id, ctx.current)
+        result <- NewNotReadyError(ctx.Id)
+    }
+
     ctx.networks.addInterface(inf, result)
 }

 func (ctx *VmContext) RemoveInterface(id string, result chan api.Result) {
     ctx.lock.Lock()
     defer ctx.lock.Unlock()

+    if ctx.current != StateRunning {
+        ctx.Log(DEBUG, "remove interface %s during %v", id, ctx.current)
+        result <- api.NewResultBase(id, true, "pod not running")
+    }
+
     ctx.networks.removeInterface(id, result)
 }

@@ -328,6 +346,11 @@ func (ctx *VmContext) AddContainer(c *api.ContainerDescription, result chan api.
     ctx.lock.Lock()
     defer ctx.lock.Unlock()

+    if ctx.current != StateRunning {
+        ctx.Log(DEBUG, "add container %s during %v", c.Id, ctx.current)
+        result <- NewNotReadyError(ctx.Id)
+    }
+
     if ctx.LogLevel(TRACE) {
         ctx.Log(TRACE, "add container %#v", c)
     }
@@ -393,6 +416,11 @@ func (ctx *VmContext) RemoveContainer(id string, result chan<- api.Result) {
     ctx.lock.Lock()
     defer ctx.lock.Unlock()

+    if ctx.current != StateRunning {
+        ctx.Log(DEBUG, "remove container %s during %v", id, ctx.current)
+        result <- api.NewResultBase(id, true, "pod not running")
+    }
+
     cc, ok := ctx.containers[id]
     if !ok {
         ctx.Log(WARNING, "container %s not exist", id)
@@ -417,6 +445,11 @@ func (ctx *VmContext) AddVolume(vol *api.VolumeDescription, result chan api.Resu
     ctx.lock.Lock()
     defer ctx.lock.Unlock()

+    if ctx.current != StateRunning {
+        ctx.Log(DEBUG, "add volume %s during %v", vol.Name, ctx.current)
+        result <- NewNotReadyError(ctx.Id)
+    }
+
     if _, ok := ctx.volumes[vol.Name]; ok {
         estr := fmt.Sprintf("duplicate volume %s", vol.Name)
         ctx.Log(WARNING, estr)
@@ -441,6 +474,11 @@ func (ctx *VmContext) RemoveVolume(name string, result chan<- api.Result) {
     ctx.lock.Lock()
     defer ctx.lock.Unlock()

+    if ctx.current != StateRunning {
+        ctx.Log(DEBUG, "remove volume %s during %v", name, ctx.current)
+        result <- api.NewResultBase(name, true, "pod not running")
+    }
+
     disk, ok := ctx.volumes[name]
     if !ok {
         ctx.Log(WARNING, "volume %s not exist", name)
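Taken together, the context.go changes do two things: the context lock becomes a sync.RWMutex so that read-only paths (SendVmEvent, the session lookups, the new IsRunning helper) can share it, and every StateRunning check now runs under the same lock that guards the mutation it protects. Below is a minimal runnable sketch of that pattern; vmContext, its state constants, and addVolume are simplified stand-ins for the real VmContext API, and unlike the hunks above the sketch returns immediately after reporting not-ready, so the caller sees exactly one result on the channel.

```go
// Sketch of the locked check-then-act pattern adopted in context.go.
// All names here are illustrative stand-ins, not the real runv API.
package main

import (
    "fmt"
    "sync"
)

type state int

const (
    stateInit state = iota
    stateRunning
)

type vmContext struct {
    lock    sync.RWMutex // protects current and volumes
    current state
    volumes map[string]string
}

// IsRunning takes only the read lock, so concurrent readers never
// serialize on each other, only against writers.
func (ctx *vmContext) IsRunning() bool {
    ctx.lock.RLock()
    defer ctx.lock.RUnlock()
    return ctx.current == stateRunning
}

// addVolume checks the state under the same write lock that guards the
// map update, so the state cannot change between the check and the act.
func (ctx *vmContext) addVolume(name, source string, result chan<- error) {
    ctx.lock.Lock()
    defer ctx.lock.Unlock()

    if ctx.current != stateRunning {
        result <- fmt.Errorf("sandbox not ready for volume %s", name)
        return // send exactly one result, then bail out
    }
    ctx.volumes[name] = source
    result <- nil
}

func main() {
    ctx := &vmContext{volumes: map[string]string{}}
    result := make(chan error, 1) // buffered, like the api.Result channels

    ctx.addVolume("v1", "/tmp/v1", result)
    fmt.Println("before running:", <-result, "IsRunning:", ctx.IsRunning())

    ctx.lock.Lock()
    ctx.current = stateRunning
    ctx.lock.Unlock()

    ctx.addVolume("v1", "/tmp/v1", result)
    fmt.Println("while running:", <-result, "IsRunning:", ctx.IsRunning())
}
```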
100 changes: 15 additions & 85 deletions hypervisor/vm.go
@@ -143,7 +143,7 @@ func (vm *Vm) WaitResponse(match matchResponse, timeout int) chan error {
 }

 func (vm *Vm) ReleaseVm() error {
-    if vm.ctx.current != StateRunning {
+    if !vm.ctx.IsRunning() {
         return nil
     }

@@ -226,58 +226,6 @@ func (vm *Vm) WaitProcess(isContainer bool, ids []string, timeout int) <-chan *a
     return result
 }

-//func (vm *Vm) handlePodEvent(mypod *PodStatus) {
-//    glog.V(1).Infof("hyperHandlePodEvent pod %s, vm %s", mypod.Id, vm.Id)
-//
-//    Status, err := vm.GetResponseChan()
-//    if err != nil {
-//        return
-//    }
-//    defer vm.ReleaseResponseChan(Status)
-//
-//    exit := false
-//    mypod.Wg.Add(1)
-//    for {
-//        Response, ok := <-Status
-//        if !ok {
-//            break
-//        }
-//
-//        switch Response.Code {
-//        case types.E_CONTAINER_FINISHED:
-//            ps, ok := Response.Data.(*types.ProcessFinished)
-//            if ok {
-//                mypod.SetOneContainerStatus(ps.Id, ps.Code)
-//                close(ps.Ack)
-//            }
-//        case types.E_EXEC_FINISHED:
-//            ps, ok := Response.Data.(*types.ProcessFinished)
-//            if ok {
-//                mypod.SetExecStatus(ps.Id, ps.Code)
-//                close(ps.Ack)
-//            }
-//        case types.E_VM_SHUTDOWN: // vm exited, sucessful or not
-//            if mypod.Status == types.S_POD_RUNNING { // not received finished pod before
-//                mypod.Status = types.S_POD_FAILED
-//                mypod.FinishedAt = time.Now().Format("2006-01-02T15:04:05Z")
-//                mypod.SetContainerStatus(types.S_POD_FAILED)
-//            }
-//            mypod.Vm = ""
-//            exit = true
-//        }
-//
-//        if mypod.Handler != nil {
-//            mypod.Handler.Handle(Response, mypod.Handler.Data, mypod, vm)
-//        }
-//
-//        if exit {
-//            vm.clients = nil
-//            break
-//        }
-//    }
-//    mypod.Wg.Done()
-//}
-
 func (vm *Vm) InitSandbox(config *api.SandboxConfig) {
     vm.ctx.SetNetworkEnvironment(config)
     vm.ctx.startPod()
@@ -299,7 +247,7 @@ func (vm *Vm) WaitInit() api.Result {
 }

 func (vm *Vm) Shutdown() api.Result {
-    if vm.ctx.current != StateRunning {
+    if !vm.ctx.IsRunning() {
         return api.NewResultBase(vm.Id, false, "not in running state")
     }

@@ -352,10 +300,6 @@ func (vm *Vm) AddRoute() error {
 }

 func (vm *Vm) AddNic(info *api.InterfaceDescription) error {
-    if vm.ctx.current != StateRunning {
-        return NewNotReadyError(vm.Id)
-    }
-
     client := make(chan api.Result, 1)
     vm.ctx.AddInterface(info, client)

@@ -375,10 +319,6 @@ func (vm *Vm) AddNic(info *api.InterfaceDescription) error {
 }

 func (vm *Vm) DeleteNic(id string) error {
-    if vm.ctx.current != StateRunning {
-        return NewNotReadyError(vm.Id)
-    }
-
     client := make(chan api.Result, 1)
     vm.ctx.RemoveInterface(id, client)

@@ -403,7 +343,7 @@ func (vm *Vm) SetCpus(cpus int) error {
         return nil
     }

-    if vm.ctx.current != StateRunning {
+    if !vm.ctx.IsRunning() {
         return NewNotReadyError(vm.Id)
     }

@@ -420,7 +360,7 @@ func (vm *Vm) AddMem(totalMem int) error {
     }

     size := totalMem - vm.Mem
-    if vm.ctx.current != StateRunning {
+    if !vm.ctx.IsRunning() {
         return NewNotReadyError(vm.Id)
     }

@@ -525,6 +465,10 @@ func (vm *Vm) Exec(container, execId, cmd string, terminal bool, tty *TtyIO) err
 }

 func (vm *Vm) AddProcess(container, execId string, terminal bool, args []string, env []string, workdir string, tty *TtyIO) error {
+    if !vm.ctx.IsRunning() {
+        return NewNotReadyError(vm.Id)
+    }
+
     envs := []hyperstartapi.EnvironmentVar{}

     for _, v := range env {
@@ -553,21 +497,12 @@ func (vm *Vm) AddProcess(container, execId string, terminal bool, args []string,
 }

 func (vm *Vm) AddVolume(vol *api.VolumeDescription) api.Result {
-    if vm.ctx.current != StateRunning {
-        vm.Log(ERROR, "VM is not ready for insert volume %#v", vol)
-        return NewNotReadyError(vm.Id)
-    }
-
     result := make(chan api.Result, 1)
     vm.ctx.AddVolume(vol, result)
     return <-result
 }

 func (vm *Vm) AddContainer(c *api.ContainerDescription) api.Result {
-    if vm.ctx.current != StateRunning {
-        return NewNotReadyError(vm.Id)
-    }
-
     result := make(chan api.Result, 1)
     vm.ctx.AddContainer(c, result)
     return <-result
@@ -631,10 +566,6 @@ func (vm *Vm) batchWaitResult(names []string, op waitResultOp) (bool, map[string
 }

 func (vm *Vm) StartContainer(id string) error {
-    if vm.ctx.current != StateRunning {
-        return NewNotReadyError(vm.Id)
-    }
-
     err := vm.ctx.newContainer(id)
     if err != nil {
         return fmt.Errorf("Create new container failed: %v", err)
@@ -652,10 +583,6 @@ func (vm *Vm) Tty(containerId, execId string, row, column int) error {
 }

 func (vm *Vm) Attach(tty *TtyIO, container string) error {
-    if vm.ctx.current != StateRunning {
-        return NewNotReadyError(vm.Id)
-    }
-
     cmd := &AttachCommand{
         Streams:   tty,
         Container: container,
@@ -666,7 +593,7 @@ func (vm *Vm) Attach(tty *TtyIO, container string) error {
 func (vm *Vm) Stats() *types.PodStats {
     ctx := vm.ctx

-    if ctx.current != StateRunning {
+    if !vm.ctx.IsRunning() {
         vm.ctx.Log(WARNING, "could not get stats from non-running pod")
         return nil
     }
@@ -681,7 +608,7 @@

 func (vm *Vm) Pause(pause bool) error {
     ctx := vm.ctx
-    if ctx.current != StateRunning {
+    if !vm.ctx.IsRunning() {
         return NewNotReadyError(vm.Id)
     }

@@ -713,10 +640,13 @@ func (vm *Vm) Pause(pause bool) error {

 func (vm *Vm) Save(path string) error {
     ctx := vm.ctx
+    if !vm.ctx.IsRunning() {
+        return NewNotReadyError(vm.Id)
+    }

     ctx.pauseLock.Lock()
     defer ctx.pauseLock.Unlock()
-    if ctx.current != StateRunning || ctx.PauseState != PauseStatePaused {
+    if ctx.PauseState != PauseStatePaused {
         return NewNotReadyError(vm.Id)
     }

@@ -726,7 +656,7 @@
 func (vm *Vm) GetIPAddrs() []string {
     ips := []string{}

-    if vm.ctx.current != StateRunning {
+    if !vm.ctx.IsRunning() {
         vm.Log(ERROR, "get pod ip failed: %v", NewNotReadyError(vm.Id))
         return ips
     }
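The vm.go side is mostly mechanical: each direct, unlocked read of vm.ctx.current is either deleted outright (the check now lives inside the locked context method it was guarding) or replaced with the IsRunning helper. The sketch below, using simplified stand-in types, shows why the unlocked read had to go: it is a data race against the locked writer, which `go run -race` will flag, while the RLock'd read is properly synchronized.

```go
// Demonstration of the race the old unlocked checks carried.
// Types are stand-ins; run with `go run -race` to see readRacy flagged.
package main

import (
    "sync"
    "time"
)

type vmContext struct {
    lock    sync.RWMutex
    current int
}

// Old pattern: vm.ctx.current read with no lock held.
func (ctx *vmContext) readRacy() bool { return ctx.current == 1 }

// New pattern: the read shares the RWMutex with the writer.
func (ctx *vmContext) isRunning() bool {
    ctx.lock.RLock()
    defer ctx.lock.RUnlock()
    return ctx.current == 1
}

func main() {
    ctx := &vmContext{}
    done := make(chan struct{})

    // Writer, like Become: state transitions happen under the lock.
    go func() {
        for i := 0; i < 1000; i++ {
            ctx.lock.Lock()
            ctx.current = i % 2
            ctx.lock.Unlock()
        }
        close(done)
    }()

    // Reader, like the old vm.go checks.
    for {
        select {
        case <-done:
            return
        default:
            _ = ctx.readRacy()  // the race detector reports this read
            _ = ctx.isRunning() // this one is safe
            time.Sleep(time.Microsecond)
        }
    }
}
```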
16 changes: 16 additions & 0 deletions hypervisor/vm_states.go
@@ -30,6 +30,11 @@ func (ctx *VmContext) newContainer(id string) error {
     ctx.lock.Lock()
     defer ctx.lock.Unlock()

+    if ctx.current != StateRunning {
+        ctx.Log(DEBUG, "start container %s during %v", id, ctx.current)
+        return NewNotReadyError(ctx.Id)
+    }
+
     c, ok := ctx.containers[id]
     if ok {
         ctx.Log(TRACE, "start sending INIT_NEWCONTAINER")
@@ -48,6 +53,12 @@ func (ctx *VmContext) restoreContainer(id string) (alive bool, err error) {
     ctx.lock.Lock()
     defer ctx.lock.Unlock()
+
+    if ctx.current != StateRunning {
+        ctx.Log(DEBUG, "restore container %s during %v", id, ctx.current)
+        return false, NewNotReadyError(ctx.Id)
+    }
+
     c, ok := ctx.containers[id]
     if !ok {
         return false, fmt.Errorf("try to associate a container not exist in sandbox")
@@ -77,6 +88,11 @@ func (ctx *VmContext) attachCmd(cmd *AttachCommand) error {
     ctx.lock.Lock()
     defer ctx.lock.Unlock()

+    if ctx.current != StateRunning {
+        ctx.Log(DEBUG, "attach container %s during %v", cmd.Container, ctx.current)
+        return NewNotReadyError(ctx.Id)
+    }
+
     c, ok := ctx.containers[cmd.Container]
     if !ok {
         estr := fmt.Sprintf("cannot find container %s to attach", cmd.Container)
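These vm_states.go entry points report not-ready directly as an error return, while the context.go methods above report it through the caller's result channel. Callers such as AddNic allocate that channel with capacity one, which is what lets the context layer send its single result while still holding the lock, whether or not the caller has reached its receive yet. A small sketch of that one-shot handshake follows, with hypothetical stand-in types rather than the real api.Result.

```go
// One-shot result-channel handshake, modeled on the Vm.AddNic to
// VmContext.AddInterface call path above. All names are stand-ins.
package main

import (
    "errors"
    "fmt"
)

type result struct {
    id  string
    err error
}

// addInterface plays the VmContext role: it sends exactly one result.
// The capacity-one channel means this send never blocks, even if the
// caller has not reached its receive yet.
func addInterface(id string, running bool, out chan<- result) {
    if !running {
        out <- result{id: id, err: errors.New("pod not running")}
        return
    }
    out <- result{id: id} // success: nil error
}

// addNic plays the Vm role: make the one-shot channel, call down into
// the context layer, then block on the single reply.
func addNic(id string, running bool) error {
    client := make(chan result, 1)
    addInterface(id, running, client)
    return (<-client).err
}

func main() {
    fmt.Println(addNic("eth0", false)) // pod not running
    fmt.Println(addNic("eth0", true))  // <nil>
}
```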