From 89510863eb910ae95c62e3455ea6366f79463fe7 Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Thu, 7 Mar 2024 18:08:44 +0100 Subject: [PATCH 01/21] add dataset.upsert_data_rows method --- labelbox/schema/dataset.py | 61 +++++++++++++++++++++++++++++++++----- 1 file changed, 54 insertions(+), 7 deletions(-) diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index ffed0534f..a61a8a100 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -1,4 +1,5 @@ -from typing import Dict, Generator, List, Optional, Union, Any +from datetime import datetime +from typing import Dict, Generator, List, Optional, Any import os import json import logging @@ -14,7 +15,6 @@ from io import StringIO import requests -from labelbox import pagination from labelbox.exceptions import InvalidQueryError, LabelboxError, ResourceNotFoundError, InvalidAttributeError from labelbox.orm.comparison import Comparison from labelbox.orm.db_object import DbObject, Updateable, Deletable, experimental @@ -65,16 +65,16 @@ def data_rows( from_cursor: Optional[str] = None, where: Optional[Comparison] = None, ) -> PaginatedCollection: - """ + """ Custom method to paginate data_rows via cursor. Args: from_cursor (str): Cursor (data row id) to start from, if none, will start from the beginning - where (dict(str,str)): Filter to apply to data rows. Where value is a data row column name and key is the value to filter on. + where (dict(str,str)): Filter to apply to data rows. Where value is a data row column name and key is the value to filter on. example: {'external_id': 'my_external_id'} to get a data row with external_id = 'my_external_id' - NOTE: + NOTE: Order of retrieval is newest data row first. Deleted data rows are not retrieved. Failed data rows are not retrieved. @@ -639,13 +639,13 @@ def export_v2( ) -> Task: """ Creates a dataset export task with the given params and returns the task. - + >>> dataset = client.get_dataset(DATASET_ID) >>> task = dataset.export_v2( >>> filters={ >>> "last_activity_at": ["2000-01-01 00:00:00", "2050-01-01 00:00:00"], >>> "label_created_at": ["2000-01-01 00:00:00", "2050-01-01 00:00:00"], - >>> "data_row_ids": [DATA_ROW_ID_1, DATA_ROW_ID_2, ...] # or global_keys: [DATA_ROW_GLOBAL_KEY_1, DATA_ROW_GLOBAL_KEY_2, ...] + >>> "data_row_ids": [DATA_ROW_ID_1, DATA_ROW_ID_2, ...] # or global_keys: [DATA_ROW_GLOBAL_KEY_1, DATA_ROW_GLOBAL_KEY_2, ...] 
>>> }, >>> params={ >>> "performance_details": False, @@ -750,3 +750,50 @@ def _export( res = res[mutation_name] task_id = res["taskId"] return Task.get_task(self.client, task_id) + + def upsert_data_rows(self, items) -> "Task": + chunk_size = 3 + chunks = [ + items[i:i + chunk_size] for i in range(0, len(items), chunk_size) + ] + manifest = {"source": "SDK", "item_count": 0, "chunk_uris": []} + for chunk in chunks: + manifest["chunk_uris"].append(self._create_descriptor_file(chunk)) + manifest["item_count"] += len(chunk) + + data = json.dumps(manifest).encode("utf-8") + manifest_uri = self.client.upload_data(data, + content_type="application/json", + filename="manifest.json") + + dataset_param = "datasetId" + manifest_uri_param = "manifestUri" + query_str = """mutation UpsertDataRowsPyApi($%s: ID!, $%s: String!){ + upsertDataRows(data:{datasetId: $%s, manifestUri: $%s} + ){ taskId accepted errorMessage } } """ % ( + dataset_param, manifest_uri_param, dataset_param, + manifest_uri_param) + + res = self.client.execute(query_str, { + dataset_param: self.uid, + manifest_uri_param: manifest_uri + }) + res = res["upsertDataRows"] + if not res["accepted"]: + msg = res['errorMessage'] + raise InvalidQueryError( + f"Server did not accept DataRow upsert request. {msg}") + + # Fetch and return the task. + task_id = res["taskId"] + user: User = self.client.get_user() + tasks: List[Task] = list( + user.created_tasks(where=Entity.Task.uid == task_id)) + # Cache user in a private variable as the relationship can't be + # resolved due to server-side limitations (see Task.created_by) + # for more info. + if len(tasks) != 1: + raise ResourceNotFoundError(Entity.Task, task_id) + task: Task = tasks[0] + task._user = user + return task From 4d5949242609ead708f77f86f45a11a9caca7a3b Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Tue, 12 Mar 2024 13:53:47 +0100 Subject: [PATCH 02/21] add DataRowSpec class, update return type of upsertDataRows --- labelbox/schema/data_row.py | 39 ++++++++++++++++++++++++++--- labelbox/schema/dataset.py | 32 ++++++++--------------- tests/integration/test_data_rows.py | 12 ++++++++- 3 files changed, 57 insertions(+), 26 deletions(-) diff --git a/labelbox/schema/data_row.py b/labelbox/schema/data_row.py index 9d8758934..d7cb4d5c7 100644 --- a/labelbox/schema/data_row.py +++ b/labelbox/schema/data_row.py @@ -1,10 +1,12 @@ import logging +from enum import Enum from typing import TYPE_CHECKING, List, Optional, Union import json from labelbox.orm import query from labelbox.orm.db_object import DbObject, Updateable, BulkDeletable, experimental from labelbox.orm.model import Entity, Field, Relationship +from labelbox.pydantic_compat import BaseModel from labelbox.schema.data_row_metadata import DataRowMetadataField # type: ignore from labelbox.schema.export_filters import DatarowExportFilters, build_filters, validate_at_least_one_of_data_row_ids_or_global_keys from labelbox.schema.export_params import CatalogExportParams, validate_catalog_export_params @@ -17,6 +19,35 @@ logger = logging.getLogger(__name__) +class KeyType(str, Enum): + ID = 'ID' + """An existing CUID""" + GKEY = 'GKEY' + """A Global key, could be existing or non-existing""" + AUTO = 'AUTO' + """The key will be auto-generated. 
Only usable for creates""" + + +class ResolvableId(BaseModel): + """ + The ResolvableId class is a unique ID abstraction that allows us to reference + a DataRow by either a Global key or CUID + """ + type: KeyType = KeyType.GKEY + value: str + + +class DataRowSpec(BaseModel): + row_data: Union[str, dict] + external_id: Optional[str] + global_key: Optional[str] + + +class DataRowUpsertItem(BaseModel): + id: ResolvableId + payload: DataRowSpec + + class DataRow(DbObject, Updateable, BulkDeletable): """ Internal Labelbox representation of a single piece of data (e.g. image, video, text). @@ -220,13 +251,13 @@ def export_v2( task_name (str): name of remote task params (CatalogExportParams): export params - + >>> dataset = client.get_dataset(DATASET_ID) >>> task = DataRow.export_v2( - >>> data_rows=[data_row.uid for data_row in dataset.data_rows.list()], + >>> data_rows=[data_row.uid for data_row in dataset.data_rows.list()], >>> # or a list of DataRow objects: data_rows = data_set.data_rows.list() - >>> # or a list of global_keys=["global_key_1", "global_key_2"], - >>> # Note that exactly one of: data_rows or global_keys parameters can be passed in at a time + >>> # or a list of global_keys=["global_key_1", "global_key_2"], + >>> # Note that exactly one of: data_rows or global_keys parameters can be passed in at a time >>> # and if data rows ids is present, global keys will be ignored >>> params={ >>> "performance_details": False, diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index a61a8a100..407b239d0 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -22,7 +22,7 @@ from labelbox.orm import query from labelbox.exceptions import MalformedQueryException from labelbox.pagination import PaginatedCollection -from labelbox.schema.data_row import DataRow +from labelbox.schema.data_row import DataRow, DataRowUpsertItem from labelbox.schema.export_filters import DatasetExportFilters, build_filters from labelbox.schema.export_params import CatalogExportParams, validate_catalog_export_params from labelbox.schema.export_task import ExportTask @@ -461,7 +461,12 @@ def formatLegacyConversationalData(item): item["row_data"] = one_conversation return item - def convert_item(item): + def convert_item(data_row_item): + if isinstance(data_row_item, DataRowUpsertItem): + item = data_row_item.payload.dict() + else: + item = data_row_item + if "tileLayerUrl" in item: validate_attachments(item) return item @@ -751,7 +756,7 @@ def _export( task_id = res["taskId"] return Task.get_task(self.client, task_id) - def upsert_data_rows(self, items) -> "Task": + def upsert_data_rows(self, items: list[DataRowUpsertItem]) -> "Task": chunk_size = 3 chunks = [ items[i:i + chunk_size] for i in range(0, len(items), chunk_size) @@ -770,7 +775,7 @@ def upsert_data_rows(self, items) -> "Task": manifest_uri_param = "manifestUri" query_str = """mutation UpsertDataRowsPyApi($%s: ID!, $%s: String!){ upsertDataRows(data:{datasetId: $%s, manifestUri: $%s} - ){ taskId accepted errorMessage } } """ % ( + ){ id createdAt updatedAt name status completionPercentage result errors type metadata } } """ % ( dataset_param, manifest_uri_param, dataset_param, manifest_uri_param) @@ -779,21 +784,6 @@ def upsert_data_rows(self, items) -> "Task": manifest_uri_param: manifest_uri }) res = res["upsertDataRows"] - if not res["accepted"]: - msg = res['errorMessage'] - raise InvalidQueryError( - f"Server did not accept DataRow upsert request. {msg}") - - # Fetch and return the task. 
- task_id = res["taskId"] - user: User = self.client.get_user() - tasks: List[Task] = list( - user.created_tasks(where=Entity.Task.uid == task_id)) - # Cache user in a private variable as the relationship can't be - # resolved due to server-side limitations (see Task.created_by) - # for more info. - if len(tasks) != 1: - raise ResourceNotFoundError(Entity.Task, task_id) - task: Task = tasks[0] - task._user = user + task = Task(self.client, res) + task._user = self.client.get_user() return task diff --git a/tests/integration/test_data_rows.py b/tests/integration/test_data_rows.py index e57ad4d9c..a5e14c70a 100644 --- a/tests/integration/test_data_rows.py +++ b/tests/integration/test_data_rows.py @@ -1,8 +1,9 @@ from tempfile import NamedTemporaryFile -import time import uuid from datetime import datetime import json + +from labelbox.schema.data_row import DataRowUpsertItem, ResolvableId, DataRowSpec from labelbox.schema.media_type import MediaType import pytest @@ -1025,3 +1026,12 @@ def test_create_data_row_with_media_type(dataset, image_url): exc.value) dataset.create_data_row(row_data=image_url, media_type="IMAGE") + + +def test_upsert_data_rows(dataset, image_url): + task = dataset.upsert_data_rows([ + DataRowUpsertItem(id=ResolvableId(value="gkey123"), + payload=DataRowSpec(row_data=image_url)) + ]) + task.wait_till_done() + assert task.status == "COMPLETE" From 90e7a4c8100be9f59eccaabf69745b7a1e889df4 Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Thu, 14 Mar 2024 13:24:53 +0100 Subject: [PATCH 03/21] fix item validation for upsert --- labelbox/schema/dataset.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index 407b239d0..c71fd2b5e 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -484,7 +484,11 @@ def convert_item(data_row_item): parse_metadata_fields(item) # Upload any local file paths item = upload_if_necessary(item) - return item + + if isinstance(data_row_item, DataRowUpsertItem): + return {'id': data_row_item.id.dict(), 'payload': item} + else: + return item if not isinstance(items, Iterable): raise ValueError( From 456bdbabcf736aadbebb0dd50715c7f133f66aca Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Tue, 19 Mar 2024 18:59:45 +0100 Subject: [PATCH 04/21] improve dataset.upsert_data_rows method and add more tests --- labelbox/schema/asset_attachment.py | 41 ++++++----- labelbox/schema/data_row.py | 33 ++++++--- labelbox/schema/dataset.py | 28 ++++++-- tests/integration/test_data_rows.py | 10 --- tests/integration/test_data_rows_upsert.py | 80 ++++++++++++++++++++++ 5 files changed, 148 insertions(+), 44 deletions(-) create mode 100644 tests/integration/test_data_rows_upsert.py diff --git a/labelbox/schema/asset_attachment.py b/labelbox/schema/asset_attachment.py index 2ec65de21..8b18bd01b 100644 --- a/labelbox/schema/asset_attachment.py +++ b/labelbox/schema/asset_attachment.py @@ -6,6 +6,25 @@ from labelbox.orm.model import Field +class AttachmentType(str, Enum): + + @classmethod + def __missing__(cls, value: object): + if str(value) == "TEXT": + warnings.warn( + "The TEXT attachment type is deprecated. Use RAW_TEXT instead.") + return cls.RAW_TEXT + return value + + VIDEO = "VIDEO" + IMAGE = "IMAGE" + IMAGE_OVERLAY = "IMAGE_OVERLAY" + HTML = "HTML" + RAW_TEXT = "RAW_TEXT" + TEXT_URL = "TEXT_URL" + PDF_URL = "PDF_URL" + + class AssetAttachment(DbObject): """Asset attachment provides extra context about an asset while labeling. 
@@ -14,26 +33,6 @@ class AssetAttachment(DbObject): attachment_value (str): URL to an external file or a string of text """ - class AttachmentType(Enum): - - @classmethod - def __missing__(cls, value: object): - if str(value) == "TEXT": - warnings.warn( - "The TEXT attachment type is deprecated. Use RAW_TEXT instead." - ) - return cls.RAW_TEXT - return value - - VIDEO = "VIDEO" - IMAGE = "IMAGE" - # TEXT = "TEXT" # Deprecated - IMAGE_OVERLAY = "IMAGE_OVERLAY" - HTML = "HTML" - RAW_TEXT = "RAW_TEXT" - TEXT_URL = "TEXT_URL" - PDF_URL = "PDF_URL" - for topic in AttachmentType: vars()[topic.name] = topic.value @@ -51,7 +50,7 @@ def validate_attachment_json(cls, attachment_json: Dict[str, str]) -> None: @classmethod def validate_attachment_type(cls, attachment_type: str) -> None: - valid_types = set(cls.AttachmentType.__members__) + valid_types = set(AttachmentType.__members__) if attachment_type not in valid_types: raise ValueError( f"meta_type must be one of {valid_types}. Found {attachment_type}" diff --git a/labelbox/schema/data_row.py b/labelbox/schema/data_row.py index d7cb4d5c7..8618cff63 100644 --- a/labelbox/schema/data_row.py +++ b/labelbox/schema/data_row.py @@ -1,12 +1,13 @@ import logging from enum import Enum -from typing import TYPE_CHECKING, List, Optional, Union +from typing import TYPE_CHECKING, List, Optional, Union, Any import json from labelbox.orm import query from labelbox.orm.db_object import DbObject, Updateable, BulkDeletable, experimental from labelbox.orm.model import Entity, Field, Relationship -from labelbox.pydantic_compat import BaseModel +from labelbox.pydantic_compat import BaseModel, Field as PydanticField +from labelbox.schema.asset_attachment import AttachmentType from labelbox.schema.data_row_metadata import DataRowMetadataField # type: ignore from labelbox.schema.export_filters import DatarowExportFilters, build_filters, validate_at_least_one_of_data_row_ids_or_global_keys from labelbox.schema.export_params import CatalogExportParams, validate_catalog_export_params @@ -28,23 +29,39 @@ class KeyType(str, Enum): """The key will be auto-generated. 
Only usable for creates""" -class ResolvableId(BaseModel): +class DataRowKey(BaseModel): """ - The ResolvableId class is a unique ID abstraction that allows us to reference + The DataRowKey class is a unique ID abstraction that allows us to reference a DataRow by either a Global key or CUID """ type: KeyType = KeyType.GKEY value: str +class DataRowMetadataSpec(BaseModel): + schema_id: Optional[str] + value: Any + name: Optional[str] + + +class DataRowAttachmentSpec(BaseModel): + type: AttachmentType + value: str + name: Optional[str] + + class DataRowSpec(BaseModel): - row_data: Union[str, dict] + key: Optional[DataRowKey] = PydanticField(exclude=True) + dataset_id: str + row_data: Optional[Union[str, dict]] external_id: Optional[str] global_key: Optional[str] + metadata: Optional[List[DataRowMetadataSpec]] + attachments: Optional[List[DataRowAttachmentSpec]] class DataRowUpsertItem(BaseModel): - id: ResolvableId + id: DataRowKey payload: DataRowSpec @@ -93,7 +110,7 @@ class DataRow(DbObject, Updateable, BulkDeletable): attachments = Relationship.ToMany("AssetAttachment", False, "attachments") supported_meta_types = supported_attachment_types = set( - Entity.AssetAttachment.AttachmentType.__members__) + AttachmentType.__members__) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -162,7 +179,7 @@ def create_attachment(self, Args: attachment_type (str): Asset attachment type, must be one of: - VIDEO, IMAGE, TEXT, IMAGE_OVERLAY (AssetAttachment.AttachmentType) + VIDEO, IMAGE, TEXT, IMAGE_OVERLAY (AttachmentType) attachment_value (str): Asset attachment value. attachment_name (str): (Optional) Asset attachment name. Returns: diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index c71fd2b5e..30fc5b00b 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -22,7 +22,7 @@ from labelbox.orm import query from labelbox.exceptions import MalformedQueryException from labelbox.pagination import PaginatedCollection -from labelbox.schema.data_row import DataRow, DataRowUpsertItem +from labelbox.schema.data_row import DataRow, DataRowUpsertItem, DataRowSpec, DataRowKey, KeyType from labelbox.schema.export_filters import DatasetExportFilters, build_filters from labelbox.schema.export_params import CatalogExportParams, validate_catalog_export_params from labelbox.schema.export_task import ExportTask @@ -434,9 +434,9 @@ def validate_keys(item): str) and item.get('row_data').startswith("s3:/"): raise InvalidQueryError( "row_data: s3 assets must start with 'https'.") - invalid_keys = set(item) - { - *{f.name for f in DataRow.fields()}, 'attachments', 'media_type' - } + allowed_extra_fields = {'attachments', 'media_type', 'dataset_id'} + invalid_keys = set(item) - {f.name for f in DataRow.fields() + } - allowed_extra_fields if invalid_keys: raise InvalidAttributeError(DataRow, invalid_keys) return item @@ -760,7 +760,25 @@ def _export( task_id = res["taskId"] return Task.get_task(self.client, task_id) - def upsert_data_rows(self, items: list[DataRowUpsertItem]) -> "Task": + def upsert_data_rows(self, specs: list[DataRowSpec]) -> "Task": + + def _convert_specs_to_upsert_items(_specs: list[DataRowSpec]): + _items: list[DataRowUpsertItem] = [] + for spec in _specs: + if spec.key: + key = spec.key + elif spec.global_key: + key = DataRowKey(type=KeyType.GKEY, value=spec.global_key) + else: + raise ValueError( + "Either 'key' or 'global_key' must be provided") + _items.append(DataRowUpsertItem( + payload=spec, + id=key, + )) + return _items + + items = 
_convert_specs_to_upsert_items(specs) chunk_size = 3 chunks = [ items[i:i + chunk_size] for i in range(0, len(items), chunk_size) diff --git a/tests/integration/test_data_rows.py b/tests/integration/test_data_rows.py index a5e14c70a..decdd25f0 100644 --- a/tests/integration/test_data_rows.py +++ b/tests/integration/test_data_rows.py @@ -3,7 +3,6 @@ from datetime import datetime import json -from labelbox.schema.data_row import DataRowUpsertItem, ResolvableId, DataRowSpec from labelbox.schema.media_type import MediaType import pytest @@ -1026,12 +1025,3 @@ def test_create_data_row_with_media_type(dataset, image_url): exc.value) dataset.create_data_row(row_data=image_url, media_type="IMAGE") - - -def test_upsert_data_rows(dataset, image_url): - task = dataset.upsert_data_rows([ - DataRowUpsertItem(id=ResolvableId(value="gkey123"), - payload=DataRowSpec(row_data=image_url)) - ]) - task.wait_till_done() - assert task.status == "COMPLETE" diff --git a/tests/integration/test_data_rows_upsert.py b/tests/integration/test_data_rows_upsert.py new file mode 100644 index 000000000..1b21383c7 --- /dev/null +++ b/tests/integration/test_data_rows_upsert.py @@ -0,0 +1,80 @@ +from labelbox.schema.asset_attachment import AttachmentType +from labelbox.schema.data_row import DataRowUpsertItem, DataRowKey, DataRowSpec, KeyType, DataRowAttachmentSpec + + +class TestDataRowUpsert: + + def test_create_data_row(self, client, dataset, image_url): + task = dataset.upsert_data_rows([ + DataRowSpec(dataset_id=dataset.uid, + row_data=image_url, + global_key="gkey123") + ]) + task.wait_till_done() + assert task.status == "COMPLETE" + dr = client.get_data_row_by_global_key("gkey123") + assert dr is not None + assert dr.row_data == image_url + assert dr.global_key == "gkey123" + + def test_update_fields_by_data_row_id(self, client, dataset, image_url): + dr = dataset.create_data_row(row_data=image_url, + external_id="ex1", + global_key="gk1") + task = dataset.upsert_data_rows([ + DataRowSpec(key=DataRowKey(value=dr.uid, type=KeyType.ID), + dataset_id=dataset.uid, + external_id="ex1_updated", + global_key="gk1_updated") + ]) + task.wait_till_done() + assert task.status == "COMPLETE" + dr = client.get_data_row(dr.uid) + assert dr is not None + assert dr.external_id == "ex1_updated" + assert dr.global_key == "gk1_updated" + + def test_update_fields_by_global_key(self, client, dataset, image_url): + dr = dataset.create_data_row(row_data=image_url, + external_id="ex1", + global_key="gk1") + task = dataset.upsert_data_rows([ + DataRowSpec(key=DataRowKey(value=dr.global_key, type=KeyType.GKEY), + dataset_id=dataset.uid, + external_id="ex1_updated", + global_key="gk1_updated") + ]) + task.wait_till_done() + assert task.status == "COMPLETE" + dr = client.get_data_row(dr.uid) + assert dr is not None + assert dr.external_id == "ex1_updated" + assert dr.global_key == "gk1_updated" + + def test_upsrt(self, client, image_url): + ds = list(client.get_datasets())[0] + task = ds.create_data_rows([{"row_data": image_url}]) + task.wait_till_done() + assert task.status == "COMPLETE" + drs = list(ds.data_rows()) + print(drs) + dr = drs[0] + task = ds.upsert_data_rows([ + DataRowSpec(key=DataRowKey(value=dr.uid, type=KeyType.ID), + dataset_id=ds.uid, + row_data=dr.row_data, + attachments=[ + DataRowAttachmentSpec(type=AttachmentType.RAW_TEXT, + name="att1", + value="test") + ]) + ]) + task.wait_till_done() + assert task.status == "COMPLETE" + dr = client.get_data_row(dr.uid) + assert dr is not None + attachments = list(dr.attachments()) + 
assert len(attachments) == 1 + assert attachments[0].attachment_type == AttachmentType.RAW_TEXT + assert attachments[0].attachment_value == "test" + assert attachments[0].attachment_name == "att1" From ccaf82185dbe3d04e1f5dd7a8ebab9d30f9e9ebe Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Thu, 21 Mar 2024 12:17:33 +0100 Subject: [PATCH 05/21] improve dataset.upsert_data_rows method and add more tests --- labelbox/schema/data_row.py | 24 +++- labelbox/schema/dataset.py | 5 +- tests/integration/test_data_rows_upsert.py | 147 +++++++++++++++++---- 3 files changed, 145 insertions(+), 31 deletions(-) diff --git a/labelbox/schema/data_row.py b/labelbox/schema/data_row.py index 8618cff63..14ab9623b 100644 --- a/labelbox/schema/data_row.py +++ b/labelbox/schema/data_row.py @@ -1,4 +1,5 @@ import logging +from abc import ABC from enum import Enum from typing import TYPE_CHECKING, List, Optional, Union, Any import json @@ -29,15 +30,33 @@ class KeyType(str, Enum): """The key will be auto-generated. Only usable for creates""" -class DataRowKey(BaseModel): +class DataRowKey(BaseModel, ABC): """ The DataRowKey class is a unique ID abstraction that allows us to reference a DataRow by either a Global key or CUID """ - type: KeyType = KeyType.GKEY + type: KeyType value: str +class DataRowIdKey(DataRowKey): + """ + This represents a data row identifier key that is provided by Labelbox upon data row creation. + """ + + def __init__(self, value: str): + super().__init__(type=KeyType.ID, value=value) + + +class DataRowGlobalKey(DataRowKey): + """ + This represents a unique data row key that is provided by the user. + """ + + def __init__(self, value: str): + super().__init__(type=KeyType.GKEY, value=value) + + class DataRowMetadataSpec(BaseModel): schema_id: Optional[str] value: Any @@ -52,7 +71,6 @@ class DataRowAttachmentSpec(BaseModel): class DataRowSpec(BaseModel): key: Optional[DataRowKey] = PydanticField(exclude=True) - dataset_id: str row_data: Optional[Union[str, dict]] external_id: Optional[str] global_key: Optional[str] diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index 4692c9ba6..452cf0a6d 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -22,7 +22,7 @@ from labelbox.orm import query from labelbox.exceptions import MalformedQueryException from labelbox.pagination import PaginatedCollection -from labelbox.schema.data_row import DataRow, DataRowUpsertItem, DataRowSpec, DataRowKey, KeyType +from labelbox.schema.data_row import DataRow, DataRowUpsertItem, DataRowSpec, DataRowGlobalKey from labelbox.schema.export_filters import DatasetExportFilters, build_filters from labelbox.schema.export_params import CatalogExportParams, validate_catalog_export_params from labelbox.schema.export_task import ExportTask @@ -764,10 +764,11 @@ def upsert_data_rows(self, specs: list[DataRowSpec]) -> "Task": def _convert_specs_to_upsert_items(_specs: list[DataRowSpec]): _items: list[DataRowUpsertItem] = [] for spec in _specs: + spec.__dict__["dataset_id"] = self.uid if spec.key: key = spec.key elif spec.global_key: - key = DataRowKey(type=KeyType.GKEY, value=spec.global_key) + key = DataRowGlobalKey(spec.global_key) else: raise ValueError( "Either 'key' or 'global_key' must be provided") diff --git a/tests/integration/test_data_rows_upsert.py b/tests/integration/test_data_rows_upsert.py index 1b21383c7..c2dbfc230 100644 --- a/tests/integration/test_data_rows_upsert.py +++ b/tests/integration/test_data_rows_upsert.py @@ -1,29 +1,108 @@ +import uuid + +import pytest + 
from labelbox.schema.asset_attachment import AttachmentType -from labelbox.schema.data_row import DataRowUpsertItem, DataRowKey, DataRowSpec, KeyType, DataRowAttachmentSpec +from labelbox.schema.data_row import DataRowSpec, DataRowAttachmentSpec, DataRowIdKey, \ + DataRowGlobalKey, DataRowMetadataSpec class TestDataRowUpsert: - def test_create_data_row(self, client, dataset, image_url): + @pytest.fixture + def all_inclusive_data_row(self, dataset, image_url): + dr = dataset.create_data_row( + row_data=image_url, + external_id="ex1", + global_key=str(uuid.uuid4()), + metadata_fields=[{ + "name": "tag", + "value": "tag_string" + }, { + "name": "split", + "value": "train" + }], + attachments=[ + { + "type": "RAW_TEXT", + "name": "att1", + "value": "test1" + }, + { + "type": + "IMAGE", + "name": + "att2", + "value": + "https://storage.googleapis.com/labelbox-sample-datasets/Docs/disease_attachment.jpeg" + }, + { + "type": + "PDF_URL", + "name": + "att3", + "value": + "https://storage.googleapis.com/labelbox-datasets/arxiv-pdf/data/99-word-token-pdfs/0801.3483.pdf" + }, + ]) + return dr + + def test_create_data_row_with_upsert(self, client, dataset, image_url): task = dataset.upsert_data_rows([ - DataRowSpec(dataset_id=dataset.uid, - row_data=image_url, - global_key="gkey123") + DataRowSpec( + row_data=image_url, + global_key="gk1", + external_id="ex1", + attachments=[ + DataRowAttachmentSpec(type=AttachmentType.RAW_TEXT, + name="att1", + value="test1"), + DataRowAttachmentSpec( + type=AttachmentType.IMAGE, + name="att2", + value= + "https://storage.googleapis.com/labelbox-sample-datasets/Docs/disease_attachment.jpeg" + ), + DataRowAttachmentSpec( + type=AttachmentType.PDF_URL, + name="att3", + value= + "https://storage.googleapis.com/labelbox-datasets/arxiv-pdf/data/99-word-token-pdfs/0801.3483.pdf" + ) + ]) ]) task.wait_till_done() assert task.status == "COMPLETE" - dr = client.get_data_row_by_global_key("gkey123") + dr = client.get_data_row_by_global_key("gk1") + assert dr is not None assert dr.row_data == image_url - assert dr.global_key == "gkey123" + assert dr.global_key == "gk1" + assert dr.external_id == "ex1" + + attachments = list(dr.attachments()) + assert len(attachments) == 3 + assert attachments[0].attachment_name == "att1" + assert attachments[0].attachment_type == AttachmentType.RAW_TEXT + assert attachments[ + 0].attachment_value == "https://storage.googleapis.com/labelbox-sample-datasets/Docs/disease_attachment.jpeg" + + assert attachments[1].attachment_name == "att2" + assert attachments[1].attachment_type == AttachmentType.IMAGE + assert attachments[1].attachment_value == "test" - def test_update_fields_by_data_row_id(self, client, dataset, image_url): + assert attachments[2].attachment_name == "att3" + assert attachments[2].attachment_type == AttachmentType.PDF_URL + assert attachments[ + 2].attachment_value == "https://storage.googleapis.com/labelbox-datasets/arxiv-pdf/data/99-word-token-pdfs/0801.3483.pdf" + + def test_update_data_row_fields_with_upsert(self, client, dataset, + image_url): dr = dataset.create_data_row(row_data=image_url, external_id="ex1", global_key="gk1") task = dataset.upsert_data_rows([ - DataRowSpec(key=DataRowKey(value=dr.uid, type=KeyType.ID), - dataset_id=dataset.uid, + DataRowSpec(key=DataRowIdKey(dr.uid), external_id="ex1_updated", global_key="gk1_updated") ]) @@ -34,13 +113,13 @@ def test_update_fields_by_data_row_id(self, client, dataset, image_url): assert dr.external_id == "ex1_updated" assert dr.global_key == "gk1_updated" - def 
test_update_fields_by_global_key(self, client, dataset, image_url): + def test_update_data_row_fields_with_upsert_by_global_key( + self, client, dataset, image_url): dr = dataset.create_data_row(row_data=image_url, external_id="ex1", global_key="gk1") task = dataset.upsert_data_rows([ - DataRowSpec(key=DataRowKey(value=dr.global_key, type=KeyType.GKEY), - dataset_id=dataset.uid, + DataRowSpec(key=DataRowGlobalKey(dr.global_key), external_id="ex1_updated", global_key="gk1_updated") ]) @@ -51,17 +130,11 @@ def test_update_fields_by_global_key(self, client, dataset, image_url): assert dr.external_id == "ex1_updated" assert dr.global_key == "gk1_updated" - def test_upsrt(self, client, image_url): - ds = list(client.get_datasets())[0] - task = ds.create_data_rows([{"row_data": image_url}]) - task.wait_till_done() - assert task.status == "COMPLETE" - drs = list(ds.data_rows()) - print(drs) - dr = drs[0] - task = ds.upsert_data_rows([ - DataRowSpec(key=DataRowKey(value=dr.uid, type=KeyType.ID), - dataset_id=ds.uid, + def test_update_attachments_with_upsert(self, client, + all_inclusive_data_row, dataset): + dr = all_inclusive_data_row + task = dataset.upsert_data_rows([ + DataRowSpec(key=DataRowIdKey(dr.uid), row_data=dr.row_data, attachments=[ DataRowAttachmentSpec(type=AttachmentType.RAW_TEXT, @@ -75,6 +148,28 @@ def test_upsrt(self, client, image_url): assert dr is not None attachments = list(dr.attachments()) assert len(attachments) == 1 - assert attachments[0].attachment_type == AttachmentType.RAW_TEXT - assert attachments[0].attachment_value == "test" assert attachments[0].attachment_name == "att1" + + def test_update_metadata_with_upsert(self, client, all_inclusive_data_row, + dataset): + dr = all_inclusive_data_row + task = dataset.upsert_data_rows([ + DataRowSpec(key=DataRowGlobalKey(dr.global_key), + row_data=dr.row_data, + metadata=[ + DataRowMetadataSpec(name="tag", + value="updated tag"), + DataRowMetadataSpec(name="split", value="train") + ]) + ]) + task.wait_till_done() + assert task.status == "COMPLETE" + dr = client.get_data_row(dr.uid) + assert dr is not None + assert len(dr.metadata_fields) == 2 + tag_idx, split_idx = ( + 0, 1) if dr.metadata_fields[0]['name'] == "tag" else (1, 0) + assert dr.metadata_fields[tag_idx]['name'] == "tag" + assert dr.metadata_fields[tag_idx]['value'] == "updated tag" + assert dr.metadata_fields[split_idx]['name'] == "split" + assert dr.metadata_fields[split_idx]['value'] == "train" From 163626654d9daf715580365e3b8f7b2d5ee9d755 Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Thu, 21 Mar 2024 13:24:43 +0100 Subject: [PATCH 06/21] adjust code to backend changes --- labelbox/schema/data_row.py | 9 +++++ labelbox/schema/dataset.py | 44 +++++++++++----------- tests/integration/test_data_rows_upsert.py | 25 +++++++++--- 3 files changed, 50 insertions(+), 28 deletions(-) diff --git a/labelbox/schema/data_row.py b/labelbox/schema/data_row.py index 14ab9623b..0f76c9833 100644 --- a/labelbox/schema/data_row.py +++ b/labelbox/schema/data_row.py @@ -57,6 +57,15 @@ def __init__(self, value: str): super().__init__(type=KeyType.GKEY, value=value) +class DataRowAutoKey(DataRowKey): + """ + This represents a key for a create-only data row. 
+ """ + + def __init__(self): + super().__init__(type=KeyType.AUTO, value="") + + class DataRowMetadataSpec(BaseModel): schema_id: Optional[str] value: Any diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index 452cf0a6d..dc663f7ef 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -22,7 +22,7 @@ from labelbox.orm import query from labelbox.exceptions import MalformedQueryException from labelbox.pagination import PaginatedCollection -from labelbox.schema.data_row import DataRow, DataRowUpsertItem, DataRowSpec, DataRowGlobalKey +from labelbox.schema.data_row import DataRow, DataRowUpsertItem, DataRowSpec, DataRowGlobalKey, DataRowAutoKey from labelbox.schema.export_filters import DatasetExportFilters, build_filters from labelbox.schema.export_params import CatalogExportParams, validate_catalog_export_params from labelbox.schema.export_task import ExportTask @@ -761,6 +761,13 @@ def _export( def upsert_data_rows(self, specs: list[DataRowSpec]) -> "Task": + class ManifestFile: + + def __init__(self): + self.source = "SDK" + self.item_count = 0 + self.chunk_uris: list[str] = [] + def _convert_specs_to_upsert_items(_specs: list[DataRowSpec]): _items: list[DataRowUpsertItem] = [] for spec in _specs: @@ -770,12 +777,8 @@ def _convert_specs_to_upsert_items(_specs: list[DataRowSpec]): elif spec.global_key: key = DataRowGlobalKey(spec.global_key) else: - raise ValueError( - "Either 'key' or 'global_key' must be provided") - _items.append(DataRowUpsertItem( - payload=spec, - id=key, - )) + key = DataRowAutoKey() + _items.append(DataRowUpsertItem(payload=spec, id=key)) return _items items = _convert_specs_to_upsert_items(specs) @@ -783,28 +786,25 @@ def _convert_specs_to_upsert_items(_specs: list[DataRowSpec]): chunks = [ items[i:i + chunk_size] for i in range(0, len(items), chunk_size) ] - manifest = {"source": "SDK", "item_count": 0, "chunk_uris": []} + manifest = ManifestFile() for chunk in chunks: - manifest["chunk_uris"].append(self._create_descriptor_file(chunk)) - manifest["item_count"] += len(chunk) + manifest.chunk_uris.append(self._create_descriptor_file(chunk)) + manifest.item_count += len(chunk) - data = json.dumps(manifest).encode("utf-8") + data = json.dumps(manifest.__dict__).encode("utf-8") manifest_uri = self.client.upload_data(data, content_type="application/json", filename="manifest.json") - dataset_param = "datasetId" - manifest_uri_param = "manifestUri" - query_str = """mutation UpsertDataRowsPyApi($%s: ID!, $%s: String!){ - upsertDataRows(data:{datasetId: $%s, manifestUri: $%s} - ){ id createdAt updatedAt name status completionPercentage result errors type metadata } } """ % ( - dataset_param, manifest_uri_param, dataset_param, - manifest_uri_param) + query_str = """ + mutation UpsertDataRowsPyApi($manifestUri: String!) 
{ + upsertDataRows(data: { manifestUri: $manifestUri }) { + id createdAt updatedAt name status completionPercentage result errors type metadata + } + } + """ - res = self.client.execute(query_str, { - dataset_param: self.uid, - manifest_uri_param: manifest_uri - }) + res = self.client.execute(query_str, {"manifestUri": manifest_uri}) res = res["upsertDataRows"] task = Task(self.client, res) task._user = self.client.get_user() diff --git a/tests/integration/test_data_rows_upsert.py b/tests/integration/test_data_rows_upsert.py index c2dbfc230..a197afb06 100644 --- a/tests/integration/test_data_rows_upsert.py +++ b/tests/integration/test_data_rows_upsert.py @@ -47,6 +47,11 @@ def all_inclusive_data_row(self, dataset, image_url): ]) return dr + def test_create_data_row_with_auto_key(self, dataset, image_url): + task = dataset.upsert_data_rows([DataRowSpec(row_data=image_url)]) + task.wait_till_done() + assert len(list(dataset.data_rows())) == 1 + def test_create_data_row_with_upsert(self, client, dataset, image_url): task = dataset.upsert_data_rows([ DataRowSpec( @@ -69,6 +74,10 @@ def test_create_data_row_with_upsert(self, client, dataset, image_url): value= "https://storage.googleapis.com/labelbox-datasets/arxiv-pdf/data/99-word-token-pdfs/0801.3483.pdf" ) + ], + metadata=[ + DataRowMetadataSpec(name="tag", value="tag_string"), + DataRowMetadataSpec(name="split", value="train") ]) ]) task.wait_till_done() @@ -96,6 +105,12 @@ def test_create_data_row_with_upsert(self, client, dataset, image_url): assert attachments[ 2].attachment_value == "https://storage.googleapis.com/labelbox-datasets/arxiv-pdf/data/99-word-token-pdfs/0801.3483.pdf" + assert len(dr.metadata_fields) == 2 + assert dr.metadata_fields[0]['name'] == "tag" + assert dr.metadata_fields[0]['value'] == "updated tag" + assert dr.metadata_fields[1]['name'] == "split" + assert dr.metadata_fields[1]['value'] == "train" + def test_update_data_row_fields_with_upsert(self, client, dataset, image_url): dr = dataset.create_data_row(row_data=image_url, @@ -167,9 +182,7 @@ def test_update_metadata_with_upsert(self, client, all_inclusive_data_row, dr = client.get_data_row(dr.uid) assert dr is not None assert len(dr.metadata_fields) == 2 - tag_idx, split_idx = ( - 0, 1) if dr.metadata_fields[0]['name'] == "tag" else (1, 0) - assert dr.metadata_fields[tag_idx]['name'] == "tag" - assert dr.metadata_fields[tag_idx]['value'] == "updated tag" - assert dr.metadata_fields[split_idx]['name'] == "split" - assert dr.metadata_fields[split_idx]['value'] == "train" + assert dr.metadata_fields[0]['name'] == "tag" + assert dr.metadata_fields[0]['value'] == "updated tag" + assert dr.metadata_fields[1]['name'] == "split" + assert dr.metadata_fields[1]['value'] == "train" From 25753e4f6d20a51b8366e905abd2df2321967369 Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Thu, 21 Mar 2024 15:35:54 +0100 Subject: [PATCH 07/21] add upsert chunk size constant --- labelbox/schema/dataset.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index dc663f7ef..aba817059 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -32,6 +32,7 @@ logger = logging.getLogger(__name__) MAX_DATAROW_PER_API_OPERATION = 150_000 +UPSERT_CHUNK_SIZE = 10_000 class Dataset(DbObject, Updateable, Deletable): @@ -760,6 +761,10 @@ def _export( return Task.get_task(self.client, task_id) def upsert_data_rows(self, specs: list[DataRowSpec]) -> "Task": + if len(specs) > MAX_DATAROW_PER_API_OPERATION: + 
raise MalformedQueryException( + f"Cannot upsert more than {MAX_DATAROW_PER_API_OPERATION} DataRows per function call." + ) class ManifestFile: @@ -782,9 +787,9 @@ def _convert_specs_to_upsert_items(_specs: list[DataRowSpec]): return _items items = _convert_specs_to_upsert_items(specs) - chunk_size = 3 chunks = [ - items[i:i + chunk_size] for i in range(0, len(items), chunk_size) + items[i:i + UPSERT_CHUNK_SIZE] + for i in range(0, len(items), UPSERT_CHUNK_SIZE) ] manifest = ManifestFile() for chunk in chunks: From 74fba0f3e5ccfa42a72c9142b33f1837685f2522 Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Thu, 21 Mar 2024 15:36:23 +0100 Subject: [PATCH 08/21] add test for multiple chunks --- tests/integration/test_data_rows_upsert.py | 26 ++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/integration/test_data_rows_upsert.py b/tests/integration/test_data_rows_upsert.py index a197afb06..c1c848973 100644 --- a/tests/integration/test_data_rows_upsert.py +++ b/tests/integration/test_data_rows_upsert.py @@ -1,4 +1,6 @@ +import json import uuid +from unittest.mock import patch import pytest @@ -186,3 +188,27 @@ def test_update_metadata_with_upsert(self, client, all_inclusive_data_row, assert dr.metadata_fields[0]['value'] == "updated tag" assert dr.metadata_fields[1]['name'] == "split" assert dr.metadata_fields[1]['value'] == "train" + + def test_multiple_chunks(self, client, dataset, image_url): + mocked_chunk_size = 3 + with patch('labelbox.client.Client.upload_data', + wraps=client.upload_data) as spy_some_function: + with patch('labelbox.schema.dataset.UPSERT_CHUNK_SIZE', + new=mocked_chunk_size): + task = dataset.upsert_data_rows( + [DataRowSpec(row_data=image_url) for i in range(10)]) + task.wait_till_done() + assert len(list(dataset.data_rows())) == 10 + assert spy_some_function.call_count == 5 # 4 chunks + manifest + + first_call_args, _ = spy_some_function.call_args_list[0] + first_chunk_content = first_call_args[0] + data = json.loads(first_chunk_content) + assert len(data) == mocked_chunk_size + + last_call_args, _ = spy_some_function.call_args_list[-1] + manifest_content = last_call_args[0].decode('utf-8') + data = json.loads(manifest_content) + assert data['source'] == "SDK" + assert data['item_count'] == 10 + assert len(data['chunk_uris']) == 4 From b70e6501ce1419addee68be59c88ed6b26367cbc Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Thu, 21 Mar 2024 15:40:41 +0100 Subject: [PATCH 09/21] mypy fix --- labelbox/schema/dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index aba817059..c93e4be54 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -771,10 +771,10 @@ class ManifestFile: def __init__(self): self.source = "SDK" self.item_count = 0 - self.chunk_uris: list[str] = [] + self.chunk_uris: List[str] = [] def _convert_specs_to_upsert_items(_specs: list[DataRowSpec]): - _items: list[DataRowUpsertItem] = [] + _items: List[DataRowUpsertItem] = [] for spec in _specs: spec.__dict__["dataset_id"] = self.uid if spec.key: From bb071b818c21a88af30ff281a33d813dbd8f81cb Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Thu, 21 Mar 2024 15:47:05 +0100 Subject: [PATCH 10/21] mypy fix --- labelbox/schema/dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index c93e4be54..0a37bfc5a 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -760,7 +760,7 @@ 
def _export( task_id = res["taskId"] return Task.get_task(self.client, task_id) - def upsert_data_rows(self, specs: list[DataRowSpec]) -> "Task": + def upsert_data_rows(self, specs: List[DataRowSpec]) -> "Task": if len(specs) > MAX_DATAROW_PER_API_OPERATION: raise MalformedQueryException( f"Cannot upsert more than {MAX_DATAROW_PER_API_OPERATION} DataRows per function call." @@ -773,7 +773,7 @@ def __init__(self): self.item_count = 0 self.chunk_uris: List[str] = [] - def _convert_specs_to_upsert_items(_specs: list[DataRowSpec]): + def _convert_specs_to_upsert_items(_specs: List[DataRowSpec]): _items: List[DataRowUpsertItem] = [] for spec in _specs: spec.__dict__["dataset_id"] = self.uid From 7e94b45517d9ea663f44a86180aa7190f68356df Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Fri, 22 Mar 2024 19:02:26 +0100 Subject: [PATCH 11/21] exclude None from json --- labelbox/schema/dataset.py | 15 +++++++++++---- tests/integration/test_data_rows_upsert.py | 8 ++++---- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index 0a37bfc5a..5e6a9504b 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -294,7 +294,10 @@ def create_data_rows(self, items) -> "Task": task._user = user return task - def _create_descriptor_file(self, items, max_attachments_per_data_row=None): + def _create_descriptor_file(self, + items, + max_attachments_per_data_row=None, + is_upsert=False): """ This function is shared by both `Dataset.create_data_rows` and `Dataset.create_data_rows_sync` to prepare the input file. The user defined input is validated, processed, and json stringified. @@ -347,6 +350,9 @@ def _create_descriptor_file(self, items, max_attachments_per_data_row=None): AssetAttachment = Entity.AssetAttachment def upload_if_necessary(item): + if is_upsert and 'row_data' not in item: + # When upserting, row_data is not required + return item row_data = item['row_data'] if isinstance(row_data, str) and os.path.exists(row_data): item_url = self.client.upload_file(row_data) @@ -426,7 +432,7 @@ def format_row(item): return item def validate_keys(item): - if 'row_data' not in item: + if not is_upsert and 'row_data' not in item: raise InvalidQueryError( "`row_data` missing when creating DataRow.") @@ -463,7 +469,7 @@ def formatLegacyConversationalData(item): def convert_item(data_row_item): if isinstance(data_row_item, DataRowUpsertItem): - item = data_row_item.payload.dict() + item = data_row_item.payload.dict(exclude_none=True) else: item = data_row_item @@ -793,7 +799,8 @@ def _convert_specs_to_upsert_items(_specs: List[DataRowSpec]): ] manifest = ManifestFile() for chunk in chunks: - manifest.chunk_uris.append(self._create_descriptor_file(chunk)) + manifest.chunk_uris.append( + self._create_descriptor_file(chunk, is_upsert=True)) manifest.item_count += len(chunk) data = json.dumps(manifest.__dict__).encode("utf-8") diff --git a/tests/integration/test_data_rows_upsert.py b/tests/integration/test_data_rows_upsert.py index c1c848973..2fd7887a7 100644 --- a/tests/integration/test_data_rows_upsert.py +++ b/tests/integration/test_data_rows_upsert.py @@ -78,7 +78,7 @@ def test_create_data_row_with_upsert(self, client, dataset, image_url): ) ], metadata=[ - DataRowMetadataSpec(name="tag", value="tag_string"), + DataRowMetadataSpec(name="tag", value="updated tag"), DataRowMetadataSpec(name="split", value="train") ]) ]) @@ -95,12 +95,12 @@ def test_create_data_row_with_upsert(self, client, dataset, image_url): assert len(attachments) == 
3 assert attachments[0].attachment_name == "att1" assert attachments[0].attachment_type == AttachmentType.RAW_TEXT - assert attachments[ - 0].attachment_value == "https://storage.googleapis.com/labelbox-sample-datasets/Docs/disease_attachment.jpeg" + assert attachments[0].attachment_value == "test1" assert attachments[1].attachment_name == "att2" assert attachments[1].attachment_type == AttachmentType.IMAGE - assert attachments[1].attachment_value == "test" + assert attachments[ + 1].attachment_value == "https://storage.googleapis.com/labelbox-sample-datasets/Docs/disease_attachment.jpeg" assert attachments[2].attachment_name == "att3" assert attachments[2].attachment_type == AttachmentType.PDF_URL From 8c88fe8bec2e5b7c40019ad686ff69d821012ba7 Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Fri, 22 Mar 2024 19:47:08 +0100 Subject: [PATCH 12/21] finalizing improvements --- labelbox/schema/data_row.py | 1 + labelbox/schema/dataset.py | 36 +++++++++++++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/labelbox/schema/data_row.py b/labelbox/schema/data_row.py index 0f76c9833..697866d4b 100644 --- a/labelbox/schema/data_row.py +++ b/labelbox/schema/data_row.py @@ -80,6 +80,7 @@ class DataRowAttachmentSpec(BaseModel): class DataRowSpec(BaseModel): key: Optional[DataRowKey] = PydanticField(exclude=True) + dataset_id: Optional[str] row_data: Optional[Union[str, dict]] external_id: Optional[str] global_key: Optional[str] diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index 5e6a9504b..6974a9b1d 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -767,6 +767,40 @@ def _export( return Task.get_task(self.client, task_id) def upsert_data_rows(self, specs: List[DataRowSpec]) -> "Task": + """ + Upserts data rows in this dataset. + + >>> task = dataset.upsert_data_rows([ + >>> # create new data row + >>> DataRowSpec( + >>> row_data="http://my_site.com/photos/img_01.jpg", + >>> global_key="global_key1", + >>> external_id="ex_id1", + >>> attachments=[ + >>> DataRowAttachmentSpec(type=AttachmentType.RAW_TEXT, name="att1", value="test1") + >>> ], + >>> metadata=[ + >>> DataRowMetadataSpec(name="tag", value="tag value"), + >>> ] + >>> ), + >>> # update existing data row by global key + >>> DataRowSpec( + >>> global_key="global_key1", + >>> external_id="ex_id1_updated" + >>> ), + >>> # update global key of data row by existing global key + >>> DataRowSpec( + >>> key=DataRowGlobalKey("global_key1"), + >>> global_key="global_key1_updated" + >>> ), + >>> # update data row by ID + >>> DataRowSpec( + >>> key=DataRowIdKey(dr.uid), + >>> external_id="ex_id1_updated" + >>> ), + >>> ]) + >>> task.wait_till_done() + """ if len(specs) > MAX_DATAROW_PER_API_OPERATION: raise MalformedQueryException( f"Cannot upsert more than {MAX_DATAROW_PER_API_OPERATION} DataRows per function call." 
@@ -782,7 +816,7 @@ def __init__(self): def _convert_specs_to_upsert_items(_specs: List[DataRowSpec]): _items: List[DataRowUpsertItem] = [] for spec in _specs: - spec.__dict__["dataset_id"] = self.uid + spec.dataset_id = self.uid if spec.key: key = spec.key elif spec.global_key: From 28a44350d9385b433c6d711115c351e39aefa5e3 Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Fri, 22 Mar 2024 20:00:19 +0100 Subject: [PATCH 13/21] add media_type to DataRowSpec with a test --- labelbox/schema/data_row.py | 1 + tests/integration/test_data_rows_upsert.py | 16 ++++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/labelbox/schema/data_row.py b/labelbox/schema/data_row.py index 697866d4b..434d95365 100644 --- a/labelbox/schema/data_row.py +++ b/labelbox/schema/data_row.py @@ -82,6 +82,7 @@ class DataRowSpec(BaseModel): key: Optional[DataRowKey] = PydanticField(exclude=True) dataset_id: Optional[str] row_data: Optional[Union[str, dict]] + media_type: Optional[str] external_id: Optional[str] global_key: Optional[str] metadata: Optional[List[DataRowMetadataSpec]] diff --git a/tests/integration/test_data_rows_upsert.py b/tests/integration/test_data_rows_upsert.py index 2fd7887a7..2007674be 100644 --- a/tests/integration/test_data_rows_upsert.py +++ b/tests/integration/test_data_rows_upsert.py @@ -212,3 +212,19 @@ def test_multiple_chunks(self, client, dataset, image_url): assert data['source'] == "SDK" assert data['item_count'] == 10 assert len(data['chunk_uris']) == 4 + + def test_upsert_embedded_row_data(self, dataset): + pdf_url = "https://lb-test-data.s3.us-west-1.amazonaws.com/document-samples/0801.3483.pdf" + task = dataset.upsert_data_rows([ + DataRowSpec(row_data={ + "pdf_url": + pdf_url, + "text_layer_url": + "https://lb-test-data.s3.us-west-1.amazonaws.com/document-samples/0801.3483-lb-textlayer.json" + }, + media_type="PDF") + ]) + task.wait_till_done() + data_rows = list(dataset.data_rows()) + assert len(data_rows) == 1 + assert data_rows[0].row_data == pdf_url From ea7cd9954ee7adfb4efe4c0b7c73d37143636b8d Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Fri, 22 Mar 2024 20:00:28 +0100 Subject: [PATCH 14/21] add comment --- labelbox/schema/dataset.py | 1 + 1 file changed, 1 insertion(+) diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index 6974a9b1d..0c82438e0 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -816,6 +816,7 @@ def __init__(self): def _convert_specs_to_upsert_items(_specs: List[DataRowSpec]): _items: List[DataRowUpsertItem] = [] for spec in _specs: + # enforce current dataset's id for all specs spec.dataset_id = self.uid if spec.key: key = spec.key From 8e74651969e2b95ebe2334b450232cfb92d67e39 Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Mon, 25 Mar 2024 13:15:27 +0100 Subject: [PATCH 15/21] add test for errors checking --- labelbox/schema/task.py | 6 +++--- tests/integration/test_data_rows_upsert.py | 11 +++++++++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/labelbox/schema/task.py b/labelbox/schema/task.py index 8ba91470c..ed2a49a83 100644 --- a/labelbox/schema/task.py +++ b/labelbox/schema/task.py @@ -66,7 +66,7 @@ def wait_till_done(self, Args: timeout_seconds (float): Maximum time this method can block, in seconds. Defaults to five minutes. - check_frequency (float): Frequency of queries to server to update the task attributes, in seconds. Defaults to two seconds. Minimal value is two seconds. 
+ check_frequency (float): Frequency of queries to server to update the task attributes, in seconds. Defaults to two seconds. Minimal value is two seconds. """ if check_frequency < 2.0: raise ValueError( @@ -90,7 +90,7 @@ def wait_till_done(self, def errors(self) -> Optional[Dict[str, Any]]: """ Fetch the error associated with an import task. """ - if self.name == 'JSON Import': + if self.name == 'JSON Import' or self.type == 'adv-upsert-data-rows': if self.status == "FAILED": result = self._fetch_remote_json() return result["error"] @@ -168,7 +168,7 @@ def download_result(remote_json_field: Optional[str], format: str): "Expected the result format to be either `ndjson` or `json`." ) - if self.name == 'JSON Import': + if self.name == 'JSON Import' or self.type == 'adv-upsert-data-rows': format = 'json' elif self.type == 'export-data-rows': format = 'ndjson' diff --git a/tests/integration/test_data_rows_upsert.py b/tests/integration/test_data_rows_upsert.py index 2007674be..2e16eac53 100644 --- a/tests/integration/test_data_rows_upsert.py +++ b/tests/integration/test_data_rows_upsert.py @@ -228,3 +228,14 @@ def test_upsert_embedded_row_data(self, dataset): data_rows = list(dataset.data_rows()) assert len(data_rows) == 1 assert data_rows[0].row_data == pdf_url + + def test_upsert_duplicate_global_key_error(self, dataset, image_url): + task = dataset.upsert_data_rows([ + DataRowSpec(row_data=image_url, global_key="gk2"), + DataRowSpec(row_data=image_url, global_key="gk2") + ]) + task.wait_till_done() + assert task.status == "COMPLETE" + assert task.errors is not None + assert len(task.errors) == 1 # one data row was created, one failed + assert "Duplicate global key: 'gk2'" in task.errors[0]['message'] From adb494b6af024765c72ecb8c54e91a4d89e4c206 Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Mon, 25 Mar 2024 13:24:24 +0100 Subject: [PATCH 16/21] mangle chunk size constant --- labelbox/schema/dataset.py | 9 +++++---- tests/integration/test_data_rows_upsert.py | 5 +++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index 0c82438e0..f78f72d2b 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Dict, Generator, List, Optional, Any +from typing import Dict, Generator, List, Optional, Any, Final import os import json import logging @@ -32,7 +32,6 @@ logger = logging.getLogger(__name__) MAX_DATAROW_PER_API_OPERATION = 150_000 -UPSERT_CHUNK_SIZE = 10_000 class Dataset(DbObject, Updateable, Deletable): @@ -48,6 +47,8 @@ class Dataset(DbObject, Updateable, Deletable): created_by (Relationship): `ToOne` relationship to User organization (Relationship): `ToOne` relationship to Organization """ + __upsert_chunk_size: Final = 10_000 + name = Field.String("name") description = Field.String("description") updated_at = Field.DateTime("updated_at") @@ -829,8 +830,8 @@ def _convert_specs_to_upsert_items(_specs: List[DataRowSpec]): items = _convert_specs_to_upsert_items(specs) chunks = [ - items[i:i + UPSERT_CHUNK_SIZE] - for i in range(0, len(items), UPSERT_CHUNK_SIZE) + items[i:i + self.__upsert_chunk_size] + for i in range(0, len(items), self.__upsert_chunk_size) ] manifest = ManifestFile() for chunk in chunks: diff --git a/tests/integration/test_data_rows_upsert.py b/tests/integration/test_data_rows_upsert.py index 2e16eac53..a0eb96435 100644 --- a/tests/integration/test_data_rows_upsert.py +++ b/tests/integration/test_data_rows_upsert.py @@ -193,8 
+193,9 @@ def test_multiple_chunks(self, client, dataset, image_url): mocked_chunk_size = 3 with patch('labelbox.client.Client.upload_data', wraps=client.upload_data) as spy_some_function: - with patch('labelbox.schema.dataset.UPSERT_CHUNK_SIZE', - new=mocked_chunk_size): + with patch( + 'labelbox.schema.dataset.Dataset._Dataset__upsert_chunk_size', + new=mocked_chunk_size): task = dataset.upsert_data_rows( [DataRowSpec(row_data=image_url) for i in range(10)]) task.wait_till_done() From 7ec2789255c9b5d58aabc0fe65ad58f88caee9bc Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Mon, 25 Mar 2024 13:41:26 +0100 Subject: [PATCH 17/21] upload chunks in parallel --- labelbox/schema/dataset.py | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index f78f72d2b..2b2e7b119 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -807,13 +807,6 @@ def upsert_data_rows(self, specs: List[DataRowSpec]) -> "Task": f"Cannot upsert more than {MAX_DATAROW_PER_API_OPERATION} DataRows per function call." ) - class ManifestFile: - - def __init__(self): - self.source = "SDK" - self.item_count = 0 - self.chunk_uris: List[str] = [] - def _convert_specs_to_upsert_items(_specs: List[DataRowSpec]): _items: List[DataRowUpsertItem] = [] for spec in _specs: @@ -833,13 +826,23 @@ def _convert_specs_to_upsert_items(_specs: List[DataRowSpec]): items[i:i + self.__upsert_chunk_size] for i in range(0, len(items), self.__upsert_chunk_size) ] - manifest = ManifestFile() - for chunk in chunks: - manifest.chunk_uris.append( - self._create_descriptor_file(chunk, is_upsert=True)) - manifest.item_count += len(chunk) - data = json.dumps(manifest.__dict__).encode("utf-8") + def _upload_chunk(_chunk: list[DataRowUpsertItem]): + return self._create_descriptor_file(_chunk, is_upsert=True) + + file_upload_thread_count = 20 + with ThreadPoolExecutor(file_upload_thread_count) as executor: + futures = [ + executor.submit(_upload_chunk, chunk) for chunk in chunks + ] + chunk_uris = [future.result() for future in as_completed(futures)] + + manifest = { + "source": "SDK", + "item_count": len(items), + "chunk_uris": chunk_uris + } + data = json.dumps(manifest).encode("utf-8") manifest_uri = self.client.upload_data(data, content_type="application/json", filename="manifest.json") From 0eaebc929d01217a36a54303667b2460138e314c Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Mon, 25 Mar 2024 16:37:54 +0100 Subject: [PATCH 18/21] fix mypy --- labelbox/schema/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index 2b2e7b119..659ecb993 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -827,7 +827,7 @@ def _convert_specs_to_upsert_items(_specs: List[DataRowSpec]): for i in range(0, len(items), self.__upsert_chunk_size) ] - def _upload_chunk(_chunk: list[DataRowUpsertItem]): + def _upload_chunk(_chunk: List[DataRowUpsertItem]): return self._create_descriptor_file(_chunk, is_upsert=True) file_upload_thread_count = 20 From a3acbaa1c33ca2a2016741b661e4d4eb38aa2ab7 Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Tue, 26 Mar 2024 10:09:46 +0100 Subject: [PATCH 19/21] remove pydantic models --- labelbox/schema/data_row.py | 66 --------- labelbox/schema/dataset.py | 100 ++++++++------ tests/integration/test_data_rows_upsert.py | 151 ++++++++++++--------- 3 files changed, 143 insertions(+), 174 deletions(-) diff --git 
a/labelbox/schema/data_row.py b/labelbox/schema/data_row.py index 1799a455c..1110998ad 100644 --- a/labelbox/schema/data_row.py +++ b/labelbox/schema/data_row.py @@ -1,5 +1,4 @@ import logging -from abc import ABC from enum import Enum from typing import TYPE_CHECKING, List, Optional, Union, Any import json @@ -7,7 +6,6 @@ from labelbox.orm import query from labelbox.orm.db_object import DbObject, Updateable, BulkDeletable, experimental from labelbox.orm.model import Entity, Field, Relationship -from labelbox.pydantic_compat import BaseModel, Field as PydanticField from labelbox.schema.asset_attachment import AttachmentType from labelbox.schema.data_row_metadata import DataRowMetadataField # type: ignore from labelbox.schema.export_filters import DatarowExportFilters, build_filters, validate_at_least_one_of_data_row_ids_or_global_keys @@ -30,70 +28,6 @@ class KeyType(str, Enum): """The key will be auto-generated. Only usable for creates""" -class DataRowKey(BaseModel, ABC): - """ - The DataRowKey class is a unique ID abstraction that allows us to reference - a DataRow by either a Global key or CUID - """ - type: KeyType - value: str - - -class DataRowIdKey(DataRowKey): - """ - This represents a data row identifier key that is provided by Labelbox upon data row creation. - """ - - def __init__(self, value: str): - super().__init__(type=KeyType.ID, value=value) - - -class DataRowGlobalKey(DataRowKey): - """ - This represents a unique data row key that is provided by the user. - """ - - def __init__(self, value: str): - super().__init__(type=KeyType.GKEY, value=value) - - -class DataRowAutoKey(DataRowKey): - """ - This represents a key for a create-only data row. - """ - - def __init__(self): - super().__init__(type=KeyType.AUTO, value="") - - -class DataRowMetadataSpec(BaseModel): - schema_id: Optional[str] - value: Any - name: Optional[str] - - -class DataRowAttachmentSpec(BaseModel): - type: AttachmentType - value: str - name: Optional[str] - - -class DataRowSpec(BaseModel): - key: Optional[DataRowKey] = PydanticField(exclude=True) - dataset_id: Optional[str] - row_data: Optional[Union[str, dict]] - media_type: Optional[str] - external_id: Optional[str] - global_key: Optional[str] - metadata: Optional[List[DataRowMetadataSpec]] - attachments: Optional[List[DataRowAttachmentSpec]] - - -class DataRowUpsertItem(BaseModel): - id: DataRowKey - payload: DataRowSpec - - class DataRow(DbObject, Updateable, BulkDeletable): """ Internal Labelbox representation of a single piece of data (e.g. image, video, text). 
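With the pydantic request models deleted above, upsert items become plain dictionaries and existing rows are referenced through the SDK's identifiable classes. A rough before/after sketch, not part of the patch, using hypothetical values and an existing data row `dr` (the dict form is the one accepted by the dataset.py changes that follow):

    # Before this patch (pydantic specs, removed above):
    #     dataset.upsert_data_rows(
    #         [DataRowSpec(key=DataRowIdKey(dr.uid), external_id="ex_id1_updated")])
    # After this patch (plain dicts plus identifiables):
    from labelbox.schema.identifiable import UniqueId
    dataset.upsert_data_rows(
        [{"key": UniqueId(dr.uid), "external_id": "ex_id1_updated"}])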
diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index 659ecb993..caf3020b7 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -22,10 +22,12 @@ from labelbox.orm import query from labelbox.exceptions import MalformedQueryException from labelbox.pagination import PaginatedCollection -from labelbox.schema.data_row import DataRow, DataRowUpsertItem, DataRowSpec, DataRowGlobalKey, DataRowAutoKey +from labelbox.pydantic_compat import BaseModel +from labelbox.schema.data_row import DataRow from labelbox.schema.export_filters import DatasetExportFilters, build_filters from labelbox.schema.export_params import CatalogExportParams, validate_catalog_export_params from labelbox.schema.export_task import ExportTask +from labelbox.schema.identifiable import UniqueId, GlobalKey from labelbox.schema.task import Task from labelbox.schema.user import User @@ -34,6 +36,11 @@ MAX_DATAROW_PER_API_OPERATION = 150_000 +class DataRowUpsertItem(BaseModel): + id: dict + payload: dict + + class Dataset(DbObject, Updateable, Deletable): """ A Dataset is a collection of DataRows. @@ -470,7 +477,7 @@ def formatLegacyConversationalData(item): def convert_item(data_row_item): if isinstance(data_row_item, DataRowUpsertItem): - item = data_row_item.payload.dict(exclude_none=True) + item = data_row_item.payload else: item = data_row_item @@ -493,7 +500,7 @@ def convert_item(data_row_item): item = upload_if_necessary(item) if isinstance(data_row_item, DataRowUpsertItem): - return {'id': data_row_item.id.dict(), 'payload': item} + return {'id': data_row_item.id, 'payload': item} else: return item @@ -767,67 +774,76 @@ def _export( task_id = res["taskId"] return Task.get_task(self.client, task_id) - def upsert_data_rows(self, specs: List[DataRowSpec]) -> "Task": + def upsert_data_rows(self, items) -> "Task": """ Upserts data rows in this dataset. >>> task = dataset.upsert_data_rows([ >>> # create new data row - >>> DataRowSpec( - >>> row_data="http://my_site.com/photos/img_01.jpg", - >>> global_key="global_key1", - >>> external_id="ex_id1", - >>> attachments=[ - >>> DataRowAttachmentSpec(type=AttachmentType.RAW_TEXT, name="att1", value="test1") + >>> { + >>> "row_data": "http://my_site.com/photos/img_01.jpg", + >>> "global_key": "global_key1", + >>> "external_id": "ex_id1", + >>> "attachments": [ + >>> {"type": AttachmentType.RAW_TEXT, "name": "att1", "value": "test1"} >>> ], - >>> metadata=[ - >>> DataRowMetadataSpec(name="tag", value="tag value"), + >>> "metadata": [ + >>> {"name": "tag", "value": "tag value"}, >>> ] - >>> ), + >>> }, >>> # update existing data row by global key - >>> DataRowSpec( - >>> global_key="global_key1", - >>> external_id="ex_id1_updated" - >>> ), + >>> { + >>> "global_key": "global_key1", + >>> "external_id": "ex_id1_updated" + >>> }, >>> # update global key of data row by existing global key - >>> DataRowSpec( - >>> key=DataRowGlobalKey("global_key1"), - >>> global_key="global_key1_updated" - >>> ), + >>> { + >>> "key": GlobalKey("global_key1"), + >>> "global_key": "global_key1_updated" + >>> }, >>> # update data row by ID - >>> DataRowSpec( - >>> key=DataRowIdKey(dr.uid), - >>> external_id="ex_id1_updated" - >>> ), + >>> { + >>> "key": UniqueId(dr.uid), + >>> "external_id": "ex_id1_updated" + >>> }, >>> ]) >>> task.wait_till_done() """ - if len(specs) > MAX_DATAROW_PER_API_OPERATION: + if len(items) > MAX_DATAROW_PER_API_OPERATION: raise MalformedQueryException( f"Cannot upsert more than {MAX_DATAROW_PER_API_OPERATION} DataRows per function call." 
) - def _convert_specs_to_upsert_items(_specs: List[DataRowSpec]): - _items: List[DataRowUpsertItem] = [] - for spec in _specs: + def _convert_items_to_upsert_format(_items): + _upsert_items: List[DataRowUpsertItem] = [] + for item in _items: # enforce current dataset's id for all specs - spec.dataset_id = self.uid - if spec.key: - key = spec.key - elif spec.global_key: - key = DataRowGlobalKey(spec.global_key) + item['dataset_id'] = self.uid + if 'key' not in item: + key = {'type': 'AUTO', 'value': ''} + elif isinstance(item['key'], UniqueId): + key = {'type': 'ID', 'value': item['key'].key} + del item['key'] + elif isinstance(item['key'], GlobalKey): + key = {'type': 'GKEY', 'value': item['key'].key} + del item['key'] else: - key = DataRowAutoKey() - _items.append(DataRowUpsertItem(payload=spec, id=key)) - return _items + raise ValueError( + f"Key must be an instance of UniqueId or GlobalKey, got: {type(item['key']).__name__}" + ) + item = { + k: v for k, v in item.items() if v is not None + } # remove None values + _upsert_items.append(DataRowUpsertItem(payload=item, id=key)) + return _upsert_items - items = _convert_specs_to_upsert_items(specs) + specs = _convert_items_to_upsert_format(items) chunks = [ - items[i:i + self.__upsert_chunk_size] - for i in range(0, len(items), self.__upsert_chunk_size) + specs[i:i + self.__upsert_chunk_size] + for i in range(0, len(specs), self.__upsert_chunk_size) ] - def _upload_chunk(_chunk: List[DataRowUpsertItem]): + def _upload_chunk(_chunk): return self._create_descriptor_file(_chunk, is_upsert=True) file_upload_thread_count = 20 @@ -839,7 +855,7 @@ def _upload_chunk(_chunk: List[DataRowUpsertItem]): manifest = { "source": "SDK", - "item_count": len(items), + "item_count": len(specs), "chunk_uris": chunk_uris } data = json.dumps(manifest).encode("utf-8") diff --git a/tests/integration/test_data_rows_upsert.py b/tests/integration/test_data_rows_upsert.py index a0eb96435..e591a39da 100644 --- a/tests/integration/test_data_rows_upsert.py +++ b/tests/integration/test_data_rows_upsert.py @@ -5,8 +5,7 @@ import pytest from labelbox.schema.asset_attachment import AttachmentType -from labelbox.schema.data_row import DataRowSpec, DataRowAttachmentSpec, DataRowIdKey, \ - DataRowGlobalKey, DataRowMetadataSpec +from labelbox.schema.identifiable import UniqueId, GlobalKey class TestDataRowUpsert: @@ -50,38 +49,45 @@ def all_inclusive_data_row(self, dataset, image_url): return dr def test_create_data_row_with_auto_key(self, dataset, image_url): - task = dataset.upsert_data_rows([DataRowSpec(row_data=image_url)]) + task = dataset.upsert_data_rows([{'row_data': image_url}]) task.wait_till_done() assert len(list(dataset.data_rows())) == 1 def test_create_data_row_with_upsert(self, client, dataset, image_url): - task = dataset.upsert_data_rows([ - DataRowSpec( - row_data=image_url, - global_key="gk1", - external_id="ex1", - attachments=[ - DataRowAttachmentSpec(type=AttachmentType.RAW_TEXT, - name="att1", - value="test1"), - DataRowAttachmentSpec( - type=AttachmentType.IMAGE, - name="att2", - value= - "https://storage.googleapis.com/labelbox-sample-datasets/Docs/disease_attachment.jpeg" - ), - DataRowAttachmentSpec( - type=AttachmentType.PDF_URL, - name="att3", - value= - "https://storage.googleapis.com/labelbox-datasets/arxiv-pdf/data/99-word-token-pdfs/0801.3483.pdf" - ) - ], - metadata=[ - DataRowMetadataSpec(name="tag", value="updated tag"), - DataRowMetadataSpec(name="split", value="train") - ]) - ]) + task = dataset.upsert_data_rows([{ + 'row_data': + 
image_url, + 'global_key': + "gk1", + 'external_id': + "ex1", + 'attachments': [{ + 'type': AttachmentType.RAW_TEXT, + 'name': "att1", + 'value': "test1" + }, { + 'type': + AttachmentType.IMAGE, + 'name': + "att2", + 'value': + "https://storage.googleapis.com/labelbox-sample-datasets/Docs/disease_attachment.jpeg" + }, { + 'type': + AttachmentType.PDF_URL, + 'name': + "att3", + 'value': + "https://storage.googleapis.com/labelbox-datasets/arxiv-pdf/data/99-word-token-pdfs/0801.3483.pdf" + }], + 'metadata': [{ + 'name': "tag", + 'value': "updated tag" + }, { + 'name': "split", + 'value': "train" + }] + }]) task.wait_till_done() assert task.status == "COMPLETE" dr = client.get_data_row_by_global_key("gk1") @@ -118,11 +124,11 @@ def test_update_data_row_fields_with_upsert(self, client, dataset, dr = dataset.create_data_row(row_data=image_url, external_id="ex1", global_key="gk1") - task = dataset.upsert_data_rows([ - DataRowSpec(key=DataRowIdKey(dr.uid), - external_id="ex1_updated", - global_key="gk1_updated") - ]) + task = dataset.upsert_data_rows([{ + 'key': UniqueId(dr.uid), + 'external_id': "ex1_updated", + 'global_key': "gk1_updated" + }]) task.wait_till_done() assert task.status == "COMPLETE" dr = client.get_data_row(dr.uid) @@ -135,11 +141,11 @@ def test_update_data_row_fields_with_upsert_by_global_key( dr = dataset.create_data_row(row_data=image_url, external_id="ex1", global_key="gk1") - task = dataset.upsert_data_rows([ - DataRowSpec(key=DataRowGlobalKey(dr.global_key), - external_id="ex1_updated", - global_key="gk1_updated") - ]) + task = dataset.upsert_data_rows([{ + 'key': GlobalKey(dr.global_key), + 'external_id': "ex1_updated", + 'global_key': "gk1_updated" + }]) task.wait_till_done() assert task.status == "COMPLETE" dr = client.get_data_row(dr.uid) @@ -150,15 +156,17 @@ def test_update_data_row_fields_with_upsert_by_global_key( def test_update_attachments_with_upsert(self, client, all_inclusive_data_row, dataset): dr = all_inclusive_data_row - task = dataset.upsert_data_rows([ - DataRowSpec(key=DataRowIdKey(dr.uid), - row_data=dr.row_data, - attachments=[ - DataRowAttachmentSpec(type=AttachmentType.RAW_TEXT, - name="att1", - value="test") - ]) - ]) + task = dataset.upsert_data_rows([{ + 'key': + UniqueId(dr.uid), + 'row_data': + dr.row_data, + 'attachments': [{ + 'type': AttachmentType.RAW_TEXT, + 'name': "att1", + 'value': "test" + }] + }]) task.wait_till_done() assert task.status == "COMPLETE" dr = client.get_data_row(dr.uid) @@ -170,15 +178,19 @@ def test_update_attachments_with_upsert(self, client, def test_update_metadata_with_upsert(self, client, all_inclusive_data_row, dataset): dr = all_inclusive_data_row - task = dataset.upsert_data_rows([ - DataRowSpec(key=DataRowGlobalKey(dr.global_key), - row_data=dr.row_data, - metadata=[ - DataRowMetadataSpec(name="tag", - value="updated tag"), - DataRowMetadataSpec(name="split", value="train") - ]) - ]) + task = dataset.upsert_data_rows([{ + 'key': + GlobalKey(dr.global_key), + 'row_data': + dr.row_data, + 'metadata': [{ + 'name': "tag", + 'value': "updated tag" + }, { + 'name': "split", + 'value': "train" + }] + }]) task.wait_till_done() assert task.status == "COMPLETE" dr = client.get_data_row(dr.uid) @@ -196,8 +208,9 @@ def test_multiple_chunks(self, client, dataset, image_url): with patch( 'labelbox.schema.dataset.Dataset._Dataset__upsert_chunk_size', new=mocked_chunk_size): - task = dataset.upsert_data_rows( - [DataRowSpec(row_data=image_url) for i in range(10)]) + task = dataset.upsert_data_rows([{ + 'row_data': image_url + } 
for i in range(10)]) task.wait_till_done() assert len(list(dataset.data_rows())) == 10 assert spy_some_function.call_count == 5 # 4 chunks + manifest @@ -216,15 +229,15 @@ def test_multiple_chunks(self, client, dataset, image_url): def test_upsert_embedded_row_data(self, dataset): pdf_url = "https://lb-test-data.s3.us-west-1.amazonaws.com/document-samples/0801.3483.pdf" - task = dataset.upsert_data_rows([ - DataRowSpec(row_data={ + task = dataset.upsert_data_rows([{ + 'row_data': { "pdf_url": pdf_url, "text_layer_url": "https://lb-test-data.s3.us-west-1.amazonaws.com/document-samples/0801.3483-lb-textlayer.json" }, - media_type="PDF") - ]) + 'media_type': "PDF" + }]) task.wait_till_done() data_rows = list(dataset.data_rows()) assert len(data_rows) == 1 @@ -232,8 +245,14 @@ def test_upsert_embedded_row_data(self, dataset): def test_upsert_duplicate_global_key_error(self, dataset, image_url): task = dataset.upsert_data_rows([ - DataRowSpec(row_data=image_url, global_key="gk2"), - DataRowSpec(row_data=image_url, global_key="gk2") + { + 'row_data': image_url, + 'global_key': "gk2" + }, + { + 'row_data': image_url, + 'global_key': "gk2" + }, ]) task.wait_till_done() assert task.status == "COMPLETE" From 962443217f1a0e80cae3c89df51670d2390880a3 Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Tue, 26 Mar 2024 19:38:36 +0100 Subject: [PATCH 20/21] use unique global keys in tests --- tests/integration/test_data_rows_upsert.py | 28 ++++++++++++---------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/tests/integration/test_data_rows_upsert.py b/tests/integration/test_data_rows_upsert.py index e591a39da..2cc893476 100644 --- a/tests/integration/test_data_rows_upsert.py +++ b/tests/integration/test_data_rows_upsert.py @@ -54,11 +54,12 @@ def test_create_data_row_with_auto_key(self, dataset, image_url): assert len(list(dataset.data_rows())) == 1 def test_create_data_row_with_upsert(self, client, dataset, image_url): + gkey = str(uuid.uuid4()) task = dataset.upsert_data_rows([{ 'row_data': image_url, 'global_key': - "gk1", + gkey, 'external_id': "ex1", 'attachments': [{ @@ -90,11 +91,11 @@ def test_create_data_row_with_upsert(self, client, dataset, image_url): }]) task.wait_till_done() assert task.status == "COMPLETE" - dr = client.get_data_row_by_global_key("gk1") + dr = client.get_data_row_by_global_key(gkey) assert dr is not None assert dr.row_data == image_url - assert dr.global_key == "gk1" + assert dr.global_key == gkey assert dr.external_id == "ex1" attachments = list(dr.attachments()) @@ -121,37 +122,39 @@ def test_create_data_row_with_upsert(self, client, dataset, image_url): def test_update_data_row_fields_with_upsert(self, client, dataset, image_url): + gkey = str(uuid.uuid4()) dr = dataset.create_data_row(row_data=image_url, external_id="ex1", - global_key="gk1") + global_key=gkey) task = dataset.upsert_data_rows([{ 'key': UniqueId(dr.uid), 'external_id': "ex1_updated", - 'global_key': "gk1_updated" + 'global_key': f"{gkey}_updated" }]) task.wait_till_done() assert task.status == "COMPLETE" dr = client.get_data_row(dr.uid) assert dr is not None assert dr.external_id == "ex1_updated" - assert dr.global_key == "gk1_updated" + assert dr.global_key == f"{gkey}_updated" def test_update_data_row_fields_with_upsert_by_global_key( self, client, dataset, image_url): + gkey = str(uuid.uuid4()) dr = dataset.create_data_row(row_data=image_url, external_id="ex1", - global_key="gk1") + global_key=gkey) task = dataset.upsert_data_rows([{ 'key': GlobalKey(dr.global_key), 'external_id': 
"ex1_updated", - 'global_key': "gk1_updated" + 'global_key': f"{gkey}_updated" }]) task.wait_till_done() assert task.status == "COMPLETE" dr = client.get_data_row(dr.uid) assert dr is not None assert dr.external_id == "ex1_updated" - assert dr.global_key == "gk1_updated" + assert dr.global_key == f"{gkey}_updated" def test_update_attachments_with_upsert(self, client, all_inclusive_data_row, dataset): @@ -244,18 +247,19 @@ def test_upsert_embedded_row_data(self, dataset): assert data_rows[0].row_data == pdf_url def test_upsert_duplicate_global_key_error(self, dataset, image_url): + gkey = str(uuid.uuid4()) task = dataset.upsert_data_rows([ { 'row_data': image_url, - 'global_key': "gk2" + 'global_key': gkey }, { 'row_data': image_url, - 'global_key': "gk2" + 'global_key': gkey }, ]) task.wait_till_done() assert task.status == "COMPLETE" assert task.errors is not None assert len(task.errors) == 1 # one data row was created, one failed - assert "Duplicate global key: 'gk2'" in task.errors[0]['message'] + assert f"Duplicate global key: '{gkey}'" in task.errors[0]['message'] From 410463dd5ddfa1271eb0691ed27bb2dd9d2b3d65 Mon Sep 17 00:00:00 2001 From: Attila Papai Date: Wed, 27 Mar 2024 11:03:36 +0100 Subject: [PATCH 21/21] improvements 2 --- labelbox/schema/dataset.py | 58 +++++++++++++++++--------------------- 1 file changed, 26 insertions(+), 32 deletions(-) diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index caf3020b7..47fa658b7 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -774,9 +774,10 @@ def _export( task_id = res["taskId"] return Task.get_task(self.client, task_id) - def upsert_data_rows(self, items) -> "Task": + def upsert_data_rows(self, items, file_upload_thread_count=20) -> "Task": """ - Upserts data rows in this dataset. + Upserts data rows in this dataset. When "key" is provided, and it references an existing data row, + an update will be performed. When "key" is not provided a new data row will be created. >>> task = dataset.upsert_data_rows([ >>> # create new data row @@ -791,11 +792,6 @@ def upsert_data_rows(self, items) -> "Task": >>> {"name": "tag", "value": "tag value"}, >>> ] >>> }, - >>> # update existing data row by global key - >>> { - >>> "global_key": "global_key1", - >>> "external_id": "ex_id1_updated" - >>> }, >>> # update global key of data row by existing global key >>> { >>> "key": GlobalKey("global_key1"), @@ -814,30 +810,7 @@ def upsert_data_rows(self, items) -> "Task": f"Cannot upsert more than {MAX_DATAROW_PER_API_OPERATION} DataRows per function call." 
            )
 
-        def _convert_items_to_upsert_format(_items):
-            _upsert_items: List[DataRowUpsertItem] = []
-            for item in _items:
-                # enforce current dataset's id for all specs
-                item['dataset_id'] = self.uid
-                if 'key' not in item:
-                    key = {'type': 'AUTO', 'value': ''}
-                elif isinstance(item['key'], UniqueId):
-                    key = {'type': 'ID', 'value': item['key'].key}
-                    del item['key']
-                elif isinstance(item['key'], GlobalKey):
-                    key = {'type': 'GKEY', 'value': item['key'].key}
-                    del item['key']
-                else:
-                    raise ValueError(
-                        f"Key must be an instance of UniqueId or GlobalKey, got: {type(item['key']).__name__}"
-                    )
-                item = {
-                    k: v for k, v in item.items() if v is not None
-                }  # remove None values
-                _upsert_items.append(DataRowUpsertItem(payload=item, id=key))
-            return _upsert_items
-
-        specs = _convert_items_to_upsert_format(items)
+        specs = self._convert_items_to_upsert_format(items)
         chunks = [
             specs[i:i + self.__upsert_chunk_size]
             for i in range(0, len(specs), self.__upsert_chunk_size)
@@ -846,7 +819,6 @@ def _convert_items_to_upsert_format(_items):
         def _upload_chunk(_chunk):
             return self._create_descriptor_file(_chunk, is_upsert=True)
 
-        file_upload_thread_count = 20
         with ThreadPoolExecutor(file_upload_thread_count) as executor:
             futures = [
                 executor.submit(_upload_chunk, chunk) for chunk in chunks
@@ -876,3 +848,25 @@ def _upload_chunk(_chunk):
         task = Task(self.client, res)
         task._user = self.client.get_user()
         return task
+
+    def _convert_items_to_upsert_format(self, _items):
+        _upsert_items: List[DataRowUpsertItem] = []
+        for item in _items:
+            # enforce current dataset's id for all specs
+            item['dataset_id'] = self.uid
+            key = item.pop('key', None)
+            if not key:
+                key = {'type': 'AUTO', 'value': ''}
+            elif isinstance(key, UniqueId):
+                key = {'type': 'ID', 'value': key.key}
+            elif isinstance(key, GlobalKey):
+                key = {'type': 'GKEY', 'value': key.key}
+            else:
+                raise ValueError(
+                    f"Key must be an instance of UniqueId or GlobalKey, got: {type(key).__name__}"
+                )
+            item = {
+                k: v for k, v in item.items() if v is not None
+            }  # remove None values
+            _upsert_items.append(DataRowUpsertItem(payload=item, id=key))
+        return _upsert_items
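Taken together, the series leaves `Dataset.upsert_data_rows` accepting plain dictionaries and returning a Task. A minimal sketch of how the resulting API might be used, assuming a configured client and an existing dataset; the API key, dataset id, URL, and keys below are placeholders:

    from labelbox import Client
    from labelbox.schema.identifiable import GlobalKey

    client = Client(api_key="YOUR_API_KEY")      # placeholder credentials
    dataset = client.get_dataset("DATASET_ID")   # placeholder dataset id

    # Create a new data row; omitting "key" lets the server auto-generate one.
    task = dataset.upsert_data_rows(
        [{"row_data": "https://example.com/img_01.jpg", "global_key": "gk_img_01"}])
    task.wait_till_done()
    assert task.status == "COMPLETE"

    # Update the same row by its global key; file_upload_thread_count is the
    # optional knob added in the final patch (default 20).
    task = dataset.upsert_data_rows(
        [{"key": GlobalKey("gk_img_01"), "external_id": "ex_id1_updated"}],
        file_upload_thread_count=10)
    task.wait_till_done()
    print(task.errors)  # None on success; per-row failure details otherwise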