From dd0ebacdb154ba0b112713490dd30861264b5683 Mon Sep 17 00:00:00 2001 From: Jim Olsen Date: Fri, 7 Apr 2023 11:48:04 -0400 Subject: [PATCH 1/4] qa checkpoint --- axonius_api_client/cert_human/ssl_capture.py | 292 ++++++++++--------- axonius_api_client/cert_human/stores/cert.py | 4 +- axonius_api_client/constants/ctypes.py | 8 +- axonius_api_client/http.py | 50 ++-- axonius_api_client/tools.py | 4 +- 5 files changed, 191 insertions(+), 167 deletions(-) diff --git a/axonius_api_client/cert_human/ssl_capture.py b/axonius_api_client/cert_human/ssl_capture.py index bb3fc613..7fd19435 100644 --- a/axonius_api_client/cert_human/ssl_capture.py +++ b/axonius_api_client/cert_human/ssl_capture.py @@ -1,16 +1,14 @@ # -*- coding: utf-8 -*- """Base example for setting up the API client.""" -import logging +import types +import typing as t import sys from typing import List, Optional import OpenSSL -# import urllib3 -# import urllib3.connectionpool from urllib3 import connectionpool, poolmanager -LOG: logging.Logger = logging.getLogger(__name__) INJECT_WITH_PYOPENSSL: bool = True @@ -20,190 +18,206 @@ class CaptureHTTPSResponse(connectionpool.HTTPSConnectionPool.ResponseCls): - """Pass.""" + """Bring the captured_ attributes from the connection object down to the response object.""" def __init__(self, *args, **kwargs): - """Pass.""" + """Bring the captured_ attributes from the connection object down to the response object.""" super().__init__(*args, **kwargs) connection = getattr(self, "_connection", None) - for attr in dir(connection): - if attr.startswith("captured_"): - setattr(self, attr, getattr(connection, attr)) + self.captured_cert: Optional[OpenSSL.crypto.X509] = getattr( + connection, "captured_cert", None + ) + self.captured_chain: List[OpenSSL.crypto.X509] = getattr(connection, "captured_chain", []) + self.captured_cert_errors: List[dict] = getattr(connection, "captured_cert_errors", []) + self.captured_chain_errors: List[dict] = getattr(connection, "captured_chain_errors", []) class CaptureHTTPSConnection(connectionpool.HTTPSConnectionPool.ConnectionCls): - """Pass.""" - - def set_captured_cert(self): - """Pass.""" - logger = LOG.getChild(f"{self.__class__.__name__}") - info = f"certificate from {self.sock}" - # logger.debug(f"Fetching {info}") + """Capture SSL certificates and save them as attributes on a connection object.""" + def capture_cert(self) -> t.Tuple[t.Optional[OpenSSL.crypto.X509], t.List[dict]]: + """Capture the SSL certificate from the socket while it is open.""" # works with pyopenssl and ssl, python 3.9+ tested - how = "n/a" - if callable(getattr(self.sock, "getpeercert", None)): - method = "self.sock.getpeercert(True)" - # logger.debug(f"Fetching {info} using method {method}") + sock = getattr(self, "sock", None) + getpeercert = getattr(sock, "getpeercert", None) + cert: t.Optional[OpenSSL.crypto.X509] = None + errors: t.List[dict] = [] + + if callable(getpeercert): + how = f"certificate from {sock}" + method = f"getpeercert={getpeercert}" try: - cert_bytes: bytes = self.sock.getpeercert(True) + cert_bytes: bytes = getpeercert(True) + cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert_bytes) except Exception as exc: - self.captured_cert_errors.append({"how": how, "method": method, "exc": exc}) - logger.debug(f"Failure fetching {info} using method {method}", exc_info=True) - else: - self.captured_cert: OpenSSL.crypto.X509 = OpenSSL.crypto.load_certificate( - OpenSSL.crypto.FILETYPE_ASN1, cert_bytes - ) - # logger.debug(f"Fetched {info} using method {method}: {self.captured_cert}") - return - - if not self.captured_cert: - logger.debug(f"Unable to fetch {info}") - - def set_captured_chain(self): - """Pass.""" - logger = LOG.getChild(f"{self.__class__.__name__}") - info = f"certificate chain from {self.sock}" - # logger.debug(f"Fetching {info}") - - if hasattr(self.sock, "_sslobj"): + errors.append({"how": how, "method": method, "exc": exc}) + return cert, errors + + def capture_chain(self) -> t.Tuple[t.List[OpenSSL.crypto.X509], t.List[dict]]: + """Capture the SSL certificate chain from the socket while it is open.""" + sock = getattr(self, "sock", None) + sslobj: t.Optional[t.Any] = getattr(sock, "_sslobj", None) + connection: t.Optional[t.Any] = getattr(sock, "connection", None) + + get_verified_chain: t.Optional[callable] = getattr(sslobj, "get_verified_chain", None) + get_unverified_chain: t.Optional[callable] = getattr(sslobj, "get_unverified_chain", None) + get_peer_cert_chain: t.Optional[callable] = getattr(connection, "get_peer_cert_chain", None) + + info = [ + f"socket={sock}", + f"sslobj={sslobj}", + f"connection={connection}", + f"get_verified_chain={get_verified_chain}", + f"get_unverified_chain={get_unverified_chain}", + f"get_peer_cert_chain={get_peer_cert_chain}", + ] + info = ", ".join(info) + errors: t.List[dict] = [] + chain: t.List[OpenSSL.crypto.X509] = [] + + if callable(get_verified_chain): # only available on python 3.10.1+ - how = "python 3.10.1+" - - if callable(getattr(self.sock._sslobj, "get_verified_chain", None)): - method = "self.sock._sslobj.get_verified_chain()" - # logger.debug(f"Fetching {info} using method {method}") - try: - chain_ssl = self.sock._sslobj.get_verified_chain() - # List[_ssl.Certificate] - except Exception as exc: - self.captured_chain_errors.append({"how": how, "method": method, "exc": exc}) - logger.debug(f"Failure fetching {info} using method {method}", exc_info=True) - else: - self.captured_chain: List[OpenSSL.crypto.X509] = [ - OpenSSL.crypto.load_certificate( - OpenSSL.crypto.FILETYPE_PEM, x.public_bytes() - ) - for x in chain_ssl - ] - # logger.debug(f"Fetched {info} using method {method}: {self.captured_chain}") - return - - if callable(getattr(self.sock._sslobj, "get_unverified_chain", None)): - method = "self.sock._sslobj.get_unverified_chain()" - # logger.debug(f"Fetching {info} using method {method}") - try: - chain_ssl = self.sock._sslobj.get_unverified_chain() - # List[_ssl.Certificate] - except Exception as exc: - self.captured_chain_errors.append({"how": how, "method": method, "exc": exc}) - logger.debug(f"Failure fetching {info} using method {method}", exc_info=True) - else: - self.captured_chain: List[OpenSSL.crypto.X509] = [ - OpenSSL.crypto.load_certificate( - OpenSSL.crypto.FILETYPE_PEM, x.public_bytes() - ) - for x in chain_ssl - ] - # logger.debug(f"Fetched {info} using method {method}: {self.captured_chain}") - return - - if hasattr(self.sock, "connection"): - # only available if pyopenssl has been injected into urllib3 via: - how = "import urllib3.contrib.pyopenssl as m; m.inject_into_urllib3()" - - if callable(getattr(self.sock.connection, "get_peer_cert_chain", None)): - method = "self.sock.connection.get_peer_cert_chain()" - # logger.debug(f"Fetching {info} using method {method}") - try: - self.captured_chain: List[ - OpenSSL.crypto.X509 - ] = self.sock.connection.get_peer_cert_chain() - except Exception as exc: - self.captured_chain_errors.append({"how": how, "method": method, "exc": exc}) - logger.exception(f"Failure fetching {info} using method {method}") - else: - # logger.debug(f"Fetched {info} using method {method}: {self.captured_chain}") - return - - if not self.captured_chain: - logger.debug(f"Unable to fetch {info}") + how = f"python 3.10.1+ {info}" + method = f"get_verified_chain={get_verified_chain}" + try: + chain = [ + OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, x.public_bytes()) + for x in get_verified_chain() + ] + except Exception as exc: + errors.append({"how": how, "method": method, "exc": exc}) - def connect(self): - """Pass.""" - # force disable cert verification - # self.cert_reqs = "CERT_NONE" + if not chain and callable(get_unverified_chain): + # only available on python 3.10.1+ + how = f"python 3.10.1+ {info}" + method = f"get_unverified_chain={get_unverified_chain}" + try: + chain = [ + OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, x.public_bytes()) + for x in get_unverified_chain() + ] + except Exception as exc: + errors.append({"how": how, "method": method, "exc": exc}) + if not chain and callable(get_peer_cert_chain): + how = f"import urllib3.contrib.pyopenssl as m; m.inject_into_urllib3() {info}" + method = f"get_peer_cert_chain={get_peer_cert_chain}" + try: + chain = [x for x in self.sock.connection.get_peer_cert_chain()] + except Exception as exc: + errors.append({"how": how, "method": method, "exc": exc}) + + return chain, errors + + def __init__(self, *args, **kwargs): + """Establish our capture attributes.""" + super().__init__(*args, **kwargs) self.captured_cert: Optional[OpenSSL.crypto.X509] = None + self.captured_chain: List[OpenSSL.crypto.X509] = [] self.captured_cert_errors: List[dict] = [] - - self.captured_chain: Optional[List[OpenSSL.crypto.X509]] = None self.captured_chain_errors: List[dict] = [] + def connect(self): + """Connect as usual, but then capture the SSL certificate & chain.""" + # how to force disable cert verification + # self.cert_reqs = "CERT_NONE" super().connect() - self.set_captured_cert() - self.set_captured_chain() + self.captured_cert, self.captured_cert_errors = self.capture_cert() + self.captured_chain, self.captured_chain_errors = self.capture_chain() class CaptureHTTPSConnectionPool(connectionpool.HTTPSConnectionPool): - """Pass.""" + """Pool manager used when capturing certificates.""" ConnectionCls = CaptureHTTPSConnection ResponseCls = CaptureHTTPSResponse class Patches: - """Pass.""" + """State tracker to know what has been patched.""" https_pool = poolmanager.pool_classes_by_scheme["https"] pyopenssl_injected: bool = False -def inject_into_urllib3(with_pyopenssl: bool = INJECT_WITH_PYOPENSSL): - """Pass.""" +def import_pyopenssl() -> t.Optional[types.ModuleType]: + """Import pyopenssl if it is available.""" + try: + import urllib3.contrib.pyopenssl + + return urllib3.contrib.pyopenssl + except ImportError: + return None + + +def import_pyopenssl_and_call( + func: str, reasons: t.Optional[t.List[str]] = None +) -> t.Tuple[bool, t.List[str]]: + """Import pyopenssl and call the given function.""" + performed: bool = False + reasons: t.List[str] = reasons if isinstance(reasons, list) else [] + pyopenssl = import_pyopenssl() + if pyopenssl: + _func = getattr(pyopenssl, func, None) + if callable(_func): + try: + ret = _func() + performed = True + reasons.append(f"pyopenssl.{func}() returned: {ret!r}") + except Exception as exc: + reasons.append(f"pyopenssl.{func}() failed: {exc}") + else: + reasons.append(f"pyopenssl.{func}() not callable") + else: + reasons.append("pyopenssl not available") + return performed, reasons + + +def inject_into_urllib3(with_pyopenssl: bool = INJECT_WITH_PYOPENSSL) -> t.Tuple[bool, t.List[str]]: + """Apply the cert capturing patch to urllib.""" + performed: bool = False + reasons: t.List[str] = [] if with_pyopenssl: if Patches.pyopenssl_injected: - LOG.debug("pyopenssl already patched into urllib3") + reasons.append("pyopenssl already patched into urllib3") elif not INJECT_WITH_PYOPENSSL: - LOG.debug("pyopenssl on 3.10.1+ not needed") + reasons.append("pyopenssl on 3.10.1+ not needed") else: - try: - import urllib3.contrib.pyopenssl - - urllib3.contrib.pyopenssl.inject_into_urllib3() - except ImportError: - pass - Patches.pyopenssl_injected = True - LOG.debug("pyopenssl patched into urllib3") + performed, reasons = import_pyopenssl_and_call( + func="inject_into_urllib3", reasons=reasons + ) if poolmanager.pool_classes_by_scheme["https"] == CaptureHTTPSConnectionPool: - LOG.debug(f"HTTPS pool class is already patched with {CaptureHTTPSConnectionPool}") + reasons.append(f"HTTPS pool class is already patched with {CaptureHTTPSConnectionPool}") else: Patches.https_pool = poolmanager.pool_classes_by_scheme["https"] + performed = True poolmanager.pool_classes_by_scheme["https"] = CaptureHTTPSConnectionPool - LOG.debug(f"HTTPS pool class patched with {CaptureHTTPSConnectionPool}") + reasons.append(f"HTTPS pool class patched with {CaptureHTTPSConnectionPool}") + return performed, reasons -def extract_from_urllib3(with_pyopenssl: bool = INJECT_WITH_PYOPENSSL): - """Pass.""" +def extract_from_urllib3( + with_pyopenssl: bool = INJECT_WITH_PYOPENSSL, +) -> t.Tuple[bool, t.List[str]]: + """Remove the cert capturing patch from urllib.""" + performed: bool = False + reasons: t.List[str] = [] + if with_pyopenssl: if Patches.pyopenssl_injected: - try: - import urllib3.contrib.pyopenssl - - urllib3.contrib.pyopenssl.extract_from_urllib3() - except ImportError: - pass - - Patches.pyopenssl_injected = False - LOG.debug("pyopenssl unpatched from urllib3") + performed, reasons = import_pyopenssl_and_call( + func="extract_from_urllib3", reasons=reasons + ) elif not INJECT_WITH_PYOPENSSL: - LOG.debug("pyopenssl on 3.10.1+ not needed") + reasons.append("pyopenssl on 3.10.1+ not needed") else: - LOG.debug("pyopenssl is not patched into urllib3") + reasons.append("pyopenssl is not patched into urllib3") if poolmanager.pool_classes_by_scheme["https"] == CaptureHTTPSConnectionPool: poolmanager.pool_classes_by_scheme["https"] = Patches.https_pool - LOG.debug(f"HTTPS pool class unpatched to {Patches.https_pool}") + reasons.append(f"HTTPS pool class unpatched to {Patches.https_pool}") + performed = True else: - LOG.debug(f"HTTPS pool class is not patched with {CaptureHTTPSConnectionPool}") + reasons.append(f"HTTPS pool class is not patched with {CaptureHTTPSConnectionPool}") + return performed, reasons diff --git a/axonius_api_client/cert_human/stores/cert.py b/axonius_api_client/cert_human/stores/cert.py index beb621f7..1f6db9f2 100644 --- a/axonius_api_client/cert_human/stores/cert.py +++ b/axonius_api_client/cert_human/stores/cert.py @@ -265,7 +265,7 @@ def from_requests_cert(cls, url: str, **kwargs) -> "Cert": """Pass.""" url_parsed = UrlParser(url=url, default_scheme="https") url = url_parsed.url - inject_into_urllib3() + cls.inject_results = inject_into_urllib3() source = {"url": url, "method": f"{cls.__module__}.{cls.__name__}.from_requests_cert"} kwargs.setdefault("verify", False) response: requests.Response = requests.get(url, **kwargs) @@ -279,7 +279,7 @@ def from_requests_chain(cls, url: str, **kwargs) -> List["Cert"]: """Pass.""" url_parsed = UrlParser(url=url, default_scheme="https") url = url_parsed.url - inject_into_urllib3() + cls.inject_results = inject_into_urllib3() source = {"url": url, "method": f"{cls.__module__}.{cls.__name__}.from_requests_chain"} kwargs.setdefault("verify", False) response: requests.Response = requests.get(url, **kwargs) diff --git a/axonius_api_client/constants/ctypes.py b/axonius_api_client/constants/ctypes.py index 5edb36b5..fa4e2608 100644 --- a/axonius_api_client/constants/ctypes.py +++ b/axonius_api_client/constants/ctypes.py @@ -5,10 +5,10 @@ PathLike: t.TypeVar = t.TypeVar("PathLike", pathlib.Path, str, bytes) PatternLike: t.TypeVar = t.TypeVar("PatternLike", t.Pattern, str, bytes) -PatternLikeListy: t.TypeVar = t.TypeVar("PatternLikeListy", PatternLike, t.List[PatternLike]) -ComplexLike: t.Tuple[t.Type] = (dict, list, tuple) -SimpleLike: t.Tuple[t.Type] = (str, int, bool, float) -Refreshables = t.Optional[t.Union[str, bytes, int, float, bool]] +PatternLikeListy: t.Type = t.Union[PatternLike, t.List[PatternLike]] +ComplexLike: t.Tuple[t.Type, ...] = (dict, list, tuple) +SimpleLike: t.Tuple[t.Type, ...] = (str, int, bool, float) +Refreshables: t.Type = t.Optional[t.Union[str, bytes, int, float, bool]] class FolderBase: diff --git a/axonius_api_client/http.py b/axonius_api_client/http.py index 6ca2f8a9..309e4805 100644 --- a/axonius_api_client/http.py +++ b/axonius_api_client/http.py @@ -1,14 +1,20 @@ # -*- coding: utf-8 -*- """HTTP client.""" +import typing as t import logging import pathlib import warnings -from typing import Any, List, Optional, Pattern, TypeVar, Union +from typing import Any, List, Optional, Pattern, Union import requests +import requests.cookies +import requests.structures +import urllib3 +import urllib3.exceptions from . import cert_human from .constants.api import TIMEOUT_CONNECT, TIMEOUT_RESPONSE +from .constants.ctypes import PatternLikeListy from .constants.logs import LOG_LEVEL_HTTP, MAX_BODY_LEN, REQUEST_ATTR_MAP, RESPONSE_ATTR_MAP from .exceptions import HttpError from .logs import get_obj_log, set_log_level @@ -17,12 +23,11 @@ from .tools import coerce_str, join_url, json_log, listify, path_read, tilde_re from .version import __version__ -cert_human.ssl_capture.inject_into_urllib3() -T_Cookies: TypeVar = Union[dict, requests.cookies.RequestsCookieJar] -T_Headers: TypeVar = Union[dict, requests.structures.CaseInsensitiveDict] -T_StrPattern: TypeVar = Union[str, Pattern] +INJECT_RESULTS: t.Tuple[bool, t.List[str]] = cert_human.ssl_capture.inject_into_urllib3() +T_Cookies: t.Type = t.Union[dict, requests.cookies.RequestsCookieJar] +T_Headers: t.Type = t.Union[dict, requests.structures.CaseInsensitiveDict] -HIDE_HEADERS: str = [ +HIDE_HEADERS: t.List[str] = [ "~cookie", "~auth", "~token", @@ -35,7 +40,7 @@ class Http: - """HTTP client that wraps around around :obj:`requests.Session`.""" + """HTTP client that wraps around :obj:`requests.Session`.""" HIDE_STR = "*********" @@ -83,8 +88,8 @@ def __init__( self.url: str = self.URLPARSED.url """URL to connect to""" - self.LOG_HIDE_HEADERS: List[T_StrPattern] = tilde_re( - kwargs.get("log_hide_headers", HIDE_HEADERS) + self.LOG_HIDE_HEADERS: PatternLikeListy = tilde_re( + listify(kwargs.get("log_hide_headers", HIDE_HEADERS)) ) """Headers to hide when logging.""" @@ -160,7 +165,7 @@ def __init__( ) self.log_request_attrs: Optional[List[str]] = self.LOG_REQUEST_ATTRS self.log_response_attrs: Optional[List[str]] = self.LOG_RESPONSE_ATTRS - + self.session: requests.Session = requests.Session() self.set_urllib_warnings() self.set_urllib_log() self.new_session() @@ -215,9 +220,7 @@ def set_session_cookies(self): def set_session_proxies(self): """Pass.""" - self.session.proxies = {} - self.session.proxies["https"] = self.HTTPS_PROXY - self.session.proxies["http"] = self.HTTP_PROXY + self.session.proxies = {"https": self.HTTPS_PROXY, "http": self.HTTP_PROXY} def set_session_verify(self): """Pass.""" @@ -259,9 +262,9 @@ def set_session_cert(self): def set_urllib_warnings(self): """Pass.""" if self.CERT_WARN is True: - warnings.simplefilter("once", requests.urllib3.exceptions.InsecureRequestWarning) + warnings.simplefilter("once", urllib3.exceptions.InsecureRequestWarning) elif self.CERT_WARN is False: - warnings.simplefilter("ignore", requests.urllib3.exceptions.InsecureRequestWarning) + warnings.simplefilter("ignore", urllib3.exceptions.InsecureRequestWarning) def set_urllib_log(self): """Pass.""" @@ -295,7 +298,7 @@ def __call__( **kwargs: overrides for object attributes * connect_timeout: seconds to wait for connection to open for this request - * response_timeout: seconds to wait for for response for this request + * response_timeout: seconds to wait for response for this request * proxies: proxies for this request * verify: verification of cert for this request * cert: client cert to offer for this request @@ -305,6 +308,7 @@ def __call__( """ def log_if_headers(msg: str): + """Pass.""" if "headers" in self.log_request_attrs: self.LOG.debug(msg) @@ -389,12 +393,14 @@ def _do_log_request(self, request): request (:obj:`requests.PreparedRequest`): prepared request to log attrs/body of """ if self.log_request_attrs: + cookies = getattr(request, "_cookies", {}) + headers = getattr(request, "headers", {}) lattrs = ", ".join(self.log_request_attrs).format( url=request.url, body_size=len(request.body or ""), method=request.method, - headers=self._clean_headers(headers=request.headers), - cookies=self._clean_headers(headers=request._cookies), + headers=self._clean_headers(headers=headers), + cookies=self._clean_headers(headers=cookies), ) self.LOG.debug(f"REQUEST ATTRS: {lattrs}") @@ -409,6 +415,7 @@ def _clean_headers(self, headers: dict) -> dict: """ def getval(key, value): + """Pass.""" skey = str(key).lower() for check in self.LOG_HIDE_HEADERS: if (isinstance(check, str) and check.lower() == skey) or ( @@ -417,6 +424,7 @@ def getval(key, value): return self.HIDE_STR return value + # noinspection PyBroadException try: return {k: getval(k, v) for k, v in headers.items()} except Exception: @@ -509,15 +517,17 @@ def _set_log_attrs(self, attr_map: dict, attr_type: str, value: Union[str, List[ if entry not in log_attrs: log_attrs.append(entry) - def log_body(self, body: Any, body_type: str, src=None) -> str: + def log_body(self, body: Any, body_type: str, src: t.Optional[t.Any] = None) -> str: """Get a string for logging a request or response body. Args: body: content to log body_type: 'request' or 'response' + src: source of the body + """ body = json_log(obj=coerce_str(value=body), trim=self.LOG_BODY_MAX_LEN) - return f"{body_type} BODY:\n{body}" + return f"{body_type} BODY from {src}:\n{body}" def _init(self): """Pass.""" diff --git a/axonius_api_client/tools.py b/axonius_api_client/tools.py index ee7bcd82..bccfcc14 100644 --- a/axonius_api_client/tools.py +++ b/axonius_api_client/tools.py @@ -24,7 +24,7 @@ from . import INIT_DOTENV, PACKAGE_FILE, PACKAGE_ROOT, VERSION from .constants.api import GUI_PAGE_SIZES, FolderDefaults -from .constants.ctypes import PathLike, PatternLike +from .constants.ctypes import PathLike, PatternLike, PatternLikeListy from .constants.general import ( DAYS_MAP, DEBUG_ARGS, @@ -2045,7 +2045,7 @@ def extract_kvs_csv( return ret -def tilde_re(value: t.Any) -> t.Optional[t.Union[str, t.Pattern]]: +def tilde_re(value: t.Any) -> PatternLikeListy: """Pass.""" if isinstance(value, (list, tuple)): return [tilde_re(x) for x in value] From a0bef4efc576d225e134494d50a34fe304978e9a Mon Sep 17 00:00:00 2001 From: Jim Olsen Date: Fri, 7 Apr 2023 11:48:34 -0400 Subject: [PATCH 2/4] version bump --- axonius_api_client/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axonius_api_client/version.py b/axonius_api_client/version.py index 724f0ee1..5e7779e3 100644 --- a/axonius_api_client/version.py +++ b/axonius_api_client/version.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """Version information for this package.""" -__version__ = "4.60.3" +__version__ = "4.60.4" VERSION: str = __version__ """Version of package.""" From 0a6c9d780937e03911c869f3c8fd866d33dc7f74 Mon Sep 17 00:00:00 2001 From: Jim Olsen Date: Fri, 7 Apr 2023 20:19:19 -0400 Subject: [PATCH 3/4] windows qa fix --- axonius_api_client/connect.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/axonius_api_client/connect.py b/axonius_api_client/connect.py index 509ce94c..efacadfd 100644 --- a/axonius_api_client/connect.py +++ b/axonius_api_client/connect.py @@ -274,8 +274,11 @@ def __init__( if log_file_rotate: LOG.info("Forcing file logs to rotate") self.HANDLER_FILE.flush() - self.HANDLER_FILE.doRollover() - LOG.info("Forced file logs to rotate") + try: + self.HANDLER_FILE.doRollover() + LOG.info("Forced file logs to rotate") + except Exception as exc: + LOG.exception("Failed to force file logs to rotate: %s", exc) self.HTTP_ARGS: dict = { "url": url, From bf3182f48113b9a56b2f58471735d693f4b0925a Mon Sep 17 00:00:00 2001 From: Jim Olsen Date: Fri, 7 Apr 2023 21:13:51 -0400 Subject: [PATCH 4/4] more windows qa fixes logging rollover errors wrapped properly now --- axonius_api_client/connect.py | 52 +++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/axonius_api_client/connect.py b/axonius_api_client/connect.py index efacadfd..3e389c5d 100644 --- a/axonius_api_client/connect.py +++ b/axonius_api_client/connect.py @@ -1,6 +1,9 @@ # -*- coding: utf-8 -*- """Easy all-in-one connection handler.""" +import typing as t +import types import logging +import logging.handlers import pathlib import re from typing import List, Optional, Union @@ -53,6 +56,7 @@ from .setup_env import get_env_ax from .tools import coerce_bool, coerce_int, json_dump, json_reload, sysinfo from .version import __version__ as VERSION +from . import tools class Connect: @@ -103,6 +107,9 @@ class Connect: """ + TOOLS: types.ModuleType = tools + """Tools module.""" + def __init__( self, url: str, @@ -249,10 +256,10 @@ def __init__( self.STARTED: bool = False """track if :meth:`start` has been called""" - self.HANDLER_FILE: logging.handlers.RotatingFileHandler = None + self.HANDLER_FILE: t.Optional[logging.handlers.RotatingFileHandler] = None """file logging handler""" - self.HANDLER_CON: logging.StreamHandler = None + self.HANDLER_CON: t.Optional[logging.StreamHandler] = None """console logging handler""" HideFormatter.HIDE_ENABLED = self.LOG_HIDE_SECRETS @@ -271,14 +278,8 @@ def __init__( max_files=self.LOG_FILE_MAX_FILES, fmt=self.LOG_FILE_FMT, ) - if log_file_rotate: - LOG.info("Forcing file logs to rotate") - self.HANDLER_FILE.flush() - try: - self.HANDLER_FILE.doRollover() - LOG.info("Forced file logs to rotate") - except Exception as exc: - LOG.exception("Failed to force file logs to rotate: %s", exc) + if log_file_rotate: + self.do_rollover() self.HTTP_ARGS: dict = { "url": url, @@ -325,6 +326,17 @@ def __init__( self._init() + def do_rollover(self): + """Rollover log file.""" + if self.HANDLER_FILE: + LOG.info("Forcing file logs to rotate") + self.HANDLER_FILE.flush() + try: + self.HANDLER_FILE.doRollover() + LOG.info("Forced file logs to rotate") + except Exception as exc: + LOG.exception("Failed to force file logs to rotate: %s", exc) + def start(self): """Connect to and authenticate with Axonius.""" if not self.STARTED: @@ -342,16 +354,16 @@ def start(self): if isinstance(exc, requests.ConnectTimeout): timeout = self.HTTP.CONNECT_TIMEOUT msg = f"{pre}: connection timed out after {timeout} seconds" - cnxexc = ConnectError(msg) + connect_exc = ConnectError(msg) elif isinstance(exc, requests.ConnectionError): reason = self._get_exc_reason(exc=exc) - cnxexc = ConnectError(f"{pre}: {reason}") + connect_exc = ConnectError(f"{pre}: {reason}") elif isinstance(exc, InvalidCredentials): - cnxexc = ConnectError(f"{pre}: Invalid Credentials supplied") + connect_exc = ConnectError(f"{pre}: Invalid Credentials supplied") else: - cnxexc = ConnectError(f"{pre}: {exc}") - cnxexc.exc = exc - raise cnxexc + connect_exc = ConnectError(f"{pre}: {exc}") + connect_exc.exc = exc + raise connect_exc self.STARTED = True LOG.info(str(self)) @@ -372,7 +384,7 @@ def users(self) -> Users: return self._users @property - def vulnerabilities(self) -> Users: + def vulnerabilities(self) -> Vulnerabilities: """Work with user assets.""" self.start() if not hasattr(self, "_vulnerabilities"): @@ -572,9 +584,9 @@ def api_keys(self) -> dict: @classmethod def _get_exc_reason(cls, exc: Exception) -> str: - """Trim exceptions down to a more user friendly display. + """Trim exceptions down to a more user-friendly display. - Uses :attr:`REASON_RES` to do regex substituions. + Uses :attr:`REASON_RES` to do regex substitutions. """ reason = str(exc) for reason_re in cls.REASON_RES: @@ -587,7 +599,7 @@ def jdump(obj, **kwargs): # pragma: no cover """JSON dump utility.""" print(json_reload(obj, **kwargs)) - REASON_RES: List[str] = [ + REASON_RES: List[t.Pattern] = [ re.compile(r".*?object at.*?\>\: ([a-zA-Z0-9\]\[: ]+)"), re.compile(r".*?\] (.*) "), ]