From 91e789ef56305c4d202e6217d18bcc3cd8e6484a Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Fri, 8 Mar 2024 18:05:12 -0500 Subject: [PATCH 1/7] An assortment of Pbench Ops fixes and fun This fixes several issues observed during ops review: 1. The `/api/v1/endpoints` API fails if the server is shut down 2. `tar` unpack errors can result in enormous `stderr` output, which is captured in the `Audit` log; truncate it to 5Mb 3. Change the `pbench-audit` utility to use `dateutil.parser` instead of `click.DateTime()` so we can include fractional seconds and timezone. During the time when we broke PostgreSQL, we failed to create metadata for a number of datasets that were allowed to upload. (Whether we should allow this vs failing the upload is a separate issue.) We want to repair the excessively large `Audit` attributes records. So I took a stab at some wondrous and magical SQL queries and hackery to begin a new `pbench-repair` utility. Right now, it repairs long audit attributes "intelligently" by trimming individual JSON key values; and it adds metadata to datasets which lack critical values. Currently, this includes `server.tarball-path` (which we need to enable TOC and visualization), `dataset.metalog` (capturing the tarball `metadata.log` file), and `server.benchmark` for visualization. There are other `server` namespace values (including expiration time) that could be repaired: I decided not to worry about that as we're not doing expiration anyway. (Though I might add it over the weekend, since it shouldn't be hard.) And there are probably other things we might want to repair in the future using this framework. I tested this in a `runlocal` container, using `psql` to "break" datasets and repair them. 
I hacked the local `repair.py` with a low "max error" limit to force truncation of audit attributes: ``` pbench-repair --detail --errors --verify --progress 10 (22:52:08) Repairing audit || 60:FAILURE upload fio_rw_2018.02.01T22.40.57 [message] truncated (107) to 105 || 116:SUCCESS apikey None [key] truncated (197) to 105 22 audit records had attributes too long 2 records were fixed (22:52:08) Repairing metadata || fio_rw_2018.02.01T22.40.57 has no server.tarball-path: setting /srv/pbench/archive/fs-version-001/dhcp31-45.perf.lab.eng.bos.redhat.com/08516cc7448035be2cc502f0517783fa/fio_rw_2018.02.01T22.40.57.tar.xz || fio_rw_2018.02.01T22.40.57 has no metalog: setting from metadata.log || fio_rw_2018.02.01T22.40.57 has no server.benchmark: setting 'fio' || pbench-user-benchmark_example-vmstat_2018.10.24T14.38.18 has no server.tarball-path: setting /srv/pbench/archive/fs-version-001/ansible-host/45f0e2af41977b89e07bae4303dc9972/pbench-user-benchmark_example-vmstat_2018.10.24T14.38.18.tar.xz || pbench-user-benchmark_example-vmstat_2018.10.24T14.38.18 has no metalog: setting from metadata.log || pbench-user-benchmark_example-vmstat_2018.10.24T14.38.18 has no server.benchmark: setting 'pbench-user-benchmark' 2 server.tarball-path repairs, 0 failures 2 dataset.metalog repairs, 0 failures 2 server.benchmark repairs ``` --- lib/pbench/cli/server/audit.py | 34 ++- lib/pbench/cli/server/repair.py | 289 ++++++++++++++++++ .../api/resources/endpoint_configure.py | 4 +- lib/pbench/server/cache_manager.py | 26 +- .../test/unit/server/test_cache_manager.py | 15 +- server/Makefile | 1 + setup.cfg | 3 + 7 files changed, 364 insertions(+), 8 deletions(-) create mode 100644 lib/pbench/cli/server/repair.py diff --git a/lib/pbench/cli/server/audit.py b/lib/pbench/cli/server/audit.py index 980f35b6d5..dc0bcb583b 100644 --- a/lib/pbench/cli/server/audit.py +++ b/lib/pbench/cli/server/audit.py @@ -1,8 +1,10 @@ from collections import defaultdict import datetime -from typing import 
Iterator, Optional +from typing import Any, Iterator, Optional import click +from click import Context, Parameter, ParamType +from dateutil import parser from pbench.cli import pass_cli_context from pbench.cli.server import config_setup, Verify @@ -28,6 +30,30 @@ verifier: Optional[Verify] = None +class DateParser(ParamType): + """The DateParser type converts date strings into `datetime` objects. + + This is a variant of click's built-in DateTime parser, but uses the + more flexible dateutil.parser + """ + + name = "dateparser" + + def convert( + self, value: Any, param: Optional[Parameter], ctx: Optional[Context] + ) -> Any: + if isinstance(value, datetime): + return value + + try: + return parser.parse(value) + except Exception as e: + self.fail(f"{value!r} cannot be converted to a datetime: {str(e)!r}") + + def __repr__(self) -> str: + return type(self).__name__ + + def auditor(kwargs) -> Iterator[str]: """Report audit records matching patterns. @@ -110,6 +136,7 @@ def auditor(kwargs) -> Iterator[str]: @click.command(name="pbench-audit") @pass_cli_context +@click.option("--id", type=int, help="Select by audit event ID") @click.option( "--ids", default=False, @@ -130,6 +157,7 @@ def auditor(kwargs) -> Iterator[str]: @click.option("--object-id", type=str, help="Select by object ID") @click.option("--object-name", type=str, help="Select by object name") @click.option("--page", default=False, is_flag=True, help="Paginate the output") +@click.option("--root-id", type=int, help="Select by audit event root ID") @click.option("--user-id", type=str, help="Select by user ID") @click.option("--user-name", type=str, help="Select by username") @click.option( @@ -142,12 +170,12 @@ def auditor(kwargs) -> Iterator[str]: ) @click.option( "--since", - type=click.DateTime(), + type=DateParser(), help="Select entries on or after specified date/time", ) @click.option( "--until", - type=click.DateTime(), + type=DateParser(), help="Select entries on or before specified date/time", ) 
@click.option( diff --git a/lib/pbench/cli/server/repair.py b/lib/pbench/cli/server/repair.py new file mode 100644 index 0000000000..4b4fdc4017 --- /dev/null +++ b/lib/pbench/cli/server/repair.py @@ -0,0 +1,289 @@ +from collections import defaultdict +import copy +from operator import and_ +from pathlib import Path +from typing import Any, Optional + +import click +from sqlalchemy import cast, column, or_, String, Subquery +from sqlalchemy.orm import aliased +from sqlalchemy.sql.expression import func + +from pbench.cli import pass_cli_context +from pbench.cli.server import config_setup, Detail, Verify, Watch +from pbench.cli.server.options import common_options +from pbench.common.logger import get_pbench_logger +from pbench.server import BadConfig, cache_manager +from pbench.server.database.database import Database +from pbench.server.database.models.audit import Audit +from pbench.server.database.models.datasets import Dataset, Metadata + +detailer: Optional[Detail] = None +watcher: Optional[Watch] = None +verifier: Optional[Verify] = None + + +LIMIT = cache_manager.MAX_ERROR + + +def repair_audit(kwargs): + """Repair certain audit record problems. + + 1. truncate extremely long audit event "attributes" + + Args: + kwargs: the command options. 
+ """ + + rows = ( + Database.db_session.query(Audit) + .filter(func.length(cast(Audit.attributes, String)) >= LIMIT) + .order_by(Audit.timestamp) + .execution_options(stream_results=True) + .yield_per(2000) + ) + count = 0 + attributes_errors = 0 + updated = 0 + commit_error = None + for audit in rows: + count += 1 + name = f"{audit.id}:{audit.status.name} {audit.name} {audit.object_name}" + # Deep copy works around a limitation in SQLAlchemy JSON proxy handling + a: dict[str, Any] = copy.deepcopy(audit.attributes) + if type(a) is not dict: + detailer.error(f"{name} attributes is type {type(a).__name__}") + attributes_errors += 1 + continue + didit = False + for f in a: + value = a[f] + if type(value) is str and len(value) > LIMIT: + a[f] = value[:LIMIT] + detailer.message(f"{name} [{f}] truncated ({len(value)}) to {LIMIT}") + didit = True + if didit: + audit.attributes = a + updated += 1 + if updated: + try: + Database.db_session.commit() + except Exception as e: + commit_error = str(e) + click.echo(f"{count} audit records had attributes too long") + click.echo(f"{updated} records were fixed") + if attributes_errors: + click.echo(f"{attributes_errors} had format errors in attributes") + if commit_error: + click.echo(f"SQL error, changes were not made: {commit_error!r}") + + +def find_tarball(resource_id: str, kwargs) -> Optional[Path]: + """Find a tarball in the ARCHIVE tree + + This is a "brute force" helper to repair a missing server.tarball-path + metadata. We can't use the cache manager find_dataset(), which relies + on server.tarball-path; so instead we do a search for the MD5 value. 
+ + Args: + resource_id: the dataset resource ID (MD5) + kwargs: CLI options + + Returns: + tarball path if found, or None + """ + + # We use the cache manager as a standard way to get the ARCHIVE root + tree = cache_manager.CacheManager(kwargs["_config"], kwargs["_logger"]) + for controller in tree.archive_root.iterdir(): + watcher.update(f"searching {controller} for {resource_id}") + if controller.is_dir(): + isolator = controller / resource_id + if isolator.is_dir(): + tars = list(isolator.glob("*.tar.xz")) + if len(tars) == 1: + return tars[0] + else: + for md5 in controller.glob("**/*.tar.xz.md5"): + id = md5.read_text().split(" ", maxsplit=1)[0] + if id == resource_id: + tar = md5.with_suffix("") + return tar + return None + + +def repair_metadata(kwargs): + """Repair certain critical metadata errors + + 1. Missing server.tarball-path + 2. Missing dataset.metalog + 3. Missing server.benchmark + + Args: + kwargs: the command options + """ + + # In order to filter on "derived" values like the JSON path extraction, + # we need to use a nested SELECT -- PostgreSQL doesn't allow using an + # "as" label in the WHERE clause, but labeling the nested SELECT columns + # gets around that limitation. We also do two OUTER JOINs in order to + # filter only for datasets where one of the three things we know how to + # repair is missing. 
+ mlog = aliased(Metadata, name="metalog") + meta: Subquery = ( + Database.db_session.query( + Metadata.dataset_ref, + Metadata.value["tarball-path"].label("path"), + Metadata.value["benchmark"].label("benchmark"), + ) + .where(Metadata.key == "server") + .subquery() + ) + query = ( + Database.db_session.query(Dataset, meta.c.path, meta.c.benchmark, mlog) + .outerjoin(meta, Dataset.id == meta.c.dataset_ref) + .outerjoin( + mlog, and_(Dataset.id == mlog.dataset_ref, mlog.key == Metadata.METALOG) + ) + .filter( + or_( + meta.c.path.is_(None), + meta.c.benchmark.is_(None), + column("metalog").is_(None), + ) + ) + .execution_options(stream_results=True) + ) + + path_repairs = 0 + benchmark_repairs = 0 + metalog_repairs = 0 + path_repairs_failed = 0 + metalog_repairs_failed = 0 + rows = query.yield_per(2000) + + # We have to finish reading before we can write metadata, or we'll confuse + # SQLAlchemy's cursor logic. So build a defer queue for metadata values to + # set. (Another alternative would be to list-ify the query: it's unclear + # which would scale better.) 
+ defer = [] + for dataset, path, benchmark, metadata in rows: + fix = {"dataset": dataset, "metadata": defaultdict()} + if not path: + path_repairs += 1 + path = find_tarball(dataset.resource_id, kwargs) + if path: + detailer.message( + f"{dataset.name} has no {Metadata.TARBALL_PATH}: setting {path}" + ) + fix["metadata"][Metadata.TARBALL_PATH] = str(path) + else: + path_repairs_failed += 1 + detailer.error(f"{dataset.name} doesn't seem to have a tarball") + if not metadata or not metadata.value: + metalog_repairs += 1 + which = "metadata.log" + try: + metalog = cache_manager.Tarball._get_metadata(path) + except Exception: + which = "default" + metalog = { + "pbench": { + "name": dataset.name, + "script": Metadata.SERVER_BENCHMARK_UNKNOWN, + } + } + fix["metadata"][Metadata.SERVER_ARCHIVE] = True + detailer.message( + f"{dataset.name} has no {Metadata.METALOG}: setting from {which}" + ) + fix["metalog"] = metalog + else: + metalog = metadata.value + if not benchmark: + benchmark_repairs += 1 + script = metalog.get("pbench", {}).get( + "script", Metadata.SERVER_BENCHMARK_UNKNOWN + ) + detailer.message( + f"{dataset.name} has no {Metadata.SERVER_BENCHMARK}: setting {script!r}" + ) + fix["metadata"][Metadata.SERVER_BENCHMARK] = script + defer.append(fix) + for each in defer: + dataset = each["dataset"] + metalog = each.get("metalog") + if metalog: + try: + Metadata.create(dataset=dataset, key=Metadata.METALOG, value=metalog) + except Exception as e: + metalog_repairs_failed += 1 + detailer.error(f"Unable to create {dataset.name} metalog: {str(e)!r}") + for key, value in each.get("metadata", {}).items(): + Metadata.setvalue(dataset, key, value) + + click.echo( + f"{path_repairs} {Metadata.TARBALL_PATH} repairs, " + f"{path_repairs_failed} failures" + ) + click.echo( + f"{metalog_repairs} dataset.metalog repairs, " + f"{metalog_repairs_failed} failures" + ) + click.echo(f"{benchmark_repairs} {Metadata.SERVER_BENCHMARK} repairs") + + 
+@click.command(name="pbench-repair") +@pass_cli_context +@click.option( + "--detail", + "-d", + default=False, + is_flag=True, + help="Provide extra diagnostic information", +) +@click.option( + "--errors", + "-e", + default=False, + is_flag=True, + help="Show individual file errors", +) +@click.option( + "--progress", "-p", type=float, default=0.0, help="Show periodic progress messages" +) +@click.option( + "--verify", "-v", default=False, is_flag=True, help="Display intermediate messages" +) +@common_options +def repair(context: object, **kwargs): + """Repair consistency problems in the Pbench Server data + \f + + Args: + context: click context + kwargs: click options + """ + global detailer, verifier, watcher + detailer = Detail(kwargs.get("detail"), kwargs.get("errors")) + verifier = Verify(kwargs.get("verify")) + watcher = Watch(kwargs.get("progress")) + + try: + config = config_setup(context) + kwargs["_logger"] = get_pbench_logger("pbench-repair", config) + kwargs["_config"] = config + verifier.status("Repairing audit") + watcher.update("repairing audit") + repair_audit(kwargs) + verifier.status("Repairing metadata") + watcher.update("repairing metadata") + repair_metadata(kwargs) + rv = 0 + except Exception as exc: + if verifier.verify: + raise + click.secho(exc, err=True, bg="red") + rv = 2 if isinstance(exc, BadConfig) else 1 + + click.get_current_context().exit(rv) diff --git a/lib/pbench/server/api/resources/endpoint_configure.py b/lib/pbench/server/api/resources/endpoint_configure.py index 7c65e9558c..2a53be28c5 100644 --- a/lib/pbench/server/api/resources/endpoint_configure.py +++ b/lib/pbench/server/api/resources/endpoint_configure.py @@ -44,7 +44,9 @@ def __init__(self, config: PbenchServerConfig): proxying was set up for the original endpoints query: e.g., the Javascript `window.origin` from which the Pbench dashboard was loaded. 
""" - super().__init__(config, ApiSchema(ApiMethod.GET, OperationCode.READ)) + super().__init__( + config, ApiSchema(ApiMethod.GET, OperationCode.READ), always_enabled=True + ) self.server_config = config def _get(self, args: ApiParams, req: Request, context: ApiContext) -> Response: diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index 0da346a3f1..e7604c7b65 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -4,6 +4,7 @@ from enum import auto, Enum import errno import fcntl +import logging from logging import Logger import math import os @@ -26,6 +27,7 @@ RECLAIM_BYTES_PAD = 1024 # Pad unpack reclaim requests by this much MB_BYTES = 1024 * 1024 # Bytes in Mb +MAX_ERROR = 1024 * 5 # Maximum length of subprocess stderr to capture class CacheManagerError(Exception): @@ -1024,9 +1026,31 @@ def subprocess_run( raise exception(ctx, str(exc)) from exc else: if process.returncode != 0: + + # tar failures can include a message for each file in the + # archive, and can be quite large. Break stderr into lines, and + # gather those lines only up to our configured size limit. 
+ size = 0 + message = [] + log = logging.getLogger("sub") + log.info(process.stderr) + lines = process.stderr.split("\n") + for line in lines: + # Skip empty lines + if not line: + continue + size += len(line) + log.info(f"LINE {line!r} ({size} <= MAX {MAX_ERROR})") + if size > MAX_ERROR: + break + message.append(line) + + # If even the first line was too big, truncate it + msg = "\n".join(message) if message else lines[0][:MAX_ERROR] + log.info(f"MSG {msg!r}") raise exception( ctx, - f"{cmd[0]} exited with status {process.returncode}: {process.stderr.strip()!r}", + f"{cmd[0]} exited with status {process.returncode}: {msg!r}", ) def get_unpacked_size(self) -> int: diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index 146f29debf..d48dce84ab 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -402,15 +402,24 @@ def mock_run(args, **_kwargs): msg = f"An error occurred while unpacking {my_dir}: Command '{my_command}' timed out after 43 seconds" assert str(exc.value) == msg - def test_tarball_subprocess_run_with_returncode(self, monkeypatch): + @pytest.mark.parametrize( + "errmsg,expected", + ( + ("Error unpacking tarball\n", "Error unpacking tarball"), + ("This is a message we'll find too long", "This is a message we'll f"), + ("One line\nTwo line\nThree line\nFour line\n", "One line\nTwo line"), + ), + ) + def test_tarball_subprocess_failure(self, monkeypatch, errmsg, expected): """Test to check the subprocess_run functionality of the Tarball when returncode value is not zero""" my_command = "mycommand" + monkeypatch.setattr("pbench.server.cache_manager.MAX_ERROR", 25) def mock_run(args, **_kwargs): assert args[0] == my_command return subprocess.CompletedProcess( - args, returncode=1, stdout=None, stderr="Some error unpacking tarball\n" + args, returncode=1, stdout=None, stderr=errmsg ) with monkeypatch.context() as m: @@ -422,7 
+431,7 @@ def mock_run(args, **_kwargs): Tarball.subprocess_run(command, my_dir, TarballUnpackError, my_dir) msg = f"An error occurred while unpacking {my_dir.name}: {my_command} " - msg += "exited with status 1: 'Some error unpacking tarball'" + msg += f"exited with status 1: {expected!r}" assert str(exc.value) == msg def test_tarball_subprocess_run_success(self, monkeypatch): diff --git a/server/Makefile b/server/Makefile index b8a0316dc1..89306786a5 100644 --- a/server/Makefile +++ b/server/Makefile @@ -22,6 +22,7 @@ INSTALLOPTS = --directory click-scripts = \ pbench-audit \ pbench-reindex \ + pbench-repair \ pbench-report-generator \ pbench-tree-manage \ pbench-user-create \ diff --git a/setup.cfg b/setup.cfg index 8be6d2b920..efe20dce85 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,6 +29,9 @@ console_scripts = pbench-clear-results = pbench.cli.agent.commands.results.clear:main pbench-clear-tools = pbench.cli.agent.commands.tools.clear:main pbench-config = pbench.cli.agent.commands.conf:main + pbench-repair = pbench.cli.server.repair:repair + pbench-report-generator = pbench.cli.server.report:report + pbench-tree-manage = pbench.cli.server.tree_manage:tree_manage pbench-is-local = pbench.cli.agent.commands.is_local:main pbench-list-tools = pbench.cli.agent.commands.tools.list:main pbench-list-triggers = pbench.cli.agent.commands.triggers.list:main From 74bedaf6c4343ecc8f6239fe7a98c96682727161 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Fri, 8 Mar 2024 18:14:44 -0500 Subject: [PATCH 2/7] Well, that didn't go well. Take 2. 
--- lib/pbench/cli/server/audit.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pbench/cli/server/audit.py b/lib/pbench/cli/server/audit.py index dc0bcb583b..c4df28f9b3 100644 --- a/lib/pbench/cli/server/audit.py +++ b/lib/pbench/cli/server/audit.py @@ -42,7 +42,7 @@ class DateParser(ParamType): def convert( self, value: Any, param: Optional[Parameter], ctx: Optional[Context] ) -> Any: - if isinstance(value, datetime): + if isinstance(value, datetime.datetime): return value try: From f157f2cc519ea195a8022c7a63926f7f1b354167 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Sat, 9 Mar 2024 10:23:44 -0500 Subject: [PATCH 3/7] "Complete" This adds repair for `server.deletion` (expiration timestamp), completing the repair of the `server` namespace. In copying the setup from `intake_base.py` I realized that intake was technically incorrect (not that it really matters much as we don't, and likely won't, implement dataset expiration) in that it always uses the static lifetime setting from `pbench-config.cfg` rather than recognizing the dynamic server settings value. So I fixed that and made a common implementation. It's also been bothering me that, in the midst of our PostgreSQL problems, we allowed upload of datasets without metadata. I'd initially deliberately allowed this, looking at the metadata as "extra" and figuring I didn't want to fail an upload just because of that. However, with recent optimizations, we really depend internally on `server.tarball-path` in particular: the new optimized `CacheManager.find_dataset` won't work without it. So failure in setting metadata on intake is now a fatal internal server error. 
--- lib/pbench/cli/server/repair.py | 35 +++++++++++++++++-- lib/pbench/server/api/resources/__init__.py | 7 ++-- .../server/api/resources/intake_base.py | 17 ++------- .../server/database/models/server_settings.py | 26 +++++++++++++- lib/pbench/test/unit/server/test_upload.py | 17 +++++---- 5 files changed, 77 insertions(+), 25 deletions(-) diff --git a/lib/pbench/cli/server/repair.py b/lib/pbench/cli/server/repair.py index 4b4fdc4017..e7e45d22a7 100644 --- a/lib/pbench/cli/server/repair.py +++ b/lib/pbench/cli/server/repair.py @@ -1,5 +1,6 @@ from collections import defaultdict import copy +import datetime from operator import and_ from pathlib import Path from typing import Any, Optional @@ -17,6 +18,8 @@ from pbench.server.database.database import Database from pbench.server.database.models.audit import Audit from pbench.server.database.models.datasets import Dataset, Metadata +from pbench.server.database.models.server_settings import get_retention_days +from pbench.server.utils import UtcTimeHelper detailer: Optional[Detail] = None watcher: Optional[Watch] = None @@ -135,12 +138,15 @@ def repair_metadata(kwargs): Metadata.dataset_ref, Metadata.value["tarball-path"].label("path"), Metadata.value["benchmark"].label("benchmark"), + Metadata.value["deletion"].label("expiration"), ) .where(Metadata.key == "server") .subquery() ) query = ( - Database.db_session.query(Dataset, meta.c.path, meta.c.benchmark, mlog) + Database.db_session.query( + Dataset, meta.c.path, meta.c.benchmark, meta.c.expiration, mlog + ) .outerjoin(meta, Dataset.id == meta.c.dataset_ref) .outerjoin( mlog, and_(Dataset.id == mlog.dataset_ref, mlog.key == Metadata.METALOG) @@ -149,6 +155,7 @@ def repair_metadata(kwargs): or_( meta.c.path.is_(None), meta.c.benchmark.is_(None), + meta.c.expiration.is_(None), column("metalog").is_(None), ) ) @@ -156,9 +163,11 @@ def repair_metadata(kwargs): ) path_repairs = 0 + expiration_repairs = 0 benchmark_repairs = 0 metalog_repairs = 0 path_repairs_failed = 0 
+ expiration_repairs_failed = 0 metalog_repairs_failed = 0 rows = query.yield_per(2000) @@ -167,7 +176,7 @@ def repair_metadata(kwargs): # set. (Another alternative would be to list-ify the query: it's unclear # which would scale better.) defer = [] - for dataset, path, benchmark, metadata in rows: + for dataset, path, benchmark, expiration, metadata in rows: fix = {"dataset": dataset, "metadata": defaultdict()} if not path: path_repairs += 1 @@ -200,6 +209,24 @@ def repair_metadata(kwargs): fix["metalog"] = metalog else: metalog = metadata.value + + if not expiration: + expiration_repairs += 1 + try: + retention_days = get_retention_days(kwargs.get("_config")) + retention = datetime.timedelta(days=retention_days) + deletion = UtcTimeHelper(dataset.uploaded + retention).to_iso_string() + fix["metadata"][Metadata.SERVER_DELETION] = deletion + detailer.message( + f"{dataset.name} {Metadata.SERVER_DELETION} " + f"set ({retention_days} days) to {deletion}" + ) + except Exception as e: + detailer.error( + f"unable to calculate {dataset.name} expiration: {str(e)!r}" + ) + expiration_repairs_failed += 1 + if not benchmark: benchmark_repairs += 1 script = metalog.get("pbench", {}).get( @@ -226,6 +253,10 @@ def repair_metadata(kwargs): f"{path_repairs} {Metadata.TARBALL_PATH} repairs, " f"{path_repairs_failed} failures" ) + click.echo( + f"{expiration_repairs} {Metadata.SERVER_DELETION} repairs, " + f"{expiration_repairs_failed} failures" + ) click.echo( f"{metalog_repairs} dataset.metalog repairs, " f"{metalog_repairs_failed} failures" diff --git a/lib/pbench/server/api/resources/__init__.py b/lib/pbench/server/api/resources/__init__.py index ccb59f7307..d74eb095d4 100644 --- a/lib/pbench/server/api/resources/__init__.py +++ b/lib/pbench/server/api/resources/__init__.py @@ -1811,18 +1811,19 @@ def _get_dataset_metadata(dataset: Dataset, requested_items: list[str]) -> JSON: @staticmethod def _set_dataset_metadata( - dataset: Dataset, metadata: dict[str, JSONVALUE] + 
dataset: Dataset, metadata: dict[str, JSONVALUE], throw: bool = False ) -> dict[str, str]: """Set metadata on a specific Dataset This supports strict Metadata key/value items associated with the Dataset as well as selected columns from the Dataset model. - Errors are collected and returned. + Errors are collected and returned if 'throw' is False (default) Args: dataset: Dataset object metadata: dict of key/value pairs + throw: propagate exceptions instead of returning failures Returns: A dict associating an error string with each failing metadata key. @@ -1837,6 +1838,8 @@ def _set_dataset_metadata( try: Metadata.setvalue(key=k, value=v, dataset=dataset, user=user) except MetadataError as e: + if throw: + raise fail[k] = str(e) return fail diff --git a/lib/pbench/server/api/resources/intake_base.py b/lib/pbench/server/api/resources/intake_base.py index ed428edea8..70fe81248f 100644 --- a/lib/pbench/server/api/resources/intake_base.py +++ b/lib/pbench/server/api/resources/intake_base.py @@ -47,10 +47,7 @@ OperationName, OperationState, ) -from pbench.server.database.models.server_settings import ( - OPTION_SERVER_INDEXING, - ServerSetting, -) +from pbench.server.database.models.server_settings import get_retention_days from pbench.server.sync import Sync from pbench.server.utils import UtcTimeHelper @@ -556,17 +553,11 @@ def _intake( f"Unable to create metalog for Tarball {dataset.name!r}: {exc}" ) from exc - try: - retention_days = self.config.default_retention_period - except Exception as e: - raise APIInternalError( - f"Unable to get integer retention days: {e!s}" - ) from e - # Calculate a default deletion time for the dataset, based on the # time it was uploaded rather than the time it was originally # created which might much earlier. 
try: + retention_days = get_retention_days(self.config) retention = datetime.timedelta(days=retention_days) deletion = dataset.uploaded + retention notes.append(f"Expected expiration date is {deletion:%Y-%m-%d}.") @@ -583,9 +574,7 @@ def _intake( Metadata.SERVER_BENCHMARK: benchmark, } ) - f = self._set_dataset_metadata(dataset, meta) - if f: - attributes["failures"] = f + self._set_dataset_metadata(dataset, meta, throw=True) except Exception as e: raise APIInternalError(f"Unable to set metadata: {e!s}") from e diff --git a/lib/pbench/server/database/models/server_settings.py b/lib/pbench/server/database/models/server_settings.py index d981ba3ee0..e453e46409 100644 --- a/lib/pbench/server/database/models/server_settings.py +++ b/lib/pbench/server/database/models/server_settings.py @@ -14,7 +14,7 @@ from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.sql.sqltypes import JSON -from pbench.server import JSONOBJECT, JSONVALUE +from pbench.server import JSONOBJECT, JSONVALUE, PbenchServerConfig from pbench.server.database.database import Database from pbench.server.database.models import decode_sql_error @@ -135,6 +135,30 @@ def validate_lifetime(key: str, value: JSONVALUE) -> JSONVALUE: ] +def get_retention_days(config: PbenchServerConfig) -> int: + """Get dataset lifetime + + This recognizes the server settings API as well as the static default if + for some reason we can't read the SQL database or the value we read can't + be converted to an integer. 
+ + Args: + config: the Pbench Server configuration + + Returns: + integer days to expiration + """ + try: + retention_days = int(ServerSetting.get(OPTION_DATASET_LIFETIME)) + return retention_days + except Exception: + try: + retention_days = config.default_retention_period + except Exception: + retention_days = config.MAXIMUM_RETENTION_DAYS + return retention_days + + def validate_server_state(key: str, value: JSONVALUE) -> JSONVALUE: try: status = value[STATE_STATUS_KEY].lower() diff --git a/lib/pbench/test/unit/server/test_upload.py b/lib/pbench/test/unit/server/test_upload.py index fddee226c4..2465465dd7 100644 --- a/lib/pbench/test/unit/server/test_upload.py +++ b/lib/pbench/test/unit/server/test_upload.py @@ -848,8 +848,9 @@ def test_upload_metadata_error( ): """Test handling of post-intake error setting metadata - Cause Metadata.setvalue to fail. This should be reported in "failures" - without failing the upload. + Cause Metadata.setvalue to fail. This is an abnormal error (generally + a PostgreSQL server problem) and should result in a server internal + error. 
""" datafile, _, md5 = tarball @@ -867,12 +868,16 @@ def setvalue(dataset: Dataset, key: str, **_kwargs): headers=self.gen_headers(pbench_drb_token, md5), ) - assert response.status_code == HTTPStatus.CREATED + assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR audit = Audit.query() assert len(audit) == 2 - fails = audit[1].attributes["failures"] - assert isinstance(fails, dict) - assert fails["server.benchmark"].startswith("Metadata SQL error 'fake': ") + with pytest.raises(DatasetNotFound): + Dataset.query(resource_id=md5) + assert ( + audit[1] + .attributes["message"] + .startswith("Internal Pbench Server Error: log reference ") + ) @pytest.mark.freeze_time("1970-01-01") def test_upload_archive( From 740312d438c35bb3a721a026427f25d31586e47f Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Tue, 12 Mar 2024 13:58:08 -0400 Subject: [PATCH 4/7] Changes --- lib/pbench/cli/server/__init__.py | 35 ++++++++++++-- lib/pbench/cli/server/audit.py | 30 +----------- lib/pbench/cli/server/repair.py | 47 +++++++++++++++---- lib/pbench/server/cache_manager.py | 5 -- .../server/database/models/server_settings.py | 8 ++-- 5 files changed, 73 insertions(+), 52 deletions(-) diff --git a/lib/pbench/cli/server/__init__.py b/lib/pbench/cli/server/__init__.py index 108f8307ac..b8d913de4e 100644 --- a/lib/pbench/cli/server/__init__.py +++ b/lib/pbench/cli/server/__init__.py @@ -2,6 +2,7 @@ from threading import Thread import time from typing import Any, Optional +from typing import Any, Optional, Union import click from click import Context, Parameter, ParamType @@ -11,6 +12,27 @@ from pbench.server.database import init_db +class DateParser(ParamType): + """The DateParser type converts date strings into `datetime` objects. 
+ + This is a variant of click's built-in DateTime parser, but uses the + more flexible dateutil.parser + """ + + name = "dateparser" + + def convert( + self, value: Any, param: Optional[Parameter], ctx: Optional[Context] + ) -> Any: + if isinstance(value, datetime.datetime): + return value + + try: + return parser.parse(value) + except Exception as e: + self.fail(f"{value!r} cannot be converted to a datetime: {str(e)!r}") + + class Detail: """Encapsulate generation of additional diagnostics""" @@ -63,13 +85,16 @@ def warning(self, message: str): class Verify: """Encapsulate -v status messages.""" - def __init__(self, verify: bool): + def __init__(self, verify: Union[bool, int]): """Initialize the object. Args: verify: True to write status messages. """ - self.verify = verify + if isinstance(verify, int): + self.verify = verify + else: + self.verify = 1 if verify else 0 def __bool__(self) -> bool: """Report whether verification is enabled. @@ -77,15 +102,15 @@ def __bool__(self) -> bool: Returns: True if verification is enabled. """ - return self.verify + return bool(self.verify) - def status(self, message: str): + def status(self, message: str, level: int = 1): """Write a message if verification is enabled. 
Args: message: status string """ - if self.verify: + if self.verify >= level: ts = datetime.datetime.now().astimezone() click.secho(f"({ts:%H:%M:%S}) {message}", fg="green", err=True) diff --git a/lib/pbench/cli/server/audit.py b/lib/pbench/cli/server/audit.py index c4df28f9b3..e385d2f177 100644 --- a/lib/pbench/cli/server/audit.py +++ b/lib/pbench/cli/server/audit.py @@ -1,13 +1,11 @@ from collections import defaultdict import datetime -from typing import Any, Iterator, Optional +from typing import Iterator, Optional import click -from click import Context, Parameter, ParamType -from dateutil import parser from pbench.cli import pass_cli_context -from pbench.cli.server import config_setup, Verify +from pbench.cli.server import config_setup, DateParser, Verify from pbench.cli.server.options import common_options from pbench.server import BadConfig, OperationCode from pbench.server.database.database import Database @@ -30,30 +28,6 @@ verifier: Optional[Verify] = None -class DateParser(ParamType): - """The DateParser type converts date strings into `datetime` objects. - - This is a variant of click's built-in DateTime parser, but uses the - more flexible dateutil.parser - """ - - name = "dateparser" - - def convert( - self, value: Any, param: Optional[Parameter], ctx: Optional[Context] - ) -> Any: - if isinstance(value, datetime.datetime): - return value - - try: - return parser.parse(value) - except Exception as e: - self.fail(f"{value!r} cannot be converted to a datetime: {str(e)!r}") - - def __repr__(self) -> str: - return type(self).__name__ - - def auditor(kwargs) -> Iterator[str]: """Report audit records matching patterns. 
diff --git a/lib/pbench/cli/server/repair.py b/lib/pbench/cli/server/repair.py index e7e45d22a7..4e35c94738 100644 --- a/lib/pbench/cli/server/repair.py +++ b/lib/pbench/cli/server/repair.py @@ -59,11 +59,10 @@ def repair_audit(kwargs): attributes_errors += 1 continue didit = False - for f in a: - value = a[f] + for key, value in a.items(): if type(value) is str and len(value) > LIMIT: - a[f] = value[:LIMIT] - detailer.message(f"{name} [{f}] truncated ({len(value)}) to {LIMIT}") + a[key] = value[:LIMIT] + detailer.message(f"{name} [{key}] truncated ({len(value)}) to {LIMIT}") didit = True if didit: audit.attributes = a @@ -96,6 +95,15 @@ def find_tarball(resource_id: str, kwargs) -> Optional[Path]: tarball path if found, or None """ + def get_md5(file: Path) -> Optional[str]: + """Locate and read a tarball's associated MD5 hash""" + md5 = file.with_suffix(".xz.md5") + if md5.is_file(): + return md5.read_text().split(" ", maxsplit=1)[0] + else: + detailer.error(f"Missing MD5 {md5}") + return None + # We use the cache manager as a standard way to get the ARCHIVE root tree = cache_manager.CacheManager(kwargs["_config"], kwargs["_logger"]) for controller in tree.archive_root.iterdir(): @@ -105,12 +113,33 @@ def find_tarball(resource_id: str, kwargs) -> Optional[Path]: if isolator.is_dir(): tars = list(isolator.glob("*.tar.xz")) if len(tars) == 1: + tar = tars[0] + if get_md5(tar) != resource_id: + detailer.error( + f"Isolated tarball {tar} MD5 doesn't match isolator {resource_id}" + ) + return None + verifier.status(f"Found {tars[0]} for ID {resource_id}", 2) return tars[0] + else: + detailer.error( + f"Isolator directory {isolator} contains multiple tarballs: {[str(t) for t in tars]}" + ) + for tar in tars: + if get_md5(tar) == resource_id: + verifier.status( + f"Found {[str(t) for t in tars]} for ID {resource_id}", + 2, + ) + return tar + detailer.error( + f"Isolator directory {isolator} doesn't contain a tarball for {resource_id}" + ) + return None else: - for md5 
in controller.glob("**/*.tar.xz.md5"): - id = md5.read_text().split(" ", maxsplit=1)[0] - if id == resource_id: - tar = md5.with_suffix("") + for tar in controller.glob("**/*.tar.xz"): + if get_md5(tar) == resource_id: + verifier.status(f"Found {str(tar)} for ID {resource_id}", 2) return tar return None @@ -284,7 +313,7 @@ def repair_metadata(kwargs): "--progress", "-p", type=float, default=0.0, help="Show periodic progress messages" ) @click.option( - "--verify", "-v", default=False, is_flag=True, help="Display intermediate messages" + "--verify", "-v", default=False, count=True, help="Display intermediate messages" ) @common_options def repair(context: object, **kwargs): diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index e7604c7b65..e3e096374a 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -4,7 +4,6 @@ from enum import auto, Enum import errno import fcntl -import logging from logging import Logger import math import os @@ -1032,22 +1031,18 @@ def subprocess_run( # gather those lines only up to our configured size limit. 
size = 0 message = [] - log = logging.getLogger("sub") - log.info(process.stderr) lines = process.stderr.split("\n") for line in lines: # Skip empty lines if not line: continue size += len(line) - log.info(f"LINE {line!r} ({size} <= MAX {MAX_ERROR})") if size > MAX_ERROR: break message.append(line) # If even the first line was too big, truncate it msg = "\n".join(message) if message else lines[0][:MAX_ERROR] - log.info(f"MSG {msg!r}") raise exception( ctx, f"{cmd[0]} exited with status {process.returncode}: {msg!r}", diff --git a/lib/pbench/server/database/models/server_settings.py b/lib/pbench/server/database/models/server_settings.py index e453e46409..4cfbf95e7d 100644 --- a/lib/pbench/server/database/models/server_settings.py +++ b/lib/pbench/server/database/models/server_settings.py @@ -149,14 +149,12 @@ def get_retention_days(config: PbenchServerConfig) -> int: integer days to expiration """ try: - retention_days = int(ServerSetting.get(OPTION_DATASET_LIFETIME)) - return retention_days + return int(ServerSetting.get(OPTION_DATASET_LIFETIME)) except Exception: try: - retention_days = config.default_retention_period + return config.default_retention_period except Exception: - retention_days = config.MAXIMUM_RETENTION_DAYS - return retention_days + return config.MAXIMUM_RETENTION_DAYS def validate_server_state(key: str, value: JSONVALUE) -> JSONVALUE: From 159a2c5a17cbd232563872e7ec3f68a1829602a0 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Wed, 13 Mar 2024 15:21:39 -0400 Subject: [PATCH 5/7] More churn --- lib/pbench/cli/server/repair.py | 34 ++++++------------- lib/pbench/server/cache_manager.py | 16 +++++++-- lib/pbench/server/indexing_tarballs.py | 6 ++-- .../test/unit/server/test_cache_manager.py | 4 +-- 4 files changed, 31 insertions(+), 29 deletions(-) diff --git a/lib/pbench/cli/server/repair.py b/lib/pbench/cli/server/repair.py index 4e35c94738..e5113f3920 100644 --- a/lib/pbench/cli/server/repair.py +++ b/lib/pbench/cli/server/repair.py @@ -61,7 
+61,7 @@ def repair_audit(kwargs): didit = False for key, value in a.items(): if type(value) is str and len(value) > LIMIT: - a[key] = value[:LIMIT] + a[key] = f"[TRUNC({len(value)})]" + value[:LIMIT] detailer.message(f"{name} [{key}] truncated ({len(value)}) to {LIMIT}") didit = True if didit: @@ -112,34 +112,22 @@ def get_md5(file: Path) -> Optional[str]: isolator = controller / resource_id if isolator.is_dir(): tars = list(isolator.glob("*.tar.xz")) - if len(tars) == 1: - tar = tars[0] - if get_md5(tar) != resource_id: - detailer.error( - f"Isolated tarball {tar} MD5 doesn't match isolator {resource_id}" - ) - return None - verifier.status(f"Found {tars[0]} for ID {resource_id}", 2) - return tars[0] - else: + if len(tars) > 1: detailer.error( f"Isolator directory {isolator} contains multiple tarballs: {[str(t) for t in tars]}" ) - for tar in tars: - if get_md5(tar) == resource_id: - verifier.status( - f"Found {[str(t) for t in tars]} for ID {resource_id}", - 2, - ) - return tar - detailer.error( - f"Isolator directory {isolator} doesn't contain a tarball for {resource_id}" - ) - return None + for tar in tars: + if get_md5(tar) == resource_id: + verifier.status(f"Found {tar} for ID {resource_id}", 2) + return tar + detailer.error( + f"Isolator directory {isolator} doesn't contain a tarball for {resource_id}" + ) + return None else: for tar in controller.glob("**/*.tar.xz"): if get_md5(tar) == resource_id: - verifier.status(f"Found {str(tar)} for ID {resource_id}", 2) + verifier.status(f"Found {tar} for ID {resource_id}", 2) return tar return None diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index e3e096374a..691d7b9546 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -1029,20 +1029,32 @@ def subprocess_run( # tar failures can include a message for each file in the # archive, and can be quite large. 
Break stderr into lines, and # gather those lines only up to our configured size limit. + # + # The exception we raise and ultimately log will start by + # identifying the tarball, followed by the command and the + # return code. The appended stderr may provide additional + # useful context, but in general beyond the first line it'll + # be fairly meaningless. size = 0 message = [] lines = process.stderr.split("\n") + prefix = "" for line in lines: # Skip empty lines if not line: continue size += len(line) if size > MAX_ERROR: + prefix = "[TRUNC]" break message.append(line) - # If even the first line was too big, truncate it - msg = "\n".join(message) if message else lines[0][:MAX_ERROR] + # Prefix with [TRUNC] if we're not including the entire message + # and truncate the first line if it was too long by itself. + if message: + msg = prefix + ("\n".join(message)) + else: + msg = "[TRUNC]" + lines[0][:MAX_ERROR] raise exception( ctx, f"{cmd[0]} exited with status {process.returncode}: {msg!r}", diff --git a/lib/pbench/server/indexing_tarballs.py b/lib/pbench/server/indexing_tarballs.py index 85d3172b79..58b39e62d8 100644 --- a/lib/pbench/server/indexing_tarballs.py +++ b/lib/pbench/server/indexing_tarballs.py @@ -632,8 +632,10 @@ def sighup_handler(*args): # Re-raise a SIGTERM to avoid it being lumped in with general # exception handling below. raise - except Exception: - idxctx.logger.exception(error_code["GENERIC_ERROR"].message) + except Exception as e: + idxctx.logger.exception( + "{}: {!r}", error_code["GENERIC_ERROR"].message, str(e) + ) res = error_code["GENERIC_ERROR"] else: # No exceptions while processing tar balls, success. 
diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index d48dce84ab..118c6b863d 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -406,8 +406,8 @@ def mock_run(args, **_kwargs): "errmsg,expected", ( ("Error unpacking tarball\n", "Error unpacking tarball"), - ("This is a message we'll find too long", "This is a message we'll f"), - ("One line\nTwo line\nThree line\nFour line\n", "One line\nTwo line"), + ("This is a message we'll find too long", "[TRUNC]This is a message we'll f"), + ("One line\nTwo line\nThree line\nFour line\n", "[TRUNC]One line\nTwo line"), ), ) def test_tarball_subprocess_failure(self, monkeypatch, errmsg, expected): From fa2c8d213785b63bc03cedacdaebffdbbe998338 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Wed, 3 Apr 2024 15:50:55 -0400 Subject: [PATCH 6/7] Rebased with reluctant refactoring To simplify edge cases, I give in, although for the record I'm not happy about giving up on the line-based truncation: I just want it to be done. (And, ultimately, I don't think it really matters all that much.) --- lib/pbench/cli/server/__init__.py | 22 ------ lib/pbench/cli/server/repair.py | 9 ++- .../server/api/resources/intake_base.py | 6 +- lib/pbench/server/cache_manager.py | 68 ++++++++----------- .../test/unit/server/test_cache_manager.py | 12 +++- setup.cfg | 4 +- 6 files changed, 48 insertions(+), 73 deletions(-) diff --git a/lib/pbench/cli/server/__init__.py b/lib/pbench/cli/server/__init__.py index b8d913de4e..b66557a1ce 100644 --- a/lib/pbench/cli/server/__init__.py +++ b/lib/pbench/cli/server/__init__.py @@ -1,7 +1,6 @@ import datetime from threading import Thread import time -from typing import Any, Optional from typing import Any, Optional, Union import click @@ -163,27 +162,6 @@ def watcher(self): ) -class DateParser(ParamType): - """The DateParser type converts date strings into `datetime` objects. 
- - This is a variant of click's built-in DateTime parser, but uses the - more flexible dateutil.parser - """ - - name = "dateparser" - - def convert( - self, value: Any, param: Optional[Parameter], ctx: Optional[Context] - ) -> Any: - if isinstance(value, datetime.datetime): - return value - - try: - return parser.parse(value) - except Exception as e: - self.fail(f"{value!r} cannot be converted to a datetime: {str(e)!r}") - - def config_setup(context: object) -> PbenchServerConfig: config = PbenchServerConfig.create(context.config) # We're going to need the DB to track dataset state, so setup DB access. diff --git a/lib/pbench/cli/server/repair.py b/lib/pbench/cli/server/repair.py index e5113f3920..42287b3d03 100644 --- a/lib/pbench/cli/server/repair.py +++ b/lib/pbench/cli/server/repair.py @@ -61,7 +61,8 @@ def repair_audit(kwargs): didit = False for key, value in a.items(): if type(value) is str and len(value) > LIMIT: - a[key] = f"[TRUNC({len(value)})]" + value[:LIMIT] + p = f"[TRUNC({len(value)})]" + a[key] = (p + value)[:LIMIT] detailer.message(f"{name} [{key}] truncated ({len(value)}) to {LIMIT}") didit = True if didit: @@ -72,8 +73,10 @@ def repair_audit(kwargs): Database.db_session.commit() except Exception as e: commit_error = str(e) - click.echo(f"{count} audit records had attributes too long") - click.echo(f"{updated} records were fixed") + click.echo(f"{count} audit records triggered field truncation") + click.echo( + f"{updated} records were truncated, {count-updated} had no eligible fields" + ) if attributes_errors: click.echo(f"{attributes_errors} had format errors in attributes") if commit_error: diff --git a/lib/pbench/server/api/resources/intake_base.py b/lib/pbench/server/api/resources/intake_base.py index 70fe81248f..75764d9765 100644 --- a/lib/pbench/server/api/resources/intake_base.py +++ b/lib/pbench/server/api/resources/intake_base.py @@ -47,7 +47,11 @@ OperationName, OperationState, ) -from pbench.server.database.models.server_settings 
import get_retention_days +from pbench.server.database.models.server_settings import ( + get_retention_days, + OPTION_SERVER_INDEXING, + ServerSetting, +) from pbench.server.sync import Sync from pbench.server.utils import UtcTimeHelper diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index 691d7b9546..b1527b0177 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -27,6 +27,7 @@ RECLAIM_BYTES_PAD = 1024 # Pad unpack reclaim requests by this much MB_BYTES = 1024 * 1024 # Bytes in Mb MAX_ERROR = 1024 * 5 # Maximum length of subprocess stderr to capture +TRUNC_PREFIX = "[TRUNC]" # Indicate that subprocess stderr was truncated class CacheManagerError(Exception): @@ -96,24 +97,35 @@ def __init__(self, tarball: Path, error: Exception): self.error = str(error) -class TarballUnpackError(CacheManagerError): +class UnpackBaseError(CacheManagerError): + """Base class for unpacking errors""" + + def __init__(self, context: Path, error: str, stderr: Optional[str] = None): + m = f"{error}: {stderr!r}" if stderr else error + super().__init__(m) + self.stderr = stderr + + +class TarballUnpackError(UnpackBaseError): """An error occurred trying to unpack a tarball.""" - def __init__(self, tarball: Path, error: str): - super().__init__(f"An error occurred while unpacking {tarball}: {error}") + def __init__(self, tarball: Path, error: str, stderr: Optional[str] = None): + super().__init__( + tarball, f"An error occurred while unpacking {tarball}: {error}", stderr + ) self.tarball = tarball - self.error = error -class TarballModeChangeError(CacheManagerError): +class TarballModeChangeError(UnpackBaseError): """An error occurred trying to fix unpacked tarball permissions.""" - def __init__(self, tarball: Path, error: str): + def __init__(self, directory: Path, error: str, stderr: Optional[str] = None): super().__init__( - f"An error occurred while changing file permissions of {tarball}: {error}" + directory, + 
f"An error occurred while changing file permissions of {directory}: {error}", + stderr, ) - self.tarball = tarball - self.error = error + self.directory = directory class CacheType(Enum): @@ -994,7 +1006,7 @@ def _get_metadata(tarball_path: Path) -> JSONOBJECT: def subprocess_run( command: str, working_dir: PathLike, - exception: type[CacheManagerError], + exception: type[UnpackBaseError], ctx: Path, ): """Runs a command as a subprocess. @@ -1025,39 +1037,13 @@ def subprocess_run( raise exception(ctx, str(exc)) from exc else: if process.returncode != 0: - - # tar failures can include a message for each file in the - # archive, and can be quite large. Break stderr into lines, and - # gather those lines only up to our configured size limit. - # - # The exception we raise and ultimately log will start by - # identifying the tarball, followed by the command and the - # return code. The appended stderr may provide additional - # useful context, but in general beyond the first line it'll - # be fairly meaningless. - size = 0 - message = [] - lines = process.stderr.split("\n") - prefix = "" - for line in lines: - # Skip empty lines - if not line: - continue - size += len(line) - if size > MAX_ERROR: - prefix = "[TRUNC]" - break - message.append(line) - - # Prefix with [TRUNC] if we're not including the entire message - # and truncate the first line if it was too long by itself. 
- if message: - msg = prefix + ("\n".join(message)) - else: - msg = "[TRUNC]" + lines[0][:MAX_ERROR] + msg = process.stderr + if len(msg) > MAX_ERROR - len(TRUNC_PREFIX): + msg = (TRUNC_PREFIX + msg)[:MAX_ERROR] raise exception( ctx, - f"{cmd[0]} exited with status {process.returncode}: {msg!r}", + f"{cmd[0]} exited with status {process.returncode}", + stderr=msg, ) def get_unpacked_size(self) -> int: diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index 118c6b863d..cf2e9ef801 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -405,9 +405,15 @@ def mock_run(args, **_kwargs): @pytest.mark.parametrize( "errmsg,expected", ( - ("Error unpacking tarball\n", "Error unpacking tarball"), - ("This is a message we'll find too long", "[TRUNC]This is a message we'll f"), - ("One line\nTwo line\nThree line\nFour line\n", "[TRUNC]One line\nTwo line"), + ("Error unpacking", "Error unpacking"), + ( + "This is a message we'll find too long", + "[TRUNC]This is a message ", + ), + ( + "One line\nTwo line\nThree line\nFour line\n", + "[TRUNC]One line\nTwo line\n", + ), ), ) def test_tarball_subprocess_failure(self, monkeypatch, errmsg, expected): diff --git a/setup.cfg b/setup.cfg index efe20dce85..7aa620f84d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,14 +29,12 @@ console_scripts = pbench-clear-results = pbench.cli.agent.commands.results.clear:main pbench-clear-tools = pbench.cli.agent.commands.tools.clear:main pbench-config = pbench.cli.agent.commands.conf:main - pbench-repair = pbench.cli.server.repair:repair - pbench-report-generator = pbench.cli.server.report:report - pbench-tree-manage = pbench.cli.server.tree_manage:tree_manage pbench-is-local = pbench.cli.agent.commands.is_local:main pbench-list-tools = pbench.cli.agent.commands.tools.list:main pbench-list-triggers = pbench.cli.agent.commands.triggers.list:main pbench-register-tool-trigger 
= pbench.cli.agent.commands.triggers.register:main pbench-reindex = pbench.cli.server.reindex:reindex + pbench-repair = pbench.cli.server.repair:repair pbench-report-generator = pbench.cli.server.report:report pbench-results-move = pbench.cli.agent.commands.results.move:main pbench-results-push = pbench.cli.agent.commands.results.push:main From c6f30791b5b4d32000f58c4faa779b2cd775484a Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Fri, 5 Apr 2024 07:25:00 -0400 Subject: [PATCH 7/7] refine --- lib/pbench/server/cache_manager.py | 2 +- lib/pbench/test/unit/server/test_cache_manager.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index b1527b0177..39812eeb31 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -1038,7 +1038,7 @@ def subprocess_run( else: if process.returncode != 0: msg = process.stderr - if len(msg) > MAX_ERROR - len(TRUNC_PREFIX): + if len(msg) > MAX_ERROR: msg = (TRUNC_PREFIX + msg)[:MAX_ERROR] raise exception( ctx, diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index cf2e9ef801..b7c8b7c3e1 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -406,6 +406,7 @@ def mock_run(args, **_kwargs): "errmsg,expected", ( ("Error unpacking", "Error unpacking"), + ("This message has 25 bytes", "This message has 25 bytes"), ( "This is a message we'll find too long", "[TRUNC]This is a message ",