Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

winrm: fix up unit tests (#41112) #41211

Merged
merged 1 commit into from Jun 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions test/runner/requirements/units.txt
Expand Up @@ -28,3 +28,6 @@ pyfmg

# requirement for aci_rest module
xmljson

# requirement for winrm connection plugin tests
pexpect
261 changes: 130 additions & 131 deletions test/units/plugins/connection/test_winrm.py
Expand Up @@ -6,8 +6,6 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import sys

import pytest

from io import StringIO
Expand All @@ -17,6 +15,7 @@
from ansible.module_utils._text import to_bytes
from ansible.playbook.play_context import PlayContext
from ansible.plugins.loader import connection_loader
from ansible.plugins.connection import winrm


class TestConnectionWinRM(object):
Expand Down Expand Up @@ -194,23 +193,13 @@ class TestConnectionWinRM(object):
@pytest.mark.parametrize('play, options, direct, expected, kerb',
((p, o, d, e, k) for p, o, d, e, k in OPTIONS_DATA))
def test_set_options(self, play, options, direct, expected, kerb):
if kerb:
kerberos_mock = MagicMock()
modules = {'kerberos': kerberos_mock}
else:
modules = {'kerberos': None}

module_patcher = patch.dict(sys.modules, modules)
module_patcher.start()
winrm.HAVE_KERBEROS = kerb

pc = PlayContext()
for attr, value in play.items():
setattr(pc, attr, value)

new_stdin = StringIO()

# ensure we get a fresh connection plugin by clearing the cache
connection_loader._module_cache = {}
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options=options, direct=direct)

Expand All @@ -220,149 +209,159 @@ def test_set_options(self, play, options, direct, expected, kerb):
"winrm attr '%s', actual '%s' != expected '%s'"\
% (attr, actual, expected)

module_patcher.stop()


class TestWinRMKerbAuth(object):

DATA = (
# default
({"_extras": {}}, (["kinit", "user@domain"],), False),
({"_extras": {}}, ("kinit", ["user@domain"],), True),

# override kinit path from options
({"_extras": {}, 'ansible_winrm_kinit_cmd': 'kinit2'},
(["kinit2", "user@domain"],), False),
({"_extras": {}, 'ansible_winrm_kinit_cmd': 'kinit2'},
("kinit2", ["user@domain"],), True),

# we expect the -f flag when delegation is set
({"_extras": {'ansible_winrm_kerberos_delegation': True}},
(["kinit", "-f", "user@domain"],), False),
({"_extras": {'ansible_winrm_kerberos_delegation': True}},
("kinit", ["-f", "user@domain"],), True),
)

# pylint bug: https://github.com/PyCQA/pylint/issues/511
# pylint: disable=undefined-variable
@pytest.mark.parametrize('options, expected, pexpect',
((o, e, p) for o, e, p in DATA))
def test_kinit_success(self, options, expected, pexpect):
def mock_popen_communicate(input=None, timeout=None):
@pytest.mark.parametrize('options, expected', [
[{"_extras": {}},
(["kinit", "user@domain"],)],
[{"_extras": {}, 'ansible_winrm_kinit_cmd': 'kinit2'},
(["kinit2", "user@domain"],)],
[{"_extras": {'ansible_winrm_kerberos_delegation': True}},
(["kinit", "-f", "user@domain"],)],
])
def test_kinit_success_subprocess(self, monkeypatch, options, expected):
def mock_communicate(input=None, timeout=None):
return b"", b""

mock_pexpect = None
if pexpect:
mock_pexpect = MagicMock()
mock_pexpect.spawn.return_value.exitstatus = 0
mock_popen = MagicMock()
mock_popen.return_value.communicate = mock_communicate
mock_popen.return_value.returncode = 0
monkeypatch.setattr("subprocess.Popen", mock_popen)

mock_subprocess = MagicMock()
mock_subprocess.Popen.return_value.communicate = mock_popen_communicate
mock_subprocess.Popen.return_value.returncode = 0

modules = {
'pexpect': mock_pexpect,
'subprocess': mock_subprocess,
}
winrm.HAS_PEXPECT = False
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options=options)

conn._kerb_auth("user@domain", "pass")
mock_calls = mock_popen.mock_calls
assert len(mock_calls) == 1
assert mock_calls[0][1] == expected
actual_env = mock_calls[0][2]['env']
assert list(actual_env.keys()) == ['KRB5CCNAME']
assert actual_env['KRB5CCNAME'].startswith("FILE:/")

@pytest.mark.parametrize('options, expected', [
[{"_extras": {}},
("kinit", ["user@domain"],)],
[{"_extras": {}, 'ansible_winrm_kinit_cmd': 'kinit2'},
("kinit2", ["user@domain"],)],
[{"_extras": {'ansible_winrm_kerberos_delegation': True}},
("kinit", ["-f", "user@domain"],)],
])
def test_kinit_success_pexpect(self, monkeypatch, options, expected):
pytest.importorskip("pexpect")
mock_pexpect = MagicMock()
mock_pexpect.return_value.exitstatus = 0
monkeypatch.setattr("pexpect.spawn", mock_pexpect)

winrm.HAS_PEXPECT = True
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options=options)

conn._kerb_auth("user@domain", "pass")
mock_calls = mock_pexpect.mock_calls
assert mock_calls[0][1] == expected
actual_env = mock_calls[0][2]['env']
assert list(actual_env.keys()) == ['KRB5CCNAME']
assert actual_env['KRB5CCNAME'].startswith("FILE:/")
assert mock_calls[1][0] == "().expect"
assert mock_calls[1][1] == (".*:",)
assert mock_calls[2][0] == "().sendline"
assert mock_calls[2][1] == ("pass",)
assert mock_calls[3][0] == "().read"
assert mock_calls[4][0] == "().wait"

def test_kinit_with_missing_executable_subprocess(self, monkeypatch):
expected_err = "[Errno 2] No such file or directory: " \
"'/fake/kinit': '/fake/kinit'"
mock_popen = MagicMock(side_effect=OSError(expected_err))

with patch.dict(sys.modules, modules):
pc = PlayContext()
new_stdin = StringIO()
monkeypatch.setattr("subprocess.Popen", mock_popen)

connection_loader._module_cache = {}
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options=options)
winrm.HAS_PEXPECT = False
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
options = {"_extras": {}, "ansible_winrm_kinit_cmd": "/fake/kinit"}
conn.set_options(var_options=options)

with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("user@domain", "pass")
if pexpect:
assert len(mock_pexpect.method_calls) == 1
assert mock_pexpect.method_calls[0][1] == expected
actual_env = mock_pexpect.method_calls[0][2]['env']
else:
assert len(mock_subprocess.method_calls) == 1
assert mock_subprocess.method_calls[0][1] == expected
actual_env = mock_subprocess.method_calls[0][2]['env']

assert list(actual_env.keys()) == ['KRB5CCNAME']
assert actual_env['KRB5CCNAME'].startswith("FILE:/")
assert str(err.value) == "Kerberos auth failure when calling " \
"kinit cmd '/fake/kinit': %s" % expected_err

# pylint bug: https://github.com/PyCQA/pylint/issues/511
# pylint: disable=undefined-variable
@pytest.mark.parametrize('use_pexpect', (False, True),)
def test_kinit_with_missing_executable(self, use_pexpect):
expected_err = "[Errno 2] No such file or directory: " \
"'/fake/kinit': '/fake/kinit'"
mock_subprocess = MagicMock()
mock_subprocess.Popen = MagicMock(side_effect=OSError(expected_err))

mock_pexpect = None
if use_pexpect:
expected_err = "The command was not found or was not " \
"executable: /fake/kinit"
def test_kinit_with_missing_executable_pexpect(self, monkeypatch):
pexpect = pytest.importorskip("pexpect")

mock_pexpect = MagicMock()
mock_pexpect.ExceptionPexpect = Exception
mock_pexpect.spawn = MagicMock(side_effect=Exception(expected_err))
expected_err = "The command was not found or was not " \
"executable: /fake/kinit"
mock_pexpect = \
MagicMock(side_effect=pexpect.ExceptionPexpect(expected_err))

modules = {
'pexpect': mock_pexpect,
'subprocess': mock_subprocess,
}
monkeypatch.setattr("pexpect.spawn", mock_pexpect)

with patch.dict(sys.modules, modules):
pc = PlayContext()
new_stdin = StringIO()

connection_loader._module_cache = {}
conn = connection_loader.get('winrm', pc, new_stdin)
options = {"_extras": {}, "ansible_winrm_kinit_cmd": "/fake/kinit"}
conn.set_options(var_options=options)
winrm.HAS_PEXPECT = True
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
options = {"_extras": {}, "ansible_winrm_kinit_cmd": "/fake/kinit"}
conn.set_options(var_options=options)

with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("user@domain", "pass")
assert str(err.value) == "Kerberos auth failure when calling " \
"kinit cmd '/fake/kinit': %s" % expected_err
with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("user@domain", "pass")
assert str(err.value) == "Kerberos auth failure when calling " \
"kinit cmd '/fake/kinit': %s" % expected_err

# pylint bug: https://github.com/PyCQA/pylint/issues/511
# pylint: disable=undefined-variable
@pytest.mark.parametrize('use_pexpect', (False, True),)
def test_kinit_error(self, use_pexpect):
mechanism = "subprocess"
def test_kinit_error_subprocess(self, monkeypatch):
expected_err = "kinit: krb5_parse_name: " \
"Configuration file does not specify default realm"

def mock_popen_communicate(input=None, timeout=None):
def mock_communicate(input=None, timeout=None):
return b"", to_bytes(expected_err)

mock_subprocess = MagicMock()
mock_subprocess.Popen.return_value.communicate = mock_popen_communicate
mock_subprocess.Popen.return_value.returncode = 1
mock_popen = MagicMock()
mock_popen.return_value.communicate = mock_communicate
mock_popen.return_value.returncode = 1
monkeypatch.setattr("subprocess.Popen", mock_popen)

winrm.HAS_PEXPECT = False
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options={"_extras": {}})

with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("invaliduser", "pass")

mock_pexpect = None
if use_pexpect:
mechanism = "pexpect"
expected_err = "Configuration file does not specify default realm"
assert str(err.value) == \
"Kerberos auth failure for principal invaliduser with " \
"subprocess: %s" % (expected_err)

mock_pexpect = MagicMock()
mock_pexpect.spawn.return_value.expect = MagicMock(side_effect=OSError)
mock_pexpect.spawn.return_value.read.return_value = to_bytes(expected_err)
mock_pexpect.spawn.return_value.exitstatus = 1
def test_kinit_error_pexpect(self, monkeypatch):
pytest.importorskip("pexpect")

modules = {
'pexpect': mock_pexpect,
'subprocess': mock_subprocess,
}
expected_err = "Configuration file does not specify default realm"
mock_pexpect = MagicMock()
mock_pexpect.return_value.expect = MagicMock(side_effect=OSError)
mock_pexpect.return_value.read.return_value = to_bytes(expected_err)
mock_pexpect.return_value.exitstatus = 1

with patch.dict(sys.modules, modules):
pc = PlayContext()
new_stdin = StringIO()
monkeypatch.setattr("pexpect.spawn", mock_pexpect)

connection_loader._module_cache = {}
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options={"_extras": {}})
winrm.HAS_PEXPECT = True
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options={"_extras": {}})

with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("invaliduser", "pass")
with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("invaliduser", "pass")

assert str(err.value) == "Kerberos auth failure for principal " \
"invaliduser with %s: %s" % (mechanism, expected_err)
assert str(err.value) == \
"Kerberos auth failure for principal invaliduser with " \
"pexpect: %s" % (expected_err)