Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
## Added
* All imports are available via `import labelbox as lb` and `import labelbox.types as lb_types`.
* Attachment_name support to create_attachment()
* New method `Project.task_queues()` to obtain the task queues for a project.
* New method `Project.move_data_rows_to_task_queue()` for moving data rows to a specified task queue.

## Changed
* `LabelImport.create_from_objects()`, `MALPredictionImport.create_from_objects()`, `MEAPredictionImport.create_from_objects()`, `Project.upload_annotations()`, `ModelRun.add_predictions()` now support Python Types for annotations.
Expand Down
6 changes: 6 additions & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ Task
:members:
:show-inheritance:

Task Queue
---------------------------
.. automodule:: labelbox.schema.task_queue
:members:
:show-inheritance:

User
---------------------------

Expand Down
1 change: 1 addition & 0 deletions labelbox/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@
from labelbox.schema.media_type import MediaType
from labelbox.schema.slice import Slice, CatalogSlice
from labelbox.schema.queue_mode import QueueMode
from labelbox.schema.task_queue import TaskQueue
1 change: 1 addition & 0 deletions labelbox/orm/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ class Entity(metaclass=EntityMeta):
Project: Type[labelbox.Project]
Batch: Type[labelbox.Batch]
CatalogSlice: Type[labelbox.CatalogSlice]
TaskQueue: Type[labelbox.TaskQueue]

@classmethod
def _attributes_of_type(cls, attr_type):
Expand Down
147 changes: 102 additions & 45 deletions labelbox/schema/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from labelbox.schema.resource_tag import ResourceTag
from labelbox.schema.task import Task
from labelbox.schema.user import User
from labelbox.schema.task_queue import TaskQueue

if TYPE_CHECKING:
from labelbox import BulkImportRequest
Expand Down Expand Up @@ -69,6 +70,7 @@ class Project(DbObject, Updateable, Deletable):
webhooks (Relationship): `ToMany` relationship to Webhook
benchmarks (Relationship): `ToMany` relationship to Benchmark
ontology (Relationship): `ToOne` relationship to Ontology
task_queues (Relationship): `ToMany` relationship to TaskQueue
"""

name = Field.String("name")
Expand Down Expand Up @@ -794,54 +796,34 @@ def _create_batch_async(self,

task_id = res['taskId']

timeout_seconds = 600
sleep_time = 2
get_task_query_str = """query %s($taskId: ID!) {
task(where: {id: $taskId}) {
status
task = self._wait_for_task(task_id)
if task.status != "COMPLETE":
raise LabelboxError(f"Batch was not created successfully: " +
json.dumps(task.errors))

# obtain batch entity to return
get_batch_str = """query %s($projectId: ID!, $batchId: ID!) {
project(where: {id: $projectId}) {
batches(where: {id: $batchId}) {
nodes {
%s
}
}
}
}
""" % "getTaskPyApi"
""" % ("getProjectBatchPyApi",
query.results_query_part(Entity.Batch))

while True:
task_status = self.client.execute(
get_task_query_str, {'taskId': task_id},
experimental=True)['task']['status']

if task_status == "COMPLETE":
# obtain batch entity to return
get_batch_str = """query %s($projectId: ID!, $batchId: ID!) {
project(where: {id: $projectId}) {
batches(where: {id: $batchId}) {
nodes {
%s
}
}
}
}
""" % ("getProjectBatchPyApi",
query.results_query_part(Entity.Batch))

batch = self.client.execute(
get_batch_str, {
"projectId": self.uid,
"batchId": batch_id
},
timeout=180.0,
experimental=True)["project"]["batches"]["nodes"][0]

# TODO async endpoints currently do not provide failed_data_row_ids in response
return Entity.Batch(self.client, self.uid, batch)
elif task_status == "IN_PROGRESS":
timeout_seconds -= sleep_time
if timeout_seconds <= 0:
raise LabelboxError(
f"Timed out while waiting for batch to be created.")
logger.debug("Creating batch, waiting for server...", self.uid)
time.sleep(sleep_time)
continue
else:
raise LabelboxError(f"Batch was not created successfully.")
batch = self.client.execute(
get_batch_str, {
"projectId": self.uid,
"batchId": batch_id
},
timeout=180.0,
experimental=True)["project"]["batches"]["nodes"][0]

# TODO async endpoints currently do not provide failed_data_row_ids in response
return Entity.Batch(self.client, self.uid, batch)

def _update_queue_mode(self, mode: "QueueMode") -> "QueueMode":
"""
Expand Down Expand Up @@ -1127,6 +1109,81 @@ def batches(self) -> PaginatedCollection:
cursor_path=['project', 'batches', 'pageInfo', 'endCursor'],
experimental=True)

    def task_queues(self) -> List[TaskQueue]:
        """ Fetch all task queues that belong to this project

        Returns:
            A `List` of `TaskQueue`s
        """
        # GraphQL fragment for the TaskQueue fields is generated from the
        # entity definition so the query stays in sync with the schema.
        query_str = """query GetProjectTaskQueuesPyApi($projectId: ID!) {
              project(where: {id: $projectId}) {
                taskQueues {
                  %s
                }
              }
            }
        """ % (query.results_query_part(Entity.TaskQueue))

        # NOTE(review): generous 180s timeout and experimental flag mirror the
        # other batch/queue endpoints in this class.
        task_queue_values = self.client.execute(
            query_str, {"projectId": self.uid},
            timeout=180.0,
            experimental=True)["project"]["taskQueues"]

        # Hydrate each raw field dict into a TaskQueue entity.
        return [
            Entity.TaskQueue(self.client, field_values)
            for field_values in task_queue_values
        ]

def move_data_rows_to_task_queue(self, data_row_ids: List[str],
task_queue_id: str):
"""

Moves data rows to the specified task queue.

Args:
data_row_ids: a list of data row ids to be moved
task_queue_id: the task queue id to be moved to, or None to specify the "Done" queue

Returns:
None if successful, or a raised error on failure

"""
method = "createBulkAddRowsToQueueTask"
query_str = """mutation AddDataRowsToTaskQueueAsyncPyApi(
$projectId: ID!
$queueId: ID
$dataRowIds: [ID!]!
) {
project(where: { id: $projectId }) {
%s(
data: { queueId: $queueId, dataRowIds: $dataRowIds }
) {
taskId
}
}
}
""" % method

task_id = self.client.execute(
query_str, {
"projectId": self.uid,
"queueId": task_queue_id,
"dataRowIds": data_row_ids
},
timeout=180.0,
experimental=True)["project"][method]["taskId"]

task = self._wait_for_task(task_id)
if task.status != "COMPLETE":
raise LabelboxError(f"Data rows were not moved successfully: " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What type of Task is this job? Currently, Task.errors only returns value for a JSON Import task

json.dumps(task.errors))

def _wait_for_task(self, task_id: str) -> Task:
task = Task.get_task(self.client, task_id)
task.wait_till_done()

return task

def upload_annotations(
self,
name: str,
Expand Down
26 changes: 24 additions & 2 deletions labelbox/schema/task.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
import requests
import time
Expand All @@ -6,7 +7,7 @@

from labelbox.exceptions import ResourceNotFoundError
from labelbox.orm.db_object import DbObject
from labelbox.orm.model import Field, Relationship
from labelbox.orm.model import Field, Relationship, Entity

if TYPE_CHECKING:
from labelbox import User
Expand Down Expand Up @@ -55,7 +56,7 @@ def refresh(self) -> None:
for field in self.fields():
setattr(self, field.name, getattr(tasks[0], field.name))

def wait_till_done(self, timeout_seconds=300) -> None:
def wait_till_done(self, timeout_seconds: int = 300) -> None:
""" Waits until the task is completed. Periodically queries the server
to update the task attributes.

Expand Down Expand Up @@ -83,9 +84,16 @@ def wait_till_done(self, timeout_seconds=300) -> None:
def errors(self) -> Optional[Dict[str, Any]]:
""" Fetch the error associated with an import task.
"""
if self.type == "add-data-rows-to-batch" or self.type == "send-to-task-queue":
if self.status == "FAILED":
# for these tasks, the error is embedded in the result itself
return json.loads(self.result_url)
return None

# TODO: We should handle error messages for export v2 tasks in the future.
if self.name != 'JSON Import':
return None

if self.status == "FAILED":
result = self._fetch_remote_json()
return result["error"]
Expand Down Expand Up @@ -153,3 +161,17 @@ def download_result():
"Job status still in `IN_PROGRESS`. The result is not available. Call task.wait_till_done() with a larger timeout or contact support."
)
return download_result()

@staticmethod
def get_task(client, task_id):
user: User = client.get_user()
tasks: List[Task] = list(
user.created_tasks(where=Entity.Task.uid == task_id))
# Cache user in a private variable as the relationship can't be
# resolved due to server-side limitations (see Task.created_by)
# for more info.
if len(tasks) != 1:
raise ResourceNotFoundError(Entity.Task, {task_id: task_id})
task: Task = tasks[0]
task._user = user
return task
28 changes: 28 additions & 0 deletions labelbox/schema/task_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from labelbox.orm.db_object import DbObject
from labelbox.orm.model import Field


class TaskQueue(DbObject):
    """A task queue in a project's labeling workflow.

    Attributes:
        name (str): name of the task queue
        description (str): description of the task queue
        queue_type (str): type of the task queue
        data_row_count (int): number of data rows in the task queue

    Relationships (server-side; not declared as Relationship fields on
    this class — TODO confirm whether they should be exposed here):
        project
        organization
        pass_queue
        fail_queue
    """

    name = Field.String("name")
    description = Field.String("description")
    queue_type = Field.String("queue_type")
    data_row_count = Field.Int("data_row_count")

    def __init__(self, client, *args, **kwargs):
        # No extra state; delegates straight to DbObject.
        super().__init__(client, *args, **kwargs)
59 changes: 45 additions & 14 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,18 +311,39 @@ def configured_project_with_label(client, rand_gen, image_url, project, dataset,
One label is already created and yielded when using fixture
"""
project.datasets.connect(dataset)
editor = list(
project.client.get_labeling_frontends(
where=LabelingFrontend.name == "editor"))[0]

ontology_builder = OntologyBuilder(tools=[
Tool(tool=Tool.Type.BBOX, name="test-bbox-class"),
])
project.setup(editor, ontology_builder.asdict())
# TODO: ontology may not be synchronous after setup. remove sleep when api is more consistent
time.sleep(2)
ontology = _setup_ontology(project)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So clean!

label = _create_label(project, datarow, ontology, wait_for_label_processing)

yield [project, dataset, datarow, label]

for label in project.labels():
label.delete()


@pytest.fixture
def configured_batch_project_with_label(client, rand_gen, image_url,
                                        batch_project, dataset, datarow,
                                        wait_for_label_processing):
    """Project with a batch having one datarow
    Project contains an ontology with 1 bbox tool
    Additionally includes a create_label method for any needed extra labels
    One label is already created and yielded when using fixture
    """
    # Queue every data row in the dataset through a single batch.
    data_rows = [dr.uid for dr in list(dataset.data_rows())]
    batch_project.create_batch("test-batch", data_rows)

    # Shared helpers keep this consistent with configured_project_with_label.
    ontology = _setup_ontology(batch_project)
    label = _create_label(batch_project, datarow, ontology,
                          wait_for_label_processing)

    yield [batch_project, dataset, datarow, label]

    # Teardown: delete any labels created during the test.
    for label in batch_project.labels():
        label.delete()


ontology = ontology_builder.from_project(project)
def _create_label(project, datarow, ontology, wait_for_label_processing):
predictions = [{
"uuid": str(uuid.uuid4()),
"schemaId": ontology.tools[0].feature_schema_id,
Expand All @@ -342,7 +363,8 @@ def create_label():
Creates a LabelImport task which will create a label
"""
upload_task = LabelImport.create_from_objects(
client, project.uid, f'label-import-{uuid.uuid4()}', predictions)
project.client, project.uid, f'label-import-{uuid.uuid4()}',
predictions)
upload_task.wait_until_done(sleep_time_seconds=5)
assert upload_task.state == AnnotationImportState.FINISHED, "Label Import did not finish"
assert len(
Expand All @@ -352,11 +374,20 @@ def create_label():
project.create_label = create_label
project.create_label()
label = wait_for_label_processing(project)[0]
return label

yield [project, dataset, datarow, label]

for label in project.labels():
label.delete()
def _setup_ontology(project):
    """Attach a single-bbox-tool ontology to `project` and return it."""
    frontends = project.client.get_labeling_frontends(
        where=LabelingFrontend.name == "editor")
    editor = list(frontends)[0]
    builder = OntologyBuilder(
        tools=[Tool(tool=Tool.Type.BBOX, name="test-bbox-class")])
    project.setup(editor, builder.asdict())
    # TODO: ontology may not be synchronous after setup. remove sleep when api is more consistent
    time.sleep(2)
    return builder.from_project(project)


@pytest.fixture
Expand Down
Loading