Skip to content

Commit

Permalink
Support status codes in retrieve_latest (#1703)
Browse files Browse the repository at this point in the history
  • Loading branch information
haakonvt committed Apr 11, 2024
1 parent e6cc816 commit b18ec1e
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 35 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.34.0] - 2024-04-11
### Added
- Datapoints method `retrieve_latest` now supports status codes.
- Slicing or indexing a `Datapoints` or `DatapointsArray` instance, now propagates status codes (when present).

## [7.33.1] - 2024-04-10
### Fixed
- Ordering of elements from calls to `retrieve_multiple` now match the requested elements. For SDK versions between
Expand Down
162 changes: 136 additions & 26 deletions cognite/client/_api/datapoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,9 @@ def retrieve_latest(
before: None | int | str | datetime = None,
target_unit: str | None = None,
target_unit_system: str | None = None,
include_status: bool = False,
ignore_bad_datapoints: bool = True,
treat_uncertain_as_bad: bool = True,
ignore_unknown_ids: bool = False,
) -> Datapoints | DatapointsList | None:
"""`Get the latest datapoint for one or more time series <https://developer.cognite.com/api#tag/Time-series/operation/getLatest>`_
Expand All @@ -1236,15 +1239,18 @@ def retrieve_latest(
before (None | int | str | datetime): (Union[int, str, datetime]): Get latest datapoint before this time. Not used when passing 'LatestDatapointQuery'.
target_unit (str | None): The unit_external_id of the datapoint returned. If the time series does not have a unit_external_id that can be converted to the target_unit, an error will be returned. Cannot be used with target_unit_system.
target_unit_system (str | None): The unit system of the datapoint returned. Cannot be used with target_unit.
include_status (bool): Also return the status code, an integer, for each datapoint in the response.
ignore_bad_datapoints (bool): Prevent datapoints with a bad status code to be returned. Default: True.
treat_uncertain_as_bad (bool): Treat uncertain status codes as bad. If false, treat uncertain as good. Default: True.
ignore_unknown_ids (bool): Ignore IDs and external IDs that are not found rather than throw an exception.
Returns:
Datapoints | DatapointsList | None: A Datapoints object containing the requested data, or a DatapointsList if multiple were requested. If `ignore_unknown_ids` is `True`, a single time series is requested and it is not found, the function will return `None`.
Examples:
Getting the latest datapoint in a time series. This method returns a Datapoints object, so the datapoint will
be the first element (note: status codes are not supported or returned yet):
Getting the latest datapoint in a time series. This method returns a Datapoints object, so the datapoint
(if it exists) will be the first element:
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
Expand All @@ -1271,23 +1277,37 @@ def retrieve_latest(
>>> latest_abc = res[0][0]
>>> latest_def = res[1][0]
If you need to specify a different value of 'before' for each time series, or different values for
unit or unit system, you may pass several LatestDatapointQuery objects::
If you for example need to specify a different value of 'before' for each time series, you may pass several
LatestDatapointQuery objects. These will override any parameter passed directly to the function and also allows
for individual customisation of 'target_unit', 'target_unit_system', 'include_status', 'ignore_bad_datapoints'
and 'treat_uncertain_as_bad'.
>>> from datetime import datetime, timezone
>>> id_queries = [
... 123,
... LatestDatapointQuery(id=456, before="1w-ago"),
... LatestDatapointQuery(id=789, before=datetime(2018,1,1, tzinfo=timezone.utc)),
... LatestDatapointQuery(id=987, target_unit="temperature:deg_f")]
>>> ext_id_queries = [
... "foo",
... LatestDatapointQuery(external_id="abc", before="3h-ago", target_unit_system="Imperial"),
... LatestDatapointQuery(external_id="def", include_status=True),
... LatestDatapointQuery(external_id="ghi", treat_uncertain_as_bad=False),
... LatestDatapointQuery(external_id="jkl", include_status=True, ignore_bad_datapoints=False)]
>>> res = client.time_series.data.retrieve_latest(
... id=id_queries,
... external_id=LatestDatapointQuery(
... external_id="abc", before="3h-ago", target_unit_system="Imperial")
... )
... id=id_queries, external_id=ext_id_queries)
"""
fetcher = RetrieveLatestDpsFetcher(
id, external_id, before, target_unit, target_unit_system, ignore_unknown_ids, self
id=id,
external_id=external_id,
before=before,
target_unit=target_unit,
target_unit_system=target_unit_system,
include_status=include_status,
ignore_bad_datapoints=ignore_bad_datapoints,
treat_uncertain_as_bad=treat_uncertain_as_bad,
ignore_unknown_ids=ignore_unknown_ids,
dps_client=self,
)
res = fetcher.fetch_datapoints()
if not fetcher.input_is_singleton:
Expand Down Expand Up @@ -1639,15 +1659,26 @@ def __init__(
before: None | int | str | datetime,
target_unit: None | str,
target_unit_system: None | str,
include_status: bool,
ignore_bad_datapoints: bool,
treat_uncertain_as_bad: bool,
ignore_unknown_ids: bool,
dps_client: DatapointsAPI,
) -> None:
self.before_settings: dict[tuple[str, int], None | int | str | datetime] = {}
self.default_before = before
self.default_unit = target_unit
self.default_unit_system = target_unit_system
self.target_unit_settings: dict[tuple[str, int], None | str] = {}
self.target_unit_system_settings: dict[tuple[str, int], None | str] = {}
self.default_include_status = include_status
self.default_ignore_bad_datapoints = ignore_bad_datapoints
self.default_treat_uncertain_as_bad = treat_uncertain_as_bad

self.settings_before: dict[tuple[str, int], None | int | str | datetime] = {}
self.settings_target_unit: dict[tuple[str, int], None | str] = {}
self.settings_target_unit_system: dict[tuple[str, int], None | str] = {}
self.settings_include_status: dict[tuple[str, int], bool | None] = {}
self.settings_ignore_bad_datapoints: dict[tuple[str, int], bool | None] = {}
self.settings_treat_uncertain_as_bad: dict[tuple[str, int], bool | None] = {}

self.ignore_unknown_ids = ignore_unknown_ids
self.dps_client = dps_client

Expand All @@ -1656,18 +1687,33 @@ def __init__(
self._is_singleton = IdentifierSequence.load(parsed_ids, parsed_xids).is_singleton()
self._all_identifiers = self._prepare_requests(parsed_ids, parsed_xids)

# If features related to status codes are requested, use beta:
# TODO: Remove once status codes -> GA
self.api_subversion = None
if self.requires_api_subversion_beta():
self.api_subversion = dps_client._api_subversion + "-beta"
dps_client._status_codes_warning.warn()

@property
def input_is_singleton(self) -> bool:
return self._is_singleton

def requires_api_subversion_beta(self) -> bool:
return any(
query.get("includeStatus") is True
or query.get("ignoreBadDataPoints") is False
or query.get("treatUncertainAsBad") is False
for query in self._all_identifiers
)

@staticmethod
def _get_and_check_identifier(
query: LatestDatapointQuery,
identifier_type: Literal["id", "external_id"],
) -> int | str:
if (as_primitive := getattr(query, identifier_type)) is None:
if query.identifier.name() != identifier_type:
raise ValueError(f"Missing '{identifier_type}' from: '{query}'")
return as_primitive
return query.identifier.as_primitive()

def _parse_user_input(
self,
Expand All @@ -1680,18 +1726,26 @@ def _parse_user_input(
# memorize the individual 'before'-settings when/where given:
elif isinstance(user_input, LatestDatapointQuery):
as_primitive = self._get_and_check_identifier(user_input, identifier_type)
self.before_settings[(identifier_type, 0)] = user_input.before
self.target_unit_settings[(identifier_type, 0)] = user_input.target_unit
self.target_unit_system_settings[(identifier_type, 0)] = user_input.target_unit_system
idx = (identifier_type, 0)
self.settings_before[idx] = user_input.before
self.settings_target_unit[idx] = user_input.target_unit
self.settings_target_unit_system[idx] = user_input.target_unit_system
self.settings_include_status[idx] = user_input.include_status
self.settings_ignore_bad_datapoints[idx] = user_input.ignore_bad_datapoints
self.settings_treat_uncertain_as_bad[idx] = user_input.treat_uncertain_as_bad
return as_primitive
elif isinstance(user_input, MutableSequence):
user_input = user_input[:] # Modify a shallow copy to avoid side effects
for i, inp in enumerate(user_input):
if isinstance(inp, LatestDatapointQuery):
as_primitive = self._get_and_check_identifier(inp, identifier_type)
self.before_settings[(identifier_type, i)] = inp.before
self.target_unit_settings[(identifier_type, i)] = inp.target_unit
self.target_unit_system_settings[(identifier_type, i)] = inp.target_unit_system
idx = (identifier_type, i)
self.settings_before[idx] = inp.before
self.settings_target_unit[idx] = inp.target_unit
self.settings_target_unit_system[idx] = inp.target_unit_system
self.settings_include_status[idx] = inp.include_status
self.settings_ignore_bad_datapoints[idx] = inp.ignore_bad_datapoints
self.settings_treat_uncertain_as_bad[idx] = inp.treat_uncertain_as_bad
user_input[i] = as_primitive # mutating while iterating like a boss
return user_input

Expand All @@ -1708,27 +1762,82 @@ def _prepare_requests(
# specify a particular timestamp for 'now' in order to possibly get a datapoint a few hundred ms fresher:
for identifiers, identifier_type in zip([all_ids, all_xids], ["id", "external_id"]):
for i, dct in enumerate(identifiers):
i_before = self.before_settings.get((identifier_type, i)) or self.default_before
idx = (identifier_type, i)
i_before = self.settings_before.get(idx) or self.default_before
if "now" != i_before is not None: # mypy doesn't understand 'i_before not in {"now", None}'
dct["before"] = timestamp_to_ms(i_before)
i_target_unit = self.target_unit_settings.get((identifier_type, i)) or self.default_unit
i_target_unit_system = (
self.target_unit_system_settings.get((identifier_type, i)) or self.default_unit_system
)

i_target_unit = self.settings_target_unit.get(idx) or self.default_unit
i_target_unit_system = self.settings_target_unit_system.get(idx) or self.default_unit_system
if i_target_unit is not None and i_target_unit_system is not None:
raise ValueError("You must use either 'target_unit' or 'target_unit_system', not both.")
if i_target_unit is not None:
dct["targetUnit"] = i_target_unit
if i_target_unit_system is not None:
dct["targetUnitSystem"] = i_target_unit_system

# Careful logic: "Not given" vs "given" vs "default" with "truthy/falsy":
if (
self.settings_include_status.get(idx) is True
or self.settings_include_status.get(idx) is None
and self.default_include_status is True
):
dct["includeStatus"] = True

if (
self.settings_ignore_bad_datapoints.get(idx) is False
or self.settings_ignore_bad_datapoints.get(idx) is None
and self.default_ignore_bad_datapoints is False
):
dct["ignoreBadDataPoints"] = False

if (
self.settings_treat_uncertain_as_bad.get(idx) is False
or self.settings_treat_uncertain_as_bad.get(idx) is None
and self.default_treat_uncertain_as_bad is False
):
dct["treatUncertainAsBad"] = False

all_ids.extend(all_xids)
return all_ids

@staticmethod
def _json_float_translation(value: float | Literal["Infinity", "-Infinity", "NaN"] | None) -> float | None:
# As opposed to protobuf, retrieve_latest uses JSON and it returns out-of-range float values as strings:
return {"Infinity": math.inf, "-Infinity": -math.inf, "NaN": math.nan}.get(value, value) # type: ignore [arg-type]

def _post_fix_status_codes_and_stringified_floats(self, result: list[dict[str, Any]]) -> list[dict[str, Any]]:
# Due to 'ignore_unknown_ids', we can't just zip queries & results and iterate... sadness
if self.ignore_unknown_ids and len(result) < len(self._all_identifiers):
ids_exists = (
{("id", r["id"]) for r in result}
.union({("xid", r["externalId"]) for r in result})
.difference({("xid", None)})
) # fmt: skip
self._all_identifiers = [
query
for query in self._all_identifiers
if ids_exists.intersection((("id", query.get("id")), ("xid", query.get("externalId"))))
]
for query, res in zip(self._all_identifiers, result):
if not (dps := res["datapoints"]):
continue
(dp,) = dps
if query.get("includeStatus") is True:
dp.setdefault("status", {"code": 0, "symbol": "Good"}) # Not returned from API by default
if query.get("ignoreBadDataPoints") is False:
# Bad data can have value missing (we translate to None):
dp.setdefault("value", None)
if not res["isString"]:
dp["value"] = self._json_float_translation(dp["value"])
return result

def fetch_datapoints(self) -> list[dict[str, Any]]:
tasks = [
{
"url_path": self.dps_client._RESOURCE_PATH + "/latest",
"json": {"items": chunk, "ignoreUnknownIds": self.ignore_unknown_ids},
"api_subversion": self.api_subversion, # TODO: remove once status codes -> GA
}
for chunk in split_into_chunks(self._all_identifiers, self.dps_client._RETRIEVE_LATEST_LIMIT)
]
Expand All @@ -1737,4 +1846,5 @@ def fetch_datapoints(self) -> list[dict[str, Any]]:
task_unwrap_fn=unpack_items_in_payload,
task_list_element_unwrap_fn=IdentifierSequenceCore.extract_identifiers,
)
return tasks_summary.joined_results(lambda res: res.json()["items"])
result = tasks_summary.joined_results(lambda res: res.json()["items"])
return self._post_fix_status_codes_and_stringified_floats(result)
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.33.1"
__version__ = "7.34.0"
__api_subversion__ = "20230101"
18 changes: 14 additions & 4 deletions cognite/client/data_classes/datapoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,27 @@ class LatestDatapointQuery:
before (Union[None, int, str, datetime]): Get latest datapoint before this time. None means 'now'.
target_unit (str | None): The unit_external_id of the data points returned. If the time series does not have a unit_external_id that can be converted to the target_unit, an error will be returned. Cannot be used with target_unit_system.
target_unit_system (str | None): The unit system of the data points returned. Cannot be used with target_unit.
include_status (bool): Also return the status code, an integer, for each datapoint in the response.
ignore_bad_datapoints (bool): Prevent data points with a bad status code to be returned. Default: True.
treat_uncertain_as_bad (bool): Treat uncertain status codes as bad. If false, treat uncertain as good. Default: True.
"""

id: int | None = None
external_id: str | None = None
id: InitVar[int | None] = None
external_id: InitVar[str | None] = None
before: None | int | str | datetime = None
target_unit: str | None = None
target_unit_system: str | None = None
include_status: bool | None = None
ignore_bad_datapoints: bool | None = None
treat_uncertain_as_bad: bool | None = None

def __post_init__(self) -> None:
def __post_init__(self, id: int | None, external_id: str | None) -> None:
# Ensure user have just specified one of id/xid:
Identifier.of_either(self.id, self.external_id)
object.__setattr__(self, "_identifier", Identifier.of_either(id, external_id))

@property
def identifier(self) -> Identifier:
return self._identifier # type: ignore [attr-defined]


class Datapoint(CogniteResource):
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.33.1"
version = "7.34.0"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand Down
7 changes: 5 additions & 2 deletions scripts/create_ts_for_integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,14 @@ def get_bad():
for ts in idx:
v, c = get_bad()
dps_all_bad.append({"timestamp": ts, "value": v, "status": {"code": c}})
dps_all_bad_str.append({"timestamp": ts, "value": f"str-{v}", "status": {"code": c}})
# ~50/50 values that can be interpreted as floats (for tests ensuring not converted to float):
string_value = v if v is None else f"str-{v}" if ts % 23 % 2 else str(v)
dps_all_bad_str.append({"timestamp": ts, "value": string_value, "status": {"code": c}})

v, c = random.choice([get_good, get_uncertain, get_bad])()
dps.append({"timestamp": ts, "value": v, "status": {"code": c}})
dps_str.append({"timestamp": ts, "value": f"str-{v}", "status": {"code": c}})
string_value = v if v is None else f"str-{v}" if ts % 23 % 2 else str(v)
dps_str.append({"timestamp": ts, "value": string_value, "status": {"code": c}})

mixed_ts, mixed_ts_str, bad_ts, bad_ts_str = NAMES[118:122]
client.time_series.upsert(
Expand Down
Loading

0 comments on commit b18ec1e

Please sign in to comment.