Skip to content

Commit

Permalink
CBQE-7208: 7/n Infra for pkcs#8 private keys
Browse files Browse the repository at this point in the history
Also adding an unrelated change to add a negative test for mult CA

Change-Id: Ie3bc4cf627a36299a9eb91b565698c0609224424
Reviewed-on: http://review.couchbase.org/c/testrunner/+/162636
Tested-by: Sumedh Basarkod <sumedhpb8@gmail.com>
Tested-by: Balakumaran G <balakumaran.gopal@couchbase.com>
Reviewed-by: Sumedh Basarkod <sumedhpb8@gmail.com>
Reviewed-by: Balakumaran G <balakumaran.gopal@couchbase.com>
  • Loading branch information
sumedhpb committed Oct 4, 2021
1 parent cf321b0 commit 6cebf8d
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 28 deletions.
7 changes: 5 additions & 2 deletions lib/membase/api/rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5903,13 +5903,16 @@ def load_trusted_CAs(self):
"node/controller/loadTrustedCAs", 'POST')
return status, content

def reload_certificate(self):
def reload_certificate(self, params=''):
""" Reload certificate
Call this function after uploading a certificate to the cluster to activate the new certificate.
"""
headers = self._create_capi_headers()
status, content, header = self._http_request(self.baseUrl + "node/controller/reloadCertificate", 'POST', headers=headers)
status, content, header = self._http_request(self.baseUrl + "node/controller/reloadCertificate",
'POST',
headers=headers,
params=params)
return status, content

def get_trusted_CAs(self):
Expand Down
5 changes: 4 additions & 1 deletion pytests/security/multiple_CA.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ class MultipleCA(BaseTestCase):

def setUp(self):
super(MultipleCA, self).setUp()
self.x509 = x509main(host=self.master)
self.passphrase_type = self.input.param("passphrase_type", "script")
self.encryption_type = self.input.param("encryption_type", "aes")
self.x509 = x509main(host=self.master, encryption_type=self.encryption_type,
passphrase_type=self.passphrase_type)
for server in self.servers:
self.x509.delete_inbox_folder_on_server(server=server)
self.basic_url = "https://" + self.servers[0].ip + ":18091/pools/default/"
Expand Down
53 changes: 53 additions & 0 deletions pytests/security/multiple_CA_negative.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import copy
import json

from pytests.basetestcase import BaseTestCase
from pytests.security.x509_multiple_CA_util import x509main, Validation


class MultipleCANegative(BaseTestCase):

def setUp(self):
super(MultipleCANegative, self).setUp()
self.x509 = x509main(host=self.master)
for server in self.servers:
self.x509.delete_inbox_folder_on_server(server=server)
self.basic_url = "https://" + self.servers[0].ip + ":18091/pools/default/"

def tearDown(self):
self.x509 = x509main(host=self.master)
self.x509.teardown_certs(servers=self.servers)
super(MultipleCANegative, self).tearDown()

def test_untrusted_client_cert_fails(self):
"""
Verify that a client cert signed by an untrusted root
CA is not authenticated
"""
self.x509.generate_multiple_x509_certs(servers=self.servers)
self.log.info("Manifest #########\n {0}".format(json.dumps(x509main.manifest, indent=4)))
cas = copy.deepcopy(x509main.root_ca_names)
cas.remove("clientroot") # make "clientroot" ca untrusted
for server in self.servers[:self.nodes_init]:
_ = self.x509.upload_root_certs(server=server, root_ca_names=cas)
self.x509.upload_node_certs(servers=self.servers[:self.nodes_init])
self.x509.upload_client_cert_settings(server=self.servers[0])
client_cert_path_tuple = self.x509.get_client_cert(int_ca_name="iclient1_clientroot")
self.x509_validation = Validation(server=self.servers[0],
cacert=None,
client_cert_path_tuple=client_cert_path_tuple)
try:
status, content, response = self.x509_validation.urllib_request(api=self.basic_url)
except Exception as e:
self.log.info("Rest api connection with untrusted client cert "
"didn't work as expected {0}".format(e))
else:
self.fail("Rest api connection with untrusted client cert worked")
try:
client = self.x509_validation.sdk_connection()
self.x509_validation.creates_sdk(client)
except Exception as e:
self.log.info("SDk connection with untrusted client cert didn't work "
"as expected {0}".format(e))
else:
self.fail("SDK connection with untrusted client cert worked")
185 changes: 160 additions & 25 deletions pytests/security/x509_multiple_CA_util.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import base64
import importlib
import json
import random
import os
import copy
import fileinput
import sys
import string
import time
import requests
import logger
import httplib2

from shutil import copyfile
from lib.Cb_constants.CBServer import CbServer
from lib.membase.api.rest_client import RestConnection
from lib.remote.remote_util import RemoteMachineShellConnection
from couchbase.bucket import Bucket
Expand Down Expand Up @@ -42,7 +41,7 @@ class x509main:
SLAVE_HOST = ServerInfo('127.0.0.1', 22, 'root', 'couchbase')
CLIENT_CERT_AUTH_JSON = 'client_cert_auth1.json'
CLIENT_CERT_AUTH_TEMPLATE = 'client_cert_config_template.txt'
IP_ADDRESS = '172.16.1.174'
IP_ADDRESS = '172.16.1.174' # dummy ip address
ROOT_CA_CONFIG = "./pytests/security/x509_extension_files/config"
CA_EXT = "./pytests/security/x509_extension_files/ca.ext"
SERVER_EXT = "./pytests/security/x509_extension_files/server.ext"
Expand All @@ -54,39 +53,61 @@ class x509main:
manifest = {} # active CA manifest
node_ca_map = {} # {<node_ip>: {signed_by: <int_ca_name>, path: <node_ca_dir>}}
client_ca_map = {} # {<client_ca_name>: {signed_by: <int_ca_name>, path: <client_ca_dir>}}
private_key_passphrase_map = {} # {<node_ip>:<plain_passw>}
ca_count = 0 # total count of active root certs

def __init__(self,
host=None,
wildcard_dns=None,
client_cert_state="enable",
paths="subject.cn:san.dnsname:san.uri",
prefixs="www.cb-:us.:www.", delimeter=".:.:.",
client_ip="172.16.1.174", dns=None, uri=None,
alt_names="default",
encryption_type="",
standard="pkcs8",
encryption_type="aes",
key_length=1024,
wildcard_dns=None):
passphrase_type="plain",
passphrase_script_path="default",
passphrase_script_args=None,
passhprase_url="https://testingsomething.free.beeceptor.com/",
passphrase_plain="default",
passphrase_load_timeout=5000,
https_opts=None):
if https_opts is None:
self.https_opts = {"verifyPeer": 'false'}
self.log = logger.Logger.get_logger()

if host is not None:
self.host = host
self.install_path = self._get_install_path(self.host)
self.slave_host = x509main.SLAVE_HOST
self.disable_ssl_certificate_validation = False
if CbServer.use_https:
self.disable_ssl_certificate_validation = True

self.client_cert_state = client_cert_state
self.paths = paths.split(":") # client cert's paths
self.prefixs = prefixs.split(":") # client cert's prefixes
self.delimeters = delimeter.split(":") # client cert's delimiters

# Node cert settings
self.wildcard_dns = wildcard_dns

# Client cert settings
self.client_cert_state = client_cert_state # enable/disable/mandatory
self.paths = paths.split(":") # client cert's SAN paths
self.prefixs = prefixs.split(":") # client cert's SAN prefixes
self.delimeters = delimeter.split(":") # client cert's SAN delimiters
self.client_ip = client_ip # a dummy client ip name.
self.dns = dns # client cert's san.dns
self.uri = uri # client cert's san.uri
self.alt_names = alt_names # either 'default' or 'non-default' alt names
self.encryption_type = encryption_type
self.alt_names = alt_names # either 'default' or 'non-default' SAN names

# Node private key settings
self.standard = standard # PKCS standard; currently supports PKCS#1 & PKCS#8
if encryption_type in ["none", "None", "", None]:
encryption_type = None
self.encryption_type = encryption_type # encryption algo for private key in case of PKCS#8. Can be put to None
self.key_length = key_length
self.wildcard_dns = wildcard_dns
# Node private key passphrase settings
self.passphrase_type = passphrase_type # 'script'/'rest'/'plain'
self.passphrase_script_path = passphrase_script_path # path on node to store the bash script
self.passphrase_script_args = passphrase_script_args
self.passphrase_url = passhprase_url
self.passphrase_plain = passphrase_plain
self.passphrase_load_timeout = passphrase_load_timeout

# Get the install path for different operating systems
def _get_install_path(self, host):
Expand Down Expand Up @@ -186,6 +207,14 @@ def get_node_cert(server):
"long_chain" + server.ip + ".pem"
return node_ca_key_path, node_ca_path

@staticmethod
def get_node_private_key_passphrase_script(server):
"""
Given a server object,
returns the path of the bash script(which prints pkey passphrase for that node) on slave
"""
return x509main.node_ca_map[str(server.ip)]["path"] + "passphrase.sh"

def get_client_cert(self, int_ca_name):
"""
returns client's cert and key
Expand Down Expand Up @@ -213,6 +242,62 @@ def create_ca_bundle(self):
shell.execute_command(cat_cmd)
shell.disconnect()

def convert_to_pkcs8(self, node_ip, key_path, node_ca_dir):
"""
converts pkcs#1 key to encrypted/un-encrypted pkcs#8 key
with the same name as pkcs#1 key
:node_ip: ip_addr of the node
:key_path: pkcs#1 key path on slave
:node_ca_dir: node's dir which contains related cert documents.
"""
tmp_encrypted_key_path = node_ca_dir + "enckey.key"
shell = RemoteMachineShellConnection(self.slave_host)
if self.encryption_type:
if self.passphrase_type == "plain":
if self.passphrase_plain != "default":
passw = self.passphrase_plain
else:
# generate passw
passw = ''.join(random.choice(string.ascii_uppercase + string.digits)
for _ in range(20))
x509main.private_key_passphrase_map[str(node_ip)] = passw
elif self.passphrase_type == "script":
# generate passw
passw = ''.join(random.choice(string.ascii_uppercase + string.digits)
for _ in range(20))
# create bash file with "echo <passw>"
# TODo also support creating bash file that takes args
passphrase_path = node_ca_dir + "passphrase.sh"
bash_content = "echo '" + passw + "'"
with open(passphrase_path, "w") as fh:
fh.write(bash_content)
os.chmod(passphrase_path, 0o777)
else:
response = requests.get(self.passphrase_url)
passw = response.content.decode('utf-8')

# convert cmd
convert_cmd = "openssl pkcs8 -in " + key_path + " -passout pass:" + passw +\
" -topk8 -out " + tmp_encrypted_key_path
output, error = shell.execute_command(convert_cmd)
self.log.info('Output message is {0} and error message is {1}'.format(output, error))

else:
convert_cmd = "openssl pkcs8 -in " + key_path +\
" -topk8 -nocrypt -out " + tmp_encrypted_key_path
output, error = shell.execute_command(convert_cmd)
self.log.info('Output message is {0} and error message is {1}'.format(output, error))

# delete old pkcs1 key & rename encrypted key
del_cmd = "rm -rf " + key_path
output, error = shell.execute_command(del_cmd)
self.log.info('Output message is {0} and error message is {1}'.format(output, error))
mv_cmd = "mv " + tmp_encrypted_key_path + " " + key_path
output, error = shell.execute_command(mv_cmd)
self.log.info('Output message is {0} and error message is {1}'.format(output, error))
shell.disconnect()

def generate_root_certificate(self, root_ca_name, cn_name=None):
root_ca_dir = x509main.CACERTFILEPATH + root_ca_name + "/"
self.create_directory(root_ca_dir)
Expand All @@ -223,7 +308,7 @@ def generate_root_certificate(self, root_ca_name, cn_name=None):

shell = RemoteMachineShellConnection(self.slave_host)
# create ca.key
output, error = shell.execute_command("openssl genrsa " + self.encryption_type +
output, error = shell.execute_command("openssl genrsa " +
" -out " + root_ca_key_path +
" " + str(self.key_length))
self.log.info('Output message is {0} and error message is {1}'.format(output, error))
Expand Down Expand Up @@ -257,7 +342,7 @@ def generate_intermediate_certificate(self, root_ca_name, int_ca_name):

shell = RemoteMachineShellConnection(self.slave_host)
# create int CA private key
output, error = shell.execute_command("openssl genrsa " + self.encryption_type +
output, error = shell.execute_command("openssl genrsa " +
" -out " + int_ca_key_path +
" " + str(self.key_length))
self.log.info('Output message is {0} and error message is {1}'.format(output, error))
Expand Down Expand Up @@ -318,7 +403,7 @@ def generate_node_certificate(self, root_ca_name, int_ca_name, node_ip):

shell = RemoteMachineShellConnection(self.slave_host)
# create node CA private key
output, error = shell.execute_command("openssl genrsa " + self.encryption_type +
output, error = shell.execute_command("openssl genrsa " +
" -out " + node_ca_key_path +
" " + str(self.key_length))
self.log.info('Output message is {0} and error message is {1}'.format(output, error))
Expand Down Expand Up @@ -346,6 +431,10 @@ def generate_node_certificate(self, root_ca_name, int_ca_name, node_ip):
" > " + node_chain_ca_path)
self.log.info('Output message is {0} and error message is {1}'.format(output, error))

if self.standard == "pkcs8":
self.convert_to_pkcs8(node_ip=node_ip, key_path=node_ca_key_path,
node_ca_dir=node_ca_dir)

os.remove(temp_cert_extensions_file)
shell.disconnect()
x509main.node_ca_map[str(node_ip)] = dict()
Expand Down Expand Up @@ -394,7 +483,7 @@ def generate_client_certificate(self, root_ca_name, int_ca_name):

shell = RemoteMachineShellConnection(self.slave_host)
# create private key for client
output, error = shell.execute_command("openssl genrsa " + self.encryption_type +
output, error = shell.execute_command("openssl genrsa " +
" -out " + client_ca_key_path +
" " + str(self.key_length))
self.log.info('Output message is {0} and error message is {1}'.format(output, error))
Expand Down Expand Up @@ -644,17 +733,44 @@ def load_trusted_CAs(self, server=None, from_non_localhost=True):
return content
# ToDO write code to upload from localhost

@staticmethod
def reload_node_certificates(servers):
def reload_node_certificates(self, servers):
"""
reload node certificates from inbox folder
params
:servers: list of nodes
"""
def build_params(node):
params = dict()
if self.encryption_type:
params["privateKeyPassphrase"] = dict()
params["privateKeyPassphrase"]["type"] = self.passphrase_type
if self.passphrase_type == "script":
if self.passphrase_script_path != "default":
params["privateKeyPassphrase"]["path"] = self.passphrase_script_path +\
"/passphrase.sh"
else:
params["privateKeyPassphrase"]["path"] = self.install_path + \
x509main.CHAINFILEPATH +\
"/passphrase.sh"
params["privateKeyPassphrase"]["timeout"] = self.passphrase_load_timeout
if self.passphrase_script_args:
params["privateKeyPassphrase"]["args"] = self.passphrase_script_args
elif self.passphrase_type == "rest":
params["privateKeyPassphrase"]["url"] = self.passphrase_url
params["privateKeyPassphrase"]["timeout"] = self.passphrase_load_timeout
params["privateKeyPassphrase"]["httpsOpts"] = self.https_opts
else:
params["privateKeyPassphrase"]["type"] = "plain"
params["privateKeyPassphrase"]["password"] = \
x509main.private_key_passphrase_map[str(node.ip)]
params = json.dumps(params)
return params

for server in servers:
rest = RestConnection(server)
status, content = rest.reload_certificate()
params = build_params(server)
status, content = rest.reload_certificate(params=params)
if not status:
msg = "Could not load reload node cert on %s; Failed with error %s" \
% (server.ip, content)
Expand Down Expand Up @@ -695,6 +811,25 @@ def copy_node_cert(self, server):
self.copy_file_from_slave_to_server(server, node_ca_path, dest_pem_path)
dest_pkey_path = self.install_path + x509main.CHAINFILEPATH + "/pkey.key"
self.copy_file_from_slave_to_server(server, node_ca_key_path, dest_pkey_path)
if self.encryption_type and self.passphrase_type == "script":
node_key_passphrase_path = self.get_node_private_key_passphrase_script(server)
if self.passphrase_script_path == "default":
dest_node_key_passphrase_path = self.install_path + \
x509main.CHAINFILEPATH + \
"/passphrase.sh"
else:
dest_node_key_passphrase_path = self.passphrase_script_path + \
"/passphrase.sh"
self.copy_file_from_slave_to_server(server, node_key_passphrase_path,
dest_node_key_passphrase_path)
shell = RemoteMachineShellConnection(server)
output, error = shell.execute_command("chown couchbase:couchbase " +
dest_node_key_passphrase_path)
self.log.info('Output message is {0} and error message is {1}'.format(output, error))
output, error = shell.execute_command("chmod 777 " +
dest_node_key_passphrase_path)
self.log.info('Output message is {0} and error message is {1}'.format(output, error))
shell.disconnect()

def teardown_certs(self, servers):
"""
Expand Down

0 comments on commit 6cebf8d

Please sign in to comment.