diff --git a/.github/workflows/ci-testing.yml b/.github/workflows/ci-testing.yml
new file mode 100644
index 0000000..cfd3617
--- /dev/null
+++ b/.github/workflows/ci-testing.yml
@@ -0,0 +1,28 @@
+name: CI-Code-Style
+
+on: [push]
+
+jobs:
+ unit_tests:
+ name: Linter checks
+ runs-on: ubuntu-20.04
+ strategy:
+ matrix:
+ python-version: [3.6]
+
+ steps:
+ - uses: actions/checkout@v3
+ - name: Python${{ matrix.python-version }}
+ uses: actions/setup-python@v4
+ with:
+ python-version: ${{ matrix.python-version }}
+ - name: Install Tox
+ run: |
+ python -m pip install --upgrade pip
+ python -m pip install tox
+ - name: Run code checks
+ run: |
+ tox -e check
+ tox -e "unit_py${PY_VER/./_}" -- -n auto
+ env:
+ PY_VER: ${{ matrix.python-version }}
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7b00dc9
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,9 @@
+# Distribution / packaging
+*.egg-info
+
+# Unit test / coverage reports
+.tox/
+.coverage
+.cache
+**/*/__pycache__/
+.pytest_cache
diff --git a/.virtualenv.dev-requirements.txt b/.virtualenv.dev-requirements.txt
new file mode 100644
index 0000000..d5fd27b
--- /dev/null
+++ b/.virtualenv.dev-requirements.txt
@@ -0,0 +1,15 @@
+-r .virtualenv.requirements.txt
+
+# python unit testing framework
+pytest
+pytest-cov
+coverage
+
+# Rolling backport of unittest.mock for all Pythons
+mock
+
+# Python style guide checker
+flake8
+
+# Version-bump your software with a single command!
+bumpversion
diff --git a/.virtualenv.requirements.txt b/.virtualenv.requirements.txt
new file mode 100644
index 0000000..1dfcf10
--- /dev/null
+++ b/.virtualenv.requirements.txt
@@ -0,0 +1,5 @@
+lxml
+requests
+dnspython
+M2Crypto
+toml
diff --git a/cloud-regionsrv-client.spec b/cloud-regionsrv-client.spec
index 9cbd8eb..308d673 100644
--- a/cloud-regionsrv-client.spec
+++ b/cloud-regionsrv-client.spec
@@ -16,7 +16,7 @@
#
-%define base_version 10.1.6
+%define base_version 10.1.8
Name: cloud-regionsrv-client
Version: %{base_version}
Release: 0
@@ -189,6 +189,7 @@ fi
%dir /var/cache/cloudregister
%{_mandir}/man*/*
%{_sbindir}/cloudguest-repo-service
+%{_sbindir}/cloudguestregistryauth
%{_sbindir}/containerbuild-regionsrv
%{_sbindir}/createregioninfo
%{_sbindir}/switchcloudguestservices
diff --git a/etc/regionserverclnt.cfg b/etc/regionserverclnt.cfg
index ec35cad..b2bb4d6 100644
--- a/etc/regionserverclnt.cfg
+++ b/etc/regionserverclnt.cfg
@@ -1,12 +1,13 @@
[server]
api = regionInfo
-certLocation = /var/lib/regionService/certs
+certLocation = /usr/lib/regionService/certs
regionsrv = COMMA_SEP_LIST_OF_CLOUD_SPECIFIC_REGION_SERVER
metadata_server = OPTIONAL_URL_FOR_METADATA_SERVER_SMT_INFO
[instance]
dataProvider = none
instanceArgs = none
+httpsOnly = true
[service]
verifyAccess = none
diff --git a/integration_test-process.txt b/integration_test-process.txt
index d9697c2..e5f17c3 100644
--- a/integration_test-process.txt
+++ b/integration_test-process.txt
@@ -35,6 +35,13 @@ After installing the test package with "zypper in"
+ success message on stdout
+ $? is 0
+ zypper lr has repos
+- Update server failover
+ + ip=`grep susecloud.net /etc/hosts | cut -f1`
+ + iptables -A OUTPUT -d $ip -j DROP
+ + zypper ref
+ + grep -i equivalent /var/log/cloudregister
+ + grep susecloud.net /etc/hosts
+ + IP addresses in the last two outputs should match
byos - test on SLES4SAP instance
After installing the test package with "zypper in"
@@ -57,9 +64,9 @@ After installing the test package with "zypper in"
+ no error no message
+ zypper lr has no repos
- SUSEConnect -r XXX
- + registartion successful
+ + registration successful
- SUSEConnect -p sle-module-public-cloud/$VERSION/x86_64
- + registartion successful
+ + registration successful
- SUSEConnect -d -p sle-module-public-cloud/$VERSION/x86_64
+ module deletion successful
+ Requires SUSEConnect > 0.3.32
@@ -79,7 +86,7 @@ After installing the test package with "zypper in"
+ $? is 0
+ repos include HA
- SUSEConnect -p sle-module-public-cloud/$VERSION/x86_64
- + registartion successful
+ + registration successful
+ Cloud based RMT server is the target
- registercloudguest --clean
diff --git a/lib/cloudregister/VERSION b/lib/cloudregister/VERSION
index 08f8616..22a2908 100644
--- a/lib/cloudregister/VERSION
+++ b/lib/cloudregister/VERSION
@@ -1 +1 @@
-10.1.6
+10.1.8
diff --git a/lib/cloudregister/amazonec2.py b/lib/cloudregister/amazonec2.py
index 2dc6334..08f04db 100644
--- a/lib/cloudregister/amazonec2.py
+++ b/lib/cloudregister/amazonec2.py
@@ -32,10 +32,10 @@ def generateRegionSrvArgs():
for imds_ip in imds_ips:
imds_addr = imds_ip
if ':' in imds_ip:
- imds_addr = '[%s]' %imds_ip
+ imds_addr = '[%s]' % imds_ip
try:
token_resp = requests.put(
- token_url %imds_addr,
+ token_url % imds_addr,
headers=token_header
)
if token_resp.status_code == 200:
@@ -43,7 +43,7 @@ def generateRegionSrvArgs():
else:
continue
except requests.exceptions.RequestException:
- msg = 'Unable to retrieve IMDSv2 token using %s' %imds_ip
+ msg = 'Unable to retrieve IMDSv2 token using %s' % imds_ip
logging.info(msg)
continue
break
@@ -58,8 +58,8 @@ def generateRegionSrvArgs():
for imds_ip in imds_ips:
imds_addr = imds_ip
if ':' in imds_ip and '[' not in imds_ip:
- imds_addr = '[%s]' %imds_ip
- metadata_url = 'http://%s/latest/meta-data/' %imds_addr
+ imds_addr = '[%s]' % imds_ip
+ metadata_url = 'http://%s/latest/meta-data/' % imds_addr
zone_info = 'placement/availability-zone'
try:
@@ -104,6 +104,5 @@ def generateRegionSrvArgs():
logging.warning('\tMessage: %s' % zone_resp.text)
if imds_ip == imds_ips[-1]:
return
- continue
return 'regionHint=' + region
diff --git a/lib/cloudregister/registerutils.py b/lib/cloudregister/registerutils.py
index f0b56f3..a5ec70b 100644
--- a/lib/cloudregister/registerutils.py
+++ b/lib/cloudregister/registerutils.py
@@ -50,6 +50,7 @@
DOCKER_CONFIG_PATH = '/etc/docker/daemon.json'
REGISTRIES_CONF_PATH = '/etc/containers/registries.conf'
+
# ----------------------------------------------------------------------------
def add_hosts_entry(smt_server):
"""Add an entry to the /etc/hosts file for the given SMT server"""
@@ -461,7 +462,10 @@ def get_available_smt_servers():
if not os.path.exists(get_state_dir()):
return available_smt_servers
smt_data_files = glob.glob(
- os.path.join(get_state_dir(), AVAILABLE_SMT_SERVER_DATA_FILE_NAME % '*')
+ os.path.join(
+ get_state_dir(),
+ AVAILABLE_SMT_SERVER_DATA_FILE_NAME % '*'
+ )
)
for smt_data in smt_data_files:
available_smt_servers.append(get_smt_from_store(smt_data))
@@ -564,6 +568,14 @@ def set_registry_order_search(registry_fqdn):
_set_registry_order_search_docker(registry_fqdn)
+def refresh_registry_credentials():
+ """Refresh registry credentials."""
+ # to silence InsecureRequestWarning
+ # should be fixed on a different PR
+ requests.packages.urllib3.disable_warnings()
+ return get_activations()
+
+
# ----------------------------------------------------------------------------
def get_credentials_file(update_server, service_name=None):
"""Return the credentials filename.
@@ -617,9 +629,9 @@ def get_current_smt():
smt_ipv6 = smt.get_ipv6()
smt_fqdn = smt.get_FQDN()
# A bit cumbersome to support Python 3.4
- ipv4_search = '%s\s' % smt_ipv4
- ipv6_search = '%s\s' % smt_ipv6
- fqdn_search = '\s%s\s' % smt_fqdn
+ ipv4_search = r'%s\s' % smt_ipv4
+ ipv6_search = r'%s\s' % smt_ipv6
+ fqdn_search = r'\s%s\s' % smt_fqdn
with open(HOSTSFILE_PATH, 'rb') as hosts_file:
hosts = hosts_file.read()
if (
@@ -808,6 +820,7 @@ def get_smt(cache_refreshed=None):
# Fetch cert for new target server
import_smt_cert(new_target)
# Verify the new target server has our credentials
+ replace_hosts_entry(current_smt, new_target)
credentials_file_path = get_credentials_file(new_target)
user, password = get_credentials(credentials_file_path)
if not has_smt_access(
@@ -824,8 +837,8 @@ def get_smt(cache_refreshed=None):
msg += 'current, %s, target update server.'
msg += 'Try again later.'
logging.error(msg % (new_target_ips, original_smt_ips))
+ replace_hosts_entry(new_target, current_smt)
return current_smt
- replace_hosts_entry(current_smt, new_target)
set_as_current_smt(new_target)
return new_target
else:
@@ -992,7 +1005,7 @@ def has_region_changed(cfg):
registered_region = json.loads(
framework_file.read()
)
- except:
+ except Exception:
return False
if (
@@ -1714,7 +1727,9 @@ def _set_registry_order_search_podman(registry_fqdn):
with open(REGISTRIES_CONF_PATH, 'r') as registries_conf_file:
registries_conf = toml.load(registries_conf_file)
- if registry_fqdn not in registries_conf['unqualified-search-registries']: # no-qa
+ missing_registry_fqdn = registry_fqdn not in \
+ registries_conf['unqualified-search-registries']
+ if missing_registry_fqdn:
registries_conf['unqualified-search-registries'] = \
["{}".format(registry_fqdn), 'registry.suse.com'] + \
registries_conf['unqualified-search-registries']
diff --git a/lib/cloudregister/smt.py b/lib/cloudregister/smt.py
index ea2e33f..0d68ed9 100644
--- a/lib/cloudregister/smt.py
+++ b/lib/cloudregister/smt.py
@@ -211,7 +211,7 @@ def write_cert(self, target_dir):
# we write here with the known pattern.
for cert_name in certs_to_write:
try:
- with open(ca_file_path %cert_name, 'w') as smt_ca_file:
+ with open(ca_file_path % cert_name, 'w') as smt_ca_file:
smt_ca_file.write(cert)
except IOError:
errMsg = 'Could not store update server certificate'
@@ -232,8 +232,8 @@ def _form_srv_check_urls(self):
rmt_ip = srv_ip
# Per rfc3986 IPv6 addresses in a URI are enclosed in []
if isinstance(ipaddress.ip_address(rmt_ip), ipaddress.IPv6Address):
- rmt_ip = '[%s]' %srv_ip
- health_url = 'https://%s/api/health/status' %rmt_ip
+ rmt_ip = '[%s]' % srv_ip
+ health_url = 'https://%s/api/health/status' % rmt_ip
cert_url = '%s://%s/' % (self._protocol, rmt_ip)
check_urls[health_url] = cert_url
diff --git a/man/man1/cloudguestregistryauth.1 b/man/man1/cloudguestregistryauth.1
new file mode 100644
index 0000000..76c4fdd
--- /dev/null
+++ b/man/man1/cloudguestregistryauth.1
@@ -0,0 +1,12 @@
+.\" Process this file with
+.\" groff -man -Tascii cloudguestregstryauth.1
+.\"
+.TH cloudguestregstryauth.1
+.SH NAME
+cloudguestregstryauth \- Refresh registry credentials for a registered SUSE Linux Enterprise instance
+.SH SYNOPSIS
+.B cloudguestregstryauth
+.SH DESCRIPTION
+.B cloudguestregstryauth
+Refresh registry credentials for a registered SUSE Linux Enterprise instance. The client will reach
+out to the already configured update server to refresh the existing credentials.
diff --git a/tests/data/regionserverclnt.cfg b/tests/data/regionserverclnt.cfg
index 3076e8f..7f9d314 100644
--- a/tests/data/regionserverclnt.cfg
+++ b/tests/data/regionserverclnt.cfg
@@ -1,5 +1,5 @@
[server]
api = regionInfo
-certLocation = /var/lib/regionService/certs
+certLocation = /usr/lib/regionService/certs
regionsrv = COMMA_SEP_LIST_OF_CLOUD_SPECIFIC_REGION_SERVER
metadata_server = OPTIONAL_URL_FOR_METADATA_SERVER_SMT_INFO
diff --git a/tests/test_azureplugin.py b/tests/test_azureplugin.py
index fb995c0..1d7a432 100644
--- a/tests/test_azureplugin.py
+++ b/tests/test_azureplugin.py
@@ -24,7 +24,7 @@
sys.path.insert(0, code_path)
-import msftazure as azure
+import msftazure as azure # noqa
# ----------------------------------------------------------------------------
diff --git a/tests/test_cloudguestregistryauth.py b/tests/test_cloudguestregistryauth.py
new file mode 100644
index 0000000..a9e1beb
--- /dev/null
+++ b/tests/test_cloudguestregistryauth.py
@@ -0,0 +1,92 @@
+import inspect
+import os
+import sys
+
+from lxml import etree
+
+from importlib.machinery import SourceFileLoader
+
+from pytest import raises
+from textwrap import dedent
+from unittest.mock import patch
+
+test_path = os.path.abspath(
+ os.path.dirname(inspect.getfile(inspect.currentframe())))
+code_path = os.path.abspath('%s/../lib' % test_path)
+data_path = test_path + os.sep + 'data/'
+
+sys.path.insert(0, code_path)
+
+from cloudregister.smt import SMT # noqa
+
+
+# Hack to get the script without the .py imported for testing
+cloudguestregistryauth = SourceFileLoader(
+ 'cloudguestregistryauth',
+ './usr/sbin/cloudguestregistryauth'
+).load_module()
+
+
+@patch('cloudguestregistryauth.os.geteuid')
+def test_registry_call_as_root(mock_os_geteuid):
+ mock_os_geteuid.return_value = 1
+ with raises(SystemExit) as pytest_wrapped_e:
+ cloudguestregistryauth.main()
+
+ assert pytest_wrapped_e.type == SystemExit
+ assert pytest_wrapped_e.value.code == 'You must be root'
+
+
+@patch('cloudregister.registerutils.get_activations')
+@patch('cloudregister.registerutils.is_registered')
+@patch('cloudregister.registerutils.get_current_smt')
+@patch('cloudguestregistryauth.os.geteuid')
+def test_registry_get_activations_error(
+ mock_os_geteuid, mock_get_current_smt,
+ mock_is_registered, mock_get_activations
+):
+ mock_os_geteuid.return_value = 0
+ mock_is_registered.return_value = True
+ smt_data_ipv46 = dedent('''\
+ ''')
+ smt_server = SMT(etree.fromstring(smt_data_ipv46))
+ mock_get_current_smt.return_value = smt_server
+ mock_get_activations.return_value = {}
+
+ with raises(SystemExit) as pytest_wrapped_e:
+ cloudguestregistryauth.main()
+
+ assert pytest_wrapped_e.type == SystemExit
+ assert pytest_wrapped_e.value.code == 'Could not refresh credentials'
+
+
+@patch('builtins.print')
+@patch('cloudregister.registerutils.get_activations')
+@patch('cloudregister.registerutils.is_registered')
+@patch('cloudregister.registerutils.get_current_smt')
+@patch('cloudguestregistryauth.os.geteuid')
+def test_registry_get_activations(
+ mock_os_geteuid, mock_get_current_smt,
+ mock_is_registered, mock_get_activations,
+ mock_print
+):
+ mock_os_geteuid.return_value = 0
+ mock_is_registered.return_value = True
+ smt_data_ipv46 = dedent('''\
+ ''')
+ smt_server = SMT(etree.fromstring(smt_data_ipv46))
+ mock_get_current_smt.return_value = smt_server
+ mock_get_activations.return_value = {'foo': 'bar'}
+
+ cloudguestregistryauth.main()
+ mock_print.assert_called_once_with('Credentials refreshed')
diff --git a/tests/test_ec2plugin.py b/tests/test_ec2plugin.py
index 176943e..19f1c0f 100644
--- a/tests/test_ec2plugin.py
+++ b/tests/test_ec2plugin.py
@@ -16,7 +16,7 @@
import requests
import sys
-from mock import patch
+from mock import patch, call
test_path = os.path.abspath(
os.path.dirname(inspect.getfile(inspect.currentframe())))
@@ -24,7 +24,7 @@
sys.path.insert(0, code_path)
-import amazonec2 as ec2
+import amazonec2 as ec2 # noqa
# ----------------------------------------------------------------------------
@@ -42,16 +42,23 @@ def test_request_fail(mock_logging, mock_request_get, mock_request_put):
mock_request_put.side_effect = requests.exceptions.RequestException
result = ec2.generateRegionSrvArgs()
assert result is None
- assert mock_logging.info.called_with(
- 'Unable to retrieve IMDSv2 token using 169.254.169.254'
- )
- assert mock_logging.info.called_with(
- 'Unable to retrieve IMDSv2 token using fd00:ec2::254'
- )
- assert mock_logging.warning.called_with('Falling back to IMDSv1')
- assert mock_logging.warning.called_with(
- 'Unable to determine instance placement from "fd00:ec2::254"'
- )
+ assert mock_logging.info.call_args_list == [
+ call('Unable to retrieve IMDSv2 token using 169.254.169.254'),
+ call('Unable to retrieve IMDSv2 token using fd00:ec2::254')
+ ]
+ expected_urls = [
+ 'http://169.254.169.254/latest/meta-data/placement/availability-zone',
+ 'http://[fd00:ec2::254]/latest/meta-data/placement/availability-zone'
+ ]
+ assert mock_logging.warning.call_args_list == [
+ call('Falling back to IMDSv1'),
+ call('Unable to determine instance placement from "{}"'.format(
+ expected_urls[0]
+ )),
+ call('Unable to determine instance placement from "{}"'.format(
+ expected_urls[1]
+ ))
+ ]
# ----------------------------------------------------------------------------
@@ -62,12 +69,27 @@ def test_request_fail_response_error(
mock_logging, mock_request_get, mock_request_put
):
"""Test unexpected return value"""
- mock_request_get.return_value = _get_error_response()
+ # make sure loop has two IP addresses
+ mock_request_put.side_effect = [
+ _get_error_response(),
+ _get_error_response()
+ ]
+ mock_request_get.side_effect = [
+ _get_error_response(),
+ _get_error_response()
+ ]
result = ec2.generateRegionSrvArgs()
assert result is None
assert mock_logging.warning.called
- msg = '\tMessage: Test server failure'
- mock_logging.warning.assert_called_with(msg)
+ assert mock_logging.warning.call_args_list == [
+ call('Falling back to IMDSv1'),
+ call('Unable to get availability zone metadata'),
+ call('\tReturn code: 500'),
+ call('\tMessage: Test server failure'),
+ call('Unable to get availability zone metadata'),
+ call('\tReturn code: 500'),
+ call('\tMessage: Test server failure')
+ ]
# ----------------------------------------------------------------------------
diff --git a/tests/test_gceplugin.py b/tests/test_gceplugin.py
index 9f0e034..c340248 100644
--- a/tests/test_gceplugin.py
+++ b/tests/test_gceplugin.py
@@ -24,7 +24,7 @@
sys.path.insert(0, code_path)
-import googlece as gce
+import googlece as gce # noqa
# ----------------------------------------------------------------------------
diff --git a/tests/test_registerutils.py b/tests/test_registerutils.py
index 3535814..dc623ff 100644
--- a/tests/test_registerutils.py
+++ b/tests/test_registerutils.py
@@ -34,8 +34,8 @@
sys.path.insert(0, code_path)
-import cloudregister.registerutils as utils
-from cloudregister.smt import SMT
+import cloudregister.registerutils as utils # noqa
+from cloudregister.smt import SMT # noqa
CACHE_SERVER_IPS = ['54.197.240.216', '54.225.105.144', '107.22.231.220']
@@ -95,7 +95,7 @@ def test_has_region_changed_no_change(subproc, id_path, plugin, srvargs):
plugin.return_value = True
srvargs.return_value = 'regionHint=us-central1-d'
cfg = get_test_config()
- assert False == utils.has_region_changed(cfg)
+ assert utils.has_region_changed(cfg) is False
@patch('cloudregister.registerutils.__get_system_mfg')
@@ -104,7 +104,7 @@ def test_has_region_changed_no_dmidecode(plugin, mfg):
plugin.return_value = False
mfg.return_value = False
cfg = get_test_config()
- assert False == utils.has_region_changed(cfg)
+ assert utils.has_region_changed(cfg) is False
@patch('cloudregister.registerutils.__get_system_mfg')
@@ -113,7 +113,7 @@ def test_has_region_changed_no_plugin(plugin, mfg):
plugin.return_value = False
mfg.return_value = 'Google'
cfg = get_test_config()
- assert False == utils.has_region_changed(cfg)
+ assert utils.has_region_changed(cfg) is False
@patch('cloudregister.registerutils.__get_region_server_args')
@@ -126,7 +126,7 @@ def test_has_region_changed_provider_change(subproc, id_path, plugin, srvargs):
id_path.return_value = data_path + 'framework_info'
plugin.return_value = True
srvargs.return_value = 'regionHint=us-central1-d'
- assert True == utils.has_region_changed(cfg)
+ assert utils.has_region_changed(cfg) is True
@patch('cloudregister.registerutils.__get_region_server_args')
@@ -141,7 +141,7 @@ def test_has_region_changed_provider_and_region_change(
plugin.return_value = True
srvargs.return_value = 'regionHint=us-east-1'
cfg = get_test_config()
- assert True == utils.has_region_changed(cfg)
+ assert utils.has_region_changed(cfg) is True
@patch('cloudregister.registerutils.__get_region_server_args')
@@ -156,7 +156,7 @@ def test_has_region_changed_region_change(
plugin.return_value = True
srvargs.return_value = 'regionHint=us-east2-f'
cfg = get_test_config()
- assert True == utils.has_region_changed(cfg)
+ assert utils.has_region_changed(cfg) is True
@patch('cloudregister.registerutils.json.loads')
@@ -178,7 +178,7 @@ def test_has_region_changed_provider_and_region_change_exception(
mock_srvargs.return_value = 'regionHint=us-east-1'
mock_json_loads.side_effect = Exception('foo')
cfg = get_test_config()
- assert utils.has_region_changed(cfg) == False
+ assert utils.has_region_changed(cfg) is False
def test_is_registration_supported_SUSE_Family():
@@ -206,7 +206,7 @@ def test_has_rmt_in_hosts_has_ipv4():
with mock.patch('builtins.open', mock.mock_open(read_data=hosts_content)):
has_entry = utils.has_rmt_in_hosts(server)
- assert True == has_entry
+ assert has_entry is True
def test_has_rmt_in_hosts_has_ipv4_6():
@@ -221,7 +221,7 @@ def test_has_rmt_in_hosts_has_ipv4_6():
with mock.patch('builtins.open', mock.mock_open(read_data=hosts_content)):
has_entry = utils.has_rmt_in_hosts(server)
- assert True == has_entry
+ assert has_entry is True
def test_has_rmt_in_hosts_ipv4_not_found():
@@ -234,7 +234,7 @@ def test_has_rmt_in_hosts_ipv4_not_found():
with mock.patch('builtins.open', mock.mock_open(read_data=hosts_content)):
has_entry = utils.has_rmt_in_hosts(server)
- assert False == has_entry
+ assert has_entry is False
def test_has_rmt_in_hosts_has_ipv6():
@@ -247,7 +247,7 @@ def test_has_rmt_in_hosts_has_ipv6():
with mock.patch('builtins.open', mock.mock_open(read_data=hosts_content)):
has_entry = utils.has_rmt_in_hosts(server)
- assert True == has_entry
+ assert has_entry is True
def test_has_rmt_in_hosts_has_ipv6_4():
@@ -262,7 +262,7 @@ def test_has_rmt_in_hosts_has_ipv6_4():
with mock.patch('builtins.open', mock.mock_open(read_data=hosts_content)):
has_entry = utils.has_rmt_in_hosts(server)
- assert True == has_entry
+ assert has_entry is True
def test_has_rmt_in_hosts_ipv6_not_found():
@@ -275,7 +275,7 @@ def test_has_rmt_in_hosts_ipv6_not_found():
with mock.patch('builtins.open', mock.mock_open(read_data=hosts_content)):
has_entry = utils.has_rmt_in_hosts(server)
- assert False == has_entry
+ assert has_entry is False
def test_clean_host_file_no_empty_bottom_lines():
@@ -505,13 +505,6 @@ def test_clean_host_file_no_empty_bottom_lines_smt_entry_is_last():
def test_clean_host_file_raised_exception():
hosts_content = ""
- expected_cleaned_hosts = """
-# simulates hosts file containing the ipv6 we are looking for in the test
-
-1.2.3.4 smt-foo.susecloud.net smt-foo
-
-
-4.3.2.1 another_entry.whatever.com another_entry"""
with mock.patch('builtins.open', mock.mock_open(read_data=hosts_content.encode())) as m: # noqa: E501
utils.clean_hosts_file('smt-entry', ''.encode())
@@ -540,13 +533,14 @@ def test_add_hosts_entry(mock_has_ipv6_access):
'\n# Added by SMT registration do not remove, '
'retain comment as well\n'
)
- file_content_entry = '{ip}\t{fqdn}\t{name}\n{ip_reg}\t{reg_name}\n'.format(
- ip=smt_server.get_ipv6(),
- fqdn=smt_server.get_FQDN(),
- name=smt_server.get_name(),
- ip_reg=smt_server.get_ipv6(),
- reg_name=smt_server.get_registry_FQDN()
- )
+ file_content_entry = '{ip}\t{fqdn}\t{name}\n{ip_reg}\t{reg_name}\n' \
+ .format(
+ ip=smt_server.get_ipv6(),
+ fqdn=smt_server.get_FQDN(),
+ name=smt_server.get_name(),
+ ip_reg=smt_server.get_ipv6(),
+ reg_name=smt_server.get_registry_FQDN()
+ )
assert file_handle.write.mock_calls == [
call(file_content_comment),
call(file_content_entry),
@@ -617,14 +611,14 @@ def test_clear_rmt_as_scc_proxy_flag(mock_os_unlink):
@patch('cloudregister.registerutils.get_credentials')
def test_credentials_files_are_equal(mock_get_credentials):
mock_get_credentials.side_effect = [('SCC_foo', 'bar'), ('SCC_foo', 'bar')]
- assert utils.credentials_files_are_equal('foo') == True
+ assert utils.credentials_files_are_equal('foo') is True
assert mock_get_credentials.mock_calls == [
call('/etc/zypp/credentials.d/SCCcredentials'),
call('/etc/zypp/credentials.d/foo')
]
mock_get_credentials.side_effect = [('SCC_bar', 'bar'), ('SCC_foo', 'bar')]
- assert utils.credentials_files_are_equal('foo') == False
+ assert utils.credentials_files_are_equal('foo') is False
def test_credentials_files_are_equal_no_credentials():
@@ -676,7 +670,7 @@ def test_fetch_smt_data_not_200_exception(
response = Response()
response.status_code = 422
mock_request_get.return_value = response
- with raises(SystemExit) as pytest_wrapped_e:
+ with raises(SystemExit):
utils.fetch_smt_data(cfg, None)
assert mock_logging.error.call_args_list == [
call('===================='),
@@ -696,7 +690,7 @@ def test_fetch_smt_data_no_response_text(
response.status_code = 200
response.text = "{}"
mock_request_get.return_value = response
- with raises(SystemExit) as pytest_wrapped_e:
+ with raises(SystemExit):
utils.fetch_smt_data(cfg, None)
assert mock_logging.error.call_args_list == [
call('Metadata server did not supply a value for "fingerprint"'),
@@ -730,35 +724,35 @@ def test_fetch_smt_data_metadata_server(
@patch('cloudregister.registerutils.time.sleep')
@patch('cloudregister.registerutils.logging')
-def test_fetch_smt_data_api_no_answered(
+def test_fetch_smt_data_api_no_answer(
mock_logging,
mock_time_sleep
):
cfg = get_test_config()
del cfg['server']['metadata_server']
cfg.set('server', 'regionsrv', '1.1.1.1')
- with raises(SystemExit) as pytest_wrapped_e:
- fetched_smt_data = utils.fetch_smt_data(cfg, None)
+ with raises(SystemExit):
+ utils.fetch_smt_data(cfg, None)
assert mock_logging.info.call_args_list == [
call('Using API: regionInfo'),
call('Getting update server information, attempt 1'),
call('\tUsing region server: 1.1.1.1'),
call(
- '\tNo cert found: /var/lib/regionService/certs/1.1.1.1.pem '
+ '\tNo cert found: /usr/lib/regionService/certs/1.1.1.1.pem '
'skip this server'
),
call('Waiting 20 seconds before next attempt'),
call('Getting update server information, attempt 2'),
call('\tUsing region server: 1.1.1.1'),
call(
- '\tNo cert found: /var/lib/regionService/certs/1.1.1.1.pem '
+ '\tNo cert found: /usr/lib/regionService/certs/1.1.1.1.pem '
'skip this server'
),
call('Waiting 10 seconds before next attempt'),
call('Getting update server information, attempt 3'),
call('\tUsing region server: 1.1.1.1'),
call(
- '\tNo cert found: /var/lib/regionService/certs/1.1.1.1.pem '
+ '\tNo cert found: /usr/lib/regionService/certs/1.1.1.1.pem '
'skip this server'
)
]
@@ -838,8 +832,8 @@ def test_fetch_smt_data_api_no_valid_ip(
response2.text = smt_xml
mock_request_get.side_effect = [response2, response2]
mock_ipaddress_ip_address.side_effect = ValueError('foo')
- fetched_smt_data = utils.fetch_smt_data(cfg, None)
- assert etree.tostring(fetched_smt_data, encoding='utf-8') == smt_xml.encode()
+ smt_data = utils.fetch_smt_data(cfg, None)
+ assert etree.tostring(smt_data, encoding='utf-8') == smt_xml.encode()
@patch('cloudregister.registerutils.requests.get')
@@ -860,8 +854,8 @@ def test_fetch_smt_data_api_error_response(
response.status_code = 422
response.reason = 'well, you shall not pass'
mock_request_get.return_value = response
- with raises(SystemExit) as pytest_wrapped_e:
- fetched_smt_data = utils.fetch_smt_data(cfg, None)
+ with raises(SystemExit):
+ utils.fetch_smt_data(cfg, None)
assert mock_logging.info.call_args_list == [
call('Using API: regionInfo'),
call('Getting update server information, attempt 1'),
@@ -912,8 +906,8 @@ def test_fetch_smt_data_api_exception(
response.status_code = 422
response.reason = 'well, you shall not pass'
mock_request_get.side_effect = requests.exceptions.RequestException('foo')
- with raises(SystemExit) as pytest_wrapped_e:
- fetched_smt_data = utils.fetch_smt_data(cfg, None)
+ with raises(SystemExit):
+ utils.fetch_smt_data(cfg, None)
assert mock_logging.info.call_args_list == [
call('Using API: regionInfo'),
call('Getting update server information, attempt 1'),
@@ -958,8 +952,8 @@ def test_fetch_smt_data_api_exception_quiet(
response.status_code = 422
response.reason = 'well, you shall not pass'
mock_request_get.side_effect = requests.exceptions.RequestException('foo')
- with raises(SystemExit) as pytest_wrapped_e:
- fetched_smt_data = utils.fetch_smt_data(cfg, 'foo', quiet=True)
+ with raises(SystemExit):
+ utils.fetch_smt_data(cfg, 'foo', quiet=True)
assert mock_logging.info.call_args_list == [
call('Using API: regionInfo'),
call('Waiting 20 seconds before next attempt'),
@@ -986,6 +980,7 @@ def test_find_equivalent_smt_server(mock_is_responsive):
SMTserverIP="192.168.2.1"
SMTserverIPv6="fc00::2"
SMTserverName="fantasy.example.net"
+
SMTregistryName="registry-fantasy.example.net"
region="antarctica-1"/>''')
smt_a = SMT(etree.fromstring(smt_data_ipv46))
@@ -993,7 +988,8 @@ def test_find_equivalent_smt_server(mock_is_responsive):
mock_is_responsive.return_value = True
assert utils.find_equivalent_smt_server(smt_a, [smt_a, smt_b]) == smt_b
- assert utils.find_equivalent_smt_server(smt_a, [smt_a]) == None
+ assert utils.find_equivalent_smt_server(smt_a, [smt_a]) is None
+ assert utils.find_equivalent_smt_server(smt_a, [smt_a]) is None
@patch('cloudregister.registerutils.glob.glob')
@@ -1157,8 +1153,6 @@ def test_get_credentials_file_no_file(mock_logging, mock_glob):
SMTregistryName="registry-fantasy.example.com"
region="antarctica-1"/>''')
smt_server = SMT(etree.fromstring(smt_data_ipv46))
-
-
utils.get_credentials_file(smt_server, 'bar')
assert mock_logging.info.mock_calls == [
call('No credentials entry for "*bar*"'),
@@ -1183,19 +1177,17 @@ def test_get_credentials_two_files(mock_logging, mock_glob):
SMTregistryName="registry-fantasy.example.com"
region="antarctica-1"/>''')
smt_server = SMT(etree.fromstring(smt_data_ipv46))
-
-
assert utils.get_credentials_file(smt_server) == 'foo'
assert mock_logging.warning.mock_calls == [
call('Found multiple credentials for "None" entry and '
- 'hoping for the best')
+ 'hoping for the best')
] # TODO: check this warning
@patch('cloudregister.registerutils.get_smt_from_store')
def test_get_current_smt_no_smt(mock_get_smt_from_store):
mock_get_smt_from_store.return_value = None
- assert utils.get_current_smt() == None
+ assert utils.get_current_smt() is None
@patch('cloudregister.registerutils.os.unlink')
@@ -1232,7 +1224,7 @@ def test_get_current_smt_no_registered(mock_get_smt_from_store):
with mock.patch('builtins.open', mock.mock_open(
read_data=hosts_content.encode()
)):
- assert utils.get_current_smt() == None
+ assert utils.get_current_smt() is None
@patch('cloudregister.registerutils.is_registered')
@@ -1286,8 +1278,8 @@ def test_get_instance_data_provider_option_none():
cfg.set('instance', 'dataProvider', 'none')
expected_data = 'plugin:susecloud\n'
assert utils.get_instance_data(cfg) == expected_data
-
-
+
+
@patch('cloudregister.registerutils.logging')
def test_get_instance_data_cmd_not_found(mock_logging):
cfg = get_test_config()
@@ -1516,10 +1508,10 @@ def test_get_installed_products_no_link(
@patch('cloudregister.registerutils.glob.glob')
def test_get_repo_url(mock_glob):
mock_glob.return_value = ['tests/data/repo_foo.repo']
- assert utils.get_repo_url('SLE-Module-Live-Foo15-SP5-Source-Pool') == \
- 'plugin:/susecloud?credentials=SUSE_Linux_Enterprise_Live_Foo_x86_64&' \
- 'path=/repo/SUSE/Products/SLE-Module-Live-Foo/15-SP5/x86_64/' \
- 'product_source/'
+ assert utils.get_repo_url('SLE-Module-Live-Foo15-SP5-Source-Pool') == (
+ 'plugin:/susecloud?credentials=SUSE_Linux_Enterprise_Live_Foo_x86_64&'
+ 'path=/repo/SUSE/Products/SLE-Module-Live-Foo/15-SP5/x86_64/'
+ 'product_source/')
@patch('cloudregister.registerutils.glob.glob')
@@ -1700,9 +1692,9 @@ def test_get_smt_equivalent_smt_no_access(
call('Using equivalent update server: "(\'42.168.1.1\', \'fc00::7\')"')
]
mock_logging.error.assert_called_once_with(
- "Sibling update server, ('42.168.1.1', 'fc00::7'), does not have system credentials "
- "cannot failover. Retaining current, ('192.168.1.1', 'fc00::1'), target update server."
- 'Try again later.'
+ "Sibling update server, ('42.168.1.1', 'fc00::7'), does not have "
+ 'system credentials cannot failover. Retaining current, '
+ "('192.168.1.1', 'fc00::1'), target update server.Try again later."
)
@@ -1774,7 +1766,7 @@ def test_get_smt_refresh_cache(
@patch('cloudregister.registerutils.os.path.exists')
def test_get_smt_from_store_non_existing_path(mock_os_path_exists):
mock_os_path_exists.return_value = False
- assert utils.get_smt_from_store('foo') == None
+ assert utils.get_smt_from_store('foo') is None
@patch.object(pickle, 'Unpickler')
@@ -1782,7 +1774,9 @@ def test_get_smt_from_store_raise_exception(mock_unpickler):
unpick = Mock()
mock_unpickler.return_value = unpick
unpick.load.side_effect = pickle.UnpicklingError
- assert utils.get_smt_from_store('tests/data/availableSMTInfo_1.obj') == None
+ assert utils.get_smt_from_store(
+ 'tests/data/availableSMTInfo_1.obj'
+ ) is None
@patch('cloudregister.registerutils.get_available_smt_servers')
@@ -1837,7 +1831,7 @@ def test_has_ipv6_access_no_ipv6_defined():
SMTregistryName="registry-foo.susecloud.net"
region="antarctica-1"/>''')
smt_server = SMT(etree.fromstring(smt_data_ipv4))
- assert utils.has_ipv6_access(smt_server) == False
+ assert utils.has_ipv6_access(smt_server) is False
@patch('cloudregister.registerutils.get_config')
@@ -1883,7 +1877,7 @@ def test_has_ipv6_access_exception(
smt_server = SMT(etree.fromstring(smt_data_ipv46))
mock_request.side_effect = Exception("Server's too far, cant be reached")
mock_https_only.return_value = True
- assert utils.has_ipv6_access(smt_server) == False
+ assert utils.has_ipv6_access(smt_server) is False
mock_request.assert_called_once_with(
'https://[fc00::1]/smt.crt',
timeout=3,
@@ -1894,14 +1888,14 @@ def test_has_ipv6_access_exception(
@patch('cloudregister.registerutils.exec_subprocess')
def test_has_nvidia_support(mock_subprocess):
mock_subprocess.return_value = b'NVIDIA', 'bar'
- assert utils.has_nvidia_support() == True
+ assert utils.has_nvidia_support() is True
@patch('cloudregister.registerutils.logging')
@patch('cloudregister.registerutils.exec_subprocess')
def test_has_nvidia_support_exception(mock_subprocess, mock_logging):
mock_subprocess.side_effect = TypeError('foo')
- assert utils.has_nvidia_support() == False
+ assert utils.has_nvidia_support() is False
mock_logging.info.assert_called_once_with(
'lspci command not found, instance Nvidia support cannot be determined'
)
@@ -1910,13 +1904,13 @@ def test_has_nvidia_support_exception(mock_subprocess, mock_logging):
@patch('cloudregister.registerutils.exec_subprocess')
def test_has_nvidia_no_support(mock_subprocess):
mock_subprocess.return_value = b'foo', 'bar'
- assert utils.has_nvidia_support() == False
+ assert utils.has_nvidia_support() is False
@patch('cloudregister.registerutils.__get_service_plugins')
def test_has_services_service_plugin(mock_get_service_plugins):
mock_get_service_plugins.return_value = 'foo'
- assert utils.has_services('foo') == True
+ assert utils.has_services('foo') is True
@patch('cloudregister.registerutils.glob.glob')
@@ -1924,7 +1918,7 @@ def test_has_services_service(mock_get_service_plugins):
mock_get_service_plugins.return_value = ['foo']
content = 'url=plugin:susecloud'
with mock.patch('builtins.open', mock.mock_open(read_data=content)):
- assert utils.has_services('foo') == True
+ assert utils.has_services('foo') is True
@patch('cloudregister.registerutils.requests.post')
@@ -1933,7 +1927,7 @@ def test_has_smt_access_unauthorized(mock_http_basic_auth, mock_post):
response = Response()
response.reason = 'Unauthorized'
mock_post.return_value = response
- assert utils.has_smt_access('foo', 'bar', 'foobar') == False
+ assert utils.has_smt_access('foo', 'bar', 'foobar') is False
@patch('cloudregister.registerutils.requests.post')
@@ -1942,19 +1936,19 @@ def test_has_smt_access_authorized(mock_http_basic_auth, mock_post):
response = Response()
response.reason = 'Super_Authorized'
mock_post.return_value = response
- assert utils.has_smt_access('foo', 'bar', 'foobar') == True
+ assert utils.has_smt_access('foo', 'bar', 'foobar') is True
def test_https_only():
cfg = get_test_config()
cfg.add_section('instance')
cfg.set('instance', 'httpsOnly', 'true')
- assert utils.https_only(cfg) == True
+ assert utils.https_only(cfg) is True
def test_https_only_no():
cfg = get_test_config()
- assert utils.https_only(cfg) == False
+ assert utils.https_only(cfg) is False
@patch.object(SMT, 'write_cert')
@@ -2016,7 +2010,7 @@ def test_import_smtcert_12(
@patch('cloudregister.registerutils.import_smtcert_12')
def test_import_smt_cert_fail(mock_import_smtcert_12, mockin_logging):
mock_import_smtcert_12.return_value = False
- assert utils.import_smt_cert('foo') == None
+ assert utils.import_smt_cert('foo') is None
mockin_logging.error.assert_called_once_with(
'SMT certificate import failed'
)
@@ -2029,7 +2023,7 @@ def test_import_smt_cert_fail(mock_import_smtcert_12, mockin_logging):
def test_import_smt_cert_cert_middling(
mock_import_smtcert_12,
mockin_logging,
- mockin_getsitepackages,
+ mockin_getsitepackages,
mockin_glob
):
mock_import_smtcert_12.return_value = True
@@ -2042,33 +2036,39 @@ def test_import_smt_cert_cert_middling(
)
-def test_is_new_registration_not_new():
- assert utils.is_new_registration() == False
+@patch('cloudregister.registerutils.get_state_dir')
+def test_is_new_registration_not_new(mock_state_dir):
+ mock_state_dir.return_value = data_path
+ assert utils.is_new_registration() is False
def test_is_registration_supported_exception():
cfg_template = get_test_config()
del cfg_template['server']
- assert utils.is_registration_supported(cfg_template) == False
+ assert utils.is_registration_supported(cfg_template) is False
def test_is_registration_supported():
cfg_template = get_test_config()
- assert utils.is_registration_supported(cfg_template) == True
+ assert utils.is_registration_supported(cfg_template) is True
@patch('cloudregister.registerutils.glob.glob')
def test_is_scc_connected(mock_glob):
mock_glob.return_value = ['tests/data/scc_repo.repo']
- assert utils.is_scc_connected() == True
+ assert utils.is_scc_connected() is True
-def test_is_scc_not_connected():
- assert utils.is_scc_connected() == False
+@patch('cloudregister.registerutils.glob.glob')
+def test_is_scc_not_connected(mock_glob):
+ mock_glob.return_value = []
+ assert utils.is_scc_connected() is False
-def test_is_zypper_running_not():
- assert utils.is_zypper_running() == False
+@patch('cloudregister.registerutils.get_zypper_pid')
+def test_is_zypper_running_not(mock_get_zypper_pid):
+ mock_get_zypper_pid.return_value = ''
+ assert utils.is_zypper_running() is False
@patch('cloudregister.registerutils.get_zypper_pid')
@@ -2076,6 +2076,7 @@ def test_is_zypper_running(mock_get_zypper_pid):
mock_get_zypper_pid.return_value = 42
assert utils.is_zypper_running()
+
@patch('cloudregister.registerutils.get_state_dir')
def test_refresh_zypper_pid_cache(mock_get_state_dir):
with tempfile.TemporaryDirectory() as tmpdirname:
@@ -2099,10 +2100,14 @@ def test_set_as_current_smt(mock_get_state_dir):
utils.set_as_current_smt(smt_server)
-@patch.dict(os.environ, {'http_proxy': 'foo', 'https_proxy': 'bar'}, clear=True)
+@patch.dict(
+ os.environ,
+ {'http_proxy': 'foo', 'https_proxy': 'bar'},
+ clear=True
+)
@patch('cloudregister.registerutils.logging')
def test_set_proxy_proxy_set_on_os_env(mock_logging):
- assert utils.set_proxy() == False
+ assert utils.set_proxy() is False
assert mock_logging.info.call_args_list == [
call('Using proxy settings from execution environment'),
call('\thttp_proxy: foo'),
@@ -2113,7 +2118,7 @@ def test_set_proxy_proxy_set_on_os_env(mock_logging):
@patch('cloudregister.registerutils.os.path.exists')
def test_set_proxy_proxy_set_on_directory(mock_os_path_exists):
mock_os_path_exists.return_value = False
- assert utils.set_proxy() == False
+ assert utils.set_proxy() is False
@patch('cloudregister.registerutils.os.path.exists')
@@ -2125,7 +2130,7 @@ def test_set_proxy(mock_os_path_exists):
NO_PROXY="localhost, 127.0.0.1"
"""
with mock.patch('builtins.open', mock.mock_open(read_data=proxy_content)):
- assert utils.set_proxy() == True
+ assert utils.set_proxy() is True
@patch.dict(os.environ, {'http_proxy': '', 'https_proxy': ''}, clear=True)
@@ -2136,7 +2141,7 @@ def test_proxy_not_enable(mock_os_path_exists):
PROXY_ENABLED="no"
"""
with mock.patch('builtins.open', mock.mock_open(read_data=proxy_content)):
- assert utils.set_proxy() == False
+ assert utils.set_proxy() is False
@patch('cloudregister.registerutils.Path')
@@ -2159,7 +2164,7 @@ def test_rmt_as_scc_proxy_flag(mock_path):
@patch('cloudregister.registerutils.get_available_smt_servers')
def test_switch_services_to_plugin_no_servers(mock_get_available_smt_servers):
mock_get_available_smt_servers.return_value = []
- assert utils.switch_services_to_plugin() == None
+ assert utils.switch_services_to_plugin() is None
@patch('cloudregister.registerutils.logging')
@@ -2234,7 +2239,7 @@ def test_remove_registration_data_no_user(
mock_logging
):
mock_get_creds.return_value = None, None
- assert utils.remove_registration_data() == None
+ assert utils.remove_registration_data() is None
mock_logging.info.assert_called_once_with(
'No credentials, nothing to do server side'
)
@@ -2255,7 +2260,7 @@ def test_remove_registration_data_no_registration(
mock_get_creds.return_value = 'foo', 'bar'
mock_is_scc_connected.return_value = False
mock_os_path_exists.return_value = False
- assert utils.remove_registration_data() == None
+ assert utils.remove_registration_data() is None
mock_logging.info.assert_called_once_with(
'No current registration server set.'
)
@@ -2301,7 +2306,7 @@ def test_remove_registration_data(
response.status_code = 204
mock_request_delete.return_value = response
mock_is_scc_connected.return_value = True
- assert utils.remove_registration_data() == None
+ assert utils.remove_registration_data() is None
assert mock_logging.info.call_args_list == [
call("Clean current registration server: ('192.168.1.1', 'fc00::1')"),
call('System successfully removed from update infrastructure'),
@@ -2350,7 +2355,7 @@ def test_remove_registration_data_request_not_OK(
response.status_code = 504
mock_request_delete.return_value = response
mock_is_scc_connected.return_value = True
- assert utils.remove_registration_data() == None
+ assert utils.remove_registration_data() is None
assert mock_logging.info.call_args_list == [
call("Clean current registration server: ('192.168.1.1', 'fc00::1')"),
call(
@@ -2358,10 +2363,11 @@ def test_remove_registration_data_request_not_OK(
'continue with local changes'
),
call('Removing system from SCC'),
- call('System not found in SCC. The system may still be tracked '
- 'against your subscription. It is recommended to investigate '
- 'the issue. System user name: "foo". '
- 'Local registration artifacts removed.'
+ call(
+ 'System not found in SCC. The system may still be tracked '
+ 'against your subscription. It is recommended to investigate '
+ 'the issue. System user name: "foo". '
+ 'Local registration artifacts removed.'
)
]
@@ -2407,8 +2413,7 @@ def test_remove_registration_data_request_exception(
exception = requests.exceptions.RequestException('foo')
mock_request_delete.side_effect = exception
mock_is_scc_connected.return_value = True
- assert utils.remove_registration_data() == None
-
+ assert utils.remove_registration_data() is None
assert mock_logging.warning.call_args_list == [
call('Unable to remove client registration from server'),
call(exception),
@@ -2507,23 +2512,24 @@ def test_switch_smt_repos(mock_get_current_smt, mock_glob):
with open('tests/data/repo_foo.repo') as f:
file_azo = ' '.join(f.readlines())
open_mock = mock.mock_open(read_data=file_azo)
+
def open_f(filename, *args, **kwargs):
return open_mock()
with patch('builtins.open', create=True) as mock_open:
- mock_open.side_effect = open_f
- utils.switch_smt_repos(new_smt_server)
- assert mock_open.call_args_list == [
- call('tests/data/repo_foo.repo', 'r'),
- call('tests/data/repo_foo.repo', 'w')
- ]
- expected_content = file_azo.replace(
- 'plugin:/susecloud',
- new_smt_server.get_FQDN()
- )
- mock_open(
- 'tests/data/repo_foo.repo', 'w'
- ).write.assert_called_once_with(expected_content)
+ mock_open.side_effect = open_f
+ utils.switch_smt_repos(new_smt_server)
+ assert mock_open.call_args_list == [
+ call('tests/data/repo_foo.repo', 'r'),
+ call('tests/data/repo_foo.repo', 'w')
+ ]
+ expected_content = file_azo.replace(
+ 'plugin:/susecloud',
+ new_smt_server.get_FQDN()
+ )
+ mock_open(
+ 'tests/data/repo_foo.repo', 'w'
+ ).write.assert_called_once_with(expected_content)
@patch('cloudregister.registerutils.glob.glob')
@@ -2551,23 +2557,24 @@ def test_switch_smt_service(mock_get_current_smt, mock_glob):
with open('tests/data/repo_foo.repo') as f:
file_azo = ' '.join(f.readlines())
open_mock = mock.mock_open(read_data=file_azo)
+
def open_f(filename, *args, **kwargs):
return open_mock()
with patch('builtins.open', create=True) as mock_open:
- mock_open.side_effect = open_f
- utils.switch_smt_service(new_smt_server)
- assert mock_open.call_args_list == [
- call('tests/data/service.service', 'r'),
- call('tests/data/service.service', 'w')
- ]
- expected_content = file_azo.replace(
- 'plugin:/susecloud',
- new_smt_server.get_FQDN()
- )
- mock_open(
- 'tests/data/repo_foo.repo', 'w'
- ).write.assert_called_once_with(expected_content)
+ mock_open.side_effect = open_f
+ utils.switch_smt_service(new_smt_server)
+ assert mock_open.call_args_list == [
+ call('tests/data/service.service', 'r'),
+ call('tests/data/service.service', 'w')
+ ]
+ expected_content = file_azo.replace(
+ 'plugin:/susecloud',
+ new_smt_server.get_FQDN()
+ )
+ mock_open(
+ 'tests/data/repo_foo.repo', 'w'
+ ).write.assert_called_once_with(expected_content)
@patch('cloudregister.registerutils.logging')
@@ -2591,7 +2598,7 @@ def test_update_ca_chain_failed(mock_exec_subprocess):
@patch('cloudregister.registerutils.is_new_registration')
def test_update_rmt_cert_new_registration(mock_is_new_registration):
mock_is_new_registration.return_value = True
- assert utils.update_rmt_cert('foo') == None
+ assert utils.update_rmt_cert('foo') is None
@patch('cloudregister.registerutils.logging')
@@ -2630,7 +2637,7 @@ def test_update_rmt_cert_no_cert_change(
mock_is_new_registration.return_value = False
mock_set_proxy.return_value = True
mock_fetch_smt_data.return_value = region_smt_data
- assert utils.update_rmt_cert(smt_server) == False
+ assert utils.update_rmt_cert(smt_server) is False
assert mock_logging.info.call_args_list == [
call('Check for cert update'),
call('No cert change')
@@ -2673,7 +2680,7 @@ def test_update_rmt_cert(
mock_is_new_registration.return_value = False
mock_set_proxy.return_value = True
mock_fetch_smt_data.return_value = region_smt_data
- assert utils.update_rmt_cert(smt_server) == True
+ assert utils.update_rmt_cert(smt_server) is True
assert mock_logging.info.call_args_list == [
call('Check for cert update'),
call('Update server cert updated')
@@ -2681,7 +2688,7 @@ def test_update_rmt_cert(
def test_uses_rmt_as_scc_proxy():
- assert utils.uses_rmt_as_scc_proxy() == False
+ assert utils.uses_rmt_as_scc_proxy() is False
@patch('cloudregister.registerutils.json.dumps')
@@ -2707,7 +2714,7 @@ def test_write_framework_identifier(
mock_get_framework_identifier_path.return_value = os.path.join(
tmpdirname, 'foo'
)
- with patch('builtins.open', create=True) as mock_framework_file:
+ with patch('builtins.open', create=True):
utils.write_framework_identifier('foo')
# TODO: fix/check framework unknown + plugin OK valid combination
mock_json_dumps.assert_called_once_with(
@@ -2742,7 +2749,7 @@ def test_write_framework_identifier_no_region(
mock_get_framework_identifier_path.return_value = os.path.join(
tmpdirname, 'foo'
)
- with patch('builtins.open', create=True) as mock_framework_file:
+ with patch('builtins.open', create=True):
utils.write_framework_identifier('foo')
# TODO: fix/check framework unknown + plugin OK valid combination
mock_json_dumps.assert_called_once_with(
@@ -2783,7 +2790,7 @@ def test_get_framework_plugin_no_existing(mock_logging):
cfg = get_test_config()
cfg.add_section('instance')
cfg.set('instance', 'instanceArgs', 'foo')
- assert utils.__get_framework_plugin(cfg) == None
+ assert utils.__get_framework_plugin(cfg) is None
mock_logging.warning.assert_called_once_with(
'Configured instanceArgs module could not be loaded. '
'Continuing without additional arguments.'
@@ -2817,7 +2824,6 @@ def test_get_referenced_credentials_not_found(mock_glob, mock_get_config):
assert utils.__get_referenced_credentials('foo') == []
-
@patch('cloudregister.registerutils.logging')
def test_get_region_server_args_exception(
mock_logging
@@ -2838,7 +2844,7 @@ def test_get_region_server_args_not_region_srv_args(
):
mock_amazon_generate_region_args.return_value = None
mod = __import__('cloudregister.amazonec2', fromlist=[''])
- assert utils.__get_region_server_args(mod) == None
+ assert utils.__get_region_server_args(mod) is None
mock_logging.assert_not_called
@@ -2860,7 +2866,7 @@ def test_get_system_mfg(mock_exec_subprocess):
@patch('cloudregister.registerutils.glob.glob')
def test_has_credentials_in_system(mock_glob, mock_get_referenced_creds):
mock_glob.return_value = ['/etc/zypp/credentials.d/SCCcredentials']
- assert utils.__has_credentials('foo') == True
+ assert utils.__has_credentials('foo') is True
@patch('cloudregister.registerutils.__get_referenced_credentials')
@@ -2868,22 +2874,14 @@ def test_has_credentials_in_system(mock_glob, mock_get_referenced_creds):
def test_has_credentials_in_service(mock_glob, mock_get_referenced_creds):
mock_glob.return_value = ['/etc/zypp/credentials.d/service']
mock_get_referenced_creds.return_value = ['service']
- assert utils.__has_credentials('foo') == True
-
-
-@patch('cloudregister.registerutils.__get_referenced_credentials')
-@patch('cloudregister.registerutils.glob.glob')
-def test_has_credentials_in_service(mock_glob, mock_get_referenced_creds):
- mock_glob.return_value = ['/etc/zypp/credentials.d/service']
- mock_get_referenced_creds.return_value = ['service']
- assert utils.__has_credentials('foo') == True
+ assert utils.__has_credentials('foo') is True
@patch('cloudregister.registerutils.__get_referenced_credentials')
@patch('cloudregister.registerutils.glob.glob')
def test_has_credentials_not_found(mock_glob, mock_get_referenced_creds):
mock_glob.return_value = []
- assert utils.__has_credentials('foo') == False
+ assert utils.__has_credentials('foo') is False
@patch('cloudregister.registerutils.store_smt_data')
@@ -2961,7 +2959,7 @@ def test_remove_artifacts(
mock_os_unlink
):
mock_os_path_exists.return_value = True
- assert utils.__remove_repo_artifacts('foo') == None
+ assert utils.__remove_repo_artifacts('foo') is None
mock_remove_creds.assert_called_once_with('foo')
mock_remove_repos.assert_called_once_with('foo')
mock_remove_service.assert_called_once_with('foo')
@@ -2981,7 +2979,7 @@ def test_remove_artifacts_no_remove_etc_scccreds(
mock_os_path_exists,
mock_os_unlink
):
- assert utils.__remove_repo_artifacts('foo') == None
+ assert utils.__remove_repo_artifacts('foo') is None
mock_remove_creds.assert_called_once_with('foo')
mock_remove_repos.assert_called_once_with('foo')
mock_remove_service.assert_called_once_with('foo')
@@ -3016,7 +3014,7 @@ def test_remove_repos_removed_nothing(mock_os_unlink, mock_logging, mock_glob):
def test_remove_service_not_plugins(
mock_os_unlink,
mock_logging,
- mock_glob,
+ mock_glob,
mock_get_service_plugin
):
mock_glob.return_value = ['tests/data/service.service']
@@ -3060,6 +3058,7 @@ def test_set_registry_credentials_config_does_not_exist(
with patch('builtins.open', create=True) as mock_open:
mock_open_docker_config = MagicMock(spec=io.IOBase)
+
def open_file(filename, mode):
if mode == 'w':
return mock_open_docker_config.return_value
@@ -3108,6 +3107,7 @@ def test_set_registry_credentials_config_does_exist(
with patch('builtins.open', create=True) as mock_open:
mock_open_docker_config = MagicMock(spec=io.IOBase)
+
def open_file(filename, mode):
return mock_open_docker_config.return_value
@@ -3115,7 +3115,7 @@ def open_file(filename, mode):
file_handle = \
mock_open_docker_config.return_value.__enter__.return_value
file_handle.read.return_value = ''
- mock_json_load.return_value = {"auths":{"127.0.0.1": {"auth": 'foo'}}}
+ mock_json_load.return_value = {"auths": {"127.0.0.1": {"auth": 'foo'}}}
utils.set_registry_credentials('127.0.0.1', username, password, '')
@@ -3135,6 +3135,7 @@ class Response():
def json(self):
pass
+
def get_servers_data():
"""The XML data matching the data pickled server objects"""
srv_xml = """\n
@@ -3188,11 +3189,13 @@ def get_modified_servers_data():
return etree.fromstring(srv_xml)
+
def get_test_config():
"""Return a config parser object using the minimum configuration in the
tests/data directory"""
return utils.get_config(data_path + '/regionserverclnt.cfg')
+
class MockServer:
def get_ipv4(self):
return '1.1.1.1'
diff --git a/tests/test_smt.py b/tests/test_smt.py
index 11d9c94..fa616b0 100644
--- a/tests/test_smt.py
+++ b/tests/test_smt.py
@@ -28,7 +28,7 @@
sys.path.insert(0, code_path)
-from smt import SMT
+from smt import SMT # noqa
smt_data_ipv4 = dedent('''\