Skip to content

Commit

Permalink
winrm: fix up unit tests (#41112)
Browse files Browse the repository at this point in the history
  • Loading branch information
jborean93 committed Jun 6, 2018
1 parent 453a6f4 commit ad8e13e
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 131 deletions.
3 changes: 3 additions & 0 deletions test/runner/requirements/units.txt
Expand Up @@ -28,3 +28,6 @@ pyfmg

# requirement for aci_rest module
xmljson

# requirement for winrm connection plugin tests
pexpect
261 changes: 130 additions & 131 deletions test/units/plugins/connection/test_winrm.py
Expand Up @@ -6,8 +6,6 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import sys

import pytest

from io import StringIO
Expand All @@ -17,6 +15,7 @@
from ansible.module_utils._text import to_bytes
from ansible.playbook.play_context import PlayContext
from ansible.plugins.loader import connection_loader
from ansible.plugins.connection import winrm


class TestConnectionWinRM(object):
Expand Down Expand Up @@ -194,23 +193,13 @@ class TestConnectionWinRM(object):
@pytest.mark.parametrize('play, options, direct, expected, kerb',
((p, o, d, e, k) for p, o, d, e, k in OPTIONS_DATA))
def test_set_options(self, play, options, direct, expected, kerb):
if kerb:
kerberos_mock = MagicMock()
modules = {'kerberos': kerberos_mock}
else:
modules = {'kerberos': None}

module_patcher = patch.dict(sys.modules, modules)
module_patcher.start()
winrm.HAVE_KERBEROS = kerb

pc = PlayContext()
for attr, value in play.items():
setattr(pc, attr, value)

new_stdin = StringIO()

# ensure we get a fresh connection plugin by clearing the cache
connection_loader._module_cache = {}
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options=options, direct=direct)

Expand All @@ -220,149 +209,159 @@ def test_set_options(self, play, options, direct, expected, kerb):
"winrm attr '%s', actual '%s' != expected '%s'"\
% (attr, actual, expected)

module_patcher.stop()


class TestWinRMKerbAuth(object):

DATA = (
# default
({"_extras": {}}, (["kinit", "user@domain"],), False),
({"_extras": {}}, ("kinit", ["user@domain"],), True),

# override kinit path from options
({"_extras": {}, 'ansible_winrm_kinit_cmd': 'kinit2'},
(["kinit2", "user@domain"],), False),
({"_extras": {}, 'ansible_winrm_kinit_cmd': 'kinit2'},
("kinit2", ["user@domain"],), True),

# we expect the -f flag when delegation is set
({"_extras": {'ansible_winrm_kerberos_delegation': True}},
(["kinit", "-f", "user@domain"],), False),
({"_extras": {'ansible_winrm_kerberos_delegation': True}},
("kinit", ["-f", "user@domain"],), True),
)

# pylint bug: https://github.com/PyCQA/pylint/issues/511
# pylint: disable=undefined-variable
@pytest.mark.parametrize('options, expected, pexpect',
((o, e, p) for o, e, p in DATA))
def test_kinit_success(self, options, expected, pexpect):
def mock_popen_communicate(input=None, timeout=None):
@pytest.mark.parametrize('options, expected', [
[{"_extras": {}},
(["kinit", "user@domain"],)],
[{"_extras": {}, 'ansible_winrm_kinit_cmd': 'kinit2'},
(["kinit2", "user@domain"],)],
[{"_extras": {'ansible_winrm_kerberos_delegation': True}},
(["kinit", "-f", "user@domain"],)],
])
def test_kinit_success_subprocess(self, monkeypatch, options, expected):
def mock_communicate(input=None, timeout=None):
return b"", b""

mock_pexpect = None
if pexpect:
mock_pexpect = MagicMock()
mock_pexpect.spawn.return_value.exitstatus = 0
mock_popen = MagicMock()
mock_popen.return_value.communicate = mock_communicate
mock_popen.return_value.returncode = 0
monkeypatch.setattr("subprocess.Popen", mock_popen)

mock_subprocess = MagicMock()
mock_subprocess.Popen.return_value.communicate = mock_popen_communicate
mock_subprocess.Popen.return_value.returncode = 0

modules = {
'pexpect': mock_pexpect,
'subprocess': mock_subprocess,
}
winrm.HAS_PEXPECT = False
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options=options)

conn._kerb_auth("user@domain", "pass")
mock_calls = mock_popen.mock_calls
assert len(mock_calls) == 1
assert mock_calls[0][1] == expected
actual_env = mock_calls[0][2]['env']
assert list(actual_env.keys()) == ['KRB5CCNAME']
assert actual_env['KRB5CCNAME'].startswith("FILE:/")

@pytest.mark.parametrize('options, expected', [
[{"_extras": {}},
("kinit", ["user@domain"],)],
[{"_extras": {}, 'ansible_winrm_kinit_cmd': 'kinit2'},
("kinit2", ["user@domain"],)],
[{"_extras": {'ansible_winrm_kerberos_delegation': True}},
("kinit", ["-f", "user@domain"],)],
])
def test_kinit_success_pexpect(self, monkeypatch, options, expected):
pytest.importorskip("pexpect")
mock_pexpect = MagicMock()
mock_pexpect.return_value.exitstatus = 0
monkeypatch.setattr("pexpect.spawn", mock_pexpect)

winrm.HAS_PEXPECT = True
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options=options)

conn._kerb_auth("user@domain", "pass")
mock_calls = mock_pexpect.mock_calls
assert mock_calls[0][1] == expected
actual_env = mock_calls[0][2]['env']
assert list(actual_env.keys()) == ['KRB5CCNAME']
assert actual_env['KRB5CCNAME'].startswith("FILE:/")
assert mock_calls[1][0] == "().expect"
assert mock_calls[1][1] == (".*:",)
assert mock_calls[2][0] == "().sendline"
assert mock_calls[2][1] == ("pass",)
assert mock_calls[3][0] == "().read"
assert mock_calls[4][0] == "().wait"

def test_kinit_with_missing_executable_subprocess(self, monkeypatch):
expected_err = "[Errno 2] No such file or directory: " \
"'/fake/kinit': '/fake/kinit'"
mock_popen = MagicMock(side_effect=OSError(expected_err))

with patch.dict(sys.modules, modules):
pc = PlayContext()
new_stdin = StringIO()
monkeypatch.setattr("subprocess.Popen", mock_popen)

connection_loader._module_cache = {}
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options=options)
winrm.HAS_PEXPECT = False
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
options = {"_extras": {}, "ansible_winrm_kinit_cmd": "/fake/kinit"}
conn.set_options(var_options=options)

with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("user@domain", "pass")
if pexpect:
assert len(mock_pexpect.method_calls) == 1
assert mock_pexpect.method_calls[0][1] == expected
actual_env = mock_pexpect.method_calls[0][2]['env']
else:
assert len(mock_subprocess.method_calls) == 1
assert mock_subprocess.method_calls[0][1] == expected
actual_env = mock_subprocess.method_calls[0][2]['env']

assert list(actual_env.keys()) == ['KRB5CCNAME']
assert actual_env['KRB5CCNAME'].startswith("FILE:/")
assert str(err.value) == "Kerberos auth failure when calling " \
"kinit cmd '/fake/kinit': %s" % expected_err

# pylint bug: https://github.com/PyCQA/pylint/issues/511
# pylint: disable=undefined-variable
@pytest.mark.parametrize('use_pexpect', (False, True),)
def test_kinit_with_missing_executable(self, use_pexpect):
expected_err = "[Errno 2] No such file or directory: " \
"'/fake/kinit': '/fake/kinit'"
mock_subprocess = MagicMock()
mock_subprocess.Popen = MagicMock(side_effect=OSError(expected_err))

mock_pexpect = None
if use_pexpect:
expected_err = "The command was not found or was not " \
"executable: /fake/kinit"
def test_kinit_with_missing_executable_pexpect(self, monkeypatch):
pexpect = pytest.importorskip("pexpect")

mock_pexpect = MagicMock()
mock_pexpect.ExceptionPexpect = Exception
mock_pexpect.spawn = MagicMock(side_effect=Exception(expected_err))
expected_err = "The command was not found or was not " \
"executable: /fake/kinit"
mock_pexpect = \
MagicMock(side_effect=pexpect.ExceptionPexpect(expected_err))

modules = {
'pexpect': mock_pexpect,
'subprocess': mock_subprocess,
}
monkeypatch.setattr("pexpect.spawn", mock_pexpect)

with patch.dict(sys.modules, modules):
pc = PlayContext()
new_stdin = StringIO()

connection_loader._module_cache = {}
conn = connection_loader.get('winrm', pc, new_stdin)
options = {"_extras": {}, "ansible_winrm_kinit_cmd": "/fake/kinit"}
conn.set_options(var_options=options)
winrm.HAS_PEXPECT = True
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
options = {"_extras": {}, "ansible_winrm_kinit_cmd": "/fake/kinit"}
conn.set_options(var_options=options)

with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("user@domain", "pass")
assert str(err.value) == "Kerberos auth failure when calling " \
"kinit cmd '/fake/kinit': %s" % expected_err
with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("user@domain", "pass")
assert str(err.value) == "Kerberos auth failure when calling " \
"kinit cmd '/fake/kinit': %s" % expected_err

# pylint bug: https://github.com/PyCQA/pylint/issues/511
# pylint: disable=undefined-variable
@pytest.mark.parametrize('use_pexpect', (False, True),)
def test_kinit_error(self, use_pexpect):
mechanism = "subprocess"
def test_kinit_error_subprocess(self, monkeypatch):
expected_err = "kinit: krb5_parse_name: " \
"Configuration file does not specify default realm"

def mock_popen_communicate(input=None, timeout=None):
def mock_communicate(input=None, timeout=None):
return b"", to_bytes(expected_err)

mock_subprocess = MagicMock()
mock_subprocess.Popen.return_value.communicate = mock_popen_communicate
mock_subprocess.Popen.return_value.returncode = 1
mock_popen = MagicMock()
mock_popen.return_value.communicate = mock_communicate
mock_popen.return_value.returncode = 1
monkeypatch.setattr("subprocess.Popen", mock_popen)

winrm.HAS_PEXPECT = False
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options={"_extras": {}})

with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("invaliduser", "pass")

mock_pexpect = None
if use_pexpect:
mechanism = "pexpect"
expected_err = "Configuration file does not specify default realm"
assert str(err.value) == \
"Kerberos auth failure for principal invaliduser with " \
"subprocess: %s" % (expected_err)

mock_pexpect = MagicMock()
mock_pexpect.spawn.return_value.expect = MagicMock(side_effect=OSError)
mock_pexpect.spawn.return_value.read.return_value = to_bytes(expected_err)
mock_pexpect.spawn.return_value.exitstatus = 1
def test_kinit_error_pexpect(self, monkeypatch):
pytest.importorskip("pexpect")

modules = {
'pexpect': mock_pexpect,
'subprocess': mock_subprocess,
}
expected_err = "Configuration file does not specify default realm"
mock_pexpect = MagicMock()
mock_pexpect.return_value.expect = MagicMock(side_effect=OSError)
mock_pexpect.return_value.read.return_value = to_bytes(expected_err)
mock_pexpect.return_value.exitstatus = 1

with patch.dict(sys.modules, modules):
pc = PlayContext()
new_stdin = StringIO()
monkeypatch.setattr("pexpect.spawn", mock_pexpect)

connection_loader._module_cache = {}
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options={"_extras": {}})
winrm.HAS_PEXPECT = True
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options={"_extras": {}})

with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("invaliduser", "pass")
with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("invaliduser", "pass")

assert str(err.value) == "Kerberos auth failure for principal " \
"invaliduser with %s: %s" % (mechanism, expected_err)
assert str(err.value) == \
"Kerberos auth failure for principal invaliduser with " \
"pexpect: %s" % (expected_err)

0 comments on commit ad8e13e

Please sign in to comment.