"Note" non-fatal INTAKE behaviors (#3510)
* "Note" non-fatal INTAKE behaviors

PBENCH-1231

After accidentally observing that an uploaded dataset had been marked "archiveonly"
and was not indexed (because the file name didn't match the result directory name),
we decided that the intake process should make the reasons more obvious.

I've handled this by adding "notes", which are recorded both in the successful
JSON response payload and in the finalized audit record. While I was at it, I
exposed our decisions regarding the canonical benchmarked workload name and
the computed expiration date.

While doing this, I stripped the rather prolific `INTAKE` logging down to just two
messages: a "pre" log identifying the file and user, and a "post" log giving the
summary file system status (now also including the size of the tarball).
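
To make the new behavior concrete, a successful intake response now carries the
accumulated notes alongside the existing fields. A minimal sketch of the payload
shape, with a hypothetical dataset name, resource ID, and dates (the note texts
follow the formats introduced in this commit):

    response_payload = {
        "message": "File successfully uploaded",
        "name": "fio_rw_2023.08.01T12.00.00",  # hypothetical dataset name
        "resource_id": "d41d8cd98f00b204e9800998ecf8427e",  # hypothetical MD5
        "notes": [
            "Identified benchmark workload 'fio'.",
            "Expected expiration date is 2025-08-02.",
        ],
    }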
dbutenhof committed Aug 3, 2023
1 parent 780adca commit c3688cd
Showing 4 changed files with 106 additions and 20 deletions.
49 changes: 29 additions & 20 deletions lib/pbench/server/api/resources/intake_base.py
@@ -79,7 +79,6 @@ def __init__(self, config: PbenchServerConfig, schema: ApiSchema):
self.temporary.mkdir(mode=0o755, parents=True, exist_ok=True)
method = list(self.schemas.schemas.keys())[0]
self.name = self.schemas[method].audit_name
current_app.logger.info("INTAKE temporary directory is {}", self.temporary)

@staticmethod
def process_metadata(metas: list[str]) -> JSONOBJECT:
@@ -197,6 +196,7 @@ def _intake(
audit: Optional[Audit] = None
username: Optional[str] = None
intake_dir: Optional[Path] = None
+        notes = []

prefix = current_app.server_config.rest_uri
origin = f"{self._get_uri_base(request).host}{prefix}/datasets/"
@@ -253,17 +253,12 @@ def _intake(
        md5_full_path = intake_dir / f"{filename}.md5"

        bytes_received = 0
-        usage = shutil.disk_usage(tar_full_path.parent)
        current_app.logger.info(
-            "{} {} (pre): {:.3}% full, {} remaining",
+            "INTAKE (pre) {} {} for {} to {}",
            self.name,
-            tar_full_path.name,
-            float(usage.used) / float(usage.total) * 100.0,
-            humanize.naturalsize(usage.free),
-        )
-
-        current_app.logger.info(
-            "{} {} for {} to {}", self.name, filename, username, tar_full_path
+            dataset_name,
+            username,
+            tar_full_path,
        )

# Create a tracking dataset object; it'll begin in UPLOADING state
@@ -405,15 +400,6 @@ def _intake(
f"Unable to create dataset in file system for {tar_full_path}: {exc}"
) from exc

-        usage = shutil.disk_usage(tar_full_path.parent)
-        current_app.logger.info(
-            "{} {} (post): {:.3}% full, {} remaining",
-            self.name,
-            tar_full_path.name,
-            float(usage.used) / float(usage.total) * 100.0,
-            humanize.naturalsize(usage.free),
-        )
-
# From this point, failure will remove the tarball from the cache
# manager.
recovery.add(tarball.delete)
@@ -430,13 +416,21 @@
benchmark = Metadata.SERVER_BENCHMARK_UNKNOWN
metalog = {"pbench": {"name": dataset.name, "script": benchmark}}
metadata[Metadata.SERVER_ARCHIVE] = True
current_app.logger.warning(
"INTAKE marking {} as archive-only because no 'metadata.log' can be found.",
dataset.name,
)
+                    notes.append(
+                        f"Results archive is missing '{dataset.name}/metadata.log'."
+                    )
attributes["missing_metadata"] = True
else:
+                    p = metalog.get("pbench")
+                    if p:
+                        benchmark = p.get("script", Metadata.SERVER_BENCHMARK_UNKNOWN)
+                    else:
+                        benchmark = Metadata.SERVER_BENCHMARK_UNKNOWN
+                    notes.append(f"Identified benchmark workload {benchmark!r}.")
Metadata.create(dataset=dataset, key=Metadata.METALOG, value=metalog)
except Exception as exc:
raise APIInternalError(
@@ -456,6 +450,7 @@
try:
retention = datetime.timedelta(days=retention_days)
deletion = dataset.uploaded + retention
notes.append(f"Expected expiration date is {deletion:%Y-%m-%d}.")

# Make a shallow copy so we can add keys without affecting the
# original (which will be recorded in the audit log)
@@ -469,7 +464,6 @@
Metadata.SERVER_BENCHMARK: benchmark,
}
)
current_app.logger.info("Metadata for {}: {}", dataset.name, meta)
f = self._set_dataset_metadata(dataset, meta)
if f:
attributes["failures"] = f
@@ -481,9 +475,23 @@
# Determine whether we should enable the INDEX operation.
should_index = not metadata.get(Metadata.SERVER_ARCHIVE, False)
enable_next = [OperationName.INDEX] if should_index else None
+        if not should_index:
+            notes.append("Indexing is disabled by 'archive only' setting.")
Sync(current_app.logger, OperationName.UPLOAD).update(
dataset=dataset, state=OperationState.OK, enabled=enable_next
)
+        if notes:
+            attributes["notes"] = notes
+
+        usage = shutil.disk_usage(tar_full_path.parent)
+        current_app.logger.info(
+            "INTAKE (post) {} {}: {:.3}% full, {} remaining: dataset size {}",
+            self.name,
+            dataset.name,
+            float(usage.used) / float(usage.total) * 100.0,
+            humanize.naturalsize(usage.free),
+            humanize.naturalsize(stream.length),
+        )
Audit.create(
root=audit, status=AuditStatus.SUCCESS, attributes=attributes
)
@@ -530,6 +538,7 @@ def _intake(
"message": "File successfully uploaded",
"name": dataset.name,
"resource_id": dataset.resource_id,
"notes": notes,
}
)
response.headers["location"] = f"{origin}{dataset.resource_id}/inventory/"
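The new "INTAKE (post)" log line computes filesystem fullness with
shutil.disk_usage and renders byte counts with humanize.naturalsize, just as the
removed per-request logs did. A standalone sketch of that computation, using a
placeholder path in place of tar_full_path.parent:

    import shutil

    import humanize

    # Placeholder path standing in for tar_full_path.parent in the intake code.
    usage = shutil.disk_usage("/srv/pbench/archive")
    pct_full = float(usage.used) / float(usage.total) * 100.0
    # Mirrors the "{:.3}% full, {} remaining" portion of the log message.
    print(f"{pct_full:.3}% full, {humanize.naturalsize(usage.free)} remaining")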
38 changes: 38 additions & 0 deletions lib/pbench/test/functional/server/test_datasets.py
@@ -21,9 +21,29 @@


+def utc_from_str(date: str) -> datetime:
+    """Convert a date string to a UTC datetime
+
+    Args:
+        date: date/time string
+
+    Returns:
+        UTC datetime object
+    """
+    return dateutil.parser.parse(date).replace(tzinfo=timezone.utc)
+
+
+def expiration() -> str:
+    """Calculate a datetime for dataset deletion from "now".
+
+    Returns:
+        A "YYYY-MM-DD" string representing the day when a dataset uploaded
+        "now" would be deleted.
+    """
+    retention = timedelta(days=730)
+    d = datetime.now(timezone.utc) + retention
+    return f"{d:%Y-%m-%d}"


@dataclass
class Tarball:
"""Record the tarball path and the uploaded access value"""
@@ -72,10 +92,16 @@ def test_upload_all(self, server_client: PbenchServerClient, login_user):
assert (
response.status_code == HTTPStatus.CREATED
), f"upload returned unexpected status {response.status_code}, {response.text} ({t})"
+        metabench = server_client.get_metadata(md5, ["server.benchmark"])
+        benchmark = metabench["server.benchmark"]
assert response.json() == {
"message": "File successfully uploaded",
"name": name,
"resource_id": md5,
"notes": [
f"Identified benchmark workload {benchmark!r}.",
f"Expected expiration date is {expiration()}.",
],
}
assert response.headers["location"] == server_client._uri(
API.DATASETS_INVENTORY, {"dataset": md5, "target": ""}
@@ -178,6 +204,11 @@ def test_archive_only(server_client: PbenchServerClient, login_user):
"message": "File successfully uploaded",
"name": name,
"resource_id": md5,
"notes": [
"Identified benchmark workload 'fio'.",
f"Expected expiration date is {expiration()}.",
"Indexing is disabled by 'archive only' setting.",
],
}
assert response.headers["location"] == server_client._uri(
API.DATASETS_INVENTORY, {"dataset": md5, "target": ""}
@@ -216,10 +247,17 @@ def test_no_metadata(server_client: PbenchServerClient, login_user):
assert (
response.status_code == HTTPStatus.CREATED
), f"upload {name} returned unexpected status {response.status_code}, {response.text}"

assert response.json() == {
"message": "File successfully uploaded",
"name": name,
"resource_id": md5,
"notes": [
"Results archive is missing 'nometadata/metadata.log'.",
"Identified benchmark workload 'unknown'.",
f"Expected expiration date is {expiration()}.",
"Indexing is disabled by 'archive only' setting.",
],
}
assert response.headers["location"] == server_client._uri(
API.DATASETS_INVENTORY, {"dataset": md5, "target": ""}
9 changes: 9 additions & 0 deletions lib/pbench/test/unit/server/test_relay.py
@@ -88,6 +88,7 @@ def test_missing_authorization_header(self, client, server_config):
assert not self.cachemanager_created

@responses.activate
+    @pytest.mark.freeze_time("2023-07-01")
def test_relay(self, client, server_config, pbench_drb_token, tarball):
"""Verify the success path
@@ -127,6 +128,10 @@ def test_relay(self, client, server_config, pbench_drb_token, tarball):
"message": "File successfully uploaded",
"name": name,
"resource_id": md5,
"notes": [
"Identified benchmark workload 'unknown'.",
"Expected expiration date is 2025-06-30.",
],
}
assert (
response.headers["location"]
@@ -164,6 +169,10 @@ def test_relay(self, client, server_config, pbench_drb_token, tarball):
assert audit[1].attributes == {
"access": "private",
"metadata": {"global.pbench.test": "data"},
"notes": [
"Identified benchmark workload 'unknown'.",
"Expected expiration date is 2025-06-30.",
],
}

@responses.activate
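The frozen clock is what makes the expiration note deterministic: with the
730-day retention these tests expect, an upload "dated" 2023-07-01 expires on
2025-06-30, and one dated 1970-01-01 (used in test_upload.py below) expires on
1972-01-01. A quick check of the arithmetic:

    from datetime import datetime, timedelta, timezone

    retention = timedelta(days=730)  # retention period the tests assume

    frozen = datetime(2023, 7, 1, tzinfo=timezone.utc)  # test_relay's frozen "now"
    assert f"{frozen + retention:%Y-%m-%d}" == "2025-06-30"  # 2024 is a leap year

    frozen = datetime(1970, 1, 1, tzinfo=timezone.utc)  # test_upload's frozen "now"
    assert f"{frozen + retention:%Y-%m-%d}" == "1972-01-01"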
30 changes: 30 additions & 0 deletions lib/pbench/test/unit/server/test_upload.py
@@ -440,6 +440,10 @@ def test_upload(self, client, pbench_drb_token, server_config, tarball):
"message": "File successfully uploaded",
"name": name,
"resource_id": md5,
"notes": [
"Identified benchmark workload 'unknown'.",
"Expected expiration date is 1972-01-01.",
],
}
assert (
response.headers["location"]
@@ -491,6 +495,10 @@ def test_upload(self, client, pbench_drb_token, server_config, tarball):
assert audit[1].attributes == {
"access": "private",
"metadata": {"global.pbench.test": "data"},
"notes": [
"Identified benchmark workload 'unknown'.",
"Expected expiration date is 1972-01-01.",
],
}

@pytest.mark.freeze_time("1970-01-01")
@@ -526,6 +534,7 @@ def test_upload_invalid_metadata(
],
}

+    @pytest.mark.freeze_time("2023-07-01")
def test_upload_duplicate(self, client, server_config, pbench_drb_token, tarball):
datafile, _, md5 = tarball
with datafile.open("rb") as data_fp:
@@ -540,6 +549,10 @@ def test_upload_duplicate(self, client, server_config, pbench_drb_token, tarball
"message": "File successfully uploaded",
"name": Dataset.stem(datafile),
"resource_id": md5,
"notes": [
"Identified benchmark workload 'unknown'.",
"Expected expiration date is 2025-06-30.",
],
}
assert (
response.headers["location"]
@@ -694,6 +707,11 @@ def test_upload_archive(self, client, pbench_drb_token, server_config, tarball):
assert audit[1].attributes == {
"access": "private",
"metadata": {"server.archiveonly": True, "server.origin": "test"},
"notes": [
"Identified benchmark workload 'unknown'.",
"Expected expiration date is 1972-01-01.",
"Indexing is disabled by 'archive only' setting.",
],
}

@pytest.mark.freeze_time("1970-01-01")
@@ -715,6 +733,12 @@ def test_upload_nometa(self, client, pbench_drb_token, server_config, tarball):
"message": "File successfully uploaded",
"name": name,
"resource_id": md5,
"notes": [
f"Results archive is missing '{name}/metadata.log'.",
"Identified benchmark workload 'unknown'.",
"Expected expiration date is 1972-01-01.",
"Indexing is disabled by 'archive only' setting.",
],
}
assert (
response.headers["location"]
@@ -771,4 +795,10 @@ def test_upload_nometa(self, client, pbench_drb_token, server_config, tarball):
"access": "private",
"metadata": {"server.archiveonly": True, "server.origin": "test"},
"missing_metadata": True,
"notes": [
f"Results archive is missing '{name}/metadata.log'.",
"Identified benchmark workload 'unknown'.",
"Expected expiration date is 1972-01-01.",
"Indexing is disabled by 'archive only' setting.",
],
}
