diff --git a/apiclient/harvester_api/managers/internals.py b/apiclient/harvester_api/managers/internals.py index 1261769de..548b7bed2 100644 --- a/apiclient/harvester_api/managers/internals.py +++ b/apiclient/harvester_api/managers/internals.py @@ -41,9 +41,11 @@ class UpgradeManager(BaseManager): CREATE_PATH = "v1/harvester/harvesterhci.io.upgrades" API_PATH_fmt = "v1/harvester/harvesterhci.io.upgrades/{namespace}{name}" - def create_data(self, version_name, namespace=DEFAULT_HARVESTER_NAMESPACE): + def create_data(self, version_name, namespace=DEFAULT_HARVESTER_NAMESPACE, annotations=None): + annotations = annotations or dict() data = { "type": "harvesterhci.io.upgrade", + "annotations": annotations, "metadata": { "generateName": "hvst-upgrade-", "namespace": namespace @@ -54,8 +56,10 @@ def create_data(self, version_name, namespace=DEFAULT_HARVESTER_NAMESPACE): } return data - def create(self, version_name, namespace=DEFAULT_HARVESTER_NAMESPACE, *, raw=False): - data = self.create_data(version_name) + def create( + self, version_name, namespace=DEFAULT_HARVESTER_NAMESPACE, *, raw=False, annotations=None + ): + data = self.create_data(version_name, annotations=annotations) path = self.API_PATH_fmt.format(name="", namespace=namespace) return self._create(path, json=data, raw=raw) diff --git a/apiclient/harvester_api/managers/storageclasses.py b/apiclient/harvester_api/managers/storageclasses.py index 6fd7d4a94..6b10f71c7 100644 --- a/apiclient/harvester_api/managers/storageclasses.py +++ b/apiclient/harvester_api/managers/storageclasses.py @@ -14,6 +14,14 @@ def get(self, name="", *, raw=False, **kwargs): path = self.PATH_fmt.format(SC_API=self.API_VERSION, name=name) return self._get(path, raw=raw, **kwargs) + def get_default(self): + code, data = self.get() + for sc in data['items']: + if 'true' == sc['metadata']['annotations'].get(DEFAULT_STORAGE_CLASS_ANNOTATION): + return code, sc + else: + return code, data + def create_data(self, name, replicas): data = { 
"type": f"{self.API_VERSION}", diff --git a/harvester_e2e_tests/fixtures/api_client.py b/harvester_e2e_tests/fixtures/api_client.py index 6994e0fe9..9437e227e 100644 --- a/harvester_e2e_tests/fixtures/api_client.py +++ b/harvester_e2e_tests/fixtures/api_client.py @@ -60,6 +60,11 @@ def sleep_timeout(request): return request.config.getoption("--sleep-timeout", 4) +@pytest.fixture(scope="session") +def upgrade_timeout(request): + return request.config.getoption('--upgrade-wait-timeout') or 7200 + + @pytest.fixture(scope="session") def rancher_wait_timeout(request): return request.config.getoption("--rancher-cluster-wait-timeout", 1800) diff --git a/harvester_e2e_tests/integrations/test_upgrade.py b/harvester_e2e_tests/integrations/test_upgrade.py index 75ea54061..428272231 100644 --- a/harvester_e2e_tests/integrations/test_upgrade.py +++ b/harvester_e2e_tests/integrations/test_upgrade.py @@ -1,457 +1,78 @@ -import json -import os import re +import json import yaml import socket from time import sleep +from operator import add +from functools import reduce from datetime import datetime, timedelta import pytest - +from paramiko.ssh_exception import SSHException, NoValidConnectionsError from harvester_api.managers import DEFAULT_HARVESTER_NAMESPACE, DEFAULT_LONGHORN_NAMESPACE -from paramiko.ssh_exception import ChannelException, AuthenticationException, \ - NoValidConnectionsError - -DEFAULT_STORAGE_CLS = "harvester-longhorn" - -DEFAULT_USER = "ubuntu" -DEFAULT_PASSWORD = "root" - -NETWORK_VLAN_ID_LABEL = "network.harvesterhci.io/vlan-id" -UPGRADE_STATE_LABEL = "harvesterhci.io/upgradeState" -CONTROL_PLANE_LABEL = "node-role.kubernetes.io/control-plane" -NODE_INTERNAL_IP_ANNOTATION = "rke2.io/internal-ip" - -LOGGING_NAMESPACE = "cattle-logging-system" - -expected_harvester_crds = { - "addons.harvesterhci.io": False, - "blockdevices.harvesterhci.io": False, - "keypairs.harvesterhci.io": False, - "preferences.harvesterhci.io": False, - "settings.harvesterhci.io": False, - 
"supportbundles.harvesterhci.io": False, - "upgrades.harvesterhci.io": False, - "versions.harvesterhci.io": False, - "virtualmachinebackups.harvesterhci.io": False, - "virtualmachineimages.harvesterhci.io": False, - "virtualmachinerestores.harvesterhci.io": False, - "virtualmachinetemplates.harvesterhci.io": False, - "virtualmachinetemplateversions.harvesterhci.io": False, - - "clusternetworks.network.harvesterhci.io": False, - "linkmonitors.network.harvesterhci.io": False, - "nodenetworks.network.harvesterhci.io": False, - "vlanconfigs.network.harvesterhci.io": False, - "vlanstatuses.network.harvesterhci.io": False, - - "ksmtuneds.node.harvesterhci.io": False, - - "loadbalancers.loadbalancer.harvesterhci.io": False, -} pytest_plugins = [ "harvester_e2e_tests.fixtures.api_client", "harvester_e2e_tests.fixtures.virtualmachines" ] -minio_manifest_fmt = """ -cat < datetime.now(): - code, data = api_client.images.get(name) - if 100 == data.get('status', {}).get('progress', 0): - break - sleep(3) - else: - raise AssertionError( - "Failed to create Image with error:\n" - f"Status({code}): {data}" - ) - - return image_data - - -def _get_ip_from_vmi(vmi): - assert _check_vm_ip_assigned(vmi), "virtual machine does not have ip assigned" - return vmi['status']['interfaces'][0]['ipAddress'] - - -def _check_vm_is_running(vmi): - return (vmi.get('status', {}).get('phase') == 'Running' and - vmi.get('status', {}).get('nodeName') != "") - - -def _check_vm_ip_assigned(vmi): - return (len(vmi.get('status', {}).get('interfaces')) > 0 and - vmi.get('status').get('interfaces')[0].get('ipAddress') is not None) - - -def _check_assigned_ip_func(api_client, vm_name): - def _check_assigned_ip(): - code, data = api_client.vms.get_status(vm_name) - if code != 200: - return False - return _check_vm_is_running(data) and _check_vm_ip_assigned(data) - return _check_assigned_ip - - -def _wait_for_vm_ready(api_client, vm_name, timeout=300): - endtime = datetime.now() + timedelta(seconds=timeout) - 
while endtime > datetime.now(): - if _check_assigned_ip_func(api_client, vm_name)(): - break - sleep(5) - else: - raise AssertionError("Time out while waiting for vm to be created") - - -def _wait_for_write_data(vm_shell, ip, ssh_user="ubuntu", timeout=300): - endtime = datetime.now() + timedelta(seconds=timeout) - - script = "head /dev/urandom | md5sum | head -c 20 > ~/generate_str; sync" - while endtime > datetime.now(): - try: - with vm_shell.login(ip, ssh_user, password=DEFAULT_PASSWORD) as sh: - stdout, stderr = sh.exec_command(script, get_pty=True) - assert not stderr, ( - f"Failed to execute {script} on {ip}: {stderr}") - return - except (ChannelException, AuthenticationException, NoValidConnectionsError, - socket.timeout): - continue - else: - raise AssertionError(f"Time out while waiting for execute the script: {script}") - - -def _get_data_from_vm(vm_shell, ip, ssh_user="ubuntu", timeout=300): - endtime = datetime.now() + timedelta(seconds=timeout) +UPGRADE_STATE_LABEL = "harvesterhci.io/upgradeState" +NODE_INTERNAL_IP_ANNOTATION = "rke2.io/internal-ip" - script = "cat ~/generate_str" - while endtime > datetime.now(): - try: - with vm_shell.login(ip, ssh_user, password=DEFAULT_PASSWORD) as sh: - stdout, stderr = sh.exec_command(script, get_pty=True) - assert not stderr, ( - f"Failed to execute {script} on {ip}: {stderr}") - return stdout - except (ChannelException, AuthenticationException, NoValidConnectionsError, - socket.timeout): - continue - else: - raise AssertionError(f"Time out while waiting for execute the script: {script}") +@pytest.fixture(scope="module") +def cluster_state(request, unique_name, api_client): + class ClusterState: + vm1 = None + vm2 = None + vm3 = None + pass -def _ping(vm_shell, vm_ip, target_ip, ssh_user="ubuntu", timeout=300): - endtime = datetime.now() + timedelta(seconds=timeout) + state = ClusterState() - script = f"ping -c1 {target_ip} > /dev/null && echo -n success || echo -n fail" - while endtime > datetime.now(): - try: 
- with vm_shell.login(vm_ip, ssh_user, password=DEFAULT_PASSWORD) as sh: - stdout, stderr = sh.exec_command(script, get_pty=True) - assert not stderr, ( - f"Failed to execute {script} on {vm_ip}: {stderr}") - if stdout == 'success': - return True - return False - except (ChannelException, AuthenticationException, NoValidConnectionsError, - socket.timeout): - continue + if request.config.getoption('--upgrade-target-version'): + state.version_verify = True + state.version = request.config.getoption('--upgrade-target-version') else: - raise AssertionError(f"Time out while waiting for execute the script: {script}") - - -def _create_basic_vm(api_client, cluster_state, vm_shell, vm_prefix, sc="", - timeout=300): - reused = False - vm_name = "" - code, data = api_client.vms.get() - if code == 200: - for v in data["data"]: - if vm_prefix in v['metadata']['name']: - reused = True - vm_name = v['metadata']['name'] - cluster_state.unique_id = cluster_state.unique_id[len(vm_prefix)+1:] - - if not reused: - vm_name = f'{vm_prefix}-{cluster_state.unique_id}' - - vmspec = api_client.vms.Spec(1, 1, mgmt_network=False) - - image_id = (f"{cluster_state.ubuntu_image['metadata']['namespace']}/" - f"{cluster_state.ubuntu_image['metadata']['name']}") - vmspec.add_image(cluster_state.ubuntu_image['metadata']['name'], image_id, size=5) - vmspec.add_volume("new-sc-disk", 5, storage_cls=sc) - - network_id = (f"{cluster_state.network['metadata']['namespace']}/" - f"{cluster_state.network['metadata']['name']}") - vmspec.add_network("vlan", network_id) - vmspec.user_data = f"""#cloud-config -chpasswd: - expire: false -package_update: true -packages: -- qemu-guest-agent -password: {DEFAULT_PASSWORD} -runcmd: -- - systemctl - - enable - - --now - - qemu-guest-agent.service -ssh_pwauth: true -write_files: - - owner: root:root - path: /etc/netplan/50-cloud-init.yaml - permissions: '0644' - content: | - network: - version: 2 - ethernets: - enp1s0: - dhcp4: yes - dhcp-identifier: mac -""" - - code, 
vm = api_client.vms.create(vm_name, vmspec) - assert code == 201, ( - "Failed to create vm1: %s" % (data)) - - _wait_for_vm_ready(api_client, vm_name, timeout=timeout) - - code, vmi = api_client.vms.get_status(vm_name) - if code != 200: - return None - - if not reused: - _wait_for_write_data(vm_shell, _get_ip_from_vmi(vmi), cluster_state.image_ssh_user, - timeout=timeout) - - return vmi - - -def _create_version(request, api_client, version="master"): - if version == "": - version = "master" - - isoURL = request.config.getoption('--upgrade-iso-url') - checksum = request.config.getoption('--upgrade-iso-checksum') - - return api_client.versions.create(version, isoURL, checksum) + state.version_verify = False + state.version = f"version-{unique_name}" + return state -def _get_all_nodes(api_client): - code, data = api_client.hosts.get() - assert code == 200, ( - f"Failed to get nodes: {code}, {data}") - return data['data'] - - -def _get_master_and_worker_nodes(api_client): - nodes = _get_all_nodes(api_client) - master_nodes = [] - worker_nodes = [] - for node in nodes: - if node['metadata']['labels'].get(CONTROL_PLANE_LABEL) == "true": - master_nodes.append(node) - else: - worker_nodes.append(node) - return master_nodes, worker_nodes - -def _is_installed_version(api_client, target_version): - code, data = api_client.upgrades.get() - if code == 200 and len(data['items']) > 0: - for upgrade in data['items']: - if upgrade.get('spec', {}).get('version') == target_version and \ - upgrade['metadata']['labels'].get(UPGRADE_STATE_LABEL) == 'Succeeded': - return True - return False +@pytest.fixture(scope="module") +def harvester_crds(): + return { + "addons.harvesterhci.io": False, + "blockdevices.harvesterhci.io": False, + "keypairs.harvesterhci.io": False, + "preferences.harvesterhci.io": False, + "settings.harvesterhci.io": False, + "supportbundles.harvesterhci.io": False, + "upgrades.harvesterhci.io": False, + "versions.harvesterhci.io": False, + 
"virtualmachinebackups.harvesterhci.io": False, + "virtualmachineimages.harvesterhci.io": False, + "virtualmachinerestores.harvesterhci.io": False, + "virtualmachinetemplates.harvesterhci.io": False, + "virtualmachinetemplateversions.harvesterhci.io": False, + + "clusternetworks.network.harvesterhci.io": False, + "linkmonitors.network.harvesterhci.io": False, + "nodenetworks.network.harvesterhci.io": False, + "vlanconfigs.network.harvesterhci.io": False, + "vlanstatuses.network.harvesterhci.io": False, + + "ksmtuneds.node.harvesterhci.io": False, + + "loadbalancers.loadbalancer.harvesterhci.io": False, + } -@pytest.fixture(scope="session") -def upgrade_target(request): +@pytest.fixture(scope="module") +def upgrade_target(request, unique_name): version = request.config.getoption('--upgrade-target-version') - assert version, "Target Version should not be empty" + version = version or f"upgrade-{unique_name}" iso_url = request.config.getoption('--upgrade-iso-url') assert iso_url, "Target ISO URL should not be empty" checksum = request.config.getoption("--upgrade-iso-checksum") @@ -460,11 +81,6 @@ def upgrade_target(request): return version, iso_url, checksum -@pytest.fixture(scope="session") -def upgrade_timeout(request): - return request.config.getoption('--upgrade-wait-timeout') or 7200 - - @pytest.fixture(scope="module") def image(api_client, image_ubuntu, unique_name, wait_timeout): unique_image_id = f'image-{unique_name}' @@ -492,455 +108,291 @@ def image(api_client, image_ubuntu, unique_name, wait_timeout): code, data = api_client.images.delete(unique_image_id) -@pytest.fixture -def stopped_vm(request, api_client, ssh_keypair, wait_timeout, unique_name, image): - unique_vm_name = f"{request.node.name.lstrip('test_').replace('_', '-')}-{unique_name}" - cpu, mem = 1, 2 - pub_key, pri_key = ssh_keypair - vm_spec = api_client.vms.Spec(cpu, mem) - vm_spec.add_image("disk-0", image['id']) - vm_spec.run_strategy = "Halted" +@pytest.fixture(scope='module') +def 
cluster_network(vlan_nic, api_client, unique_name): + code, data = api_client.clusternetworks.get_config() + assert 200 == code, (code, data) - userdata = yaml.safe_load(vm_spec.user_data) - userdata['ssh_authorized_keys'] = [pub_key] - vm_spec.user_data = yaml.dump(userdata) + node_key = 'network.harvesterhci.io/matched-nodes' + cnet_nodes = dict() # cluster_network: items + for cfg in data['items']: + if vlan_nic in cfg['spec']['uplink']['nics']: + nodes = json.loads(cfg['metadata']['annotations'][node_key]) + cnet_nodes.setdefault(cfg['spec']['clusterNetwork'], []).extend(nodes) - code, data = api_client.vms.create(unique_vm_name, vm_spec) + code, data = api_client.hosts.get() + assert 200 == code, (code, data) + all_nodes = set(n['id'] for n in data['data']) + try: + # vlad_nic configured on specific cluster network, reuse it + yield next(cnet for cnet, nodes in cnet_nodes.items() if all_nodes == set(nodes)) + return None + except StopIteration: + configured_nodes = reduce(add, cnet_nodes.values(), []) + if any(n in configured_nodes for n in all_nodes): + raise AssertionError( + "Not all nodes' VLAN NIC {vlan_nic} are available.\n" + f"VLAN NIC configured nodes: {configured_nodes}\n" + f"All nodes: {all_nodes}\n" + ) + + # Create cluster network + cnet = f"cnet-{datetime.strptime(unique_name, '%Hh%Mm%Ss%f-%m-%d').strftime('%H%M%S')}" + created = [] + code, data = api_client.clusternetworks.create(cnet) assert 201 == code, (code, data) - endtime = datetime.now() + timedelta(seconds=wait_timeout) - while endtime > datetime.now(): - code, data = api_client.vms.get(unique_vm_name) - if "Stopped" == data.get('status', {}).get('printableStatus'): - break - sleep(1) + while all_nodes: + node = all_nodes.pop() + code, data = api_client.clusternetworks.create_config(node, cnet, vlan_nic, hostname=node) + assert 201 == code, ( + f"Failed to create cluster config for {node}\n" + f"Created: {created}\t Remaining: {all_nodes}\n" + f"API Status({code}): {data}" + ) + 
created.append(node) - yield unique_vm_name, image['user'], pri_key + yield cnet - code, data = api_client.vms.get(unique_vm_name) - vm_spec = api_client.vms.Spec.from_dict(data) + # Teardown + deleted = {name: api_client.clusternetworks.delete_config(name) for name in created} + failed = [(name, code, data) for name, (code, data) in deleted.items() if 200 != code] + if failed: + fmt = "Unable to delete VLAN Config {} with error ({}): {}" + raise AssertionError( + "\n".join(fmt.format(name, code, data) for (name, code, data) in failed) + ) - api_client.vms.delete(unique_vm_name) + code, data = api_client.clusternetworks.delete(cnet) + assert 200 == code, (code, data) + + +@pytest.fixture(scope="module") +def vm_network(api_client, unique_name, wait_timeout, cluster_network, vlan_id, cluster_state): + code, data = api_client.networks.create( + unique_name, vlan_id, cluster_network=cluster_network + ) + assert 201 == code, (code, data) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): - code, data = api_client.vms.get_status(unique_vm_name) - if 404 == code: - break + code, data = api_client.networks.get(unique_name) + annotations = data['metadata'].get('annotations', {}) + if 200 == code and annotations.get('network.harvesterhci.io/route'): + route = json.loads(annotations['network.harvesterhci.io/route']) + if route['cidr']: + break sleep(3) - - for vol in vm_spec.volumes: - vol_name = vol['volume']['persistentVolumeClaim']['claimName'] - api_client.volumes.delete(vol_name) - - -@pytest.fixture(scope="class") -def network(request, api_client, cluster_state): - code, data = api_client.networks.get() - assert code == 200, ( - "Failed to get networks: %s" % (data)) - - vlan_id = request.config.getoption('--vlan-id') or 1 - for network in data['items']: - if network['metadata']['labels'].get(NETWORK_VLAN_ID_LABEL) == f"{vlan_id}": - cluster_state.network = network - return - - raise AssertionError("Failed to find a routable vlan 
network") - - -@pytest.fixture(scope="class") -def unique_id(cluster_state, unique_name): - cluster_state.unique_id = unique_name - - -@pytest.fixture(scope="class") -def cluster_state(): - class ClusterState: - vm1 = None - vm2 = None - vm3 = None - pass - - return ClusterState() - - -@pytest.fixture(scope="class") -def cluster_prereq(request, cluster_state, prepare_dependence, unique_id, network, base_sc, - ubuntu_image, new_sc): - assert request.config.getoption('--upgrade-iso-url'), ( - "upgrade-iso-url must be not empty") - - assert request.config.getoption('--upgrade-iso-checksum'), ( - "upgrade-iso-checksum must be not empty") - - if request.config.getoption('--upgrade-target-version'): - cluster_state.version_verify = True - cluster_state.version = request.config.getoption('--upgrade-target-version') else: - cluster_state.version_verify = False - cluster_state.version = f"version-{cluster_state.unique_id}" - - -@pytest.fixture(scope='class') -def ubuntu_image(request, api_client, cluster_state, wait_timeout): - image_name = "focal-server-cloudimg-amd64" - - base_url = 'https://cloud-images.ubuntu.com/focal/current/' - - cache_url = request.config.getoption('--image-cache-url') - if cache_url: - base_url = cache_url - url = os.path.join(base_url, 'focal-server-cloudimg-amd64.img') - - image_json = _create_image(api_client, url, name=image_name, - wait_timeout=wait_timeout) - cluster_state.ubuntu_image = image_json - cluster_state.image_ssh_user = "ubuntu" - return image_json - - -def _vm1_backup(api_client, cluster_state, timeout=300): - code, backups = api_client.backups.get() - assert code in (200, 404), "Failed to get backups." 
- - backup_name = None - for backup in backups['data']: - if "vm1-backup" in backup['metadata']['name']: - backup_name = backup['metadata']['name'] - break - - if backup_name is None: - backup_name = f"vm1-backup-{cluster_state.unique_id}" - code, data = api_client.vms.backup(cluster_state.vm1['metadata']['name'], backup_name) - assert code == 204, ( - f"Failed to backup vm: {data}") - - def _wait_for_backup(): - nonlocal data - code, data = api_client.backups.get(backup_name) - assert code == 200, ( - f"Failed to get backup {backup_name}: {data}") + raise AssertionError( + "VM network created but route info not available\n" + f"API Status({code}): {data}" + ) - return data.get('status', {}).get('readyToUse', False) + cluster_state.network = data + yield dict(name=unique_name, cidr=route['cidr'], namespace=data['metadata']['namespace']) - endtime = datetime.now() + timedelta(seconds=timeout) + code, data = api_client.networks.delete(unique_name) + endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): - if _wait_for_backup(): + code, data = api_client.networks.get(unique_name) + if 404 == code: break - sleep(5) + sleep(3) else: - raise AssertionError("Time out while waiting for backup to be created") - - return data - - -@pytest.fixture(scope="class") -def base_sc(request, api_client, cluster_state): - code, data = api_client.scs.get() - assert code == 200, (f"Failed to get storage classes: {data}") - - for sc in data['items']: - if "base-sc" in sc['metadata']['name']: - cluster_state.base_sc = sc - return - - sc_name = f"base-sc-{cluster_state.unique_id}" - cluster_state.base_sc = _create_default_storage_class(request, api_client, sc_name) - + raise AssertionError( + f"Failed to remote VM network {unique_name} after {wait_timeout}s\n" + f"API Status({code}): {data}" + ) -@pytest.fixture(scope="class") -def new_sc(request, api_client, cluster_state): - code, data = api_client.scs.get() - assert code == 200, (f"Failed to get 
storage classes: {data}") - for sc in data['items']: - if "new-sc" in sc['metadata']['name']: - cluster_state.new_sc = sc - return +@pytest.fixture(scope="module") +def config_storageclass(request, api_client, unique_name, cluster_state): + replicas = request.config.getoption('--upgrade-sc-replicas') or 3 - sc_name = f"new-sc-{cluster_state.unique_id}" - cluster_state.new_sc = _create_default_storage_class(request, api_client, sc_name) + code, default_sc = api_client.scs.get_default() + assert 200 == code, (code, default_sc) + sc_name = f"new-sc-{replicas}-{unique_name}" + code, data = api_client.scs.create(sc_name, replicas) + assert 201 == code, (code, data) -def _create_default_storage_class(request, api_client, name): - replicas = request.config.getoption('--upgrade-sc-replicas') or 3 + code, data = api_client.scs.set_default(sc_name) + assert 200 == code, (code, data) - code, data = api_client.scs.get(name) - if code != 200: - code, data = api_client.scs.create(name, replicas) - assert code == 201, ( - f"Failed to create new storage class {name}: {data}") + cluster_state.scs = (default_sc, data) + yield default_sc, data - sc_data = data + code, data = api_client.scs.set_default(default_sc['metadata']['name']) + assert 200 == code, (code, data) - code, data = api_client.scs.set_default(name) - assert code == 200, ( - f"Failed to set default storage class {name}: {data}") - return sc_data +@pytest.fixture(scope="module") +def interceptor(api_client): + from inspect import getmembers, ismethod + class Interceptor: + _v121_vm = True -@pytest.fixture(scope='class') -def prepare_dependence(request, api_client, host_shell, wait_timeout): - predep = request.config.getoption('--upgrade-prepare-dependence') or False - if not predep: - return + def intercepts(self): + meths = getmembers(self, predicate=ismethod) + return [m for name, m in meths if name.startswith("intercept_")] - _prepare_network(request, api_client) + def check(self, data): + for func in 
self.intercepts(): + func(data) - _prepare_minio(api_client, host_shell) + def intercept_v121_vm(self, data): + if "v1.2.1" != api_client.cluster_version.raw: + return + if self._v121_vm: + code, data = api_client.vms.get() + for vm in data.get('data', []): + api_client.vms.stop(vm['metadata']['name']) + self._v121_vm = False + else: + conds = dict((c['type'], c) for c in data.get('status', {}).get('conditions', [])) + st = data.get('metadata', {}).get('labels', {}).get('harvesterhci.io/upgradeState') + if "Succeeded" == st and "True" == conds.get('Completed', {}).get('status'): + code, data = api_client.vms.get() + for vm in data.get('data', []): + api_client.vms.start(vm['metadata']['name']) + + return Interceptor() - _prepare_configuration(api_client, wait_timeout) +@pytest.fixture(scope="class") +def config_backup_target(request, api_client, wait_timeout): + # multiple fixtures from `vm_backup_restore` + conflict_retries = 5 + nfs_endpoint = request.config.getoption('--nfs-endpoint') + assert nfs_endpoint, f"NFS endpoint not configured: {nfs_endpoint}" + assert nfs_endpoint.startswith("nfs://"), ( + f"NFS endpoint should starts with `nfs://`, not {nfs_endpoint}" + ) + backup_type, config = ("NFS", dict(endpoint=nfs_endpoint)) -def _prepare_network(request, api_client): - code, data = api_client.networks.get() - assert code == 200, ( - "Failed to get networks: %s" % (data)) + code, data = api_client.settings.get('backup-target') + origin_spec = api_client.settings.BackupTargetSpec.from_dict(data) - vlan_id = request.config.getoption('--vlan-id') or 1 - for network in data['items']: - if network['metadata']['labels'].get(NETWORK_VLAN_ID_LABEL) == f"{vlan_id}": - break - else: - # create cluster network and network if not existing - vlan_nic = request.config.getoption('--vlan-nic') - assert vlan_nic is not None, "vlan nic is not configured" - - code, clusternetwork = api_client.clusternetworks.get(vlan_nic) - assert code in {200, 404}, "Failed to get cluster 
network" - if code == 404: - code, clusternetwork = api_client.clusternetworks.create(vlan_nic) - assert code == 201, "Failed to create cluster network" - - code, networkconfig = api_client.clusternetworks.get_config(vlan_nic) - assert code in {200, 404}, "Failed to get network config" - if code == 404: - code, networkconfig = api_client.clusternetworks.create_config(vlan_nic, - vlan_nic, - vlan_nic) - assert code == 201, "Failed to create network config" - - code, network = api_client.networks.create(f"vlan{vlan_id}", vlan_id, - cluster_network=vlan_nic) - assert code == 201, "Failed to create network config" - - -def _prepare_minio(api_client, host_shell): - masters, workers = _get_master_and_worker_nodes(api_client) - assert len(masters) > 0, "Failed to get nodes" - - node_name = "" - node_ip = "" - for node in masters: - if NODE_INTERNAL_IP_ANNOTATION in node["metadata"]["annotations"]: - node_name = node["metadata"]["name"] - node_ip = node["metadata"]["annotations"][NODE_INTERNAL_IP_ANNOTATION] + spec = getattr(api_client.settings.BackupTargetSpec, backup_type)(**config) + # ???: when switching S3 -> NFS, update backup-target will easily hit resource conflict + # so we would need retries to apply the change. + for _ in range(conflict_retries): + code, data = api_client.settings.update('backup-target', spec) + if 409 == code and "Conflict" == data['reason']: + sleep(3) + else: break else: - raise AssertionError("Failed to get node ip from all nodes") + raise AssertionError( + f"Unable to update backup-target after {conflict_retries} retried." 
+ f"API Status({code}): {data}" + ) + assert 200 == code, ( + f'Failed to update backup target to {backup_type} with {config}\n' + f"API Status({code}): {data}" + ) - script = minio_manifest_fmt.format(node_name=node_name) + yield spec + + # remove unbound LH backupVolumes + code, data = api_client.lhbackupvolumes.get() + assert 200 == code, "Failed to list lhbackupvolumes" + + check_names = [] + for volume_data in data["items"]: + volume_name = volume_data["metadata"]["name"] + backup_name = volume_data["status"]["lastBackupName"] + if not backup_name: + api_client.lhbackupvolumes.delete(volume_name) + check_names.append(volume_name) - try: - with host_shell.login(node_ip) as shell: - out, err = shell.exec_command(script) - assert not err, f"Failed to create minio: f{err}" - except Exception as e: - raise AssertionError(f"Failed to execute command script: {e}") - - -def _prepare_configuration(api_client, wait_timeout): - code, ca = api_client.settings.update("additional-ca", {"value": minio_cert}) - assert code == 200, ( - f"Failed to update ca: ${ca}") - - value = { - "type": "s3", - "endpoint": "https://minio-service.default:9000", - "accessKeyId": "longhorn-test-access-key", - "secretAccessKey": "longhorn-test-secret-key", - "bucketName": "backupbucket", - "bucketRegion": "us-east-1", - "cert": "", - "virtualHostedStyle": False, - } endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): - code, backup_target = api_client.settings.update("backup-target", - {"value": json.dumps(value)}) - if code == 200: + for name in check_names[:]: + code, data = api_client.lhbackupvolumes.get(name) + if 404 == code: + check_names.remove(name) + if not check_names: break - sleep(10) + sleep(3) else: raise AssertionError( - "Failed to update backup-target:\n" - f"{backup_target}" - ) + f"Failed to delete unbound lhbackupvolumes: {check_names}\n" + f"Last API Status({code}): {data}" + ) + # restore to original backup-target and remove backups 
not belong to it + code, data = api_client.settings.update('backup-target', origin_spec) + code, data = api_client.backups.get() + assert 200 == code, "Failed to list backups" -@pytest.fixture(scope="class") -def vm_prereq(cluster_state, api_client, vm_shell, wait_timeout): - # create new storage class, make it default - # create 3 VMs: - # - having the new storage class - # - the VM that have some data written, take backup - # - the VM restored from the backup - cluster_state.vm1 = _create_basic_vm(api_client, - cluster_state, vm_shell, vm_prefix="vm1", - sc=cluster_state.base_sc['metadata']['name'], - timeout=wait_timeout) - cluster_state.backup = _vm1_backup(api_client, cluster_state, wait_timeout) - - code, vms = api_client.vms.get() - assert code in (200, 404), "Failed to get vms" - - vm2_name = None - for vm in vms['data']: - if "vm2" in vm['metadata']['name']: - vm2_name = vm['metadata']['name'] - - if vm2_name is None: - vm2_name = f"vm2-{cluster_state.unique_id}" - restore_spec = api_client.backups.RestoreSpec(True, vm_name=vm2_name) - code, data = api_client.backups.restore(cluster_state.backup['metadata']['name'], - restore_spec) - assert code == 201, ( - f"Failed to restore to vm2: {data}") + check_names = [] + for backup in data['data']: + endpoint = backup['status']['backupTarget'].get('endpoint') + if endpoint != origin_spec.value.get('endpoint'): + api_client.backups.delete(backup['metadata']['name']) + check_names.append(backup['metadata']['name']) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): - if _check_assigned_ip_func(api_client, vm2_name)(): + for name in check_names[:]: + code, data = api_client.backups.get(name) + if 404 == code: + check_names.remove(name) + if not check_names: break - sleep(5) + sleep(3) else: - raise AssertionError("Time out while waiting for assigned ip for vm2") + raise AssertionError( + f"Failed to delete backups: {check_names}\n" + f"Last API Status({code}): {data}" + ) - # 
@pytest.fixture
def stopped_vm(request, api_client, ssh_keypair, wait_timeout, unique_name, image):
    """Create a halted VM for the requesting test, then clean it up.

    Yields:
        (vm_name, ssh_user, private_key) -- VM is created with run_strategy
        "Halted" so the test decides when to start it.
    """
    # FIX: str.lstrip("test_") strips any leading chars from the set
    # {t, e, s, _}, which mangles names (e.g. "test_stopped" -> "opped").
    # Strip the literal "test_" prefix instead.
    base_name = request.node.name
    if base_name.startswith("test_"):
        base_name = base_name[len("test_"):]
    unique_vm_name = f"{base_name.replace('_', '-')}-{unique_name}"
    cpu, mem = 1, 2
    pub_key, pri_key = ssh_keypair
    vm_spec = api_client.vms.Spec(cpu, mem)
    vm_spec.add_image("disk-0", image['id'])
    vm_spec.run_strategy = "Halted"

    # Inject the test keypair so tests can SSH in once the VM is started.
    userdata = yaml.safe_load(vm_spec.user_data)
    userdata['ssh_authorized_keys'] = [pub_key]
    vm_spec.user_data = yaml.dump(userdata)

    code, data = api_client.vms.create(unique_vm_name, vm_spec)
    assert 201 == code, (code, data)
    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    while endtime > datetime.now():
        code, data = api_client.vms.get(unique_vm_name)
        if "Stopped" == data.get('status', {}).get('printableStatus'):
            break
        sleep(1)
    else:
        # FIX: the original fell through silently on timeout and yielded a VM
        # in an unknown state; fail fast so the test reports the real cause.
        raise AssertionError(
            f"VM({unique_vm_name}) did not reach 'Stopped' within {wait_timeout}s\n"
            f"API Status({code}): {data}"
        )

    yield unique_vm_name, image['user'], pri_key

    # Teardown: capture the spec first (for volume names), delete the VM,
    # wait until it is really gone, then delete the orphaned volumes.
    code, data = api_client.vms.get(unique_vm_name)
    vm_spec = api_client.vms.Spec.from_dict(data)

    api_client.vms.delete(unique_vm_name)
    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    while endtime > datetime.now():
        code, data = api_client.vms.get_status(unique_vm_name)
        if 404 == code:
            break
        sleep(3)

    for vol in vm_spec.volumes:
        vol_name = vol['volume']['persistentVolumeClaim']['claimName']
        api_client.volumes.delete(vol_name)
- """ - vm_name, ssh_user, pri_key = stopped_vm - vm_started, (code, vmi) = vm_checker.wait_started(vm_name) - assert vm_started, (code, vmi) - - # Write date into VM - vm_ip = next(iface['ipAddress'] for iface in vmi['status']['interfaces'] - if iface['name'] == 'default') - code, data = api_client.hosts.get(vmi['status']['nodeName']) - host_ip = next(addr['address'] for addr in data['status']['addresses'] - if addr['type'] == 'InternalIP') - with vm_shell_from_host(host_ip, vm_ip, ssh_user, pkey=pri_key) as sh: - stdout, stderr = sh.exec_command( - "dd if=/dev/urandom of=./generate_file bs=1M count=1024; sync" - ) - assert not stderr, (stdout, stderr) - - # Get pv name of the volume - claim_name = vmi["spec"]["volumes"][0]["persistentVolumeClaim"]["claimName"] - code, data = api_client.volumes.get(name=claim_name) - assert code == 200, f"Failed to get volume {claim_name}: {data}" - pv_name = data["spec"]["volumeName"] - - # Make the volume becomes degraded - code, data = api_client.lhreplicas.get() - assert code == 200 and data['items'], f"Failed to get longhorn replicas ({code}): {data}" - replica = next(r for r in data["items"] if pv_name == r['spec']['volumeName']) - api_client.lhreplicas.delete(name=replica['metadata']['name']) - endtime = datetime.now() + timedelta(seconds=wait_timeout) - while endtime > datetime.now(): - code, data = api_client.lhvolumes.get(pv_name) - if 200 == code and "degraded" == data['status']['robustness']: - break - else: - raise AssertionError( - f"Unable to make the Volume {pv_name} degraded\n" - f"API Status({code}): {data}" - ) - - # create upgrade and verify it is not allowed - version, url, checksum = upgrade_target - code, data = api_client.versions.create(version, url, checksum) - assert code == 201, f"Failed to create version {version}: {data}" - code, data = api_client.upgrades.create(version) - assert code == 400, f"Failed to verify degraded volume: {code}, {data}" - - # Teardown invalid upgrade - 
api_client.upgrades.delete(data['metadata']['name']) - api_client.versions.delete(version) - def test_iso_url(self, api_client, unique_name, upgrade_timeout): """ Steps: @@ -1035,102 +487,250 @@ def test_version_compatibility( api_client.upgrades.delete(data['metadata']['name']) api_client.versions.delete(version) + def test_degraded_volume( + self, api_client, wait_timeout, vm_shell_from_host, vm_checker, upgrade_target, stopped_vm + ): + """ + Criteria: create upgrade should fails if there are any degraded volumes + Steps: + 1. Create a VM using a volume with 3 replicas. + 2. Delete one replica of the volume. Let the volume stay in + degraded state. + 3. Immediately upgrade Harvester. + 4. Upgrade should fail. + """ + vm_name, ssh_user, pri_key = stopped_vm + vm_started, (code, vmi) = vm_checker.wait_started(vm_name) + assert vm_started, (code, vmi) + + # Write date into VM + vm_ip = next(iface['ipAddress'] for iface in vmi['status']['interfaces'] + if iface['name'] == 'default') + code, data = api_client.hosts.get(vmi['status']['nodeName']) + host_ip = next(addr['address'] for addr in data['status']['addresses'] + if addr['type'] == 'InternalIP') + with vm_shell_from_host(host_ip, vm_ip, ssh_user, pkey=pri_key) as sh: + stdout, stderr = sh.exec_command( + "dd if=/dev/urandom of=./generate_file bs=1M count=1024; sync" + ) + assert not stdout, (stdout, stderr) + + # Get pv name of the volume + claim_name = vmi["spec"]["volumes"][0]["persistentVolumeClaim"]["claimName"] + code, data = api_client.volumes.get(name=claim_name) + assert code == 200, f"Failed to get volume {claim_name}: {data}" + pv_name = data["spec"]["volumeName"] + + # Make the volume becomes degraded + code, data = api_client.lhreplicas.get() + assert code == 200 and data['items'], f"Failed to get longhorn replicas ({code}): {data}" + replica = next(r for r in data["items"] if pv_name == r['spec']['volumeName']) + api_client.lhreplicas.delete(name=replica['metadata']['name']) + endtime = 
datetime.now() + timedelta(seconds=wait_timeout) + while endtime > datetime.now(): + code, data = api_client.lhvolumes.get(pv_name) + if 200 == code and "degraded" == data['status']['robustness']: + break + else: + raise AssertionError( + f"Unable to make the Volume {pv_name} degraded\n" + f"API Status({code}): {data}" + ) + + # create upgrade and verify it is not allowed + version, url, checksum = upgrade_target + code, data = api_client.versions.create(version, url, checksum) + assert code == 201, f"Failed to create version {version}: {data}" + code, data = api_client.upgrades.create(version) + assert code == 400, f"Failed to verify degraded volume: {code}, {data}" + + # Teardown invalid upgrade + api_client.versions.delete(version) + @pytest.mark.upgrade @pytest.mark.any_nodes class TestAnyNodesUpgrade: + @pytest.mark.dependency(name="preq_setup_logging") + def test_preq_setup_logging(self, api_client): + # TODO: enable addon if > v1.2.0 + return + + @pytest.mark.dependency(name="preq_setup_vmnetwork") + def test_preq_setup_vmnetwork(self, vm_network): + ''' Be used to trigger the fixture to setup VM network ''' + + @pytest.mark.dependency(name="preq_setup_storageclass") + def test_preq_setup_storageclass(self, config_storageclass): + """ Be used to trigger the fixture to setup storageclass""" + + @pytest.mark.dependency(name="preq_setup_vms") + def test_preq_setup_vms( + self, api_client, ssh_keypair, unique_name, vm_checker, vm_shell, vm_network, image, + config_storageclass, config_backup_target, wait_timeout, cluster_state + ): + # create new storage class, make it default + # create 3 VMs: + # - having the new storage class + # - the VM that have some data written, take backup + # - the VM restored from the backup + pub_key, pri_key = ssh_keypair + old_sc, new_sc = config_storageclass + unique_vm_name = f"ug-vm-{unique_name}" + + cpu, mem, size = 1, 2, 5 + vm_spec = api_client.vms.Spec(cpu, mem, mgmt_network=False) + vm_spec.add_image('disk-0', image['id'], 
    @pytest.mark.dependency(name="preq_setup_vms")
    def test_preq_setup_vms(
        self, api_client, ssh_keypair, unique_name, vm_checker, vm_shell, vm_network, image,
        config_storageclass, config_backup_target, wait_timeout, cluster_state
    ):
        """Pre-upgrade setup: create the VMs that post-upgrade tests verify.

        Creates three VMs and records their identity in ``cluster_state.vms``:
        - the source VM (data written, then backed up),
        - a VM restored from that backup,
        - a VM with an extra volume on the newly created storage class.
        """
        # create new storage class, make it default
        # create 3 VMs:
        # - having the new storage class
        # - the VM that have some data written, take backup
        # - the VM restored from the backup
        pub_key, pri_key = ssh_keypair
        old_sc, new_sc = config_storageclass
        unique_vm_name = f"ug-vm-{unique_name}"

        cpu, mem, size = 1, 2, 5
        vm_spec = api_client.vms.Spec(cpu, mem, mgmt_network=False)
        vm_spec.add_image('disk-0', image['id'], size=size)
        vm_spec.add_network('nic-1', f"{vm_network['namespace']}/{vm_network['name']}")
        # inject the test public key so the data checks below can SSH in
        userdata = yaml.safe_load(vm_spec.user_data)
        userdata['ssh_authorized_keys'] = [pub_key]
        vm_spec.user_data = yaml.dump(userdata)

        code, data = api_client.vms.create(unique_vm_name, vm_spec)
        assert 201 == code, (code, data)
        vm_got_ips, (code, data) = vm_checker.wait_interfaces(unique_vm_name)
        assert vm_got_ips, (
            f"Failed to Start VM({unique_vm_name}) with errors:\n"
            f"Status: {data.get('status')}\n"
            f"API Status({code}): {data}"
        )
        vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                     if iface['name'] == 'nic-1')
        # write data into VM (dd reports on stderr, so stdout must stay empty)
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            try:
                with vm_shell.login(vm_ip, image['user'], pkey=pri_key) as sh:
                    cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh)
                    assert cloud_inited and not err, (out, err)
                    out, err = sh.exec_command(
                        "dd if=/dev/urandom of=./generate_file bs=1M count=1024; sync"
                    )
                    assert not out, (out, err)
                    vm1_md5, err = sh.exec_command(
                        "md5sum ./generate_file > ./generate_file.md5; cat ./generate_file.md5"
                    )
                    assert not err, (vm1_md5, err)
                break
            except (SSHException, NoValidConnectionsError):
                # VM may not be ready to accept connections yet; retry
                sleep(5)
        else:
            raise AssertionError("Timed out while writing data into VM")

        # Take backup then check it's ready
        code, data = api_client.vms.backup(unique_vm_name, unique_vm_name)
        assert 204 == code, (code, data)
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, backup = api_client.backups.get(unique_vm_name)
            if 200 == code and backup.get('status', {}).get('readyToUse'):
                break
            sleep(3)
        else:
            raise AssertionError(
                f'Timed-out waiting for the backup \'{unique_vm_name}\' to be ready.'
            )
        # restore into new VM
        restored_vm_name = f"r-{unique_vm_name}"
        spec = api_client.backups.RestoreSpec.for_new(restored_vm_name)
        code, data = api_client.backups.restore(unique_vm_name, spec)
        assert 201 == code, (code, data)
        vm_got_ips, (code, data) = vm_checker.wait_interfaces(restored_vm_name)
        assert vm_got_ips, (
            f"Failed to Start VM({restored_vm_name}) with errors:\n"
            f"Status: {data.get('status')}\n"
            f"API Status({code}): {data}"
        )
        # Check data consistency (md5 match) and connectivity between the VMs
        r_vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                       if iface['name'] == 'nic-1')
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            try:
                with vm_shell.login(r_vm_ip, image['user'], pkey=pri_key) as sh:
                    cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh)
                    assert cloud_inited and not err, (out, err)
                    out, err = sh.exec_command("md5sum -c ./generate_file.md5")
                    assert not err, (out, err)
                    vm2_md5, err = sh.exec_command("cat ./generate_file.md5")
                    assert not err, (vm2_md5, err)
                    assert vm1_md5 == vm2_md5
                    out, err = sh.exec_command(
                        f"ping -c1 {vm_ip} > /dev/null && echo -n success || echo -n fail"
                    )
                    assert "success" == out and not err
                break
            except (SSHException, NoValidConnectionsError):
                sleep(5)
        else:
            raise AssertionError("Unable to login to restored VM to check data consistency")

        # Create VM having additional volume with new storage class
        vm_spec.add_volume("vol-1", 5, storage_cls=new_sc['metadata']['name'])
        code, data = api_client.vms.create(f"sc-{unique_vm_name}", vm_spec)
        assert 201 == code, (code, data)
        vm_got_ips, (code, data) = vm_checker.wait_interfaces(f"sc-{unique_vm_name}")
        assert vm_got_ips, (
            f"Failed to Start VM(sc-{unique_vm_name}) with errors:\n"
            f"Status: {data.get('status')}\n"
            f"API Status({code}): {data}"
        )

        # store into cluster's state for the post-upgrade verification tests
        names = [unique_vm_name, f"r-{unique_vm_name}", f"sc-{unique_vm_name}"]
        cluster_state.vms = dict(md5=vm1_md5, names=names, ssh_user=image['user'], pkey=pri_key)
dict(md5=vm1_md5, names=names, ssh_user=image['user'], pkey=pri_key) @pytest.mark.dependency(name="any_nodes_upgrade") - def test_perform_upgrade(self, cluster_prereq, vm_prereq, request, api_client, cluster_state, - wait_timeout): + def test_perform_upgrade( + self, api_client, unique_name, upgrade_target, upgrade_timeout, interceptor + ): """ - perform upgrade - check all nodes upgraded """ - if cluster_state.version_verify: - assert not _is_installed_version(api_client, cluster_state.version), ( - f"The current version is already {cluster_state.version}") - - self._perform_upgrade(request, api_client, cluster_state) - - # start vms when are stopped - code, data = api_client.vms.get() - assert code == 200, (f"Failed to get vms: {data}") - - for vm in data["data"]: - if "ready" not in vm["status"] or not vm["status"]["ready"]: - endtime = datetime.now() + timedelta(seconds=wait_timeout) - while endtime > datetime.now(): - code, data = api_client.vms.start(vm["metadata"]["name"]) - if code == 204: - break - sleep(5) - else: - raise AssertionError(f"start vm timeout: {data}") - - # wait for vm to be assigned an IP - _wait_for_vm_ready(api_client, vm["metadata"]["name"], - timeout=wait_timeout) - - def _perform_upgrade(self, request, api_client, cluster_state): - # force create upgrade version - code, data = api_client.versions.get(cluster_state.version) - if code == 200: - code, data = api_client.versions.delete(cluster_state.version) - assert code == 204, ( - f"Failed to delete version {cluster_state.version}: {data}") - - code, data = _create_version(request, api_client, cluster_state.version) - assert code == 201, ( - f"Failed to create version {cluster_state.version}: {data}") - - code, data = api_client.upgrades.create(cluster_state.version) - assert code == 201, ( - f"Failed to upgrade version {cluster_state.version}: {code}, {data}") - - def _wait_for_upgrade(): - try: - code, upgrade_data = api_client.upgrades.get(data["metadata"]["name"]) - if code != 200: 
- return False, {} - except Exception: - return False, upgrade_data.get('status', {}) - - status = upgrade_data.get('status', {}) - - if upgrade_data['metadata'].get('labels', {}).get(UPGRADE_STATE_LABEL) == "Succeeded": - return True, status + # Check nodes counts + code, data = api_client.hosts.get() + assert code == 200, (code, data) + nodes = len(data['data']) - conds = upgrade_data.get('status', {}).get('conditions', []) - if len(conds) > 0: - for cond in conds: - if cond["status"] == "False": - cond_type = cond["type"] - raise AssertionError(f"Upgrade failed: {cond_type}: {cond}") - - if cond["type"] == "Completed" and cond["status"] == "True": - return True, status - - return False, status + # create Upgrade version and start + skip_version_check = {"harvesterhci.io/skip-version-check": True} # for test purpose + version, url, checksum = upgrade_target + version = f"{version}-{unique_name}" + code, data = api_client.versions.create(version, url, checksum) + assert 201 == code, f"Failed to create upgrade for {version}" + code, data = api_client.upgrades.create(version, annotations=skip_version_check) + assert 201 == code, f"Failed to start upgrade for {version}" + upgrade_name = data['metadata']['name'] - nodes = _get_all_nodes(api_client) - upgrade_timeout = request.config.getoption('--upgrade-wait-timeout') or 7200 - endtime = datetime.now() + timedelta(seconds=upgrade_timeout * len(nodes)) + # Check upgrade status + # TODO: check every upgrade stages + endtime = datetime.now() + timedelta(seconds=upgrade_timeout * nodes) while endtime > datetime.now(): - upgraded, status = _wait_for_upgrade() - if upgraded: + code, data = api_client.upgrades.get(upgrade_name) + if 200 != code: + continue + interceptor.check(data) + conds = dict((c['type'], c) for c in data.get('status', {}).get('conditions', [])) + state = data.get('metadata', {}).get('labels', {}).get('harvesterhci.io/upgradeState') + if "Succeeded" == state and "True" == conds.get('Completed', 
{}).get('status'): break - sleep(5) + if any("False" == c['status'] for c in conds.values()): + raise AssertionError(f"Upgrade failed with conditions: {conds.values()}") + sleep(30) else: - raise AssertionError(f"Upgrade timeout: {status}") + raise AssertionError( + f"Upgrade timed out with conditions: {conds.values()}\n" + f"API Status({code}): {data}" + ) - @pytest.mark.dependency(depends=["any_nodes_upgrade"]) + @pytest.mark.dependency(depends=["any_nodes_upgrade", "preq_setup_logging"]) def test_verify_logging_pods(self, api_client): """ Verify logging pods and logs Criteria: https://github.com/harvester/tests/issues/535 """ - code, pods = api_client.get_pods(namespace=LOGGING_NAMESPACE) + code, pods = api_client.get_pods(namespace="cattle-logging-system") assert code == 200 and len(pods['data']) > 0, "No logging pods found" fails = [] @@ -1146,7 +746,10 @@ def test_verify_logging_pods(self, api_client): @pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_verify_audit_log(self, api_client, host_shell, wait_timeout): - masters, workers = _get_master_and_worker_nodes(api_client) + code, data = api_client.hosts.get() + assert 200 == code, (code, data) + label_main = "node-role.kubernetes.io/control-plane" + masters = [n for n in data['data'] if n['metadata']['labels'].get(label_main) == "true"] assert len(masters) > 0, "No master nodes found" script = ("sudo tail /var/lib/rancher/rke2/server/logs/audit.log | awk 'END{print}' " @@ -1168,8 +771,7 @@ def test_verify_audit_log(self, api_client, host_shell, wait_timeout): continue if not err and cmp[ip] < timestamp: done.add(ip) - except (ChannelException, AuthenticationException, NoValidConnectionsError, - socket.timeout): + except (SSHException, NoValidConnectionsError, socket.timeout): continue if not done.symmetric_difference(node_ips): @@ -1180,7 +782,7 @@ def test_verify_audit_log(self, api_client, host_shell, wait_timeout): "\n".join("Node {ip} audit log is not updated." 
for ip in set(node_ips) ^ done) ) - @pytest.mark.dependency(depends=["any_nodes_upgrade"]) + @pytest.mark.dependency(depends=["any_nodes_upgrade", "preq_setup_vmnetwork"]) def test_verify_network(self, api_client, cluster_state): """ Verify cluster and VLAN networks - cluster network `mgmt` should exists @@ -1204,8 +806,8 @@ def test_verify_network(self, api_client, cluster_state): assert any(used_vlan == n['metadata']['name'] for n in vnets['items']), ( f"VLAN {used_vlan} not found") - @pytest.mark.dependency(depends=["any_nodes_upgrade"]) - def test_verify_vms(self, api_client, cluster_state, vm_shell, wait_timeout): + @pytest.mark.dependency(depends=["any_nodes_upgrade", "preq_setup_vms"]) + def test_verify_vms(self, api_client, cluster_state, vm_shell, vm_checker, wait_timeout): """ Verify VMs' state and data Criteria: - VMs should keep in running state @@ -1213,93 +815,127 @@ def test_verify_vms(self, api_client, cluster_state, vm_shell, wait_timeout): """ code, vmis = api_client.vms.get_status() - assert code == 200, ( - f"Failed to get VMs: {code}, {vmis}") - - assert len(vmis["data"]) > 0, ("No VMs found") + assert code == 200 and len(vmis['data']), (code, vmis) - fails = [] - for vmi in vmis['data']: - if "vm1" in vmi['metadata']['name']: - cluster_state.vm1 = vmi - if "vm2" in vmi['metadata']['name']: - cluster_state.vm2 = vmi - if not _check_vm_is_running(vmi): - fails.append(vmi['metadata']['name']) - assert not fails, "\n".join(f"VM {n} is not running" for n in fails) - - vm1_data = _get_data_from_vm(vm_shell, _get_ip_from_vmi(cluster_state.vm1), - cluster_state.image_ssh_user, - timeout=wait_timeout) - vm2_data = _get_data_from_vm(vm_shell, _get_ip_from_vmi(cluster_state.vm2), - cluster_state.image_ssh_user, - timeout=wait_timeout) - assert vm1_data == vm2_data, ("Data in VM is lost") + endtime = datetime.now() + timedelta(seconds=wait_timeout) + while endtime > datetime.now(): + fails, ips = list(), dict() + for name in cluster_state.vms['names']: + 
code, data = api_client.vms.get_status(name) + try: + assert 200 == code + assert "Running" == data['status']['phase'] + assert data['status']['nodeName'] + ips[name] = next(iface['ipAddress'] for iface in data['status']['interfaces'] + if iface['name'] == 'nic-1') + except (AssertionError, TypeError, StopIteration, KeyError) as ex: + fails.append((name, (ex, code, data))) + if not fails: + break + else: + raise AssertionError("\n".join( + f"VM {name} is not in expected state.\nException: {ex}\nAPI Status({code}): {data}" + for (name, (ex, code, data)) in fails) + ) - @pytest.mark.dependency(depends=["any_nodes_upgrade"]) - def test_verify_restore_vm(self, api_client, cluster_state, vm_shell, wait_timeout): + pri_key, ssh_user = cluster_state.vms['pkey'], cluster_state.vms['ssh_user'] + for name in cluster_state.vms['names'][:-1]: + vm_ip = ips[name] + endtime = datetime.now() + timedelta(seconds=wait_timeout) + while endtime > datetime.now(): + try: + with vm_shell.login(vm_ip, ssh_user, pkey=pri_key) as sh: + out, err = sh.exec_command("md5sum -c ./generate_file.md5") + assert not err, (out, err) + md5, err = sh.exec_command("cat ./generate_file.md5") + assert not err, (md5, err) + assert md5 == cluster_state.vms['md5'] + break + except (SSHException, NoValidConnectionsError): + sleep(5) + else: + fails.append(f"Data in VM({name}, {vm_ip}) is inconsistent.") + + assert not fails, "\n".join(fails) + + # Teardown: remove all VMs + for name in cluster_state.vms['names']: + code, data = api_client.vms.get(name) + spec = api_client.vms.Spec.from_dict(data) + _ = vm_checker.wait_deleted(name) + for vol in spec.volumes: + vol_name = vol['volume']['persistentVolumeClaim']['claimName'] + api_client.volumes.delete(vol_name) + + @pytest.mark.dependency(depends=["any_nodes_upgrade", "preq_setup_vms"]) + def test_verify_restore_vm( + self, api_client, cluster_state, vm_shell, vm_checker, wait_timeout + ): """ Verify VM restored from the backup Criteria: - VM should able to 
start - data in VM should not lost """ - vm4_name = f"vm4-{cluster_state.unique_id}" - restore_spec = api_client.backups.RestoreSpec(True, vm_name=vm4_name) - code, data = api_client.backups.create(cluster_state.backup['metadata']['name'], - restore_spec) - assert code == 201, ( - f"Failed to restore to vm4: {data}") + backup_name = cluster_state.vms['names'][0] + restored_vm_name = f"new-r-{backup_name}" + + # Restore VM from backup and check networking is good + restore_spec = api_client.backups.RestoreSpec.for_new(restored_vm_name) + code, data = api_client.backups.restore(backup_name, restore_spec) + assert code == 201, "Unable to restore backup {backup_name} after upgrade" + vm_got_ips, (code, data) = vm_checker.wait_interfaces(restored_vm_name) + assert vm_got_ips, ( + f"Failed to Start VM({restored_vm_name}) with errors:\n" + f"Status: {data.get('status')}\n" + f"API Status({code}): {data}" + ) + # Check data in restored VM is consistent + pri_key, ssh_user = cluster_state.vms['pkey'], cluster_state.vms['ssh_user'] + vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] + if iface['name'] == 'nic-1') endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): - if _check_assigned_ip_func(api_client, vm4_name)(): - break - sleep(5) + try: + with vm_shell.login(vm_ip, ssh_user, pkey=pri_key) as sh: + cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh) + assert cloud_inited and not err, (out, err) + out, err = sh.exec_command("md5sum -c ./generate_file.md5") + assert not err, (out, err) + md5, err = sh.exec_command("cat ./generate_file.md5") + assert not err, (md5, err) + assert md5 == cluster_state.vms['md5'] + break + except (SSHException, NoValidConnectionsError): + sleep(5) else: - raise AssertionError("Time out while waiting for assigned ip for vm4") - - code, vm4 = api_client.vms.get_status(vm4_name) - assert code == 200, ( - f"Failed to get vm2 vmi: {vm4}") - - vm4_data = 
_get_data_from_vm(vm_shell, _get_ip_from_vmi(vm4), - cluster_state.image_ssh_user, - timeout=wait_timeout) - vm1_data = _get_data_from_vm(vm_shell, _get_ip_from_vmi(cluster_state.vm1), - cluster_state.image_ssh_user, - timeout=wait_timeout) - assert vm1_data == vm4_data, ("Data in VM is not the same as the original") - - @pytest.mark.dependency(depends=["any_nodes_upgrade"]) - def test_verify_storage_class(self, api_client): + raise AssertionError("Unable to login to restored VM to check data consistency") + + # teardown: remove the VM + code, data = api_client.vms.get(restored_vm_name) + spec = api_client.vms.Spec.from_dict(data) + _ = vm_checker.wait_deleted(restored_vm_name) + for vol in spec.volumes: + vol_name = vol['volume']['persistentVolumeClaim']['claimName'] + api_client.volumes.delete(vol_name) + + @pytest.mark.dependency(depends=["any_nodes_upgrade", "preq_setup_storageclass"]) + def test_verify_storage_class(self, api_client, cluster_state): """ Verify StorageClasses and defaults - `new_sc` should be settle as default - `longhorn` should exists """ code, scs = api_client.scs.get() - assert code == 200, ( - "Failed to get StorageClasses: %d, %s" % (code, scs)) - + assert code == 200, ("Failed to get StorageClasses: %d, %s" % (code, scs)) assert len(scs["items"]) > 0, ("No StorageClasses found") - longhorn_exists = False - test_exists = False - test_default = False - for sc in scs["items"]: - annotations = sc["metadata"].get('annotations', {}) - if sc["metadata"]["name"] == "longhorn": - longhorn_exists = True - - if "new-sc" in sc["metadata"]["name"]: - test_exists = True - default = annotations["storageclass.kubernetes.io/is-default-class"] - if default == "true": - test_default = True - - assert longhorn_exists, ("longhorn StorageClass not found") - assert test_exists, ("test StorageClass not found") - assert test_default, ("test StorageClass is not default") + created_sc = cluster_state.scs[-1]['metadata']['name'] + names = {sc['metadata']['name']: 
sc['metadata'].get('annotations') for sc in scs['items']} + assert "longhorn" in names + assert created_sc in names + assert "storageclass.kubernetes.io/is-default-class" in names[created_sc] + assert "true" == names[created_sc]["storageclass.kubernetes.io/is-default-class"] @pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_verify_os_version(self, request, api_client, cluster_state, host_shell): @@ -1309,8 +945,9 @@ def test_verify_os_version(self, request, api_client, cluster_state, host_shell) pytest.skip("skip verify os version") # Get all nodes - nodes = _get_all_nodes(api_client) - for node in nodes: + code, data = api_client.hosts.get() + assert 200 == code, (code, data) + for node in data['data']: node_ip = node["metadata"]["annotations"][NODE_INTERNAL_IP_ANNOTATION] with host_shell.login(node_ip) as sh: @@ -1327,9 +964,13 @@ def test_verify_rke2_version(self, api_client, host_shell): # Verify node version on all nodes script = "cat /etc/harvester-release.yaml" + label_main = "node-role.kubernetes.io/control-plane" + code, data = api_client.hosts.get() + assert 200 == code, (code, data) + masters = [n for n in data['data'] if n['metadata']['labels'].get(label_main) == "true"] + # Verify rke2 version except_rke2_version = "" - masters, workers = _get_master_and_worker_nodes(api_client) for node in masters: node_ip = node["metadata"]["annotations"][NODE_INTERNAL_IP_ANNOTATION] @@ -1409,14 +1050,14 @@ def test_verify_deployed_components_version(self, api_client): assert longhorn_manager_version_existed, "longhorn manager version is not correct" @pytest.mark.dependency(depends=["any_nodes_upgrade"]) - def test_verify_crds_existed(self, api_client): + def test_verify_crds_existed(self, api_client, harvester_crds): """ Verify crds existed Criteria: - crds should be existed """ not_existed_crds = [] exist_crds = True - for crd in expected_harvester_crds: + for crd in harvester_crds: code, _ = api_client.get_crds(name=crd) if code != 200: @@ -1427,32 
+1068,39 @@ def test_verify_crds_existed(self, api_client): raise AssertionError(f"CRDs {not_existed_crds} are not existed") @pytest.mark.dependency(depends=["any_nodes_upgrade"]) - def test_upgrade_vm_deleted(self, api_client): - code, data = api_client.vms.get(namespace='harvester-system') - upgrade_vms = [vm for vm in data['data'] if 'upgrade' in vm['id']] - - assert not upgrade_vms, ( - "Upgrade related VM still available:\n" - f"{upgrade_vms}" - ) + def test_upgrade_vm_deleted(self, api_client, wait_timeout): + # max to wait 300s for the upgrade related VMs to be deleted + endtime = datetime.now() + timedelta(seconds=min(wait_timeout / 5, 300)) + while endtime > datetime.now(): + code, data = api_client.vms.get(namespace='harvester-system') + upgrade_vms = [vm for vm in data['data'] if 'upgrade' in vm['id']] + if not upgrade_vms: + break + else: + raise AssertionError(f"Upgrade related VM still available:\n{upgrade_vms}") @pytest.mark.dependency(depends=["any_nodes_upgrade"]) - def test_upgrade_volume_deleted(self, api_client): - code, data = api_client.volumes.get(namespace='harvester-system') - upgrade_vols = [vol for vol in data['data'] if 'upgrade' in vol['id']] - - assert not upgrade_vols, ( - "Upgrade related volume(s) still available:\n" - f"{upgrade_vols}" - ) + def test_upgrade_volume_deleted(self, api_client, wait_timeout): + # max to wait 300s for the upgrade related volumes to be deleted + endtime = datetime.now() + timedelta(seconds=min(wait_timeout / 5, 300)) + while endtime > datetime.now(): + code, data = api_client.volumes.get(namespace='harvester-system') + upgrade_vols = [vol for vol in data['data'] + if 'upgrade' in vol['id'] and not vol['id'].endswith('log-archive')] + if not upgrade_vols: + break + else: + raise AssertionError(f"Upgrade related volume(s) still available:\n{upgrade_vols}") @pytest.mark.dependency(depends=["any_nodes_upgrade"]) - def test_upgrade_image_deleted(self, api_client): - code, data = 
api_client.images.get(namespace='harvester-system') - upgrade_images = [image for image in data['items'] - if 'upgrade' in image['spec']['displayName']] - - assert not upgrade_images, ( - "Upgrade related image(s) still available:\n" - f"{upgrade_images}" - ) + def test_upgrade_image_deleted(self, api_client, wait_timeout): + # max to wait 300s for the upgrade related volumes to be deleted + endtime = datetime.now() + timedelta(seconds=min(wait_timeout / 5, 300)) + while endtime > datetime.now(): + code, data = api_client.images.get(namespace='harvester-system') + upgrade_images = [image for image in data['items'] + if 'upgrade' in image['spec']['displayName']] + if not upgrade_images: + break + else: + raise AssertionError(f"Upgrade related image(s) still available:\n{upgrade_images}")