diff --git a/server/src/uds/services/OpenShift/openshift/client.py b/server/src/uds/services/OpenShift/openshift/client.py index 9ae6bd390..98296de47 100644 --- a/server/src/uds/services/OpenShift/openshift/client.py +++ b/server/src/uds/services/OpenShift/openshift/client.py @@ -34,7 +34,6 @@ import logging import requests import time -import token from uds.core.util import security from uds.core.util.cache import Cache @@ -42,10 +41,8 @@ from . import types, consts, exceptions - logger = logging.getLogger(__name__) - class OpenshiftClient: cluster_url: str api_url: str @@ -104,7 +101,7 @@ def get_token(self) -> str | None: except Exception as ex: logging.error(f"Could not obtain token: {ex}") raise - + def connect(self, force: bool = False) -> requests.Session: # For testing, always use the fixed token session = self._session = security.secure_requests_session(verify=self._verify_ssl) @@ -296,33 +293,28 @@ def monitor_vm_clone( """ Monitor the clone process of a virtual machine. """ - clone_url = f"{api_url}/apis/clone.kubevirt.io/v1alpha1/namespaces/{namespace}/virtualmachineclones/{clone_name}" - headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"} + path = f"/apis/clone.kubevirt.io/v1alpha1/namespaces/{namespace}/virtualmachineclones/{clone_name}" logging.info("Monitoring clone process for '%s'...", clone_name) while True: try: - response = requests.get(clone_url, headers=headers, verify=False) - if response.status_code == 200: - clone_data = response.json() - status = clone_data.get('status', {}) - phase = status.get('phase', 'Unknown') - logging.info("Phase: %s", phase) - for condition in status.get('conditions', []): - ctype = condition.get('type', '') - cstatus = condition.get('status', '') - cmsg = condition.get('message', '') - logging.info(" %s: %s - %s", ctype, cstatus, cmsg) - if phase == 'Succeeded': - logging.info("Clone '%s' completed successfully!", clone_name) - break - elif phase == 'Failed': - logging.error("Clone '%s' failed!", clone_name) - break - elif response.status_code == 404: - logging.warning("Clone resource '%s' not found. May have been cleaned up.", clone_name) + response = self.do_request('GET', path) + status = response.get('status', {}) + phase = status.get('phase', 'Unknown') + logging.info("Phase: %s", phase) + for condition in status.get('conditions', []): + ctype = condition.get('type', '') + cstatus = condition.get('status', '') + cmsg = condition.get('message', '') + logging.info(" %s: %s - %s", ctype, cstatus, cmsg) + if phase == 'Succeeded': + logging.info("Clone '%s' completed successfully!", clone_name) break - else: - logging.error("Error monitoring clone: %d", response.status_code) + elif phase == 'Failed': + logging.error("Clone '%s' failed!", clone_name) + break + except exceptions.OpenshiftNotFoundError: + logging.warning("Clone resource '%s' not found. May have been cleaned up.", clone_name) + break except Exception as e: logging.error("Monitoring exception: %s", e) logging.info("Waiting %d seconds before next check...", polling_interval) @@ -332,19 +324,16 @@ def get_vm_pvc_or_dv_name(self, api_url: str, namespace: str, vm_name: str) -> t """ Returns the name of the PVC or DataVolume used by the VM. """ - vm_url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}" - headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"} - response = requests.get(vm_url, headers=headers, verify=False) - if response.status_code == 200: - vm_obj = response.json() - volumes = vm_obj.get("spec", {}).get("template", {}).get("spec", {}).get("volumes", []) - for vol in volumes: - pvc = vol.get("persistentVolumeClaim") - if pvc: - return pvc.get("claimName"), "pvc" - dv = vol.get("dataVolume") - if dv: - return dv.get("name"), "dv" + path = f"/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}" + response = self.do_request('GET', path) + volumes = response.get("spec", {}).get("template", {}).get("spec", {}).get("volumes", []) + for vol in volumes: + pvc = vol.get("persistentVolumeClaim") + if pvc: + return pvc.get("claimName"), "pvc" + dv = vol.get("dataVolume") + if dv: + return dv.get("name"), "dv" raise Exception(f"No PVC or DataVolume found in VM {vm_name}") def get_datavolume_phase(self, datavolume_name: str) -> str: @@ -352,13 +341,10 @@ def get_datavolume_phase(self, datavolume_name: str) -> str: Get the phase of a DataVolume. Returns the phase as a string. """ - url = f"{self.api_url}/apis/cdi.kubevirt.io/v1beta1/namespaces/{self.namespace}/datavolumes/{datavolume_name}" - headers = {'Authorization': f'Bearer {self.get_token()}', 'Accept': 'application/json'} + path = f"/apis/cdi.kubevirt.io/v1beta1/namespaces/{self.namespace}/datavolumes/{datavolume_name}" try: - response = requests.get(url, headers=headers, verify=self._verify_ssl, timeout=self._timeout) - if response.status_code == 200: - dv = response.json() - return dv.get('status', {}).get('phase', '') + response = self.do_request('GET', path) + return response.get('status', {}).get('phase', '') except Exception: pass return '' @@ -368,17 +354,14 @@ def get_datavolume_size(self, api_url: str, namespace: str, dv_name: str) -> str Get the size of a DataVolume. Returns the size as a string. """ - url = f"{api_url}/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes/{dv_name}" - headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"} - response = requests.get(url, headers=headers, verify=False) - if response.status_code == 200: - dv = response.json() - size = dv.get("status", {}).get("amount", None) - if size: - return size - return ( - dv.get("spec", {}).get("pvc", {}).get("resources", {}).get("requests", {}).get("storage") or "" - ) + path = f"/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes/{dv_name}" + response = self.do_request('GET', path) + size = response.get("status", {}).get("amount", None) + if size: + return size + return ( + response.get("spec", {}).get("pvc", {}).get("resources", {}).get("requests", {}).get("storage") or "" + ) raise Exception(f"Could not get the size of DataVolume {dv_name}") def get_pvc_size(self, api_url: str, namespace: str, pvc_name: str) -> str: @@ -386,14 +369,11 @@ def get_pvc_size(self, api_url: str, namespace: str, pvc_name: str) -> str: Get the size of a PVC. Returns the size as a string. """ - url = f"{api_url}/api/v1/namespaces/{namespace}/persistentvolumeclaims/{pvc_name}" - headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"} - response = requests.get(url, headers=headers, verify=False) - if response.status_code == 200: - pvc = response.json() - capacity = pvc.get("status", {}).get("capacity", {}).get("storage") - if capacity: - return capacity + path = f"/api/v1/namespaces/{namespace}/persistentvolumeclaims/{pvc_name}" + response = self.do_request('GET', path) + capacity = response.get("status", {}).get("capacity", {}).get("storage") + if capacity: + return capacity raise Exception(f"Could not get the size of PVC {pvc_name}") def clone_pvc_with_datavolume( @@ -409,12 +389,7 @@ def clone_pvc_with_datavolume( Clone a PVC using a DataVolume. Returns True if the DataVolume was created successfully, else False. """ - dv_url = f"{api_url}/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes" - headers = { - "Authorization": f"Bearer {self.get_token()}", - "Accept": "application/json", - "Content-Type": "application/json", - } + path = f"/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes" body: dict[str, typing.Any] = { "apiVersion": "cdi.kubevirt.io/v1beta1", "kind": "DataVolume", @@ -428,12 +403,13 @@ def clone_pvc_with_datavolume( }, }, } - response = requests.post(dv_url, headers=headers, json=body, verify=False) - if response.status_code == 201: + try: + self.do_request('POST', path, data=body) logging.info(f"DataVolume '{cloned_pvc_name}' created successfully") return True - logging.error(f"Failed to create DataVolume: {response.status_code} {response.text}") - return False + except Exception as e: + logging.error(f"Failed to create DataVolume: {e}") + return False def create_vm_from_pvc( self, @@ -448,16 +424,13 @@ def create_vm_from_pvc( Create a new VM from a cloned PVC using DataVolumeTemplates. Returns True if the VM was created successfully, else False. """ - original_vm_url = ( - f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{source_vm_name}" - ) - headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"} - resp = requests.get(original_vm_url, headers=headers, verify=False) - if resp.status_code != 200: - logging.error(f"Could not get source VM: {resp.status_code} {resp.text}") + path = f"/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{source_vm_name}" + try: + vm_obj = self.do_request('GET', path) + except Exception as e: + logging.error(f"Could not get source VM: {e}") return False - vm_obj = resp.json() vm_obj['metadata']['name'] = new_vm_name for k in ['resourceVersion', 'uid', 'selfLink']: @@ -503,28 +476,27 @@ def create_vm_from_pvc( logger.info(f"Creating VM '{new_vm_name}' from cloned PVC '{new_dv_name}'.") #logger.info(f"VM Object: {vm_obj}") - create_url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines" - headers["Content-Type"] = "application/json" - resp = requests.post(create_url, headers=headers, json=vm_obj, verify=False) - if resp.status_code == 201: + create_path = f"/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines" + try: + self.do_request('POST', create_path, data=vm_obj) logging.info(f"VM '{new_vm_name}' created successfully with DataVolumeTemplate.") return True - logging.error(f"Error creating VM: {resp.status_code} {resp.text}") - return False + except Exception as e: + logging.error(f"Error creating VM: {e}") + return False def delete_vm(self, api_url: str, namespace: str, vm_name: str) -> bool: """ Delete a VM by name. Returns True if the VM was deleted successfully, else False. """ - url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}" - headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"} - response = requests.delete(url, headers=headers, verify=False) - if response.status_code in [200, 202]: + path = f"/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}" + try: + self.do_request('DELETE', path) logging.info(f"VM {vm_name} deleted successfully.") return True - else: - logging.error(f"Error deleting VM {vm_name}: {response.status_code} - {response.text}") + except Exception as e: + logging.error(f"Error deleting VM {vm_name}: {e}") return False def wait_for_datavolume_clone_progress( @@ -534,14 +506,12 @@ def wait_for_datavolume_clone_progress( Wait for a DataVolume clone to complete. Returns True if the clone completed successfully, else False. """ - url = f"{api_url}/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes/{datavolume_name}" - headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"} + path = f"/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes/{datavolume_name}" start = time.time() while time.time() - start < timeout: - response = requests.get(url, headers=headers, verify=False) - if response.status_code == 200: - dv = response.json() - status = dv.get('status', {}) + try: + response = self.do_request('GET', path) + status = response.get('status', {}) phase = status.get('phase') progress = status.get('progress', 'N/A') logging.info(f"DataVolume {datavolume_name} status: {phase}, progress: {progress}") @@ -551,8 +521,8 @@ def wait_for_datavolume_clone_progress( elif phase == 'Failed': logging.error(f"DataVolume {datavolume_name} clone failed") return False - else: - logging.error(f"Error querying DataVolume {datavolume_name}: {response.status_code}") + except Exception as e: + logging.error(f"Error querying DataVolume {datavolume_name}: {e}") time.sleep(polling_interval) logging.error(f"Timeout waiting for DataVolume {datavolume_name} clone") return False @@ -562,40 +532,47 @@ def start_vm(self, api_url: str, namespace: str, vm_name: str) -> bool: Start a VM by name. Returns True if the VM was started successfully, else False. """ - url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}" - headers = { - "Authorization": f"Bearer {self.get_token()}", - "Content-Type": "application/merge-patch+json", - "Accept": "application/json", - } - body: dict[str, typing.Any] = {"spec": {"runStrategy": "Always"}} - response = requests.patch(url, headers=headers, json=body, verify=False) - if response.status_code in [200, 201]: - logging.info(f"VM {vm_name} started.") - return True - else: - logging.info(f"Error starting VM {vm_name}: {response.status_code} - {response.text}") + + # Get Vm info + path = f"/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}" + try: + vm_obj = self.do_request('GET', path) + except Exception as e: + logging.error(f"Could not get source VM: {e}") return False + + # Update runStrategy to Always + vm_obj['spec']['runStrategy'] = 'Always' + try: + self.do_request('PUT', path, data=vm_obj) + logging.info(f"VM {vm_name} will be started.") + return True + except Exception as e: + logging.info(f"Error starting VM {vm_name}: {e}") + return False def stop_vm(self, api_url: str, namespace: str, vm_name: str) -> bool: """ Stop a VM by name. Returns True if the VM was stopped successfully, else False. """ - url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}" - headers = { - "Authorization": f"Bearer {self.get_token()}", - "Content-Type": "application/merge-patch+json", - "Accept": "application/json", - } - body: dict[str, typing.Any] = {"spec": {"runStrategy": "Halted"}} - response = requests.patch(url, headers=headers, json=body, verify=False) - if response.status_code in [200, 201]: + # Get Vm info + path = f"/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}" + try: + vm_obj = self.do_request('GET', path) + except Exception as e: + logging.error(f"Could not get source VM: {e}") + return False + + # Update runStrategy to Halted + vm_obj['spec']['runStrategy'] = 'Halted' + try: + self.do_request('PUT', path, data=vm_obj) logging.info(f"VM {vm_name} will be stopped.") return True - else: - logging.info(f"Error stopping VM {vm_name}: {response.status_code} - {response.text}") - return False + except Exception as e: + logging.info(f"Error starting VM {vm_name}: {e}") + return False def copy_vm_same_size( self, api_url: str, namespace: str, source_vm_name: str, new_vm_name: str, storage_class: str @@ -687,22 +664,3 @@ def delete_vm_instance(self, vm_name: str) -> bool: except Exception as e: logging.error(f"Error deleting VM: {e}") return False - - def clone_vm_instance(self, source_vm_name: str, new_vm_name: str, storage_class: str) -> bool: - """ - Clone a VM by name, creating a new VM with the same size. - Returns True if clone succeeded, False otherwise. - """ - try: - self.copy_vm_same_size(self.api_url, self.namespace, source_vm_name, new_vm_name, storage_class) - return True - except Exception as e: - logging.error(f"Error cloning VM: {e}") - return False - - @staticmethod - def validate_vm_id(vm_id: str | int) -> None: - try: - int(vm_id) - except ValueError: - raise exceptions.OpenshiftNotFoundError(f'VM {vm_id} not found') diff --git a/server/src/uds/services/OpenShift/provider.py b/server/src/uds/services/OpenShift/provider.py index 1cdc831e9..f2991c3cb 100644 --- a/server/src/uds/services/OpenShift/provider.py +++ b/server/src/uds/services/OpenShift/provider.py @@ -126,10 +126,17 @@ def test( def sanitized_name(self, name: str) -> str: """ Sanitizes the VM name to comply with RFC 1123: - - Lowercase - - Alphanumeric, '-', '.' - - Starts/ends with alphanumeric - - Max length 63 chars + - Converts to lowercase + - Replaces any character not in [a-z0-9.-] with '-' + - Collapses multiple '-' into one + - Removes leading/trailing non-alphanumeric characters + - Limits length to 63 characters """ - name = re.sub(r'^[^a-z0-9]+|[^a-z0-9.-]|-{2,}|[^a-z0-9]+$', '-', name.lower()) - return name[:63] + name = name.lower() + # Replace any character not allowed with '-' + name = re.sub(r'[^a-z0-9.-]', '-', name) + # Collapse multiple '-' into one + name = re.sub(r'-{2,}', '-', name) + # Remove leading/trailing non-alphanumeric characters + name = re.sub(r'^[^a-z0-9]+|[^a-z0-9]+$', '', name) + return name[:63] \ No newline at end of file diff --git a/server/tests/services/openshift/__init__.py b/server/tests/services/openshift/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/server/tests/services/openshift/fixtures.py b/server/tests/services/openshift/fixtures.py new file mode 100644 index 000000000..d85b56efd --- /dev/null +++ b/server/tests/services/openshift/fixtures.py @@ -0,0 +1,348 @@ +# -*- coding: utf-8 -*- +""" +Test fixtures for OpenShift service tests. +Provides reusable functions and mock objects for unit testing OpenShift provider, service, deployment, publication, and user service logic. +All functions are designed to be used across multiple test modules for consistency and maintainability. +""" + +# +# Copyright (c) 2024 Virtual Cable S.L.U. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" +import contextlib +import copy +import functools +import random +import typing + +from unittest import mock +import uuid + + +from uds.core import environment +from uds.core.ui.user_interface import gui +from uds.models.user import User + +from uds.services.OpenShift import service, service_fixed, provider, publication, deployment, deployment_fixed +from uds.services.OpenShift.openshift import types as openshift_types, exceptions as openshift_exceptions + +DEF_VMS: list[openshift_types.VM] = [ + openshift_types.VM( + name=f'vm-{i}', + namespace='default', + uid=f'uid-{i}', + status=openshift_types.VMStatus.STOPPED if i % 2 == 0 else openshift_types.VMStatus.RUNNING, + volume_template=openshift_types.VolumeTemplate(name=f'volume-{i}', storage='10Gi'), + disks=[openshift_types.DeviceDisk(name=f'disk-{i}', boot_order=1)], + volumes=[openshift_types.Volume(name=f'volume-{i}', data_volume=f'dv-{i}')], + ) + for i in range(1, 11) +] +DEF_VM_INSTANCES: list[openshift_types.VMInstance] = [ + openshift_types.VMInstance( + name=f'vm-{i}', + namespace='default', + uid=f'uid-instance-{i}', + interfaces=[ + openshift_types.Interface( + name='eth0', + mac_address=f'00:11:22:33:44:{i:02x}', + ip_address=f'192.168.1.{i}', + ) + ], + status=openshift_types.VMStatus.STOPPED if i % 2 == 0 else openshift_types.VMStatus.RUNNING, + phase=openshift_types.VMStatus.STOPPED if i % 2 == 0 else openshift_types.VMStatus.RUNNING, + ) + for i in range(1, 11) +] + +# clone values to avoid modifying the original ones +VMS: list[openshift_types.VM] = copy.deepcopy(DEF_VMS) +VM_INSTANCES: list[openshift_types.VMInstance] = copy.deepcopy(DEF_VM_INSTANCES) + + +def clear() -> None: + """ + Reset all VM and VMInstance values to their default state. + Use this before each test to ensure a clean environment. + """ + VMS[:] = copy.deepcopy(DEF_VMS) + VM_INSTANCES[:] = copy.deepcopy(DEF_VM_INSTANCES) + + +def replace_vm_info(vm_name: str, **kwargs: typing.Any) -> None: + """ + Update attributes of a VM in VMS by name. + Raises OpenshiftNotFoundError if VM is not found. + """ + try: + vm = next(vm for vm in VMS if vm.name == vm_name) + for k, v in kwargs.items(): + setattr(vm, k, v) + except Exception: + raise openshift_exceptions.OpenshiftNotFoundError(f'VM {vm_name} not found') + + +def replacer_vm_info(**kwargs: typing.Any) -> typing.Callable[..., None]: + """ + Returns a partial function to update VM info with preset kwargs. + Useful for patching or repeated updates in tests. + """ + return functools.partial(replace_vm_info, **kwargs) + + +T = typing.TypeVar('T') + + +def returner(value: T, *args: typing.Any, **kwargs: typing.Any) -> typing.Callable[..., T]: + """ + Returns a function that always returns the given value. + Useful for mocking return values in tests. + """ + def inner(*args: typing.Any, **kwargs: typing.Any) -> T: + return value + + return inner + + +# Provider values +PROVIDER_VALUES_DICT: gui.ValuesDictType = { + 'cluster_url': 'https://oauth-openshift.apps-crc.testing', + 'api_url': 'https://api.crc.testing:6443', + 'username': 'kubeadmin', + 'password': 'test-password', + 'namespace': 'default', + 'verify_ssl': False, + 'concurrent_creation_limit': 1, + 'concurrent_removal_limit': 1, + 'timeout': 10, +} + +# Service values +SERVICE_VALUES_DICT: gui.ValuesDictType = { + 'template': VMS[0].name, + 'basename': 'base', + 'lenname': 4, + 'publication_timeout': 120, + 'prov_uuid': '', +} + +# Service fixed values +SERVICE_FIXED_VALUES_DICT: gui.ValuesDictType = { + 'token': '', + 'machines': [VMS[2].name, VMS[3].name, VMS[4].name], + 'on_logout': 'no', + 'randomize': False, + 'maintain_on_error': False, + 'prov_uuid': '', +} + + +def create_client_mock() -> mock.Mock: + """ + Create a MagicMock for OpenshiftClient with default behaviors and side effects. + Used to simulate API responses in provider/service tests. + """ + client = mock.MagicMock() + + # Prepare deep copies of default data + client.test.return_value = True + client.list_vms.return_value = copy.deepcopy(DEF_VMS) + client.start_vm_instance.return_value = True + client.stop_vm_instance.return_value = True + client.delete_vm_instance.return_value = True + client.get_datavolume_phase.return_value = "Succeeded" + client.get_vm_pvc_or_dv_name.return_value = ("test-pvc", "pvc") + client.get_pvc_size.return_value = "10Gi" + client.create_vm_from_pvc.return_value = True + client.wait_for_datavolume_clone_progress.return_value = True + + def get_vm_info_side_effect(vm_name: str, **kwargs: typing.Any) -> openshift_types.VM | None: + for vm in VMS: + if vm.name == vm_name: + return vm + return None + + def get_vm_instance_info_side_effect(vm_name: str, **kwargs: typing.Any) -> openshift_types.VMInstance | None: + for inst in VM_INSTANCES: + if inst.name == vm_name: + return inst + return None + + client.get_vm_info.side_effect = get_vm_info_side_effect + client.get_vm_instance_info.side_effect = get_vm_instance_info_side_effect + + return client + + +@contextlib.contextmanager +def patched_provider(**kwargs: typing.Any) -> typing.Generator[provider.OpenshiftProvider, None, None]: + """ + Context manager that yields a provider with a patched OpenshiftClient mock. + Use this to ensure all API calls are intercepted and controlled in tests. + """ + client = create_client_mock() + prov = create_provider(**kwargs) + prov._cached_api = client + yield prov + + +def create_provider(**kwargs: typing.Any) -> provider.OpenshiftProvider: + """ + Create an OpenshiftProvider instance with default or overridden values. + Used for provider-level tests and as a dependency for other fixtures. + """ + values = PROVIDER_VALUES_DICT.copy() + values.update(kwargs) + + uuid_ = str(uuid.uuid4()) + return provider.OpenshiftProvider( + environment=environment.Environment.private_environment(uuid_), values=values, uuid=uuid_ + ) + + +def create_service( + provider: typing.Optional[provider.OpenshiftProvider] = None, **kwargs: typing.Any +) -> service.OpenshiftService: + """ + Create an OpenshiftService instance (dynamic service). + Used for service-level tests and as a dependency for user services and publications. + """ + uuid_ = str(uuid.uuid4()) + values = SERVICE_VALUES_DICT.copy() + values.update(kwargs) + srvc = service.OpenshiftService( + environment=environment.Environment.private_environment(uuid_), + provider=provider or create_provider(), + values=values, + uuid=uuid_, + ) + return srvc + + +def create_service_fixed( + provider: typing.Optional[provider.OpenshiftProvider] = None, **kwargs: typing.Any +) -> service_fixed.OpenshiftServiceFixed: + """ + Create an OpenshiftServiceFixed instance (fixed service). + Used for fixed service tests and as a dependency for fixed user services. + """ + uuid_ = str(uuid.uuid4()) + values = SERVICE_FIXED_VALUES_DICT.copy() + values.update(kwargs) + return service_fixed.OpenshiftServiceFixed( + environment=environment.Environment.private_environment(uuid_), + provider=provider or create_provider(), + values=values, + uuid=uuid_, + ) + + +def create_publication( + service: typing.Optional[service.OpenshiftService] = None, + **kwargs: typing.Any, +) -> publication.OpenshiftTemplatePublication: + """ + Create an OpenshiftTemplatePublication instance. + Used for publication-level tests and as a dependency for user services. + """ + uuid_ = str(uuid.uuid4()) + pub = publication.OpenshiftTemplatePublication( + environment=environment.Environment.private_environment(uuid_), + service=service or create_service(**kwargs), + revision=1, + servicepool_name='servicepool_name', + uuid=uuid_, + ) + pub._name = f"pub-{random.randint(1000, 9999)}" + return pub + + +def create_userservice( + service: typing.Optional[service.OpenshiftService] = None, + publication: typing.Optional[publication.OpenshiftTemplatePublication] = None, +) -> deployment.OpenshiftUserService: + """ + Create an OpenshiftUserService instance (dynamic user service). + Used for user service tests that require a publication and service. + """ + uuid_ = str(uuid.uuid4()) + return deployment.OpenshiftUserService( + environment=environment.Environment.private_environment(uuid_), + service=service or create_service(), + publication=publication or create_publication(), + uuid=uuid_, + ) + + +def create_userservice_fixed( + service: typing.Optional[service_fixed.OpenshiftServiceFixed] = None, +) -> deployment_fixed.OpenshiftUserServiceFixed: + """ + Create an OpenshiftUserServiceFixed instance (fixed user service). + Used for tests of fixed user service logic and lifecycle. + """ + uuid_ = str(uuid.uuid4().hex) + return deployment_fixed.OpenshiftUserServiceFixed( + environment=environment.Environment.private_environment(uuid_), + service=service or create_service_fixed(), + publication=None, + uuid=uuid_, + ) + + +def create_user( + name: str = "testuser", + real_name: str = "Test User", + is_admin: bool = False, + state: str = 'A', + password: str = 'password', + mfa_data: str = '', + staff_member: bool = False, + last_access: typing.Optional[str] = None, + parent: typing.Optional[User] = None, + created: typing.Optional[str] = None, + comments: str = '', +) -> User: + """ + Create a mock User instance for testing. + All fields can be customized for specific test scenarios. + """ + user = mock.Mock(spec=User) + user.name = name + user.real_name = real_name + user.is_admin = is_admin + user.state = state + user.password = password + user.mfa_data = mfa_data + user.staff_member = staff_member + user.last_access = last_access + user.parent = parent + user.created = created + user.comments = comments + return user diff --git a/server/tests/services/openshift/test_client.py b/server/tests/services/openshift/test_client.py new file mode 100644 index 000000000..ce2a33e6e --- /dev/null +++ b/server/tests/services/openshift/test_client.py @@ -0,0 +1,189 @@ +# -*- coding: utf-8 -*- + +# +# Copyright (c) 2024 Virtual Cable S.L.U. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" + +import logging + +from uds.services.OpenShift.openshift import client as openshift_client +from tests.utils.test import UDSTransactionTestCase +from tests.utils import vars + +logger = logging.getLogger(__name__) + +class TestOpenshiftClient(UDSTransactionTestCase): + """Tests for operations with OpenShiftClient.""" + + os_client: openshift_client.OpenshiftClient + test_vm: str = '' + test_pool: str = '' + test_storage: str = '' + + def setUp(self) -> None: + """ + Set up OpenShift client and test variables for each test. + Skips tests if required variables are missing. + """ + v = vars.get_vars('openshift') + if not v: + self.skipTest('No OpenShift test variables found') + self.os_client = openshift_client.OpenshiftClient( + cluster_url=v['cluster_url'], + api_url=v['api_url'], + username=v['username'], + password=v['password'], + namespace=v['namespace'], + timeout=int(v['timeout']), + verify_ssl=v['verify_ssl'] == 'true', + ) + self.test_vm = v.get('test_vm', '') + self.test_pool = v.get('test_pool', '') + self.test_storage = v.get('test_storage', '') + + # --- Token/API Tests --- + def test_get_token(self) -> None: + """ + Test that get_token returns a valid token string. + """ + token = self.os_client.get_token() + self.assertIsNotNone(token) + + def test_get_api_url(self) -> None: + """ + Test that get_api_url constructs a valid URL with path and parameters. + """ + url = self.os_client.get_api_url('/test/path', ('param1', 'value1')) + self.assertIn('/test/path', url) + self.assertIn('param1=value1', url) + + def test_get_api_url_invalid(self): + """ + Test that get_api_url works with an invalid path. + """ + url = self.os_client.get_api_url('/invalid/path', ('param', 'value')) + self.assertIn('/invalid/path', url) + + # --- VM Listing/Info Tests --- + def test_list_vms(self) -> None: + """ + Test that list_vms returns a list and get_vm_info works for listed VMs. + """ + vms = self.os_client.list_vms() + self.assertIsInstance(vms, list) + if vms: + info = self.os_client.get_vm_info(vms[0].name) + self.assertIsNotNone(info) + + def test_list_vms_and_check_fields(self): + """ + Test that all VMs returned by list_vms have required fields. + """ + vms = self.os_client.list_vms() + self.assertIsInstance(vms, list) + for vm in vms: + self.assertTrue(hasattr(vm, 'name')) + self.assertTrue(hasattr(vm, 'namespace')) + + def test_get_vm_info(self): + """ + Test that get_vm_info returns info for a valid VM name. + """ + if not self.test_vm: + self.skipTest('No test_vm specified') + info = self.os_client.get_vm_info(self.test_vm) + self.assertIsNotNone(info) + + def test_get_vm_info_invalid(self): + """ + Test that get_vm_info returns None for an invalid VM name. + """ + info = self.os_client.get_vm_info('nonexistent-vm') + self.assertIsNone(info) + + def test_get_vm_instance_info(self): + """ + Test that get_vm_instance_info returns info or None for a valid VM name. + """ + if not self.test_vm: + self.skipTest('No test_vm specified') + info = self.os_client.get_vm_instance_info(self.test_vm) + self.assertTrue(info is None or hasattr(info, 'name')) + + def test_get_vm_instance_info_invalid(self): + """ + Test that get_vm_instance_info returns None for an invalid VM name. + """ + info = self.os_client.get_vm_instance_info('nonexistent-vm') + self.assertIsNone(info) + + # --- VM Lifecycle and Actions --- + def test_vm_lifecycle(self) -> None: + """ + Test VM lifecycle actions: start, stop, delete (skipped in shared environments). + """ + self.skipTest('Skip this test to avoid issues in shared environments') + if not self.test_vm: + self.skipTest('No test_vm specified in test-vars.ini') + self.assertTrue(self.os_client.start_vm_instance(self.test_vm)) + self.assertTrue(self.os_client.stop_vm_instance(self.test_vm)) + self.assertTrue(self.os_client.delete_vm_instance(self.test_vm)) + + def test_start_stop_suspend_resume_vm(self): + """ + Test stop (and optionally start) VM instance. Suspend/resume skipped if not supported. + """ + if not self.test_vm: + self.skipTest('No test_vm specified') + #self.assertTrue(self.os_client.start_vm_instance(self.test_vm)) + self.assertTrue(self.os_client.stop_vm_instance(self.test_vm)) + # Suspend/resume skipped if not supported + + def test_delete_vm_invalid(self): + """ + Test that delete_vm_instance returns False for an invalid VM name. + """ + self.assertFalse(self.os_client.delete_vm_instance('nonexistent-vm')) + + # --- DataVolume Tests --- + # --- DataVolume Tests --- + def test_datavolume_phase(self) -> None: + """ + Test that get_datavolume_phase returns a string for a valid datavolume. + """ + phase = self.os_client.get_datavolume_phase('test-dv') + self.assertIsInstance(phase, str) + + def test_datavolume_phase_invalid(self): + """ + Test that get_datavolume_phase returns a string for an invalid datavolume. + """ + phase = self.os_client.get_datavolume_phase('nonexistent-dv') + self.assertIsInstance(phase, str) diff --git a/server/tests/services/openshift/test_deployment.py b/server/tests/services/openshift/test_deployment.py new file mode 100644 index 000000000..fb0f55555 --- /dev/null +++ b/server/tests/services/openshift/test_deployment.py @@ -0,0 +1,163 @@ +# -*- coding: utf-8 -*- + +# +# Copyright (c) 2024 Virtual Cable S.L.U. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" + +from unittest import mock + +from tests.services.openshift import fixtures +from uds.core.types.states import TaskState +from tests.utils.test import UDSTransactionTestCase + + +class TestOpenshiftDeployment(UDSTransactionTestCase): + def _create_userservice(self): + """ + Helper to create a userservice instance with a preset name for deployment operation tests. + Returns: + userservice: A userservice object with name 'test-vm'. + """ + userservice = fixtures.create_userservice() + userservice._name = 'test-vm' + return userservice + def setUp(self) -> None: + super().setUp() + fixtures.clear() + + # --- Create operation tests --- + def test_op_create_success(self) -> None: + """ + Test successful VM creation operation. + Should clear the waiting_name flag after creation. + """ + userservice = self._create_userservice() + userservice._waiting_name = False + api = userservice.service().api + with mock.patch.object(api, 'get_vm_pvc_or_dv_name', return_value=('test-pvc', 'pvc')), \ + mock.patch.object(api, 'get_pvc_size', return_value='10Gi'), \ + mock.patch.object(api, 'create_vm_from_pvc', return_value=True), \ + mock.patch.object(api, 'wait_for_datavolume_clone_progress', return_value=True): + userservice.op_create() + self.assertFalse(userservice._waiting_name) + + def test_op_create_failure(self) -> None: + """ + Test failed VM creation operation. + Should set the waiting_name flag if creation fails. + """ + userservice = self._create_userservice() + api = userservice.service().api + userservice._waiting_name = False + with mock.patch.object(api, 'get_vm_pvc_or_dv_name', return_value=('test-pvc', 'pvc')), \ + mock.patch.object(api, 'get_pvc_size', return_value='10Gi'), \ + mock.patch.object(api, 'create_vm_from_pvc', return_value=False): + userservice.op_create() + self.assertTrue(userservice._waiting_name) + + def test_op_create_checker_running(self) -> None: + """ + Test create checker returns RUNNING when datavolume phase is pending. + """ + userservice = self._create_userservice() + api = userservice.service().api + with mock.patch.object(api, 'get_datavolume_phase', return_value='Pending'): + state = userservice.op_create_checker() + self.assertEqual(state, TaskState.RUNNING) + + def test_op_create_checker_finished(self) -> None: + """ + Test create checker returns FINISHED when datavolume phase is succeeded and VM info is available. + """ + userservice = self._create_userservice() + api = userservice.service().api + with mock.patch.object(api, 'get_datavolume_phase', return_value='Succeeded'), \ + mock.patch.object(api, 'get_vm_info', return_value=fixtures.VMS[0]), \ + mock.patch.object(api, 'get_vm_instance_info', return_value=fixtures.VM_INSTANCES[0]): + state = userservice.op_create_checker() + self.assertEqual(state, TaskState.FINISHED) + + # --- Delete operation tests --- + def test_op_delete_checker_finished(self) -> None: + """ + Test delete checker returns FINISHED when VM info is None (deleted). + """ + userservice = self._create_userservice() + api = userservice.service().api + with mock.patch.object(api, 'get_vm_info', return_value=None): + state = userservice.op_delete_checker() + self.assertEqual(state, TaskState.FINISHED) + + def test_op_delete_checker_running(self) -> None: + """ + Test delete checker returns RUNNING when VM info still exists. + """ + userservice = self._create_userservice() + api = userservice.service().api + with mock.patch.object(api, 'get_vm_info', return_value=fixtures.VMS[0]): + state = userservice.op_delete_checker() + self.assertEqual(state, TaskState.RUNNING) + + def test_op_delete_completed_checker(self) -> None: + """ + Test delete completed checker always returns FINISHED. + """ + userservice = self._create_userservice() + state = userservice.op_delete_completed_checker() + self.assertEqual(state, TaskState.FINISHED) + + # --- Cancel operation tests --- + def test_op_cancel_checker_finished(self) -> None: + """ + Test cancel checker returns FINISHED when VM info is None (cancelled). + """ + userservice = self._create_userservice() + api = userservice.service().api + with mock.patch.object(api, 'get_vm_info', return_value=None): + state = userservice.op_cancel_checker() + self.assertEqual(state, TaskState.FINISHED) + + def test_op_cancel_checker_running(self) -> None: + """ + Test cancel checker returns RUNNING when VM info still exists. + """ + userservice = self._create_userservice() + api = userservice.service().api + with mock.patch.object(api, 'get_vm_info', return_value=fixtures.VMS[0]): + state = userservice.op_cancel_checker() + self.assertEqual(state, TaskState.RUNNING) + + def test_op_cancel_completed_checker(self) -> None: + """ + Test cancel completed checker always returns FINISHED. + """ + userservice = self._create_userservice() + state = userservice.op_cancel_completed_checker() + self.assertEqual(state, TaskState.FINISHED) diff --git a/server/tests/services/openshift/test_deployment_fixed.py b/server/tests/services/openshift/test_deployment_fixed.py new file mode 100644 index 000000000..1c6720826 --- /dev/null +++ b/server/tests/services/openshift/test_deployment_fixed.py @@ -0,0 +1,163 @@ +# -*- coding: utf-8 -*- + +# +# Copyright (c) 2024 Virtual Cable S.L.U. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" + +from unittest import mock +from uds.core.types.states import TaskState +from tests.services.openshift import fixtures +from tests.utils.test import UDSTransactionTestCase + +class TestOpenshiftUserServiceFixed(UDSTransactionTestCase): + def _create_userservice_fixed(self): + """ + Helper to create a fixed userservice instance for deployment_fixed operation tests. + Returns: + userservice: A fixed userservice object with name 'fixed-vm'. + """ + userservice = fixtures.create_userservice_fixed() + userservice._name = 'fixed-vm' + return userservice + def setUp(self) -> None: + super().setUp() + fixtures.clear() + + # --- Start operation tests --- + def test_op_start_vm_running(self) -> None: + """ + Test that op_start does not start VM if it is already running. + """ + userservice = self._create_userservice_fixed() + api = userservice.service().provider().api + vm_mock = mock.Mock() + vm_mock.status.is_off.return_value = False + with mock.patch.object(api, 'get_vm_info', return_value=vm_mock): + with mock.patch.object(api, 'start_vm_instance') as start_mock: + userservice.op_start() + start_mock.assert_not_called() + + def test_op_start_vm_off(self) -> None: + """ + Test that op_start starts VM if it is off. + """ + userservice = self._create_userservice_fixed() + api = userservice.service().provider().api + vm_mock = mock.Mock() + vm_mock.status.is_off.return_value = True + with mock.patch.object(api, 'get_vm_info', return_value=vm_mock): + with mock.patch.object(api, 'start_vm_instance') as start_mock: + userservice.op_start() + start_mock.assert_called_once_with('fixed-vm') + + # --- Stop operation tests --- + def test_op_stop_vm_off(self) -> None: + """ + Test that op_stop does not stop VM if it is already off. + """ + userservice = self._create_userservice_fixed() + api = userservice.service().provider().api + vm_mock = mock.Mock() + vm_mock.status.is_off.return_value = True + with mock.patch.object(api, 'get_vm_info', return_value=vm_mock): + with mock.patch.object(api, 'stop_vm_instance') as stop_mock: + userservice.op_stop() + stop_mock.assert_not_called() + + def test_op_stop_vm_running(self) -> None: + """ + Test that op_stop stops VM if it is running. + """ + userservice = self._create_userservice_fixed() + api = userservice.service().provider().api + vm_mock = mock.Mock() + vm_mock.status.is_off.return_value = False + with mock.patch.object(api, 'get_vm_info', return_value=vm_mock): + with mock.patch.object(api, 'stop_vm_instance') as stop_mock: + userservice.op_stop() + stop_mock.assert_called_once_with('fixed-vm') + + # --- Start checker tests --- + def test_op_start_checker_running(self) -> None: + """ + Test that op_start_checker returns RUNNING if VM status is not error. + """ + userservice = self._create_userservice_fixed() + api = userservice.service().provider().api + status_mock = mock.Mock() + status_mock.is_error.return_value = False + vm_mock = mock.Mock() + vm_mock.status = status_mock + with mock.patch.object(api, 'get_vm_info', return_value=vm_mock): + state = userservice.op_start_checker() + self.assertEqual(state, TaskState.RUNNING) + + def test_op_start_checker_finished(self) -> None: + """ + Test that op_start_checker returns FINISHED if VM status is RUNNING. + """ + userservice = self._create_userservice_fixed() + api = userservice.service().provider().api + vm_mock = mock.Mock() + from uds.services.OpenShift.openshift import types as opensh_types + vm_mock.status = opensh_types.VMStatus.RUNNING + with mock.patch.object(api, 'get_vm_info', return_value=vm_mock): + state = userservice.op_start_checker() + self.assertEqual(state, TaskState.FINISHED) + + # --- Stop checker tests --- + def test_op_stop_checker_running(self) -> None: + """ + Test that op_stop_checker returns RUNNING if VM status is not error. + """ + userservice = self._create_userservice_fixed() + api = userservice.service().provider().api + vm_mock = mock.Mock() + status_mock = mock.Mock() + status_mock.is_error.return_value = False + vm_mock.status = status_mock + with mock.patch.object(api, 'get_vm_info', return_value=vm_mock): + state = userservice.op_stop_checker() + self.assertEqual(state, TaskState.RUNNING) + + def test_op_stop_checker_finished(self) -> None: + """ + Test that op_stop_checker returns FINISHED if VM status is STOPPED. + """ + userservice = self._create_userservice_fixed() + api = userservice.service().provider().api + vm_mock = mock.Mock() + from uds.services.OpenShift.openshift import types as opensh_types + vm_mock.status = opensh_types.VMStatus.STOPPED + with mock.patch.object(api, 'get_vm_info', return_value=vm_mock): + state = userservice.op_stop_checker() + self.assertEqual(state, TaskState.FINISHED) + + diff --git a/server/tests/services/openshift/test_provider.py b/server/tests/services/openshift/test_provider.py new file mode 100644 index 000000000..69abc0d9f --- /dev/null +++ b/server/tests/services/openshift/test_provider.py @@ -0,0 +1,143 @@ +# -*- coding: utf-8 -*- + +# +# Copyright (c) 2024 Virtual Cable S.L.U. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" +import typing +from unittest import mock + +from uds.core import types, ui, environment +from uds.services.OpenShift.provider import OpenshiftProvider + +from . import fixtures + +from tests.utils.test import UDSTransactionTestCase + + +class TestOpenshiftProvider(UDSTransactionTestCase): + def setUp(self) -> None: + """ + Set up test environment and clear fixtures before each test. + """ + super().setUp() + fixtures.clear() + + # --- Provider Data Tests --- + def test_provider_data(self) -> None: + """ + Test provider data fields and types for correct initialization. + """ + provider = fixtures.create_provider() + self.assertEqual(provider.cluster_url.value, fixtures.PROVIDER_VALUES_DICT['cluster_url']) + self.assertEqual(provider.api_url.value, fixtures.PROVIDER_VALUES_DICT['api_url']) + self.assertEqual(provider.username.value, fixtures.PROVIDER_VALUES_DICT['username']) + self.assertEqual(provider.password.value, fixtures.PROVIDER_VALUES_DICT['password']) + self.assertEqual(provider.namespace.value, fixtures.PROVIDER_VALUES_DICT['namespace']) + self.assertEqual(provider.verify_ssl.value, fixtures.PROVIDER_VALUES_DICT['verify_ssl']) + if not isinstance(provider.concurrent_creation_limit, ui.gui.NumericField): + self.fail('concurrent_creation_limit is not a NumericField') + self.assertEqual(provider.concurrent_creation_limit.as_int(), fixtures.PROVIDER_VALUES_DICT['concurrent_creation_limit']) + if not isinstance(provider.concurrent_removal_limit, ui.gui.NumericField): + self.fail('concurrent_removal_limit is not a NumericField') + self.assertEqual(provider.concurrent_removal_limit.as_int(), fixtures.PROVIDER_VALUES_DICT['concurrent_removal_limit']) + self.assertEqual(provider.timeout.as_int(), fixtures.PROVIDER_VALUES_DICT['timeout']) + + # --- Provider Test Method --- + def test_provider_test(self) -> None: + """ + Test the static provider test method and test_connection logic. + """ + with fixtures.patched_provider() as provider: + api = typing.cast(mock.MagicMock, provider.api) + for ret_val in [True, False]: + api.test.reset_mock() + api.test.return_value = ret_val + # Patch test_connection to return ret_val for static test + with mock.patch('uds.services.OpenShift.provider.OpenshiftProvider.test_connection', return_value=ret_val): + result = OpenshiftProvider.test(environment.Environment.temporary_environment(), fixtures.PROVIDER_VALUES_DICT) + self.assertIsInstance(result, types.core.TestResult) + self.assertEqual(result.success, ret_val) + self.assertIsInstance(result.error, str) + # Ensure test_connection calls api.test + provider.test_connection() + api.test.assert_called_once_with() + + # --- Provider Availability --- + def test_provider_is_available(self) -> None: + """ + Test the provider is_available method and cache behavior. + """ + with fixtures.patched_provider() as provider: + api = typing.cast(mock.MagicMock, provider.api) + # First, true result + self.assertEqual(provider.is_available(), True) + api.test.assert_called_once_with() + api.test.reset_mock() + # Now, even if set test to false, should return true due to cache + api.test.return_value = False + self.assertEqual(provider.is_available(), True) + api.test.assert_not_called() + # clear cache of method + provider.is_available.cache_clear() # type: ignore # cache_clear() is added by decorator + self.assertEqual(provider.is_available(), False) + api.test.assert_called_once_with() + + # --- Provider API Methods --- + def test_provider_api_methods(self) -> None: + """ + Test provider API methods for VM operations and info retrieval. + """ + with fixtures.patched_provider() as provider: + api = typing.cast(mock.MagicMock, provider.api) + self.assertEqual(provider.test_connection(), True) + api.test.assert_called_once_with() + self.assertEqual(provider.api.list_vms(), fixtures.VMS) + self.assertEqual(provider.api.get_vm_info('vm-1'), fixtures.VMS[0]) + self.assertEqual(provider.api.get_vm_instance_info('vm-1'), fixtures.VM_INSTANCES[0]) + self.assertTrue(provider.api.start_vm_instance('vm-1')) + self.assertTrue(provider.api.stop_vm_instance('vm-1')) + self.assertTrue(provider.api.delete_vm_instance('vm-1')) + + # --- Name Sanitization --- + def test_sanitized_name(self) -> None: + """ + Test name sanitization utility for various input cases. + """ + provider = fixtures.create_provider() + test_cases = [ + ('Test-VM-1', 'test-vm-1'), + ('Test_VM@2', 'test-vm-2'), + ('My Test VM!!!', 'my-test-vm'), + ('Test !!! this is', 'test-this-is'), + ('UDS-Pub-Hello World!!--2025065122-v1', 'uds-pub-hello-world-2025065122-v1'), + ('a' * 100, 'a' * 63), # Test truncation + ] + for input_name, expected in test_cases: + self.assertEqual(provider.sanitized_name(input_name), expected) \ No newline at end of file diff --git a/server/tests/services/openshift/test_publication.py b/server/tests/services/openshift/test_publication.py new file mode 100644 index 000000000..f68c444f5 --- /dev/null +++ b/server/tests/services/openshift/test_publication.py @@ -0,0 +1,171 @@ +# -*- coding: utf-8 -*- + +# +# Copyright (c) 2024 Virtual Cable S.L.U. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" + +import typing +from unittest import mock + +from uds.core import types +from tests.services.openshift import fixtures +from tests.utils.test import UDSTransactionTestCase + + +class TestOpenshiftPublication(UDSTransactionTestCase): + def setUp(self) -> None: + super().setUp() + fixtures.clear() + + def test_op_create_and_checker(self) -> None: + """ + Test op_create and op_create_checker flow + """ + with fixtures.patched_provider() as provider: + api = typing.cast(mock.MagicMock, provider.api) + service = fixtures.create_service(provider=provider) + publication = fixtures.create_publication(service=service) + + api.get_vm_pvc_or_dv_name.return_value = ('test-pvc', 'pvc') + api.get_pvc_size.return_value = '10Gi' + api.create_vm_from_pvc.return_value = True + api.wait_for_datavolume_clone_progress.return_value = True + api.get_vm_info.return_value = None + + publication.op_create() + api.get_vm_info.return_value = None + state = publication.op_create_checker() + self.assertEqual(state, types.states.TaskState.RUNNING) + + def get_vm_info_side_effect(name: str) -> mock.Mock | None: + return mock.Mock(status=mock.Mock()) if name == publication._name else None + + api.get_vm_info.side_effect = get_vm_info_side_effect + state = publication.op_create_checker() + self.assertEqual(state, types.states.TaskState.FINISHED) + + def test_op_create_completed_and_checker(self) -> None: + """ + Test op_create_completed and op_create_completed_checker flow + """ + with fixtures.patched_provider() as provider: + api = typing.cast(mock.MagicMock, provider.api) + service = fixtures.create_service(provider=provider) + publication = fixtures.create_publication(service=service) + + # VM running + running_status = mock.Mock() + running_status.is_running.return_value = True + running_vm = mock.Mock(status=running_status) + + def get_vm_info_side_effect(name: str, **kwargs: dict[str, typing.Any]) -> mock.Mock | None: + return running_vm if name == 'test-vm' else None + + api.get_vm_info.side_effect = get_vm_info_side_effect + publication._name = 'test-vm' + publication.op_create_completed() + api.stop_vm_instance.assert_called_with('test-vm') + + # VM stopped + stopped_status = mock.Mock() + stopped_status.is_running.return_value = False + stopped_vm = mock.Mock(status=stopped_status) + + api.get_vm_info.side_effect = None + api.get_vm_info.return_value = stopped_vm + api.stop_vm_instance.reset_mock() + publication.op_create_completed() + api.stop_vm_instance.assert_not_called() + + # Checker: VM not found + api.get_vm_info.return_value = None + state = publication.op_create_completed_checker() + self.assertEqual(state, types.states.TaskState.FINISHED) + + # Checker: VM stopped + api.get_vm_info.return_value = stopped_vm + state = publication.op_create_completed_checker() + self.assertEqual(state, types.states.TaskState.FINISHED) + + # Checker: VM running + api.get_vm_info.return_value = running_vm + state = publication.op_create_completed_checker() + self.assertEqual(state, types.states.TaskState.RUNNING) + + def test_publication_create(self) -> None: + """ + Test publication creation (publish) + """ + with fixtures.patched_provider() as provider: + api = typing.cast(mock.MagicMock, provider.api) + service = fixtures.create_service(provider=provider) + publication = fixtures.create_publication(service=service) + + api.get_vm_pvc_or_dv_name.return_value = ('test-pvc', 'pvc') + api.get_pvc_size.return_value = '10Gi' + api.create_vm_from_pvc.return_value = True + api.wait_for_datavolume_clone_progress.return_value = True + + call_count = {"count": 0} + def vm_info_side_effect(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: + if call_count["count"] < 2: + call_count["count"] += 1 + return fixtures.VMS[0] + + ready_vm = mock.Mock() + ready_vm.status = mock.Mock() + ready_vm.name = publication._name + return ready_vm + api.get_vm_info.side_effect = vm_info_side_effect + + state = publication.publish() + self.assertEqual(state, types.states.State.RUNNING) + + state = publication.check_state() + api.get_vm_pvc_or_dv_name.assert_called() + api.get_pvc_size.assert_called() + api.create_vm_from_pvc.assert_called() + + for _ in range(10): + state = publication.check_state() + if state == types.states.TaskState.FINISHED: + break + self.assertEqual(state, types.states.TaskState.RUNNING) + self.assertEqual(publication.get_template_id(), publication._name) + + def test_get_template_id(self) -> None: + """ + Test template ID retrieval (get_template_id) + """ + service = fixtures.create_service() + publication = fixtures.create_publication(service=service) + publication._name = 'test-template' + template_id = publication.get_template_id() + self.assertEqual(template_id, 'test-template') diff --git a/server/tests/services/openshift/test_serialization_deployment.py b/server/tests/services/openshift/test_serialization_deployment.py new file mode 100644 index 000000000..e7f11efcf --- /dev/null +++ b/server/tests/services/openshift/test_serialization_deployment.py @@ -0,0 +1,64 @@ +import pickle + +from tests.services.openshift import fixtures +from tests.utils.test import UDSTransactionTestCase + + +class TestOpenshiftDeploymentSerialization(UDSTransactionTestCase): + def setUp(self) -> None: + """ + Set up test environment and clear fixtures before each test. + """ + super().setUp() + fixtures.clear() + + def _make_userservice(self): + """ + Helper to create a userservice with all fields set for serialization tests. + """ + userservice = fixtures.create_userservice() + userservice._name = 'test-vm' + userservice._ip = '192.168.1.100' + userservice._mac = '00:11:22:33:44:55' + userservice._vmid = 'test-vm-id' + userservice._reason = 'test-reason' + userservice._waiting_name = True + return userservice + + # --- Serialization Tests --- + def test_userservice_serialization(self) -> None: + """ + Test that userservice object is correctly serialized and deserialized with all fields preserved. + """ + userservice = self._make_userservice() + data = pickle.dumps(userservice) + userservice2 = pickle.loads(data) + + self.assertEqual(userservice2._name, 'test-vm') + self.assertEqual(userservice2._ip, '192.168.1.100') + self.assertEqual(userservice2._mac, '00:11:22:33:44:55') + self.assertEqual(userservice2._vmid, 'test-vm-id') + self.assertEqual(userservice2._reason, 'test-reason') + self.assertTrue(userservice2._waiting_name) + + def test_userservice_methods_after_serialization(self) -> None: + """ + Test that userservice methods return correct values after serialization and deserialization. + """ + userservice = self._make_userservice() + data = pickle.dumps(userservice) + userservice2 = pickle.loads(data) + + self.assertEqual(userservice2.get_name(), 'test-vm') + self.assertEqual(userservice2.get_ip(), '192.168.1.100') + self.assertEqual(userservice2._mac, '00:11:22:33:44:55') + + # --- Field Presence Tests --- + def test_autoserializable_fields(self) -> None: + """ + Test that all expected autoserializable fields are present in userservice object. + """ + userservice = self._make_userservice() + expected = ['_name', '_ip', '_mac', '_vmid', '_reason', '_waiting_name'] + for field in expected: + self.assertTrue(hasattr(userservice, field), f"Missing field: {field}") \ No newline at end of file diff --git a/server/tests/services/openshift/test_serialization_deployment_fixed.py b/server/tests/services/openshift/test_serialization_deployment_fixed.py new file mode 100644 index 000000000..8627f43ec --- /dev/null +++ b/server/tests/services/openshift/test_serialization_deployment_fixed.py @@ -0,0 +1,24 @@ +import pickle +from tests.services.openshift import fixtures +from tests.utils.test import UDSTransactionTestCase + +class TestOpenshiftUserServiceFixed(UDSTransactionTestCase): + def setUp(self) -> None: + """ + Set up test environment and clear fixtures before each test. + """ + super().setUp() + fixtures.clear() + + # --- Serialization Tests --- + def test_userservice_fixed_serialization(self) -> None: + """ + Test that userservice_fixed object is correctly serialized and deserialized with all fields preserved. + """ + userservice = fixtures.create_userservice_fixed() + userservice._name = 'fixed-vm' + userservice._reason = 'fixed-reason' + data = pickle.dumps(userservice) + userservice2 = pickle.loads(data) + self.assertEqual(userservice2._name, 'fixed-vm') + self.assertEqual(userservice2._reason, 'fixed-reason') \ No newline at end of file diff --git a/server/tests/services/openshift/test_serialization_provider.py b/server/tests/services/openshift/test_serialization_provider.py new file mode 100644 index 000000000..f833767c1 --- /dev/null +++ b/server/tests/services/openshift/test_serialization_provider.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- + +# +# Copyright (c) 2024 Virtual Cable S.L.U. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" +from tests.services.openshift import fixtures +from tests.utils.test import UDSTransactionTestCase +from uds.services.OpenShift.provider import OpenshiftProvider + +PROVIDER_SERIALIZE_DATA = ( + '{' + '"cluster_url": "https://oauth-openshift.apps-crc.testing", ' + '"api_url": "https://api.crc.testing:6443", ' + '"username": "kubeadmin", ' + '"password": "test-password", ' + '"namespace": "default", ' + '"verify_ssl": false, ' + '"concurrent_creation_limit": 1, ' + '"concurrent_removal_limit": 1, ' + '"timeout": 10' + '}' +) + +class TestOpenshiftProviderSerialization(UDSTransactionTestCase): + # --- Serialization Tests --- + def test_provider_methods_after_serialization(self) -> None: + """ + Test that provider methods return correct values after serialization and deserialization. + """ + from uds.core import environment + + provider = fixtures.create_provider() + data = provider.serialize() + + provider2 = OpenshiftProvider(environment=environment.Environment.testing_environment()) + provider2.deserialize(data) + + self.assertEqual(str(provider2.type_name), 'Openshift Provider') + self.assertEqual(str(provider2.type_description), 'Openshift based VMs provider') + self.assertEqual(provider2.cluster_url.value, fixtures.PROVIDER_VALUES_DICT['cluster_url']) + self.assertEqual(provider2.api_url.value, fixtures.PROVIDER_VALUES_DICT['api_url']) + + def test_provider_serialization(self) -> None: + """ + Test that all provider fields are correctly serialized and deserialized. + """ + from uds.core import environment + + provider = fixtures.create_provider() + data = provider.serialize() + + provider2 = OpenshiftProvider(environment=environment.Environment.testing_environment()) + provider2.deserialize(data) + + for field in fixtures.PROVIDER_VALUES_DICT: + self.assertEqual(getattr(provider2, field).value, fixtures.PROVIDER_VALUES_DICT[field]) diff --git a/server/tests/services/openshift/test_serialization_publication.py b/server/tests/services/openshift/test_serialization_publication.py new file mode 100644 index 000000000..7f13bc175 --- /dev/null +++ b/server/tests/services/openshift/test_serialization_publication.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- + +# +# Copyright (c) 2024 Virtual Cable S.L.U. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" +import pickle + +from tests.services.openshift import fixtures + +from tests.utils.test import UDSTransactionTestCase + + +class TestOpenshiftPublicationSerialization(UDSTransactionTestCase): + EXPECTED_FIELDS = {'_name', '_waiting_name', '_reason', '_queue', '_vmid', '_is_flagged_for_destroy'} + + def setUp(self) -> None: + """ + Set up test environment and clear fixtures before each test. + """ + super().setUp() + fixtures.clear() + + def _make_publication(self): + """ + Helper to create a publication with all fields set for serialization tests. + """ + publication = fixtures.create_publication() + publication._name = 'test-template' + publication._reason = 'test-reason' + publication._waiting_name = True + return publication + + # --- Field Check Helper --- + def check_fields(self, instance: 'fixtures.publication.OpenshiftTemplatePublication') -> None: + """ + Helper to check expected field values in a publication instance. + """ + self.assertEqual(instance._name, 'test-template') + self.assertEqual(instance._reason, 'test-reason') + self.assertTrue(instance._waiting_name) + + # --- Serialization Tests --- + def test_autoserialization_fields(self) -> None: + """ + Test that autoserializable fields match the expected set. + """ + publication = fixtures.create_publication() + fields = set(f[0] for f in publication._autoserializable_fields()) + self.assertSetEqual(fields, self.EXPECTED_FIELDS) + + def test_pickle_serialization(self) -> None: + """ + Test that publication object is correctly serialized and deserialized using pickle. + """ + publication = self._make_publication() + data = pickle.dumps(publication) + publication2 = pickle.loads(data) + self.check_fields(publication2) + + def test_marshal_unmarshal(self) -> None: + """ + Test that publication object is correctly marshaled and unmarshaled. + """ + publication = self._make_publication() + marshaled = publication.marshal() + publication2 = fixtures.create_publication() + publication2.unmarshal(marshaled) + self.check_fields(publication2) + + def test_methods_after_serialization(self) -> None: + """ + Test that publication methods return correct values after serialization and deserialization. + """ + publication = self._make_publication() + data = pickle.dumps(publication) + publication2 = pickle.loads(data) + self.assertEqual(publication2._name, 'test-template') + self.assertEqual(publication2.get_template_id(), 'test-template') \ No newline at end of file diff --git a/server/tests/services/openshift/test_service.py b/server/tests/services/openshift/test_service.py new file mode 100644 index 000000000..b3d3cdc98 --- /dev/null +++ b/server/tests/services/openshift/test_service.py @@ -0,0 +1,190 @@ +# -*- coding: utf-8 -*- +""" +Unit tests for OpenshiftService logic. +All tests use fixtures for setup and mock dependencies. +Tests are grouped by functionality: configuration, utility methods, availability, VM operations, exception handling, and deletion. +""" + +# +# Copyright (c) 2024 Virtual Cable S.L.U. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: + +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +Reorganizado y corregido por GitHub Copilot +""" + +import typing +from unittest import mock +from uds.services.OpenShift.openshift import exceptions as morph_exceptions + +from tests.services.openshift import fixtures +from tests.utils.test import UDSTransactionTestCase + + +class TestOpenshiftService(UDSTransactionTestCase): + def setUp(self) -> None: + super().setUp() + fixtures.clear() + + def _create_service_with_provider(self): + """ + Helper to create a service with a patched provider. + """ + provider_ctx = fixtures.patched_provider() + provider = provider_ctx.__enter__() + service = fixtures.create_service(provider=provider) + return service, provider, provider_ctx + + # --- Configuration and initial data --- + def test_service_data(self) -> None: + """ + Check initial service data values. + """ + service = fixtures.create_service() + self.assertEqual(service.template.value, fixtures.SERVICE_VALUES_DICT['template']) + self.assertEqual(service.basename.value, fixtures.SERVICE_VALUES_DICT['basename']) + self.assertEqual(service.lenname.value, fixtures.SERVICE_VALUES_DICT['lenname']) + self.assertEqual(service.publication_timeout.value, fixtures.SERVICE_VALUES_DICT['publication_timeout']) + + def test_initialize_sets_basename(self) -> None: + """ + Check that initialize sets basename and lenname correctly. + """ + service = fixtures.create_service() + service.basename.value = 'testname' + service.lenname.value = 6 + service.initialize({'basename': 'testname', 'lenname': 6}) + self.assertEqual(service.basename.value, 'testname') + self.assertEqual(service.lenname.value, 6) + + def test_init_gui_sets_choices(self) -> None: + """ + Check that init_gui sets template choices. + """ + service, _, provider_ctx = self._create_service_with_provider() + with mock.patch.object(service.template, 'set_choices') as set_choices_mock: + service.init_gui() + set_choices_mock.assert_called() + provider_ctx.__exit__(None, None, None) + + # --- Utility and accessor methods --- + def test_provider_returns_correct_type(self) -> None: + """ + Check that provider() returns the correct provider instance. + """ + service, provider, provider_ctx = self._create_service_with_provider() + self.assertEqual(service.provider(), provider) + provider_ctx.__exit__(None, None, None) + + def test_api_property_caching(self) -> None: + """ + Check that the api property is cached. + """ + service, _, provider_ctx = self._create_service_with_provider() + api1 = service.api + api2 = service.api + self.assertIs(api1, api2) + provider_ctx.__exit__(None, None, None) + + def test_service_methods(self) -> None: + """ + Check utility methods of the service. + """ + service, _, provider_ctx = self._create_service_with_provider() + self.assertEqual(service.get_basename(), service.basename.value) + self.assertEqual(service.get_lenname(), service.lenname.value) + self.assertEqual(service.sanitized_name('Test VM 1'), 'test-vm-1') + duplicates = list(service.find_duplicates('vm-1', '00:11:22:33:44:55')) + self.assertEqual(len(duplicates), 1) + provider_ctx.__exit__(None, None, None) + + # --- Availability and cache --- + def test_service_is_available(self) -> None: + """ + Check service availability and cache handling. + """ + service, provider, provider_ctx = self._create_service_with_provider() + api = typing.cast(mock.MagicMock, provider.api) + self.assertTrue(service.is_available()) + api.test.assert_called_with() + api.test.return_value = False + self.assertTrue(service.is_available()) + service.provider().is_available.cache_clear() # type: ignore + self.assertFalse(service.is_available()) + api.test.assert_called_with() + provider_ctx.__exit__(None, None, None) + + # --- VM operations --- + def test_vm_operations(self) -> None: + """ + Check VM operations: get_ip, get_mac, is_running, start, stop, shutdown. + """ + service, _, provider_ctx = self._create_service_with_provider() + api = typing.cast(mock.MagicMock, service.api) + ip = service.get_ip(None, 'vm-1') + self.assertEqual(ip, '192.168.1.1') + mac = service.get_mac(None, 'vm-1') + self.assertEqual(mac, '00:11:22:33:44:01') + self.assertTrue(service.is_running(None, 'vm-1')) + service.start(None, 'vm-1') + api.start_vm_instance.assert_called_with('vm-1') + service.stop(None, 'vm-1') + api.stop_vm_instance.assert_called_with('vm-1') + service.shutdown(None, 'vm-1') + api.stop_vm_instance.assert_called_with('vm-1') + provider_ctx.__exit__(None, None, None) + + # --- Exception handling --- + def test_get_ip_raises_exception_if_no_interfaces(self) -> None: + """ + Check that get_ip raises an exception if there are no interfaces. + """ + service, _, provider_ctx = self._create_service_with_provider() + def no_interfaces(_vmid: str): + mock_vm = mock.Mock() + mock_vm.interfaces = [] + return mock_vm + with mock.patch.object(service.api, 'get_vm_instance_info', side_effect=no_interfaces): + with self.assertRaises(Exception): + service.get_ip(None, 'vm-1') + provider_ctx.__exit__(None, None, None) + + def test_get_mac_raises_exception_if_no_interfaces(self) -> None: + """ + Check that get_mac raises an exception if there are no interfaces. + """ + service, _, provider_ctx = self._create_service_with_provider() + def no_interfaces(_vmid: str): + mock_vm = mock.Mock() + mock_vm.interfaces = [] + return mock_vm + with mock.patch.object(service.api, 'get_vm_instance_info', side_effect=no_interfaces): + with self.assertRaises(Exception): + service.get_mac(None, 'vm-1') + provider_ctx.__exit__(None, None, None) + + # --- VM deletion --- + def test_vm_deletion(self) -> None: + """ + Check VM deletion logic and is_deleted method. + """ + service, provider, provider_ctx = self._create_service_with_provider() + api = typing.cast(mock.MagicMock, provider.api) + + # Execute deletion + service.execute_delete('vm-1') + api.delete_vm_instance.assert_called_with('vm-1') + + # Check if deleted + api.get_vm_info.side_effect = morph_exceptions.OpenshiftNotFoundError('not found') + self.assertTrue(service.is_deleted('vm-1')) + + # Simulate VM exists + api.get_vm_info.side_effect = None + api.get_vm_info.return_value = fixtures.VMS[0] + self.assertFalse(service.is_deleted('vm-1')) + provider_ctx.__exit__(None, None, None) \ No newline at end of file diff --git a/server/tests/services/openshift/test_service_fixed.py b/server/tests/services/openshift/test_service_fixed.py new file mode 100644 index 000000000..6f10354c0 --- /dev/null +++ b/server/tests/services/openshift/test_service_fixed.py @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- + +# +# Copyright (c) 2024 Virtual Cable S.L.U. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" +import typing +from unittest import mock + +from tests.services.openshift import fixtures + +from tests.utils.test import UDSTransactionTestCase + + + +class TestOpenshiftServiceFixed(UDSTransactionTestCase): + def _create_service_fixed_with_provider(self): + """ + Helper to create a fixed service with a patched provider. + """ + provider_ctx = fixtures.patched_provider() + provider = provider_ctx.__enter__() + service = fixtures.create_service_fixed(provider=provider) + return service, provider, provider_ctx + def setUp(self) -> None: + super().setUp() + fixtures.clear() + + # --- Availability --- + def test_service_is_available(self) -> None: + """ + Test provider availability and cache logic. + """ + service, provider, provider_ctx = self._create_service_fixed_with_provider() + api = typing.cast(mock.MagicMock, provider.api) + self.assertTrue(service.is_available()) + api.test.assert_called_with() + # With cached data, even if test fails, it will return True + api.test.return_value = False + self.assertTrue(service.is_available()) + # Clear cache and test again + service.provider().is_available.cache_clear() # type: ignore + self.assertFalse(service.is_available()) + api.test.assert_called_with() + provider_ctx.__exit__(None, None, None) + + # --- Service methods --- + def test_service_methods(self) -> None: + """ + Test service methods: enumerate_assignables, get_name, get_ip, get_mac, sanitized_name. + """ + service, _, provider_ctx = self._create_service_fixed_with_provider() + # Enumerate assignables + machines = list(service.enumerate_assignables()) + self.assertEqual(len(machines), 3) + self.assertEqual(machines[0].id, 'vm-3') + self.assertEqual(machines[1].id, 'vm-4') + self.assertEqual(machines[2].id, 'vm-5') + # Get machine name + machine_name = service.get_name('uid-3') + self.assertEqual(machine_name, 'vm-3') + # Get IP + ip = service.get_ip('uid-3') + self.assertTrue(ip.startswith('192.168.1.')) + # Get MAC + mac = service.get_mac('uid-3') + self.assertTrue(mac.startswith('00:11:22:33:44:')) + # Sanitized name + sanitized = service.sanitized_name('Test VM 1') + self.assertIsInstance(sanitized, str) + provider_ctx.__exit__(None, None, None) + + # --- Assignment logic --- + def test_get_and_assign(self) -> None: + """ + Test get_and_assign logic for fixed service. + """ + service, _, provider_ctx = self._create_service_fixed_with_provider() + vmid = service.get_and_assign() + self.assertIn(vmid, ['vm-3', 'vm-4', 'vm-5']) + # Should not assign the same again + with service._assigned_access() as assigned: + self.assertIn(vmid, assigned) + provider_ctx.__exit__(None, None, None) + + def test_remove_and_free(self) -> None: + """ + Test remove_and_free logic for fixed service. + """ + service, _, provider_ctx = self._create_service_fixed_with_provider() + vmid = service.get_and_assign() + result = service.remove_and_free(vmid) + self.assertEqual(result.name, 'FINISHED') + with service._assigned_access() as assigned: + self.assertNotIn(vmid, assigned) + provider_ctx.__exit__(None, None, None)