From 000cdba727d4885ebf5519fb722e2ba8a0e3f847 Mon Sep 17 00:00:00 2001 From: RefaelBeker7 Date: Wed, 27 Nov 2024 14:51:24 +0200 Subject: [PATCH 1/6] Support run ondemand Automation-API Control-M --- src/aapi/bases.py | 33 ++++++- src/clients/ctm_api_client/api/run_api.py | 106 +++++++++++++++++++++ src/clients/ctm_saas_client/api/run_api.py | 106 +++++++++++++++++++++ src/ctm_python_client/core/workflow.py | 39 ++++++-- 4 files changed, 277 insertions(+), 7 deletions(-) diff --git a/src/aapi/bases.py b/src/aapi/bases.py index e6d28f5..b983edf 100644 --- a/src/aapi/bases.py +++ b/src/aapi/bases.py @@ -2,6 +2,11 @@ import enum import typing import json +import random +from ctm_python_client.core.workflow import WorkflowDefaults, Workflow +from ctm_python_client.core.comm import Environment +from ctm_python_client.core.monitoring import RunMonitor +from aapi import Job, FolderJobBaseSmart, Folder class AAPIJob: pass @@ -57,4 +62,30 @@ def dump_aapi(self, f, indent=None): def as_dict(self): return attrs.asdict(self) - \ No newline at end of file + + def run_on_demand(self, environment: Environment, workflow_defaults: WorkflowDefaults = None, inpath: str = f'run_on_demand{random.randint(100,999)}', skip_login: bool = False, file_path: str = None, delete_afterwards: bool = True) -> RunMonitor: + if isinstance(self, Job) or (hasattr(self, 'job_list') and self.job_list is not None and + len(self.job_list) > 0) or isinstance(self, FolderJobBaseSmart): + try: + on_demand_workflow = Workflow( + environment, + workflow_defaults + ) + if isinstance(self, Folder): + on_demand_workflow.add(self) + else: + on_demand_workflow.add(self, inpath=inpath) + + on_demand_workflow.run_on_demand( + skip_login=skip_login, + file_path=file_path, + delete_afterwards=delete_afterwards + ) + except Exception as e: + errors = [err.get('message', '') + ' ' + err.get('item', '') + for err in json.loads(e.body)['errors']] + raise RuntimeError(f"AAPI request failed: {', '.join(errors)}") + finally: + on_demand_workflow.clear_all() + else: + raise Exception('Run is not allowed for json without jobs') diff --git a/src/clients/ctm_api_client/api/run_api.py b/src/clients/ctm_api_client/api/run_api.py index 2ef3b4e..a3202cf 100644 --- a/src/clients/ctm_api_client/api/run_api.py +++ b/src/clients/ctm_api_client/api/run_api.py @@ -3413,6 +3413,112 @@ def run_jobs_with_http_info(self, job_definitions_file, **kwargs): # noqa: E501 collection_formats=collection_formats, ) + def run_on_demand(self, definitions_file, annotation=None, **kwargs): # noqa: E501 + """Run a job on demand # noqa: E501 + + Run a job on demand # noqa: E501 + This method makes a synchronous HTTP request by default. To make an + asynchronous HTTP request, please pass async_req=True + >>> thread = api.run_on_demand(definitions_file, async_req=True) + >>> result = thread.get() + + :param async_req bool + :param file definitions_file: A file that contains jobs definitions to be run on demand (required) + :param file deploy_descriptor_file: Deploy Descriptor JSON file. + :param annotation - user annotation + :return: RunResult + If the method is called asynchronously, + returns the request thread. + """ + kwargs['_return_http_data_only'] = True + if kwargs.get('async_req'): + return self.run_on_demand_with_http_info(definitions_file, annotation, **kwargs) # noqa: E501 + else: + (data) = self.run_on_demand_with_http_info(definitions_file, annotation, **kwargs) # noqa: E501 + return data + + + def run_on_demand_with_http_info(self, definitions_file, annotation, **kwargs): # noqa: E501 + """Run a job on demand # noqa: E501 + + Run a job on demand # noqa: E501 + This method makes a synchronous HTTP request by default. To make an + asynchronous HTTP request, please pass async_req=True + >>> thread = api.run_on_demand_with_http_info(definitions_file, async_req=True) + >>> result = thread.get() + + :param async_req bool + :param file definitions_file: A file that contains jobs definitions to be run on demand (required) + :param file deploy_descriptor_file: Deploy Descriptor JSON file. + :param annotation - user annotation + :return: RunResult + If the method is called asynchronously, + returns the request thread. + """ + + all_params = ['definitions_file', 'deploy_descriptor_file'] # noqa: E501 + all_params.append('async_req') + all_params.append('_return_http_data_only') + all_params.append('_preload_content') + all_params.append('_request_timeout') + + params = locals() + for key, val in six.iteritems(params['kwargs']): + if key not in all_params: + raise TypeError( + "Got an unexpected keyword argument '%s'" + " to method run_on_demand" % key + ) + params[key] = val + del params['kwargs'] + # verify the required parameter 'definitions_file' is set + if self.api_client.client_side_validation and ('definitions_file' not in params or + params['definitions_file'] is None): # noqa: E501 + raise ValueError("Missing the required parameter `definitions_file` when calling `run_on_demand`") # noqa: E501 + + collection_formats = {} + + path_params = {} + + query_params = [] + + header_params = annotation.get_annotation() if annotation else {} + + form_params = [] + local_var_files = {} + if 'definitions_file' in params: + local_var_files['definitionsFile'] = params['definitions_file'] # noqa: E501 + if 'deploy_descriptor_file' in params: + local_var_files['deployDescriptorFile'] = params['deploy_descriptor_file'] # noqa: E501 + + body_params = None + # HTTP header `Accept` + header_params['Accept'] = self.api_client.select_header_accept( + ['application/json']) # noqa: E501 + + # HTTP header `Content-Type` + header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501 + ['multipart/form-data']) # noqa: E501 + + # Authentication setting + auth_settings = ['ApiKeyAuth', 'Bearer'] # noqa: E501 + + return self.api_client.call_api( + '/run/ondemand', 'POST', + path_params, + query_params, + header_params, + body=body_params, + post_params=form_params, + files=local_var_files, + response_type='RunResult', # noqa: E501 + auth_settings=auth_settings, + async_req=params.get('async_req'), + _return_http_data_only=params.get('_return_http_data_only'), + _preload_content=params.get('_preload_content', True), + _request_timeout=params.get('_request_timeout'), + collection_formats=collection_formats) + def run_now(self, job_id, **kwargs): # noqa: E501 """Bypass scheduling cretirias and start the job # noqa: E501 diff --git a/src/clients/ctm_saas_client/api/run_api.py b/src/clients/ctm_saas_client/api/run_api.py index 03fed0d..20abbb2 100644 --- a/src/clients/ctm_saas_client/api/run_api.py +++ b/src/clients/ctm_saas_client/api/run_api.py @@ -2723,6 +2723,112 @@ def run_jobs_with_http_info(self, job_definitions_file, **kwargs): # noqa: E501 _request_timeout=params.get('_request_timeout'), collection_formats=collection_formats) + def run_on_demand(self, definitions_file, annotation=None, **kwargs): # noqa: E501 + """Run a job on demand # noqa: E501 + + Run a job on demand # noqa: E501 + This method makes a synchronous HTTP request by default. To make an + asynchronous HTTP request, please pass async_req=True + >>> thread = api.run_on_demand(definitions_file, async_req=True) + >>> result = thread.get() + + :param async_req bool + :param file definitions_file: A file that contains jobs definitions to be run on demand (required) + :param file deploy_descriptor_file: Deploy Descriptor JSON file. + :param annotation - user annotation + :return: RunResult + If the method is called asynchronously, + returns the request thread. + """ + kwargs['_return_http_data_only'] = True + if kwargs.get('async_req'): + return self.run_on_demand_with_http_info(definitions_file, annotation, **kwargs) # noqa: E501 + else: + (data) = self.run_on_demand_with_http_info(definitions_file, annotation, **kwargs) # noqa: E501 + return data + + + def run_on_demand_with_http_info(self, definitions_file, annotation, **kwargs): # noqa: E501 + """Run a job on demand # noqa: E501 + + Run a job on demand # noqa: E501 + This method makes a synchronous HTTP request by default. To make an + asynchronous HTTP request, please pass async_req=True + >>> thread = api.run_on_demand_with_http_info(definitions_file, async_req=True) + >>> result = thread.get() + + :param async_req bool + :param file definitions_file: A file that contains jobs definitions to be run on demand (required) + :param file deploy_descriptor_file: Deploy Descriptor JSON file. + :param annotation - user annotation + :return: RunResult + If the method is called asynchronously, + returns the request thread. + """ + + all_params = ['definitions_file', 'deploy_descriptor_file'] # noqa: E501 + all_params.append('async_req') + all_params.append('_return_http_data_only') + all_params.append('_preload_content') + all_params.append('_request_timeout') + + params = locals() + for key, val in six.iteritems(params['kwargs']): + if key not in all_params: + raise TypeError( + "Got an unexpected keyword argument '%s'" + " to method run_on_demand" % key + ) + params[key] = val + del params['kwargs'] + # verify the required parameter 'definitions_file' is set + if self.api_client.client_side_validation and ('definitions_file' not in params or + params['definitions_file'] is None): # noqa: E501 + raise ValueError("Missing the required parameter `definitions_file` when calling `run_on_demand`") # noqa: E501 + + collection_formats = {} + + path_params = {} + + query_params = [] + + header_params = annotation.get_annotation() if annotation else {} + + form_params = [] + local_var_files = {} + if 'definitions_file' in params: + local_var_files['definitionsFile'] = params['definitions_file'] # noqa: E501 + if 'deploy_descriptor_file' in params: + local_var_files['deployDescriptorFile'] = params['deploy_descriptor_file'] # noqa: E501 + + body_params = None + # HTTP header `Accept` + header_params['Accept'] = self.api_client.select_header_accept( + ['application/json']) # noqa: E501 + + # HTTP header `Content-Type` + header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501 + ['multipart/form-data']) # noqa: E501 + + # Authentication setting + auth_settings = ['ApiKeyAuth', 'Bearer'] # noqa: E501 + + return self.api_client.call_api( + '/run/ondemand', 'POST', + path_params, + query_params, + header_params, + body=body_params, + post_params=form_params, + files=local_var_files, + response_type='RunResult', # noqa: E501 + auth_settings=auth_settings, + async_req=params.get('async_req'), + _return_http_data_only=params.get('_return_http_data_only'), + _preload_content=params.get('_preload_content', True), + _request_timeout=params.get('_request_timeout'), + collection_formats=collection_formats) + def run_now(self, job_id, **kwargs): # noqa: E501 """Bypass scheduling cretirias and start the job # noqa: E501 diff --git a/src/ctm_python_client/core/workflow.py b/src/ctm_python_client/core/workflow.py index ab76f5f..87dfbbc 100644 --- a/src/ctm_python_client/core/workflow.py +++ b/src/ctm_python_client/core/workflow.py @@ -129,7 +129,7 @@ def add(self, obj: AAPIObject, inpath: str = None, allow_creation: bool = True): pass if inpath: - if isinstance(obj, SubFolder) or isinstance(obj, Job): + if isinstance(obj, SubFolder) or isinstance(obj, Job) or isinstance(obj, AAPIJob): try: folder = self.get(inpath) except Exception as e: @@ -138,7 +138,7 @@ def add(self, obj: AAPIObject, inpath: str = None, allow_creation: bool = True): else: raise e if folder: - if isinstance(obj, Job): + if isinstance(obj, Job) or isinstance(obj, AAPIJob): folder.job_list.append(obj) else: folder.sub_folder_list.append(obj) @@ -146,7 +146,7 @@ def add(self, obj: AAPIObject, inpath: str = None, allow_creation: bool = True): # Folder does not exist, create folder folder, parent_folder = self.create_folder_hierarchy(inpath) self._apply_defaults_for_folder(folder) - if isinstance(obj, Job): + if isinstance(obj, Job) or isinstance(obj, AAPIJob): parent_folder.job_list.append(obj) else: parent_folder.sub_folder_list.append(obj) @@ -408,13 +408,40 @@ def run(self, skip_login: bool = False, file_path: str = None, delete_afterwards try: res = self.aapiclient.run_api.run_jobs(fpath.resolve()) - # return res run_ = RunMonitor(res.run_id, self.aapiclient, monitor_page_uri= res.monitor_page_uri) - if open_in_browser: + if open_in_browser: run_.open_in_browser() return run_ except Exception as e: - raise e + errors = [err.get('message', '') + ' ' + err.get('item', '') + for err in json.loads(e.body)['errors']] + raise RuntimeError(f"AAPI request failed: {', '.join(errors)}") + finally: + if delete_afterwards: + fpath.unlink() + + def run_on_demand(self, skip_login: bool = False, file_path: str = None, delete_afterwards: bool = True, open_in_browser=False) -> RunMonitor: + if not skip_login: + self.aapiclient.authenticate() + + if not file_path: + file_path = f'{tempfile.gettempdir()}/temp_{random.randint(1000,9999)}.json' + + fpath = pathlib.Path(file_path) + + with open(fpath.resolve(), 'w') as f: + self.dump_json(f) + + try: + res = self.aapiclient.run_api.run_on_demand(fpath.resolve()) + run_ = RunMonitor(res.run_id, self.aapiclient, monitor_page_uri= res.monitor_page_uri) + if open_in_browser: + run_.open_in_browser() + return run_ + except Exception as e: + errors = [err.get('message', '') + ' ' + err.get('item', '') + for err in json.loads(e.body)['errors']] + raise RuntimeError(f"AAPI request failed: {', '.join(errors)}") finally: if delete_afterwards: fpath.unlink() From 249cfe072e1fbc56aac48425eb5570e91bbda65e Mon Sep 17 00:00:00 2001 From: RefaelBeker7 Date: Wed, 27 Nov 2024 15:56:34 +0200 Subject: [PATCH 2/6] fix pytest --- src/aapi/bases.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aapi/bases.py b/src/aapi/bases.py index b983edf..1b2ac25 100644 --- a/src/aapi/bases.py +++ b/src/aapi/bases.py @@ -6,7 +6,7 @@ from ctm_python_client.core.workflow import WorkflowDefaults, Workflow from ctm_python_client.core.comm import Environment from ctm_python_client.core.monitoring import RunMonitor -from aapi import Job, FolderJobBaseSmart, Folder +from aapi import * class AAPIJob: pass From 652eb29c2dc74f71a59deba7b77275e6b365293d Mon Sep 17 00:00:00 2001 From: RefaelBeker7 Date: Wed, 27 Nov 2024 16:03:19 +0200 Subject: [PATCH 3/6] fix pytest --- src/ctm_python_client/core/workflow.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/ctm_python_client/core/workflow.py b/src/ctm_python_client/core/workflow.py index 87dfbbc..065e4d1 100644 --- a/src/ctm_python_client/core/workflow.py +++ b/src/ctm_python_client/core/workflow.py @@ -67,8 +67,7 @@ class WorkflowDefaults: run_as: str = attrs.field(default=None, metadata={'applyon': [ 'Folder', 'SimpleFolder', 'SubFolder', 'Job']}) host: str = attrs.field(default=None, metadata={'applyon': ['Job']}) - when: Job.When = attrs.field(default=None, metadata={ - 'applyon': ['Folder', 'Job']}) + when: Job.When = attrs.field(default=None, metadata={'applyon': ['Folder', 'Job']}) application: str = attrs.field(default=None, metadata={'applyon': [ 'Folder', 'SimpleFolder', 'SubFolder', 'Job']}) sub_application: str = attrs.field(default=None, metadata={'applyon': [ From acd1d22914be135e2ebc5b26d9d721ab316f5e5a Mon Sep 17 00:00:00 2001 From: RefaelBeker7 Date: Wed, 27 Nov 2024 16:07:10 +0200 Subject: [PATCH 4/6] fix pytest --- src/ctm_python_client/core/workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ctm_python_client/core/workflow.py b/src/ctm_python_client/core/workflow.py index 065e4d1..5d8dedc 100644 --- a/src/ctm_python_client/core/workflow.py +++ b/src/ctm_python_client/core/workflow.py @@ -8,7 +8,7 @@ import collections import copy -from aapi import * +from aapi import AAPIJob, AAPIObject, Job, Folder, SubFolder, SimpleFolder, WaitForEvents, AddEvents, DeleteEvents, EventOutDelete, EventIn, EventOutAdd, ConnectionProfile from ctm_python_client.core.comm import AAPIClientResponse, AbstractAAPIClient, Environment, EnvironmentMode, OnPremAAPIClient, SaasAAPIClient, sanitize_output from ctm_python_client.core.monitoring import RunMonitor From 9d6b933a7c22954b75079dc9acaf4f8f719919a2 Mon Sep 17 00:00:00 2001 From: RefaelBeker7 Date: Wed, 27 Nov 2024 16:33:38 +0200 Subject: [PATCH 5/6] fix pytest - circular dependency --- src/aapi/bases.py | 18 ++++++++++++++---- src/ctm_python_client/core/workflow.py | 2 +- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/aapi/bases.py b/src/aapi/bases.py index 1b2ac25..16cb666 100644 --- a/src/aapi/bases.py +++ b/src/aapi/bases.py @@ -3,10 +3,8 @@ import typing import json import random -from ctm_python_client.core.workflow import WorkflowDefaults, Workflow from ctm_python_client.core.comm import Environment from ctm_python_client.core.monitoring import RunMonitor -from aapi import * class AAPIJob: pass @@ -63,13 +61,25 @@ def dump_aapi(self, f, indent=None): def as_dict(self): return attrs.asdict(self) - def run_on_demand(self, environment: Environment, workflow_defaults: WorkflowDefaults = None, inpath: str = f'run_on_demand{random.randint(100,999)}', skip_login: bool = False, file_path: str = None, delete_afterwards: bool = True) -> RunMonitor: + def run_on_demand(self, environment: Environment, inpath: str = f'run_on_demand{random.randint(100,999)}', skip_login: bool = False, + file_path: str = None, delete_afterwards: bool = True, controlm_server: str = None, run_as: str = None, + host: str = None, application: str = None, sub_application: str = None) -> RunMonitor: + # Import circular dependency + from ctm_python_client.core.workflow import Workflow, WorkflowDefaults + from aapi import Job, Folder, FolderJobBaseSmart + if isinstance(self, Job) or (hasattr(self, 'job_list') and self.job_list is not None and len(self.job_list) > 0) or isinstance(self, FolderJobBaseSmart): try: on_demand_workflow = Workflow( environment, - workflow_defaults + WorkflowDefaults( + controlm_server=controlm_server, + run_as=run_as, + host=host, + application=application, + sub_application=sub_application + ) ) if isinstance(self, Folder): on_demand_workflow.add(self) diff --git a/src/ctm_python_client/core/workflow.py b/src/ctm_python_client/core/workflow.py index 5d8dedc..065e4d1 100644 --- a/src/ctm_python_client/core/workflow.py +++ b/src/ctm_python_client/core/workflow.py @@ -8,7 +8,7 @@ import collections import copy -from aapi import AAPIJob, AAPIObject, Job, Folder, SubFolder, SimpleFolder, WaitForEvents, AddEvents, DeleteEvents, EventOutDelete, EventIn, EventOutAdd, ConnectionProfile +from aapi import * from ctm_python_client.core.comm import AAPIClientResponse, AbstractAAPIClient, Environment, EnvironmentMode, OnPremAAPIClient, SaasAAPIClient, sanitize_output from ctm_python_client.core.monitoring import RunMonitor From 81d198c4bbb50250db38b2b51284a0b460a2ab30 Mon Sep 17 00:00:00 2001 From: RefaelBeker7 Date: Thu, 28 Nov 2024 21:04:11 +0200 Subject: [PATCH 6/6] Support run ondemand Automation-API Control-M --- src/aapi/bases.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/aapi/bases.py b/src/aapi/bases.py index 16cb666..eaf7d64 100644 --- a/src/aapi/bases.py +++ b/src/aapi/bases.py @@ -61,15 +61,14 @@ def dump_aapi(self, f, indent=None): def as_dict(self): return attrs.asdict(self) - def run_on_demand(self, environment: Environment, inpath: str = f'run_on_demand{random.randint(100,999)}', skip_login: bool = False, - file_path: str = None, delete_afterwards: bool = True, controlm_server: str = None, run_as: str = None, - host: str = None, application: str = None, sub_application: str = None) -> RunMonitor: + def run_on_demand(self, environment: Environment, inpath: str = f'run_on_demand{random.randint(100,999)}', controlm_server: str = None, + run_as: str = None, host: str = None, application: str = None, sub_application: str = None, skip_login: bool = False, + file_path: str = None, delete_afterwards: bool = True, open_in_browser: str = None) -> RunMonitor: # Import circular dependency from ctm_python_client.core.workflow import Workflow, WorkflowDefaults - from aapi import Job, Folder, FolderJobBaseSmart + from aapi import Job, Folder - if isinstance(self, Job) or (hasattr(self, 'job_list') and self.job_list is not None and - len(self.job_list) > 0) or isinstance(self, FolderJobBaseSmart): + if isinstance(self, Job) or (hasattr(self, 'job_list') and self.job_list is not None and len(self.job_list) > 0): try: on_demand_workflow = Workflow( environment, @@ -89,7 +88,8 @@ def run_on_demand(self, environment: Environment, inpath: str = f'run_on_demand{ on_demand_workflow.run_on_demand( skip_login=skip_login, file_path=file_path, - delete_afterwards=delete_afterwards + delete_afterwards=delete_afterwards, + open_in_browser=open_in_browser ) except Exception as e: errors = [err.get('message', '') + ' ' + err.get('item', '')