Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion src/aapi/bases.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
import enum
import typing
import json
import random
from ctm_python_client.core.comm import Environment
from ctm_python_client.core.monitoring import RunMonitor

class AAPIJob:
pass
Expand Down Expand Up @@ -57,4 +60,42 @@ def dump_aapi(self, f, indent=None):

def as_dict(self):
return attrs.asdict(self)


def run_on_demand(self, environment: Environment, inpath: str = f'run_on_demand{random.randint(100,999)}', controlm_server: str = None,
run_as: str = None, host: str = None, application: str = None, sub_application: str = None, skip_login: bool = False,
file_path: str = None, delete_afterwards: bool = True, open_in_browser: str = None) -> RunMonitor:
# Import circular dependency
from ctm_python_client.core.workflow import Workflow, WorkflowDefaults
from aapi import Job, Folder

if isinstance(self, Job) or (hasattr(self, 'job_list') and self.job_list is not None and len(self.job_list) > 0):
try:
on_demand_workflow = Workflow(
environment,
WorkflowDefaults(
controlm_server=controlm_server,
run_as=run_as,
host=host,
application=application,
sub_application=sub_application
)
)
if isinstance(self, Folder):
on_demand_workflow.add(self)
else:
on_demand_workflow.add(self, inpath=inpath)

on_demand_workflow.run_on_demand(
skip_login=skip_login,
file_path=file_path,
delete_afterwards=delete_afterwards,
open_in_browser=open_in_browser
)
except Exception as e:
errors = [err.get('message', '') + ' ' + err.get('item', '')
for err in json.loads(e.body)['errors']]
raise RuntimeError(f"AAPI request failed: {', '.join(errors)}")
finally:
on_demand_workflow.clear_all()
else:
raise Exception('Run is not allowed for json without jobs')
106 changes: 106 additions & 0 deletions src/clients/ctm_api_client/api/run_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3413,6 +3413,112 @@ def run_jobs_with_http_info(self, job_definitions_file, **kwargs): # noqa: E501
collection_formats=collection_formats,
)

def run_on_demand(self, definitions_file, annotation=None, **kwargs): # noqa: E501
"""Run a job on demand # noqa: E501

Run a job on demand # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.run_on_demand(definitions_file, async_req=True)
>>> result = thread.get()

:param async_req bool
:param file definitions_file: A file that contains jobs definitions to be run on demand (required)
:param file deploy_descriptor_file: Deploy Descriptor JSON file.
:param annotation - user annotation
:return: RunResult
If the method is called asynchronously,
returns the request thread.
"""
kwargs['_return_http_data_only'] = True
if kwargs.get('async_req'):
return self.run_on_demand_with_http_info(definitions_file, annotation, **kwargs) # noqa: E501
else:
(data) = self.run_on_demand_with_http_info(definitions_file, annotation, **kwargs) # noqa: E501
return data


def run_on_demand_with_http_info(self, definitions_file, annotation, **kwargs): # noqa: E501
"""Run a job on demand # noqa: E501

Run a job on demand # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.run_on_demand_with_http_info(definitions_file, async_req=True)
>>> result = thread.get()

:param async_req bool
:param file definitions_file: A file that contains jobs definitions to be run on demand (required)
:param file deploy_descriptor_file: Deploy Descriptor JSON file.
:param annotation - user annotation
:return: RunResult
If the method is called asynchronously,
returns the request thread.
"""

all_params = ['definitions_file', 'deploy_descriptor_file'] # noqa: E501
all_params.append('async_req')
all_params.append('_return_http_data_only')
all_params.append('_preload_content')
all_params.append('_request_timeout')

params = locals()
for key, val in six.iteritems(params['kwargs']):
if key not in all_params:
raise TypeError(
"Got an unexpected keyword argument '%s'"
" to method run_on_demand" % key
)
params[key] = val
del params['kwargs']
# verify the required parameter 'definitions_file' is set
if self.api_client.client_side_validation and ('definitions_file' not in params or
params['definitions_file'] is None): # noqa: E501
raise ValueError("Missing the required parameter `definitions_file` when calling `run_on_demand`") # noqa: E501

collection_formats = {}

path_params = {}

query_params = []

header_params = annotation.get_annotation() if annotation else {}

form_params = []
local_var_files = {}
if 'definitions_file' in params:
local_var_files['definitionsFile'] = params['definitions_file'] # noqa: E501
if 'deploy_descriptor_file' in params:
local_var_files['deployDescriptorFile'] = params['deploy_descriptor_file'] # noqa: E501

body_params = None
# HTTP header `Accept`
header_params['Accept'] = self.api_client.select_header_accept(
['application/json']) # noqa: E501

# HTTP header `Content-Type`
header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501
['multipart/form-data']) # noqa: E501

# Authentication setting
auth_settings = ['ApiKeyAuth', 'Bearer'] # noqa: E501

return self.api_client.call_api(
'/run/ondemand', 'POST',
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type='RunResult', # noqa: E501
auth_settings=auth_settings,
async_req=params.get('async_req'),
_return_http_data_only=params.get('_return_http_data_only'),
_preload_content=params.get('_preload_content', True),
_request_timeout=params.get('_request_timeout'),
collection_formats=collection_formats)

def run_now(self, job_id, **kwargs): # noqa: E501
"""Bypass scheduling cretirias and start the job # noqa: E501

Expand Down
106 changes: 106 additions & 0 deletions src/clients/ctm_saas_client/api/run_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2723,6 +2723,112 @@ def run_jobs_with_http_info(self, job_definitions_file, **kwargs): # noqa: E501
_request_timeout=params.get('_request_timeout'),
collection_formats=collection_formats)

def run_on_demand(self, definitions_file, annotation=None, **kwargs): # noqa: E501
"""Run a job on demand # noqa: E501

Run a job on demand # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.run_on_demand(definitions_file, async_req=True)
>>> result = thread.get()

:param async_req bool
:param file definitions_file: A file that contains jobs definitions to be run on demand (required)
:param file deploy_descriptor_file: Deploy Descriptor JSON file.
:param annotation - user annotation
:return: RunResult
If the method is called asynchronously,
returns the request thread.
"""
kwargs['_return_http_data_only'] = True
if kwargs.get('async_req'):
return self.run_on_demand_with_http_info(definitions_file, annotation, **kwargs) # noqa: E501
else:
(data) = self.run_on_demand_with_http_info(definitions_file, annotation, **kwargs) # noqa: E501
return data


def run_on_demand_with_http_info(self, definitions_file, annotation, **kwargs): # noqa: E501
"""Run a job on demand # noqa: E501

Run a job on demand # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.run_on_demand_with_http_info(definitions_file, async_req=True)
>>> result = thread.get()

:param async_req bool
:param file definitions_file: A file that contains jobs definitions to be run on demand (required)
:param file deploy_descriptor_file: Deploy Descriptor JSON file.
:param annotation - user annotation
:return: RunResult
If the method is called asynchronously,
returns the request thread.
"""

all_params = ['definitions_file', 'deploy_descriptor_file'] # noqa: E501
all_params.append('async_req')
all_params.append('_return_http_data_only')
all_params.append('_preload_content')
all_params.append('_request_timeout')

params = locals()
for key, val in six.iteritems(params['kwargs']):
if key not in all_params:
raise TypeError(
"Got an unexpected keyword argument '%s'"
" to method run_on_demand" % key
)
params[key] = val
del params['kwargs']
# verify the required parameter 'definitions_file' is set
if self.api_client.client_side_validation and ('definitions_file' not in params or
params['definitions_file'] is None): # noqa: E501
raise ValueError("Missing the required parameter `definitions_file` when calling `run_on_demand`") # noqa: E501

collection_formats = {}

path_params = {}

query_params = []

header_params = annotation.get_annotation() if annotation else {}

form_params = []
local_var_files = {}
if 'definitions_file' in params:
local_var_files['definitionsFile'] = params['definitions_file'] # noqa: E501
if 'deploy_descriptor_file' in params:
local_var_files['deployDescriptorFile'] = params['deploy_descriptor_file'] # noqa: E501

body_params = None
# HTTP header `Accept`
header_params['Accept'] = self.api_client.select_header_accept(
['application/json']) # noqa: E501

# HTTP header `Content-Type`
header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501
['multipart/form-data']) # noqa: E501

# Authentication setting
auth_settings = ['ApiKeyAuth', 'Bearer'] # noqa: E501

return self.api_client.call_api(
'/run/ondemand', 'POST',
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type='RunResult', # noqa: E501
auth_settings=auth_settings,
async_req=params.get('async_req'),
_return_http_data_only=params.get('_return_http_data_only'),
_preload_content=params.get('_preload_content', True),
_request_timeout=params.get('_request_timeout'),
collection_formats=collection_formats)

def run_now(self, job_id, **kwargs): # noqa: E501
"""Bypass scheduling cretirias and start the job # noqa: E501

Expand Down
42 changes: 34 additions & 8 deletions src/ctm_python_client/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ class WorkflowDefaults:
run_as: str = attrs.field(default=None, metadata={'applyon': [
'Folder', 'SimpleFolder', 'SubFolder', 'Job']})
host: str = attrs.field(default=None, metadata={'applyon': ['Job']})
when: Job.When = attrs.field(default=None, metadata={
'applyon': ['Folder', 'Job']})
when: Job.When = attrs.field(default=None, metadata={'applyon': ['Folder', 'Job']})
application: str = attrs.field(default=None, metadata={'applyon': [
'Folder', 'SimpleFolder', 'SubFolder', 'Job']})
sub_application: str = attrs.field(default=None, metadata={'applyon': [
Expand Down Expand Up @@ -129,7 +128,7 @@ def add(self, obj: AAPIObject, inpath: str = None, allow_creation: bool = True):
pass

if inpath:
if isinstance(obj, SubFolder) or isinstance(obj, Job):
if isinstance(obj, SubFolder) or isinstance(obj, Job) or isinstance(obj, AAPIJob):
try:
folder = self.get(inpath)
except Exception as e:
Expand All @@ -138,15 +137,15 @@ def add(self, obj: AAPIObject, inpath: str = None, allow_creation: bool = True):
else:
raise e
if folder:
if isinstance(obj, Job):
if isinstance(obj, Job) or isinstance(obj, AAPIJob):
folder.job_list.append(obj)
else:
folder.sub_folder_list.append(obj)
else:
# Folder does not exist, create folder
folder, parent_folder = self.create_folder_hierarchy(inpath)
self._apply_defaults_for_folder(folder)
if isinstance(obj, Job):
if isinstance(obj, Job) or isinstance(obj, AAPIJob):
parent_folder.job_list.append(obj)
else:
parent_folder.sub_folder_list.append(obj)
Expand Down Expand Up @@ -408,13 +407,40 @@ def run(self, skip_login: bool = False, file_path: str = None, delete_afterwards

try:
res = self.aapiclient.run_api.run_jobs(fpath.resolve())
# return res
run_ = RunMonitor(res.run_id, self.aapiclient, monitor_page_uri= res.monitor_page_uri)
if open_in_browser:
if open_in_browser:
run_.open_in_browser()
return run_
except Exception as e:
raise e
errors = [err.get('message', '') + ' ' + err.get('item', '')
for err in json.loads(e.body)['errors']]
raise RuntimeError(f"AAPI request failed: {', '.join(errors)}")
finally:
if delete_afterwards:
fpath.unlink()

def run_on_demand(self, skip_login: bool = False, file_path: str = None, delete_afterwards: bool = True, open_in_browser=False) -> RunMonitor:
if not skip_login:
self.aapiclient.authenticate()

if not file_path:
file_path = f'{tempfile.gettempdir()}/temp_{random.randint(1000,9999)}.json'

fpath = pathlib.Path(file_path)

with open(fpath.resolve(), 'w') as f:
self.dump_json(f)

try:
res = self.aapiclient.run_api.run_on_demand(fpath.resolve())
run_ = RunMonitor(res.run_id, self.aapiclient, monitor_page_uri= res.monitor_page_uri)
if open_in_browser:
run_.open_in_browser()
return run_
except Exception as e:
errors = [err.get('message', '') + ' ' + err.get('item', '')
for err in json.loads(e.body)['errors']]
raise RuntimeError(f"AAPI request failed: {', '.join(errors)}")
finally:
if delete_afterwards:
fpath.unlink()