Skip to content

Commit

Permalink
Add WSManFaultError
Browse files Browse the repository at this point in the history
Adds the new exception WSManFaultError which inherits from WinRMError
and will be raised when receiving a WSManFault from the server. This new
exception type contains detailed information that could be relevant to
the caller when trying to handle the specific exception.
  • Loading branch information
jborean93 committed Jun 6, 2024
1 parent fbb05e8 commit 0e6fbe5
Show file tree
Hide file tree
Showing 4 changed files with 368 additions and 28 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
### Version 0.5.0
- Dropped Python 2.7, 3.6, and 3.7 support, minimum supported version is 3.8
- Migrate to PEP 517 compliant build with a `pyproject.toml` file
- Added type annotation
- Added `WSManFaultError` which contains WSManFault specific information when receiving a 500 WSMan fault response
- This contains pre-parsed values like the code, subcode, wsman fault code, wmi error code, and raw response
- It can be used by the caller to implement fallback behaviour based on specific error codes

### Version 0.4.3
- Fix invalid regex escape sequences.
Expand Down
58 changes: 58 additions & 0 deletions winrm/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,64 @@ class WinRMError(Exception):
code = 500


class WSManFaultError(WinRMError):
"""WSMan Fault Error.
Exception that is raised when receiving a WSMan fault message. It
contains the raw response as well as the fault details parsed from the
response.
The wsman_fault_code is returned by the Microsoft WSMan server rather than
the WSMan protocol error code strings. The wmierror_code can contain more
fatal service error codes returned as a MSFT_WmiError object, for example
quota violations.
@param int code: The HTTP status code of the response.
@param str message: The error message.
@param str response: The raw WSMan response text.
@param str reason: The WSMan fault reason.
@param string fault_code: The WSMan fault code.
@param string fault_subcode: The WSMan fault subcode.
@param int wsman_fault_code: The MS WSManFault specific code.
@param int wmierror_code: The MS WMI error code.
"""

def __init__(
self,
code: int,
message: str,
response: str,
reason: str,
fault_code: str | None = None,
fault_subcode: str | None = None,
wsman_fault_code: int | None = None,
wmierror_code: int | None = None,
) -> None:
self.code = code
self.response = response
self.fault_code = fault_code
self.fault_subcode = fault_subcode
self.reason = reason
self.wsman_fault_code = wsman_fault_code
self.wmierror_code = wmierror_code

# Using the dict repr is for backwards compatibility.
fault_data = {
"transport_message": message,
"http_status_code": code,
}
if wsman_fault_code is not None:
fault_data["wsmanfault_code"] = wsman_fault_code

if fault_code is not None:
fault_data["fault_code"] = fault_code

if fault_subcode is not None:
fault_data["fault_subcode"] = fault_subcode

super().__init__("{0} (extended fault data: {1})".format(reason, fault_data))


class WinRMTransportError(Exception):
"""WinRM errors specific to transport-level problems (unexpected HTTP error codes, etc)"""

Expand Down
78 changes: 50 additions & 28 deletions winrm/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,19 @@

import xmltodict

from winrm.exceptions import WinRMError, WinRMOperationTimeoutError, WinRMTransportError
from winrm.exceptions import (
WinRMError,
WinRMOperationTimeoutError,
WinRMTransportError,
WSManFaultError,
)
from winrm.transport import Transport

xmlns = {
"soapenv": "http://www.w3.org/2003/05/soap-envelope",
"soapaddr": "http://schemas.xmlsoap.org/ws/2004/08/addressing",
"wsmanfault": "http://schemas.microsoft.com/wbem/wsman/1/wsmanfault",
"wmierror": "http://schemas.dmtf.org/wbem/wscim/1/cim-schema/2/MSFT_WmiError",
}


Expand Down Expand Up @@ -247,33 +253,49 @@ def send_message(self, message: str) -> bytes:
raise ex

fault = root.find("soapenv:Body/soapenv:Fault", xmlns)
if fault is not None:
fault_data = dict(transport_message=ex.message, http_status_code=ex.code)
wsmanfault_code = fault.find("soapenv:Detail/wsmanfault:WSManFault[@Code]", xmlns)
if wsmanfault_code is not None:
fault_data["wsmanfault_code"] = wsmanfault_code.get("Code")
# convert receive timeout code to WinRMOperationTimeoutError
if fault_data["wsmanfault_code"] == "2150858793":
# TODO: this fault code is specific to the Receive operation; convert all op timeouts?
raise WinRMOperationTimeoutError()

fault_code = fault.find("soapenv:Code/soapenv:Value", xmlns)
if fault_code is not None:
fault_data["fault_code"] = fault_code.text

fault_subcode = fault.find("soapenv:Code/soapenv:Subcode/soapenv:Value", xmlns)
if fault_subcode is not None:
fault_data["fault_subcode"] = fault_subcode.text

error_message_node = fault.find("soapenv:Reason/soapenv:Text", xmlns)
if error_message_node is not None:
error_message = error_message_node.text
else:
error_message = "(no error message in fault)"

raise WinRMError("{0} (extended fault data: {1})".format(error_message, fault_data))

raise
if fault is None:
raise

wsmanfault_code_raw = fault.find("soapenv:Detail/wsmanfault:WSManFault[@Code]", xmlns)
wsmanfault_code: int | None = None
if wsmanfault_code_raw is not None:
wsmanfault_code = int(wsmanfault_code_raw.attrib["Code"])

# convert receive timeout code to WinRMOperationTimeoutError
if wsmanfault_code == 2150858793:
# TODO: this fault code is specific to the Receive operation; convert all op timeouts?
raise WinRMOperationTimeoutError()

fault_code_raw = fault.find("soapenv:Code/soapenv:Value", xmlns)
fault_code: str | None = None
if fault_code_raw is not None and fault_code_raw.text:
fault_code = fault_code_raw.text

fault_subcode_raw = fault.find("soapenv:Code/soapenv:Subcode/soapenv:Value", xmlns)
fault_subcode: str | None = None
if fault_subcode_raw is not None and fault_subcode_raw.text:
fault_subcode = fault_subcode_raw.text

error_message_node = fault.find("soapenv:Reason/soapenv:Text", xmlns)
reason: str | None = None
if error_message_node is not None:
reason = error_message_node.text

wmi_error_code_raw = fault.find("soapenv:Detail/wmierror:MSFT_WmiError/wmierror:error_Code", xmlns)
wmi_error_code: int | None = None
if wmi_error_code_raw is not None and wmi_error_code_raw.text:
wmi_error_code = int(wmi_error_code_raw.text)

raise WSManFaultError(
code=ex.code,
message=ex.message,
response=ex.response_text,
reason=reason or "(no error message in fault)",
fault_code=fault_code,
fault_subcode=fault_subcode,
wsman_fault_code=wsmanfault_code,
wmierror_code=wmi_error_code,
)

def close_shell(self, shell_id: str, close_session: bool = True) -> None:
"""
Expand Down
Loading

0 comments on commit 0e6fbe5

Please sign in to comment.