Skip to content

Commit

Permalink
Merge pull request #262 from badcure/mcassaniti-proxy_v2
Browse files Browse the repository at this point in the history
mcassaniti's proxy support
  • Loading branch information
badcure committed Jun 28, 2019
2 parents e7a8502 + 3c7ae55 commit ff648dc
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 65 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ __pycache__
*~

/winrm/tests/config.json


.pytest_cache
venv
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# Changelog

### Version 0.3.1
- Ensure server_cert_validation=ignore supersedes ca_trust_path/env overrides
- Ensure `server_cert_validation=ignore` supersedes ca_trust_path/env overrides
- Set minimum version of requests-credssp to support Kerberos auth over CredSSP and other changes
- Added `proxy` support where it can be defined within the application, with the ability to specify the proxy within the application

### Version 0.3.0
- Added support for message encryption over HTTP when using NTLM/Kerberos/CredSSP
Expand Down
1 change: 1 addition & 0 deletions winrm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
FEATURE_SUPPORTED_AUTHTYPES = ['basic', 'certificate', 'ntlm', 'kerberos', 'plaintext', 'ssl', 'credssp']
FEATURE_READ_TIMEOUT = True
FEATURE_OPERATION_TIMEOUT = True
FEATURE_PROXY_SUPPORT = True


class Response(object):
Expand Down
8 changes: 6 additions & 2 deletions winrm/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ def __init__(
kerberos_hostname_override=None,
message_encryption='auto',
credssp_disable_tlsv1_2=False,
send_cbt=True):
send_cbt=True,
proxy='legacy_requests',
):
"""
@param string endpoint: the WinRM webservice endpoint
@param string transport: transport type, one of 'plaintext' (default), 'kerberos', 'ssl', 'ntlm', 'credssp' # NOQA
Expand All @@ -57,6 +59,7 @@ def __init__(
@param int operation_timeout_sec: maximum allowed time in seconds for any single wsman HTTP operation (default 20). Note that operation timeouts while receiving output (the only wsman operation that should take any significant time, and where these timeouts are expected) will be silently retried indefinitely. # NOQA
@param string kerberos_hostname_override: the hostname to use for the kerberos exchange (defaults to the hostname in the endpoint URL)
@param bool message_encryption_enabled: Will encrypt the WinRM messages if set to True and the transport auth supports message encryption (Default True).
@param string proxy: Specify a proxy for the WinRM connection to use. 'legacy_requests'(default) to use environment variables, None to disable proxies completely or the proxy URL itself.
"""

try:
Expand Down Expand Up @@ -88,7 +91,8 @@ def __init__(
auth_method=transport,
message_encryption=message_encryption,
credssp_disable_tlsv1_2=credssp_disable_tlsv1_2,
send_cbt=send_cbt
send_cbt=send_cbt,
proxy=proxy,
)

self.username = username
Expand Down
218 changes: 166 additions & 52 deletions winrm/tests/test_transport.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,176 @@
# coding=utf-8
import os
from winrm.transport import Transport


def test_build_session_cert_validate():
t_default = Transport(endpoint="Endpoint",
server_cert_validation='validate',
username='test',
password='test',
auth_method='basic',
)
t_ca_override = Transport(endpoint="Endpoint",
server_cert_validation='validate',
username='test',
password='test',
auth_method='basic',
ca_trust_path='overridepath',
)
try:
import unittest

from winrm import transport


class TestTransport(unittest.TestCase):
maxDiff = 2048
_old_env = None

def setUp(self):
super(TestTransport, self).setUp()
self._old_env = {}
os.environ.pop('REQUESTS_CA_BUNDLE', None)
os.environ.pop('TRAVIS_APT_PROXY', None)
os.environ.pop('CURL_CA_BUNDLE', None)
os.environ.pop('HTTPS_PROXY', None)
os.environ.pop('HTTP_PROXY', None)
os.environ.pop('NO_PROXY', None)
transport.DISPLAYED_PROXY_WARNING = False

def tearDown(self):
super(TestTransport, self).tearDown()
os.environ.pop('REQUESTS_CA_BUNDLE', None)
os.environ.pop('TRAVIS_APT_PROXY', None)
os.environ.pop('CURL_CA_BUNDLE', None)
os.environ.pop('HTTPS_PROXY', None)
os.environ.pop('HTTP_PROXY', None)
os.environ.pop('NO_PROXY', None)

def test_build_session_cert_validate_1(self):
os.environ['REQUESTS_CA_BUNDLE'] = 'path_to_REQUESTS_CA_CERT'

t_default = transport.Transport(endpoint="https://example.com",
server_cert_validation='validate',
username='test',
password='test',
auth_method='basic',
)
t_default.build_session()
self.assertEqual('path_to_REQUESTS_CA_CERT', t_default.session.verify)

def test_build_session_cert_validate_2(self):
os.environ['CURL_CA_BUNDLE'] = 'path_to_CURL_CA_CERT'

t_default = transport.Transport(endpoint="https://example.com",
server_cert_validation='validate',
username='test',
password='test',
auth_method='basic',
)
t_default.build_session()
self.assertEqual('path_to_CURL_CA_CERT', t_default.session.verify)

def test_build_session_cert_override_1(self):
os.environ['REQUESTS_CA_BUNDLE'] = 'path_to_REQUESTS_CA_CERT'

t_default = transport.Transport(endpoint="https://example.com",
server_cert_validation='validate',
username='test',
password='test',
auth_method='basic',
ca_trust_path='overridepath',
)
t_default.build_session()
t_ca_override.build_session()
assert(t_default.session.verify == 'path_to_REQUESTS_CA_CERT')
assert(t_ca_override.session.verify == 'overridepath')
finally:
del os.environ['REQUESTS_CA_BUNDLE']
self.assertEqual('overridepath', t_default.session.verify)

try:
def test_build_session_cert_override_2(self):
os.environ['CURL_CA_BUNDLE'] = 'path_to_CURL_CA_CERT'

t_default = transport.Transport(endpoint="https://example.com",
server_cert_validation='validate',
username='test',
password='test',
auth_method='basic',
ca_trust_path='overridepath',
)
t_default.build_session()
self.assertEqual('overridepath', t_default.session.verify)

def test_build_session_cert_ignore_1(self):
os.environ['REQUESTS_CA_BUNDLE'] = 'path_to_REQUESTS_CA_CERT'
os.environ['CURL_CA_BUNDLE'] = 'path_to_CURL_CA_CERT'

t_default = transport.Transport(endpoint="https://example.com",
server_cert_validation='ignore',
username='test',
password='test',
auth_method='basic',
)

t_default.build_session()
t_ca_override.build_session()
assert(t_default.session.verify == 'path_to_CURL_CA_CERT')
assert (t_ca_override.session.verify == 'overridepath')
finally:
del os.environ['CURL_CA_BUNDLE']


def test_build_session_cert_ignore():
t_default = Transport(endpoint="Endpoint",
server_cert_validation='ignore',
username='test',
password='test',
auth_method='basic',
)
t_ca_override = Transport(endpoint="Endpoint",
server_cert_validation='ignore',
username='test',
password='test',
auth_method='basic',
ca_trust_path='boguspath'
)
try:
self.assertIs(False, t_default.session.verify)

def test_build_session_cert_ignore_2(self):
os.environ['REQUESTS_CA_BUNDLE'] = 'path_to_REQUESTS_CA_CERT'
os.environ['CURL_CA_BUNDLE'] = 'path_to_CURL_CA_CERT'

t_default = transport.Transport(endpoint="https://example.com",
server_cert_validation='ignore',
username='test',
password='test',
auth_method='basic',
ca_trust_path='boguspath'
)

t_default.build_session()
self.assertIs(False, t_default.session.verify)

def test_build_session_proxy_none(self):
os.environ['HTTP_PROXY'] = 'random_proxy'
os.environ['HTTPS_PROXY'] = 'random_proxy_2'

t_default = transport.Transport(endpoint="https://example.com",
server_cert_validation='validate',
username='test',
password='test',
auth_method='basic',
proxy=None
)

t_default.build_session()
self.assertEqual({'no_proxy': '*'}, t_default.session.proxies)

def test_build_session_proxy_defined(self):
t_default = transport.Transport(endpoint="https://example.com",
server_cert_validation='validate',
username='test',
password='test',
auth_method='basic',
proxy='test_proxy'
)

t_default.build_session()
self.assertEqual({'http': 'test_proxy', 'https': 'test_proxy'}, t_default.session.proxies)

def test_build_session_proxy_defined_and_env(self):
os.environ['HTTPS_PROXY'] = 'random_proxy'

t_default = transport.Transport(endpoint="https://example.com",
server_cert_validation='validate',
username='test',
password='test',
auth_method='basic',
proxy='test_proxy'
)

t_default.build_session()
self.assertEqual({'http': 'test_proxy', 'https': 'test_proxy'}, t_default.session.proxies)

def test_build_session_proxy_with_env_https(self):
os.environ['HTTPS_PROXY'] = 'random_proxy'

t_default = transport.Transport(endpoint="https://example.com",
server_cert_validation='validate',
username='test',
password='test',
auth_method='basic',
)

t_default.build_session()
self.assertEqual({'https': 'random_proxy'}, t_default.session.proxies)

def test_build_session_proxy_with_env_http(self):
os.environ['HTTP_PROXY'] = 'random_proxy'

t_default = transport.Transport(endpoint="https://example.com",
server_cert_validation='validate',
username='test',
password='test',
auth_method='basic',
)

t_default.build_session()
t_ca_override.build_session()
assert(isinstance(t_default.session.verify, bool) and not t_default.session.verify)
assert (isinstance(t_ca_override.session.verify, bool) and not t_ca_override.session.verify)
finally:
del os.environ['REQUESTS_CA_BUNDLE']
del os.environ['CURL_CA_BUNDLE']
self.assertEqual({'http': 'random_proxy'}, t_default.session.proxies)
40 changes: 32 additions & 8 deletions winrm/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from winrm.encryption import Encryption

is_py2 = sys.version[0] == '2'
DISPLAYED_PROXY_WARNING = False

if is_py2:
# use six for this instead?
Expand Down Expand Up @@ -62,7 +63,8 @@ def __init__(
credssp_disable_tlsv1_2=False,
credssp_auth_mechanism='auto',
credssp_minimum_version=2,
send_cbt=True):
send_cbt=True,
proxy='legacy_requests'):
self.endpoint = endpoint
self.username = username
self.password = password
Expand All @@ -80,6 +82,7 @@ def __init__(
self.credssp_auth_mechanism = credssp_auth_mechanism
self.credssp_minimum_version = credssp_minimum_version
self.send_cbt = send_cbt
self.proxy = proxy

if self.server_cert_validation not in [None, 'validate', 'ignore']:
raise WinRMError('invalid server_cert_validation mode: %s' % self.server_cert_validation)
Expand Down Expand Up @@ -150,14 +153,35 @@ def __init__(

def build_session(self):
session = requests.Session()
proxies = dict()

if self.proxy is None:
proxies['no_proxy'] = '*'
elif self.proxy != 'legacy_requests':
# If there was a proxy specified then use it
proxies['http'] = self.proxy
proxies['https'] = self.proxy

# Merge proxy environment variables
settings = session.merge_environment_settings(url=self.endpoint,
proxies=proxies, stream=None, verify=None, cert=None)

global DISPLAYED_PROXY_WARNING

# We want to eventually stop reading proxy information from the environment.
# Also only display the warning once. This method can be called many times during an application's runtime.
if not DISPLAYED_PROXY_WARNING and self.proxy == 'legacy_requests' and (
'http' in settings['proxies'] or 'https' in settings['proxies']):
message = "'pywinrm' will use an environment defined proxy. This feature will be disabled in " \
"the future, please provide it explicitly."
if 'http' in settings['proxies']:
message += " HTTP proxy {proxy} discover.".format(proxy=settings['proxies']['http'])
if 'https' in settings['proxies']:
message += " HTTPS proxy {proxy} discover.".format(proxy=settings['proxies']['https'])

DISPLAYED_PROXY_WARNING = True
warnings.warn(message, DeprecationWarning)

# allow some settings to be merged from env
session.trust_env = True
settings = session.merge_environment_settings(url=self.endpoint, proxies={}, stream=None,
verify=None, cert=None)

# get proxy settings from env
# FUTURE: allow proxy to be passed in directly to supersede this value
session.proxies = settings['proxies']

# specified validation mode takes precedence
Expand Down

0 comments on commit ff648dc

Please sign in to comment.