8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,13 @@
# Changelog

# Version 3.33.0 (2022-12-13)
### Added
* Added SDK support for creating batches with up to 100k data rows
* Added optional media_type to `client.create_ontology_from_feature_schemas()` and `client.create_ontology()`

### Changed
* String representation of `DbObject` subclasses are now formatted

# Version 3.32.0 (2022-12-02)
### Added
* Added `HTML` Enum to `MediaType`. `HTML` is introduced as a new asset type in Labelbox.
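A minimal sketch of the 3.33.0 additions above in use, assuming placeholder credentials and ids (the names below are illustrative, not from this PR):

```python
from labelbox import Client
from labelbox.schema.media_type import MediaType

client = Client(api_key="<YOUR_API_KEY>")       # placeholder credentials

# Optional media_type on ontology creation (also accepted by client.create_ontology)
ontology = client.create_ontology_from_feature_schemas(
    "pets-ontology",                            # illustrative name
    ["<FEATURE_SCHEMA_ID>"],                    # placeholder feature schema id
    media_type=MediaType.Image)

# Batches can now contain up to 100k data rows; large batches are created asynchronously
project = client.get_project("<PROJECT_ID>")    # placeholder id
data_row_ids = ["<DATA_ROW_ID_1>", "<DATA_ROW_ID_2>"]  # up to 100,000 ids
batch = project.create_batch("v3.33-batch", data_row_ids)
```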
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -21,7 +21,7 @@
copyright = '2021, Labelbox'
author = 'Labelbox'

release = '3.32.0'
release = '3.33.0'

# -- General configuration ---------------------------------------------------

2 changes: 1 addition & 1 deletion labelbox/__init__.py
@@ -1,5 +1,5 @@
name = "labelbox"
__version__ = "3.32.0"
__version__ = "3.33.0"

from labelbox.client import Client
from labelbox.schema.project import Project
43 changes: 30 additions & 13 deletions labelbox/client.py
@@ -36,7 +36,7 @@
from labelbox.schema.slice import CatalogSlice
from labelbox.schema.queue_mode import QueueMode

from labelbox.schema.media_type import MediaType
from labelbox.schema.media_type import MediaType, get_media_type_validation_error

logger = logging.getLogger(__name__)

@@ -853,15 +853,18 @@ def rootSchemaPayloadToFeatureSchema(client, payload):
rootSchemaPayloadToFeatureSchema,
['rootSchemaNodes', 'nextCursor'])

def create_ontology_from_feature_schemas(self, name,
feature_schema_ids) -> Ontology:
def create_ontology_from_feature_schemas(self,
name,
feature_schema_ids,
media_type=None) -> Ontology:
"""
Creates an ontology from a list of feature schema ids

Args:
name (str): Name of the ontology
feature_schema_ids (List[str]): List of feature schema ids corresponding to
top level tools and classifications to include in the ontology
media_type (MediaType or None): Media type of a new ontology
Returns:
The created Ontology
"""
@@ -891,9 +894,9 @@ def create_ontology_from_feature_schemas(self, name,
"Neither `tool` or `classification` found in the normalized feature schema"
)
normalized = {'tools': tools, 'classifications': classifications}
return self.create_ontology(name, normalized)
return self.create_ontology(name, normalized, media_type)

def create_ontology(self, name, normalized) -> Ontology:
def create_ontology(self, name, normalized, media_type=None) -> Ontology:
"""
Creates an ontology from normalized data
>>> normalized = {"tools" : [{'tool': 'polygon', 'name': 'cat', 'color': 'black'}], "classifications" : []}
@@ -910,13 +913,27 @@ def create_ontology(self, name, normalized) -> Ontology:
Args:
name (str): Name of the ontology
normalized (dict): A normalized ontology payload. See above for details.
media_type (MediaType or None): Media type of a new ontology
Returns:
The created Ontology
"""

if media_type:
if MediaType.is_supported(media_type):
media_type = media_type.value
else:
raise get_media_type_validation_error(media_type)

query_str = """mutation upsertRootSchemaNodePyApi($data: UpsertOntologyInput!){
upsertOntology(data: $data){ %s }
} """ % query.results_query_part(Entity.Ontology)
params = {'data': {'name': name, 'normalized': json.dumps(normalized)}}
params = {
'data': {
'name': name,
'normalized': json.dumps(normalized),
'mediaType': media_type
}
}
res = self.execute(query_str, params)
return Entity.Ontology(self, res['upsertOntology'])

@@ -1035,9 +1052,9 @@ def _format_failed_rows(rows: Dict[str, str],
)

# Start assign global keys to data rows job
query_str = """mutation assignGlobalKeysToDataRowsPyApi($globalKeyDataRowLinks: [AssignGlobalKeyToDataRowInput!]!) {
assignGlobalKeysToDataRows(data: {assignInputs: $globalKeyDataRowLinks}) {
jobId
query_str = """mutation assignGlobalKeysToDataRowsPyApi($globalKeyDataRowLinks: [AssignGlobalKeyToDataRowInput!]!) {
assignGlobalKeysToDataRows(data: {assignInputs: $globalKeyDataRowLinks}) {
jobId
}
}
"""
@@ -1172,7 +1189,7 @@ def _format_failed_rows(rows: List[str],

# Query string for retrieving job status and result, if job is done
result_query_str = """query getDataRowsForGlobalKeysResultPyApi($jobId: ID!) {
dataRowsForGlobalKeysResult(jobId: {id: $jobId}) { data {
dataRowsForGlobalKeysResult(jobId: {id: $jobId}) { data {
fetchedDataRows { id }
notFoundGlobalKeys
accessDeniedGlobalKeys
@@ -1246,8 +1263,8 @@ def clear_global_keys(

'Results' contains a list global keys that were successfully cleared.

'Errors' contains a list of global_keys correspond to the data rows that could not be
modified, accessed by the user, or not found.
'Errors' contains a list of global_keys correspond to the data rows that could not be
modified, accessed by the user, or not found.
Examples:
>>> job_result = client.get_data_row_ids_for_global_keys(["key1","key2"])
>>> print(job_result['status'])
@@ -1271,7 +1288,7 @@ def _format_failed_rows(rows: List[str],

# Query string for retrieving job status and result, if job is done
result_query_str = """query clearGlobalKeysResultPyApi($jobId: ID!) {
clearGlobalKeysResult(jobId: {id: $jobId}) { data {
clearGlobalKeysResult(jobId: {id: $jobId}) { data {
clearedGlobalKeys
failedToClearGlobalKeys
notFoundGlobalKeys
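A hedged usage sketch of the new `media_type` argument on `client.create_ontology`, reusing the normalized payload from its docstring above; omitting `media_type` keeps the previous behavior, and the API key below is a placeholder:

```python
from labelbox import Client
from labelbox.schema.media_type import MediaType

client = Client(api_key="<YOUR_API_KEY>")  # placeholder credentials

normalized = {
    "tools": [{"tool": "polygon", "name": "cat", "color": "black"}],
    "classifications": []
}

# A supported MediaType member is serialized to its string value and sent as `mediaType`.
ontology = client.create_ontology("cats-ontology", normalized,
                                  media_type=MediaType.Image)

# Unsupported values raise the TypeError built by get_media_type_validation_error.
try:
    client.create_ontology("cats-ontology", normalized, media_type="video")
except TypeError as e:
    print(e)  # message lists MediaType.get_supported_members()
```

The same optional argument is forwarded by `create_ontology_from_feature_schemas`.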
4 changes: 3 additions & 1 deletion labelbox/orm/db_object.py
@@ -1,6 +1,7 @@
from datetime import datetime, timezone
from functools import wraps
import logging
import json

from labelbox import utils
from labelbox.exceptions import InvalidQueryError, InvalidAttributeError
@@ -92,7 +93,8 @@ def __str__(self):
attribute_values = {
field.name: getattr(self, field.name) for field in self.fields()
}
return "<%s %s>" % (self.type_name().split(".")[-1], attribute_values)
return "<%s %s>" % (self.type_name().split(".")[-1],
json.dumps(attribute_values, indent=4, default=str))

def __eq__(self, other):
return (isinstance(other, DbObject) and
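A sketch of the effect of the reformatted `__str__`; the project id and field values are placeholders, and the exact field set depends on the entity:

```python
from labelbox import Client

client = Client(api_key="<YOUR_API_KEY>")       # placeholder key
project = client.get_project("<PROJECT_ID>")    # placeholder id

# __str__ now pretty-prints field values via json.dumps(indent=4, default=str),
# so non-JSON types such as datetimes are rendered through str(). Output is
# roughly (values illustrative):
#   <Project {
#       "uid": "<PROJECT_ID>",
#       "name": "demo",
#       "created_at": "2022-12-13 00:00:00+00:00"
#   }>
print(project)
```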
4 changes: 2 additions & 2 deletions labelbox/orm/model.py
@@ -389,7 +389,7 @@ def _attributes_of_type(cls, attr_type):

@classmethod
def fields(cls):
""" Returns a generateor that yields all the Fields declared in a
""" Returns a generator that yields all the Fields declared in a
concrete subclass.
"""
for attr in cls._attributes_of_type(Field):
@@ -398,7 +398,7 @@ def fields(cls):

@classmethod
def relationships(cls):
""" Returns a generateor that yields all the Relationships declared in
""" Returns a generator that yields all the Relationships declared in
a concrete subclass.
"""
return cls._attributes_of_type(Relationship)
6 changes: 6 additions & 0 deletions labelbox/schema/media_type.py
@@ -44,3 +44,9 @@ def get_supported_members(cls):
item for item in cls.__members__
if item not in ["Unknown", "Unsupported"]
]


def get_media_type_validation_error(media_type):
return TypeError(f"{media_type} is not a valid media type. Use"
f" any of {MediaType.get_supported_members()}"
" from MediaType. Example: MediaType.Image.")
141 changes: 127 additions & 14 deletions labelbox/schema/project.py
@@ -18,10 +18,10 @@
from labelbox.orm.model import Entity, Field, Relationship
from labelbox.pagination import PaginatedCollection
from labelbox.schema.consensus_settings import ConsensusSettings
from labelbox.schema.data_row import DataRow
from labelbox.schema.media_type import MediaType
from labelbox.schema.queue_mode import QueueMode
from labelbox.schema.resource_tag import ResourceTag
from labelbox.schema.data_row import DataRow

if TYPE_CHECKING:
from labelbox import BulkImportRequest
@@ -608,22 +608,31 @@ def create_batch(self,

self._wait_until_data_rows_are_processed(
dr_ids, self._wait_processing_max_seconds)
method = 'createBatchV2'
query_str = """mutation %sPyApi($projectId: ID!, $batchInput: CreateBatchInput!) {
project(where: {id: $projectId}) {
%s(input: $batchInput) {
batch {
%s
}
failedDataRowIds
}
}
}
""" % (method, method, query.results_query_part(Entity.Batch))

if consensus_settings:
consensus_settings = ConsensusSettings(**consensus_settings).dict(
by_alias=True)

if len(dr_ids) >= 10_000:
return self._create_batch_async(name, dr_ids, priority,
consensus_settings)
else:
return self._create_batch_sync(name, dr_ids, priority,
consensus_settings)

def _create_batch_sync(self, name, dr_ids, priority, consensus_settings):
method = 'createBatchV2'
query_str = """mutation %sPyApi($projectId: ID!, $batchInput: CreateBatchInput!) {
project(where: {id: $projectId}) {
%s(input: $batchInput) {
batch {
%s
}
failedDataRowIds
}
}
}
""" % (method, method, query.results_query_part(Entity.Batch))
params = {
"projectId": self.uid,
"batchInput": {
@@ -633,7 +642,6 @@ def create_batch(self,
"consensusSettings": consensus_settings
}
}

res = self.client.execute(query_str,
params,
timeout=180.0,
@@ -645,6 +653,111 @@ def create_batch(self,
batch,
failed_data_row_ids=res['failedDataRowIds'])

def _create_batch_async(self,
name: str,
dr_ids: List[str],
priority: int = 5,
consensus_settings: Optional[Dict[str,
float]] = None):
method = 'createEmptyBatch'
create_empty_batch_mutation_str = """mutation %sPyApi($projectId: ID!, $input: CreateEmptyBatchInput!) {
project(where: {id: $projectId}) {
%s(input: $input) {
id
}
}
}
""" % (method, method)

params = {
"projectId": self.uid,
"input": {
"name": name,
"consensusSettings": consensus_settings
}
}

res = self.client.execute(create_empty_batch_mutation_str,
params,
timeout=180.0,
experimental=True)["project"][method]
batch_id = res['id']

method = 'addDataRowsToBatchAsync'
add_data_rows_mutation_str = """mutation %sPyApi($projectId: ID!, $input: AddDataRowsToBatchInput!) {
project(where: {id: $projectId}) {
%s(input: $input) {
taskId
}
}
}
""" % (method, method)

params = {
"projectId": self.uid,
"input": {
"batchId": batch_id,
"dataRowIds": dr_ids,
"priority": priority,
}
}

res = self.client.execute(add_data_rows_mutation_str,
params,
timeout=180.0,
experimental=True)["project"][method]

task_id = res['taskId']

timeout_seconds = 600
sleep_time = 2
get_task_query_str = """query %s($taskId: ID!) {
task(where: {id: $taskId}) {
status
}
}
""" % "getTaskPyApi"

while True:
task_status = self.client.execute(
get_task_query_str, {'taskId': task_id},
experimental=True)['task']['status']

if task_status == "COMPLETE":
# obtain batch entity to return
get_batch_str = """query %s($projectId: ID!, $batchId: ID!) {
project(where: {id: $projectId}) {
batches(where: {id: $batchId}) {
nodes {
%s
}
}
}
}
""" % ("getProjectBatchPyApi",
query.results_query_part(Entity.Batch))

batch = self.client.execute(
get_batch_str, {
"projectId": self.uid,
"batchId": batch_id
},
timeout=180.0,
experimental=True)["project"]["batches"]["nodes"][0]

# TODO async endpoints currently do not provide failed_data_row_ids in response
return Entity.Batch(self.client, self.uid, batch)
elif task_status == "IN_PROGRESS":
timeout_seconds -= sleep_time
if timeout_seconds <= 0:
raise LabelboxError(
f"Timed out while waiting for batch to be created.")
logger.debug("Creating batch, waiting for server...", self.uid)
time.sleep(sleep_time)
continue
else:
raise LabelboxError(f"Batch was not created successfully.")

def _update_queue_mode(self, mode: "QueueMode") -> "QueueMode":
"""
Updates the queueing mode of this project.
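With the project.py changes above, `create_batch` now branches on size: fewer than 10,000 data row ids keep the synchronous `createBatchV2` mutation, while 10,000 or more go through `_create_batch_async` (create an empty batch, queue the rows via `addDataRowsToBatchAsync`, then poll the task for up to ~600 seconds). A hedged end-to-end sketch with placeholder ids; note the async endpoint does not yet report `failed_data_row_ids`:

```python
from labelbox import Client

client = Client(api_key="<YOUR_API_KEY>")       # placeholder credentials
project = client.get_project("<PROJECT_ID>")    # placeholder id
dataset = client.get_dataset("<DATASET_ID>")    # placeholder id

data_row_ids = [dr.uid for dr in dataset.export_data_rows()]

# < 10,000 ids  -> synchronous createBatchV2 mutation (previous behavior)
# >= 10,000 ids -> createEmptyBatch + addDataRowsToBatchAsync, then poll the task status
batch = project.create_batch("big-batch", data_row_ids, priority=3)
print(batch.name, batch.size)
```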
9 changes: 9 additions & 0 deletions tests/integration/test_batch.py
@@ -55,6 +55,15 @@ def test_create_batch(batch_project: Project, big_dataset: Dataset):
assert batch.size == len(data_rows)


def test_create_batch_async(batch_project: Project, big_dataset: Dataset):
data_rows = [dr.uid for dr in list(big_dataset.export_data_rows())]
batch_project._wait_until_data_rows_are_processed(
data_rows, batch_project._wait_processing_max_seconds)
batch = batch_project._create_batch_async("big-batch", data_rows, 3)
assert batch.name == "big-batch"
assert batch.size == len(data_rows)


def test_create_batch_with_consensus_settings(batch_project: Project,
small_dataset: Dataset):
data_rows = [dr.uid for dr in list(small_dataset.export_data_rows())]
2 changes: 1 addition & 1 deletion tests/integration/test_data_rows.py
@@ -699,7 +699,7 @@ def test_data_row_bulk_creation_with_same_global_keys(dataset, sample_image):
assert task.status == "FAILED"
assert len(task.failed_data_rows) > 0
assert len(list(dataset.data_rows())) == 0
assert task.errors == "Import job failed"
assert task.errors == "Data rows contain empty string or duplicate global keys, which are not allowed"

task = dataset.create_data_rows([{
DataRow.row_data: sample_image,