Skip to content

Commit

Permalink
fixed bug where 'root' and the actual root ID were not being treated …
Browse files Browse the repository at this point in the history
…as equal in some calls

Sped up some calls using new 'descendants' option in versions 1.7 and newer
Handling removal of external connections to a process group when deleting
Updated create_connection logic to handle more cases, still experimental. Now also supports naming a connection
list_all_connections now fetches descendants by default to be in line with other methods
Added handling of test ports and connections for conftest
updated many tests to only analyse test objects and ignore preexisting other objects on canvas
enhanced a lot of test logic for connections to handle cases brought forward by community, still experimental
specifically handling input and output ports for process groups is now much more robust
  • Loading branch information
Chaffelson committed Dec 20, 2018
1 parent 9a626b5 commit 5786472
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 32 deletions.
137 changes: 118 additions & 19 deletions nipyapi/canvas.py
Expand Up @@ -36,7 +36,7 @@ def get_root_pg_id():
Returns (str): The UUID of the root PG
"""
return nipyapi.nifi.FlowApi().get_process_group_status('root')\
return nipyapi.nifi.FlowApi().get_process_group_status('root') \
.process_group_status.id


Expand Down Expand Up @@ -183,7 +183,7 @@ def flatten(parent_pg):

root_flow = recurse_flow(pg_id)
out = list(flatten(root_flow))
if pg_id == 'root':
if pg_id == 'root' or pg_id == get_root_pg_id():
# This duplicates the nipyapi_extended structure to the root case
pga_handle = nipyapi.nifi.ProcessGroupsApi()
root_entity = pga_handle.get_process_group('root')
Expand Down Expand Up @@ -275,7 +275,7 @@ def list_all_processors(pg_id='root'):
"""
assert isinstance(pg_id, six.string_types), "pg_id should be a string"

if nipyapi.utils.check_version('1.2.0') == -1:
if nipyapi.utils.check_version('1.7.0') <= 0:
targets = nipyapi.nifi.ProcessGroupsApi().get_processors(
id=pg_id,
include_descendant_groups=True
Expand Down Expand Up @@ -323,6 +323,7 @@ def _running_schedule_process_group(pg_id_):
if test_obj.status.aggregate_snapshot.active_thread_count == 0:
return True
return False

assert isinstance(
get_process_group(process_group_id, 'id'),
nipyapi.nifi.ProcessGroupEntity
Expand Down Expand Up @@ -375,6 +376,11 @@ def delete_process_group(process_group, force=False, refresh=True):
if force:
# Stop, drop, and roll.
purge_process_group(target, stop=True)
# Remove inbound connections
for con in list_all_connections():
pg_id = process_group.id
if pg_id in [con.destination_group_id, con.source_group_id]:
delete_connection(con)
# Stop all Controller Services
for x in list_all_controllers(process_group.id):
delete_controller(x, True)
Expand Down Expand Up @@ -803,34 +809,47 @@ def update_variable_registry(process_group, update):
raise ValueError(e.body)


def create_connection(source, target, relationships=None):
def create_connection(source, target, relationships=None, name=None):
"""
Creates a connection between two objects for the given relationships
Args:
source: Object to initiate the connection, e.g. ProcessorEntity
target: Object to terminate the connection, e.g. FunnelEntity
relationships (list): list of strings of relationships to connect, may
be collected from the object 'relationships' property
be collected from the object 'relationships' property (optional)
name (str): Defaults to None, String of Name for Connection (optional)
Returns:
(ConnectionEntity): for the created connection
"""
source_rels = [x.name for x in source.component.relationships]
if relationships:
assert all(i in source_rels for i in relationships),\
"One or more relationships [{0}] not in list of valid Source " \
"Relationships [{1}]".format(str(relationships), str(source_rels))
else:
# if no relationships supplied, we connect them all
relationships = source_rels
# determine source and destination strings by class supplied
source_type = nipyapi.utils.infer_object_label_from_class(source)
target_type = nipyapi.utils.infer_object_label_from_class(target)
if source_type not in ['OUTPUT_PORT', 'INPUT_PORT']:
source_rels = [x.name for x in source.component.relationships]
if relationships:
assert all(i in source_rels for i in relationships), \
"One or more relationships [{0}] not in list of valid " \
"Source Relationships [{1}]" \
.format(str(relationships), str(source_rels))
else:
# if no relationships supplied, we connect them all
relationships = source_rels
if source_type == 'OUTPUT_PORT':
# the hosting process group for an Output port connection to another
# process group is the common parent process group
parent_pg = get_process_group(source.component.parent_group_id, 'id')
if parent_pg.id == get_root_pg_id():
parent_id = parent_pg.id
else:
parent_id = parent_pg.component.parent_group_id
else:
parent_id = source.component.parent_group_id
try:
return nipyapi.nifi.ProcessGroupsApi().create_connection(
id=source.component.parent_group_id,
id=parent_id,
body=nipyapi.nifi.ConnectionEntity(
revision=nipyapi.nifi.RevisionDTO(
version=0
Expand All @@ -843,6 +862,7 @@ def create_connection(source, target, relationships=None):
group_id=source.component.parent_group_id,
type=source_type
),
name=name,
destination=nipyapi.nifi.ConnectableDTO(
id=target.id,
group_id=target.component.parent_group_id,
Expand Down Expand Up @@ -880,7 +900,7 @@ def delete_connection(connection, purge=False):
raise ValueError(e.body)


def list_all_connections(pg_id='root', descendants=False):
def list_all_connections(pg_id='root', descendants=True):
"""
Lists all connections for a given Process Group ID
Expand Down Expand Up @@ -908,7 +928,7 @@ def get_component_connections(component):
assert isinstance(component, nipyapi.nifi.ProcessorEntity)
return [
x for x
in list_all_connections(component.component.parent_group_id)
in list_all_connections(pg_id=component.component.parent_group_id)
if component.id in [x.destination_id, x.source_id]
]

Expand All @@ -930,6 +950,7 @@ def purge_connection(con_id):
request.
"""

# TODO: Reimplement to batched instead of single threaded
def _autumn_leaves(con_id_, drop_request_):
test_obj = nipyapi.nifi.FlowfileQueuesApi().get_drop_request(
Expand All @@ -941,9 +962,7 @@ def _autumn_leaves(con_id_, drop_request_):
if test_obj.failure_reason:
raise ValueError(
"Unable to complete drop request{0}, error was {1}"
.format(
test_obj, test_obj.drop_request.failure_reason
)
.format(test_obj, test_obj.drop_request.failure_reason)
)
return True

Expand Down Expand Up @@ -1246,6 +1265,19 @@ def list_all_controller_types():


def list_all_by_kind(kind, pg_id='root', descendants=True):
"""
Retrieves a list of all instances of a supported object type
Args:
kind (str): one of input_ports, output_ports, funnels, controllers,
connections, remote_process_groups
pg_id (str): optional, ID of the Process Group to use as search base
descendants (bool): optional, whether to collect child group info
Returns:
list of the Entity type of the kind, or single instance, or None
"""
assert kind in [
'input_ports', 'output_ports', 'funnels', 'controllers', 'connections',
'remote_process_groups'
Expand All @@ -1265,22 +1297,29 @@ def list_all_by_kind(kind, pg_id='root', descendants=True):


def list_all_input_ports(pg_id='root', descendants=True):
"""Convenience wrapper for list_all_by_kind for input ports"""
return list_all_by_kind('input_ports', pg_id, descendants)


def list_all_output_ports(pg_id='root', descendants=True):
"""Convenience wrapper for list_all_by_kind for output ports"""
return list_all_by_kind('output_ports', pg_id, descendants)


def list_all_funnels(pg_id='root', descendants=True):
"""Convenience wrapper for list_all_by_kind for funnels"""
return list_all_by_kind('funnels', pg_id, descendants)


def list_all_remote_process_groups(pg_id='root', descendants=True):
"""Convenience wrapper for list_all_by_kind for remote process groups"""
return list_all_by_kind('remote_process_groups', pg_id, descendants)


def get_remote_process_group(rpg_id, summary=False):
"""
Fetch a remote process group object, with optional summary of just ports
"""
rpg = nipyapi.nifi.RemoteProcessGroupsApi().get_remote_process_group(
rpg_id
)
Expand All @@ -1293,3 +1332,63 @@ def get_remote_process_group(rpg_id, summary=False):
'output_ports': rpg.component.contents.output_ports
}
return out


def create_port(pg_id, port_type, name, state, position=None):
"""
Creates a new input or output port of given characteristics
Args:
pg_id (str): ID of the parent Process Group
port_type (str): Either of INPUT_PORT or OUTPUT_PORT
name (str): optional, Name to assign to the port
state (str): One of RUNNING, STOPPED, DISABLED
position (tuple): optional, tuple of ints like (400, 400)
Returns:
(PortEntity) of the created port
"""
assert state in ["RUNNING", "STOPPED", "DISABLED"]
assert port_type in ["INPUT_PORT", "OUTPUT_PORT"]
assert isinstance(pg_id, six.string_types)
position = position if position else (400, 400)
assert isinstance(position, tuple)
handle = nipyapi.nifi.ProcessGroupsApi()
port_generator = getattr(handle, 'create_' + port_type.lower())
try:
return port_generator(
id=pg_id,
body=nipyapi.nifi.PortEntity(
revision=nipyapi.nifi.RevisionDTO(version=0),
component=nipyapi.nifi.PortDTO(
parent_group_id=pg_id,
position=nipyapi.nifi.PositionDTO(
x=float(position[0]),
y=float(position[1])
),
name=name
)
)
)
except nipyapi.nifi.rest.ApiException as e:
raise ValueError(e.body)


def delete_port(port):
"""Deletes a given port from the canvas if possible"""
assert isinstance(port, nipyapi.nifi.PortEntity)
if 'INPUT' in port.port_type:
try:
return nipyapi.nifi.InputPortsApi().remove_input_port(
id=port.id,
version=port.revision.version)
except nipyapi.nifi.rest.ApiException as e:
raise ValueError(e.body)
if 'OUTPUT' in port.port_type:
try:
return nipyapi.nifi.OutputPortsApi().remove_output_port(
id=port.id,
version=port.revision.version)
except nipyapi.nifi.rest.ApiException as e:
raise ValueError(e.body)
25 changes: 24 additions & 1 deletion tests/conftest.py
Expand Up @@ -238,11 +238,34 @@ def cleanup_nifi():
log.info("Bulk cleanup called on host %s",
nipyapi.config.nifi_config.host)
remove_test_templates()
remove_test_processors()
remove_test_connections()
remove_test_controllers()
remove_test_processors()
remove_test_ports()
remove_test_pgs()


def remove_test_connections():
_ = [
nipyapi.canvas.delete_connection(x, True)
for x in nipyapi.canvas.list_all_connections()
if test_basename in x.component.name
]


def remove_test_ports():
_ = [
nipyapi.canvas.delete_port(x)
for x in nipyapi.canvas.list_all_by_kind('input_ports')
if test_basename in x.component.name
]
_ = [
nipyapi.canvas.delete_port(x)
for x in nipyapi.canvas.list_all_by_kind('output_ports')
if test_basename in x.component.name
]


def remove_test_controllers():
_ = [nipyapi.canvas.delete_controller(li, True) for li
in nipyapi.canvas.list_all_controllers() if
Expand Down

0 comments on commit 5786472

Please sign in to comment.