Skip to content

Commit

Permalink
Merge pull request #236 from Axonius/feature/silence_cert_human
Browse files Browse the repository at this point in the history
4.60.4
  • Loading branch information
Jim Olsen committed Apr 8, 2023
2 parents c745a4c + bf3182f commit c94a567
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 185 deletions.
292 changes: 153 additions & 139 deletions axonius_api_client/cert_human/ssl_capture.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
# -*- coding: utf-8 -*-
"""Base example for setting up the API client."""
import logging
import types
import typing as t
import sys
from typing import List, Optional

import OpenSSL

# import urllib3
# import urllib3.connectionpool
from urllib3 import connectionpool, poolmanager

LOG: logging.Logger = logging.getLogger(__name__)

INJECT_WITH_PYOPENSSL: bool = True

Expand All @@ -20,190 +18,206 @@


class CaptureHTTPSResponse(connectionpool.HTTPSConnectionPool.ResponseCls):
"""Pass."""
"""Bring the captured_ attributes from the connection object down to the response object."""

def __init__(self, *args, **kwargs):
"""Pass."""
"""Bring the captured_ attributes from the connection object down to the response object."""
super().__init__(*args, **kwargs)
connection = getattr(self, "_connection", None)
for attr in dir(connection):
if attr.startswith("captured_"):
setattr(self, attr, getattr(connection, attr))
self.captured_cert: Optional[OpenSSL.crypto.X509] = getattr(
connection, "captured_cert", None
)
self.captured_chain: List[OpenSSL.crypto.X509] = getattr(connection, "captured_chain", [])
self.captured_cert_errors: List[dict] = getattr(connection, "captured_cert_errors", [])
self.captured_chain_errors: List[dict] = getattr(connection, "captured_chain_errors", [])


class CaptureHTTPSConnection(connectionpool.HTTPSConnectionPool.ConnectionCls):
"""Pass."""

def set_captured_cert(self):
"""Pass."""
logger = LOG.getChild(f"{self.__class__.__name__}")
info = f"certificate from {self.sock}"
# logger.debug(f"Fetching {info}")
"""Capture SSL certificates and save them as attributes on a connection object."""

def capture_cert(self) -> t.Tuple[t.Optional[OpenSSL.crypto.X509], t.List[dict]]:
"""Capture the SSL certificate from the socket while it is open."""
# works with pyopenssl and ssl, python 3.9+ tested
how = "n/a"
if callable(getattr(self.sock, "getpeercert", None)):
method = "self.sock.getpeercert(True)"
# logger.debug(f"Fetching {info} using method {method}")
sock = getattr(self, "sock", None)
getpeercert = getattr(sock, "getpeercert", None)
cert: t.Optional[OpenSSL.crypto.X509] = None
errors: t.List[dict] = []

if callable(getpeercert):
how = f"certificate from {sock}"
method = f"getpeercert={getpeercert}"
try:
cert_bytes: bytes = self.sock.getpeercert(True)
cert_bytes: bytes = getpeercert(True)
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert_bytes)
except Exception as exc:
self.captured_cert_errors.append({"how": how, "method": method, "exc": exc})
logger.debug(f"Failure fetching {info} using method {method}", exc_info=True)
else:
self.captured_cert: OpenSSL.crypto.X509 = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_ASN1, cert_bytes
)
# logger.debug(f"Fetched {info} using method {method}: {self.captured_cert}")
return

if not self.captured_cert:
logger.debug(f"Unable to fetch {info}")

def set_captured_chain(self):
"""Pass."""
logger = LOG.getChild(f"{self.__class__.__name__}")
info = f"certificate chain from {self.sock}"
# logger.debug(f"Fetching {info}")

if hasattr(self.sock, "_sslobj"):
errors.append({"how": how, "method": method, "exc": exc})
return cert, errors

def capture_chain(self) -> t.Tuple[t.List[OpenSSL.crypto.X509], t.List[dict]]:
"""Capture the SSL certificate chain from the socket while it is open."""
sock = getattr(self, "sock", None)
sslobj: t.Optional[t.Any] = getattr(sock, "_sslobj", None)
connection: t.Optional[t.Any] = getattr(sock, "connection", None)

get_verified_chain: t.Optional[callable] = getattr(sslobj, "get_verified_chain", None)
get_unverified_chain: t.Optional[callable] = getattr(sslobj, "get_unverified_chain", None)
get_peer_cert_chain: t.Optional[callable] = getattr(connection, "get_peer_cert_chain", None)

info = [
f"socket={sock}",
f"sslobj={sslobj}",
f"connection={connection}",
f"get_verified_chain={get_verified_chain}",
f"get_unverified_chain={get_unverified_chain}",
f"get_peer_cert_chain={get_peer_cert_chain}",
]
info = ", ".join(info)
errors: t.List[dict] = []
chain: t.List[OpenSSL.crypto.X509] = []

if callable(get_verified_chain):
# only available on python 3.10.1+
how = "python 3.10.1+"

if callable(getattr(self.sock._sslobj, "get_verified_chain", None)):
method = "self.sock._sslobj.get_verified_chain()"
# logger.debug(f"Fetching {info} using method {method}")
try:
chain_ssl = self.sock._sslobj.get_verified_chain()
# List[_ssl.Certificate]
except Exception as exc:
self.captured_chain_errors.append({"how": how, "method": method, "exc": exc})
logger.debug(f"Failure fetching {info} using method {method}", exc_info=True)
else:
self.captured_chain: List[OpenSSL.crypto.X509] = [
OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, x.public_bytes()
)
for x in chain_ssl
]
# logger.debug(f"Fetched {info} using method {method}: {self.captured_chain}")
return

if callable(getattr(self.sock._sslobj, "get_unverified_chain", None)):
method = "self.sock._sslobj.get_unverified_chain()"
# logger.debug(f"Fetching {info} using method {method}")
try:
chain_ssl = self.sock._sslobj.get_unverified_chain()
# List[_ssl.Certificate]
except Exception as exc:
self.captured_chain_errors.append({"how": how, "method": method, "exc": exc})
logger.debug(f"Failure fetching {info} using method {method}", exc_info=True)
else:
self.captured_chain: List[OpenSSL.crypto.X509] = [
OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, x.public_bytes()
)
for x in chain_ssl
]
# logger.debug(f"Fetched {info} using method {method}: {self.captured_chain}")
return

if hasattr(self.sock, "connection"):
# only available if pyopenssl has been injected into urllib3 via:
how = "import urllib3.contrib.pyopenssl as m; m.inject_into_urllib3()"

if callable(getattr(self.sock.connection, "get_peer_cert_chain", None)):
method = "self.sock.connection.get_peer_cert_chain()"
# logger.debug(f"Fetching {info} using method {method}")
try:
self.captured_chain: List[
OpenSSL.crypto.X509
] = self.sock.connection.get_peer_cert_chain()
except Exception as exc:
self.captured_chain_errors.append({"how": how, "method": method, "exc": exc})
logger.exception(f"Failure fetching {info} using method {method}")
else:
# logger.debug(f"Fetched {info} using method {method}: {self.captured_chain}")
return

if not self.captured_chain:
logger.debug(f"Unable to fetch {info}")
how = f"python 3.10.1+ {info}"
method = f"get_verified_chain={get_verified_chain}"
try:
chain = [
OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, x.public_bytes())
for x in get_verified_chain()
]
except Exception as exc:
errors.append({"how": how, "method": method, "exc": exc})

def connect(self):
"""Pass."""
# force disable cert verification
# self.cert_reqs = "CERT_NONE"
if not chain and callable(get_unverified_chain):
# only available on python 3.10.1+
how = f"python 3.10.1+ {info}"
method = f"get_unverified_chain={get_unverified_chain}"
try:
chain = [
OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, x.public_bytes())
for x in get_unverified_chain()
]
except Exception as exc:
errors.append({"how": how, "method": method, "exc": exc})

if not chain and callable(get_peer_cert_chain):
how = f"import urllib3.contrib.pyopenssl as m; m.inject_into_urllib3() {info}"
method = f"get_peer_cert_chain={get_peer_cert_chain}"
try:
chain = [x for x in self.sock.connection.get_peer_cert_chain()]
except Exception as exc:
errors.append({"how": how, "method": method, "exc": exc})

return chain, errors

def __init__(self, *args, **kwargs):
"""Establish our capture attributes."""
super().__init__(*args, **kwargs)
self.captured_cert: Optional[OpenSSL.crypto.X509] = None
self.captured_chain: List[OpenSSL.crypto.X509] = []
self.captured_cert_errors: List[dict] = []

self.captured_chain: Optional[List[OpenSSL.crypto.X509]] = None
self.captured_chain_errors: List[dict] = []

def connect(self):
"""Connect as usual, but then capture the SSL certificate & chain."""
# how to force disable cert verification
# self.cert_reqs = "CERT_NONE"
super().connect()
self.set_captured_cert()
self.set_captured_chain()
self.captured_cert, self.captured_cert_errors = self.capture_cert()
self.captured_chain, self.captured_chain_errors = self.capture_chain()


class CaptureHTTPSConnectionPool(connectionpool.HTTPSConnectionPool):
"""Pass."""
"""Pool manager used when capturing certificates."""

ConnectionCls = CaptureHTTPSConnection
ResponseCls = CaptureHTTPSResponse


class Patches:
"""Pass."""
"""State tracker to know what has been patched."""

https_pool = poolmanager.pool_classes_by_scheme["https"]
pyopenssl_injected: bool = False


def inject_into_urllib3(with_pyopenssl: bool = INJECT_WITH_PYOPENSSL):
"""Pass."""
def import_pyopenssl() -> t.Optional[types.ModuleType]:
"""Import pyopenssl if it is available."""
try:
import urllib3.contrib.pyopenssl

return urllib3.contrib.pyopenssl
except ImportError:
return None


def import_pyopenssl_and_call(
func: str, reasons: t.Optional[t.List[str]] = None
) -> t.Tuple[bool, t.List[str]]:
"""Import pyopenssl and call the given function."""
performed: bool = False
reasons: t.List[str] = reasons if isinstance(reasons, list) else []
pyopenssl = import_pyopenssl()
if pyopenssl:
_func = getattr(pyopenssl, func, None)
if callable(_func):
try:
ret = _func()
performed = True
reasons.append(f"pyopenssl.{func}() returned: {ret!r}")
except Exception as exc:
reasons.append(f"pyopenssl.{func}() failed: {exc}")
else:
reasons.append(f"pyopenssl.{func}() not callable")
else:
reasons.append("pyopenssl not available")
return performed, reasons


def inject_into_urllib3(with_pyopenssl: bool = INJECT_WITH_PYOPENSSL) -> t.Tuple[bool, t.List[str]]:
"""Apply the cert capturing patch to urllib."""
performed: bool = False
reasons: t.List[str] = []
if with_pyopenssl:
if Patches.pyopenssl_injected:
LOG.debug("pyopenssl already patched into urllib3")
reasons.append("pyopenssl already patched into urllib3")
elif not INJECT_WITH_PYOPENSSL:
LOG.debug("pyopenssl on 3.10.1+ not needed")
reasons.append("pyopenssl on 3.10.1+ not needed")
else:
try:
import urllib3.contrib.pyopenssl

urllib3.contrib.pyopenssl.inject_into_urllib3()
except ImportError:
pass
Patches.pyopenssl_injected = True
LOG.debug("pyopenssl patched into urllib3")
performed, reasons = import_pyopenssl_and_call(
func="inject_into_urllib3", reasons=reasons
)

if poolmanager.pool_classes_by_scheme["https"] == CaptureHTTPSConnectionPool:
LOG.debug(f"HTTPS pool class is already patched with {CaptureHTTPSConnectionPool}")
reasons.append(f"HTTPS pool class is already patched with {CaptureHTTPSConnectionPool}")
else:
Patches.https_pool = poolmanager.pool_classes_by_scheme["https"]
performed = True
poolmanager.pool_classes_by_scheme["https"] = CaptureHTTPSConnectionPool
LOG.debug(f"HTTPS pool class patched with {CaptureHTTPSConnectionPool}")
reasons.append(f"HTTPS pool class patched with {CaptureHTTPSConnectionPool}")
return performed, reasons


def extract_from_urllib3(with_pyopenssl: bool = INJECT_WITH_PYOPENSSL):
"""Pass."""
def extract_from_urllib3(
with_pyopenssl: bool = INJECT_WITH_PYOPENSSL,
) -> t.Tuple[bool, t.List[str]]:
"""Remove the cert capturing patch from urllib."""
performed: bool = False
reasons: t.List[str] = []

if with_pyopenssl:
if Patches.pyopenssl_injected:
try:
import urllib3.contrib.pyopenssl

urllib3.contrib.pyopenssl.extract_from_urllib3()
except ImportError:
pass

Patches.pyopenssl_injected = False
LOG.debug("pyopenssl unpatched from urllib3")
performed, reasons = import_pyopenssl_and_call(
func="extract_from_urllib3", reasons=reasons
)
elif not INJECT_WITH_PYOPENSSL:
LOG.debug("pyopenssl on 3.10.1+ not needed")
reasons.append("pyopenssl on 3.10.1+ not needed")
else:
LOG.debug("pyopenssl is not patched into urllib3")
reasons.append("pyopenssl is not patched into urllib3")

if poolmanager.pool_classes_by_scheme["https"] == CaptureHTTPSConnectionPool:
poolmanager.pool_classes_by_scheme["https"] = Patches.https_pool
LOG.debug(f"HTTPS pool class unpatched to {Patches.https_pool}")
reasons.append(f"HTTPS pool class unpatched to {Patches.https_pool}")
performed = True
else:
LOG.debug(f"HTTPS pool class is not patched with {CaptureHTTPSConnectionPool}")
reasons.append(f"HTTPS pool class is not patched with {CaptureHTTPSConnectionPool}")
return performed, reasons
4 changes: 2 additions & 2 deletions axonius_api_client/cert_human/stores/cert.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def from_requests_cert(cls, url: str, **kwargs) -> "Cert":
"""Pass."""
url_parsed = UrlParser(url=url, default_scheme="https")
url = url_parsed.url
inject_into_urllib3()
cls.inject_results = inject_into_urllib3()
source = {"url": url, "method": f"{cls.__module__}.{cls.__name__}.from_requests_cert"}
kwargs.setdefault("verify", False)
response: requests.Response = requests.get(url, **kwargs)
Expand All @@ -279,7 +279,7 @@ def from_requests_chain(cls, url: str, **kwargs) -> List["Cert"]:
"""Pass."""
url_parsed = UrlParser(url=url, default_scheme="https")
url = url_parsed.url
inject_into_urllib3()
cls.inject_results = inject_into_urllib3()
source = {"url": url, "method": f"{cls.__module__}.{cls.__name__}.from_requests_chain"}
kwargs.setdefault("verify", False)
response: requests.Response = requests.get(url, **kwargs)
Expand Down

0 comments on commit c94a567

Please sign in to comment.