Skip to content

Commit

Permalink
Add edge unit tests for modules (#20)
Browse files Browse the repository at this point in the history
* Add edge unit tests for modules: volumes, users, smb, rsync, ntp, nfs, licenses, ftp, directoryservice, config, cache, aio, afp

* Move _assert_equal_objects to base class
  • Loading branch information
saimonation committed Mar 13, 2020
1 parent 5e98191 commit 0dc883a
Show file tree
Hide file tree
Showing 16 changed files with 600 additions and 4 deletions.
4 changes: 4 additions & 0 deletions tests/ut/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ def patch_property(self, module_path, **patch_kwargs):
"""Mock patch a given path as a property and schedule proper mock cleanup."""
patch_kwargs.update({'new_callable': unittest.mock.PropertyMock})
return self.patch_call(module_path, **patch_kwargs)

def _assert_equal_objects(self, expected_param, actual_param):
for field in [a for a in dir(actual_param) if not a.startswith('__')]:
self.assertEqual(getattr(actual_param, field), getattr(expected_param, field))
20 changes: 20 additions & 0 deletions tests/ut/base_edge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import unittest.mock as mock

from cterasdk.object import Gateway
from tests.ut import base


class BaseEdgeTest(base.BaseTest):

def setUp(self):
super().setUp()
self._filer = Gateway("")

def _init_filer(self, get_response=None, put_response=None, post_response=None,
add_response=None, execute_response=None, delete_response=None):
self._filer.get = mock.MagicMock(return_value=get_response)
self._filer.put = mock.MagicMock(return_value=put_response)
self._filer.post = mock.MagicMock(return_value=post_response)
self._filer.add = mock.MagicMock(return_value=add_response)
self._filer.execute = mock.MagicMock(return_value=execute_response)
self._filer.delete = mock.MagicMock(return_value=delete_response)
4 changes: 0 additions & 4 deletions tests/ut/test_core_cloudfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,3 @@ def test_undelete(self):
cloudfs.CloudFS(self._global_admin).undelete(self._name, self._owner)
self._global_admin.get.assert_called_once_with('/users/' + self._owner + '/displayName')
self._global_admin.files.undelete.assert_called_once_with(self._owner + '/' + self._name)

def _assert_equal_objects(self, expected_param, actual_param):
for field in [a for a in dir(actual_param) if not a.startswith('__')]:
self.assertEqual(getattr(actual_param, field), getattr(expected_param, field))
23 changes: 23 additions & 0 deletions tests/ut/test_edge_afp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from cterasdk.edge import afp
from cterasdk.edge.enum import Mode
from tests.ut import base_edge


class TestEdgeAFP(base_edge.BaseEdgeTest):

def test_afp_is_disabled(self):
self._init_filer(get_response=Mode.Disabled)
ret = afp.AFP(self._filer).is_disabled()
self._filer.get.assert_called_once_with('/config/fileservices/afp/mode')
self.assertEqual(ret, True)

def test_afp_is_not_disabled(self):
self._init_filer(get_response=Mode.Enabled)
ret = afp.AFP(self._filer).is_disabled()
self._filer.get.assert_called_once_with('/config/fileservices/afp/mode')
self.assertEqual(ret, False)

def test_disable_afp(self):
self._init_filer()
afp.AFP(self._filer).disable()
self._filer.put.assert_called_once_with('/config/fileservices/afp/mode', Mode.Disabled)
43 changes: 43 additions & 0 deletions tests/ut/test_edge_aio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from unittest import mock

from cterasdk.edge import aio
from cterasdk.common import Object
from tests.ut import base_edge


class TestEdgeAIO(base_edge.BaseEdgeTest):

def setUp(self):
super().setUp()
self._disable_aio = (False, 0, 0)
self._enable_aio = (True, 1, 1)

def test_disable_aio(self):
get_response = self._get_cifs_object(enable_aio=True)
self._init_filer(get_response=get_response)
aio.AIO(self._filer).disable()
self._filer.get.assert_called_once_with('/config/fileservices/cifs')
self._filer.put.assert_called_once_with('/config/fileservices/cifs', mock.ANY)

expected_param = self._get_cifs_object(enable_aio=False)
actual_param = self._filer.put.call_args[0][1]
self._assert_equal_objects(expected_param, actual_param)

def test_enable_aio(self):
get_response = self._get_cifs_object(enable_aio=False)
self._init_filer(get_response=get_response)
aio.AIO(self._filer).enable()
self._filer.get.assert_called_once_with('/config/fileservices/cifs')
self._filer.put.assert_called_once_with('/config/fileservices/cifs', mock.ANY)

expected_param = self._get_cifs_object(enable_aio=True)
actual_param = self._filer.put.call_args[0][1]
self._assert_equal_objects(expected_param, actual_param)

def _get_cifs_object(self, enable_aio):
robust_mutexes, aio_read_threshold, aio_write_threshold = self._enable_aio if enable_aio else self._disable_aio
cifs_param = Object()
cifs_param.robustMutexes = robust_mutexes
cifs_param.aioReadThreshold = aio_read_threshold
cifs_param.aioWriteThreshold = aio_write_threshold
return cifs_param
21 changes: 21 additions & 0 deletions tests/ut/test_edge_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from cterasdk.edge import cache
from cterasdk.edge.enum import OperationMode
from tests.ut import base_edge


class TestEdgeCaching(base_edge.BaseEdgeTest):

def test_enable_caching(self):
self._init_filer()
cache.Cache(self._filer).enable()
self._filer.put.assert_called_once_with('/config/cloudsync/cloudExtender/operationMode', OperationMode.CachingGateway)

def test_disable_caching(self):
self._init_filer()
cache.Cache(self._filer).disable()
self._filer.put.assert_called_once_with('/config/cloudsync/cloudExtender/operationMode', OperationMode.Disabled)

def test_force_eviction(self):
self._init_filer()
cache.Cache(self._filer).force_eviction()
self._filer.execute.assert_called_once_with('/config/cloudsync', 'forceExecuteEvictor', None)
34 changes: 34 additions & 0 deletions tests/ut/test_edge_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from cterasdk.edge import config
from tests.ut import base_edge


class TestEdgeConfig(base_edge.BaseEdgeTest):

def setUp(self):
super().setUp()
self._hostname = 'vGateway-01dc'
self._location = '205 E. 42nd St. New York, NY. 10017'

def test_get_hostname(self):
self._init_filer(get_response=self._hostname)
ret = config.Config(self._filer).get_hostname()
self._filer.get.assert_called_once_with('/config/device/hostname')
self.assertEqual(ret, self._hostname)

def test_set_hostname(self):
self._init_filer(put_response=self._hostname)
ret = config.Config(self._filer).set_hostname(self._hostname)
self._filer.put.assert_called_once_with('/config/device/hostname', self._hostname)
self.assertEqual(ret, self._hostname)

def test_get_location(self):
self._init_filer(get_response=self._location)
ret = config.Config(self._filer).get_location()
self._filer.get.assert_called_once_with('/config/device/location')
self.assertEqual(ret, self._location)

def test_set_location(self):
self._init_filer(put_response=self._location)
ret = config.Config(self._filer).set_location(self._location)
self._filer.put.assert_called_once_with('/config/device/location', self._location)
self.assertEqual(ret, self._location)
133 changes: 133 additions & 0 deletions tests/ut/test_edge_directory_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
from unittest import mock

from cterasdk import exception
from cterasdk.edge import directoryservice
from cterasdk.common import Object
from tests.ut import base_edge


class TestEdgeDirectoryService(base_edge.BaseEdgeTest):

def setUp(self):
super().setUp()
self._domain = "ctera.local"
self._username = 'admin'
self._password = 'password'
self._workgroup = "CTERA"
self._domain_flat_name = "CTERA"
self._mapping_min = 2000000
self._mapping_max = 5000000
self._filer.network.tcp_connect = mock.MagicMock()

def test_connect(self):
get_response = self._get_workgroup_param()
self._init_filer(get_response=get_response)
directoryservice.DirectoryService(self._filer).connect(self._domain, self._username, self._password)
self._filer.network.tcp_connect.assert_called_once_with(address=self._domain, port=389)
self._filer.get.assert_called_once_with('/config/fileservices/cifs')
self._filer.put.assert_called_once_with('/config/fileservices/cifs', mock.ANY)

expected_param = self._get_domain_param()
actual_param = self._filer.put.call_args[0][1]
self._assert_equal_objects(expected_param, actual_param)

self._filer.execute.assert_called_once_with("/status/fileservices/cifs", "joinDomain", mock.ANY)
expected_param = self._get_domain_join_param()
actual_param = self._filer.execute.call_args[0][2]
self._assert_equal_objects(expected_param, actual_param)

def test_connect_with_ou_path(self):
ou_path = "ou=North America,DC=ctera,DC=local"
get_response = self._get_workgroup_param()
self._init_filer(get_response=get_response)
directoryservice.DirectoryService(self._filer).connect(self._domain, self._username, self._password, ou_path)
self._filer.network.tcp_connect.assert_called_once_with(address=self._domain, port=389)
self._filer.get.assert_called_once_with('/config/fileservices/cifs')
self._filer.put.assert_called_once_with('/config/fileservices/cifs', mock.ANY)

expected_param = self._get_domain_param()
actual_param = self._filer.put.call_args[0][1]
self._assert_equal_objects(expected_param, actual_param)

self._filer.execute.assert_called_once_with("/status/fileservices/cifs", "joinDomain", mock.ANY)
expected_param = self._get_domain_join_param(ou_path)
actual_param = self._filer.execute.call_args[0][2]
self._assert_equal_objects(expected_param, actual_param)

def test_connect_raise(self):
get_response = self._get_workgroup_param()
self._init_filer(get_response=get_response)

expected_exception = exception.CTERAException()
self._filer.execute = mock.MagicMock(side_effect=expected_exception)
with self.assertRaises(exception.CTERAException):
directoryservice.DirectoryService(self._filer).connect(self._domain, self._username, self._password)

def test_set_advanced_mapping(self):
get_response = TestEdgeDirectoryService._get_advanced_mapping_object(self._domain_flat_name, 0, 0)
self._init_filer(get_response=get_response)
directoryservice.DirectoryService(self._filer).advanced_mapping(self._domain_flat_name, self._mapping_min, self._mapping_max)
self._filer.get.assert_called_once_with('/config/fileservices/cifs/idMapping/map')
self._filer.put.assert_called_once_with('/config/fileservices/cifs/idMapping/map', mock.ANY)

expected_param = TestEdgeDirectoryService._get_advanced_mapping_object(self._domain_flat_name, self._mapping_min, self._mapping_max)
actual_param = self._filer.put.call_args[0][1]
self._assert_equal_objects(expected_param[0], actual_param[0])

def test_set_advanced_mapping_raise(self):
self.patch_call("cterasdk.edge.directoryservice.DirectoryService.domains")
get_response = self._get_advanced_mapping_object('Invalid domain name', 0, 0)
self._init_filer(get_response=get_response)
with self.assertRaises(exception.CTERAException) as error:
directoryservice.DirectoryService(self._filer).advanced_mapping(self._domain_flat_name, self._mapping_min, self._mapping_max)
self.assertEqual('Could not find domain name', error.exception.message)

def test_domains(self):
domain = Object()
domain.flatName = self._domain_flat_name
execute_response = [domain]
self._init_filer(execute_response=execute_response)
ret = directoryservice.DirectoryService(self._filer).domains()
self._filer.execute.assert_called_once_with('/status/fileservices/cifs', 'enumDiscoveredDomains')
self.assertEqual(ret[0], self._domain_flat_name)

def test_disconnect(self):
get_response = self._get_domain_param()
self._init_filer(get_response=get_response)
directoryservice.DirectoryService(self._filer).disconnect()
self._filer.get.assert_called_once_with('/config/fileservices/cifs')
self._filer.put.assert_called_once_with('/config/fileservices/cifs', mock.ANY)

expected_param = self._get_workgroup_param()
actual_param = self._filer.put.call_args[0][1]
self._assert_equal_objects(expected_param, actual_param)

@staticmethod
def _get_advanced_mapping_object(domain_flat_name, min_id, max_id):
mapping = Object()
mapping.domainFlatName = domain_flat_name
mapping.minID = min_id
mapping.maxID = max_id
return [mapping]

def _get_domain_join_param(self, ou=None):
o = Object()
o.username = self._username
o.password = self._password
if ou is not None:
o.ouPath = ou
return o

def _get_domain_param(self):
cifs_param = Object()
cifs_param.type = "domain"
cifs_param.workgroup = None
cifs_param.domain = self._domain
return cifs_param

def _get_workgroup_param(self):
cifs_param = Object()
cifs_param.type = "workgroup"
cifs_param.workgroup = self._workgroup
cifs_param.domain = None
return cifs_param
23 changes: 23 additions & 0 deletions tests/ut/test_edge_ftp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from cterasdk.edge import ftp
from cterasdk.edge.enum import Mode
from tests.ut import base_edge


class TestEdgeFTP(base_edge.BaseEdgeTest):

def test_ftp_is_disabled(self):
self._init_filer(get_response=Mode.Disabled)
ret = ftp.FTP(self._filer).is_disabled()
self._filer.get.assert_called_once_with('/config/fileservices/ftp/mode')
self.assertEqual(ret, True)

def test_ftp_is_not_disabled(self):
self._init_filer(get_response=Mode.Enabled)
ret = ftp.FTP(self._filer).is_disabled()
self._filer.get.assert_called_once_with('/config/fileservices/ftp/mode')
self.assertEqual(ret, False)

def test_disable_ftp(self):
self._init_filer()
ftp.FTP(self._filer).disable()
self._filer.put.assert_called_once_with('/config/fileservices/ftp/mode', Mode.Disabled)
25 changes: 25 additions & 0 deletions tests/ut/test_edge_licenses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from unittest import mock

from cterasdk import exception
from cterasdk.edge import licenses
from cterasdk.edge.enum import License
from tests.ut import base_edge


class TestEdgeLicenses(base_edge.BaseEdgeTest):

def test_apply_license(self):
self._init_filer()
ctera_licenses = ['vGateway8', 'vGateway', 'vGateway32', 'vGateway64', 'vGateway128']
calls = [mock.call('/config/device/activeLicenseType', ctera_license) for ctera_license in ctera_licenses]
licenses.Licenses(self._filer).apply(License.EV8)
licenses.Licenses(self._filer).apply(License.EV16)
licenses.Licenses(self._filer).apply(License.EV32)
licenses.Licenses(self._filer).apply(License.EV64)
licenses.Licenses(self._filer).apply(License.EV128)
self._filer.put.assert_has_calls(calls)

def test_apply_license_raise_input_error(self):
with self.assertRaises(exception.InputError) as error:
licenses.Licenses(self._filer).apply('Expected Failure')
self.assertEqual('Invalid license type', error.exception.message)
23 changes: 23 additions & 0 deletions tests/ut/test_edge_nfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from cterasdk.edge import nfs
from cterasdk.edge.enum import Mode
from tests.ut import base_edge


class TestEdgeNFS(base_edge.BaseEdgeTest):

def test_nfs_is_disabled(self):
self._init_filer(get_response=Mode.Disabled)
ret = nfs.NFS(self._filer).is_disabled()
self._filer.get.assert_called_once_with('/config/fileservices/nfs/mode')
self.assertEqual(ret, True)

def test_nfs_is_not_disabled(self):
self._init_filer(get_response=Mode.Enabled)
ret = nfs.NFS(self._filer).is_disabled()
self._filer.get.assert_called_once_with('/config/fileservices/nfs/mode')
self.assertEqual(ret, False)

def test_disable_nfs(self):
self._init_filer()
nfs.NFS(self._filer).disable()
self._filer.put.assert_called_once_with('/config/fileservices/nfs/mode', Mode.Disabled)
29 changes: 29 additions & 0 deletions tests/ut/test_edge_ntp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from unittest import mock

from cterasdk.edge import ntp
from cterasdk.edge.enum import Mode
from tests.ut import base_edge


class TestEdgeNTP(base_edge.BaseEdgeTest):

def test_enable_ntp(self):
self._init_filer()
ntp.NTP(self._filer).enable()
self._filer.put.assert_called_once_with('/config/time/NTPMode', Mode.Enabled)

def test_disable_ntp(self):
self._init_filer()
ntp.NTP(self._filer).disable()
self._filer.put.assert_called_once_with('/config/time/NTPMode', Mode.Disabled)

def test_enable_ntp_with_servers(self):
servers = [str(i) + '.pool.ntp.org' for i in range(0, 3)]
self._init_filer()
ntp.NTP(self._filer).enable(servers)
self._filer.put.assert_has_calls(
[
mock.call('/config/time/NTPMode', Mode.Enabled),
mock.call('/config/time/NTPServer', servers)
]
)

0 comments on commit 0dc883a

Please sign in to comment.