Skip to content

Commit

Permalink
Policies now working at docker infrastructure level
Browse files Browse the repository at this point in the history
Need to write more policies tests
  • Loading branch information
Chaffelson committed Apr 19, 2019
1 parent 0ebed1e commit d5d2563
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 59 deletions.
2 changes: 1 addition & 1 deletion nipyapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@

# --- Security Context
# This allows easy reference to a set of certificates for use in automation
# By default it points to our demo certs, you should change it for your environment
# By default it points to our demo certs, change it for your environment
default_certs_path = os.path.join(PROJECT_ROOT_DIR, 'demo/keys')
default_ssl_context = {
'ca_file': os.path.join(default_certs_path, 'localhost-ts.pem'),
Expand Down
2 changes: 1 addition & 1 deletion nipyapi/demo/secure_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def connect_nifi_to_registry():
# Make NiFi server a trusted proxy in NiFi Registry
proxy_access_policies = [
("write", "/proxy"),
("read", "/buckets"),
("read", "/buckets")
]
for action, resource in proxy_access_policies:
pol = nipyapi.security.get_access_policy_for_resource(
Expand Down
134 changes: 116 additions & 18 deletions nipyapi/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,36 @@
import ssl
import six
import urllib3
from copy import copy
import nipyapi


log = logging.getLogger(__name__)


__all__ = ['create_service_user', 'create_service_user_group', 'service_login',
__all__ = ['create_service_user', 'create_service_user_group',
'set_service_auth_token', 'service_logout',
'get_service_access_status', 'add_user_to_access_policy',
'update_access_policy', 'get_access_policy_for_resource',
'create_access_policy', 'list_service_users', 'get_service_user',
'set_service_ssl_context', 'add_user_group_to_access_policy']
'set_service_ssl_context', 'add_user_group_to_access_policy',
'bootstrap_security_policies', 'service_login'
]

# These are the known-valid policy actions
_valid_actions = ['read', 'write', 'delete']
# These are the services that these functions know how to configure
_valid_services = ['nifi', 'registry']


def create_service_user(identity, service='nifi'):
def create_service_user(identity, service='nifi', strict=True):
"""
Attempts to create a user with the provided identity in the given service
Args:
identity (str): Identity string for the user
service (str): 'nifi' or 'registry'
strict (bool): If Strict, will error if user already exists
Returns:
The new (User) or (UserEntity) object
Expand All @@ -61,6 +65,8 @@ def create_service_user(identity, service='nifi'):
except (
nipyapi.nifi.rest.ApiException,
nipyapi.registry.rest.ApiException) as e:
if 'already exists' in e.body and not strict:
return get_service_user(identity, service=service)
raise ValueError(e.body)


Expand Down Expand Up @@ -274,15 +280,18 @@ def get_service_access_status(service='nifi', bool_response=False):
raise e


def add_user_to_access_policy(user, policy, service='nifi', refresh=True):
def add_user_to_access_policy(user, policy, service='nifi', refresh=True,
strict=True):
"""
Attempts to add the given user object to the given access policy
Args:
user (User) or (UserEntity): User object to add
policy (AccessPolicyEntity) or (AccessPolicy): Access Policy object
service (str): 'nifi' or 'registry' to identify the target service
refresh (bool): Whether to refresh the policy object before submission
refresh (bool): Whether to refresh the policy object before submit
strict (bool): If True, will return error if user already present,
if False will ignore the already exists
Returns:
Updated Policy object
Expand Down Expand Up @@ -322,15 +331,17 @@ def add_user_to_access_policy(user, policy, service='nifi', refresh=True):
policy_user_ids = [
i.identifier if service == 'registry' else i.id for i in policy_users
]
if user_id not in policy_user_ids:
if service == 'registry':
policy_tgt.users.append(user)
elif service == 'nifi':
policy_tgt.component.users.append({'id': user_id})

assert user_id not in policy_user_ids

if service == 'registry':
policy_tgt.users.append(user)
elif service == 'nifi':
policy_tgt.component.users.append({'id': user_id})

return nipyapi.security.update_access_policy(policy_tgt, service)
return nipyapi.security.update_access_policy(policy_tgt, service)
else:
if strict:
assert user_id not in policy_user_ids, "Strict is True and user already " \
"in Policy"


def add_user_group_to_access_policy(user_group, policy, service='nifi',
Expand Down Expand Up @@ -450,18 +461,25 @@ def get_access_policy_for_resource(resource,
log.info("Called get_access_policy_for_resource with Args %s", locals())

# Strip leading '/' from resource as lookup endpoint prepends a '/'
stripped_resource = resource[1:] if resource.startswith(
'/') else resource
log.info("Getting %s Policy for %s:%s:%s", service, action, resource, str(r_id))
resource = resource[1:] if resource.startswith('/') else resource
log.info("Getting %s Policy for %s:%s:%s", service, action,
resource, str(r_id))
if service == 'nifi':
pol_api = nipyapi.nifi.PoliciesApi()
config = nipyapi.config.nifi_config
else:
pol_api = nipyapi.registry.PoliciesApi()
config = nipyapi.config.registry_config
default_safe_chars = copy(config.safe_chars_for_path_param)
try:
return pol_api.get_access_policy_for_resource(
if '/' not in config.safe_chars_for_path_param:
config.safe_chars_for_path_param += '/'
response = pol_api.get_access_policy_for_resource(
action=action,
resource=stripped_resource
resource='/'.join([resource, r_id]) if r_id else resource
)
config.safe_chars_for_path_param = copy(default_safe_chars)
return response
except nipyapi.nifi.rest.ApiException as e:
if 'Unable to find access policy' in e.body:
log.info("Access policy not found")
Expand All @@ -472,6 +490,8 @@ def get_access_policy_for_resource(resource,
)
log.info("Unexpected Error, raising...")
raise ValueError(e.body)
finally:
config.safe_chars_for_path_param = copy(default_safe_chars)


def create_access_policy(resource, action, r_id=None, service='nifi'):
Expand Down Expand Up @@ -610,3 +630,81 @@ def set_service_ssl_context(
if service == 'registry':
nipyapi.config.registry_config.ssl_context = ssl_context
nipyapi.config.nifi_config.ssl_context = ssl_context


def bootstrap_security_policies(service, admin='CN=user1, OU=nifi',
proxy='CN=localhost, OU=nifi'):
assert service in _valid_services
if 'nifi' in service:
rpg_id = nipyapi.canvas.get_root_pg_id()
nifi_user_identity = nipyapi.security.get_service_user(
admin,
service='nifi'
)
access_policies = [
('write', 'process-groups', rpg_id),
('read', 'process-groups', rpg_id),
('write', 'data/process-groups', rpg_id),
('read', 'data/process-groups', rpg_id),
('read', 'system', None),
]
for pol in access_policies:
ap = nipyapi.security.get_access_policy_for_resource(
action=pol[0],
resource=pol[1],
r_id=pol[2],
service='nifi',
auto_create=True
)
nipyapi.security.add_user_to_access_policy(
user=nifi_user_identity,
policy=ap,
service='nifi',
strict=False
)
else:
reg_user_identity = nipyapi.security.get_service_user(
admin,
service='registry'
)
all_buckets_access_policies = [
("read", "/buckets"),
("write", "/buckets"),
("delete", "/buckets")
]
for action, resource in all_buckets_access_policies:
pol = nipyapi.security.get_access_policy_for_resource(
resource=resource,
action=action,
service='registry',
auto_create=True
)
nipyapi.security.add_user_to_access_policy(
user=reg_user_identity,
policy=pol,
service='registry',
strict=False
)
# Setup Proxy Access
nifi_proxy = nipyapi.security.create_service_user(
identity=proxy,
service='registry',
strict=False
)
proxy_access_policies = [
("write", "/proxy"),
("read", "/buckets")
]
for action, resource in proxy_access_policies:
pol = nipyapi.security.get_access_policy_for_resource(
resource=resource,
action=action,
service='registry',
auto_create=True
)
nipyapi.security.add_user_to_access_policy(
user=nifi_proxy,
policy=pol,
service='registry',
strict=False
)
12 changes: 7 additions & 5 deletions resources/docker/secure/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
version: '2.1'
services:
nifi-secure:
secure-nifi:
image: apache/nifi:1.9.1
container_name: nifi-secure
container_name: secure-nifi
hostname: secure-nifi
ports:
- "8443:8443"
volumes:
- ../../../nipyapi/demo/keys:/opt/certs:ro
- ../../../nipyapi/demo/keys:/opt/certs:ro # Min docker version tested 18, does not work on old docker
environment:
- AUTH=tls
- KEYSTORE_PATH=/opt/certs/localhost-ks.jks
Expand All @@ -16,9 +17,10 @@ services:
- TRUSTSTORE_PASSWORD=localhostTruststorePassword
- TRUSTSTORE_TYPE=JKS
- INITIAL_ADMIN_IDENTITY=CN=user1, OU=nifi
registry-secure:
secure-registry:
image: apache/nifi-registry:0.3.0
container_name: registry-secure
container_name: secure-registry
hostname: secure-registry
ports:
- "18443:18443"
volumes:
Expand Down
30 changes: 4 additions & 26 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
]
secure_registry_endpoints = [
('https://' + test_host + ':18443/nifi-registry-api',
'http://secure-registry:18443',
'https://secure-registry:18443',
'https://' + test_host + ':8443/nifi-api'
)]

Expand Down Expand Up @@ -165,30 +165,6 @@ def regress_flow_reg(request):
nipyapi.config.registry_local_name = request.param[1]


def bootstrap_security_policies():
access_status = nipyapi.security.get_service_access_status('nifi')
rpg_id = nipyapi.canvas.get_root_pg_id()
nifi_user_identity = nipyapi.security.get_service_user(access_status.access_status.identity)
access_policies = [
('write', 'process-groups', rpg_id),
('read', 'process-groups', rpg_id),
('read', 'system', None)
]
for pol in access_policies:
ap = nipyapi.security.get_access_policy_for_resource(
action=pol[0],
resource=pol[1],
r_id=pol[2],
service='nifi',
auto_create=True
)
nipyapi.security.add_user_to_access_policy(
nifi_user_identity,
policy=ap,
service='nifi'
)


# Tests that the Docker test environment is available before running test suite
@pytest.fixture(scope="session", autouse=True)
def session_setup(request):
Expand Down Expand Up @@ -217,12 +193,14 @@ def session_setup(request):
log.info("Tested NiFi client connection, got response from %s",
url)
if 'https://' in url:
bootstrap_security_policies()
nipyapi.security.bootstrap_security_policies(service='nifi')
cleanup_nifi()
elif 'nifi-registry-api' in url:
if nipyapi.registry.FlowsApi().get_available_flow_fields():
log.info("Tested NiFi-Registry client connection, got "
"response from %s", url)
if 'https://' in url:
nipyapi.security.bootstrap_security_policies(service='registry')
cleanup_reg()
else:
raise ValueError("No Response from NiFi-Registry test call"
Expand Down
8 changes: 1 addition & 7 deletions tests/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,7 @@ def test_update_access_policy():


def test_get_access_policy_for_resource(regress_nifi):
# Test backwards compatibility issue on unsecured NiFi
# Returns an error stating the NiFi isn't set up for this, rather than
# the bad parameter error reported in issue #66
with pytest.raises(ValueError, match='This NiFi is not configured'):
_ = security.get_access_policy_for_resource('flow', 'read')
# Note that on a secured NiFi with no valid policy you will get the error:
# "No applicable policies could be found"
pass


def test_create_access_policy():
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ deps =
-r{toxinidir}/requirements.txt
commands =
pip install -U pip
coverage run -m py.test --basetemp={envtmpdir} -x --log-cli-level=INFO
coverage run -m py.test --basetemp={envtmpdir} -x --log-cli-level=WARNING
- coveralls

0 comments on commit d5d2563

Please sign in to comment.