Skip to content

Commit

Permalink
Deduplicate during fetching when retrieving related resources on AssetList (#504)
Browse files Browse the repository at this point in the history

* Implement deduplication and update tests.

* Update CHANGELOG.md

* Updates after review.

* Move lock out of loop.
  • Loading branch information
HMEiding authored Aug 20, 2019
1 parent 37414b6 commit 8a8a3ee
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Changes are grouped as follows

### Changed
- assets.create() no longer validates asset hierarchy and sorts assets before posting. This functionality has been moved to assets.create_hierarchy().
- AssetList.files() and AssetList.events() now deduplicate results during fetching instead of as a postprocessing step.

## [1.0.5] - 2019-08-15
### Added
Expand Down
31 changes: 22 additions & 9 deletions cognite/client/data_classes/assets.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
from typing import *

from cognite.client.data_classes._base import *
Expand Down Expand Up @@ -173,6 +174,10 @@ class AssetList(CogniteResourceList):
_RESOURCE = Asset
_UPDATE = AssetUpdate

def __init__(self, resources: List[Any], cognite_client=None):
super().__init__(resources, cognite_client)
self._retrieve_chunk_size = 100

def time_series(self) -> "TimeSeriesList":
"""Retrieve all time series related to these assets.
Expand Down Expand Up @@ -204,21 +209,29 @@ def files(self) -> "FileMetadataList":
return self._retrieve_related_resources(FileMetadataList, self._cognite_client.files)

def _retrieve_related_resources(self, resource_list_class, resource_api):
seen = set()
lock = threading.Lock()

def retrieve_and_deduplicate(asset_ids):
res = resource_api.list(asset_ids=asset_ids, limit=-1)
resources = resource_list_class([])
with lock:
for resource in res:
if resource.id not in seen:
resources.append(resource)
seen.add(resource.id)
return resources

ids = [a.id for a in self.data]
tasks = []
chunk_size = 100
for i in range(0, len(ids), chunk_size):
tasks.append({"asset_ids": ids[i : i + chunk_size], "limit": -1})
for i in range(0, len(ids), self._retrieve_chunk_size):
tasks.append({"asset_ids": ids[i : i + self._retrieve_chunk_size]})
res_list = utils._concurrency.execute_tasks_concurrently(
resource_api.list, tasks, resource_api._config.max_workers
retrieve_and_deduplicate, tasks, resource_api._config.max_workers
).results
resources = resource_list_class([])
seen = set()
for res in res_list:
for resource in res:
if resource.id not in seen:
resources.append(resource)
seen.add(resource.id)
resources.extend(res)
return resources


Expand Down
15 changes: 9 additions & 6 deletions tests/tests_unit/test_data_classes/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,22 @@ def test_get_files(self):
"resource_class, resource_list_class, method",
[(FileMetadata, FileMetadataList, "files"), (Event, EventList, "events")],
)
@mock.patch("cognite.client.utils._concurrency")
def test_get_related_resources_should_not_return_duplicates(
self, mock_concurrency, resource_class, resource_list_class, method
):
assets = AssetList([Asset(id=1), Asset(id=2), Asset(id=3)], cognite_client=mock.MagicMock())
def test_get_related_resources_should_not_return_duplicates(self, resource_class, resource_list_class, method):
r1 = resource_class(id=1)
r2 = resource_class(id=2)
r3 = resource_class(id=3)
resources_a1 = resource_list_class([r1])
resources_a2 = resource_list_class([r2, r3])
resources_a3 = resource_list_class([r2, r3])

mock_concurrency.execute_tasks_concurrently.return_value.results = [resources_a1, resources_a2, resources_a3]
mock_cognite_client = mock.MagicMock()
mock_method = getattr(mock_cognite_client, method)
mock_method.list.side_effect = [resources_a1, resources_a2, resources_a3]
mock_method._config = mock.Mock(max_workers=3)

assets = AssetList([Asset(id=1), Asset(id=2), Asset(id=3)], cognite_client=mock_cognite_client)
assets._retrieve_chunk_size = 1

resources = getattr(assets, method)()
expected = [r1, r2, r3]
assert expected == resources

0 comments on commit 8a8a3ee

Please sign in to comment.