Skip to content

Commit

Permalink
programs: Add e2e tests for DTLS + CERT
Browse files Browse the repository at this point in the history
  • Loading branch information
Synss committed May 5, 2023
1 parent a9d2e85 commit ffdd744
Showing 1 changed file with 177 additions and 0 deletions.
177 changes: 177 additions & 0 deletions tests/test_tls.py
Expand Up @@ -1376,3 +1376,180 @@ def test_raw_socket_sendto_recvfrom_into(self, port: int) -> None:
received, addr = client.recvfrom_into(buffer, 1024)
assert addr == address
assert buffer[:received] == secret


@pytest.mark.e2e()
@pytest.mark.skipif(sys.platform == "win32", reason="Flaky under Windows")
class TestProgramsDTLS_CERT:
@pytest.fixture(scope="class")
def certificate(self) -> Tuple[CRT, _Key]:
return make_root_ca()

@pytest.fixture(scope="class")
def port(self) -> int:
"""Return a free port
Note:
Not 100% race condition free.
"""
port = 0
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.bind(("localhost", port))
port = sock.getsockname()[1]
return port

@pytest.fixture(scope="class")
def server(
self,
rootpath: Path,
port: int,
certificate: Tuple[CRT, _Key],
as_path: Callable[[str], Path],
) -> Iterator[subprocess.Popen[str]]:
crt, key = certificate
args = [
sys.executable,
str(rootpath / "programs" / "server.py"),
"--port",
f"{port}",
"--dtls",
"--cert",
as_path(crt.to_PEM()),
"--key",
as_path(key.export_key("PEM")),
]
with subprocess.Popen(args, text=True, encoding="utf8") as proc:
yield proc
proc.kill()
proc.wait(1.0)

@pytest.mark.repeat(3)
@pytest.mark.usefixtures("server")
@pytest.mark.timeout(30)
def test_client(
self,
rootpath: Path,
port: int,
certificate: Tuple[CRT, _Key],
as_path: Callable[[str], Path],
) -> None:
secret = "a very secret message"
crt, key = certificate
args = [
sys.executable,
str(rootpath / "programs" / "client.py"),
"--port",
f"{port}",
"--dtls",
"--trust-store",
as_path(crt.to_PEM()),
secret,
]
for _ in range(3):
assert do_communicate(args) == secret + "\n"

@pytest.mark.usefixtures("server")
@pytest.mark.timeout(30)
def test_raw_socket_send_recv(
self, port: int, certificate: Tuple[CRT, _Key]
) -> None:
root_crt, root_key = certificate
ee_crt, ee_key = make_crt(root_crt, root_key)
chain = (ee_crt, root_crt), ee_key
address = ("127.0.0.1", port)
secret = b"a very secret message"
with ClientContext(
DTLSConfiguration(
trust_store=make_trust_store(chain),
validate_certificates=False,
),
).wrap_socket(
socket.socket(socket.AF_INET, socket.SOCK_DGRAM), "localhost"
) as client:
client.connect(address)
client.do_handshake()
sent = client.send(secret)
assert sent == len(secret)

data = client.recv(1024)
assert data == secret

@pytest.mark.usefixtures("server")
@pytest.mark.timeout(30)
def test_raw_socket_sendto_recvfrom(
self, port: int, certificate: Tuple[CRT, _Key]
) -> None:
address = ("127.0.0.1", port)
root_crt, root_key = certificate
ee_crt, ee_key = make_crt(root_crt, root_key)
chain = (ee_crt, root_crt), ee_key
secret = b"a very secret message"
with ClientContext(
DTLSConfiguration(
trust_store=make_trust_store(chain),
validate_certificates=False,
),
).wrap_socket(
socket.socket(socket.AF_INET, socket.SOCK_DGRAM), "localhost"
) as client:
client.do_handshake(address)
sent = client.sendto(secret, address)
assert sent == len(secret)

data, addr = client.recvfrom(1024)
assert addr == address
assert data == secret

@pytest.mark.usefixtures("server")
@pytest.mark.timeout(30)
def test_raw_socket_sendto_recvfrom_with_flags(self, port: int) -> None:
# Note that flags is always 0 (noop) here because we are only
# interested in testing the API. See also issue #62.
address = ("127.0.0.1", port)
root_crt, root_key = make_root_ca()
ee_crt, ee_key = make_crt(root_crt, root_key)
chain = (ee_crt, root_crt), ee_key
secret = b"a very secret message"
with ClientContext(
DTLSConfiguration(
trust_store=make_trust_store(chain),
validate_certificates=False,
),
).wrap_socket(
socket.socket(socket.AF_INET, socket.SOCK_DGRAM), "localhost"
) as client:
client.do_handshake(0, address)
sent = client.sendto(secret, 0, address)
assert sent == len(secret)

data, addr = client.recvfrom(1024, 0)
assert addr == address
assert data == secret

@pytest.mark.usefixtures("server")
@pytest.mark.timeout(30)
def test_raw_socket_sendto_recvfrom_into(
self, port: int, certificate: Tuple[CRT, _Key]
) -> None:
address = ("127.0.0.1", port)
root_crt, root_key = certificate
ee_crt, ee_key = make_crt(root_crt, root_key)
chain = (ee_crt, root_crt), ee_key
secret = b"a very secret message"
buffer = bytearray(b"\0" * 256)
with ClientContext(
DTLSConfiguration(
trust_store=make_trust_store(chain),
validate_certificates=False,
),
).wrap_socket(
socket.socket(socket.AF_INET, socket.SOCK_DGRAM), "localhost"
) as client:
client.do_handshake(address)
sent = client.sendto(secret, address)
assert sent == len(secret)

received, addr = client.recvfrom_into(buffer, 1024)
assert addr == address
assert buffer[:received] == secret

0 comments on commit ffdd744

Please sign in to comment.