Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Fix SSL test to accomodate new Twisted API for SSL #152

Closed
wants to merge 5 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 37 additions & 49 deletions src/txkube/test/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@
from twisted.internet.error import DNSLookupError
from twisted.internet.interfaces import IHostResolution, IReactorPluggableNameResolver
from twisted.internet.protocol import Factory
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.web.http_headers import Headers
from twisted.test.iosim import ConnectionCompleter
from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
from twisted.test.test_sslverify import certificatesForAuthorityAndServer

from ..testing import TestCase, assertNoResult

Expand Down Expand Up @@ -120,19 +122,28 @@ class AuthenticateWithServiceAccountTests(TestCase):
Tests for ``authenticate_with_serviceaccount``.
"""
def _authorized_request(self, token, headers,
kubernetes_host=b"example.invalid."):
kubernetes_host=b"example.invalid.",
port=80,
proto=b'http',
serverWrapper=lambda server: server):
"""
Get an agent using ``authenticate_with_serviceaccount`` and issue a
request with it.

:return bytes: The bytes of the request the agent issues.
"""
server = AccumulatingProtocol()
factory = Factory.forProtocol(lambda: server)
factory.protocolConnectionMade = None

@Factory.forProtocol
def accumulator():
accumulator.currentProtocol = server
return server
accumulator.currentProtocol = None
accumulator.protocolConnectionMade = None
factory = serverWrapper(accumulator)

reactor = create_reactor()
reactor.listenTCP(80, factory)
reactor.listenTCP(port, factory)

t = FilePath(self.useFixture(TempDir()).join(b""))
serviceaccount = t.child(b"serviceaccount")
Expand All @@ -152,19 +163,32 @@ def _authorized_request(self, token, headers,
reactor, path=serviceaccount.path,
)

d = agent.request(b"GET", b"http://" + kubernetes_host, headers)
d = agent.request(b"GET", proto + b"://" + kubernetes_host, headers)
assertNoResult(self, d)
[(host, port, factory, _, _)] = reactor.tcpClients

addr = HOST_MAP.get(kubernetes_host.decode("ascii"), None)
self.expectThat((host, port), Equals((addr, 80)))
self.expectThat((host, port), Equals((addr, port)))

pump = ConnectionCompleter(reactor).succeedOnce()
pump = ConnectionCompleter(reactor).succeedOnce(debug=True)
pump.pump()

return server.data


def _authorized_request_https(self, token, headers,
kubernetes_host=b"example.invalid.",
port=443,
proto=b'https'):
authority, server = certificatesForAuthorityAndServer(kubernetes_host.decode('ascii'))

def tlsify(serverFactory):
return TLSMemoryBIOFactory(server.options(), False, serverFactory)

return self._authorized_request(token, headers,
kubernetes_host=kubernetes_host, port=port, proto=proto, serverWrapper=tlsify)


def test_http_bearer_token_authorization(self):
"""
The ``IAgent`` returned adds an *Authorization* header to each request it
Expand All @@ -177,7 +201,7 @@ def test_http_bearer_token_authorization(self):
# Sure would be nice to have an HTTP parser.
self.assertThat(
request_bytes,
Contains(u"Authorization: Bearer {}".format(token).encode("ascii")),
Contains(b"Authorization: Bearer " + token),
)


Expand All @@ -187,50 +211,14 @@ def test_https_bearer_token_authorization(self):
issues. The header includes the bearer token from the service account
file. This works over HTTPS.
"""
# This test duplicates a lot of logic from _authorized_request because
# ConnectionCompleter doesn't work with TLS connections by itself.
server = AccumulatingProtocol()
factory = Factory.forProtocol(lambda: server)
factory.protocolConnectionMade = None

reactor = create_reactor()
reactor.listenTCP(443, factory)

token = bytes(uuid4())
request_bytes = self._authorized_request_https(token, Headers(),
kubernetes_host=b"example.invalid.")

t = FilePath(self.useFixture(TempDir()).join(b""))
serviceaccount = t.child(b"serviceaccount")
serviceaccount.makedirs()

serviceaccount.child(b"ca.crt").setContent(_CA_CERT_PEM)
serviceaccount.child(b"token").setContent(token)

self.patch(
os, "environ", {
b"KUBERNETES_SERVICE_HOST": b"example.invalid.",
b"KUBERNETES_SERVICE_PORT": b"443",
},
)

agent = authenticate_with_serviceaccount(
reactor, path=serviceaccount.path,
)
headers = Headers()
agent.request(b"GET", b"https://example.invalid.", headers)

[connection] = reactor.sslClients
(host, port, factory) = connection[:3]
# Put it somewhere ConnectionCompleter can deal with.
reactor.tcpClients.append((host, port, factory, None, None))

pump = ConnectionCompleter(reactor).succeedOnce()
pump.pump()

request_bytes = server.data
# Sure would be nice to have an HTTP parser.
self.assertThat(
request_bytes,
Contains(u"Authorization: Bearer {}".format(token).encode("ascii")),
Contains(b"Authorization: Bearer " + token),
)


Expand All @@ -242,7 +230,7 @@ def test_hostname_does_not_resolve(self):
with ExpectedException(DNSLookupError, "DNS lookup failed: no results "
"for hostname lookup: doesnotresolve."):
self._authorized_request(
token="test",
token=b"test",
headers=Headers({}),
kubernetes_host=b"doesnotresolve"
)
Expand All @@ -258,7 +246,7 @@ def test_other_headers_preserved(self):
request_bytes = self._authorized_request(token=token, headers=headers)
self.expectThat(
request_bytes,
Contains(u"Authorization: Bearer {}".format(token).encode("ascii")),
Contains(b"Authorization: Bearer " + token),
)
self.expectThat(
request_bytes,
Expand Down