diff --git a/src/aapi/bases.py b/src/aapi/bases.py index e6d28f5..eaf7d64 100644 --- a/src/aapi/bases.py +++ b/src/aapi/bases.py @@ -2,6 +2,9 @@ import enum import typing import json +import random +from ctm_python_client.core.comm import Environment +from ctm_python_client.core.monitoring import RunMonitor class AAPIJob: pass @@ -57,4 +60,42 @@ def dump_aapi(self, f, indent=None): def as_dict(self): return attrs.asdict(self) - \ No newline at end of file + + def run_on_demand(self, environment: Environment, inpath: str = f'run_on_demand{random.randint(100,999)}', controlm_server: str = None, + run_as: str = None, host: str = None, application: str = None, sub_application: str = None, skip_login: bool = False, + file_path: str = None, delete_afterwards: bool = True, open_in_browser: str = None) -> RunMonitor: + # Import circular dependency + from ctm_python_client.core.workflow import Workflow, WorkflowDefaults + from aapi import Job, Folder + + if isinstance(self, Job) or (hasattr(self, 'job_list') and self.job_list is not None and len(self.job_list) > 0): + try: + on_demand_workflow = Workflow( + environment, + WorkflowDefaults( + controlm_server=controlm_server, + run_as=run_as, + host=host, + application=application, + sub_application=sub_application + ) + ) + if isinstance(self, Folder): + on_demand_workflow.add(self) + else: + on_demand_workflow.add(self, inpath=inpath) + + on_demand_workflow.run_on_demand( + skip_login=skip_login, + file_path=file_path, + delete_afterwards=delete_afterwards, + open_in_browser=open_in_browser + ) + except Exception as e: + errors = [err.get('message', '') + ' ' + err.get('item', '') + for err in json.loads(e.body)['errors']] + raise RuntimeError(f"AAPI request failed: {', '.join(errors)}") + finally: + on_demand_workflow.clear_all() + else: + raise Exception('Run is not allowed for json without jobs') diff --git a/src/clients/ctm_api_client/api/run_api.py b/src/clients/ctm_api_client/api/run_api.py index 2ef3b4e..a3202cf 100644 --- a/src/clients/ctm_api_client/api/run_api.py +++ b/src/clients/ctm_api_client/api/run_api.py @@ -3413,6 +3413,112 @@ def run_jobs_with_http_info(self, job_definitions_file, **kwargs): # noqa: E501 collection_formats=collection_formats, ) + def run_on_demand(self, definitions_file, annotation=None, **kwargs): # noqa: E501 + """Run a job on demand # noqa: E501 + + Run a job on demand # noqa: E501 + This method makes a synchronous HTTP request by default. To make an + asynchronous HTTP request, please pass async_req=True + >>> thread = api.run_on_demand(definitions_file, async_req=True) + >>> result = thread.get() + + :param async_req bool + :param file definitions_file: A file that contains jobs definitions to be run on demand (required) + :param file deploy_descriptor_file: Deploy Descriptor JSON file. + :param annotation - user annotation + :return: RunResult + If the method is called asynchronously, + returns the request thread. + """ + kwargs['_return_http_data_only'] = True + if kwargs.get('async_req'): + return self.run_on_demand_with_http_info(definitions_file, annotation, **kwargs) # noqa: E501 + else: + (data) = self.run_on_demand_with_http_info(definitions_file, annotation, **kwargs) # noqa: E501 + return data + + + def run_on_demand_with_http_info(self, definitions_file, annotation, **kwargs): # noqa: E501 + """Run a job on demand # noqa: E501 + + Run a job on demand # noqa: E501 + This method makes a synchronous HTTP request by default. To make an + asynchronous HTTP request, please pass async_req=True + >>> thread = api.run_on_demand_with_http_info(definitions_file, async_req=True) + >>> result = thread.get() + + :param async_req bool + :param file definitions_file: A file that contains jobs definitions to be run on demand (required) + :param file deploy_descriptor_file: Deploy Descriptor JSON file. + :param annotation - user annotation + :return: RunResult + If the method is called asynchronously, + returns the request thread. + """ + + all_params = ['definitions_file', 'deploy_descriptor_file'] # noqa: E501 + all_params.append('async_req') + all_params.append('_return_http_data_only') + all_params.append('_preload_content') + all_params.append('_request_timeout') + + params = locals() + for key, val in six.iteritems(params['kwargs']): + if key not in all_params: + raise TypeError( + "Got an unexpected keyword argument '%s'" + " to method run_on_demand" % key + ) + params[key] = val + del params['kwargs'] + # verify the required parameter 'definitions_file' is set + if self.api_client.client_side_validation and ('definitions_file' not in params or + params['definitions_file'] is None): # noqa: E501 + raise ValueError("Missing the required parameter `definitions_file` when calling `run_on_demand`") # noqa: E501 + + collection_formats = {} + + path_params = {} + + query_params = [] + + header_params = annotation.get_annotation() if annotation else {} + + form_params = [] + local_var_files = {} + if 'definitions_file' in params: + local_var_files['definitionsFile'] = params['definitions_file'] # noqa: E501 + if 'deploy_descriptor_file' in params: + local_var_files['deployDescriptorFile'] = params['deploy_descriptor_file'] # noqa: E501 + + body_params = None + # HTTP header `Accept` + header_params['Accept'] = self.api_client.select_header_accept( + ['application/json']) # noqa: E501 + + # HTTP header `Content-Type` + header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501 + ['multipart/form-data']) # noqa: E501 + + # Authentication setting + auth_settings = ['ApiKeyAuth', 'Bearer'] # noqa: E501 + + return self.api_client.call_api( + '/run/ondemand', 'POST', + path_params, + query_params, + header_params, + body=body_params, + post_params=form_params, + files=local_var_files, + response_type='RunResult', # noqa: E501 + auth_settings=auth_settings, + async_req=params.get('async_req'), + _return_http_data_only=params.get('_return_http_data_only'), + _preload_content=params.get('_preload_content', True), + _request_timeout=params.get('_request_timeout'), + collection_formats=collection_formats) + def run_now(self, job_id, **kwargs): # noqa: E501 """Bypass scheduling cretirias and start the job # noqa: E501 diff --git a/src/clients/ctm_saas_client/api/run_api.py b/src/clients/ctm_saas_client/api/run_api.py index 03fed0d..20abbb2 100644 --- a/src/clients/ctm_saas_client/api/run_api.py +++ b/src/clients/ctm_saas_client/api/run_api.py @@ -2723,6 +2723,112 @@ def run_jobs_with_http_info(self, job_definitions_file, **kwargs): # noqa: E501 _request_timeout=params.get('_request_timeout'), collection_formats=collection_formats) + def run_on_demand(self, definitions_file, annotation=None, **kwargs): # noqa: E501 + """Run a job on demand # noqa: E501 + + Run a job on demand # noqa: E501 + This method makes a synchronous HTTP request by default. To make an + asynchronous HTTP request, please pass async_req=True + >>> thread = api.run_on_demand(definitions_file, async_req=True) + >>> result = thread.get() + + :param async_req bool + :param file definitions_file: A file that contains jobs definitions to be run on demand (required) + :param file deploy_descriptor_file: Deploy Descriptor JSON file. + :param annotation - user annotation + :return: RunResult + If the method is called asynchronously, + returns the request thread. + """ + kwargs['_return_http_data_only'] = True + if kwargs.get('async_req'): + return self.run_on_demand_with_http_info(definitions_file, annotation, **kwargs) # noqa: E501 + else: + (data) = self.run_on_demand_with_http_info(definitions_file, annotation, **kwargs) # noqa: E501 + return data + + + def run_on_demand_with_http_info(self, definitions_file, annotation, **kwargs): # noqa: E501 + """Run a job on demand # noqa: E501 + + Run a job on demand # noqa: E501 + This method makes a synchronous HTTP request by default. To make an + asynchronous HTTP request, please pass async_req=True + >>> thread = api.run_on_demand_with_http_info(definitions_file, async_req=True) + >>> result = thread.get() + + :param async_req bool + :param file definitions_file: A file that contains jobs definitions to be run on demand (required) + :param file deploy_descriptor_file: Deploy Descriptor JSON file. + :param annotation - user annotation + :return: RunResult + If the method is called asynchronously, + returns the request thread. + """ + + all_params = ['definitions_file', 'deploy_descriptor_file'] # noqa: E501 + all_params.append('async_req') + all_params.append('_return_http_data_only') + all_params.append('_preload_content') + all_params.append('_request_timeout') + + params = locals() + for key, val in six.iteritems(params['kwargs']): + if key not in all_params: + raise TypeError( + "Got an unexpected keyword argument '%s'" + " to method run_on_demand" % key + ) + params[key] = val + del params['kwargs'] + # verify the required parameter 'definitions_file' is set + if self.api_client.client_side_validation and ('definitions_file' not in params or + params['definitions_file'] is None): # noqa: E501 + raise ValueError("Missing the required parameter `definitions_file` when calling `run_on_demand`") # noqa: E501 + + collection_formats = {} + + path_params = {} + + query_params = [] + + header_params = annotation.get_annotation() if annotation else {} + + form_params = [] + local_var_files = {} + if 'definitions_file' in params: + local_var_files['definitionsFile'] = params['definitions_file'] # noqa: E501 + if 'deploy_descriptor_file' in params: + local_var_files['deployDescriptorFile'] = params['deploy_descriptor_file'] # noqa: E501 + + body_params = None + # HTTP header `Accept` + header_params['Accept'] = self.api_client.select_header_accept( + ['application/json']) # noqa: E501 + + # HTTP header `Content-Type` + header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501 + ['multipart/form-data']) # noqa: E501 + + # Authentication setting + auth_settings = ['ApiKeyAuth', 'Bearer'] # noqa: E501 + + return self.api_client.call_api( + '/run/ondemand', 'POST', + path_params, + query_params, + header_params, + body=body_params, + post_params=form_params, + files=local_var_files, + response_type='RunResult', # noqa: E501 + auth_settings=auth_settings, + async_req=params.get('async_req'), + _return_http_data_only=params.get('_return_http_data_only'), + _preload_content=params.get('_preload_content', True), + _request_timeout=params.get('_request_timeout'), + collection_formats=collection_formats) + def run_now(self, job_id, **kwargs): # noqa: E501 """Bypass scheduling cretirias and start the job # noqa: E501 diff --git a/src/ctm_python_client/core/workflow.py b/src/ctm_python_client/core/workflow.py index ab76f5f..065e4d1 100644 --- a/src/ctm_python_client/core/workflow.py +++ b/src/ctm_python_client/core/workflow.py @@ -67,8 +67,7 @@ class WorkflowDefaults: run_as: str = attrs.field(default=None, metadata={'applyon': [ 'Folder', 'SimpleFolder', 'SubFolder', 'Job']}) host: str = attrs.field(default=None, metadata={'applyon': ['Job']}) - when: Job.When = attrs.field(default=None, metadata={ - 'applyon': ['Folder', 'Job']}) + when: Job.When = attrs.field(default=None, metadata={'applyon': ['Folder', 'Job']}) application: str = attrs.field(default=None, metadata={'applyon': [ 'Folder', 'SimpleFolder', 'SubFolder', 'Job']}) sub_application: str = attrs.field(default=None, metadata={'applyon': [ @@ -129,7 +128,7 @@ def add(self, obj: AAPIObject, inpath: str = None, allow_creation: bool = True): pass if inpath: - if isinstance(obj, SubFolder) or isinstance(obj, Job): + if isinstance(obj, SubFolder) or isinstance(obj, Job) or isinstance(obj, AAPIJob): try: folder = self.get(inpath) except Exception as e: @@ -138,7 +137,7 @@ def add(self, obj: AAPIObject, inpath: str = None, allow_creation: bool = True): else: raise e if folder: - if isinstance(obj, Job): + if isinstance(obj, Job) or isinstance(obj, AAPIJob): folder.job_list.append(obj) else: folder.sub_folder_list.append(obj) @@ -146,7 +145,7 @@ def add(self, obj: AAPIObject, inpath: str = None, allow_creation: bool = True): # Folder does not exist, create folder folder, parent_folder = self.create_folder_hierarchy(inpath) self._apply_defaults_for_folder(folder) - if isinstance(obj, Job): + if isinstance(obj, Job) or isinstance(obj, AAPIJob): parent_folder.job_list.append(obj) else: parent_folder.sub_folder_list.append(obj) @@ -408,13 +407,40 @@ def run(self, skip_login: bool = False, file_path: str = None, delete_afterwards try: res = self.aapiclient.run_api.run_jobs(fpath.resolve()) - # return res run_ = RunMonitor(res.run_id, self.aapiclient, monitor_page_uri= res.monitor_page_uri) - if open_in_browser: + if open_in_browser: run_.open_in_browser() return run_ except Exception as e: - raise e + errors = [err.get('message', '') + ' ' + err.get('item', '') + for err in json.loads(e.body)['errors']] + raise RuntimeError(f"AAPI request failed: {', '.join(errors)}") + finally: + if delete_afterwards: + fpath.unlink() + + def run_on_demand(self, skip_login: bool = False, file_path: str = None, delete_afterwards: bool = True, open_in_browser=False) -> RunMonitor: + if not skip_login: + self.aapiclient.authenticate() + + if not file_path: + file_path = f'{tempfile.gettempdir()}/temp_{random.randint(1000,9999)}.json' + + fpath = pathlib.Path(file_path) + + with open(fpath.resolve(), 'w') as f: + self.dump_json(f) + + try: + res = self.aapiclient.run_api.run_on_demand(fpath.resolve()) + run_ = RunMonitor(res.run_id, self.aapiclient, monitor_page_uri= res.monitor_page_uri) + if open_in_browser: + run_.open_in_browser() + return run_ + except Exception as e: + errors = [err.get('message', '') + ' ' + err.get('item', '') + for err in json.loads(e.body)['errors']] + raise RuntimeError(f"AAPI request failed: {', '.join(errors)}") finally: if delete_afterwards: fpath.unlink()