Skip to content

Commit

Permalink
Added special case 'root' process group to list of all process groups
Browse files Browse the repository at this point in the history
Added function and tests for creating a new process group
  • Loading branch information
dchaffey committed Jan 2, 2018
1 parent f686b97 commit 4cecbd4
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 4 deletions.
40 changes: 36 additions & 4 deletions nipyapi/canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@

from __future__ import absolute_import
from swagger_client import ProcessGroupFlowEntity, FlowApi
from swagger_client import ProcessgroupsApi
from swagger_client import ProcessgroupsApi, ProcessGroupEntity
from swagger_client import ProcessGroupDTO, PositionDTO
from swagger_client.rest import ApiException

__all__ = [
"get_root_pg_id", "recurse_flow", "get_flow", "get_process_group_status",
"get_process_group", "list_all_process_groups", "delete_process_group",
"schedule_process_group"
"schedule_process_group", "create_process_group"
]


Expand Down Expand Up @@ -125,15 +126,20 @@ def list_all_process_groups():
"""
def flatten(parent_pg):
"""
Recursively flattens the native datatypes into a generic list
Recursively flattens the native datatypes into a generic list.
Note that the root is a special case as it has no parent
:param parent_pg: ProcessGroupEntity to flatten
:return yield: generator for all ProcessGroupEntities, eventually
"""
for child_pg in parent_pg.process_group_flow.flow.process_groups:
for sub in flatten(child_pg.nipyapi_extended):
yield sub
yield child_pg
return list(flatten(recurse_flow('root')))
out = list(flatten(recurse_flow('root')))
out.append(
ProcessgroupsApi().get_process_group('root')
)
return out


def delete_process_group(process_group_id, revision):
Expand Down Expand Up @@ -173,3 +179,29 @@ def schedule_process_group(process_group_id, target_state):
}
)
return out


def create_process_group(parent_pg, new_pg_name, location):
"""
Creates a new PG with a given name under the provided parent PG
:param parent_pg: ProcessGroupEntity object of the parent PG
:param new_pg_name: String to name the new PG
:param location: Tuple of (x,y) coordinates to place the new PG
:return: ProcessGroupEntity of the new PG
"""
try:
return ProcessgroupsApi().create_process_group(
id=parent_pg.id,
body=ProcessGroupEntity(
revision=parent_pg.revision,
component=ProcessGroupDTO(
name=new_pg_name,
position=PositionDTO(
x=float(location[0]),
y=float(location[1])
)
)
)
)
except ApiException as e:
raise e
12 changes: 12 additions & 0 deletions tests/test_canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ def test_list_all_process_groups():
assert isinstance(r, list)


def test_create_process_group():
test_pg_name = "nipyapi_testProcessGroup_00"
r = canvas.create_process_group(
canvas.get_process_group('NiFi Flow'),
test_pg_name,
location=(400.0,400.0)
)
assert r.component.name == test_pg_name
assert r.position.x == r.position.y == 400
assert r.component.parent_group_id == canvas.get_root_pg_id()


def test_get_process_group():
with pytest.raises(ValueError):
_ = canvas.get_process_group('nipyapi_test', 'invalid')
Expand Down

0 comments on commit 4cecbd4

Please sign in to comment.