Skip to content

Commit

Permalink
falk: implement wait_for_shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
TheJJ committed Jul 22, 2018
1 parent 0a52b43 commit ec0ca21
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 15 deletions.
4 changes: 4 additions & 0 deletions etc/falk.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ command = qemu-system-x86_64 -machine type=q35,accel=kvm -cpu host -smp 2 -m 2G
# then use a vncclient to connect to port 5901
# (maybe forward the port to your machine with `ssh -L 12345:localhost:5901 youruser@falkhost`,
# then connect with a vncviewer: e.g. `vinagre localhost:12345`)


#[some-other-vm-id]
#... definitions for another vm, just like the ones below [some-random-vm-id]
11 changes: 6 additions & 5 deletions falk/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ async def spawn_shell(falk, vm_id, volatile, command):
logging.debug("VM launched, waiting for ssh...")
await vm.wait_for_ssh_port()

if manage:
logging.warning("please shut down the VM gracefully "
"to avoid data loss (=> `sudo poweroff`)")

# ssh into the machine, force tty allocation
async with SSHProcess(command,
vm.ssh_user, vm.ssh_host,
vm.ssh_port, vm.ssh_known_host_key, pipes=False,
options=["-t"]) as proc:
ret = await proc.wait()

# dirty hardcode to wait for vm shutdown
# otherwise, disks may not be synced.
# let falk tell us when the vm dies.
# TODO: await vm.wait_for_shutdown(30)
await asyncio.sleep(5)
# wait 30 maximum for the machine to exit gracefully
await vm.wait_for_shutdown(30)

await vm.terminate()
await vm.cleanup()
Expand Down
9 changes: 9 additions & 0 deletions falk/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,15 @@ def __init__(self, run_id, running, ssh_user, ssh_host, ssh_port, ssh_known_host
self.ssh_known_host_key = ssh_known_host_key


class ShutdownWait(RequestID):
"""
Wait until the VM exits on its own.
"""
def __init__(self, run_id, timeout):
super().__init__(run_id)
self.timeout = timeout


class Terminate(RequestID):
"""
Kill the VM.
Expand Down
8 changes: 8 additions & 0 deletions falk/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,5 +386,13 @@ async def control_message(self, msg):
elif isinstance(msg, messages.Exit):
self.close()

elif isinstance(msg, messages.ShutdownWait):
self.log(f"waiting {msg.timeout}s for machine shutdown..",
level=logging.DEBUG)
success = await self.get_machine(msg.run_id).\
wait_for_shutdown(msg.timeout)
if not success:
answer = messages.Error("shutdown-wait timed out")

self.log("answer: %s" % answer, level=logging.DEBUG)
return answer, force_mode
7 changes: 7 additions & 0 deletions falk/vm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ async def is_running(self):
"""
pass

@abstractmethod
async def wait_for_shutdown(self, timeout=60):
"""
Sleep for a maximum of `timeout` until the container terminates.
"""
pass

@abstractmethod
async def terminate(self):
""" Terminate the container if it doesn't shutdown on its own """
Expand Down
57 changes: 49 additions & 8 deletions falk/vm/qemu.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
qemu virtual machines.
"""

import asyncio
import logging
import os
from pathlib import Path
Expand Down Expand Up @@ -63,8 +64,19 @@ async def prepare(self, manage=False):
"-f", "qcow2",
str(self.running_image),
]
if subprocess.call(command) != 0:
raise RuntimeError("could not create overlay image")

proc = await asyncio.create_subprocess_exec(*command)

try:
ret = await asyncio.wait_for(proc.wait(), timeout=60)
except asyncio.TimeoutError as exc:
raise RuntimeError("timeout when creating "
"overlay image") from exc

if ret != 0:
raise RuntimeError(f"could not create overlay image: "
f"qemu-img returned {ret}")

else:
# TODO: even in management mode, create a cow image,
# but in the end merge it back into a new image and
Expand All @@ -75,7 +87,7 @@ async def prepare(self, manage=False):

# TODO: disallow multiple management connections at once.

logging.warning("VM launching in management mode!")
logging.info("QEMU VM launching in management mode!")
# to manage, use the base image to run
self.running_image = str(self.cfg.base_image)

Expand All @@ -91,24 +103,53 @@ async def launch(self):
part = part.replace("{SSHPORT}", str(self.ssh_port))
command.append(part)

self.process = subprocess.Popen(command, stdin=subprocess.PIPE)
self.process = await asyncio.create_subprocess_exec(
*command,
stdin=subprocess.PIPE,
stdout=None,
stderr=None
)
self.process.stdin.close()

async def is_running(self):
if self.process:
running = self.process.poll() is None
running = self.process.returncode is None
else:
running = False

return running

async def wait_for_shutdown(self, timeout):
if not self.process:
return

try:
await asyncio.wait_for(self.process.wait(), timeout)
return True

except asyncio.TimeoutError:
logging.warning("shutdown wait timed out.")
return False

async def terminate(self):
if self.process:
if not self.process:
return

if self.process.returncode is not None:
return

try:
self.process.terminate()
await asyncio.wait_for(self.process.wait(), timeout=10)

except asyncio.TimeoutError:
self.process.kill()
self.process.wait()
await self.process.wait()

self.process = None

async def cleanup(self):
if not (self.manage or self.running_image is None):
if not self.manage and self.running_image is not None:
try:
os.unlink(str(self.running_image))
except FileNotFoundError:
Expand Down
9 changes: 7 additions & 2 deletions kevin/falkvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,13 @@ def connection_made(reader, writer):
self.ssh_host,
self.ssh_port)], timeout)

async def wait_for_shutdown(self, timeout=60):
async def wait_for_shutdown(self, timeout=20):
"""
Request from falk so he tells us when the machine is dead.
"""
raise NotImplementedError()
msg = await self.falk.query(messages.ShutdownWait(run_id=self.run_id,
timeout=timeout))
if not isinstance(msg, messages.OK):
raise VMError(f"Failed to wait for shutdown: {msg.msg}")

return msg

0 comments on commit ec0ca21

Please sign in to comment.