48 changes: 32 additions & 16 deletions lib/pbench/cli/server/report.py
@@ -295,13 +295,17 @@ def report_sql():
    """Report the SQL table storage statistics"""

    watcher.update("inspecting SQL tables")
    table_count = 0
    row_count = 0
    row_size = 0
    click.echo("SQL storage report:")
    t_w = 20
    r_w = 10
    s_w = 10
    click.echo(f" {'Table':<{t_w}} {'Rows':<{r_w}} {'Storage':<{s_w}}")
    click.echo(f" {'':-<{t_w}} {'':-<{r_w}} {'':-<{s_w}}")
    for t in inspect(Database.db_session.get_bind()).get_table_names():
        table_count += 1
        (rows,) = next(
            Database.db_session.execute(statement=text(f"SELECT COUNT(*) FROM {t}"))
        )
@@ -312,6 +316,11 @@ def report_sql():
            )
        )
        click.echo(f" {t:<{t_w}} {rows:>{r_w},d} {humanize.naturalsize(size):>{s_w}}")
        row_count += rows
        row_size += size
    click.echo(
        f" Total of {row_count:,d} rows in {table_count:,d} tables, consuming {humanize.naturalsize(row_size)}"
    )

    if not detailer:
        return
@@ -357,6 +366,7 @@ def report_states():
    index_pattern: re.Pattern = re.compile(r"^(\d+):(.*)$")
    index_errors = defaultdict(int)
    index_messages = defaultdict(str)
    ops_anomalies = 0
    operations = defaultdict(lambda: defaultdict(int))
    rows = Database.db_session.execute(
        statement=text(
@@ -366,22 +376,26 @@
    )
    for dataset, operation, state, message in rows:
        watcher.update(f"inspecting {dataset}:{operation}")
        operations[operation][state] += 1
        if state == "FAILED":
            detailer.error(f"{operation} {state} for {dataset}: {message!r}")
            if operation == "INDEX":
                match = index_pattern.match(message)
                if match:
                    try:
                        code = int(match.group(1))
                        message = match.group(2)
                        index_errors[code] += 1
                        if code not in index_messages:
                            index_messages[code] = message
                    except Exception as e:
                        detailer.error(
                            f"{dataset} unexpected 'INDEX' error {message}: {str(e)!r}"
                        )
        if operation is None:
            ops_anomalies += 1
            detailer.error(f"{dataset} doesn't have operational state")
        else:
            operations[operation][state] += 1
            if state == "FAILED":
                detailer.error(f"{operation} {state} for {dataset}: {message!r}")
                if operation == "INDEX":
                    match = index_pattern.match(message)
                    if match:
                        try:
                            code = int(match.group(1))
                            message = match.group(2)
                            index_errors[code] += 1
                            if code not in index_messages:
                                index_messages[code] = message
                        except Exception as e:
                            detailer.error(
                                f"{dataset} unexpected 'INDEX' error {message}: {str(e)!r}"
                            )
    click.echo("Operational states:")
    for name, states in operations.items():
        click.echo(f" {name} states:")
@@ -392,6 +406,8 @@
                    click.echo(
                        f" CODE {code:2d}: {count:>6,d} {index_messages[code]}"
                    )
    if ops_anomalies:
        click.echo(f" {ops_anomalies} datasets are missing operational state")


@click.command(name="pbench-report-generator")
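A minimal sketch of the accumulate-then-summarize pattern the new totals use, with a made-up `tables` mapping standing in for the per-table row-count and size queries (illustrative only, not part of this file):

    import humanize

    # Hypothetical per-table data: {name: (row_count, size_in_bytes)}.
    tables = {"datasets": (1204, 1_250_000), "index_map": (98310, 44_800_000)}

    row_count = sum(rows for rows, _ in tables.values())
    row_size = sum(size for _, size in tables.values())
    print(
        f"Total of {row_count:,d} rows in {len(tables):,d} tables, "
        f"consuming {humanize.naturalsize(row_size)}"
    )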
27 changes: 4 additions & 23 deletions lib/pbench/server/api/resources/query_apis/datasets/__init__.py
@@ -9,32 +9,13 @@
    ApiParams,
    ApiSchema,
    ParamType,
    SchemaError,
)
from pbench.server.api.resources.query_apis import ElasticBase
from pbench.server.database.models.datasets import Dataset, Metadata
from pbench.server.database.models.index_map import IndexMap
from pbench.server.database.models.templates import Template


class MissingDatasetNameParameter(SchemaError):
    """The subclass schema is missing the required "name" parameter required
    to locate a Dataset.

    NOTE: This is a development error, not a client error, and will be raised
    when the API is initialized at server startup. Arguably, this could be an
    assert since it prevents launching the server.
    """

    def __init__(self, subclass_name: str, message: str):
        super().__init__()
        self.subclass_name = subclass_name
        self.message = message

    def __str__(self) -> str:
        return f"API {self.subclass_name} is {self.message}"


class IndexMapBase(ElasticBase):
    """A base class for query apis that depends on the index map.

@@ -133,17 +114,17 @@ def get_index(
        """

        archive_only = Metadata.getvalue(dataset, Metadata.SERVER_ARCHIVE)
        if archive_only and ok_no_index:
            return ""

        if archive_only:
            if ok_no_index:
                return ""
            raise APIAbort(HTTPStatus.CONFLICT, "Dataset indexing was disabled")

        index_keys = list(IndexMap.indices(dataset, root_index_name))

        if not index_keys:
            raise APIAbort(
                HTTPStatus.NOT_FOUND, f"Dataset has no {root_index_name!r} data"
                HTTPStatus.NOT_FOUND,
                f"Dataset has no {root_index_name if root_index_name else 'indexed'!r} data",
            )

        indices = ",".join(index_keys)
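A rough sketch of how a caller might use the revised get_index() contract (the helper name below is hypothetical; the pattern mirrors the Datasets API change later in this diff): an archive-only dataset now yields an empty index list when ok_no_index=True instead of a CONFLICT error, so update/delete can skip Elasticsearch entirely.

    # Hypothetical helper on an IndexMapBase subclass; illustrative only.
    def build_elastic_request(self, dataset, action: str) -> dict:
        # GET requires indexed data; UPDATE/DELETE tolerate its absence.
        indices = self.get_index(dataset, ok_no_index=(action != "get"))
        if not indices:
            return {}  # no indexed documents: skip the Elasticsearch call
        return {
            "path": f"{indices}/_search",
            "kwargs": {"params": {"ignore_unavailable": "true"}},
        }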
79 changes: 48 additions & 31 deletions lib/pbench/server/api/resources/query_apis/datasets/datasets.py
@@ -1,3 +1,4 @@
from collections import defaultdict
from http import HTTPStatus

from flask import current_app, jsonify, Response
@@ -16,8 +17,9 @@
    ParamType,
    Schema,
)
from pbench.server.api.resources.query_apis import ApiContext, PostprocessError
from pbench.server.api.resources.query_apis import ApiContext
from pbench.server.api.resources.query_apis.datasets import IndexMapBase
import pbench.server.auth.auth as Auth
from pbench.server.cache_manager import CacheManager
from pbench.server.database.models.audit import AuditReason, AuditStatus, AuditType
from pbench.server.database.models.datasets import (
@@ -120,14 +122,13 @@ def assemble(self, params: ApiParams, context: ApiContext) -> JSONOBJECT:

        dataset = context["dataset"]
        action = context["attributes"].action
        get_action = action == "get"
        context["action"] = action
        audit_attributes = {}
        access = None
        owner = None
        elastic_options = {"ignore_unavailable": "true"}

        if not get_action:
        if action != "get":
            elastic_options["refresh"] = "true"
            operations = Operation.by_state(dataset, OperationState.WORKING)
            if context["attributes"].require_stable and operations:
@@ -147,7 +148,9 @@ def assemble(self, params: ApiParams, context: ApiContext) -> JSONOBJECT:
                    OperationState.WORKING.name,
                    e,
                )
                raise APIAbort(HTTPStatus.CONFLICT, "Unable to set operational state")
                raise APIInternalError(
                    f"can't set {OperationState.WORKING.name} on {dataset.name}: {str(e)!r} "
                )
            context["sync"] = sync
            context["auditing"]["attributes"] = audit_attributes
            if action == "update":
@@ -156,6 +159,14 @@ def assemble(self, params: ApiParams, context: ApiContext) -> JSONOBJECT:
                if not access and not owner:
                    raise MissingParameters(["access", "owner"])

                if owner and owner != dataset.owner_id:
                    auth_user = Auth.token_auth.current_user()
                    if not auth_user.is_admin():
                        raise APIAbort(
                            HTTPStatus.FORBIDDEN,
                            "ADMIN role is required to change dataset ownership",
                        )

                if access:
                    audit_attributes["access"] = access
                    context["access"] = access
@@ -167,12 +178,16 @@ def assemble(self, params: ApiParams, context: ApiContext) -> JSONOBJECT:
                else:
                    owner = dataset.owner_id

        # Get the Elasticsearch indices occupied by the dataset. If there are
        # none, return with an empty query to disable the Elasticsearch call.
        # Get the Elasticsearch indices occupied by the dataset.
        #
        # We postprocess UPDATE and DELETE even without any indexed documents
        # in order to update the Dataset object, so tell get_index not to fail
        # in that case, and return an empty query to disable the Elasticsearch
        # call.
        #
        # It's important that all context fields required for postprocessing
        # of unindexed datasets have been set before this!
        indices = self.get_index(dataset, ok_no_index=(not get_action))
        indices = self.get_index(dataset, ok_no_index=(action != "get"))
        context["indices"] = indices
        if not indices:
            return {}
@@ -195,11 +210,12 @@ def assemble(self, params: ApiParams, context: ApiContext) -> JSONOBJECT:
        }

        if action == "update":
            painless = "ctx._source.authorization=params.authorization"
            script_params = {"authorization": {"access": access, "owner": owner}}
            script = {"source": painless, "lang": "painless", "params": script_params}
            json["path"] = f"{indices}/_update_by_query"
            json["kwargs"]["json"]["script"] = script
            json["kwargs"]["json"]["script"] = {
                "source": "ctx._source.authorization=params.authorization",
                "lang": "painless",
                "params": {"authorization": {"access": access, "owner": owner}},
            }
        elif action == "get":
            json["path"] = f"{indices}/_search"
        elif action == "delete":
@@ -232,32 +248,31 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
        current_app.logger.info("POSTPROCESS {}: {}", dataset.name, es_json)
        failures = 0
        if action == "get":
            count = None
            hits = []
            try:
                count = es_json["hits"]["total"]["value"]
                hits = es_json["hits"]["hits"]
                if int(count) == 0:
                    current_app.logger.info("No data returned by Elasticsearch")
                    return jsonify([])
                    raise APIInternalError(
                        f"Elasticsearch returned no matches for {dataset.name}"
                    )
            except KeyError as e:
                raise PostprocessError(
                    HTTPStatus.BAD_REQUEST,
                    f"Can't find Elasticsearch match data {e} in {es_json!r}",
                raise APIInternalError(
                    f"Can't find Elasticsearch match data for {dataset.name} ({e}) in {es_json!r}",
                )
            except ValueError as e:
                raise PostprocessError(
                    HTTPStatus.BAD_REQUEST,
                    f"Elasticsearch hit count {count!r} value: {e}",
                raise APIInternalError(
                    f"Elasticsearch bad hit count {count!r} for {dataset.name}: {e}",
                )
            results = []
            for hit in es_json["hits"]["hits"]:
                s = hit["_source"]
                s["id"] = hit["_id"]
                results.append(s)

            results = defaultdict(int)
            for hit in hits:
                results[hit["_index"]] += 1
            return jsonify(results)
        else:
            if es_json:
                fields = ("deleted", "updated", "total", "version_conflicts")
                results = {f: es_json[f] if f in es_json else None for f in fields}
                results = {f: es_json.get(f, 0) for f in fields}
                failures = len(es_json["failures"]) if "failures" in es_json else 0
                results["failures"] = failures
            else:
@@ -282,7 +297,7 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
                dataset.update()
            except Exception as e:
                raise APIInternalError(
                    f"unable to update dataset {dataset.name}: {str(e)!r}"
                    f"Unable to update dataset {dataset.name}: {str(e)!r}"
                ) from e
        elif action == "delete":
            try:
@@ -295,7 +310,7 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
                del context["sync"]
            except Exception as e:
                raise APIInternalError(
                    f"unable to delete dataset {dataset.name}: {str(e)!r}"
                    f"Unable to delete dataset {dataset.name}: {str(e)!r}"
                ) from e

        # The DELETE API removes the "sync" context on success to signal
@@ -312,7 +327,9 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
                auditing["attributes"] = {"message": str(e)}
                auditing["status"] = AuditStatus.WARNING
                auditing["reason"] = AuditReason.INTERNAL
                raise APIInternalError(f"Unexpected sync unlock error '{e}'") from e
                raise APIInternalError(
                    f"Unexpected sync error {dataset.name} {str(e)!r}"
                ) from e

        # Return the summary document as the success response, or abort with an
        # internal error if we tried to operate on Elasticsearch documents but
@@ -323,9 +340,9 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
            auditing["reason"] = AuditReason.INTERNAL
            auditing["attributes"][
                "message"
            ] = f"Unable to {context['attributes'].action} some indexed documents"
            ] = f"Unable to {action} some indexed documents"
            raise APIInternalError(
                f"Failed to {context['attributes'].action} any of {results['total']} "
                f"Failed to {action} any of {results['total']} "
                f"Elasticsearch documents: {es_json}"
            )
        elif sync:
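With the reworked "get" postprocessing above, the response now reports how many documents each Elasticsearch index holds for the dataset rather than echoing the raw hit sources. A hypothetical response body (index names invented for the example):

    # Keys are Elasticsearch index names, values are per-index document counts.
    {
        "drb.v6.run-data.2023-06": 1,
        "drb.v5.result-data-sample.2023-06": 512,
    }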
@@ -6,21 +6,17 @@
from pbench.server.api.resources import (
    APIAbort,
    ApiAuthorizationType,
    APIInternalError,
    ApiMethod,
    ApiParams,
    ApiSchema,
    Parameter,
    ParamType,
    Schema,
)
from pbench.server.api.resources.query_apis import ApiContext, PostprocessError
from pbench.server.api.resources.query_apis import ApiContext
from pbench.server.api.resources.query_apis.datasets import IndexMapBase
from pbench.server.database.models.datasets import (
    Dataset,
    DatasetNotFound,
    Metadata,
    MetadataError,
)
from pbench.server.database.models.datasets import Metadata, MetadataError


class DatasetsDetail(IndexMapBase):
@@ -122,18 +118,17 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
            }
        ]
        """

        dataset = context["dataset"]
        hits = es_json["hits"]["hits"]

        # NOTE: we're expecting just one. We're matching by just the
        # dataset name, which ought to be unique.
        # dataset resource ID, which ought to be unique.
        if len(hits) == 0:
            raise PostprocessError(
                HTTPStatus.BAD_REQUEST, "The specified dataset has gone missing"
            )
            raise APIInternalError(f"Dataset {dataset.name} run document is missing")
        elif len(hits) > 1:
            raise PostprocessError(
                HTTPStatus.BAD_REQUEST, "Too many hits for a unique query"
            )
            raise APIInternalError(f"Dataset {dataset.name} has multiple run documents")

        src = hits[0]["_source"]

        # We're merging the "run" and "@metadata" sub-documents into
@@ -147,12 +142,7 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
        }

        try:
            dataset = Dataset.query(resource_id=(src["run"]["id"]))
            m = self._get_dataset_metadata(dataset, context["metadata"])
        except DatasetNotFound:
            raise APIAbort(
                HTTPStatus.BAD_REQUEST, f"Dataset {src['run']['id']} not found"
            )
        except MetadataError as e:
            raise APIAbort(HTTPStatus.BAD_REQUEST, str(e))
