Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ansys/rep/client/common/base_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ def __repr__(self):
",".join(["%s=%r" % (k, getattr(self, k)) for k in self.declared_fields()]),
)

def __eq__(self, other):
if not isinstance(other, self.__class__):
return NotImplemented
for k in self.declared_fields():
if not hasattr(other, k) or getattr(self, k, None) != getattr(other, k, None):
return False
return True

def __str__(self):

# Ideally we'd simply do
Expand Down
19 changes: 19 additions & 0 deletions ansys/rep/client/jms/api/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,22 @@ def delete_objects(session: Session, url: str, objects: List[Object]):
data = json.dumps({"source_ids": [obj.id for obj in objects]})

r = session.delete(url, data=data)


def copy_objects(session: Session, url: str, objects: List[Object], wait: bool = True) -> str:

are_same = [o.__class__ == objects[0].__class__ for o in objects[1:]]
if not all(are_same):
raise ClientError("Mixed object types")

obj_type = objects[0].__class__
rest_name = obj_type.Meta.rest_name
url = f"{url}/{rest_name}:copy"

source_ids = [obj.id for obj in objects]
r = session.post(url, data=json.dumps({"source_ids": source_ids}))

operation_location = r.headers["location"]
operation_id = operation_location.rsplit("/", 1)[-1]

return operation_id
92 changes: 75 additions & 17 deletions ansys/rep/client/jms/api/jms_api.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import json
import logging
import os
import time
from typing import List, Union
import uuid

import backoff
import requests

from ansys.rep.client.client import Client
from ansys.rep.client.common import Object
from ansys.rep.client.exceptions import REPError
from ansys.rep.client.jms.resource import (
Evaluator,
Expand All @@ -18,6 +19,7 @@
)
from ansys.rep.client.jms.schema.project import ProjectSchema

from .base import copy_objects as base_copy_objects
from .base import create_objects, delete_objects, get_object, get_objects, update_objects

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -204,6 +206,29 @@ def delete_task_definition_templates(self, templates: List[TaskDefinitionTemplat
"""
return delete_objects(self.client.session, self.url, templates)

def copy_task_definition_templates(
self, templates: List[TaskDefinitionTemplate], wait: bool = True
) -> Union[str, List[str]]:
"""Create new task definitions templates by copying existing ones

Parameters
----------
templates : List[TaskDefinitionTemplate]
A list of template objects. Note that only the ``id`` field of the
TaskDefinitionTemplate objects need to be filled; the other fields can be empty.

wait : bool
Whether to wait for the copy to complete or not.

Returns
-------
Union[List[str], str]
If wait=True, returns the list of newly created template IDs.
If wait=False, returns an operation ID that can be used to
track progress.
"""
return _copy_objects(self.client, self.url, templates, wait=wait)

# Task Definition Template Permissions
def get_task_definition_template_permissions(
self, template_id: str, as_objects: bool = True
Expand Down Expand Up @@ -241,8 +266,19 @@ def get_operations(self, as_objects=True, **query_params) -> List[Operation]:
def get_operation(self, id, as_object=True) -> Operation:
return get_object(self.client.session, self.url, Operation, id, as_object=as_object)

def _monitor_operation(self, operation_id: str, interval: float = 1.0):
return _monitor_operation(self, operation_id, interval)
def monitor_operation(self, operation_id: str, max_value: float = 5.0, max_time: float = None):
"""Poll an operation until it's completed using an exponential backoff

Parameters
----------
operation_id : str
ID of the operation to be monitored.
max_value: float, optional
Maximum interval between consecutive calls in seconds.
max_time: float, optional
The maximum total amount of time (in seconds) to try before giving up.
"""
return _monitor_operation(self, operation_id, max_value, max_time)

################################################################
# Storages
Expand Down Expand Up @@ -336,25 +372,47 @@ def delete_project(client, api_url, project):
r = client.session.delete(url)


def _monitor_operation(jms_api: JmsApi, operation_id: str, interval: float = 1.0):

done = False
op = None
while not done:
def _monitor_operation(
jms_api: JmsApi, operation_id: str, max_value: float = 5.0, max_time: float = None
) -> Operation:
@backoff.on_predicate(
backoff.expo,
lambda x: x[1] == False,
jitter=backoff.full_jitter,
max_value=max_value,
max_time=max_time,
)
def _monitor():
done = False
op = jms_api.get_operation(id=operation_id)
if op:
done = op.finished
progress = None
if op.progress is not None:
progress = f"{op.progress * 100.0}%"
log.info(
f"Operation {op.name} - progress={progress}, "
f"succeeded={op.succeeded}, finished={op.finished}"
)
time.sleep(interval)
return op, done

op, done = _monitor()

if not done:
raise REPError(f"Operation {operation_id} did not complete.")
return op


def _copy_objects(
client: Client, api_url: str, objects: List[Object], wait: bool = True
) -> Union[str, List[str]]:

operation_id = base_copy_objects(client.session, api_url, objects)

if not wait:
return operation_id

op = _monitor_operation(JmsApi(client), operation_id, 1.0)
if not op.succeeded:
obj_type = objects[0].__class__
rest_name = obj_type.Meta.rest_name
raise REPError(f"Failed to copy {rest_name} with ids = {[obj.id for obj in objects]}.")
return op.result["destination_ids"]


def restore_project(jms_api, archive_path):

if not os.path.exists(archive_path):
Expand Down Expand Up @@ -385,7 +443,7 @@ def restore_project(jms_api, archive_path):
operation_id = operation_location.rsplit("/", 1)[-1]
log.debug(f"Operation id: {operation_id}")

op = _monitor_operation(jms_api, operation_id, 1.0)
op = jms_api.monitor_operation(operation_id)

if not op.succeeded:
raise REPError(f"Failed to restore project from archive {archive_path}.")
Expand Down
108 changes: 75 additions & 33 deletions ansys/rep/client/jms/api/project_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
ParameterDefinition,
ParameterMapping,
Permission,
Project,
Task,
TaskDefinition,
)
from ansys.rep.client.jms.schema.job import JobSchema

from .base import create_objects, delete_objects, get_objects, update_objects
from .jms_api import JmsApi, _monitor_operation
from .jms_api import JmsApi, _copy_objects

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -211,6 +211,29 @@ def update_task_definitions(
def delete_task_definitions(self, task_definitions: List[TaskDefinition]):
return self._delete_objects(task_definitions)

def copy_task_definitions(
self, task_definitions: List[TaskDefinition], wait: bool = True
) -> Union[str, List[str]]:
"""Create new task definitions by copying existing ones

Parameters
----------
task_definitions : List[TaskDefinition]
A list of task definition objects. Note that only the ``id`` field of the
TaskDefinition objects need to be filled; the other fields can be empty.

wait : bool
Whether to wait for the copy to complete or not.

Returns
-------
Union[List[str], str]
If wait=True, returns the list of newly created task definition IDs.
If wait=False, returns an operation ID that can be used to
track progress.
"""
return _copy_objects(self.client, self.url, task_definitions, wait=wait)

################################################################
# Job definitions
def get_job_definitions(self, as_objects=True, **query_params) -> List[JobDefinition]:
Expand All @@ -229,6 +252,29 @@ def update_job_definitions(
def delete_job_definitions(self, job_definitions: List[JobDefinition]):
return self._delete_objects(job_definitions)

def copy_job_definitions(
self, job_definitions: List[JobDefinition], wait: bool = True
) -> Union[str, List[str]]:
"""Create new job definitions by copying existing ones

Parameters
----------
job_definitions : List[JobDefinition]
A list of job definition objects. Note that only the ``id`` field of the
JobDefinition objects need to be filled; the other fields can be empty.

wait : bool
Whether to wait for the copy to complete or not.

Returns
-------
Union[List[str], str]
If wait=True, returns the list of newly created job definition IDs.
If wait=False, returns an operation ID that can be used to
track progress.
"""
return _copy_objects(self.client, self.url, job_definitions, wait=wait)

################################################################
# Jobs
def get_jobs(self, as_objects=True, **query_params) -> List[Job]:
Expand All @@ -246,16 +292,26 @@ def create_jobs(self, jobs: List[Job], as_objects=True) -> List[Job]:
"""
return self._create_objects(jobs, as_objects=as_objects)

def copy_jobs(self, jobs: List[Job], as_objects=True, **query_params):
def copy_jobs(self, jobs: List[Job], wait: bool = True) -> Union[str, List[str]]:
"""Create new jobs by copying existing ones

Args:
jobs (list of :class:`ansys.rep.client.jms.Job`): A list of job objects

Note that only the ``id`` field of the Job objects need to be filled;
the other fields can be empty.
Parameters
----------
jobs : List[Job]
A list of job objects. Note that only the ``id`` field of the
Job objects need to be filled; the other fields can be empty.

wait : bool
Whether to wait for the copy to complete or not.

Returns
-------
Union[List[str], str]
If wait=True, returns the list of newly created job IDs.
If wait=False, returns an operation ID that can be used to
track progress.
"""
return copy_jobs(self, jobs, as_objects=as_objects, **query_params)
return _copy_objects(self.client, self.url, jobs, wait=wait)

def update_jobs(self, jobs: List[Job], as_objects=True) -> List[Job]:
"""Update existing jobs
Expand Down Expand Up @@ -531,19 +587,12 @@ def copy_projects(
project_api: ProjectApi, project_source_ids: List[str], wait: bool = True
) -> Union[str, List[str]]:

url = f"{project_api.jms_api_url}/projects:copy"
r = project_api.client.session.post(url, data=json.dumps({"source_ids": project_source_ids}))

operation_location = r.headers["location"]
operation_id = operation_location.rsplit("/", 1)[-1]

if not wait:
return operation_location

op = _monitor_operation(JmsApi(project_api.client), operation_id, 1.0)
if not op.succeeded:
raise REPError(f"Failed to copy projects {project_source_ids}.")
return op.result["destination_ids"]
return _copy_objects(
project_api.client,
project_api.jms_api_url,
[Project(id=id) for id in project_source_ids],
wait=wait,
)


def archive_project(project_api: ProjectApi, target_path, include_job_files=True) -> str:
Expand All @@ -561,7 +610,8 @@ def archive_project(project_api: ProjectApi, target_path, include_job_files=True
log.debug(f"Operation location: {operation_location}")
operation_id = operation_location.rsplit("/", 1)[-1]

op = _monitor_operation(JmsApi(project_api.client), operation_id, 1.0)
jms_api = JmsApi(project_api.client)
op = jms_api.monitor_operation(operation_id)

if not op.succeeded:
raise REPError(f"Failed to archive project {project_api.project_id}.\n{op}")
Expand Down Expand Up @@ -591,16 +641,8 @@ def archive_project(project_api: ProjectApi, target_path, include_job_files=True
def copy_jobs(project_api: ProjectApi, jobs: List[Job], as_objects=True, **query_params):
"""Create new jobs by copying existing ones"""

url = f"{project_api.url}/jobs"

json_data = json.dumps({"source_ids": [obj.id for obj in jobs]})
r = project_api.client.session.post(f"{url}", data=json_data, params=query_params)

data = r.json()["jobs"]
if not as_objects:
return data

return JobSchema(many=True).load(data)
ids = _copy_objects(client=project_api.client, api_url=project_api.url, objects=jobs, wait=True)
return ids


def sync_jobs(project_api: ProjectApi, jobs: List[Job]):
Expand Down
4 changes: 2 additions & 2 deletions examples/mapdl_motorbike_frame/project_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@


def create_project(
client, name, version=__external_version__, num_jobs=20, use_exec_script=False
client, name, version=__external_version__, num_jobs=20, use_exec_script=False, active=True
) -> Project:
"""
Create a REP project consisting of an ANSYS APDL beam model of a motorbike-frame.
Expand All @@ -47,7 +47,7 @@ def create_project(
"""
jms_api = JmsApi(client)
log.debug("=== Project")
proj = Project(name=name, priority=1, active=True)
proj = Project(name=name, priority=1, active=active)
proj = jms_api.create_project(proj, replace=True)

project_api = ProjectApi(client, proj.id)
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"marshmallow_oneofschema>=2.0.1",
"cachetools>=4.0.0",
"python-keycloak>=1.5.0,<=2.12.0",
"backoff>=2.0.0",
],
python_requires=">=3.7",
packages=find_namespace_packages(where=".", include="ansys*"),
Expand Down
Loading