Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions proxy/common/pki.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import subprocess
import tempfile
import logging
from typing import List, Generator, Optional, Tuple
from typing import Generator, List, Optional, Tuple

from .utils import bytes_
from .constants import COMMA
Expand Down Expand Up @@ -50,6 +50,7 @@
challengePassword = A challenge password
challengePassword_min = 4
challengePassword_max = 20'''
SAN_KEY = "SAN"


def remove_passphrase(
Expand Down Expand Up @@ -101,23 +102,22 @@ def gen_public_key(
]
if has_extension:
command.extend([
'-extensions', 'PROXY',
'-extensions', SAN_KEY,
])
return run_openssl_command(command, timeout)


def gen_csr(
csr_path: str,
key_path: str,
password: str,
crt_path: str,
signing_key_path: str,
subject: str,
validity_in_days: int = 365,
timeout: int = 10) -> bool:
"""Generates a CSR based upon existing certificate and key file."""
command = [
'openssl', 'x509', '-x509toreq',
'-passin', 'pass:%s' % password,
'-in', crt_path, '-signkey', key_path,
'-out', csr_path
'openssl', 'req', '-new', '-sha256',
'-days', str(validity_in_days), '-subj', subject,
'-key', signing_key_path, '-out', csr_path
]
return run_openssl_command(command, timeout)

Expand All @@ -126,10 +126,9 @@ def sign_csr(
csr_path: str,
crt_path: str,
ca_key_path: str,
ca_key_password: str,
ca_crt_path: str,
serial: str,
alt_subj_names: Optional[List[str]] = None,
alt_subj_names: List[str],
extended_key_usage: Optional[str] = None,
validity_in_days: int = 365,
timeout: int = 10) -> bool:
Expand All @@ -139,28 +138,42 @@ def sign_csr(
'openssl', 'x509', '-req', '-sha256',
'-CA', ca_crt_path,
'-CAkey', ca_key_path,
'-passin', 'pass:%s' % ca_key_password,
'-set_serial', serial,
'-days', str(validity_in_days),
'-extfile', extension_path,
'-extensions', SAN_KEY,
'-in', csr_path,
'-out', crt_path,
]
return run_openssl_command(command, timeout)


def get_ext_config(
alt_subj_names: Optional[List[str]] = None,
extended_key_usage: Optional[str] = None) -> bytes:
def gen_crt(
crt_path: str,
signing_key_path: str,
ca_key_path: str,
ca_crt_path: str,
tld: str,
serial: str) -> None:
"""For a given tld, generate an signed cert."""
temp_csr = os.path.join(tempfile.gettempdir(), uuid.uuid4().hex)
gen_csr(temp_csr, signing_key_path, f'/C=/ST=/L=/O=/OU=/CN={tld}')
sign_csr(temp_csr, crt_path,
ca_key_path, ca_crt_path, serial, [tld])


def get_ext_config(alt_subj_names: Optional[List[str]] = None, extended_key_usage: Optional[str] = None) -> bytes:
config = b''
# Add SAN extension
if alt_subj_names is not None and len(alt_subj_names) > 0:
config += b'[' + bytes_(SAN_KEY) + b']'
alt_names = []
for cname in alt_subj_names:
alt_names.append(b'DNS:%s' % bytes_(cname))
config += b'\nsubjectAltName=' + COMMA.join(alt_names)
# Add extendedKeyUsage section
if extended_key_usage is not None:
config += b'[PROXY]'
config += b'\nextendedKeyUsage=' + bytes_(extended_key_usage)
return config

Expand Down Expand Up @@ -191,7 +204,6 @@ def ssl_config(
if (alt_subj_names is not None and len(alt_subj_names) > 0) or \
extended_key_usage is not None:
has_extension = True
config += b'\n[PROXY]'

# Add custom extensions
config += get_ext_config(alt_subj_names, extended_key_usage)
Expand Down Expand Up @@ -219,8 +231,7 @@ def run_openssl_command(command: List[str], timeout: int) -> bool:

if __name__ == '__main__':
available_actions = (
'remove_passphrase', 'gen_private_key', 'gen_public_key',
'gen_csr', 'sign_csr'
'remove_passphrase', 'gen_private_key', 'gen_public_key'
)

parser = argparse.ArgumentParser(
Expand Down
19 changes: 5 additions & 14 deletions proxy/http/proxy/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
:license: BSD, see LICENSE for more details.
"""
import threading
import subprocess
import os
import ssl
import socket
Expand All @@ -25,6 +24,7 @@
from ..parser import HttpParser, httpParserStates, httpParserTypes
from ..methods import httpMethods

from ...common import pki
from ...common.types import HasFileno
from ...common.constants import PROXY_AGENT_HEADER_VALUE
from ...common.utils import build_http_response, text_
Expand Down Expand Up @@ -278,8 +278,8 @@ def on_request_complete(self) -> Union[socket.socket, bool]:
logger.error(
'BrokenPipeError when wrapping client')
return True
except OSError:
logger.error('OSError when wrapping client')
except OSError as e:
logger.error('OSError when wrapping client:"{}"'.format(e.strerror))
return True
# Update all plugin connection reference
for plugin in self.plugins.values():
Expand Down Expand Up @@ -362,18 +362,9 @@ def generate_upstream_certificate(
logger.debug('Generating certificates %s', cert_file_path)
# TODO: Parse subject from certificate
# Currently we only set CN= field for generated certificates.
gen_cert = subprocess.Popen(
['openssl', 'req', '-new', '-key', self.flags.ca_signing_key_file, '-subj',
f'/C=/ST=/L=/O=/OU=/CN={ text_(self.request.host) }'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
sign_cert = subprocess.Popen(
['openssl', 'x509', '-req', '-days', '365', '-CA', self.flags.ca_cert_file, '-CAkey',
self.flags.ca_key_file, '-set_serial', str(self.uid.int), '-out', cert_file_path],
stdin=gen_cert.stdout,
stderr=subprocess.PIPE)
# TODO: Ensure sign_cert success.
sign_cert.communicate(timeout=10)
pki.gen_crt(cert_file_path, self.flags.ca_signing_key_file, self.flags.ca_key_file,
self.flags.ca_cert_file, text_(self.request.host), str(self.uid.int))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kind of fail to understand the motive behind all the changes.

a) I think they won't work cross platform e.g. will break on MacOS
b) IIUC, all we wanted was SAN extension in the signing step. So you could have only modified sign_csr to incorporate SAN extension when called from server.py

Copy link
Contributor Author

@Benouare Benouare Feb 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need SAN and SHA 256. As your link say :

SHA-1 signed certificates are no longer trusted for TLS.

If i remember well, we must have a base cert/key/ca in sha256, to sign in sha256. It's for this reason that I ve changed a big part of the implementation.
I will take a look on it today.

edit :
If we compare the ligne 371 of server.py and the sign_csr method, there is now, more or less no difference :

[
'openssl', 'x509', '-req',process
 '-days', '365',
 '-CA', self.flags.ca_cert_file,
 '-CAkey',self.flags.ca_key_file, 
'-set_serial', str(self.uid.int), 
'-out', cert_file_path
]

Vs

[
            'openssl', 'x509', '-req', '-sha256',
            '-CA', ca_crt_path,
            '-CAkey', ca_key_path,
            '-set_serial', serial,
            '-days', str(validity_in_days),
            '-extensions', SAN_KEY,
            '-extfile', extension_path,
            '-in', csr_path,
            '-out', crt_path,
]

it only miss the -sha256, and extensions/extfile.

I made some changes, hope it will be better for you.
Cheers

return cert_file_path

def wrap_server(self) -> None:
Expand Down
67 changes: 48 additions & 19 deletions tests/common/test_pki.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,20 @@ def test_run_openssl_command(self, mock_popen: mock.Mock) -> None:
def test_get_ext_config(self) -> None:
self.assertEqual(pki.get_ext_config(None, None), b'')
self.assertEqual(pki.get_ext_config([], None), b'')
self.assertEqual(
pki.get_ext_config(
['proxy.py'],
None),
b'\nsubjectAltName=DNS:proxy.py')
self.assertEqual(
pki.get_ext_config(
None,
'serverAuth'),
b'\nextendedKeyUsage=serverAuth')
self.assertEqual(pki.get_ext_config(
['proxy.py'], None), b'[SAN]\nsubjectAltName=DNS:proxy.py')
self.assertEqual(pki.get_ext_config(None, 'serverAuth'),
b'[PROXY]\nextendedKeyUsage=serverAuth')
self.assertEqual(pki.get_ext_config(['proxy.py'], 'serverAuth'),
b'\nsubjectAltName=DNS:proxy.py\nextendedKeyUsage=serverAuth')
b'[SAN]\nsubjectAltName=DNS:proxy.py[PROXY]\nextendedKeyUsage=serverAuth')
self.assertEqual(pki.get_ext_config(['proxy.py', 'www.proxy.py'], 'serverAuth'),
b'\nsubjectAltName=DNS:proxy.py,DNS:www.proxy.py\nextendedKeyUsage=serverAuth')
b'[SAN]\nsubjectAltName=DNS:proxy.py,DNS:www.proxy.py[PROXY]\nextendedKeyUsage=serverAuth')
self.assertEqual(pki.get_ext_config(
['proxy.py']), b'[SAN]\nsubjectAltName=DNS:proxy.py')
self.assertEqual(pki.get_ext_config(
['proxy.py']), b'[SAN]\nsubjectAltName=DNS:proxy.py')
self.assertEqual(pki.get_ext_config(['proxy.py', 'www.proxy.py']),
b'[SAN]\nsubjectAltName=DNS:proxy.py,DNS:www.proxy.py')

def test_ssl_config_no_ext(self) -> None:
with pki.ssl_config() as (config_path, has_extension):
Expand All @@ -61,7 +61,7 @@ def test_ssl_config(self) -> None:
self.assertEqual(
config.read(),
pki.DEFAULT_CONFIG +
b'\n[PROXY]\nsubjectAltName=DNS:proxy.py')
b'[SAN]\nsubjectAltName=DNS:proxy.py')

def test_extfile_no_ext(self) -> None:
with pki.ext_file() as config_path:
Expand All @@ -73,7 +73,7 @@ def test_extfile(self) -> None:
with open(config_path, 'rb') as config:
self.assertEqual(
config.read(),
b'\nsubjectAltName=DNS:proxy.py')
b'[SAN]\nsubjectAltName=DNS:proxy.py')

def test_gen_private_key(self) -> None:
key_path, nopass_key_path = self._gen_private_key()
Expand All @@ -93,26 +93,55 @@ def test_gen_public_key(self) -> None:
def test_gen_csr(self) -> None:
key_path, nopass_key_path, crt_path = self._gen_public_private_key()
csr_path = os.path.join(tempfile.gettempdir(), 'test_gen_public.csr')
pki.gen_csr(csr_path, key_path, 'password', crt_path)
pki.gen_csr(csr_path, nopass_key_path,
'/C=/ST=/L=/O=/OU=/CN=example.com')
self.assertTrue(os.path.exists(csr_path))
# TODO: Assert CSR is valid for provided crt and key
os.remove(csr_path)
os.remove(crt_path)
os.remove(key_path)
os.remove(nopass_key_path)

def test_sign_csr(self) -> None:
pass
key_path, nopass_key_path, crt_path = self._gen_public_private_key()
key_path, nopass_key_path_signed = self._gen_private_key()

csr_path = os.path.join(tempfile.gettempdir(), 'test_gen_public.csr')
pki.gen_csr(csr_path, nopass_key_path_signed,
'/C=/ST=/L=/O=/OU=/CN=example.com')

csr_path_signed = os.path.join(tempfile.gettempdir(),
'test_gen_public_signed.pem')
pki.sign_csr(csr_path, csr_path_signed, nopass_key_path, crt_path,
str(123), ["example.com"])
self.assertTrue(os.path.exists(csr_path_signed))
os.remove(csr_path)
os.remove(crt_path)
os.remove(key_path)
os.remove(nopass_key_path)
os.remove(csr_path_signed)

def test_gen_web_cert(self) -> None:
key_path, nopass_key_path, crt_path = self._gen_public_private_key()
key_path, nopass_key_path_signed = self._gen_private_key()

out = os.path.join(tempfile.gettempdir(),
'web-csr.pem')
pki.gen_crt(out, nopass_key_path_signed, nopass_key_path,
crt_path, "example.com", str(123))

self.assertTrue(os.path.exists(out))

def _gen_public_private_key(self) -> Tuple[str, str, str]:
key_path, nopass_key_path = self._gen_private_key()
crt_path = os.path.join(tempfile.gettempdir(), 'test_gen_public.crt')
pki.gen_public_key(crt_path, key_path, 'password', '/CN=example.com')
pki.gen_public_key(crt_path, nopass_key_path,
"password", '/CN=example.com')
return (key_path, nopass_key_path, crt_path)

def _gen_private_key(self) -> Tuple[str, str]:
key_path = os.path.join(tempfile.gettempdir(), 'test_gen_private.key')
nopass_key_path = os.path.join(tempfile.gettempdir(), 'test_gen_private_nopass.key')
nopass_key_path = os.path.join(
tempfile.gettempdir(), 'test_gen_private_nopass.key')
pki.gen_private_key(key_path, 'password')
pki.remove_passphrase(key_path, 'password', nopass_key_path)
return (key_path, nopass_key_path)