From 104dac2a1cbcd5fa1a75f31a0017ce4ae2045bc9 Mon Sep 17 00:00:00 2001 From: Benoit Date: Tue, 4 Feb 2020 11:19:27 +0100 Subject: [PATCH] (#261) Add -sha256 & SAN for certs to be consummed by Chrome/Brave & FF --- proxy/common/pki.py | 47 ++++++++++++++++---------- proxy/http/proxy/server.py | 19 +++-------- tests/common/test_pki.py | 67 +++++++++++++++++++++++++++----------- 3 files changed, 82 insertions(+), 51 deletions(-) diff --git a/proxy/common/pki.py b/proxy/common/pki.py index 0f1030b616..7a8764ac4c 100644 --- a/proxy/common/pki.py +++ b/proxy/common/pki.py @@ -16,7 +16,7 @@ import subprocess import tempfile import logging -from typing import List, Generator, Optional, Tuple +from typing import Generator, List, Optional, Tuple from .utils import bytes_ from .constants import COMMA @@ -50,6 +50,7 @@ challengePassword = A challenge password challengePassword_min = 4 challengePassword_max = 20''' +SAN_KEY = "SAN" def remove_passphrase( @@ -101,23 +102,22 @@ def gen_public_key( ] if has_extension: command.extend([ - '-extensions', 'PROXY', + '-extensions', SAN_KEY, ]) return run_openssl_command(command, timeout) def gen_csr( csr_path: str, - key_path: str, - password: str, - crt_path: str, + signing_key_path: str, + subject: str, + validity_in_days: int = 365, timeout: int = 10) -> bool: """Generates a CSR based upon existing certificate and key file.""" command = [ - 'openssl', 'x509', '-x509toreq', - '-passin', 'pass:%s' % password, - '-in', crt_path, '-signkey', key_path, - '-out', csr_path + 'openssl', 'req', '-new', '-sha256', + '-days', str(validity_in_days), '-subj', subject, + '-key', signing_key_path, '-out', csr_path ] return run_openssl_command(command, timeout) @@ -126,10 +126,9 @@ def sign_csr( csr_path: str, crt_path: str, ca_key_path: str, - ca_key_password: str, ca_crt_path: str, serial: str, - alt_subj_names: Optional[List[str]] = None, + alt_subj_names: List[str], extended_key_usage: Optional[str] = None, validity_in_days: int = 365, timeout: int = 10) -> bool: @@ -139,28 +138,42 @@ def sign_csr( 'openssl', 'x509', '-req', '-sha256', '-CA', ca_crt_path, '-CAkey', ca_key_path, - '-passin', 'pass:%s' % ca_key_password, '-set_serial', serial, '-days', str(validity_in_days), '-extfile', extension_path, + '-extensions', SAN_KEY, '-in', csr_path, '-out', crt_path, ] return run_openssl_command(command, timeout) -def get_ext_config( - alt_subj_names: Optional[List[str]] = None, - extended_key_usage: Optional[str] = None) -> bytes: +def gen_crt( + crt_path: str, + signing_key_path: str, + ca_key_path: str, + ca_crt_path: str, + tld: str, + serial: str) -> None: + """For a given tld, generate an signed cert.""" + temp_csr = os.path.join(tempfile.gettempdir(), uuid.uuid4().hex) + gen_csr(temp_csr, signing_key_path, f'/C=/ST=/L=/O=/OU=/CN={tld}') + sign_csr(temp_csr, crt_path, + ca_key_path, ca_crt_path, serial, [tld]) + + +def get_ext_config(alt_subj_names: Optional[List[str]] = None, extended_key_usage: Optional[str] = None) -> bytes: config = b'' # Add SAN extension if alt_subj_names is not None and len(alt_subj_names) > 0: + config += b'[' + bytes_(SAN_KEY) + b']' alt_names = [] for cname in alt_subj_names: alt_names.append(b'DNS:%s' % bytes_(cname)) config += b'\nsubjectAltName=' + COMMA.join(alt_names) # Add extendedKeyUsage section if extended_key_usage is not None: + config += b'[PROXY]' config += b'\nextendedKeyUsage=' + bytes_(extended_key_usage) return config @@ -191,7 +204,6 @@ def ssl_config( if (alt_subj_names is not None and len(alt_subj_names) > 0) or \ extended_key_usage is not None: has_extension = True - config += b'\n[PROXY]' # Add custom extensions config += get_ext_config(alt_subj_names, extended_key_usage) @@ -219,8 +231,7 @@ def run_openssl_command(command: List[str], timeout: int) -> bool: if __name__ == '__main__': available_actions = ( - 'remove_passphrase', 'gen_private_key', 'gen_public_key', - 'gen_csr', 'sign_csr' + 'remove_passphrase', 'gen_private_key', 'gen_public_key' ) parser = argparse.ArgumentParser( diff --git a/proxy/http/proxy/server.py b/proxy/http/proxy/server.py index 7662414572..df40c6a9dd 100644 --- a/proxy/http/proxy/server.py +++ b/proxy/http/proxy/server.py @@ -9,7 +9,6 @@ :license: BSD, see LICENSE for more details. """ import threading -import subprocess import os import ssl import socket @@ -25,6 +24,7 @@ from ..parser import HttpParser, httpParserStates, httpParserTypes from ..methods import httpMethods +from ...common import pki from ...common.types import HasFileno from ...common.constants import PROXY_AGENT_HEADER_VALUE from ...common.utils import build_http_response, text_ @@ -278,8 +278,8 @@ def on_request_complete(self) -> Union[socket.socket, bool]: logger.error( 'BrokenPipeError when wrapping client') return True - except OSError: - logger.error('OSError when wrapping client') + except OSError as e: + logger.error('OSError when wrapping client:"{}"'.format(e.strerror)) return True # Update all plugin connection reference for plugin in self.plugins.values(): @@ -362,18 +362,9 @@ def generate_upstream_certificate( logger.debug('Generating certificates %s', cert_file_path) # TODO: Parse subject from certificate # Currently we only set CN= field for generated certificates. - gen_cert = subprocess.Popen( - ['openssl', 'req', '-new', '-key', self.flags.ca_signing_key_file, '-subj', - f'/C=/ST=/L=/O=/OU=/CN={ text_(self.request.host) }'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - sign_cert = subprocess.Popen( - ['openssl', 'x509', '-req', '-days', '365', '-CA', self.flags.ca_cert_file, '-CAkey', - self.flags.ca_key_file, '-set_serial', str(self.uid.int), '-out', cert_file_path], - stdin=gen_cert.stdout, - stderr=subprocess.PIPE) # TODO: Ensure sign_cert success. - sign_cert.communicate(timeout=10) + pki.gen_crt(cert_file_path, self.flags.ca_signing_key_file, self.flags.ca_key_file, + self.flags.ca_cert_file, text_(self.request.host), str(self.uid.int)) return cert_file_path def wrap_server(self) -> None: diff --git a/tests/common/test_pki.py b/tests/common/test_pki.py index ebeb767dba..dfafee3aa4 100644 --- a/tests/common/test_pki.py +++ b/tests/common/test_pki.py @@ -33,20 +33,20 @@ def test_run_openssl_command(self, mock_popen: mock.Mock) -> None: def test_get_ext_config(self) -> None: self.assertEqual(pki.get_ext_config(None, None), b'') self.assertEqual(pki.get_ext_config([], None), b'') - self.assertEqual( - pki.get_ext_config( - ['proxy.py'], - None), - b'\nsubjectAltName=DNS:proxy.py') - self.assertEqual( - pki.get_ext_config( - None, - 'serverAuth'), - b'\nextendedKeyUsage=serverAuth') + self.assertEqual(pki.get_ext_config( + ['proxy.py'], None), b'[SAN]\nsubjectAltName=DNS:proxy.py') + self.assertEqual(pki.get_ext_config(None, 'serverAuth'), + b'[PROXY]\nextendedKeyUsage=serverAuth') self.assertEqual(pki.get_ext_config(['proxy.py'], 'serverAuth'), - b'\nsubjectAltName=DNS:proxy.py\nextendedKeyUsage=serverAuth') + b'[SAN]\nsubjectAltName=DNS:proxy.py[PROXY]\nextendedKeyUsage=serverAuth') self.assertEqual(pki.get_ext_config(['proxy.py', 'www.proxy.py'], 'serverAuth'), - b'\nsubjectAltName=DNS:proxy.py,DNS:www.proxy.py\nextendedKeyUsage=serverAuth') + b'[SAN]\nsubjectAltName=DNS:proxy.py,DNS:www.proxy.py[PROXY]\nextendedKeyUsage=serverAuth') + self.assertEqual(pki.get_ext_config( + ['proxy.py']), b'[SAN]\nsubjectAltName=DNS:proxy.py') + self.assertEqual(pki.get_ext_config( + ['proxy.py']), b'[SAN]\nsubjectAltName=DNS:proxy.py') + self.assertEqual(pki.get_ext_config(['proxy.py', 'www.proxy.py']), + b'[SAN]\nsubjectAltName=DNS:proxy.py,DNS:www.proxy.py') def test_ssl_config_no_ext(self) -> None: with pki.ssl_config() as (config_path, has_extension): @@ -61,7 +61,7 @@ def test_ssl_config(self) -> None: self.assertEqual( config.read(), pki.DEFAULT_CONFIG + - b'\n[PROXY]\nsubjectAltName=DNS:proxy.py') + b'[SAN]\nsubjectAltName=DNS:proxy.py') def test_extfile_no_ext(self) -> None: with pki.ext_file() as config_path: @@ -73,7 +73,7 @@ def test_extfile(self) -> None: with open(config_path, 'rb') as config: self.assertEqual( config.read(), - b'\nsubjectAltName=DNS:proxy.py') + b'[SAN]\nsubjectAltName=DNS:proxy.py') def test_gen_private_key(self) -> None: key_path, nopass_key_path = self._gen_private_key() @@ -93,26 +93,55 @@ def test_gen_public_key(self) -> None: def test_gen_csr(self) -> None: key_path, nopass_key_path, crt_path = self._gen_public_private_key() csr_path = os.path.join(tempfile.gettempdir(), 'test_gen_public.csr') - pki.gen_csr(csr_path, key_path, 'password', crt_path) + pki.gen_csr(csr_path, nopass_key_path, + '/C=/ST=/L=/O=/OU=/CN=example.com') self.assertTrue(os.path.exists(csr_path)) # TODO: Assert CSR is valid for provided crt and key os.remove(csr_path) - os.remove(crt_path) os.remove(key_path) os.remove(nopass_key_path) def test_sign_csr(self) -> None: - pass + key_path, nopass_key_path, crt_path = self._gen_public_private_key() + key_path, nopass_key_path_signed = self._gen_private_key() + + csr_path = os.path.join(tempfile.gettempdir(), 'test_gen_public.csr') + pki.gen_csr(csr_path, nopass_key_path_signed, + '/C=/ST=/L=/O=/OU=/CN=example.com') + + csr_path_signed = os.path.join(tempfile.gettempdir(), + 'test_gen_public_signed.pem') + pki.sign_csr(csr_path, csr_path_signed, nopass_key_path, crt_path, + str(123), ["example.com"]) + self.assertTrue(os.path.exists(csr_path_signed)) + os.remove(csr_path) + os.remove(crt_path) + os.remove(key_path) + os.remove(nopass_key_path) + os.remove(csr_path_signed) + + def test_gen_web_cert(self) -> None: + key_path, nopass_key_path, crt_path = self._gen_public_private_key() + key_path, nopass_key_path_signed = self._gen_private_key() + + out = os.path.join(tempfile.gettempdir(), + 'web-csr.pem') + pki.gen_crt(out, nopass_key_path_signed, nopass_key_path, + crt_path, "example.com", str(123)) + + self.assertTrue(os.path.exists(out)) def _gen_public_private_key(self) -> Tuple[str, str, str]: key_path, nopass_key_path = self._gen_private_key() crt_path = os.path.join(tempfile.gettempdir(), 'test_gen_public.crt') - pki.gen_public_key(crt_path, key_path, 'password', '/CN=example.com') + pki.gen_public_key(crt_path, nopass_key_path, + "password", '/CN=example.com') return (key_path, nopass_key_path, crt_path) def _gen_private_key(self) -> Tuple[str, str]: key_path = os.path.join(tempfile.gettempdir(), 'test_gen_private.key') - nopass_key_path = os.path.join(tempfile.gettempdir(), 'test_gen_private_nopass.key') + nopass_key_path = os.path.join( + tempfile.gettempdir(), 'test_gen_private_nopass.key') pki.gen_private_key(key_path, 'password') pki.remove_passphrase(key_path, 'password', nopass_key_path) return (key_path, nopass_key_path)