Skip to content

Commit

Permalink
fix: results ordering should match requested identifiers (#1710)
Browse files Browse the repository at this point in the history
  • Loading branch information
haakonvt committed Apr 10, 2024
1 parent 2ace8a3 commit 9333494
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 16 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.33.1] - 2024-04-10
### Fixed
- Ordering of elements from calls to `retrieve_multiple` now matches that of the requested identifiers. For SDK versions
  between 7.0.0 and 7.33.1, the ordering was broken whenever significantly more than 1,000 elements were requested
  (the more requests needed, the more likely that a chunk was returned out of order).

## [7.33.0] - 2024-04-08
### Added
- All datapoints retrieve methods (except `retrieve_latest`) now support status codes. Note: Support for *inserting*
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.33.0"
__version__ = "7.33.1"
__api_subversion__ = "20230101"
27 changes: 21 additions & 6 deletions cognite/client/utils/_concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def execute_tasks_serially(
elif isinstance(task, tuple):
results.append(func(*task))
else:
continue
raise TypeError(f"invalid task type: {type(task)}")
successful_tasks.append(task)

except Exception as err:
Expand All @@ -275,28 +275,34 @@ def execute_tasks(
"""
Will use a default executor if one is not passed explicitly. The default executor type uses a thread pool but can
be changed using ConcurrencySettings.executor_type.
Results are returned in the same order as that given by tasks.
"""
if ConcurrencySettings.uses_mainthread() or isinstance(executor, MainThreadExecutor):
return execute_tasks_serially(func, tasks, fail_fast)

executor = executor or ConcurrencySettings.get_thread_pool_executor(max_workers)
task_order = [id(task) for task in tasks]

futures_dct: dict[Future, tuple | dict] = {}
for task in tasks:
if isinstance(task, dict):
futures_dct[executor.submit(func, **task)] = task
elif isinstance(task, tuple):
futures_dct[executor.submit(func, *task)] = task
else:
raise TypeError(f"invalid task type: {type(task)}")

results, exceptions = [], []
successful_tasks, failed_tasks, unknown_result_tasks, skipped_tasks = [], [], [], []
results: dict[int, tuple | dict] = {}
successful_tasks: dict[int, tuple | dict] = {}
failed_tasks, unknown_result_tasks, skipped_tasks, exceptions = [], [], [], []

for fut in as_completed(futures_dct):
task = futures_dct[fut]
try:
res = fut.result()
successful_tasks.append(task)
results.append(res)
results[id(task)] = task
successful_tasks[id(task)] = res
except CancelledError:
# In fail-fast mode, after an error has been raised, we attempt to cancel and skip tasks:
skipped_tasks.append(task)
Expand All @@ -313,7 +319,16 @@ def execute_tasks(
for fut in futures_dct:
fut.cancel()

return TasksSummary(successful_tasks, unknown_result_tasks, failed_tasks, skipped_tasks, results, exceptions)
ordered_successful_tasks = [results[task_id] for task_id in task_order if task_id in results]
ordered_results = [successful_tasks[task_id] for task_id in task_order if task_id in successful_tasks]
return TasksSummary(
ordered_successful_tasks,
unknown_result_tasks,
failed_tasks,
skipped_tasks,
ordered_results,
exceptions,
)


def classify_error(err: Exception) -> Literal["failed", "unknown"]:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.33.0"
version = "7.33.1"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand Down
12 changes: 12 additions & 0 deletions tests/tests_integration/test_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
generic resource, an arbitrary resource is used instead to test the endpoint.
"""

import random
from unittest.mock import patch

import pytest
Expand Down Expand Up @@ -386,6 +387,17 @@ def test_retrieve_multiple_empty(self, cognite_client: CogniteClient) -> None:
assert isinstance(res, EventList)
assert len(res) == 0

def test_retrieve_multiple__ordering_matches_input(
    self, cognite_client: CogniteClient, monkeypatch: MonkeyPatch
) -> None:
    """Regression test: results from ``retrieve_multiple`` must be returned in the
    same order as the requested identifiers.

    Between SDK version 7.0.0 and 7.33.1, ordering of results was broken when >> 1k
    elements was requested (meaning multiple requests were used).
    """
    # Fetch up to 1000 existing event ids, then shuffle so the requested order
    # differs from whatever order the API happens to list them in.
    event_ids = cognite_client.events.list(limit=1000).as_ids()
    random.shuffle(event_ids)
    # Shrink the per-request limit so the retrieval is split across many
    # concurrent requests - the exact scenario where ordering used to break.
    monkeypatch.setattr(cognite_client.events, "_RETRIEVE_LIMIT", 80)
    res = cognite_client.events.retrieve_multiple(ids=event_ids)
    # Result order must match the (shuffled) requested order exactly.
    assert res.as_ids() == event_ids


class TestAPIClientDelete:
def test_delete_empty(self, cognite_client: CogniteClient) -> None:
Expand Down
45 changes: 37 additions & 8 deletions tests/tests_unit/test_utils/test_concurrency.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import random
import time
from concurrent.futures import ThreadPoolExecutor

import pytest

from cognite.client.exceptions import CogniteAPIError
from cognite.client.utils._auxiliary import no_op
from cognite.client.utils._concurrency import (
ConcurrencySettings,
MainThreadExecutor,
Expand Down Expand Up @@ -37,15 +37,44 @@ def test_set_and_get_executor(self) -> None:

@pytest.mark.parametrize(
"executor",
(ConcurrencySettings.get_mainthread_executor(), ConcurrencySettings.get_thread_pool_executor(2)),
(ConcurrencySettings.get_mainthread_executor(), ConcurrencySettings.get_thread_pool_executor(5)),
)
def test_executors(self, executor) -> None:
def test_executors__results_ordering_match_tasks(self, executor) -> None:
assert ConcurrencySettings.executor_type == "threadpool"

task_summary = execute_tasks(no_op, [(i,) for i in range(10)], 10, executor=executor)
# We use a task that yields thread control and use N tasks >> max_workers to really test ordering:
task_summary = execute_tasks(
lambda i: time.sleep(random.random() / 50) or i,
tasks=[(i,) for i in range(50)],
max_workers=10,
executor=executor,
)
task_summary.raise_compound_exception_if_failed_tasks()

assert sorted(task_summary.results) == list(range(10))
assert task_summary.results == list(range(50))

def test_executors__results_ordering_match_tasks_even_with_failures(self) -> None:
    """Verify that ``execute_tasks`` keeps ``results`` and ``successful_tasks`` in
    input (task) order even when some tasks raise: failed tasks are simply omitted
    from the ordered output rather than disturbing the order of the rest."""
    executor = ConcurrencySettings.get_thread_pool_executor(5)

    def test_fn(i: int) -> int:
        # The random sleep yields thread control so completion order differs
        # from submission order; tasks 20, 21 and 22 fail on purpose.
        time.sleep(random.random() / 50)
        if i in range(20, 23):
            raise ValueError
        return i

    task_summary = execute_tasks(
        test_fn,
        tasks=[(i,) for i in range(50)],
        max_workers=10,
        executor=executor,
    )
    # Failing tasks (20-22) are excluded; all others must keep input order.
    exp_res = [*range(20), *range(23, 50)]
    assert task_summary.results == exp_res
    assert task_summary.successful_tasks == [(res,) for res in exp_res]
    # Completion order of the failures is nondeterministic, hence the set compare.
    assert set(task_summary.failed_tasks) == set([(20,), (22,), (21,)])

    with pytest.raises(ValueError):
        task_summary.raise_compound_exception_if_failed_tasks()

@pytest.mark.parametrize("fail_fast", (False, True))
def test_fail_fast__execute_tasks_serially(self, fail_fast):
Expand All @@ -67,7 +96,7 @@ def test_fail_fast__execute_tasks_with_threads(self, fail_fast):
assert ConcurrencySettings.executor_type == "threadpool"
task_summary = execute_tasks(
i_dont_like_5,
list(zip(range(10))),
list(zip(range(15))),
max_workers=3,
fail_fast=fail_fast,
)
Expand All @@ -77,7 +106,7 @@ def test_fail_fast__execute_tasks_with_threads(self, fail_fast):
assert err.value.failed == [(5,)]
if fail_fast:
assert err.value.skipped
assert 9 == len(err.value.successful) + len(err.value.skipped)
assert 14 == len(err.value.successful) + len(err.value.skipped)
else:
assert not err.value.skipped
assert 9 == len(err.value.successful)
assert 14 == len(err.value.successful)

0 comments on commit 9333494

Please sign in to comment.