Skip to content

Commit

Permalink
Validate user roles sdk
Browse files Browse the repository at this point in the history
Change-Id: I93e95e89976931930a91ad6c10bd3e7258d80e77
Reviewed-on: https://review.couchbase.org/c/testrunner/+/171247
Tested-by: <shaazinsheikh@gmail.com>
Reviewed-by: <shaazinsheikh@gmail.com>
Reviewed-by: Sumedh Basarkod <sumedhpb8@gmail.com>
  • Loading branch information
shaazin19 committed Feb 28, 2022
1 parent 6e3b269 commit df8cd43
Showing 1 changed file with 95 additions and 12 deletions.
107 changes: 95 additions & 12 deletions pytests/security/upgrade/multiple_CA_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from pytests.security.ntonencryptionBase import ntonencryptionBase
from pytests.security.x509_multiple_CA_util import x509main, Validation
from pytests.upgrade.newupgradebasetest import NewUpgradeBaseTest
from couchbase.bucket import Bucket
from couchbase.exceptions import CouchbaseError, CouchbaseInputError

log = logging.getLogger()

Expand Down Expand Up @@ -63,18 +65,18 @@ def load_sample_bucket(self, server, bucket_name="travel-sample"):
def add_rbac_groups_roles(self, server):
rest = RestConnection(server)
group_roles = ['admin', 'cluster_admin', 'ro_admin',
'bucket_admin[travel-sample]', 'bucket_full_access[travel-sample]']
'bucket_admin[travel-sample]', 'bucket_full_access[travel-sample]',
'data_backup[travel-sample]']
if self.base_version < "7.0":
group_roles.extend(['security_admin', 'data_reader[travel-sample]',
'data_writer[travel-sample]', 'data_dcp_reader[travel-sample]',
'data_backup[travel-sample]', 'data_monitoring[travel-sample]'])
'data_monitoring[travel-sample]'])
else:
group_roles.extend(['security_admin_local', 'security_admin_external',
'data_reader[travel-sample:_default:_default]',
'data_writer[travel-sample:_default:_default]',
'data_dcp_reader[travel-sample:_default:_default]',
'data_monitoring[travel-sample:_default:_default]',
'data_backup[travel-sample]'])
'data_monitoring[travel-sample:_default:_default]'])
groups = []
for role in group_roles:
group_name = "group_" + role.split("[",1)[0]
Expand Down Expand Up @@ -116,21 +118,20 @@ def add_ldap_users(self, server):
rest.setup_ldap(param, '')

roles = '''cluster_admin,ro_admin,
bucket_admin[travel-sample],bucket_full_access[travel-sample],'''
bucket_admin[travel-sample],bucket_full_access[travel-sample],
data_backup[travel-sample],'''
if self.base_version < "7.0":
roles = roles + '''security_admin,
data_reader[travel-sample],
data_writer[travel-sample],
data_dcp_reader[travel-sample],
data_backup[travel-sample],
data_monitoring[travel-sample]'''
else:
roles = roles + '''security_admin_local,security_admin_external,
data_reader[travel-sample:_default:_default],
data_writer[travel-sample:_default:_default],
data_dcp_reader[travel-sample:_default:_default],
data_monitoring[travel-sample:_default:_default],
data_backup[travel-sample]'''
data_monitoring[travel-sample:_default:_default]'''

# add external users
user_name = "bjones"
Expand All @@ -141,7 +142,6 @@ def add_ldap_users(self, server):
def validate_rbac_users_post_upgrade(self, server):
expected_user_roles = {'cbadminbucket': ['admin'],
'user_group_data_backup': ['data_backup[travel-sample]'],
'user_group_security_admin': ['security_admin_local', 'security_admin_external'],
'user_group_ro_admin': ['ro_admin'],
'user_group_admin': ['admin'],
'user_group_bucket_full_access': ['bucket_full_access[travel-sample]'],
Expand All @@ -154,7 +154,8 @@ def validate_rbac_users_post_upgrade(self, server):
'ro_admin', 'cluster_admin']}
if self.base_version < "7.0":
expected_user_roles.update(
{'user_group_data_reader': ['data_reader[travel-sample][*][*]'],
{'user_group_security_admin': ['security_admin_local', 'security_admin_external'],
'user_group_data_reader': ['data_reader[travel-sample][*][*]'],
'user_group_data_writer': ['data_writer[travel-sample][*][*]'],
'user_group_data_monitoring': ['data_monitoring[travel-sample][*][*]'],
'user_group_data_dcp_reader': ['data_dcp_reader[travel-sample][*][*]']})
Expand All @@ -164,7 +165,9 @@ def validate_rbac_users_post_upgrade(self, server):
'data_dcp_reader[travel-sample][*][*]'])
else:
expected_user_roles.update(
{'user_group_data_reader': ['data_reader[travel-sample][_default][_default]'],
{'user_group_security_admin_external': ['security_admin_external'],
'user_group_security_admin_local': ['security_admin_local'],
'user_group_data_reader': ['data_reader[travel-sample][_default][_default]'],
'user_group_data_writer': ['data_writer[travel-sample][_default][_default]'],
'user_group_data_monitoring': ['data_monitoring[travel-sample][_default][_default]'],
'user_group_data_dcp_reader': ['data_dcp_reader[travel-sample][_default][_default]']})
Expand Down Expand Up @@ -196,6 +199,84 @@ def validate_rbac_users_post_upgrade(self, server):
self.fail("Validation of RBAC user roles post upgrade failed. Mismatch in the roles of {0}."
.format(user))
self.log.info("Validation of RBAC user roles post upgrade -- SUCCESS")
self.user_roles = observed_user_roles

def validate_user_roles_sdk(self, server):
"""
1. Create SDK connection
2. Check whether users created before upgrade are able to write/read docs
into the bucket post upgrade depending on their roles
Written in accordance with python sdk version - 2.4
"""
ntonencryptionBase().disable_nton_cluster(self.servers)
CbServer.use_https = False

for user in self.user_roles:
# select users with data roles
data_roles = [role for role in self.user_roles[user] if "data_" in role]
if data_roles:
self.log.info("USERNAME -- {0}".format(user))

# create sdk connection
bucket_name = "travel-sample"
url = 'couchbase://{ip}/{name}'.format(ip=server.ip, name=bucket_name)
if user == "bjones":
url = 'couchbase://{ip}/{name}?sasl_mech_force=PLAIN'.format(
ip=server.ip, name=bucket_name)
bucket = Bucket(url, username=user, password="password")

try:
# write/upsert into the doc
bucket.upsert('doc123', {'key123': 'value123'})
except CouchbaseInputError:
if any("data_reader" in role for role in data_roles) or ("data_dcp_reader" in \
role for role in data_roles) or any("data_monitoring" in role for \
role in data_roles):
self.log.info("Write permission not granted as expected")
else:
self.fail("Validation failed. The user should have permission to "
"write to the given bucket")
except CouchbaseError:
if any("data_writer" in role for role in data_roles) or any("data_backup" \
in role for role in data_roles):
self.log.info("Write permission granted as expected")
else:
self.fail("Validation failed. The user should not have permission to "
"write to the given bucket")
else:
if any("data_writer" in role for role in data_roles) or any("data_backup" \
in role for role in data_roles):
self.log.info("Write permission granted as expected")
else:
self.fail("Validation failed. The user should not have permission to "
"write to the given bucket")

try:
# read the doc
bucket.get('airline_10')
except CouchbaseInputError:
if any("data_writer" in role for role in data_roles) or any("data_monitoring" \
in role for role in data_roles):
self.log.info("Read permission not granted as expected")
else:
self.fail("Validation failed. The user should have permission to "
"read the given bucket")
except CouchbaseError:
if any("data_reader" in role for role in data_roles) or any("data_dcp_reader" \
in role for role in data_roles) or any("data_backup" in role for role \
in data_roles):
self.log.info("Read permission granted as expected")
else:
self.fail("Validation failed. The user should not have permission to "
"read the given bucket")
else:
if any("data_reader" in role for role in data_roles) or any("data_dcp_reader" \
in role for role in data_roles) or any("data_backup" in role for role \
in data_roles):
self.log.info("Read permission granted as expected")
else:
self.fail("Validation failed. The user should not have permission to "
"read the given bucket")

def convert_to_pkcs8(self, node):
"""
Expand Down Expand Up @@ -328,6 +409,7 @@ def test_multiple_CAs_offline(self):
self.wait_for_rebalance_to_complete(task)
self.auth(servers=self.servers)
self.validate_rbac_users_post_upgrade(self.master)
self.validate_user_roles_sdk(self.master)

def test_multiple_CAs_online(self):
"""
Expand Down Expand Up @@ -389,4 +471,5 @@ def test_multiple_CAs_online(self):
self.x509_new.delete_unused_out_of_the_box_CAs(server=self.master)
self.x509_new.upload_client_cert_settings(server=nodes_in_cluster[0])
self.auth(servers=nodes_in_cluster)
self.validate_rbac_users_post_upgrade(self.master)
self.validate_rbac_users_post_upgrade(self.master)
self.validate_user_roles_sdk(self.master)

0 comments on commit df8cd43

Please sign in to comment.