Skip to content

Commit

Permalink
Add NiFi-Registry-0.2.0 support (#67)
Browse files Browse the repository at this point in the history
* Reworked pytest parametrization to support multiple nifi-registry versions
Improved processor scheduling call to check if start was successful, changed test to component.state from status.run_status as it's more reliable
Minor logging cleanups and changes
Deprecated testing against NiFi-1.5.0, added testing against NiFi-Registry-0.2.0
Added pytest fixture cleanups for NiFi-Registry testing
Renamed 'regress' to 'regress_nifi' for clarity when working with tests
Finally realised that fixture ordering matters and cleaned up a few things
Added default log control for tox run as a convenience
  • Loading branch information
Chaffelson committed Aug 2, 2018
1 parent 88a6872 commit 593059f
Show file tree
Hide file tree
Showing 61 changed files with 2,006 additions and 440 deletions.
27 changes: 24 additions & 3 deletions nipyapi/canvas.py
Expand Up @@ -5,6 +5,7 @@
"""

from __future__ import absolute_import
import logging
import six
import nipyapi

Expand All @@ -19,6 +20,8 @@
'get_bulletins', 'get_bulletin_board'
]

log = logging.getLogger(__name__)


def get_root_pg_id():
"""
Expand Down Expand Up @@ -516,6 +519,7 @@ def schedule_processor(processor, scheduled, refresh=True):
Note that this doesn't guarantee that it will change state, merely that
it will be instructed to try.
Some effort is made to wait and see if the processor starts
Args:
processor (ProcessorEntity): The Processor to target
Expand All @@ -531,10 +535,21 @@ def schedule_processor(processor, scheduled, refresh=True):
assert isinstance(refresh, bool)

def _running_schedule_processor(processor_):
test_obj = nipyapi.nifi.ProcessorsApi().get_processor(processor_.id)
test_obj = nipyapi.canvas.get_processor(processor_.id, 'id')
if test_obj.status.aggregate_snapshot.active_thread_count == 0:
return True
log.info("Processor not stopped, active thread count %s",
test_obj.status.aggregate_snapshot.active_thread_count)
return False

def _starting_schedule_processor(processor_):
test_obj = nipyapi.canvas.get_processor(processor_.id, 'id')
if test_obj.component.state == 'RUNNING':
return True
log.info("Processor not started, run_status %s",
test_obj.component.state)
return False

assert isinstance(scheduled, bool)
if refresh:
target = nipyapi.canvas.get_processor(processor.id, 'id')
Expand All @@ -559,8 +574,14 @@ def _running_schedule_processor(processor_):
return result
# Return False if we scheduled a stop, but it didn't stop
return False
# Return the True or False result if we were trying to start the processor
return result
else:
# Test that the Processor started
start_test = nipyapi.utils.wait_to_complete(
_starting_schedule_processor, target
)
if start_test:
return result
return False


def update_processor(processor, update):
Expand Down
2 changes: 1 addition & 1 deletion nipyapi/config.py
Expand Up @@ -21,7 +21,7 @@
# Note that changing the default hosts below will not
# affect an API connection that's already running.
# You'll need to change the .api_client.host for that, and there is a
# convenience function for this in nipyapi.utils
# convenience function for this in nipyapi.utils.set_endpoint

# Set Default Host for NiFi
nifi_config.host = 'http://localhost:8080/nifi-api'
Expand Down
22 changes: 12 additions & 10 deletions nipyapi/demo/fdlc.py
Expand Up @@ -2,6 +2,7 @@
import logging
import nipyapi
from nipyapi.utils import DockerContainer
from time import sleep

log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
Expand Down Expand Up @@ -56,8 +57,8 @@
)
]

dev_pg_name = 'dev_pg_0'
dev_proc_name = 'dev_proc_0'
dev_pg_name = 'my_pg_0'
dev_proc_name = 'my_proc_0'
dev_reg_client_name = 'dev_reg_client_0'
dev_bucket_name = 'dev_bucket_0'
dev_ver_flow_name = 'dev_ver_flow_0'
Expand All @@ -74,13 +75,13 @@
"in hex (0,1,2,3,4,5,6,7,8,9,a,b,c,d) and should be called in order."
"\nEach step will log activities to INFO, and you are encouraged to "
"look at the code in this script to see how each step is completed."
"\nEach step will also issue instructions through print statements like"
"\nhttp://github.com/Chaffelson/nipyapi/blob/master/nipyapi/demo/fdlc.py"
"\nEach step will also issue instructions through print statements like "
"this one, these instructions will vary so please read them as you go."
"\nNote that the first call will log a lot of information while it boots "
"the Docker containers, further instructions will follow."
"\nNote that the first call will log a lot of information while it boots"
" the Docker containers, further instructions will follow."
"\nNote that you can reset it at any time by calling step_1 again.\n"
"\nPlease start by calling the function 'step_1_boot_demo_env()'."
)
"\nPlease start by calling the function 'step_1_boot_demo_env()'.")


def step_1_boot_demo_env():
Expand Down Expand Up @@ -109,15 +110,16 @@ def step_1_boot_demo_env():
nipyapi_delay=nipyapi.config.long_retry_delay,
nipyapi_max_wait=nipyapi.config.long_max_wait
)
# Sleeping to wait for all startups to return before printing guide
sleep(1)
print("Your Docker containers should now be ready, please find them at the"
"following URLs:"
"\nnifi-dev ", dev_nifi_url,
"\nreg-dev ", dev_reg_url,
"\nreg-prod ", prod_reg_url,
"\nnifi-prod ", prod_nifi_url,
"\nPlease open each of these in a browser tab."
"\nPlease then call the function 'step_2_create_reg_clients()'\n"
)
"\nPlease then call the function 'step_2_create_reg_clients()'\n")


def step_2_create_reg_clients():
Expand Down Expand Up @@ -272,7 +274,7 @@ def step_9_deploy_prod_flow_to_nifi():
reg_client = nipyapi.versioning.get_registry_client(prod_reg_client_name)
nipyapi.versioning.deploy_flow_version(
parent_id=nipyapi.canvas.get_root_pg_id(),
location=(0,0),
location=(0, 0),
bucket_id=bucket.identifier,
flow_id=flow.identifier,
reg_client_id=reg_client.id,
Expand Down
7 changes: 5 additions & 2 deletions nipyapi/registry/__init__.py
Expand Up @@ -5,7 +5,7 @@
The REST API provides an interface to a registry with operations for saving, versioning, reading NiFi flows and components.
OpenAPI spec version: 0.2.0-SNAPSHOT
OpenAPI spec version: 0.2.0
Contact: dev@nifi.apache.org
Generated by: https://github.com/swagger-api/swagger-codegen.git
"""
Expand All @@ -20,23 +20,26 @@
from .models.bucket import Bucket
from .models.bucket_item import BucketItem
from .models.bundle import Bundle
from .models.component_difference import ComponentDifference
from .models.component_difference_group import ComponentDifferenceGroup
from .models.connectable_component import ConnectableComponent
from .models.controller_service_api import ControllerServiceAPI
from .models.current_user import CurrentUser
from .models.fields import Fields
from .models.link import Link
from .models.permissions import Permissions
from .models.position import Position
from .models.resource import Resource
from .models.resource_permissions import ResourcePermissions
from .models.tenant import Tenant
from .models.the_position_of_a_component_on_the_graph import ThePositionOfAComponentOnTheGraph
from .models.uri_builder import UriBuilder
from .models.user import User
from .models.user_group import UserGroup
from .models.versioned_connection import VersionedConnection
from .models.versioned_controller_service import VersionedControllerService
from .models.versioned_flow import VersionedFlow
from .models.versioned_flow_coordinates import VersionedFlowCoordinates
from .models.versioned_flow_difference import VersionedFlowDifference
from .models.versioned_flow_snapshot import VersionedFlowSnapshot
from .models.versioned_flow_snapshot_metadata import VersionedFlowSnapshotMetadata
from .models.versioned_funnel import VersionedFunnel
Expand Down
7 changes: 4 additions & 3 deletions nipyapi/registry/api_client.py
Expand Up @@ -4,7 +4,7 @@
The REST API provides an interface to a registry with operations for saving, versioning, reading NiFi flows and components.
OpenAPI spec version: 0.2.0-SNAPSHOT
OpenAPI spec version: 0.2.0
Contact: dev@nifi.apache.org
Generated by: https://github.com/swagger-api/swagger-codegen.git
"""
Expand Down Expand Up @@ -178,7 +178,8 @@ def sanitize_for_serialization(self, obj):
If obj is None, return None.
If obj is str, int, long, float, bool, return directly.
If obj is datetime.datetime, datetime.date convert to string in iso8601 format.
If obj is datetime.datetime, datetime.date
convert to string in iso8601 format.
If obj is list, sanitize each element in the list.
If obj is dict, return the dict.
If obj is swagger model, return the properties dict.
Expand Down Expand Up @@ -627,6 +628,6 @@ def __deserialize_model(self, data, klass):
value = data[klass.attribute_map[attr]]
kwargs[attr] = self.__deserialize(value, attr_type)

instance = klass(**kwargs)
instance = klass(**kwargs)

return instance
24 changes: 12 additions & 12 deletions nipyapi/registry/apis/access_api.py
Expand Up @@ -5,7 +5,7 @@
The REST API provides an interface to a registry with operations for saving, versioning, reading NiFi flows and components.
OpenAPI spec version: 0.2.0-SNAPSHOT
OpenAPI spec version: 0.2.0
Contact: dev@nifi.apache.org
Generated by: https://github.com/swagger-api/swagger-codegen.git
"""
Expand Down Expand Up @@ -121,7 +121,7 @@ def create_access_token_by_trying_all_providers_with_http_info(self, **kwargs):
select_header_content_type(['*/*'])

# Authentication setting
auth_settings = []
auth_settings = ['tokenAuth']

return self.api_client.call_api('/access/token', 'POST',
path_params,
Expand Down Expand Up @@ -219,7 +219,7 @@ def create_access_token_using_basic_auth_credentials_with_http_info(self, **kwar
select_header_content_type(['*/*'])

# Authentication setting
auth_settings = ['BasicAuth']
auth_settings = ['tokenAuth', 'BasicAuth']

return self.api_client.call_api('/access/token/login', 'POST',
path_params,
Expand Down Expand Up @@ -317,7 +317,7 @@ def create_access_token_using_identity_provider_credentials_with_http_info(self,
select_header_content_type(['*/*'])

# Authentication setting
auth_settings = []
auth_settings = ['tokenAuth']

return self.api_client.call_api('/access/token/identity-provider', 'POST',
path_params,
Expand Down Expand Up @@ -415,7 +415,7 @@ def create_access_token_using_kerberos_ticket_with_http_info(self, **kwargs):
select_header_content_type(['*/*'])

# Authentication setting
auth_settings = []
auth_settings = ['tokenAuth']

return self.api_client.call_api('/access/token/kerberos', 'POST',
path_params,
Expand All @@ -435,7 +435,7 @@ def create_access_token_using_kerberos_ticket_with_http_info(self, **kwargs):
def get_access_status(self, **kwargs):
"""
Returns the current client's authenticated identity and permissions to top-level resources
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please define a `callback` function
to be invoked when receiving the response.
Expand All @@ -460,7 +460,7 @@ def get_access_status(self, **kwargs):
def get_access_status_with_http_info(self, **kwargs):
"""
Returns the current client's authenticated identity and permissions to top-level resources
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please define a `callback` function
to be invoked when receiving the response.
Expand Down Expand Up @@ -513,7 +513,7 @@ def get_access_status_with_http_info(self, **kwargs):
select_header_content_type(['*/*'])

# Authentication setting
auth_settings = ['Authorization']
auth_settings = ['tokenAuth', 'Authorization']

return self.api_client.call_api('/access', 'GET',
path_params,
Expand All @@ -533,7 +533,7 @@ def get_access_status_with_http_info(self, **kwargs):
def get_identity_provider_usage_instructions(self, **kwargs):
"""
Provides a description of how the currently configured identity provider expects credentials to be passed to POST /access/token/identity-provider
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please define a `callback` function
to be invoked when receiving the response.
Expand All @@ -558,7 +558,7 @@ def get_identity_provider_usage_instructions(self, **kwargs):
def get_identity_provider_usage_instructions_with_http_info(self, **kwargs):
"""
Provides a description of how the currently configured identity provider expects credentials to be passed to POST /access/token/identity-provider
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please define a `callback` function
to be invoked when receiving the response.
Expand Down Expand Up @@ -611,7 +611,7 @@ def get_identity_provider_usage_instructions_with_http_info(self, **kwargs):
select_header_content_type(['*/*'])

# Authentication setting
auth_settings = []
auth_settings = ['tokenAuth']

return self.api_client.call_api('/access/token/identity-provider/usage', 'GET',
path_params,
Expand Down Expand Up @@ -709,7 +709,7 @@ def test_identity_provider_recognizes_credentials_format_with_http_info(self, **
select_header_content_type(['*/*'])

# Authentication setting
auth_settings = []
auth_settings = ['tokenAuth']

return self.api_client.call_api('/access/token/identity-provider/test', 'POST',
path_params,
Expand Down

0 comments on commit 593059f

Please sign in to comment.