From 13f2ef92c506771b1409c1fe7ec8478491004ff0 Mon Sep 17 00:00:00 2001
From: Val Brodsky
Date: Tue, 26 Dec 2023 10:25:53 -0800
Subject: [PATCH] Support global keys for bulk delete

---
 labelbox/schema/data_row_metadata.py          |  84 +++++-
 .../test_data_row_delete_metadata.py          | 269 ++++++++++++++++++
 tests/integration/test_data_row_metadata.py   |  99 +------
 ...est_unit_delete_batch_data_row_metadata.py |  56 ++++
 4 files changed, 403 insertions(+), 105 deletions(-)
 create mode 100644 tests/integration/test_data_row_delete_metadata.py
 create mode 100644 tests/unit/test_unit_delete_batch_data_row_metadata.py

diff --git a/labelbox/schema/data_row_metadata.py b/labelbox/schema/data_row_metadata.py
index 93ec2ad79..3b4c2c8ef 100644
--- a/labelbox/schema/data_row_metadata.py
+++ b/labelbox/schema/data_row_metadata.py
@@ -9,9 +9,10 @@
 from pydantic import BaseModel, conlist, constr
 
 from labelbox.schema.identifiables import DataRowIdentifiers, UniqueIds
+from labelbox.schema.identifiable import UniqueId, GlobalKey
 from labelbox.schema.ontology import SchemaId
-from labelbox.utils import _CamelCaseMixin, format_iso_datetime, format_iso_from_string
+from labelbox.utils import _CamelCaseMixin, camel_case, format_iso_datetime, format_iso_from_string
 
 
 class DataRowMetadataKind(Enum):
@@ -57,9 +58,12 @@ class DataRowMetadata(_CamelCaseMixin):
 
 
 class DeleteDataRowMetadata(_CamelCaseMixin):
-    data_row_id: str
+    data_row_id: Union[str, UniqueId, GlobalKey]
     fields: List[SchemaId]
 
+    class Config:
+        arbitrary_types_allowed = True
+
 
 class DataRowMetadataBatchResponse(_CamelCaseMixin):
     global_key: Optional[str]
@@ -86,9 +90,28 @@ class _UpsertBatchDataRowMetadata(_CamelCaseMixin):
 
 
 class _DeleteBatchDataRowMetadata(_CamelCaseMixin):
-    data_row_id: str
+    data_row_identifier: Union[UniqueId, GlobalKey]
     schema_ids: List[SchemaId]
 
+    class Config:
+        arbitrary_types_allowed = True
+        alias_generator = camel_case
+
+    def dict(self, *args, **kwargs):
+        res = super().dict(*args, **kwargs)
+        if 'data_row_identifier' in res.keys():
+            key = 'data_row_identifier'
+            id_type_key = 'id_type'
+        else:
+            key = 'dataRowIdentifier'
+            id_type_key = 'idType'
+        data_row_identifier = res.pop(key)
+        res[key] = {
+            "id": data_row_identifier.key,
+            id_type_key: data_row_identifier.id_type
+        }
+        return res
+
 
 _BatchInputs = Union[List[_UpsertBatchDataRowMetadata],
                      List[_DeleteBatchDataRowMetadata]]
@@ -556,7 +579,17 @@ def bulk_delete(
         """ Delete metadata from a datarow by specifiying the fields you want to remove
 
         >>> delete = DeleteDataRowMetadata(
-        >>>     data_row_id="datarow-id",
+        >>>     data_row_id=UniqueId("datarow-id"),
         >>>     fields=[
         >>>        "schema-id-1",
         >>>        "schema-id-2"
         >>>        ...
         >>>     ]
         >>> )
         >>> mdo.batch_delete([metadata])
 
+        >>> delete = DeleteDataRowMetadata(
+        >>>     data_row_id=GlobalKey("global-key"),
+        >>>     fields=[
+        >>>        "schema-id-1",
+        >>>        "schema-id-2"
+        >>>        ...
+        >>>     ]
+        >>> )
+        >>> mdo.bulk_delete([delete])
+
@@ -565,8 +598,22 @@ def bulk_delete(
+        >>> delete = DeleteDataRowMetadata(
+        >>>     data_row_id="global-key",
+        >>>     fields=[
+        >>>        "schema-id-1",
+        >>>        "schema-id-2"
+        >>>        ...
+        >>>     ]
+        >>> )
+        >>> mdo.bulk_delete([delete])
+
         Args:
             deletes: Data row and schema ids to delete
+                For the data row id, we support UniqueId, str, and GlobalKey.
+                If you pass a str, we will assume it is a UniqueId.
+                Do not pass a mix of data row ids and global keys in the same list.
 
         Returns:
             list of unsuccessful deletions.
@@ -575,13 +622,34 @@ def bulk_delete(
         """
         if not len(deletes):
-            raise ValueError("Empty list passed")
+            raise ValueError("The 'deletes' list cannot be empty.")
+
+        passed_strings = False
+        for i, delete in enumerate(deletes):
+            if isinstance(delete.data_row_id, str):
+                passed_strings = True
+                deletes[i] = DeleteDataRowMetadata(data_row_id=UniqueId(
+                    delete.data_row_id),
+                                                   fields=delete.fields)
+            elif isinstance(delete.data_row_id, UniqueId):
+                continue
+            elif isinstance(delete.data_row_id, GlobalKey):
+                continue
+            else:
+                raise ValueError(
+                    f"Invalid data row identifier type '{type(delete.data_row_id)}' for '{delete.data_row_id}'"
+                )
+
+        if passed_strings:
+            warnings.warn(
+                "Using string for data row id will be deprecated. Please use "
+                "UniqueId instead.")
 
         def _batch_delete(
             deletes: List[_DeleteBatchDataRowMetadata]
         ) -> List[DataRowMetadataBatchResponse]:
-            query = """mutation DeleteDataRowMetadataBetaPyApi($deletes: [DataRowCustomMetadataBatchDeleteInput!]!) {
-                deleteDataRowCustomMetadata(data: $deletes) {
+            query = """mutation DeleteDataRowMetadataBetaPyApi($deletes: [DataRowIdentifierCustomMetadataBatchDeleteInput!]) {
+                deleteDataRowCustomMetadata(dataRowIdentifiers: $deletes) {
                     dataRowId
                     error
                     fields {
@@ -810,7 +878,7 @@ def _validate_delete(self, delete: DeleteDataRowMetadata):
             deletes.add(schema.uid)
 
         return _DeleteBatchDataRowMetadata(
-            data_row_id=delete.data_row_id,
+            data_row_identifier=delete.data_row_id,
             schema_ids=list(delete.fields)).dict(by_alias=True)
 
     def _validate_custom_schema_by_name(self,
diff --git a/tests/integration/test_data_row_delete_metadata.py b/tests/integration/test_data_row_delete_metadata.py
new file mode 100644
index 000000000..e7859b41a
--- /dev/null
+++ b/tests/integration/test_data_row_delete_metadata.py
@@ -0,0 +1,269 @@
+from datetime import datetime
+import uuid
+
+import pytest
+
+from labelbox import DataRow, Dataset
+from labelbox.exceptions import MalformedQueryException
+from labelbox.schema.data_row_metadata import DataRowMetadataField, DataRowMetadata, DataRowMetadataKind, DeleteDataRowMetadata
+from labelbox.schema.identifiable import GlobalKey, UniqueId
+
+INVALID_SCHEMA_ID = "1" * 25
+FAKE_SCHEMA_ID = "0" * 25
+FAKE_DATAROW_ID = "D" * 25
+SPLIT_SCHEMA_ID = "cko8sbczn0002h2dkdaxb5kal"
+TRAIN_SPLIT_ID = "cko8sbscr0003h2dk04w86hof"
+TEST_SPLIT_ID = "cko8scbz70005h2dkastwhgqt"
+TEXT_SCHEMA_ID = "cko8s9r5v0001h2dk9elqdidh"
+CAPTURE_DT_SCHEMA_ID = "cko8sdzv70006h2dk8jg64zvb"
+CUSTOM_TEXT_SCHEMA_NAME = 'custom_text'
+
+FAKE_NUMBER_FIELD = {
+    "id": FAKE_SCHEMA_ID,
+    "name": "number",
+    "kind": 'CustomMetadataNumber',
+    "reserved": False
+}
+
+
+@pytest.fixture
+def mdo(client):
+    mdo = client.get_data_row_metadata_ontology()
+    try:
+        mdo.create_schema(CUSTOM_TEXT_SCHEMA_NAME, DataRowMetadataKind.string)
+    except MalformedQueryException:
+        # Do nothing if already exists
+        pass
+    mdo._raw_ontology = mdo._get_ontology()
+    mdo._raw_ontology.append(FAKE_NUMBER_FIELD)
+    mdo._build_ontology()
+    yield mdo
+
+
+@pytest.fixture
+def big_dataset(dataset: Dataset, image_url):
+    task = dataset.create_data_rows([
+        {
+            "row_data": image_url,
+            "external_id": "my-image",
+            "global_key": str(uuid.uuid4())
+        },
+    ] * 5)
+    task.wait_till_done()
+
+    yield dataset
+
+
+def make_metadata(dr_id: str = None, gk: str = None) -> DataRowMetadata:
+    msg = "A message"
+    time = datetime.utcnow()
+
+    metadata = DataRowMetadata(
+        global_key=gk,
+        data_row_id=dr_id,
+        fields=[
+            DataRowMetadataField(schema_id=SPLIT_SCHEMA_ID,
+                                 value=TEST_SPLIT_ID),
+            DataRowMetadataField(schema_id=CAPTURE_DT_SCHEMA_ID, value=time),
+            DataRowMetadataField(schema_id=TEXT_SCHEMA_ID, value=msg),
+        ])
+    return metadata
+
+
+def make_named_metadata(dr_id) -> DataRowMetadata:
+    msg = "A message"
+    time = datetime.utcnow()
+
+    metadata = DataRowMetadata(data_row_id=dr_id,
+                               fields=[
+                                   DataRowMetadataField(name='split',
+                                                        value=TEST_SPLIT_ID),
+                                   DataRowMetadataField(name='captureDateTime',
+                                                        value=time),
+                                   DataRowMetadataField(
+                                       name=CUSTOM_TEXT_SCHEMA_NAME, value=msg),
+                               ])
+    return metadata
+
+
+def test_bulk_delete_datarow_metadata(data_row, mdo):
+    """test bulk deletes for all fields"""
+    metadata = make_metadata(data_row.uid)
+    mdo.bulk_upsert([metadata])
+    assert len(mdo.bulk_export([data_row.uid])[0].fields)
+    upload_ids = [m.schema_id for m in metadata.fields[:-2]]
+    mdo.bulk_delete(
+        [DeleteDataRowMetadata(data_row_id=data_row.uid, fields=upload_ids)])
+    remaining_ids = set(
+        [f.schema_id for f in mdo.bulk_export([data_row.uid])[0].fields])
+    assert not len(remaining_ids.intersection(set(upload_ids)))
+
+
+@pytest.fixture
+def data_row_unique_id(data_row):
+    return UniqueId(data_row.uid)
+
+
+@pytest.fixture
+def data_row_global_key(data_row):
+    return GlobalKey(data_row.global_key)
+
+
+@pytest.fixture
+def data_row_id_as_str(data_row):
+    return data_row.uid
+
+
+@pytest.mark.parametrize(
+    'data_row_for_delete',
+    ['data_row_id_as_str', 'data_row_unique_id', 'data_row_global_key'])
+def test_bulk_delete_datarow_metadata(data_row_for_delete, data_row, mdo,
+                                      request):
+    """test bulk deletes for all fields"""
+    metadata = make_metadata(data_row.uid)
+    mdo.bulk_upsert([metadata])
+    assert len(mdo.bulk_export([data_row.uid])[0].fields)
+    upload_ids = [m.schema_id for m in metadata.fields[:-2]]
+    mdo.bulk_delete([
+        DeleteDataRowMetadata(
+            data_row_id=request.getfixturevalue(data_row_for_delete),
+            fields=upload_ids)
+    ])
+    remaining_ids = set(
+        [f.schema_id for f in mdo.bulk_export([data_row.uid])[0].fields])
+    assert not len(remaining_ids.intersection(set(upload_ids)))
+
+
+@pytest.mark.parametrize(
+    'data_row_for_delete',
+    ['data_row_id_as_str', 'data_row_unique_id', 'data_row_global_key'])
+def test_bulk_partial_delete_datarow_metadata(data_row_for_delete, data_row,
+                                              mdo, request):
+    """Delete a single from metadata"""
+    n_fields = len(mdo.bulk_export([data_row.uid])[0].fields)
+    metadata = make_metadata(data_row.uid)
+    mdo.bulk_upsert([metadata])
+
+    assert len(mdo.bulk_export(
+        [data_row.uid])[0].fields) == (n_fields + len(metadata.fields))
+
+    mdo.bulk_delete([
+        DeleteDataRowMetadata(
+            data_row_id=request.getfixturevalue(data_row_for_delete),
+            fields=[TEXT_SCHEMA_ID])
+    ])
+    fields = [f for f in mdo.bulk_export([data_row.uid])[0].fields]
+    assert len(fields) == (len(metadata.fields) - 1)
+
+
+@pytest.fixture
+def data_row_unique_ids(big_dataset):
+    deletes = []
+    data_row_ids = [dr.uid for dr in big_dataset.data_rows()]
+
+    for data_row_id in data_row_ids:
+        deletes.append(
+            DeleteDataRowMetadata(
+                data_row_id=UniqueId(data_row_id),
+                fields=[SPLIT_SCHEMA_ID, CAPTURE_DT_SCHEMA_ID]))
+    return deletes
+
+
+@pytest.fixture
+def data_row_ids_as_str(big_dataset):
+    deletes = []
+    data_row_ids = [dr.uid for dr in big_dataset.data_rows()]
+
+    for data_row_id in data_row_ids:
+        deletes.append(
+            DeleteDataRowMetadata(
+                data_row_id=data_row_id,
+                fields=[SPLIT_SCHEMA_ID, CAPTURE_DT_SCHEMA_ID]))
+    return deletes
+
+
+@pytest.fixture
+def data_row_global_keys(big_dataset):
+    deletes = []
+    global_keys = [dr.global_key for dr in big_dataset.data_rows()]
+
+    for data_row_id in global_keys:
+        deletes.append(
+            DeleteDataRowMetadata(
+                data_row_id=GlobalKey(data_row_id),
+                fields=[SPLIT_SCHEMA_ID, CAPTURE_DT_SCHEMA_ID]))
+    return deletes
+
+
+@pytest.mark.parametrize(
+    'data_rows_for_delete',
+    ['data_row_ids_as_str', 'data_row_unique_ids', 'data_row_global_keys'])
+def test_large_bulk_delete_datarow_metadata(data_rows_for_delete, big_dataset,
+                                            mdo, request):
+    metadata = []
+    data_row_ids = [dr.uid for dr in big_dataset.data_rows()]
+    for data_row_id in data_row_ids:
+        metadata.append(
+            DataRowMetadata(data_row_id=data_row_id,
+                            fields=[
+                                DataRowMetadataField(schema_id=SPLIT_SCHEMA_ID,
+                                                     value=TEST_SPLIT_ID),
+                                DataRowMetadataField(schema_id=TEXT_SCHEMA_ID,
+                                                     value="test-message")
+                            ]))
+    errors = mdo.bulk_upsert(metadata)
+    assert len(errors) == 0
+
+    deletes = request.getfixturevalue(data_rows_for_delete)
+    errors = mdo.bulk_delete(deletes)
+
+    assert len(errors) == len(data_row_ids)
+    for error in errors:
+        assert error.fields == [CAPTURE_DT_SCHEMA_ID]
+        assert error.error == 'Schema did not exist'
+
+    for data_row_id in data_row_ids:
+        fields = [f for f in mdo.bulk_export([data_row_id])[0].fields]
+        assert len(fields) == 1, fields
+        assert SPLIT_SCHEMA_ID not in [field.schema_id for field in fields]
+
+
+@pytest.mark.parametrize(
+    'data_row_for_delete',
+    ['data_row_id_as_str', 'data_row_unique_id', 'data_row_global_key'])
+def test_bulk_delete_datarow_enum_metadata(data_row_for_delete,
+                                           data_row: DataRow, mdo, request):
+    """test bulk deletes for enum fields"""
+    metadata = make_metadata(data_row.uid)
+    metadata.fields = [
+        m for m in metadata.fields if m.schema_id == SPLIT_SCHEMA_ID
+    ]
+    mdo.bulk_upsert([metadata])
+
+    exported = mdo.bulk_export([data_row.uid])[0].fields
+    assert len(exported) == len(
+        set([x.schema_id for x in metadata.fields] +
+            [x.schema_id for x in exported]))
+
+    mdo.bulk_delete([
+        DeleteDataRowMetadata(
+            data_row_id=request.getfixturevalue(data_row_for_delete),
+            fields=[SPLIT_SCHEMA_ID])
+    ])
+    exported = mdo.bulk_export([data_row.uid])[0].fields
+    assert len(exported) == 0
+
+
+@pytest.mark.parametrize(
+    'data_row_for_delete',
+    ['data_row_id_as_str', 'data_row_unique_id', 'data_row_global_key'])
+def test_delete_non_existent_schema_id(data_row_for_delete, data_row, mdo,
+                                       request):
+    res = mdo.bulk_delete([
+        DeleteDataRowMetadata(
+            data_row_id=request.getfixturevalue(data_row_for_delete),
+            fields=[SPLIT_SCHEMA_ID])
+    ])
+    assert len(res) == 1
+    assert res[0].fields == [SPLIT_SCHEMA_ID]
+    assert res[0].error == 'Schema did not exist'
diff --git a/tests/integration/test_data_row_metadata.py b/tests/integration/test_data_row_metadata.py
index bc9959a2b..037296103 100644
--- a/tests/integration/test_data_row_metadata.py
+++ b/tests/integration/test_data_row_metadata.py
@@ -3,11 +3,10 @@
 import pytest
 import uuid
 
-from labelbox import DataRow, Dataset
+from labelbox import Dataset
 from labelbox.exceptions import MalformedQueryException
-from labelbox.schema.data_row_metadata import DataRowMetadataField, DataRowMetadata, DataRowMetadataKind, DeleteDataRowMetadata, \
-    DataRowMetadataOntology, _parse_metadata_schema
 from labelbox.schema.identifiables import GlobalKeys, UniqueIds
+from labelbox.schema.data_row_metadata import DataRowMetadataField, DataRowMetadata, DataRowMetadataKind, DataRowMetadataOntology, _parse_metadata_schema
 
 INVALID_SCHEMA_ID = "1" * 25
 FAKE_SCHEMA_ID = "0" * 25
@@ -209,90 +208,6 @@ def test_upsert_datarow_metadata_option_by_incorrect_name(data_row, mdo):
         mdo.bulk_upsert([metadata])
 
 
-def test_bulk_delete_datarow_metadata(data_row, mdo):
-    """test bulk deletes for all fields"""
-    metadata = make_metadata(data_row.uid)
-    mdo.bulk_upsert([metadata])
-    assert len(mdo.bulk_export([data_row.uid])[0].fields)
-    upload_ids = [m.schema_id for m in metadata.fields[:-2]]
-    mdo.bulk_delete(
-        [DeleteDataRowMetadata(data_row_id=data_row.uid, fields=upload_ids)])
-    remaining_ids = set(
-        [f.schema_id for f in mdo.bulk_export([data_row.uid])[0].fields])
-    assert not len(remaining_ids.intersection(set(upload_ids)))
-
-
-def test_bulk_partial_delete_datarow_metadata(data_row, mdo):
-    """Delete a single from metadata"""
-    n_fields = len(mdo.bulk_export([data_row.uid])[0].fields)
-    metadata = make_metadata(data_row.uid)
-    mdo.bulk_upsert([metadata])
-
-    assert len(mdo.bulk_export(
-        [data_row.uid])[0].fields) == (n_fields + len(metadata.fields))
-
-    mdo.bulk_delete([
-        DeleteDataRowMetadata(data_row_id=data_row.uid, fields=[TEXT_SCHEMA_ID])
-    ])
-    fields = [f for f in mdo.bulk_export([data_row.uid])[0].fields]
-    assert len(fields) == (len(metadata.fields) - 1)
-
-
-def test_large_bulk_delete_datarow_metadata(big_dataset, mdo):
-    metadata = []
-    data_row_ids = [dr.uid for dr in big_dataset.data_rows()]
-    for data_row_id in data_row_ids:
-        metadata.append(
-            DataRowMetadata(data_row_id=data_row_id,
-                            fields=[
-                                DataRowMetadataField(schema_id=SPLIT_SCHEMA_ID,
-                                                     value=TEST_SPLIT_ID),
-                                DataRowMetadataField(schema_id=TEXT_SCHEMA_ID,
-                                                     value="test-message")
-                            ]))
-    errors = mdo.bulk_upsert(metadata)
-    assert len(errors) == 0
-
-    deletes = []
-    for data_row_id in data_row_ids:
-        deletes.append(
-            DeleteDataRowMetadata(
-                data_row_id=data_row_id,
-                fields=[SPLIT_SCHEMA_ID, CAPTURE_DT_SCHEMA_ID]))
-    errors = mdo.bulk_delete(deletes)
-
-    assert len(errors) == len(data_row_ids)
-    for error in errors:
-        assert error.fields == [CAPTURE_DT_SCHEMA_ID]
-        assert error.error == 'Schema did not exist'
-
-    for data_row_id in data_row_ids:
-        fields = [f for f in mdo.bulk_export([data_row_id])[0].fields]
-        assert len(fields) == 1, fields
-        assert SPLIT_SCHEMA_ID not in [field.schema_id for field in fields]
-
-
-def test_bulk_delete_datarow_enum_metadata(data_row: DataRow, mdo):
-    """test bulk deletes for non non fields"""
-    metadata = make_metadata(data_row.uid)
-    metadata.fields = [
-        m for m in metadata.fields if m.schema_id == SPLIT_SCHEMA_ID
-    ]
-    mdo.bulk_upsert([metadata])
-
-    exported = mdo.bulk_export([data_row.uid])[0].fields
-    assert len(exported) == len(
-        set([x.schema_id for x in metadata.fields] +
-            [x.schema_id for x in exported]))
-
-    mdo.bulk_delete([
-        DeleteDataRowMetadata(data_row_id=data_row.uid,
-                              fields=[SPLIT_SCHEMA_ID])
-    ])
-    exported = mdo.bulk_export([data_row.uid])[0].fields
-    assert len(exported) == 0
-
-
 def test_raise_enum_upsert_schema_error(data_row, mdo):
     """Setting an option id as the schema id will raise a Value Error"""
 
@@ -317,16 +232,6 @@ def test_upsert_non_existent_schema_id(data_row, mdo):
     mdo.bulk_upsert([metadata])
 
 
-def test_delete_non_existent_schema_id(data_row, mdo):
-    res = mdo.bulk_delete([
-        DeleteDataRowMetadata(data_row_id=data_row.uid,
-                              fields=[SPLIT_SCHEMA_ID])
-    ])
-    assert len(res) == 1
-    assert res[0].fields == [SPLIT_SCHEMA_ID]
-    assert res[0].error == 'Schema did not exist'
-
-
 def test_parse_raw_metadata(mdo):
     example = {
         'dataRowId':
diff --git a/tests/unit/test_unit_delete_batch_data_row_metadata.py b/tests/unit/test_unit_delete_batch_data_row_metadata.py
new file mode 100644
index 000000000..d1a901230
--- /dev/null
+++ b/tests/unit/test_unit_delete_batch_data_row_metadata.py
@@ -0,0 +1,56 @@
+from labelbox.schema.data_row_metadata import _DeleteBatchDataRowMetadata
+from labelbox.schema.identifiable import GlobalKey, UniqueId
+
+
+def test_dict_delete_data_row_batch():
+    obj = _DeleteBatchDataRowMetadata(
+        data_row_identifier=UniqueId("abcd"),
+        schema_ids=["clqh77tyk000008l2a9mjesa1", "clqh784br000008jy0yuq04fy"])
+    assert obj.dict() == {
+        "data_row_identifier": {
+            "id": "abcd",
+            "id_type": "ID"
+        },
+        "schema_ids": [
+            "clqh77tyk000008l2a9mjesa1", "clqh784br000008jy0yuq04fy"
+        ]
+    }
+
+    obj = _DeleteBatchDataRowMetadata(
+        data_row_identifier=GlobalKey("fegh"),
+        schema_ids=["clqh77tyk000008l2a9mjesa1", "clqh784br000008jy0yuq04fy"])
+    assert obj.dict() == {
+        "data_row_identifier": {
+            "id": "fegh",
+            "id_type": "GKEY"
+        },
+        "schema_ids": [
+            "clqh77tyk000008l2a9mjesa1", "clqh784br000008jy0yuq04fy"
+        ]
+    }
+
+
+def test_dict_delete_data_row_batch_by_alias():
+    obj = _DeleteBatchDataRowMetadata(
+        data_row_identifier=UniqueId("abcd"),
+        schema_ids=["clqh77tyk000008l2a9mjesa1", "clqh784br000008jy0yuq04fy"])
+    assert obj.dict(by_alias=True) == {
+        "dataRowIdentifier": {
+            "id": "abcd",
+            "idType": "ID"
+        },
+        "schemaIds": ["clqh77tyk000008l2a9mjesa1", "clqh784br000008jy0yuq04fy"]
+    }
+
+    obj = _DeleteBatchDataRowMetadata(
+        data_row_identifier=GlobalKey("fegh"),
+        schema_ids=["clqh77tyk000008l2a9mjesa1", "clqh784br000008jy0yuq04fy"])
+    assert obj.dict(by_alias=True) == {
+        "dataRowIdentifier": {
+            "id": "fegh",
+            "idType": "GKEY"
+        },
+        "schemaIds": ["clqh77tyk000008l2a9mjesa1", "clqh784br000008jy0yuq04fy"]
+    }
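
Usage sketch (not part of the patch). The snippet below shows how the identifier types introduced here are meant to be passed to bulk_delete; it is a minimal sketch that assumes a configured labelbox.Client, and the data row id, global key, and schema ids are placeholders to replace with real values.

# Sketch of the bulk_delete call shapes this patch enables (placeholder ids).
from labelbox import Client
from labelbox.schema.data_row_metadata import DeleteDataRowMetadata
from labelbox.schema.identifiable import GlobalKey, UniqueId

client = Client(api_key="<YOUR_API_KEY>")  # assumes a valid API key
mdo = client.get_data_row_metadata_ontology()

# Address rows by their unique ids. Plain strings still work, but per this
# patch they emit a deprecation warning and are converted to UniqueId.
failures = mdo.bulk_delete([
    DeleteDataRowMetadata(data_row_id=UniqueId("<data-row-id>"),
                          fields=["<schema-id-1>", "<schema-id-2>"])
])

# Or address rows by their global keys. Do not mix UniqueId and GlobalKey
# entries in the same list.
failures += mdo.bulk_delete([
    DeleteDataRowMetadata(data_row_id=GlobalKey("<global-key>"),
                          fields=["<schema-id-1>", "<schema-id-2>"])
])

print(failures)  # each entry describes fields that could not be deleted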