Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: Fetch tags
run: git fetch --tags --prune --unshallow
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- uses: shogo82148/actions-setup-redis@v1
Expand All @@ -36,14 +36,8 @@ jobs:
run: |
# sudo apt install redis

pushd ..
git clone https://github.com/bitnami/containers.git
cd containers/bitnami/openldap/2.6/debian-12
docker build -t bitnami/openldap:latest .
popd

# Start LDAP
source continuous_integration/scripts/start_LDAP.sh
bash continuous_integration/scripts/start_LDAP.sh

# These packages are installed in the base environment but may be older
# versions. Explicitly upgrade them because they often create
Expand All @@ -70,6 +64,19 @@ jobs:

pip list
- name: Test with pytest
env:
PYTEST_ADDOPTS: "--durations=20"
run: |
coverage run -m pytest -vv
coverage report -m
- name: Dump LDAP diagnostics on failure
if: failure()
run: |
docker ps
docker compose -f continuous_integration/docker-configs/ldap-docker-compose.yml ps
LDAP_CONTAINER_ID=$(docker compose -f continuous_integration/docker-configs/ldap-docker-compose.yml ps -q openldap | tr -d '[:space:]')
if [ -n "$LDAP_CONTAINER_ID" ]; then
docker logs --tail 200 "$LDAP_CONTAINER_ID"
else
docker compose -f continuous_integration/docker-configs/ldap-docker-compose.yml logs --tail 200 openldap
fi
23 changes: 20 additions & 3 deletions bluesky_httpserver/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest
import requests
from bluesky_queueserver.manager.comms import zmq_single_request
from bluesky_queueserver.manager.tests.common import re_manager_cmd # noqa: F401
from bluesky_queueserver.manager.tests.common import set_qserver_zmq_encoding # noqa: F401
from xprocess import ProcessStarter

Expand Down Expand Up @@ -60,7 +61,11 @@ def fastapi_server_fs(xprocess):
to perform additional steps (such as setting environmental variables) before the server is started.
"""

def start(http_server_host=SERVER_ADDRESS, http_server_port=SERVER_PORT, api_key=API_KEY_FOR_TESTS):
def start(
http_server_host=SERVER_ADDRESS,
http_server_port=SERVER_PORT,
api_key=API_KEY_FOR_TESTS,
):
class Starter(ProcessStarter):
max_read_lines = 53

Expand Down Expand Up @@ -112,15 +117,27 @@ def add_plans_to_queue():

user_group = _user_group
user = "HTTP unit test setup"
plan1 = {"name": "count", "args": [["det1", "det2"]], "kwargs": {"num": 10, "delay": 1}, "item_type": "plan"}
plan1 = {
"name": "count",
"args": [["det1", "det2"]],
"kwargs": {"num": 10, "delay": 1},
"item_type": "plan",
}
plan2 = {"name": "count", "args": [["det1", "det2"]], "item_type": "plan"}
for plan in (plan1, plan2, plan2):
resp2, _ = zmq_single_request("queue_item_add", {"item": plan, "user": user, "user_group": user_group})
assert resp2["success"] is True, str(resp2)


def request_to_json(
request_type, path, *, request_prefix="/api", api_key=API_KEY_FOR_TESTS, token=None, login=None, **kwargs
request_type,
path,
*,
request_prefix="/api",
api_key=API_KEY_FOR_TESTS,
token=None,
login=None,
**kwargs,
):
if login:
auth = None
Expand Down
29 changes: 19 additions & 10 deletions bluesky_httpserver/tests/test_authenticators.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import os
import time
from typing import Any, Tuple

Expand All @@ -10,20 +11,28 @@
from respx import MockRouter
from starlette.datastructures import URL, QueryParams

# fmt: off
from ..authenticators import LDAPAuthenticator, OIDCAuthenticator, ProxiedOIDCAuthenticator, UserSessionState

LDAP_TEST_HOST = os.environ.get("QSERVER_TEST_LDAP_HOST", "localhost")
LDAP_TEST_PORT = int(os.environ.get("QSERVER_TEST_LDAP_PORT", "1389"))
LDAP_TEST_ALT_HOST = os.environ.get("QSERVER_TEST_LDAP_ALT_HOST")
if not LDAP_TEST_ALT_HOST:
LDAP_TEST_ALT_HOST = "127.0.0.1" if LDAP_TEST_HOST == "localhost" else LDAP_TEST_HOST


# fmt: off


@pytest.mark.parametrize("ldap_server_address, ldap_server_port", [
("localhost", 1389),
("localhost:1389", 904), # Random port, ignored
("localhost:1389", None),
("127.0.0.1", 1389),
("127.0.0.1:1389", 904),
(["localhost"], 1389),
(["localhost", "127.0.0.1"], 1389),
(["localhost", "127.0.0.1:1389"], 1389),
(["localhost:1389", "127.0.0.1:1389"], None),
(LDAP_TEST_HOST, LDAP_TEST_PORT),
(f"{LDAP_TEST_HOST}:{LDAP_TEST_PORT}", 904), # Random port, ignored
(f"{LDAP_TEST_HOST}:{LDAP_TEST_PORT}", None),
(LDAP_TEST_ALT_HOST, LDAP_TEST_PORT),
(f"{LDAP_TEST_ALT_HOST}:{LDAP_TEST_PORT}", 904),
([LDAP_TEST_HOST], LDAP_TEST_PORT),
([LDAP_TEST_HOST, LDAP_TEST_ALT_HOST], LDAP_TEST_PORT),
([LDAP_TEST_HOST, f"{LDAP_TEST_ALT_HOST}:{LDAP_TEST_PORT}"], LDAP_TEST_PORT),
([f"{LDAP_TEST_HOST}:{LDAP_TEST_PORT}", f"{LDAP_TEST_ALT_HOST}:{LDAP_TEST_PORT}"], None),
])
# fmt: on
@pytest.mark.parametrize("use_tls,use_ssl", [(False, False)])
Expand Down
81 changes: 50 additions & 31 deletions bluesky_httpserver/tests/test_console_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@
import re
import threading
import time as ttime
from typing import Any

import pytest
import requests
from bluesky_queueserver.manager.tests.common import re_manager_cmd # noqa F401
from websockets.sync.client import connect

from bluesky_httpserver.tests.conftest import ( # noqa F401
API_KEY_FOR_TESTS,
SERVER_ADDRESS,
SERVER_PORT,
fastapi_server_fs,
request_to_json,
set_qserver_zmq_encoding,
wait_for_environment_to_be_closed,
Expand All @@ -36,37 +35,42 @@ def __init__(self, api_key=API_KEY_FOR_TESTS, **kwargs):
self._api_key = api_key

def run(self):
kwargs = {"stream": True}
kwargs: dict[str, Any] = {"stream": True}
if self._api_key:
auth = None
headers = {"Authorization": f"ApiKey {self._api_key}"}
kwargs.update({"auth": auth, "headers": headers})
kwargs.update({"headers": headers})

kwargs["timeout"] = (5, 1)

with requests.get(f"http://{SERVER_ADDRESS}:{SERVER_PORT}/api/stream_console_output", **kwargs) as r:
r.encoding = "utf-8"
while not self._exit:
try:
with requests.get(
f"http://{SERVER_ADDRESS}:{SERVER_PORT}/api/stream_console_output",
**kwargs,
) as r:
r.encoding = "utf-8"

characters = []
n_brackets = 0
characters = []
n_brackets = 0

for ch in r.iter_content(decode_unicode=True):
# Note, that some output must be received from the server before the loop exits
if self._exit:
break
for ch in r.iter_content(decode_unicode=True):
if self._exit:
return

characters.append(ch)
if ch == "{":
n_brackets += 1
elif ch == "}":
n_brackets -= 1
characters.append(ch)
if ch == "{":
n_brackets += 1
elif ch == "}":
n_brackets -= 1

# If the received buffer ('characters') is not empty and the message contains
# equal number of opening and closing brackets then consider the message complete.
if characters and not n_brackets:
line = "".join(characters)
characters = []
if characters and not n_brackets:
line = "".join(characters)
characters = []

print(f"{line}")
self.received_data_buffer.append(json.loads(line))
print(f"{line}")
self.received_data_buffer.append(json.loads(line))
except requests.exceptions.ReadTimeout:
continue

def stop(self):
"""
Expand All @@ -81,7 +85,10 @@ def __del__(self):

@pytest.mark.parametrize("zmq_port", (None, 60619))
def test_http_server_stream_console_output_1(
monkeypatch, re_manager_cmd, fastapi_server_fs, zmq_port # noqa F811
monkeypatch,
re_manager_cmd,
fastapi_server_fs,
zmq_port, # noqa F811
):
"""
Test for ``stream_console_output`` API
Expand Down Expand Up @@ -122,7 +129,8 @@ def test_http_server_stream_console_output_1(
assert resp2["items"][0] == resp1["item"]
assert resp2["running_item"] == {}

rsc.join()
rsc.join(timeout=10)
assert not rsc.is_alive(), "Timed out waiting for stream_console_output thread to terminate"

assert len(rsc.received_data_buffer) >= 2, pprint.pformat(rsc.received_data_buffer)

Expand Down Expand Up @@ -160,7 +168,11 @@ def test_http_server_stream_console_output_1(
@pytest.mark.parametrize("zmq_encoding", (None, "json", "msgpack"))
@pytest.mark.parametrize("zmq_port", (None, 60619))
def test_http_server_console_output_1(
monkeypatch, re_manager_cmd, fastapi_server_fs, zmq_port, zmq_encoding # noqa F811
monkeypatch,
re_manager_cmd,
fastapi_server_fs,
zmq_port,
zmq_encoding, # noqa F811
):
"""
Test for ``console_output`` API (not a streaming version).
Expand Down Expand Up @@ -238,7 +250,10 @@ def test_http_server_console_output_1(

@pytest.mark.parametrize("zmq_port", (None, 60619))
def test_http_server_console_output_update_1(
monkeypatch, re_manager_cmd, fastapi_server_fs, zmq_port # noqa F811
monkeypatch,
re_manager_cmd,
fastapi_server_fs,
zmq_port, # noqa F811
):
"""
Test for ``console_output`` API (not a streaming version).
Expand Down Expand Up @@ -379,7 +394,10 @@ def __del__(self):

@pytest.mark.parametrize("zmq_port", (None, 60619))
def test_http_server_console_output_socket_1(
monkeypatch, re_manager_cmd, fastapi_server_fs, zmq_port # noqa F811
monkeypatch,
re_manager_cmd,
fastapi_server_fs,
zmq_port, # noqa F811
):
"""
Test for ``/console_output/ws`` websocket
Expand Down Expand Up @@ -421,7 +439,8 @@ def test_http_server_console_output_socket_1(
assert resp2["items"][0] == resp1["item"]
assert resp2["running_item"] == {}

rsc.join()
rsc.join(timeout=10)
assert not rsc.is_alive(), "Timed out waiting for console_output websocket thread to terminate"

assert len(rsc.received_data_buffer) >= 2, pprint.pformat(rsc.received_data_buffer)

Expand Down
44 changes: 36 additions & 8 deletions bluesky_httpserver/tests/test_core_api_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,17 @@

# Plans used in most of the tests: '_plan1' and '_plan2' are quickly executed '_plan3' runs for 5 seconds.
_plan1 = {"name": "count", "args": [["det1", "det2"]], "item_type": "plan"}
_plan2 = {"name": "scan", "args": [["det1", "det2"], "motor", -1, 1, 10], "item_type": "plan"}
_plan3 = {"name": "count", "args": [["det1", "det2"]], "kwargs": {"num": 5, "delay": 1}, "item_type": "plan"}
_plan2 = {
"name": "scan",
"args": [["det1", "det2"], "motor", -1, 1, 10],
"item_type": "plan",
}
_plan3 = {
"name": "count",
"args": [["det1", "det2"]],
"kwargs": {"num": 5, "delay": 1},
"item_type": "plan",
}
_instruction_stop = {"name": "queue_stop", "item_type": "instruction"}


Expand Down Expand Up @@ -515,8 +524,10 @@ def test_http_server_queue_item_update_2_fail(re_manager, fastapi_server, replac

resp2 = request_to_json("post", "/queue/item/update", json=params)
assert resp2["success"] is False
assert resp2["msg"] == "Failed to add an item: Failed to replace item: " \
"Item with UID 'incorrect_uid' is not in the queue"
assert (
resp2["msg"] == "Failed to add an item: Failed to replace item: "
"Item with UID 'incorrect_uid' is not in the queue"
)

resp3 = request_to_json("get", "/queue/get")
assert resp3["items"] != []
Expand Down Expand Up @@ -1286,16 +1297,33 @@ def test_http_server_history_clear(re_manager, fastapi_server, clear_params, exp


def test_http_server_manager_kill(re_manager, fastapi_server): # noqa F811
timeout_variants = (
"Request timeout: ZMQ communication error: timeout occurred",
"Request timeout: ZMQ communication error: Resource temporarily unavailable",
)

request_to_json("post", "/environment/open")
assert wait_for_environment_to_be_created(10), "Timeout"

resp = request_to_json("post", "/test/manager/kill")
assert "success" not in resp
assert "Request timeout: ZMQ communication error: timeout occurred" in resp["detail"]

ttime.sleep(10)
assert any(_ in resp["detail"] for _ in timeout_variants)

deadline = ttime.time() + 20
last_status = None
while ttime.time() < deadline:
ttime.sleep(0.2)
last_status = request_to_json("get", "/status")
if (
isinstance(last_status, dict)
and last_status.get("manager_state") == "idle"
and last_status.get("worker_environment_exists") is True
):
break
else:
assert False, f"Timeout while waiting for manager recovery after kill. Last status: {last_status!r}"

resp = request_to_json("get", "/status")
resp = last_status
assert resp["msg"].startswith("RE Manager")
assert resp["manager_state"] == "idle"
assert resp["items_in_queue"] == 0
Expand Down
Loading
Loading