Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify kubevirt_vm crud/wait logic #54404

Merged
merged 1 commit into from
Apr 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 19 additions & 33 deletions lib/ansible/module_utils/kubevirt.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,18 @@
from ansible.module_utils.k8s.common import list_dict_str
from ansible.module_utils.k8s.raw import KubernetesRawModule

try:
from openshift import watch
from openshift.helper.exceptions import KubernetesException
except ImportError:
# Handled in k8s common:
pass

import re

MAX_SUPPORTED_API_VERSION = 'v1alpha3'
API_GROUP = 'kubevirt.io'


VM_COMMON_ARG_SPEC = {
'name': {'required': True},
'namespace': {'required': True},
'state': {
'default': 'present',
'choices': ['present', 'absent'],
},
'force': {
'type': 'bool',
'default': False,
},
# Put all args that (can) modify 'spec:' here:
VM_SPEC_DEF_ARG_SPEC = {
'resource_definition': {
'type': 'dict',
'aliases': ['definition', 'inline']
},
'merge_type': {'type': 'list', 'choices': ['json', 'merge', 'strategic-merge']},
'wait': {'type': 'bool', 'default': True},
'wait_timeout': {'type': 'int', 'default': 120},
'memory': {'type': 'str'},
'memory_limit': {'type': 'str'},
'cpu_cores': {'type': 'int'},
Expand All @@ -59,6 +40,23 @@
'cpu_shares': {'type': 'int'},
'cpu_features': {'type': 'list'},
}
# And other common args go here:
VM_COMMON_ARG_SPEC = {
'name': {'required': True},
'namespace': {'required': True},
'state': {
'default': 'present',
'choices': ['present', 'absent'],
},
'force': {
'type': 'bool',
'default': False,
},
'merge_type': {'type': 'list', 'choices': ['json', 'merge', 'strategic-merge']},
'wait': {'type': 'bool', 'default': True},
'wait_timeout': {'type': 'int', 'default': 120},
}
VM_COMMON_ARG_SPEC.update(VM_SPEC_DEF_ARG_SPEC)


def virtdict():
Expand Down Expand Up @@ -144,18 +142,6 @@ def merge_dicts(x, y):
else:
yield (k, y[k])

def _create_stream(self, resource, namespace, wait_timeout):
""" Create a stream of events for the object """
w = None
stream = None
try:
w = watch.Watch()
w._api_client = self.client.client
stream = w.stream(resource.get, serialize=False, namespace=namespace, timeout_seconds=wait_timeout)
except KubernetesException as exc:
self.fail_json(msg='Failed to initialize watch: {0}'.format(exc.message))
return w, stream

def get_resource(self, resource):
try:
existing = resource.get(name=self.name, namespace=self.namespace)
Expand Down
219 changes: 129 additions & 90 deletions lib/ansible/modules/cloud/kubevirt/kubevirt_vm.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
state:
description:
- Set the virtual machine to either I(present), I(absent), I(running) or I(stopped).
- "I(present) - Create or update virtual machine."
- "I(absent) - Removes virtual machine."
- "I(running) - Create or update virtual machine and run it."
- "I(stopped) - Stops the virtual machine."
- "I(present) - Create or update a virtual machine. (And run it if it's ephemeral.)"
- "I(absent) - Remove a virtual machine."
- "I(running) - Create or update a virtual machine and run it."
- "I(stopped) - Stop a virtual machine. (This deletes ephemeral VMs.)"
default: "present"
choices:
- present
Expand Down Expand Up @@ -64,11 +64,11 @@
type: list
template:
description:
- "Template to used to create a virtual machine."
- "Name of Template to be used in creation of a virtual machine."
type: str
template_parameters:
description:
- "Value of parameters to be replaced in template parameters."
- "New values of parameters from Template."
type: dict

extends_documentation_fragment:
Expand Down Expand Up @@ -219,17 +219,12 @@

from ansible.module_utils.k8s.common import AUTH_ARG_SPEC

try:
from openshift.dynamic.client import ResourceInstance
except ImportError:
# Handled in module_utils
pass

from ansible.module_utils.k8s.common import AUTH_ARG_SPEC
from ansible.module_utils.kubevirt import (
virtdict,
KubeVirtRawModule,
VM_COMMON_ARG_SPEC,
VM_SPEC_DEF_ARG_SPEC
)

VM_ARG_SPEC = {
Expand All @@ -246,6 +241,9 @@
'template_parameters': {'type': 'dict'},
}

# Which params (can) modify 'spec:' contents of a VM:
VM_SPEC_PARAMS = list(VM_SPEC_DEF_ARG_SPEC.keys()) + ['datavolumes', 'template', 'template_parameters']


class KubeVirtVM(KubeVirtRawModule):

Expand All @@ -257,84 +255,80 @@ def argspec(self):
argument_spec.update(VM_ARG_SPEC)
return argument_spec

def _manage_state(self, running, resource, existing, wait, wait_timeout):
definition = {'metadata': {'name': self.name, 'namespace': self.namespace}, 'spec': {'running': running}}
self.patch_resource(resource, definition, existing, self.name, self.namespace, merge_type='merge')

if wait:
resource = self.find_supported_resource('VirtualMachineInstance')
w, stream = self._create_stream(resource, self.namespace, wait_timeout)

if wait and stream is not None:
self._read_stream(resource, w, stream, self.name, running)

def _read_stream(self, resource, watcher, stream, name, running):
""" Wait for ready_replicas to equal the requested number of replicas. """
for event in stream:
if event.get('object'):
obj = ResourceInstance(resource, event['object'])
if running:
if obj.metadata.name == name and hasattr(obj, 'status'):
phase = getattr(obj.status, 'phase', None)
if phase:
if phase == 'Running' and running:
watcher.stop()
return
else:
# TODO: wait for stopped state:
watcher.stop()
return

self.fail_json(msg="Error waiting for virtual machine. Try a higher wait_timeout value. %s" % obj.to_dict())

def manage_state(self, state):
wait = self.params.get('wait')
wait_timeout = self.params.get('wait_timeout')
resource_version = self.params.get('resource_version')

resource_vm = self.find_supported_resource('VirtualMachine')
existing = self.get_resource(resource_vm)
if resource_version and resource_version != existing.metadata.resourceVersion:
return False

existing_running = False
resource_vmi = self.find_supported_resource('VirtualMachineInstance')
existing_running_vmi = self.get_resource(resource_vmi)
if existing_running_vmi and hasattr(existing_running_vmi.status, 'phase'):
existing_running = existing_running_vmi.status.phase == 'Running'

if state == 'running':
if existing_running:
return False
else:
self._manage_state(True, resource_vm, existing, wait, wait_timeout)
return True
elif state == 'stopped':
if not existing_running:
return False
@staticmethod
def fix_serialization(obj):
if obj and hasattr(obj, 'to_dict'):
return obj.to_dict()
return obj

def _wait_for_vmi_running(self):
for event in self._kind_resource.watch(namespace=self.namespace, timeout=self.params.get('wait_timeout')):
entity = event['object']
if entity.metadata.name != self.name:
mmazur marked this conversation as resolved.
Show resolved Hide resolved
continue
status = entity.get('status', {})
phase = status.get('phase', None)
if phase == 'Running':
return entity

self.fail("Timeout occurred while waiting for virtual machine to start. Maybe try a higher wait_timeout value?")

def _wait_for_vm_state(self, new_state):
if new_state == 'running':
want_created = want_ready = True
else:
want_created = want_ready = False

for event in self._kind_resource.watch(namespace=self.namespace, timeout=self.params.get('wait_timeout')):
entity = event['object']
if entity.metadata.name != self.name:
continue
status = entity.get('status', {})
created = status.get('created', False)
ready = status.get('ready', False)
if (created, ready) == (want_created, want_ready):
return entity

self.fail("Timeout occurred while waiting for virtual machine to achieve '{0}' state. "
"Maybe try a higher wait_timeout value?".format(new_state))

def manage_vm_state(self, new_state, already_changed):
new_running = True if new_state == 'running' else False
changed = False
k8s_obj = {}

if not already_changed:
mmazur marked this conversation as resolved.
Show resolved Hide resolved
k8s_obj = self.get_resource(self._kind_resource)
if not k8s_obj:
self.fail("VirtualMachine object disappeared during module operation, aborting.")
if k8s_obj.spec.get('running', False) == new_running:
return False, k8s_obj

newdef = dict(metadata=dict(name=self.name, namespace=self.namespace), spec=dict(running=new_running))
k8s_obj, err = self.patch_resource(self._kind_resource, newdef, k8s_obj,
self.name, self.namespace, merge_type='merge')
if err:
self.fail_json(**err)
else:
self._manage_state(False, resource_vm, existing, wait, wait_timeout)
return True
changed = True

def execute_module(self):
# Parse parameters specific for this module:
self.client = self.get_api_client()
definition = virtdict()
ephemeral = self.params.get('ephemeral')
state = self.params.get('state')
if self.params.get('wait'):
k8s_obj = self._wait_for_vm_state(new_state)

if not ephemeral:
definition['spec']['running'] = state == 'running'
return changed, k8s_obj

def construct_definition(self, kind, our_state, ephemeral):
definition = virtdict()
processedtemplate = {}

# Construct the API object definition:
vm_template = self.params.get('template')
processedtemplate = {}
if vm_template:
# Find the template the VM should be created from:
template_resource = self.client.resources.get(api_version='template.openshift.io/v1', kind='Template', name='templates')
proccess_template = template_resource.get(name=vm_template, namespace=self.params.get('namespace'))

# Set proper template values set by Ansible parameter 'parameters':
# Set proper template values taken from module option 'template_parameters':
for k, v in self.params.get('template_parameters', {}).items():
for parameter in proccess_template.parameters:
if parameter.name == k:
Expand All @@ -344,27 +338,72 @@ def execute_module(self):
processedtemplates_res = self.client.resources.get(api_version='template.openshift.io/v1', kind='Template', name='processedtemplates')
processedtemplate = processedtemplates_res.create(proccess_template.to_dict()).to_dict()['objects'][0]

if not ephemeral:
definition['spec']['running'] = our_state == 'running'
template = definition if ephemeral else definition['spec']['template']
kind = 'VirtualMachineInstance' if ephemeral else 'VirtualMachine'
template['metadata']['labels']['vm.cnv.io/name'] = self.params.get('name')
dummy, definition = self.construct_vm_definition(kind, definition, template)
definition = dict(self.merge_dicts(processedtemplate, definition))

# Create the VM:
result = self.execute_crud(kind, definition)
changed = result['changed']
return definition

# Manage state of the VM:
if state in ['running', 'stopped']:
if not self.check_mode:
ret = self.manage_state(state)
changed = changed or ret
def execute_module(self):
# Parse parameters specific to this module:
ephemeral = self.params.get('ephemeral')
k8s_state = our_state = self.params.get('state')
kind = 'VirtualMachineInstance' if ephemeral else 'VirtualMachine'
_used_params = [name for name in self.params if self.params[name] is not None]
# Is 'spec:' getting changed?
vm_spec_change = True if set(VM_SPEC_PARAMS).intersection(_used_params) else False
mmazur marked this conversation as resolved.
Show resolved Hide resolved
changed = False
crud_executed = False
method = ''

# Underlying module_utils/k8s/* code knows only of state == present/absent; let's make sure not to confuse it
if ephemeral:
# Ephemerals don't actually support running/stopped; we treat those as aliases for present/absent instead
if our_state == 'running':
self.params['state'] = k8s_state = 'present'
elif our_state == 'stopped':
self.params['state'] = k8s_state = 'absent'
else:
if our_state != 'absent':
self.params['state'] = k8s_state = 'present'

self.client = self.get_api_client()
self._kind_resource = self.find_supported_resource(kind)
k8s_obj = self.get_resource(self._kind_resource)
if not self.check_mode and not vm_spec_change and k8s_state != 'absent' and not k8s_obj:
self.fail("It's impossible to create an empty VM or change state of a non-existent VM.")

# Changes in VM's spec or any changes to VMIs warrant a full CRUD, the latter because
# VMIs don't really have states to manage; they're either present or don't exist
# Also check_mode always warrants a CRUD, as that'll produce a sane result
if vm_spec_change or ephemeral or k8s_state == 'absent' or self.check_mode:
definition = self.construct_definition(kind, our_state, ephemeral)
result = self.execute_crud(kind, definition)
changed = result['changed']
k8s_obj = result['result']
method = result['method']
crud_executed = True

if ephemeral and self.params.get('wait') and k8s_state == 'present' and not self.check_mode:
# Waiting for k8s_state==absent is handled inside execute_crud()
k8s_obj = self._wait_for_vmi_running()

if not ephemeral and our_state in ['running', 'stopped'] and not self.check_mode:
# State==present/absent doesn't involve any additional VMI state management and is fully
# handled inside execute_crud() (including wait logic)
patched, k8s_obj = self.manage_vm_state(our_state, crud_executed)
changed = changed or patched
if changed:
method = method or 'patch'

# Return from the module:
self.exit_json(**{
'changed': changed,
'kubevirt_vm': result.pop('result'),
'result': result,
'kubevirt_vm': self.fix_serialization(k8s_obj),
'method': method
})


Expand Down