diff --git a/gcloud/webservice3/resources.py b/gcloud/webservice3/resources.py index e0f2909c9e..52c4fe6403 100644 --- a/gcloud/webservice3/resources.py +++ b/gcloud/webservice3/resources.py @@ -354,6 +354,7 @@ def alter_list_data_to_serialize(self, request, data): bundle.data['output'] = component.outputs_format() bundle.data['form'] = component.form bundle.data['desc'] = component.desc + bundle.data['form_is_embedded'] = component.form_is_embedded() # 国际化 name = bundle.data['name'].split('-') bundle.data['group_name'] = _(name[0]) @@ -367,6 +368,7 @@ def alter_detail_data_to_serialize(self, request, data): bundle.data['output'] = component.outputs_format() bundle.data['form'] = component.form bundle.data['desc'] = component.desc + bundle.data['form_is_embedded'] = component.form_is_embedded() # 国际化 name = bundle.data['name'].split('-') bundle.data['group_name'] = _(name[0]) diff --git a/pipeline/component_framework/component.py b/pipeline/component_framework/component.py index 1cc98320fd..22d63c6bf8 100644 --- a/pipeline/component_framework/component.py +++ b/pipeline/component_framework/component.py @@ -29,6 +29,10 @@ def outputs_format(cls): outputs = map(lambda oi: oi._asdict(), outputs) return outputs + @classmethod + def form_is_embedded(cls): + return getattr(cls, 'embedded_form', False) + def clean_execute_data(self, context): return self.data_dict diff --git a/pipeline/component_framework/library.py b/pipeline/component_framework/library.py index 4d2572a893..d44cd6ad8e 100644 --- a/pipeline/component_framework/library.py +++ b/pipeline/component_framework/library.py @@ -25,8 +25,7 @@ def __new__(cls, *args, **kwargs): raise ValueError('please pass a component_code in args or kwargs: ' 'ComponentLibrary(\'code\') or ComponentLibrary(component_code=\'code\')') if component_code not in cls.components: - raise ComponentNotExistException('component %s does not exist.' % - component_code) + raise ComponentNotExistException('component %s does not exist.' % component_code) return cls.components[component_code] @classmethod @@ -35,4 +34,7 @@ def get_component_class(cls, component_code): @classmethod def get_component(cls, component_code, data_dict): - return cls.get_component_class(component_code)(data_dict) + component_cls = cls.get_component_class(component_code) + if component_cls is None: + raise ComponentNotExistException('component %s does not exist.' % component_code) + return component_cls(data_dict) diff --git a/pipeline/component_framework/test.py b/pipeline/component_framework/test.py new file mode 100644 index 0000000000..e5c95fb824 --- /dev/null +++ b/pipeline/component_framework/test.py @@ -0,0 +1,204 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017-2019 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import logging + +from abc import abstractproperty + +from pipeline.core.data.base import DataObject + +logger = logging.getLogger(__name__) + + +class ComponentTestMixin(object): + + @abstractproperty + def component_cls(self): + raise NotImplementedError() + + @abstractproperty + def cases(self): + raise NotImplementedError() + + @property + def patchers(self): + return [] + + @property + def _component_cls_name(self): + return self.component_cls.__name__ + + def _format_failure_message(self, no, name, msg): + return '{component_cls} case {no}:{name} fail: {msg}'.format( + component_cls=self._component_cls_name, + no=no + 1, + name=name, + msg=msg + ) + + def _do_case_assert(self, service, method, assertion, no, name, args=None, kwargs=None): + + do_continue = False + args = args or [service] + kwargs = kwargs or {} + + data = kwargs.get('data') or args[0] + + if assertion.exc: + # raise assertion + + try: + getattr(service, method)(*args, **kwargs) + except Exception as e: + assert e.__class__ == assertion.exc, self._format_failure_message( + no=no, + name=name, + msg='{method} raise assertion failed,\nexcept: {e}\nactual: {a}'.format( + method=method, + e=assertion.exc, + a=e.__class__ + )) + do_continue = True + else: + self.assertTrue(False, msg=self._format_failure_message( + no=no, + name=name, + msg='{method} raise assertion failed, {method} not raise any exception'.format( + method=method + ) + )) + else: + + result = getattr(service, method)(*args, **kwargs) + + if result is None or result is True: + self.assertTrue(assertion.success, msg=self._format_failure_message( + no=no, + name=name, + msg='{method} success assertion failed, {method} execute success'.format( + method=method + ) + )) + + self.assertDictEqual(data.outputs, assertion.outputs, msg=self._format_failure_message( + no=no, + name=name, + msg='{method} outputs assertion failed,\nexcept: {e}\nactual: {a}'.format( + method=method, + e=data.outputs, + a=assertion.outputs + ) + )) + + else: + self.assertFalse(assertion.success, msg=self._format_failure_message( + no=no, + name=name, + msg='{method} success assertion failed, {method} execute failed'.format( + method=method + ) + )) + + do_continue = True + + return do_continue + + def test_component(self): + patchers = self.patchers + for patcher in patchers: + patcher.start() + + component = self.component_cls({}) + + bound_service = component.service() + + for no, case in enumerate(self.cases): + data = DataObject(inputs=case.inputs) + parent_data = DataObject(inputs=case.parent_data) + + # execute result check + do_continue = self._do_case_assert(service=bound_service, + method='execute', + args=(data, parent_data), + assertion=case.execute_assertion, + no=no, + name=case.name) + + if do_continue: + continue + + if bound_service.need_schedule(): + + if bound_service.interval is None: + # callback case + self._do_case_assert(service=bound_service, + method='schedule', + args=(data, parent_data, case.schedule_assertion.callback_data), + assertion=case.schedule_assertion, + no=no, + name=case.name) + + else: + # schedule case + assertions = case.schedule_assertion + assertions = assertions if isinstance(assertions, list) else [assertions] + + for assertion in assertions: + do_continue = self._do_case_assert(service=bound_service, + method='schedule', + args=(data, parent_data), + assertion=assertion, + no=no, + name=case.name) + + if do_continue: + break + + logger.info('{component} paas {num} cases.'.format( + component=self._component_cls_name, + num=len(self.cases) + )) + + for patcher in patchers: + patcher.stop() + + +class ComponentTestCase(object): + def __init__(self, + inputs, + parent_data, + execute_assertion, + schedule_assertion, + name=''): + self.inputs = inputs + self.parent_data = parent_data + self.execute_assertion = execute_assertion + self.schedule_assertion = schedule_assertion + self.name = name + + +class Assertion(object): + def __init__(self, success, outputs, exc=None): + self.success = success + self.outputs = outputs + self.exc = exc + + +class ExecuteAssertion(Assertion): + pass + + +class ScheduleAssertion(Assertion): + def __init__(self, callback_data, *args, **kwargs): + self.callback_data = callback_data + super(ScheduleAssertion, self).__init__(*args, **kwargs) diff --git a/pipeline/contrib/external_plugins/models/base.py b/pipeline/contrib/external_plugins/models/base.py index 6243446a9c..69fd47305c 100644 --- a/pipeline/contrib/external_plugins/models/base.py +++ b/pipeline/contrib/external_plugins/models/base.py @@ -14,7 +14,7 @@ from copy import deepcopy from abc import abstractmethod -from django.db import models +from django.db import models, IntegrityError from django.utils.translation import ugettext_lazy as _ from pipeline.contrib.external_plugins import exceptions @@ -69,10 +69,15 @@ def update_source_from_config(self, configs): defaults = deepcopy(config['details']) defaults['packages'] = config['packages'] - self.update_or_create( - name=config['name'], - from_config=True, - defaults=defaults) + try: + self.update_or_create( + name=config['name'], + from_config=True, + defaults=defaults) + except IntegrityError: + raise exceptions.InvalidOperationException( + 'There is a external source named "{source_name}" but not create from config, ' + 'can not do source update operation'.format(source_name=config['name'])) class ExternalPackageSource(models.Model): @@ -107,7 +112,7 @@ def modules(self): def update_package_source_from_config(source_configs): classified_config = {source_type: [] for source_type in source_cls_factory.keys()} - for config in source_configs: + for config in deepcopy(source_configs): classified_config.setdefault(config.pop('type'), []).append(config) for source_type, configs in classified_config.items(): diff --git a/pipeline/contrib/external_plugins/models/source.py b/pipeline/contrib/external_plugins/models/source.py index 3276592f66..10a251f292 100644 --- a/pipeline/contrib/external_plugins/models/source.py +++ b/pipeline/contrib/external_plugins/models/source.py @@ -14,7 +14,11 @@ from django.db import models from django.conf import settings from django.utils.translation import ugettext_lazy as _ -from pipeline.contrib.external_plugins.utils.importer.git import GitRepoModuleImporter +from pipeline.contrib.external_plugins.utils.importer import ( + GitRepoModuleImporter, + S3ModuleImporter, + FSModuleImporter +) from pipeline.contrib.external_plugins.models.base import ( GIT, @@ -37,7 +41,9 @@ def importer(self): return GitRepoModuleImporter(repo_raw_url=self.repo_raw_address, branch=self.branch, modules=self.packages.keys(), - proxy=getattr(settings, 'EXTERNAL_SOURCE_PROXY')) + proxy=getattr(settings, 'EXTERNAL_SOURCE_PROXY'), + secure_only=getattr(settings, + 'EXTERNAL_SOURCE_SECURE_RESTRICT', {}).get(self.name, True)) @package_source @@ -52,7 +58,13 @@ def type(): return S3 def importer(self): - pass + return S3ModuleImporter(modules=self.packages.keys(), + service_address=self.service_address, + bucket=self.bucket, + access_key=self.access_key, + secret_key=self.secret_key, + secure_only=getattr(settings, + 'EXTERNAL_SOURCE_SECURE_RESTRICT', {}).get(self.name, True)) @package_source @@ -64,4 +76,5 @@ def type(): return FILE_SYSTEM def importer(self): - pass + return FSModuleImporter(modules=self.packages.keys(), + path=self.path) diff --git a/pipeline/contrib/external_plugins/tests/mock.py b/pipeline/contrib/external_plugins/tests/mock.py index d0f68d870f..b8fabbe617 100644 --- a/pipeline/contrib/external_plugins/tests/mock.py +++ b/pipeline/contrib/external_plugins/tests/mock.py @@ -16,6 +16,12 @@ from mock import MagicMock, patch, call # noqa +def mock_s3_resource(resource, **kwargs): + ret = {'resource': resource} + ret.update(kwargs) + return ret + + class Object(object): pass diff --git a/pipeline/contrib/external_plugins/tests/mock_settings.py b/pipeline/contrib/external_plugins/tests/mock_settings.py index 8aa5aff14c..7b5ae2fab2 100644 --- a/pipeline/contrib/external_plugins/tests/mock_settings.py +++ b/pipeline/contrib/external_plugins/tests/mock_settings.py @@ -16,6 +16,8 @@ IMP_ACQUIRE_LOCK = 'imp.acquire_lock' IMP_RELEASE_LOCK = 'imp.release_lock' REQUESTS_GET = 'requests.get' +BOTO3_RESOURCE = 'boto3.resource' +OS_PATH_EXISTS = 'os.path.exists' IMPORTLIB_IMPORT_MODULE = 'importlib.import_module' @@ -36,3 +38,19 @@ UTILS_IMPORTER_GIT_IS_PACKAGE = 'pipeline.contrib.external_plugins.utils.importer.git.GitRepoModuleImporter.is_package' UTILS_IMPORTER__SETUP_IMPORTER = 'pipeline.contrib.external_plugins.utils.importer.utils._setup_importer' UTILS_IMPORTER__REMOVE_IMPORTER = 'pipeline.contrib.external_plugins.utils.importer.utils._remove_importer' + +UTILS_IMPORTER_S3__FETCH_OBJ_CONTENT = \ + 'pipeline.contrib.external_plugins.utils.importer.s3.S3ModuleImporter._fetch_obj_content' +UTILS_IMPORTER_S3_GET_SOURCE = 'pipeline.contrib.external_plugins.utils.importer.s3.S3ModuleImporter.get_source' +UTILS_IMPORTER_S3_GET_FILE = 'pipeline.contrib.external_plugins.utils.importer.s3.S3ModuleImporter.get_file' +UTILS_IMPORTER_S3_IS_PACKAGE = 'pipeline.contrib.external_plugins.utils.importer.s3.S3ModuleImporter.is_package' +UTILS_IMPORTER_S3__GET_S3_OBJ_CONTENT = \ + 'pipeline.contrib.external_plugins.utils.importer.s3.S3ModuleImporter._get_s3_obj_content' + +UTILS_IMPORTER_FS_GET_SOURCE = 'pipeline.contrib.external_plugins.utils.importer.fs.FSModuleImporter.get_source' +UTILS_IMPORTER_FS_GET_FILE = 'pipeline.contrib.external_plugins.utils.importer.fs.FSModuleImporter.get_file' +UTILS_IMPORTER_FS_IS_PACKAGE = 'pipeline.contrib.external_plugins.utils.importer.fs.FSModuleImporter.is_package' +UTILS_IMPORTER_FS__FETCH_FILE_CONTENT = \ + 'pipeline.contrib.external_plugins.utils.importer.fs.FSModuleImporter._fetch_file_content' +UTILS_IMPORTER_FS__GET_FILE_CONTENT = \ + 'pipeline.contrib.external_plugins.utils.importer.fs.FSModuleImporter._get_file_content' diff --git a/pipeline/contrib/external_plugins/tests/models/base/test_external_package_source.py b/pipeline/contrib/external_plugins/tests/models/base/test_external_package_source.py index 744556da0a..ad6abe278f 100644 --- a/pipeline/contrib/external_plugins/tests/models/base/test_external_package_source.py +++ b/pipeline/contrib/external_plugins/tests/models/base/test_external_package_source.py @@ -11,6 +11,8 @@ specific language governing permissions and limitations under the License. """ +from copy import deepcopy + from django.test import TestCase from pipeline.contrib.external_plugins import exceptions @@ -220,3 +222,12 @@ def test_update_package_source_from_config__unsupported_source_type(self): } ] self.assertRaises(KeyError, ExternalPackageSource.update_package_source_from_config, source_configs) + + def test_update_source_from_config__name_conflict(self): + source = deepcopy(SOURCE_1) + source['type'] = 'git' + ExternalPackageSource.update_package_source_from_config([source]) + GitRepoSource.objects.filter(name=source['name']).update(from_config=False) + self.assertRaises(exceptions.InvalidOperationException, + ExternalPackageSource.update_package_source_from_config, + [source]) diff --git a/pipeline/contrib/external_plugins/tests/utils/importer/test_fs.py b/pipeline/contrib/external_plugins/tests/utils/importer/test_fs.py new file mode 100644 index 0000000000..8872c54bd5 --- /dev/null +++ b/pipeline/contrib/external_plugins/tests/utils/importer/test_fs.py @@ -0,0 +1,134 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017-2019 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +from django.test import TestCase + +from pipeline.contrib.external_plugins.tests.mock import * # noqa +from pipeline.contrib.external_plugins.tests.mock_settings import * # noqa +from pipeline.contrib.external_plugins.utils.importer.fs import FSModuleImporter + +GET_FILE_RETURN = 'GET_FILE_RETURN' +GET_SOURCE_RETURN = 'a=1' +IS_PACKAGE_RETURN = True +_FETCH_FILE_RETURN = '_FETCH_FILE_RETURN' + + +class FSModuleImporterTestCase(TestCase): + def setUp(self): + self.path = '/usr/imp/custom_components/' + self.path_without_salsh = '/usr/imp/custom_components' + self.fullname = 'module1.module2.module3' + self.module_url = '/usr/imp/custom_components/module1/module2/module3.py' + self.package_url = '/usr/imp/custom_components/module1/module2/module3/__init__.py' + + def test__init__(self): + importer = FSModuleImporter(modules=[], path=self.path) + self.assertEqual(self.path, importer.path) + + importer = FSModuleImporter(modules=[], path=self.path_without_salsh) + self.assertEqual(self.path, importer.path) + + def test_is_package(self): + importer = FSModuleImporter(modules=[], path=self.path) + + with patch(OS_PATH_EXISTS, MagicMock(return_value=True)): + self.assertTrue(importer.is_package(self.fullname)) + + with patch(OS_PATH_EXISTS, MagicMock(return_value=False)): + self.assertFalse(importer.is_package(self.fullname)) + + @patch(UTILS_IMPORTER_FS_GET_FILE, MagicMock(return_value=GET_FILE_RETURN)) + @patch(UTILS_IMPORTER_FS_GET_SOURCE, MagicMock(return_value=GET_SOURCE_RETURN)) + def test_get_code(self): + expect_code = compile(GET_SOURCE_RETURN, GET_FILE_RETURN, 'exec') + importer = FSModuleImporter(modules=[], path=self.path) + + self.assertEqual(expect_code, importer.get_code(self.fullname)) + + @patch(UTILS_IMPORTER_FS_IS_PACKAGE, MagicMock(return_value=IS_PACKAGE_RETURN)) + @patch(UTILS_IMPORTER_FS__FETCH_FILE_CONTENT, MagicMock(return_value=_FETCH_FILE_RETURN)) + def test_get_source(self): + importer = FSModuleImporter(modules=[], path=self.path) + + self.assertEqual(_FETCH_FILE_RETURN, importer.get_source(self.fullname)) + importer._fetch_file_content.assert_called_once_with( + importer._file_path(self.fullname, is_pkg=IS_PACKAGE_RETURN)) + + @patch(UTILS_IMPORTER_FS_IS_PACKAGE, MagicMock(return_value=IS_PACKAGE_RETURN)) + @patch(UTILS_IMPORTER_FS__FETCH_FILE_CONTENT, MagicMock(return_value=None)) + def test_get_source__fetch_none(self): + importer = FSModuleImporter(modules=[], path=self.path) + + self.assertRaises(ImportError, importer.get_source, self.fullname) + importer._fetch_file_content.assert_called_once_with( + importer._file_path(self.fullname, is_pkg=IS_PACKAGE_RETURN)) + + def test_get_path(self): + importer = FSModuleImporter(modules=[], path=self.path) + + self.assertEqual(importer.get_path(self.fullname), ['/usr/imp/custom_components/module1/module2/module3']) + + def test_get_file(self): + importer = FSModuleImporter(modules=[], path=self.path) + + with patch(UTILS_IMPORTER_FS_IS_PACKAGE, MagicMock(return_value=True)): + self.assertEqual(importer.get_file(self.fullname), self.package_url) + + with patch(UTILS_IMPORTER_FS_IS_PACKAGE, MagicMock(return_value=False)): + self.assertEqual(importer.get_file(self.fullname), self.module_url) + + def test__file_path(self): + importer = FSModuleImporter(modules=[], path=self.path) + + self.assertEqual(importer._file_path(self.fullname, is_pkg=True), self.package_url) + self.assertEqual(importer._file_path(self.fullname, is_pkg=False), self.module_url) + + def test__fetch_file__nocache(self): + importer = FSModuleImporter(modules=[], path=self.path, use_cache=False) + + first_file_content = 'first_file_content' + second_file_content = 'second_file_content' + + with patch(UTILS_IMPORTER_FS__GET_FILE_CONTENT, MagicMock(return_value=first_file_content)): + self.assertEqual(importer._fetch_file_content(self.module_url), first_file_content) + self.assertEqual(importer.file_cache, {}) + + with patch(UTILS_IMPORTER_FS__GET_FILE_CONTENT, MagicMock(return_value=second_file_content)): + self.assertEqual(importer._fetch_file_content(self.module_url), second_file_content) + self.assertEqual(importer.file_cache, {}) + + with patch(UTILS_IMPORTER_FS__GET_FILE_CONTENT, MagicMock(return_value=None)): + self.assertIsNone(importer._fetch_file_content(self.module_url)) + self.assertEqual(importer.file_cache, {}) + + def test__fetch_file__use_cache(self): + importer = FSModuleImporter(modules=[], path=self.path) + + first_file_content = 'first_file_content' + second_file_content = 'second_file_content' + + with patch(UTILS_IMPORTER_FS__GET_FILE_CONTENT, MagicMock(return_value=first_file_content)): + self.assertEqual(importer._fetch_file_content(self.module_url), first_file_content) + self.assertEqual(importer.file_cache[self.module_url], first_file_content) + + with patch(UTILS_IMPORTER_FS__GET_FILE_CONTENT, MagicMock(return_value=second_file_content)): + self.assertEqual(importer._fetch_file_content(self.module_url), first_file_content) + self.assertEqual(importer.file_cache[self.module_url], first_file_content) + + with patch(UTILS_IMPORTER_FS__GET_FILE_CONTENT, MagicMock(return_value=None)): + self.assertIsNone(importer._fetch_file_content(self.package_url)) + self.assertEqual(importer.file_cache[self.package_url], None) + + with patch(UTILS_IMPORTER_FS__GET_FILE_CONTENT, MagicMock(return_value=second_file_content)): + self.assertIsNone(importer._fetch_file_content(self.package_url)) + self.assertEqual(importer.file_cache[self.package_url], None) diff --git a/pipeline/contrib/external_plugins/tests/utils/importer/test_git.py b/pipeline/contrib/external_plugins/tests/utils/importer/test_git.py index bf7dcba7bb..2698b9bc22 100644 --- a/pipeline/contrib/external_plugins/tests/utils/importer/test_git.py +++ b/pipeline/contrib/external_plugins/tests/utils/importer/test_git.py @@ -36,9 +36,11 @@ def setUp(self): def test__init__(self): importer = GitRepoModuleImporter(modules=[], repo_raw_url=self.repo_raw_url, branch=self.branch) self.assertEqual(importer.repo_raw_url, self.repo_raw_url) + self.assertEqual(importer.branch, self.branch) importer = GitRepoModuleImporter(modules=[], repo_raw_url=self.repo_raw_url_without_slash, branch=self.branch) self.assertEqual(importer.repo_raw_url, self.repo_raw_url) + self.assertEqual(importer.branch, self.branch) self.assertRaises(ValueError, GitRepoModuleImporter, @@ -74,6 +76,7 @@ def test__fetch_repo_file__no_cache(self): with patch(REQUESTS_GET, MagicMock(return_value=MockResponse(ok=False))): self.assertIsNone(importer._fetch_repo_file(self.module_url)) + self.assertEqual(importer.file_cache, {}) def test__fetch_repo_file__use_cache(self): importer = GitRepoModuleImporter(modules=[], repo_raw_url=self.repo_raw_url, branch=self.branch) @@ -135,6 +138,7 @@ def test_get_source__fetch_none(self): def test_get_path(self): importer = GitRepoModuleImporter(modules=[], repo_raw_url=self.repo_raw_url, branch=self.branch) + self.assertEqual(importer.get_path(self.fullname), ['https://test-git-repo-raw/master/module1/module2/module3']) def test_get_file(self): diff --git a/pipeline/contrib/external_plugins/tests/utils/importer/test_s3.py b/pipeline/contrib/external_plugins/tests/utils/importer/test_s3.py new file mode 100644 index 0000000000..3c3c63ad4d --- /dev/null +++ b/pipeline/contrib/external_plugins/tests/utils/importer/test_s3.py @@ -0,0 +1,214 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017-2019 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +from django.test import TestCase + +from pipeline.contrib.external_plugins.tests.mock import * # noqa +from pipeline.contrib.external_plugins.tests.mock_settings import * # noqa +from pipeline.contrib.external_plugins.utils.importer.s3 import S3ModuleImporter + +GET_FILE_RETURN = 'GET_FILE_RETURN' +GET_SOURCE_RETURN = 'a=1' +IS_PACKAGE_RETURN = True +_FETCH_OBJ_CONTENT_RETURN = '_FETCH_OBJ_CONTENT_RETURN' + + +class S3ModuleImporterTestCase(TestCase): + + def setUp(self): + self.service_address = 'https://test-s3-address/' + self.service_address_without_slash = 'https://test-s3-address' + self.not_secure_service_address = 'http://no-secure-address/' + self.bucket = 'bucket' + self.access_key = 'access_key' + self.secret_key = 'secret_key' + self.fullname = 'module1.module2.module3' + self.module_url = 'https://test-s3-address/bucket/module1/module2/module3.py' + self.package_url = 'https://test-s3-address/bucket/module1/module2/module3/__init__.py' + self.module_key = 'module1/module2/module3.py' + self.package_key = 'module1/module2/module3/__init__.py' + + @patch(BOTO3_RESOURCE, mock_s3_resource) + def test__init__(self): + importer = S3ModuleImporter(modules=[], + service_address=self.service_address, + bucket=self.bucket, + access_key=self.access_key, + secret_key=self.secret_key) + self.assertEqual(self.service_address, importer.service_address) + self.assertEqual(importer.s3, mock_s3_resource('s3', + aws_access_key_id=self.access_key, + aws_secret_access_key=self.secret_key, + endpoint_url=self.service_address)) + + importer = S3ModuleImporter(modules=[], + service_address=self.service_address_without_slash, + bucket=self.bucket, + access_key=self.access_key, + secret_key=self.secret_key) + self.assertEqual(self.service_address, importer.service_address) + self.assertEqual(importer.s3, mock_s3_resource('s3', + aws_access_key_id=self.access_key, + aws_secret_access_key=self.secret_key, + endpoint_url=self.service_address)) + + self.assertRaises(ValueError, + S3ModuleImporter, + modules=[], + service_address=self.not_secure_service_address, + bucket=self.bucket, + access_key=self.access_key, + secret_key=self.secret_key) + + importer = S3ModuleImporter(modules=[], + service_address=self.not_secure_service_address, + bucket=self.bucket, + access_key=self.access_key, + secret_key=self.secret_key, + secure_only=False) + self.assertEqual(self.not_secure_service_address, importer.service_address) + self.assertEqual(importer.s3, mock_s3_resource('s3', + aws_access_key_id=self.access_key, + aws_secret_access_key=self.secret_key, + endpoint_url=self.not_secure_service_address)) + + def test_is_package(self): + importer = S3ModuleImporter(modules=[], + service_address=self.service_address, + bucket=self.bucket, + access_key=self.access_key, + secret_key=self.secret_key) + + with patch(UTILS_IMPORTER_S3__FETCH_OBJ_CONTENT, MagicMock(return_value='')): + self.assertTrue(importer.is_package('a.b.c')) + + with patch(UTILS_IMPORTER_S3__FETCH_OBJ_CONTENT, MagicMock(return_value=None)): + self.assertFalse(importer.is_package('a.b.c')) + + @patch(UTILS_IMPORTER_S3_GET_FILE, MagicMock(return_value=GET_FILE_RETURN)) + @patch(UTILS_IMPORTER_S3_GET_SOURCE, MagicMock(return_value=GET_SOURCE_RETURN)) + def test_get_code(self): + expect_code = compile(GET_SOURCE_RETURN, GET_FILE_RETURN, 'exec') + importer = S3ModuleImporter(modules=[], + service_address=self.service_address, + bucket=self.bucket, + access_key=self.access_key, + secret_key=self.secret_key) + + self.assertEqual(expect_code, importer.get_code(self.fullname)) + + @patch(UTILS_IMPORTER_S3_IS_PACKAGE, MagicMock(return_value=IS_PACKAGE_RETURN)) + @patch(UTILS_IMPORTER_S3__FETCH_OBJ_CONTENT, MagicMock(return_value=_FETCH_OBJ_CONTENT_RETURN)) + def test_get_source(self): + importer = S3ModuleImporter(modules=[], + service_address=self.service_address, + bucket=self.bucket, + access_key=self.access_key, + secret_key=self.secret_key) + + self.assertEqual(_FETCH_OBJ_CONTENT_RETURN, importer.get_source(self.fullname)) + importer._fetch_obj_content.assert_called_once_with(importer._obj_key(self.fullname, is_pkg=IS_PACKAGE_RETURN)) + + @patch(UTILS_IMPORTER_S3_IS_PACKAGE, MagicMock(return_value=IS_PACKAGE_RETURN)) + @patch(UTILS_IMPORTER_S3__FETCH_OBJ_CONTENT, MagicMock(return_value=None)) + def test_get_source__fetch_none(self): + importer = S3ModuleImporter(modules=[], + service_address=self.service_address, + bucket=self.bucket, + access_key=self.access_key, + secret_key=self.secret_key) + + self.assertRaises(ImportError, importer.get_source, self.fullname) + importer._fetch_obj_content.assert_called_once_with(importer._obj_key(self.fullname, is_pkg=IS_PACKAGE_RETURN)) + + @patch(UTILS_IMPORTER_S3_IS_PACKAGE, MagicMock(return_value=IS_PACKAGE_RETURN)) + def test_get_path(self): + importer = S3ModuleImporter(modules=[], + service_address=self.service_address, + bucket=self.bucket, + access_key=self.access_key, + secret_key=self.secret_key) + + self.assertEqual(importer.get_path(self.fullname), ['https://test-s3-address/bucket/module1/module2/module3']) + + def test_get_file(self): + importer = S3ModuleImporter(modules=[], + service_address=self.service_address, + bucket=self.bucket, + access_key=self.access_key, + secret_key=self.secret_key) + + with patch(UTILS_IMPORTER_S3_IS_PACKAGE, MagicMock(return_value=False)): + self.assertEqual(importer.get_file(self.fullname), self.module_url) + + with patch(UTILS_IMPORTER_S3_IS_PACKAGE, MagicMock(return_value=True)): + self.assertEqual(importer.get_file(self.fullname), self.package_url) + + def test__obj_key(self): + importer = S3ModuleImporter(modules=[], + service_address=self.service_address, + bucket=self.bucket, + access_key=self.access_key, + secret_key=self.secret_key) + + self.assertEqual('module1/module2/module3/__init__.py', importer._obj_key(self.fullname, is_pkg=True)) + self.assertEqual('module1/module2/module3.py', importer._obj_key(self.fullname, is_pkg=False)) + + def test__fetch_obj_content__no_cache(self): + importer = S3ModuleImporter(modules=[], + service_address=self.service_address, + bucket=self.bucket, + access_key=self.access_key, + secret_key=self.secret_key, + use_cache=False) + + first_obj_content = 'first_obj_content' + second_obj_content = 'second_obj_content' + + with patch(UTILS_IMPORTER_S3__GET_S3_OBJ_CONTENT, MagicMock(return_value=first_obj_content)): + self.assertEqual(importer._fetch_obj_content(self.module_key), first_obj_content) + self.assertEqual(importer.obj_cache, {}) + + with patch(UTILS_IMPORTER_S3__GET_S3_OBJ_CONTENT, MagicMock(return_value=second_obj_content)): + self.assertEqual(importer._fetch_obj_content(self.module_key), second_obj_content) + self.assertEqual(importer.obj_cache, {}) + + with patch(UTILS_IMPORTER_S3__GET_S3_OBJ_CONTENT, MagicMock(return_value=None)): + self.assertIsNone(importer._fetch_obj_content(self.module_key)) + self.assertEqual(importer.obj_cache, {}) + + def test__fetch_obj_content__use_cache(self): + importer = S3ModuleImporter(modules=[], + service_address=self.service_address, + bucket=self.bucket, + access_key=self.access_key, + secret_key=self.secret_key) + + first_obj_content = 'first_obj_content' + second_obj_content = 'second_obj_content' + + with patch(UTILS_IMPORTER_S3__GET_S3_OBJ_CONTENT, MagicMock(return_value=first_obj_content)): + self.assertEqual(importer._fetch_obj_content(self.module_key), first_obj_content) + self.assertEqual(importer.obj_cache[self.module_key], first_obj_content) + + with patch(UTILS_IMPORTER_S3__GET_S3_OBJ_CONTENT, MagicMock(return_value=second_obj_content)): + self.assertEqual(importer._fetch_obj_content(self.module_key), first_obj_content) + self.assertEqual(importer.obj_cache[self.module_key], first_obj_content) + + with patch(UTILS_IMPORTER_S3__GET_S3_OBJ_CONTENT, MagicMock(return_value=None)): + self.assertIsNone(importer._fetch_obj_content(self.package_key)) + self.assertIsNone(importer.obj_cache[self.package_key]) + + with patch(UTILS_IMPORTER_S3__GET_S3_OBJ_CONTENT, MagicMock(return_value=first_obj_content)): + self.assertIsNone(importer._fetch_obj_content(self.package_key)) + self.assertIsNone(importer.obj_cache[self.package_key]) diff --git a/pipeline/contrib/external_plugins/utils/importer/__init__.py b/pipeline/contrib/external_plugins/utils/importer/__init__.py index 6bebac0b21..f51cd27482 100644 --- a/pipeline/contrib/external_plugins/utils/importer/__init__.py +++ b/pipeline/contrib/external_plugins/utils/importer/__init__.py @@ -13,3 +13,5 @@ from pipeline.contrib.external_plugins.utils.importer.utils import importer_context # noqa from pipeline.contrib.external_plugins.utils.importer.git import GitRepoModuleImporter # noqa +from pipeline.contrib.external_plugins.utils.importer.s3 import S3ModuleImporter # noqa +from pipeline.contrib.external_plugins.utils.importer.fs import FSModuleImporter # noqa diff --git a/pipeline/contrib/external_plugins/utils/importer/fs.py b/pipeline/contrib/external_plugins/utils/importer/fs.py new file mode 100644 index 0000000000..21b5d6296a --- /dev/null +++ b/pipeline/contrib/external_plugins/utils/importer/fs.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017-2019 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import os +import logging +import traceback + +from pipeline.contrib.external_plugins.utils.importer.base import NonstandardModuleImporter + +logger = logging.getLogger('root') + + +class FSModuleImporter(NonstandardModuleImporter): + def __init__(self, + modules, + path, + use_cache=True): + super(FSModuleImporter, self).__init__(modules=modules) + + self.path = path if path.endswith('/') else '%s/' % path + self.use_cache = use_cache + self.file_cache = {} + + def is_package(self, fullname): + return os.path.exists(self._file_path(fullname, is_pkg=True)) + + def get_code(self, fullname): + return compile(self.get_source(fullname), self.get_file(fullname), 'exec') + + def get_source(self, fullname): + source_code = self._fetch_file_content(self._file_path(fullname, is_pkg=self.is_package(fullname))) + + if source_code is None: + raise ImportError('Can not find {module} in {path}'.format( + module=fullname, + path=self.path + )) + + return source_code + + def get_path(self, fullname): + return [self._file_path(fullname, is_pkg=True).rpartition('/')[0]] + + def get_file(self, fullname): + return self._file_path(fullname, is_pkg=self.is_package(fullname)) + + def _file_path(self, fullname, is_pkg=False): + base_path = '{path}{file_path}'.format(path=self.path, file_path=fullname.replace('.', '/')) + file_path = '%s/__init__.py' % base_path if is_pkg else '%s.py' % base_path + return file_path + + def _fetch_file_content(self, file_path): + logger.info('Try to fetch file {file_path}'.format(file_path=file_path)) + + if self.use_cache and file_path in self.file_cache: + logger.info('Use content in cache for file: {file_path}'.format(file_path=file_path)) + return self.file_cache[file_path] + + file_content = self._get_file_content(file_path) + + if self.use_cache: + self.file_cache[file_path] = file_content + + return file_content + + def _get_file_content(self, file_path): + try: + with open(file_path) as f: + file_content = f.read() + except IOError: + logger.error('Error occurred when read {file_path} content: {trace}'.format( + file_path=file_path, + trace=traceback.format_exc() + )) + file_content = None + + return file_content diff --git a/pipeline/contrib/external_plugins/utils/importer/s3.py b/pipeline/contrib/external_plugins/utils/importer/s3.py new file mode 100644 index 0000000000..1cfc2a4fda --- /dev/null +++ b/pipeline/contrib/external_plugins/utils/importer/s3.py @@ -0,0 +1,108 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017-2019 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import logging + +import boto3 +from botocore.exceptions import ClientError + +from pipeline.contrib.external_plugins.utils.importer.base import NonstandardModuleImporter + +logger = logging.getLogger('root') + + +class S3ModuleImporter(NonstandardModuleImporter): + def __init__(self, + modules, + service_address, + bucket, + access_key, + secret_key, + use_cache=True, + secure_only=True): + super(S3ModuleImporter, self).__init__(modules=modules) + + if secure_only and not service_address.startswith('https'): + raise ValueError('Only accept https when secure_only it True.') + elif not secure_only: + logger.warning('Using not secure protocol is extremely dangerous!!') + + self.service_address = service_address if service_address.endswith('/') else '%s/' % service_address + self.bucket = bucket + self.use_cache = use_cache + self.s3 = boto3.resource('s3', + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + endpoint_url=self.service_address) + self.obj_cache = {} + + def is_package(self, fullname): + return self._fetch_obj_content(self._obj_key(fullname, is_pkg=True)) is not None + + def get_code(self, fullname): + return compile(self.get_source(fullname), self.get_file(fullname), 'exec') + + def get_source(self, fullname): + source_code = self._fetch_obj_content(self._obj_key(fullname, is_pkg=self.is_package(fullname))) + + if source_code is None: + raise ImportError('Can not find {module} in {service_address}{bucket}'.format( + module=fullname, + service_address=self.service_address, + bucket=self.bucket + )) + + return source_code + + def get_path(self, fullname): + return [self.get_file(fullname).rpartition('/')[0]] + + def get_file(self, fullname): + return '{service_address}{bucket}/{key}'.format( + service_address=self.service_address, + bucket=self.bucket, + key=self._obj_key(fullname, is_pkg=self.is_package(fullname)) + ) + + def _obj_key(self, fullname, is_pkg): + base_key = fullname.replace('.', '/') + key = '%s/__init__.py' % base_key if is_pkg else '%s.py' % base_key + return key + + def _fetch_obj_content(self, key): + logger.info('Try to fetch object: {key}'.format(key=key)) + + if self.use_cache and key in self.obj_cache: + logger.info('Use content in cache for s3 object: {key}'.format(key=key)) + return self.obj_cache[key] + + obj_content = self._get_s3_obj_content(key) + + if self.use_cache: + self.obj_cache[key] = obj_content + + return obj_content + + def _get_s3_obj_content(self, key): + obj = self.s3.Object(bucket_name=self.bucket, key=key) + + try: + resp = obj.get() + obj_content = resp['Body'].read() + except ClientError as e: + if e.response['Error']['Code'] == 'NoSuchKey': + obj_content = None + else: + raise + + return obj_content diff --git a/pipeline/engine/api.py b/pipeline/engine/api.py index 945e601ed4..854110a779 100644 --- a/pipeline/engine/api.py +++ b/pipeline/engine/api.py @@ -145,8 +145,14 @@ def revoke_pipeline(pipeline_id): return action_result process = PipelineModel.objects.get(id=pipeline_id).process - process.revoke_subprocess() - process.destroy_all() + + if not process: + return ActionResult(result=False, message='relate process is none, this pipeline may be revoked.') + + with transaction.atomic(): + PipelineProcess.objects.select_for_update().get(id=process.id) + process.revoke_subprocess() + process.destroy_all() return action_result diff --git a/pipeline/engine/core/runtime.py b/pipeline/engine/core/runtime.py index c7ba2e5785..2d91e373bb 100644 --- a/pipeline/engine/core/runtime.py +++ b/pipeline/engine/core/runtime.py @@ -74,8 +74,8 @@ def run_loop(process): return # try to transit current node to running state - action = Status.objects.transit(id=current_node.id, to_state=states.RUNNING, start=True, - name=str(current_node.__class__)) + name = (current_node.name or str(current_node.__class__))[:64] + action = Status.objects.transit(id=current_node.id, to_state=states.RUNNING, start=True, name=name) # check rerun limit if not isinstance(current_node, SubProcess) and RERUN_MAX_LIMIT != 0 and \ diff --git a/pipeline/engine/core/schedule.py b/pipeline/engine/core/schedule.py index f2d859d597..62a9b312d9 100644 --- a/pipeline/engine/core/schedule.py +++ b/pipeline/engine/core/schedule.py @@ -102,8 +102,14 @@ def schedule(process_id, schedule_id): logger.info('node %s %s timeout monitor revoke' % (service_act.id, version)) Data.objects.write_node_data(service_act, ex_data=ex_data) - process = PipelineProcess.objects.get(id=sched_service.process_id) - process.adjust_status() + + with transaction.atomic(): + process = PipelineProcess.objects.select_for_update().get(id=sched_service.process_id) + if not process.is_alive: + logger.info('pipeline %s has been revoked, status adjust failed.' % process.root_pipeline_id) + return + + process.adjust_status() # send activity error signal diff --git a/pipeline/engine/utils.py b/pipeline/engine/utils.py index d9696d8c8a..63153e6773 100644 --- a/pipeline/engine/utils.py +++ b/pipeline/engine/utils.py @@ -47,9 +47,9 @@ def calculate_elapsed_time(started_time, archived_time): """ if archived_time and started_time: # when status_tree['archived_time'] == status_tree['started_time'], set elapsed_time to 1s - elapsed_time = (archived_time - started_time).seconds or 1 + elapsed_time = (archived_time - started_time).total_seconds() or 1 elif started_time: - elapsed_time = (timezone.now() - started_time).seconds + elapsed_time = (timezone.now() - started_time).total_seconds() else: elapsed_time = 0 return elapsed_time diff --git a/pipeline/requirements.txt b/pipeline/requirements.txt index 14445da9d4..4e4114d1ec 100644 --- a/pipeline/requirements.txt +++ b/pipeline/requirements.txt @@ -32,3 +32,4 @@ redis-py-cluster==1.3.5 django-timezone-field==3.0 mock==2.0.0 factory_boy==2.11.1 +boto3==1.9.130 diff --git a/pipeline/tests/component_framework/test_component.py b/pipeline/tests/component_framework/test_component.py index 5f7329a1ac..a3ba920675 100644 --- a/pipeline/tests/component_framework/test_component.py +++ b/pipeline/tests/component_framework/test_component.py @@ -43,8 +43,16 @@ class CCUpdateHostModuleComponent(Component): code = 'cc_update_module' form = 'form path' + class CCUpdateHostModuleComponentEmbeddedForm(Component): + name = u'修改主机所属模块' + bound_service = CCUpdateHostModuleService + code = 'cc_update_module_embedded_form' + embedded_form = True + form = 'form path' + self.service = CCUpdateHostModuleService self.component = CCUpdateHostModuleComponent + self.component_embedded_form = CCUpdateHostModuleComponentEmbeddedForm def tearDown(self): ComponentModel.objects.all().delete() @@ -92,3 +100,7 @@ def test_data_for_execution_lack_of_inputs(self): } component = self.component(data) self.assertRaises(ComponentDataLackException, execution_data=component.data_for_execution, args=[None, None]) + + def test_form_is_embedded(self): + self.assertFalse(self.component.form_is_embedded()) + self.assertTrue(self.component_embedded_form.form_is_embedded()) diff --git a/pipeline/tests/component_framework/test_library.py b/pipeline/tests/component_framework/test_library.py index 09f359a2c7..118a3e1a90 100644 --- a/pipeline/tests/component_framework/test_library.py +++ b/pipeline/tests/component_framework/test_library.py @@ -67,6 +67,9 @@ def outputs_format(self): self.assertEqual(ComponentLibrary.get_component('code', {}).__class__, TestComponent) + def test_get_component__raise(self): + self.assertRaises(ComponentNotExistException, ComponentLibrary.get_component, 'c_not_exist', {}) + def test_args_new(self): component = ComponentLibrary(self.component.code) self.assertEqual(component, self.component) diff --git a/pipeline/tests/engine/core/test_runtime.py b/pipeline/tests/engine/core/test_runtime.py index 2f657b2d77..005f5a25f4 100644 --- a/pipeline/tests/engine/core/test_runtime.py +++ b/pipeline/tests/engine/core/test_runtime.py @@ -253,7 +253,7 @@ def test_run_loop(self): with patch('pipeline.engine.core.runtime.FLOW_NODE_HANDLERS', mock_handlers): # 6.1. test should return - current_node = IdentifyObject() + current_node = IdentifyObject(name='name') process = MockPipelineProcess(top_pipeline=PipelineObject(node=current_node), destination_id=uniqid(), current_node_id=current_node.id) @@ -275,7 +275,7 @@ def test_run_loop(self): Status.objects.transit.assert_called_with(id=current_node.id, to_state=states.RUNNING, start=True, - name=str(current_node.__class__)) + name=current_node.name) process.refresh_current_node.assert_called_once_with(current_node.id) diff --git a/pipeline/tests/engine/core/test_schedule.py b/pipeline/tests/engine/core/test_schedule.py index ba33fd4387..ec54facc60 100644 --- a/pipeline/tests/engine/core/test_schedule.py +++ b/pipeline/tests/engine/core/test_schedule.py @@ -161,7 +161,8 @@ def test_schedule__schedule_return_fail_and_transit_success(self): for timeout in (True, False): process = MockPipelineProcess() mock_ss = MockScheduleService(schedule_return=False, service_timeout=timeout) - with mock.patch(PIPELINE_PROCESS_GET, mock.MagicMock(return_value=process)): + with mock.patch(PIPELINE_PROCESS_SELECT_FOR_UPDATE, + mock.MagicMock(return_value=MockQuerySet(get_return=process))): with mock.patch(PIPELINE_SCHEDULE_SERVICE_GET, mock.MagicMock(return_value=mock_ss)): process_id = uniqid() @@ -250,7 +251,8 @@ def test_schedule__schedule_raise_exception_and_transit_success(self): mock_ss = MockScheduleService(schedule_exception=e, service_timeout=timeout) process = MockPipelineProcess() - with mock.patch(PIPELINE_PROCESS_GET, mock.MagicMock(return_value=process)): + with mock.patch(PIPELINE_PROCESS_SELECT_FOR_UPDATE, + mock.MagicMock(return_value=MockQuerySet(get_return=process))): with mock.patch(PIPELINE_SCHEDULE_SERVICE_GET, mock.MagicMock(return_value=mock_ss)): process_id = uniqid() @@ -290,6 +292,59 @@ def test_schedule__schedule_raise_exception_and_transit_success(self): signals.service_schedule_fail.send.reset_mock() valve.send.reset_mock() + @mock.patch(PIPELINE_SCHEDULE_SERVICE_FILTER, mock.MagicMock(return_value=MockQuerySet(exists_return=True))) + @mock.patch(PIPELINE_STATUS_TRANSIT, mock.MagicMock(return_value=MockActionResult(result=True))) + @mock.patch(PIPELINE_DATA_WRITE_NODE_DATA, mock.MagicMock()) + @mock.patch(PIPELINE_STATUS_FILTER, mock.MagicMock(return_value=MockQuerySet(exists_return=True))) + @mock.patch(SCHEDULE_DELETE_PARENT_DATA, mock.MagicMock()) + @mock.patch(SCHEDULE_GET_SCHEDULE_PARENT_DATA, mock.MagicMock(return_value=PARENT_DATA)) + @mock.patch(SCHEDULE_SET_SCHEDULE_DATA, mock.MagicMock()) + @mock.patch(ENGINE_SIGNAL_TIMEOUT_END_SEND, mock.MagicMock()) + @mock.patch(ENGINE_SIGNAL_ACT_SCHEDULE_FAIL_SEND, mock.MagicMock()) + @mock.patch(SIGNAL_VALVE_SEND, mock.MagicMock()) + def test_schedule__schedule_raise_exception_and_process_is_not_alive(self): + for timeout in (True, False): + e = Exception() + mock_ss = MockScheduleService(schedule_exception=e, service_timeout=timeout) + process = MockPipelineProcess(is_alive=False) + + with mock.patch(PIPELINE_PROCESS_SELECT_FOR_UPDATE, + mock.MagicMock(return_value=MockQuerySet(get_return=process))): + with mock.patch(PIPELINE_SCHEDULE_SERVICE_GET, mock.MagicMock(return_value=mock_ss)): + process_id = uniqid() + + schedule.schedule(process_id, mock_ss.id) + + mock_ss.service_act.schedule.assert_called_once_with(PARENT_DATA, mock_ss.callback_data) + + self.assertEqual(mock_ss.schedule_times, 1) + + mock_ss.destroy.assert_not_called() + + if timeout: + signals.service_activity_timeout_monitor_end.send.assert_called_once_with( + sender=mock_ss.service_act.__class__, + node_id=mock_ss.service_act.id, + version=mock_ss.version + ) + else: + signals.service_activity_timeout_monitor_end.send.assert_not_called() + + Data.objects.write_node_data.assert_called() + + process.adjust_status.assert_not_called() + + mock_ss.service_act.schedule_fail.assert_not_called() + + signals.service_schedule_fail.send.assert_not_called() + + valve.send.assert_not_called() + + signals.service_activity_timeout_monitor_end.send.reset_mock() + Data.objects.write_node_data.reset_mock() + signals.service_schedule_fail.send.reset_mock() + valve.send.reset_mock() + @mock.patch(PIPELINE_SCHEDULE_SERVICE_FILTER, mock.MagicMock(return_value=MockQuerySet(exists_return=True))) @mock.patch(PIPELINE_STATUS_TRANSIT, mock.MagicMock(return_value=MockActionResult(result=True))) @mock.patch(PIPELINE_DATA_WRITE_NODE_DATA, mock.MagicMock()) diff --git a/pipeline/tests/engine/test_api.py b/pipeline/tests/engine/test_api.py index 785c5ca9a4..461f463b6c 100644 --- a/pipeline/tests/engine/test_api.py +++ b/pipeline/tests/engine/test_api.py @@ -163,18 +163,30 @@ def test_revoke_pipeline__transit_fail(self): self.assertFalse(act_result.result) + @patch(PIPELINE_FUNCTION_SWITCH_IS_FROZEN, MagicMock(return_value=False)) + @patch(PIPELINE_STATUS_TRANSIT, MagicMock(return_value=MockActionResult(result=True))) + def test_revoke_pipeline__process_is_none(self): + pipeline_model = MockPipelineModel(process=None) + + with patch(PIPELINE_PIPELINE_MODEL_GET, MagicMock(return_value=pipeline_model)): + act_result = api.revoke_pipeline(self.pipeline_id) + + self.assertFalse(act_result.result) + @patch(PIPELINE_FUNCTION_SWITCH_IS_FROZEN, MagicMock(return_value=False)) @patch(PIPELINE_STATUS_TRANSIT, MagicMock(return_value=MockActionResult(result=True))) def test_revoke_pipeline__transit_success(self): pipeline_model = MockPipelineModel() with patch(PIPELINE_PIPELINE_MODEL_GET, MagicMock(return_value=pipeline_model)): - act_result = api.revoke_pipeline(self.pipeline_id) + with mock.patch(PIPELINE_PROCESS_SELECT_FOR_UPDATE, + mock.MagicMock(return_value=MockQuerySet(get_return=pipeline_model.process))): + act_result = api.revoke_pipeline(self.pipeline_id) - self.assertTrue(act_result.result) + self.assertTrue(act_result.result) - pipeline_model.process.revoke_subprocess.assert_called_once() - pipeline_model.process.destroy_all.assert_called_once() + pipeline_model.process.revoke_subprocess.assert_called_once() + pipeline_model.process.destroy_all.assert_called_once() @patch(PIPELINE_FUNCTION_SWITCH_IS_FROZEN, MagicMock(return_value=False)) @patch(PIPELINE_STATUS_TRANSIT, MagicMock(return_value=MockActionResult(result=True))) diff --git a/pipeline/tests/engine/utils/test_utils_func.py b/pipeline/tests/engine/utils/test_utils_func.py new file mode 100644 index 0000000000..cfe1c282fa --- /dev/null +++ b/pipeline/tests/engine/utils/test_utils_func.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017-2019 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import datetime + +from django.test import TestCase +from django.utils import timezone + +from pipeline.engine.utils import calculate_elapsed_time + + +class EngineUtilsFuncTestCase(TestCase): + + def test_calculate_elapsed_time(self): + self.assertEqual(calculate_elapsed_time(None, None), 0) + + self.assertEqual(calculate_elapsed_time(started_time=None, archived_time=timezone.now()), 0) + + self.assertNotEqual(calculate_elapsed_time(started_time=timezone.now() - datetime.timedelta(seconds=1), + archived_time=None), + 0) + + # seconds + start = timezone.now() + archive = start + datetime.timedelta(seconds=59) + + self.assertEqual(calculate_elapsed_time(started_time=start, archived_time=archive), 59) + + # minutes + start = timezone.now() + archive = start + datetime.timedelta(minutes=3) + + self.assertEqual(calculate_elapsed_time(started_time=start, archived_time=archive), 3 * 60) + + # hours + start = timezone.now() + archive = start + datetime.timedelta(hours=3) + + self.assertEqual(calculate_elapsed_time(started_time=start, archived_time=archive), 3 * 60 * 60) + + # days + start = timezone.now() + archive = start + datetime.timedelta(days=3) + + self.assertEqual(calculate_elapsed_time(started_time=start, archived_time=archive), 3 * 24 * 60 * 60) diff --git a/pipeline/tests/mock.py b/pipeline/tests/mock.py index 9c59d885bd..e3f4ecd5f2 100644 --- a/pipeline/tests/mock.py +++ b/pipeline/tests/mock.py @@ -51,8 +51,9 @@ def get_outputs(self): class IdentifyObject(object): - def __init__(self, id=None): + def __init__(self, id=None, name=None): self.id = id or uniqid() + self.name = name or '' class StartEventObject(IdentifyObject): diff --git a/pipeline/validators/base.py b/pipeline/validators/base.py index 5c13d85b61..9f4a492792 100644 --- a/pipeline/validators/base.py +++ b/pipeline/validators/base.py @@ -22,13 +22,16 @@ def validate_pipeline_tree(pipeline_tree, cycle_tolerate=False): # 1. connection validation - validate_graph_connection(pipeline_tree) + try: + validate_graph_connection(pipeline_tree) + except exceptions.ConnectionValidateError as e: + raise exceptions.ParserException(e.detail) # do not tolerate circle in flow if not cycle_tolerate: - result = find_graph_circle(pipeline_tree) - if not result['result']: - raise exceptions.CycleErrorException(result['message']) + no_cycle = find_graph_circle(pipeline_tree) + if not no_cycle['result']: + raise exceptions.ParserException(no_cycle['message']) # 2. gateway validation validate_gateways(pipeline_tree) diff --git a/requirements.txt b/requirements.txt index 14445da9d4..4e4114d1ec 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,3 +32,4 @@ redis-py-cluster==1.3.5 django-timezone-field==3.0 mock==2.0.0 factory_boy==2.11.1 +boto3==1.9.130