Merge pull request #125 from mkoura/parallel_execution
New parallel execution; allure reports
mkoura committed Sep 17, 2020
2 parents 939f5f4 + 5622af1 commit 8538c7d
Showing 10 changed files with 779 additions and 644 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -6,9 +6,9 @@ install:
.dirs:
mkdir -p .artifacts .cli_coverage .reports

# run all tests
# run all tests, generate allure report
tests: .dirs
pytest cardano_node_tests -n 8 --artifacts-base-dir=.artifacts/ --cli-coverage-dir=.cli_coverage/ --html=.reports/report.html
pytest cardano_node_tests -n 8 --dist loadscope --artifacts-base-dir=.artifacts/ --cli-coverage-dir=.cli_coverage/ --alluredir=.reports/

lint:
pre-commit run -a
263 changes: 67 additions & 196 deletions cardano_node_tests/tests/conftest.py
@@ -7,15 +7,12 @@
import pytest
from _pytest.fixtures import FixtureRequest
from _pytest.tmpdir import TempdirFactory
from filelock import FileLock

from cardano_node_tests.utils import clusterlib
from cardano_node_tests.utils import helpers
from cardano_node_tests.utils import parallel_run

LOGGER = logging.getLogger(__name__)
LOCK_FILE = "pytest_cluster.lock"
RUNNING_FILE = ".cluster_running"
MARK_FIRST_STARTED = ".mark_first_started"


def pytest_addoption(parser: Any) -> None:
@@ -35,107 +32,13 @@ def pytest_addoption(parser: Any) -> None:
)


def pytest_html_report_title(report: Any) -> None:
cardano_version = helpers.get_cardano_version()
report.title = (
f"cardano-node {cardano_version['cardano-node']} (git rev {cardano_version['git_rev']})"
)


def pytest_configure(config: Any) -> None:
cardano_version = helpers.get_cardano_version()
config._metadata["cardano-node"] = cardano_version["cardano-node"]
config._metadata["cardano-node rev"] = cardano_version["git_rev"]
config._metadata["ghc"] = cardano_version["ghc"]


def _fresh_cluster(
change_dir: Path,
cluster_logs_session: Generator,
tmp_path_factory: TempdirFactory,
worker_id: str,
request: FixtureRequest,
) -> Generator[clusterlib.ClusterLib, None, None]:
"""Run fresh cluster for each test marked with the "first" marker."""
# pylint: disable=unused-argument
pytest_tmp_dir = Path(tmp_path_factory.getbasetemp())

# not executing with multiple workers
if not worker_id or worker_id == "master":
helpers.stop_cluster()
cluster_obj = helpers.start_cluster()
tmp_path = Path(tmp_path_factory.mktemp("addrs_data"))
helpers.setup_test_addrs(cluster_obj, tmp_path)

try:
yield cluster_obj
finally:
# save CLI coverage
helpers.save_cli_coverage(cluster_obj, request)
# stop the cluster
helpers.stop_cluster()
# save artifacts
helpers.save_cluster_artifacts(artifacts_dir=pytest_tmp_dir)
return

# executing in parallel with multiple workers
lock_dir = pytest_tmp_dir.parent
# as the tests marked with "first" need a fresh cluster, no other tests on
# any other worker can run until the ones that requested this fixture are
# finished
with FileLock(f"{lock_dir}/.fresh_{LOCK_FILE}"):
# indicate that tests marked as "first" were launched on this worker
open(lock_dir / MARK_FIRST_STARTED, "a").close()
# indicate that a test marked as "first" is running on this worker
open(lock_dir / f"{RUNNING_FILE}_fresh_{worker_id}", "a").close()

# start and setup the cluster
helpers.stop_cluster()
cluster_obj = helpers.start_cluster()
tmp_path = Path(tmp_path_factory.mktemp("addrs_data"))
helpers.setup_test_addrs(cluster_obj, tmp_path)
try:
yield cluster_obj
finally:
# indicate the tests are no longer running on this worker
os.remove(lock_dir / f"{RUNNING_FILE}_fresh_{worker_id}")
# save CLI coverage
helpers.save_cli_coverage(cluster_obj, request)
# stop the cluster
helpers.stop_cluster()
# save artifacts
helpers.save_cluster_artifacts(artifacts_dir=pytest_tmp_dir)


def _wait_for_fresh(lock_dir: Path) -> None:
"""Wait until all tests marked as "first" are finished."""
while True:
# if the status files exist, the tests are still in progress
if list(lock_dir.glob(f"{RUNNING_FILE}_fresh_*")):
helpers.wait_for(
lambda: not list(lock_dir.glob(f"{RUNNING_FILE}_fresh_*")),
delay=5,
num_sec=20_000,
)
# other session-scoped cluster tests are already running, i.e. the "first"
# tests are already finished
elif list(lock_dir.glob(f"{RUNNING_FILE}_session_*")):
break
# wait for a while to see if new status files are created
else:
still_running = helpers.wait_for(
lambda: list(lock_dir.glob(f"{RUNNING_FILE}_fresh_*")),
delay=2,
num_sec=10,
silent=True,
)
if not still_running:
break


# session scoped fixtures


@pytest.fixture(scope="session")
def change_dir(tmp_path_factory: TempdirFactory) -> None:
"""Change CWD to temp directory before running tests."""
@@ -144,124 +47,92 @@ def change_dir(tmp_path_factory: TempdirFactory) -> None:
LOGGER.info(f"Changed CWD to '{tmp_path}'.")


@pytest.yield_fixture(scope="session")
def cluster_logs_session(
tmp_path_factory: TempdirFactory, worker_id: str, request: FixtureRequest
) -> Generator:
pytest_tmp_dir = Path(tmp_path_factory.getbasetemp())

try:
yield
finally:
process_artifacts = True # avoid return in finally

# process artifacts if this is the last worker that was running tests
if worker_id and worker_id != "master":
lock_dir = pytest_tmp_dir = pytest_tmp_dir.parent
with FileLock(f"{lock_dir}/.session_stop_{LOCK_FILE}"):
if list(lock_dir.glob(f"{RUNNING_FILE}_session_*")):
process_artifacts = False

if process_artifacts:
helpers.process_artifacts(pytest_tmp_dir=pytest_tmp_dir, request=request)
def _run_cluster_cleanup(
tmp_path_factory: TempdirFactory, worker_id: str, request: FixtureRequest, pytest_tmp_dir: Path
) -> None:
cluster_manager_obj = parallel_run.ClusterManager(
tmp_path_factory=tmp_path_factory, worker_id=worker_id, request=request
)
cluster_manager_obj._log("running cluster cleanup")
# stop cluster
cluster_manager_obj.stop()
# process artifacts
helpers.process_artifacts(pytest_tmp_dir=pytest_tmp_dir, request=request)


@pytest.yield_fixture(scope="session")
def cluster_session(
change_dir: Path,
cluster_logs_session: Generator,
tmp_path_factory: TempdirFactory,
worker_id: str,
request: FixtureRequest,
) -> Generator[clusterlib.ClusterLib, None, None]:
# pylint: disable=unused-argument
def cluster_cleanup(
tmp_path_factory: TempdirFactory, worker_id: str, request: FixtureRequest
) -> Generator:
pytest_tmp_dir = Path(tmp_path_factory.getbasetemp())

# not executing with multiple workers
if not worker_id or worker_id == "master":
helpers.stop_cluster()
cluster_obj = helpers.start_cluster()
tmp_path = Path(tmp_path_factory.mktemp("addrs_data"))
helpers.setup_test_addrs(cluster_obj, tmp_path)

try:
yield cluster_obj
finally:
# save CLI coverage
helpers.save_cli_coverage(cluster_obj, request)
# stop the cluster
helpers.stop_cluster()
# save artifacts
helpers.save_cluster_artifacts(artifacts_dir=pytest_tmp_dir)
yield
_run_cluster_cleanup(
tmp_path_factory=tmp_path_factory,
worker_id=worker_id,
request=request,
pytest_tmp_dir=pytest_tmp_dir,
)
return

# executing in parallel with multiple workers
lock_dir = pytest_tmp_dir.parent
mark_first_started = lock_dir / MARK_FIRST_STARTED
# make sure this code is executed on a single worker at a time
with FileLock(f"{lock_dir}/.session_{LOCK_FILE}"):
# make sure tests marked as "first" had enough time to start
if not mark_first_started.exists():
helpers.wait_for(mark_first_started.exists, delay=2, num_sec=10, silent=True)

# wait until all tests marked as "first" are finished
_wait_for_fresh(lock_dir)

if list(lock_dir.glob(f"{RUNNING_FILE}_session_*")):
# indicate that tests are running on this worker
open(lock_dir / f"{RUNNING_FILE}_session_{worker_id}", "a").close()
# reuse existing cluster
cluster_env = helpers.get_cluster_env()
cluster_obj = clusterlib.ClusterLib(cluster_env["state_dir"])
else:
# indicate that tests are running on this worker
open(lock_dir / f"{RUNNING_FILE}_session_{worker_id}", "a").close()
# start and setup the cluster
helpers.stop_cluster()
cluster_obj = helpers.start_cluster()
tmp_path = Path(tmp_path_factory.mktemp("addrs_data"))
helpers.setup_test_addrs(cluster_obj, tmp_path)
lock_dir = pytest_tmp_dir = pytest_tmp_dir.parent

try:
yield cluster_obj
finally:
with FileLock(f"{lock_dir}/.session_stop_{LOCK_FILE}"):
# indicate that tests are no longer running on this worker
os.remove(lock_dir / f"{RUNNING_FILE}_session_{worker_id}")
# save CLI coverage
helpers.save_cli_coverage(cluster_obj, request)
# stop cluster if this is the last worker that was running tests
if not list(lock_dir.glob(f"{RUNNING_FILE}_session_*")):
helpers.stop_cluster()
# save artifacts
helpers.save_cluster_artifacts(artifacts_dir=pytest_tmp_dir)
with helpers.FileLockIfXdist(f"{lock_dir}/.cluster_cleanup.lock"):
open(lock_dir / f".started_session_{worker_id}", "a").close()

yield

@pytest.fixture(scope="session")
def addrs_data_session(cluster_session: clusterlib.ClusterLib) -> dict:
# pylint: disable=unused-argument
return helpers.load_addrs_data()

with helpers.FileLockIfXdist(f"{lock_dir}/.cluster_cleanup.lock"):
os.remove(lock_dir / f".started_session_{worker_id}")
if list(lock_dir.glob(".started_session_*")):
return

# class scoped fixtures
with helpers.FileLockIfXdist(f"{lock_dir}/{parallel_run.CLUSTER_LOCK}"):
_run_cluster_cleanup(
tmp_path_factory=tmp_path_factory,
worker_id=worker_id,
request=request,
pytest_tmp_dir=pytest_tmp_dir,
)
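
The `helpers.FileLockIfXdist` context manager used by `cluster_cleanup` is defined elsewhere in this PR and is not shown in this diff. As a rough, non-authoritative sketch of the assumed behavior, it presumably takes a `filelock.FileLock` only when the run is split across pytest-xdist workers and is a no-op otherwise:

# Hypothetical sketch only; the real helper lives in cardano_node_tests/utils/helpers.py
# and may differ from this.
import contextlib
import os
from pathlib import Path
from typing import Iterator, Union

from filelock import FileLock


@contextlib.contextmanager
def FileLockIfXdist(lock_file: Union[str, Path]) -> Iterator[None]:
    """Hold a file lock only when running under pytest-xdist (assumed behavior)."""
    # pytest-xdist sets PYTEST_XDIST_WORKER (e.g. "gw0") in worker processes
    if os.environ.get("PYTEST_XDIST_WORKER"):
        with FileLock(str(lock_file)):
            yield
    else:
        yield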


cluster_class = pytest.yield_fixture(_fresh_cluster, scope="class")
@pytest.fixture(scope="session", autouse=True)
def session_autouse(change_dir: Any, cluster_cleanup: Any) -> None:
"""Autouse required session fixtures."""
# pylint: disable=unused-argument,unnecessary-pass
pass


@pytest.fixture(scope="class")
def addrs_data_class(cluster_class: clusterlib.ClusterLib) -> dict:
# pylint: disable=unused-argument
return helpers.load_addrs_data()
@pytest.yield_fixture
def cluster_manager(
tmp_path_factory: TempdirFactory,
worker_id: str,
request: FixtureRequest,
) -> Generator[parallel_run.ClusterManager, None, None]:
cluster_manager_obj = parallel_run.ClusterManager(
tmp_path_factory=tmp_path_factory, worker_id=worker_id, request=request
)
yield cluster_manager_obj
cluster_manager_obj.on_test_stop()
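
The `parallel_run.ClusterManager` class itself is added in a separate file of this PR (cardano_node_tests/utils/parallel_run.py) and is not part of this excerpt. Inferring only from the calls visible in this diff, its public surface can be outlined roughly as below; the constant value, docstrings, and bodies are assumptions, not the actual implementation:

# Interface sketch reconstructed from the fixtures in this diff; bodies omitted.
from _pytest.fixtures import FixtureRequest
from _pytest.tmpdir import TempdirFactory

from cardano_node_tests.utils import clusterlib

CLUSTER_LOCK: str = ".cluster_lock"  # lock-file name; exact value not shown in this diff


class ClusterManager:
    def __init__(
        self, tmp_path_factory: TempdirFactory, worker_id: str, request: FixtureRequest
    ) -> None:
        ...

    def _log(self, msg: str) -> None:
        """Debug logging helper (used by the cleanup code above)."""
        ...

    def get(self, singleton: bool = False, cleanup: bool = False) -> clusterlib.ClusterLib:
        """Return a running cluster instance, presumably starting or reusing one as needed."""
        ...

    def stop(self) -> None:
        """Stop the cluster (used by the session-wide cleanup)."""
        ...

    def on_test_stop(self) -> None:
        """Per-test teardown hook called after each test that used the manager."""
        ...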


# function scoped fixtures
@pytest.fixture
def cluster(
cluster_manager: parallel_run.ClusterManager,
) -> clusterlib.ClusterLib:
return cluster_manager.get()


cluster_func = pytest.yield_fixture(_fresh_cluster)
@pytest.fixture
def cluster_singleton(
cluster_manager: parallel_run.ClusterManager,
) -> clusterlib.ClusterLib:
return cluster_manager.get(singleton=True)


@pytest.fixture
def addrs_data_func(cluster_func: clusterlib.ClusterLib) -> dict:
# pylint: disable=unused-argument
return helpers.load_addrs_data()
def cluster_singleton_clean(
cluster_manager: parallel_run.ClusterManager,
) -> clusterlib.ClusterLib:
return cluster_manager.get(singleton=True, cleanup=True)
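
For context, a test would consume the new function-scoped fixtures roughly as follows. This is an illustrative sketch only; the tests below are not part of this commit, and the sharing semantics (a cluster shared between xdist workers, with the `singleton` variants requesting exclusive access) are inferred from the fixture names:

# Illustrative usage sketch; not part of the repository.
from cardano_node_tests.utils import clusterlib
from cardano_node_tests.utils import parallel_run


class TestExample:
    def test_shared_cluster(self, cluster: clusterlib.ClusterLib):
        # `cluster` is served by ClusterManager and can run alongside other tests
        assert cluster is not None

    def test_exclusive_cluster(self, cluster_singleton: clusterlib.ClusterLib):
        # `cluster_singleton` presumably blocks other workers while this test runs
        assert cluster_singleton is not None

    def test_with_manager(self, cluster_manager: parallel_run.ClusterManager):
        # the manager can also be used directly, as the `cluster` fixture does
        cluster = cluster_manager.get()
        assert cluster is not None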
