Skip to content

Commit

Permalink
CBQE-7208: 8/n Fix bash script and ..
Browse files Browse the repository at this point in the history
add support to delete trusted CAs

Change-Id: I15675c570698cb738a751e03539cbe3638608c6a
Reviewed-on: http://review.couchbase.org/c/testrunner/+/163315
Tested-by: Sumedh Basarkod <sumedhpb8@gmail.com>
Reviewed-by: Sumedh Basarkod <sumedhpb8@gmail.com>
Reviewed-by: Balakumaran G <balakumaran.gopal@couchbase.com>
  • Loading branch information
sumedhpb authored and bkumaran committed Oct 11, 2021
1 parent 019e706 commit 9abb62f
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 20 deletions.
11 changes: 11 additions & 0 deletions lib/membase/api/rest_client.py
Expand Up @@ -5923,6 +5923,17 @@ def get_trusted_CAs(self):
+ "/pools/default/trustedCAs", 'GET')
return status, content

def delete_trusted_CA(self, ca_id):
"""
Deletes a trusted CA from the cluster, given its ID
"""
status, content, header = self._http_request(self.baseUrl
+ "/pools/default/trustedCAs/"
+ str(ca_id),
'DELETE')
return status, content

def client_cert_auth(self, state, prefixes):
"""
Args:
Expand Down
6 changes: 4 additions & 2 deletions pytests/security/multiple_CA.py
Expand Up @@ -8,9 +8,11 @@ class MultipleCA(BaseTestCase):

def setUp(self):
super(MultipleCA, self).setUp()
self.standard = self.input.param("standard", "pkcs8")
self.passphrase_type = self.input.param("passphrase_type", "script")
self.encryption_type = self.input.param("encryption_type", "aes")
self.x509 = x509main(host=self.master, encryption_type=self.encryption_type,
self.encryption_type = self.input.param("encryption_type", "aes256")
self.x509 = x509main(host=self.master, standard=self.standard,
encryption_type=self.encryption_type,
passphrase_type=self.passphrase_type)
for server in self.servers:
self.x509.delete_inbox_folder_on_server(server=server)
Expand Down
112 changes: 94 additions & 18 deletions pytests/security/x509_multiple_CA_util.py
Expand Up @@ -65,7 +65,7 @@ def __init__(self,
client_ip="172.16.1.174", dns=None, uri=None,
alt_names="default",
standard="pkcs8",
encryption_type="aes",
encryption_type="aes256",
key_length=1024,
passphrase_type="plain",
passphrase_script_path="default",
Expand Down Expand Up @@ -99,7 +99,8 @@ def __init__(self,
self.standard = standard # PKCS standard; currently supports PKCS#1 & PKCS#8
if encryption_type in ["none", "None", "", None]:
encryption_type = None
self.encryption_type = encryption_type # encryption algo for private key in case of PKCS#8. Can be put to None
# encryption pkcs#5 v2 algo for private key in case of PKCS#8. Can be put to None
self.encryption_type = encryption_type
self.key_length = key_length
# Node private key passphrase settings
self.passphrase_type = passphrase_type # 'script'/'rest'/'plain'
Expand Down Expand Up @@ -269,7 +270,8 @@ def convert_to_pkcs8(self, node_ip, key_path, node_ca_dir):
# create bash file with "echo <passw>"
# TODo also support creating bash file that takes args
passphrase_path = node_ca_dir + "passphrase.sh"
bash_content = "echo '" + passw + "'"
bash_content = "#!/bin/bash\n"
bash_content = bash_content + "echo '" + passw + "'"
with open(passphrase_path, "w") as fh:
fh.write(bash_content)
os.chmod(passphrase_path, 0o777)
Expand All @@ -278,13 +280,14 @@ def convert_to_pkcs8(self, node_ip, key_path, node_ca_dir):
passw = response.content.decode('utf-8')

# convert cmd
convert_cmd = "openssl pkcs8 -in " + key_path + " -passout pass:" + passw +\
" -topk8 -out " + tmp_encrypted_key_path
convert_cmd = "openssl pkcs8 -in " + key_path + " -passout pass:" + passw + \
" -topk8 -v2 " + self.encryption_type + \
" -out " + tmp_encrypted_key_path
output, error = shell.execute_command(convert_cmd)
self.log.info('Output message is {0} and error message is {1}'.format(output, error))

else:
convert_cmd = "openssl pkcs8 -in " + key_path +\
convert_cmd = "openssl pkcs8 -in " + key_path + \
" -topk8 -nocrypt -out " + tmp_encrypted_key_path
output, error = shell.execute_command(convert_cmd)
self.log.info('Output message is {0} and error message is {1}'.format(output, error))
Expand Down Expand Up @@ -313,7 +316,7 @@ def generate_root_certificate(self, root_ca_name, cn_name=None):
" " + str(self.key_length))
self.log.info('Output message is {0} and error message is {1}'.format(output, error))
if cn_name is None:
cn_name = "RootCA" + root_ca_name
cn_name = root_ca_name
# create ca.pem
output, error = shell.execute_command("openssl req -config " + config_path +
" -new -x509 -days 3650" +
Expand Down Expand Up @@ -582,7 +585,7 @@ def generate_multiple_x509_certs(self, servers, spec_file_name="default"):
# third client
root_ca_name = "clientroot"
int_ca_name = "iclient1_" + root_ca_name
self.generate_root_certificate(root_ca_name, cn_name="clientroot")
self.generate_root_certificate(root_ca_name)
self.generate_intermediate_certificate(root_ca_name, int_ca_name)
self.generate_client_certificate(root_ca_name, int_ca_name)

Expand All @@ -599,13 +602,15 @@ def rotate_certs(self, all_servers, root_ca_names="all"):
"""
if root_ca_names == "all":
root_ca_names = copy.deepcopy(x509main.root_ca_names)
old_ids = self.get_ids_from_ca_names(ca_names=root_ca_names,
server=all_servers[0])
nodes_affected_ips = list()
for root_ca_name in root_ca_names:
root_ca_manifest = copy.deepcopy(x509main.manifest[root_ca_name])
del x509main.manifest[root_ca_name]
self.remove_directory(root_ca_manifest['path'])
x509main.root_ca_names.remove(root_ca_name)
cn_name = 'My Company Root CA ' + root_ca_name + ' rotated'
cn_name = root_ca_name + 'rotated'
self.generate_root_certificate(root_ca_name=root_ca_name,
cn_name=cn_name)
intermediate_cas_manifest = root_ca_manifest["intermediate"]
Expand Down Expand Up @@ -637,7 +642,8 @@ def rotate_certs(self, all_servers, root_ca_names="all"):
_ = self.upload_root_certs(server=server, root_ca_names=root_ca_names)
self.upload_node_certs(servers=servers)
self.create_ca_bundle()
# TODo delete off the old trusted CAs from the server
self.delete_trusted_CAs(server=servers[0], ids=old_ids,
mark_deleted=False)

def upload_root_certs(self, server=None, root_ca_names=None):
"""
Expand Down Expand Up @@ -691,7 +697,7 @@ def write_client_cert_json_new(self):

def upload_client_cert_settings(self, server=None):
"""
Upload client cert settings to CB server that was initialized in init function
Upload client cert settings(that was initialized in init function) to CB server
"""
if server is None:
server = self.host
Expand All @@ -711,7 +717,6 @@ def upload_client_cert_settings(self, server=None):
return content
else:
raise Exception(content)
# TODo move some of this code to rest client

def load_trusted_CAs(self, server=None, from_non_localhost=True):
if not server:
Expand Down Expand Up @@ -740,20 +745,22 @@ def reload_node_certificates(self, servers):
params
:servers: list of nodes
"""

def build_params(node):
params = dict()
if self.encryption_type:
params["privateKeyPassphrase"] = dict()
params["privateKeyPassphrase"]["type"] = self.passphrase_type
if self.passphrase_type == "script":
if self.passphrase_script_path != "default":
params["privateKeyPassphrase"]["path"] = self.passphrase_script_path +\
params["privateKeyPassphrase"]["path"] = self.passphrase_script_path + \
"/passphrase.sh"
else:
params["privateKeyPassphrase"]["path"] = self.install_path + \
x509main.CHAINFILEPATH +\
x509main.CHAINFILEPATH + \
"/passphrase.sh"
params["privateKeyPassphrase"]["timeout"] = self.passphrase_load_timeout
params["privateKeyPassphrase"]["trim"] = 'true'
if self.passphrase_script_args:
params["privateKeyPassphrase"]["args"] = self.passphrase_script_args
elif self.passphrase_type == "rest":
Expand All @@ -769,7 +776,9 @@ def build_params(node):

for server in servers:
rest = RestConnection(server)
params = build_params(server)
params = ''
if self.standard == "pkcs8":
params = build_params(server)
status, content = rest.reload_certificate(params=params)
if not status:
msg = "Could not load reload node cert on %s; Failed with error %s" \
Expand All @@ -785,8 +794,68 @@ def get_trusted_CAs(self, server=None):
msg = "Could not get trusted CAs on %s; Failed with error %s" \
% (server.ip, content)
raise Exception(msg)
return content
# ToDO write code to parse content
return json.loads(content.decode('utf-8'))

def get_ca_names_from_ids(self, ids, server=None):
"""
Returns list of root ca_names,
given a list of of CA IDs
"""
ca_names = list()
content = self.get_trusted_CAs(server=server)
for ca_dict in content:
if int(ca_dict["id"]) in ids:
subject = ca_dict["subject"]
root_ca_name = subject.split("CN=")[1]
ca_names.append(root_ca_name)
return ca_names

def get_ids_from_ca_names(self, ca_names, server=None):
"""
Returns list of CA IDs,
given a list of string of CA names
"""
ca_ids = list()
content = self.get_trusted_CAs(server=server)
for ca_dict in content:
ca_id = ca_dict["id"]
subject = ca_dict["subject"]
root_ca_name = subject.split("CN=")[1]
if root_ca_name in ca_names:
ca_ids.append(int(ca_id))
return ca_ids

def delete_trusted_CAs(self, server=None, ids=None, mark_deleted=True):
"""
Deletes trusted CAs from cluster
:server: server object to make rest (defaults to self.host)
:ids: list of CA IDs to delete. Defaults to all trusted CAs which
haven't signed any node
:mark_deleted: Boolean on whether to remove it from root_ca_names
global variable list. Defaults to True
Returns None
"""
if server is None:
server = self.host
rest = RestConnection(server)
if ids is None:
ids = list()
content = self.get_trusted_CAs(server)
for ca_dict in content:
if len(ca_dict["nodes"]) == 0:
ca_id = ca_dict["id"]
ids.append(ca_id)
ca_names = self.get_ca_names_from_ids(ids=ids, server=server)
for ca_id in ids:
status, content = rest.delete_trusted_CA(ca_id=ca_id)
if not status:
raise Exception("Could not delete trusted CA with id {0}".format(id))
if mark_deleted:
for ca_name in ca_names:
ca_name = ca_name.rstrip("rotated")
if ca_name in x509main.root_ca_names:
x509main.root_ca_names.remove(ca_name)

def copy_trusted_CAs(self, root_ca_names, server=None):
"""
Expand Down Expand Up @@ -831,6 +900,11 @@ def copy_node_cert(self, server):
self.log.info('Output message is {0} and error message is {1}'.format(output, error))
shell.disconnect()

@staticmethod
def regenerate_certs(server):
rest = RestConnection(server)
rest.regenerate_cluster_certificate()

def teardown_certs(self, servers):
"""
1. Remove dir from slave
Expand All @@ -839,7 +913,9 @@ def teardown_certs(self, servers):
self.remove_directory(x509main.CACERTFILEPATH)
for server in servers:
self.delete_inbox_folder_on_server(server=server)
# ToDO delete trusted certs
for server in servers:
self.regenerate_certs(server=server)
self.delete_trusted_CAs(server=server)


class Validation:
Expand Down

0 comments on commit 9abb62f

Please sign in to comment.