diff --git a/ansys/rep/client/common/base_resource.py b/ansys/rep/client/common/base_resource.py index a214e679b..76dada9c5 100644 --- a/ansys/rep/client/common/base_resource.py +++ b/ansys/rep/client/common/base_resource.py @@ -54,6 +54,14 @@ def __repr__(self): ",".join(["%s=%r" % (k, getattr(self, k)) for k in self.declared_fields()]), ) + def __eq__(self, other): + if not isinstance(other, self.__class__): + return NotImplemented + for k in self.declared_fields(): + if not hasattr(other, k) or getattr(self, k, None) != getattr(other, k, None): + return False + return True + def __str__(self): # Ideally we'd simply do diff --git a/ansys/rep/client/jms/api/base.py b/ansys/rep/client/jms/api/base.py index 4e5c0b544..cff4ca783 100644 --- a/ansys/rep/client/jms/api/base.py +++ b/ansys/rep/client/jms/api/base.py @@ -123,3 +123,22 @@ def delete_objects(session: Session, url: str, objects: List[Object]): data = json.dumps({"source_ids": [obj.id for obj in objects]}) r = session.delete(url, data=data) + + +def copy_objects(session: Session, url: str, objects: List[Object], wait: bool = True) -> str: + + are_same = [o.__class__ == objects[0].__class__ for o in objects[1:]] + if not all(are_same): + raise ClientError("Mixed object types") + + obj_type = objects[0].__class__ + rest_name = obj_type.Meta.rest_name + url = f"{url}/{rest_name}:copy" + + source_ids = [obj.id for obj in objects] + r = session.post(url, data=json.dumps({"source_ids": source_ids})) + + operation_location = r.headers["location"] + operation_id = operation_location.rsplit("/", 1)[-1] + + return operation_id diff --git a/ansys/rep/client/jms/api/jms_api.py b/ansys/rep/client/jms/api/jms_api.py index 7bf2f1437..30cb0185e 100644 --- a/ansys/rep/client/jms/api/jms_api.py +++ b/ansys/rep/client/jms/api/jms_api.py @@ -1,13 +1,14 @@ import json import logging import os -import time from typing import List, Union import uuid +import backoff import requests from ansys.rep.client.client import Client +from ansys.rep.client.common import Object from ansys.rep.client.exceptions import REPError from ansys.rep.client.jms.resource import ( Evaluator, @@ -18,6 +19,7 @@ ) from ansys.rep.client.jms.schema.project import ProjectSchema +from .base import copy_objects as base_copy_objects from .base import create_objects, delete_objects, get_object, get_objects, update_objects log = logging.getLogger(__name__) @@ -204,6 +206,29 @@ def delete_task_definition_templates(self, templates: List[TaskDefinitionTemplat """ return delete_objects(self.client.session, self.url, templates) + def copy_task_definition_templates( + self, templates: List[TaskDefinitionTemplate], wait: bool = True + ) -> Union[str, List[str]]: + """Create new task definitions templates by copying existing ones + + Parameters + ---------- + templates : List[TaskDefinitionTemplate] + A list of template objects. Note that only the ``id`` field of the + TaskDefinitionTemplate objects need to be filled; the other fields can be empty. + + wait : bool + Whether to wait for the copy to complete or not. + + Returns + ------- + Union[List[str], str] + If wait=True, returns the list of newly created template IDs. + If wait=False, returns an operation ID that can be used to + track progress. + """ + return _copy_objects(self.client, self.url, templates, wait=wait) + # Task Definition Template Permissions def get_task_definition_template_permissions( self, template_id: str, as_objects: bool = True @@ -241,8 +266,19 @@ def get_operations(self, as_objects=True, **query_params) -> List[Operation]: def get_operation(self, id, as_object=True) -> Operation: return get_object(self.client.session, self.url, Operation, id, as_object=as_object) - def _monitor_operation(self, operation_id: str, interval: float = 1.0): - return _monitor_operation(self, operation_id, interval) + def monitor_operation(self, operation_id: str, max_value: float = 5.0, max_time: float = None): + """Poll an operation until it's completed using an exponential backoff + + Parameters + ---------- + operation_id : str + ID of the operation to be monitored. + max_value: float, optional + Maximum interval between consecutive calls in seconds. + max_time: float, optional + The maximum total amount of time (in seconds) to try before giving up. + """ + return _monitor_operation(self, operation_id, max_value, max_time) ################################################################ # Storages @@ -336,25 +372,47 @@ def delete_project(client, api_url, project): r = client.session.delete(url) -def _monitor_operation(jms_api: JmsApi, operation_id: str, interval: float = 1.0): - - done = False - op = None - while not done: +def _monitor_operation( + jms_api: JmsApi, operation_id: str, max_value: float = 5.0, max_time: float = None +) -> Operation: + @backoff.on_predicate( + backoff.expo, + lambda x: x[1] == False, + jitter=backoff.full_jitter, + max_value=max_value, + max_time=max_time, + ) + def _monitor(): + done = False op = jms_api.get_operation(id=operation_id) if op: done = op.finished - progress = None - if op.progress is not None: - progress = f"{op.progress * 100.0}%" - log.info( - f"Operation {op.name} - progress={progress}, " - f"succeeded={op.succeeded}, finished={op.finished}" - ) - time.sleep(interval) + return op, done + + op, done = _monitor() + + if not done: + raise REPError(f"Operation {operation_id} did not complete.") return op +def _copy_objects( + client: Client, api_url: str, objects: List[Object], wait: bool = True +) -> Union[str, List[str]]: + + operation_id = base_copy_objects(client.session, api_url, objects) + + if not wait: + return operation_id + + op = _monitor_operation(JmsApi(client), operation_id, 1.0) + if not op.succeeded: + obj_type = objects[0].__class__ + rest_name = obj_type.Meta.rest_name + raise REPError(f"Failed to copy {rest_name} with ids = {[obj.id for obj in objects]}.") + return op.result["destination_ids"] + + def restore_project(jms_api, archive_path): if not os.path.exists(archive_path): @@ -385,7 +443,7 @@ def restore_project(jms_api, archive_path): operation_id = operation_location.rsplit("/", 1)[-1] log.debug(f"Operation id: {operation_id}") - op = _monitor_operation(jms_api, operation_id, 1.0) + op = jms_api.monitor_operation(operation_id) if not op.succeeded: raise REPError(f"Failed to restore project from archive {archive_path}.") diff --git a/ansys/rep/client/jms/api/project_api.py b/ansys/rep/client/jms/api/project_api.py index 94239d822..cb984a8ba 100644 --- a/ansys/rep/client/jms/api/project_api.py +++ b/ansys/rep/client/jms/api/project_api.py @@ -20,13 +20,13 @@ ParameterDefinition, ParameterMapping, Permission, + Project, Task, TaskDefinition, ) -from ansys.rep.client.jms.schema.job import JobSchema from .base import create_objects, delete_objects, get_objects, update_objects -from .jms_api import JmsApi, _monitor_operation +from .jms_api import JmsApi, _copy_objects log = logging.getLogger(__name__) @@ -211,6 +211,29 @@ def update_task_definitions( def delete_task_definitions(self, task_definitions: List[TaskDefinition]): return self._delete_objects(task_definitions) + def copy_task_definitions( + self, task_definitions: List[TaskDefinition], wait: bool = True + ) -> Union[str, List[str]]: + """Create new task definitions by copying existing ones + + Parameters + ---------- + task_definitions : List[TaskDefinition] + A list of task definition objects. Note that only the ``id`` field of the + TaskDefinition objects need to be filled; the other fields can be empty. + + wait : bool + Whether to wait for the copy to complete or not. + + Returns + ------- + Union[List[str], str] + If wait=True, returns the list of newly created task definition IDs. + If wait=False, returns an operation ID that can be used to + track progress. + """ + return _copy_objects(self.client, self.url, task_definitions, wait=wait) + ################################################################ # Job definitions def get_job_definitions(self, as_objects=True, **query_params) -> List[JobDefinition]: @@ -229,6 +252,29 @@ def update_job_definitions( def delete_job_definitions(self, job_definitions: List[JobDefinition]): return self._delete_objects(job_definitions) + def copy_job_definitions( + self, job_definitions: List[JobDefinition], wait: bool = True + ) -> Union[str, List[str]]: + """Create new job definitions by copying existing ones + + Parameters + ---------- + job_definitions : List[JobDefinition] + A list of job definition objects. Note that only the ``id`` field of the + JobDefinition objects need to be filled; the other fields can be empty. + + wait : bool + Whether to wait for the copy to complete or not. + + Returns + ------- + Union[List[str], str] + If wait=True, returns the list of newly created job definition IDs. + If wait=False, returns an operation ID that can be used to + track progress. + """ + return _copy_objects(self.client, self.url, job_definitions, wait=wait) + ################################################################ # Jobs def get_jobs(self, as_objects=True, **query_params) -> List[Job]: @@ -246,16 +292,26 @@ def create_jobs(self, jobs: List[Job], as_objects=True) -> List[Job]: """ return self._create_objects(jobs, as_objects=as_objects) - def copy_jobs(self, jobs: List[Job], as_objects=True, **query_params): + def copy_jobs(self, jobs: List[Job], wait: bool = True) -> Union[str, List[str]]: """Create new jobs by copying existing ones - Args: - jobs (list of :class:`ansys.rep.client.jms.Job`): A list of job objects - - Note that only the ``id`` field of the Job objects need to be filled; - the other fields can be empty. + Parameters + ---------- + jobs : List[Job] + A list of job objects. Note that only the ``id`` field of the + Job objects need to be filled; the other fields can be empty. + + wait : bool + Whether to wait for the copy to complete or not. + + Returns + ------- + Union[List[str], str] + If wait=True, returns the list of newly created job IDs. + If wait=False, returns an operation ID that can be used to + track progress. """ - return copy_jobs(self, jobs, as_objects=as_objects, **query_params) + return _copy_objects(self.client, self.url, jobs, wait=wait) def update_jobs(self, jobs: List[Job], as_objects=True) -> List[Job]: """Update existing jobs @@ -531,19 +587,12 @@ def copy_projects( project_api: ProjectApi, project_source_ids: List[str], wait: bool = True ) -> Union[str, List[str]]: - url = f"{project_api.jms_api_url}/projects:copy" - r = project_api.client.session.post(url, data=json.dumps({"source_ids": project_source_ids})) - - operation_location = r.headers["location"] - operation_id = operation_location.rsplit("/", 1)[-1] - - if not wait: - return operation_location - - op = _monitor_operation(JmsApi(project_api.client), operation_id, 1.0) - if not op.succeeded: - raise REPError(f"Failed to copy projects {project_source_ids}.") - return op.result["destination_ids"] + return _copy_objects( + project_api.client, + project_api.jms_api_url, + [Project(id=id) for id in project_source_ids], + wait=wait, + ) def archive_project(project_api: ProjectApi, target_path, include_job_files=True) -> str: @@ -561,7 +610,8 @@ def archive_project(project_api: ProjectApi, target_path, include_job_files=True log.debug(f"Operation location: {operation_location}") operation_id = operation_location.rsplit("/", 1)[-1] - op = _monitor_operation(JmsApi(project_api.client), operation_id, 1.0) + jms_api = JmsApi(project_api.client) + op = jms_api.monitor_operation(operation_id) if not op.succeeded: raise REPError(f"Failed to archive project {project_api.project_id}.\n{op}") @@ -591,16 +641,8 @@ def archive_project(project_api: ProjectApi, target_path, include_job_files=True def copy_jobs(project_api: ProjectApi, jobs: List[Job], as_objects=True, **query_params): """Create new jobs by copying existing ones""" - url = f"{project_api.url}/jobs" - - json_data = json.dumps({"source_ids": [obj.id for obj in jobs]}) - r = project_api.client.session.post(f"{url}", data=json_data, params=query_params) - - data = r.json()["jobs"] - if not as_objects: - return data - - return JobSchema(many=True).load(data) + ids = _copy_objects(client=project_api.client, api_url=project_api.url, objects=jobs, wait=True) + return ids def sync_jobs(project_api: ProjectApi, jobs: List[Job]): diff --git a/examples/mapdl_motorbike_frame/project_setup.py b/examples/mapdl_motorbike_frame/project_setup.py index 35053b836..2f6c70bd4 100644 --- a/examples/mapdl_motorbike_frame/project_setup.py +++ b/examples/mapdl_motorbike_frame/project_setup.py @@ -32,7 +32,7 @@ def create_project( - client, name, version=__external_version__, num_jobs=20, use_exec_script=False + client, name, version=__external_version__, num_jobs=20, use_exec_script=False, active=True ) -> Project: """ Create a REP project consisting of an ANSYS APDL beam model of a motorbike-frame. @@ -47,7 +47,7 @@ def create_project( """ jms_api = JmsApi(client) log.debug("=== Project") - proj = Project(name=name, priority=1, active=True) + proj = Project(name=name, priority=1, active=active) proj = jms_api.create_project(proj, replace=True) project_api = ProjectApi(client, proj.id) diff --git a/setup.py b/setup.py index d66e45e14..2e04eeee1 100644 --- a/setup.py +++ b/setup.py @@ -35,6 +35,7 @@ "marshmallow_oneofschema>=2.0.1", "cachetools>=4.0.0", "python-keycloak>=1.5.0,<=2.12.0", + "backoff>=2.0.0", ], python_requires=">=3.7", packages=find_namespace_packages(where=".", include="ansys*"), diff --git a/tests/jms/test_job_definitions.py b/tests/jms/test_job_definitions.py index fbc063516..1e52f8f51 100644 --- a/tests/jms/test_job_definitions.py +++ b/tests/jms/test_job_definitions.py @@ -1,5 +1,7 @@ import logging +from examples.mapdl_motorbike_frame.project_setup import create_project + from ansys.rep.client.jms import JmsApi, ProjectApi from ansys.rep.client.jms.resource import ( JobDefinition, @@ -63,3 +65,48 @@ def test_task_definition_fields(self): self.assertEqual(task_def.resource_requirements.disk_space, 2199023255552) jms_api.delete_project(project) + + def test_task_and_job_definition_copy(self): + + # create new project + num_jobs = 1 + project = create_project( + self.client(), + f"test_task_definition_copy", + num_jobs=num_jobs, + use_exec_script=False, + active=False, + ) + self.assertIsNotNone(project) + + jms_api = JmsApi(self.client()) + project_api = ProjectApi(self.client(), project.id) + + # copy task definition + task_definitions = project_api.get_task_definitions() + self.assertEqual(len(task_definitions), 1) + + original_td = task_definitions[0] + new_td_id = project_api.copy_task_definitions(task_definitions) + new_td = project_api.get_task_definitions(id=new_td_id)[0] + + self.assertTrue(original_td.name in new_td.name) + for attr in ["software_requirements", "resource_requirements", "execution_command"]: + self.assertEqual(getattr(original_td, attr), getattr(new_td, attr)) + + # copy job definition + job_definitions = project_api.get_job_definitions() + self.assertEqual(len(job_definitions), 1) + + original_jd = job_definitions[0] + new_jd_id = project_api.copy_job_definitions(job_definitions) + new_jd = project_api.get_job_definitions(id=new_jd_id)[0] + + self.assertTrue(original_jd.name in new_jd.name) + self.assertEqual( + len(original_jd.parameter_definition_ids), len(new_jd.parameter_definition_ids) + ) + self.assertEqual(len(original_jd.parameter_mapping_ids), len(new_jd.parameter_mapping_ids)) + self.assertEqual(len(original_jd.task_definition_ids), len(new_jd.task_definition_ids)) + + jms_api.delete_project(project) diff --git a/tests/jms/test_jobs.py b/tests/jms/test_jobs.py index 234e7efb0..31ce025fa 100644 --- a/tests/jms/test_jobs.py +++ b/tests/jms/test_jobs.py @@ -193,7 +193,8 @@ def test_job_integration(self): jobs = project_api.get_jobs() self.assertEqual(len(jobs), 8) - new_jobs = project_api.copy_jobs([Job(id=job.id) for job in jobs[:3]]) + new_job_ids = project_api.copy_jobs([Job(id=job.id) for job in jobs[:3]]) + new_jobs = project_api.get_jobs(id=new_job_ids) for i in range(3): self.assertEqual(new_jobs[i].creator, jobs[i].creator) self.assertEqual(new_jobs[i].note, jobs[i].note) diff --git a/tests/jms/test_resources.py b/tests/jms/test_resources.py new file mode 100644 index 000000000..0952400ab --- /dev/null +++ b/tests/jms/test_resources.py @@ -0,0 +1,143 @@ +import logging +import unittest + +from examples.mapdl_motorbike_frame.project_setup import create_project +from marshmallow import missing + +from ansys.rep.client.jms import ( + FitnessDefinition, + JmsApi, + JobDefinition, + ProjectApi, + ResourceRequirements, + Software, + TaskDefinition, +) +from tests.rep_test import REPTestCase + +log = logging.getLogger(__name__) + + +class REPClientTest(REPTestCase): + def test_task_definition_equality(self): + + td1 = TaskDefinition( + name="TD.1", + execution_command="%executable%", + max_execution_time=10.0, + execution_level=0, + resource_requirements=ResourceRequirements( + memory=256, + disk_space=2, + ), + software_requirements=Software(name="app", version="0.1"), + ) + + self.assertTrue(td1 != None) + self.assertTrue(None != td1) + self.assertTrue(None != td1) + + td2 = TaskDefinition( + name="TD.1", + execution_command="%executable%", + max_execution_time=10.0, + execution_level=0, + resource_requirements=ResourceRequirements( + memory=256, + disk_space=2, + ), + software_requirements=Software(name="app", version="0.1"), + ) + + td3 = td2 + + self.assertFalse(td1 is td2) + self.assertTrue(td3 is td2) + self.assertTrue(td1 == td2) + self.assertTrue(td2 == td1) + self.assertTrue(td1 == td3) + + td2.environment = None + self.assertTrue(td1.environment == missing) + self.assertFalse(td1 == td2) + self.assertFalse(td2 == td1) + + td1.environment = None + self.assertTrue(td1 == td2) + self.assertTrue(td2 == td1) + + jd = JobDefinition() + td = TaskDefinition() + self.assertFalse(jd == td) + + def test_job_definitions_equality(self): + + fd1 = FitnessDefinition(error_fitness=10.0) + fd1.add_fitness_term( + name="weight", + type="design_objective", + weighting_factor=1.0, + expression="map_design_objective( values['weight'], 7.5, 5.5)", + ) + fd1.add_fitness_term( + name="torsional_stiffness", + type="target_constraint", + weighting_factor=1.0, + expression="map_target_constraint( values['torsion_stiffness'], 1313.0, 5.0, 30.0 )", + ) + + jd1 = JobDefinition(fitness_definition=fd1) + + fd2 = FitnessDefinition(error_fitness=10.0) + fd2.add_fitness_term( + name="weight", + type="design_objective", + weighting_factor=1.0, + expression="map_design_objective( values['weight'], 7.5, 5.5)", + ) + fd2.add_fitness_term( + name="torsional_stiffness", + type="target_constraint", + weighting_factor=1.0, + expression="map_target_constraint( values['torsion_stiffness'], 1313.0, 5.0, 30.0 )", + ) + jd2 = JobDefinition(fitness_definition=fd2) + + self.assertTrue(jd1 == jd2) + self.assertFalse(jd1 is jd2) + + jd2.fitness_definition.fitness_term_definitions[0].expression = "_changed_" + + self.assertTrue(jd1 != jd2) + self.assertTrue(jd2 != jd1) + + def test_jobs_equality(self): + + # create new project + num_jobs = 1 + project = create_project( + self.client(), + f"test_jobs_equality", + num_jobs=num_jobs, + use_exec_script=False, + active=False, + ) + self.assertIsNotNone(project) + + project_api = ProjectApi(self.client(), project.id) + + job1 = project_api.get_jobs(limit=1)[0] + job_id = job1.id + + job2 = project_api.get_jobs(id=job_id)[0] + self.assertEqual(job1, job2) + + job2 = project_api.get_jobs(id=job_id, fields=["id", "name"])[0] + self.assertNotEqual(job1, job2) + + jms_api = JmsApi(self.client()) + jms_api.delete_project(project) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/jms/test_task_definition_templates.py b/tests/jms/test_task_definition_templates.py index 5c091bc8e..e282847a9 100644 --- a/tests/jms/test_task_definition_templates.py +++ b/tests/jms/test_task_definition_templates.py @@ -137,8 +137,8 @@ def test_template_integration(self): if templates: self.assertTrue(templates[0].software_requirements == missing) - # Copy template - template_name = f"copied_template_{uuid.uuid4()}" + # Create new template based on existing one + template_name = f"new_template_{uuid.uuid4()}" templates = jms_api.get_task_definition_templates(limit=1) self.assertEqual(len(templates), 1) @@ -151,7 +151,7 @@ def test_template_integration(self): template = templates[0] self.assertEqual(template.name, template_name) - # Modify copied template + # Modify template template.software_requirements[0].version = "2.0.1" templates = jms_api.update_task_definition_templates([template]) self.assertEqual(len(templates), 1) @@ -159,12 +159,27 @@ def test_template_integration(self): self.assertEqual(template.software_requirements[0].version, "2.0.1") self.assertEqual(template.name, template_name) - # Delete copied template + # Delete template jms_api.delete_task_definition_templates([template]) templates = jms_api.get_task_definition_templates(name=template_name) self.assertEqual(len(templates), 0) + # Copy template + templates = jms_api.get_task_definition_templates(limit=1) + self.assertEqual(len(templates), 1) + original_template = templates[0] + new_template_id = jms_api.copy_task_definition_templates(templates) + new_template = jms_api.get_task_definition_templates(id=new_template_id)[0] + + self.assertTrue(original_template.name in new_template.name) + self.assertEqual(original_template.version, new_template.version) + self.assertEqual(original_template.version, new_template.version) + self.assertEqual( + original_template.software_requirements[0].version, + original_template.software_requirements[0].version, + ) + def test_template_permissions(self): client = self.client()