diff --git a/tests/test_tls.py b/tests/test_tls.py index 30b7bf03..8f3edc66 100644 --- a/tests/test_tls.py +++ b/tests/test_tls.py @@ -1376,3 +1376,180 @@ def test_raw_socket_sendto_recvfrom_into(self, port: int) -> None: received, addr = client.recvfrom_into(buffer, 1024) assert addr == address assert buffer[:received] == secret + + +@pytest.mark.e2e() +@pytest.mark.skipif(sys.platform == "win32", reason="Flaky under Windows") +class TestProgramsDTLS_CERT: + @pytest.fixture(scope="class") + def certificate(self) -> Tuple[CRT, _Key]: + return make_root_ca() + + @pytest.fixture(scope="class") + def port(self) -> int: + """Return a free port + + Note: + Not 100% race condition free. + + """ + port = 0 + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + sock.bind(("localhost", port)) + port = sock.getsockname()[1] + return port + + @pytest.fixture(scope="class") + def server( + self, + rootpath: Path, + port: int, + certificate: Tuple[CRT, _Key], + as_path: Callable[[str], Path], + ) -> Iterator[subprocess.Popen[str]]: + crt, key = certificate + args = [ + sys.executable, + str(rootpath / "programs" / "server.py"), + "--port", + f"{port}", + "--dtls", + "--cert", + as_path(crt.to_PEM()), + "--key", + as_path(key.export_key("PEM")), + ] + with subprocess.Popen(args, text=True, encoding="utf8") as proc: + yield proc + proc.kill() + proc.wait(1.0) + + @pytest.mark.repeat(3) + @pytest.mark.usefixtures("server") + @pytest.mark.timeout(30) + def test_client( + self, + rootpath: Path, + port: int, + certificate: Tuple[CRT, _Key], + as_path: Callable[[str], Path], + ) -> None: + secret = "a very secret message" + crt, key = certificate + args = [ + sys.executable, + str(rootpath / "programs" / "client.py"), + "--port", + f"{port}", + "--dtls", + "--trust-store", + as_path(crt.to_PEM()), + secret, + ] + for _ in range(3): + assert do_communicate(args) == secret + "\n" + + @pytest.mark.usefixtures("server") + @pytest.mark.timeout(30) + def test_raw_socket_send_recv( + self, port: int, certificate: Tuple[CRT, _Key] + ) -> None: + root_crt, root_key = certificate + ee_crt, ee_key = make_crt(root_crt, root_key) + chain = (ee_crt, root_crt), ee_key + address = ("127.0.0.1", port) + secret = b"a very secret message" + with ClientContext( + DTLSConfiguration( + trust_store=make_trust_store(chain), + validate_certificates=False, + ), + ).wrap_socket( + socket.socket(socket.AF_INET, socket.SOCK_DGRAM), "localhost" + ) as client: + client.connect(address) + client.do_handshake() + sent = client.send(secret) + assert sent == len(secret) + + data = client.recv(1024) + assert data == secret + + @pytest.mark.usefixtures("server") + @pytest.mark.timeout(30) + def test_raw_socket_sendto_recvfrom( + self, port: int, certificate: Tuple[CRT, _Key] + ) -> None: + address = ("127.0.0.1", port) + root_crt, root_key = certificate + ee_crt, ee_key = make_crt(root_crt, root_key) + chain = (ee_crt, root_crt), ee_key + secret = b"a very secret message" + with ClientContext( + DTLSConfiguration( + trust_store=make_trust_store(chain), + validate_certificates=False, + ), + ).wrap_socket( + socket.socket(socket.AF_INET, socket.SOCK_DGRAM), "localhost" + ) as client: + client.do_handshake(address) + sent = client.sendto(secret, address) + assert sent == len(secret) + + data, addr = client.recvfrom(1024) + assert addr == address + assert data == secret + + @pytest.mark.usefixtures("server") + @pytest.mark.timeout(30) + def test_raw_socket_sendto_recvfrom_with_flags(self, port: int) -> None: + # Note that flags is always 0 (noop) here because we are only + # interested in testing the API. See also issue #62. + address = ("127.0.0.1", port) + root_crt, root_key = make_root_ca() + ee_crt, ee_key = make_crt(root_crt, root_key) + chain = (ee_crt, root_crt), ee_key + secret = b"a very secret message" + with ClientContext( + DTLSConfiguration( + trust_store=make_trust_store(chain), + validate_certificates=False, + ), + ).wrap_socket( + socket.socket(socket.AF_INET, socket.SOCK_DGRAM), "localhost" + ) as client: + client.do_handshake(0, address) + sent = client.sendto(secret, 0, address) + assert sent == len(secret) + + data, addr = client.recvfrom(1024, 0) + assert addr == address + assert data == secret + + @pytest.mark.usefixtures("server") + @pytest.mark.timeout(30) + def test_raw_socket_sendto_recvfrom_into( + self, port: int, certificate: Tuple[CRT, _Key] + ) -> None: + address = ("127.0.0.1", port) + root_crt, root_key = certificate + ee_crt, ee_key = make_crt(root_crt, root_key) + chain = (ee_crt, root_crt), ee_key + secret = b"a very secret message" + buffer = bytearray(b"\0" * 256) + with ClientContext( + DTLSConfiguration( + trust_store=make_trust_store(chain), + validate_certificates=False, + ), + ).wrap_socket( + socket.socket(socket.AF_INET, socket.SOCK_DGRAM), "localhost" + ) as client: + client.do_handshake(address) + sent = client.sendto(secret, address) + assert sent == len(secret) + + received, addr = client.recvfrom_into(buffer, 1024) + assert addr == address + assert buffer[:received] == secret