-
Notifications
You must be signed in to change notification settings - Fork 68
[DIAG-944] Batch Mode #328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
dd771af
ADD: beta batch queue methods
fa0062f
ADD: beta batch queue methods
2c00f56
FIX: dequeue method
023c7ba
FIX: dequeue method
b8e6ae1
Merge branch 'develop' into gj/batch-mode
4dd8029
CHG: update methods for GA
6212be5
Format and fix bugs
93cdfaf
CHG: move into update statement
3f927dc
CHG: remove internal flag
d3f4837
CHG: data row ids and require batch mode
97c00ad
CHG: address comments
8b6b7a2
FIX: gql formatting
8ad5b45
FIX: change queue mode tests
e5d196f
remove method
8be9e0d
Update project.py
89a9b00
Address pr comments
9760126
Format and use test dir
e0ed0b6
FIX: import
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,27 +1,31 @@ | ||
| import enum | ||
| import json | ||
| import time | ||
| import logging | ||
| import time | ||
| import warnings | ||
| from collections import namedtuple | ||
| from datetime import datetime, timezone | ||
| from pathlib import Path | ||
| from typing import Dict, Union, Iterable | ||
| from typing import Dict, Union, Iterable, List, Optional | ||
| from urllib.parse import urlparse | ||
| import requests | ||
|
|
||
| import ndjson | ||
| import requests | ||
|
|
||
| from labelbox import utils | ||
| from labelbox.schema.data_row import DataRow | ||
| from labelbox.orm import query | ||
| from labelbox.schema.bulk_import_request import BulkImportRequest | ||
| from labelbox.exceptions import InvalidQueryError, LabelboxError | ||
| from labelbox.orm import query | ||
| from labelbox.orm.db_object import DbObject, Updateable, Deletable | ||
| from labelbox.orm.model import Entity, Field, Relationship | ||
| from labelbox.pagination import PaginatedCollection | ||
| from labelbox.schema.bulk_import_request import BulkImportRequest | ||
| from labelbox.schema.data_row import DataRow | ||
|
|
||
| try: | ||
| datetime.fromisoformat # type: ignore[attr-defined] | ||
| except AttributeError: | ||
| from backports.datetime_fromisoformat import MonkeyPatch | ||
|
|
||
| MonkeyPatch.patch_fromisoformat() | ||
|
|
||
| try: | ||
|
|
@@ -31,6 +35,19 @@ | |
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| MAX_QUEUE_BATCH_SIZE = 1000 | ||
|
|
||
|
|
||
| class QueueMode(enum.Enum): | ||
| Batch = "Batch" | ||
| Dataset = "Dataset" | ||
|
|
||
|
|
||
| class QueueErrors(enum.Enum): | ||
| InvalidDataRowType = 'InvalidDataRowType' | ||
| AlreadyInProject = 'AlreadyInProject' | ||
| HasAttachedLabel = 'HasAttachedLabel' | ||
|
|
||
|
|
||
| class Project(DbObject, Updateable, Deletable): | ||
| """ A Project is a container that includes a labeling frontend, an ontology, | ||
|
|
@@ -79,6 +96,14 @@ class Project(DbObject, Updateable, Deletable): | |
| benchmarks = Relationship.ToMany("Benchmark", False) | ||
| ontology = Relationship.ToOne("Ontology", True) | ||
|
|
||
| def update(self, **kwargs): | ||
|
|
||
| mode: Optional[QueueMode] = kwargs.pop("queue_mode", None) | ||
| if mode: | ||
| self._update_queue_mode(mode) | ||
|
|
||
| return super().update(**kwargs) | ||
|
|
||
| def members(self): | ||
| """ Fetch all current members for this project | ||
|
|
||
|
|
@@ -407,14 +432,14 @@ def setup(self, labeling_frontend, labeling_frontend_options): | |
| a.k.a. project ontology. If given a `dict` it will be converted | ||
| to `str` using `json.dumps`. | ||
| """ | ||
| organization = self.client.get_organization() | ||
|
|
||
| if not isinstance(labeling_frontend_options, str): | ||
| labeling_frontend_options = json.dumps(labeling_frontend_options) | ||
|
|
||
| self.labeling_frontend.connect(labeling_frontend) | ||
|
|
||
| LFO = Entity.LabelingFrontendOptions | ||
| labeling_frontend_options = self.client._create( | ||
| self.client._create( | ||
| LFO, { | ||
| LFO.project: self, | ||
| LFO.labeling_frontend: labeling_frontend, | ||
|
|
@@ -424,6 +449,103 @@ def setup(self, labeling_frontend, labeling_frontend_options): | |
| timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") | ||
| self.update(setup_complete=timestamp) | ||
|
|
||
| def queue(self, data_row_ids: List[str]): | ||
| """Add Data Rows to the Project queue""" | ||
|
|
||
| method = "submitBatchOfDataRows" | ||
| return self._post_batch(method, data_row_ids) | ||
|
|
||
| def dequeue(self, data_row_ids: List[str]): | ||
| """Remove Data Rows from the Project queue""" | ||
|
|
||
| method = "removeBatchOfDataRows" | ||
| return self._post_batch(method, data_row_ids) | ||
|
|
||
| def _post_batch(self, method, data_row_ids: List[str]): | ||
| """Post batch methods""" | ||
|
|
||
| if self.queue_mode() != QueueMode.Batch: | ||
| raise ValueError("Project must be in batch mode") | ||
|
|
||
| if len(data_row_ids) > MAX_QUEUE_BATCH_SIZE: | ||
| raise ValueError( | ||
| f"Batch exceeds max size of {MAX_QUEUE_BATCH_SIZE}, consider breaking it into parts" | ||
| ) | ||
|
|
||
| query = """mutation %sPyApi($projectId: ID!, $dataRowIds: [ID!]!) { | ||
| project(where: {id: $projectId}) { | ||
| %s(data: {dataRowIds: $dataRowIds}) { | ||
| dataRows { | ||
| dataRowId | ||
| error | ||
| } | ||
| } | ||
| } | ||
| } | ||
| """ % (method, method) | ||
|
|
||
| res = self.client.execute(query, { | ||
| "projectId": self.uid, | ||
| "dataRowIds": data_row_ids | ||
| })["project"][method]["dataRows"] | ||
|
|
||
| # TODO: figure out error messaging | ||
| if len(data_row_ids) == len(res): | ||
| raise ValueError("No dataRows were submitted successfully") | ||
|
|
||
| if len(data_row_ids) > 0: | ||
| warnings.warn("Some Data Rows were not submitted successfully") | ||
|
|
||
| return res | ||
|
|
||
| def _update_queue_mode(self, mode: QueueMode) -> QueueMode: | ||
|
|
||
| if self.queue_mode() == mode: | ||
| return mode | ||
|
|
||
| if mode == QueueMode.Batch: | ||
| status = "ENABLED" | ||
| elif mode == QueueMode.Dataset: | ||
| status = "DISABLED" | ||
| else: | ||
| raise ValueError( | ||
| "Must provide either `BATCH` or `DATASET` as a mode") | ||
|
|
||
| query_str = """mutation %s($projectId: ID!, $status: TagSetStatusInput!) { | ||
| project(where: {id: $projectId}) { | ||
| setTagSetStatus(input: {tagSetStatus: $status}) { | ||
| tagSetStatus | ||
| } | ||
| } | ||
| } | ||
| """ % "setTagSetStatusPyApi" | ||
|
|
||
| self.client.execute(query_str, { | ||
| 'projectId': self.uid, | ||
| 'status': status | ||
| }) | ||
|
Comment on lines
+523
to
+526
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe not run this if the current queue mode == mode param the user passed in |
||
|
|
||
| return mode | ||
|
|
||
| def queue_mode(self): | ||
|
|
||
| query_str = """query %s($projectId: ID!) { | ||
| project(where: {id: $projectId}) { | ||
| tagSetStatus | ||
| } | ||
| } | ||
| """ % "GetTagSetStatusPyApi" | ||
|
|
||
| status = self.client.execute( | ||
| query_str, {'projectId': self.uid})["project"]["tagSetStatus"] | ||
|
|
||
| if status == "ENABLED": | ||
| return QueueMode.Batch | ||
| elif status == "DISABLED": | ||
| return QueueMode.Dataset | ||
| else: | ||
| raise ValueError("Status not known") | ||
|
|
||
| def validate_labeling_parameter_overrides(self, data): | ||
| for idx, row in enumerate(data): | ||
| if len(row) != 3: | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| import pytest | ||
|
|
||
| from labelbox import Dataset, Project | ||
| from labelbox.schema.project import QueueMode | ||
|
|
||
| IMAGE_URL = "https://storage.googleapis.com/diagnostics-demo-data/coco/COCO_train2014_000000000034.jpg" | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def big_dataset(dataset: Dataset): | ||
| task = dataset.create_data_rows([ | ||
| { | ||
| "row_data": IMAGE_URL, | ||
| "external_id": "my-image" | ||
| }, | ||
| ] * 250) | ||
| task.wait_till_done() | ||
|
|
||
| yield dataset | ||
| dataset.delete() | ||
|
|
||
|
|
||
| def test_submit_batch(configured_project: Project, big_dataset): | ||
| configured_project.update(queue_mode=QueueMode.Batch) | ||
|
|
||
| data_rows = [dr.uid for dr in list(big_dataset.export_data_rows())] | ||
| queue_res = configured_project.queue(data_rows) | ||
| assert not len(queue_res) | ||
| dequeue_res = configured_project.dequeue(data_rows) | ||
| assert not len(dequeue_res) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any danger to toggling the mode? What happens to existing annotations or data in the queue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will be handled by product in the future