Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions labelbox/schema/catalog.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Tuple, Union
from labelbox.orm.db_object import experimental
from labelbox.schema.export_filters import CatalogExportFilters, build_filters

Expand All @@ -23,7 +23,7 @@ def export_v2(
task_name: Optional[str] = None,
filters: Union[CatalogExportFilters, Dict[str, List[str]], None] = None,
params: Optional[CatalogExportParams] = None,
) -> Task:
) -> Union[Task, ExportTask]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this PR we return a union or a tuple in several places; is it possible to avoid that? Here, for instance, I can't tell the difference between Task and ExportTask. Also, we end up not using one of the two return values in other places.

Copy link
Contributor Author

@mnoszczak mnoszczak Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR modifies ExportTask so that it is a superset of Task and behaves similarly. We cannot know what is being returned until we toggle the flag for all customers. We'll be able to narrow down the type in a future version (once the flag is enabled for everyone).

"""
Creates a catalog export task with the given params, filters and returns the task.

Expand All @@ -42,7 +42,10 @@ def export_v2(
>>> task.wait_till_done()
>>> task.result
"""
return self._export(task_name, filters, params, False)
task, is_streamable = self._export(task_name, filters, params)
if (is_streamable):
return ExportTask(task, True)
return task

@experimental
def export(
Expand Down Expand Up @@ -83,15 +86,15 @@ def export(
>>> stream_type=lb.StreamType.RESULT
>>> ).start(stream_handler=json_stream_handler)
"""
task = self._export(task_name, filters, params, True)
task, _ = self._export(task_name, filters, params, streamable=True)
return ExportTask(task)

def _export(self,
task_name: Optional[str] = None,
filters: Union[CatalogExportFilters, Dict[str, List[str]],
None] = None,
params: Optional[CatalogExportParams] = None,
streamable: bool = False) -> Task:
streamable: bool = False) -> Tuple[Task, bool]:

_params = params or CatalogExportParams({
"attachments": False,
Expand Down Expand Up @@ -120,7 +123,7 @@ def _export(self,
create_task_query_str = (
f"mutation {mutation_name}PyApi"
f"($input: ExportDataRowsInCatalogInput!)"
f"{{{mutation_name}(input: $input){{taskId}}}}")
f"{{{mutation_name}(input: $input){{taskId isStreamable}}}}")

media_type_override = _params.get('media_type_override', None)
query_params: Dict[str, Any] = {
Expand All @@ -132,6 +135,7 @@ def _export(self,
"query": None,
}
},
"isStreamableReady": True,
"params": {
"mediaTypeOverride":
media_type_override.value
Expand Down Expand Up @@ -171,4 +175,5 @@ def _export(self,
error_log_key="errors")
res = res[mutation_name]
task_id = res["taskId"]
return Task.get_task(self.client, task_id)
is_streamable = res["isStreamable"]
return Task.get_task(self.client, task_id), is_streamable
31 changes: 18 additions & 13 deletions labelbox/schema/data_row.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from enum import Enum
from typing import TYPE_CHECKING, List, Optional, Union, Any
from typing import TYPE_CHECKING, List, Optional, Tuple, Union, Any
import json

from labelbox.orm import query
Expand Down Expand Up @@ -210,12 +210,12 @@ def export(
>>> task.wait_till_done()
>>> task.result
"""
task = DataRow._export(client,
data_rows,
global_keys,
task_name,
params,
streamable=True)
task, _ = DataRow._export(client,
data_rows,
global_keys,
task_name,
params,
streamable=True)
return ExportTask(task)

@staticmethod
Expand All @@ -225,7 +225,7 @@ def export_v2(
global_keys: Optional[List[str]] = None,
task_name: Optional[str] = None,
params: Optional[CatalogExportParams] = None,
) -> Task:
) -> Union[Task, ExportTask]:
"""
Creates a data rows export task with the given list, params and returns the task.
Args:
Expand All @@ -249,8 +249,11 @@ def export_v2(
>>> task.wait_till_done()
>>> task.result
"""
return DataRow._export(client, data_rows, global_keys, task_name,
params)
task, is_streamable = DataRow._export(client, data_rows, global_keys,
task_name, params)
if is_streamable:
return ExportTask(task, True)
return task

@staticmethod
def _export(
Expand All @@ -260,7 +263,7 @@ def _export(
task_name: Optional[str] = None,
params: Optional[CatalogExportParams] = None,
streamable: bool = False,
) -> Task:
) -> Tuple[Task, bool]:
_params = params or CatalogExportParams({
"attachments": False,
"metadata_fields": False,
Expand All @@ -282,7 +285,7 @@ def _export(
create_task_query_str = (
f"mutation {mutation_name}PyApi"
f"($input: ExportDataRowsInCatalogInput!)"
f"{{{mutation_name}(input: $input){{taskId}}}}")
f"{{{mutation_name}(input: $input){{taskId isStreamable}}}}")

data_row_ids = []
if data_rows is not None:
Expand Down Expand Up @@ -315,6 +318,7 @@ def _export(
"query": search_query
}
},
"isStreamableReady": True,
"params": {
"mediaTypeOverride":
media_type_override.value
Expand Down Expand Up @@ -352,4 +356,5 @@ def _export(
print(res)
res = res[mutation_name]
task_id = res["taskId"]
return Task.get_task(client, task_id)
is_streamable = res["isStreamable"]
return Task.get_task(client, task_id), is_streamable
21 changes: 13 additions & 8 deletions labelbox/schema/dataset.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Dict, Generator, List, Optional, Any, Final
from typing import Dict, Generator, List, Optional, Any, Final, Tuple, Union
import os
import json
import logging
Expand Down Expand Up @@ -650,15 +650,15 @@ def export(
>>> task.wait_till_done()
>>> task.result
"""
task = self._export(task_name, filters, params, streamable=True)
task, _ = self._export(task_name, filters, params, streamable=True)
return ExportTask(task)

def export_v2(
self,
task_name: Optional[str] = None,
filters: Optional[DatasetExportFilters] = None,
params: Optional[CatalogExportParams] = None,
) -> Task:
) -> Union[Task, ExportTask]:
"""
Creates a dataset export task with the given params and returns the task.

Expand All @@ -676,15 +676,18 @@ def export_v2(
>>> task.wait_till_done()
>>> task.result
"""
return self._export(task_name, filters, params)
task, is_streamable = self._export(task_name, filters, params)
if (is_streamable):
return ExportTask(task, True)
return task

def _export(
self,
task_name: Optional[str] = None,
filters: Optional[DatasetExportFilters] = None,
params: Optional[CatalogExportParams] = None,
streamable: bool = False,
) -> Task:
) -> Tuple[Task, bool]:
_params = params or CatalogExportParams({
"attachments": False,
"metadata_fields": False,
Expand Down Expand Up @@ -712,7 +715,7 @@ def _export(
create_task_query_str = (
f"mutation {mutation_name}PyApi"
f"($input: ExportDataRowsInCatalogInput!)"
f"{{{mutation_name}(input: $input){{taskId}}}}")
f"{{{mutation_name}(input: $input){{taskId isStreamable}}}}")
media_type_override = _params.get('media_type_override', None)

if task_name is None:
Expand All @@ -726,6 +729,7 @@ def _export(
"query": None,
}
},
"isStreamableReady": True,
"params": {
"mediaTypeOverride":
media_type_override.value
Expand Down Expand Up @@ -771,7 +775,8 @@ def _export(
error_log_key="errors")
res = res[mutation_name]
task_id = res["taskId"]
return Task.get_task(self.client, task_id)
is_streamable = res["isStreamable"]
return Task.get_task(self.client, task_id), is_streamable

def upsert_data_rows(self, items, file_upload_thread_count=20) -> "Task":
"""
Expand Down Expand Up @@ -868,4 +873,4 @@ def _convert_items_to_upsert_format(self, _items):
k: v for k, v in item.items() if v is not None
} # remove None values
_upsert_items.append(DataRowUpsertItem(payload=item, id=key))
return _upsert_items
return _upsert_items
73 changes: 72 additions & 1 deletion labelbox/schema/export_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,8 @@ class ExportTask:
class ExportTaskException(Exception):
"""Raised when the task is not ready yet."""

def __init__(self, task: Task) -> None:
def __init__(self, task: Task, is_export_v2: bool = False) -> None:
self._is_export_v2 = is_export_v2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we need to explicitly pass it in because of the api feature flag that might change streamable to export_v2?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This value comes down from API though. The flow is as follows:

  1. New SDK version: Hey API I'm able to handle streamable backend
  2. API: Ok let me check feature flag if I can serve you streamable export
  3. SDK receives a streamable response: it uses ExportTask internally (with is_export_v2=True) and tweaks its public API so it behaves just like the Task class (result_url/errors_url)

self._task = task

def __repr__(self):
Expand Down Expand Up @@ -530,9 +531,79 @@ def metadata(self):
"""Returns the metadata of the task."""
return self._task.metadata

@property
def result_url(self):
"""Returns the result URL of the task."""
if not self._is_export_v2:
raise ExportTask.ExportTaskException(
"This property is only available for export_v2 tasks due to compatibility reasons, please use streamable errors instead"
)
base_url = self._task.client.rest_endpoint
return base_url + '/export-results/' + self._task.uid + '/' + self._task.client.get_organization(
).uid

@property
def errors_url(self):
"""Returns the errors URL of the task."""
if not self._is_export_v2:
raise ExportTask.ExportTaskException(
"This property is only available for export_v2 tasks due to compatibility reasons, please use streamable errors instead"
)
base_url = self._task.client.rest_endpoint
return base_url + '/export-errors/' + self._task.uid + '/' + self._task.client.get_organization(
).uid

@property
def errors(self):
"""Returns the errors of the task."""
if not self._is_export_v2:
raise ExportTask.ExportTaskException(
"This property is only available for export_v2 tasks due to compatibility reasons, please use streamable errors instead"
)
if self.status == "FAILED":
raise ExportTask.ExportTaskException("Task failed")
if self.status != "COMPLETE":
raise ExportTask.ExportTaskException("Task is not ready yet")

if not self.has_errors():
return None

data = []

metadata_header = ExportTask._get_metadata_header(
self._task.client, self._task.uid, StreamType.ERRORS)
if metadata_header is None:
return None
Stream(
_TaskContext(self._task.client, self._task.uid, StreamType.ERRORS,
metadata_header),
_MultiGCSFileReader(),
JsonConverter(),
).start(stream_handler=lambda output: data.append(output.json_str))
return data

@property
def result(self):
"""Returns the result of the task."""
if self._is_export_v2:
if self.status == "FAILED":
raise ExportTask.ExportTaskException("Task failed")
if self.status != "COMPLETE":
raise ExportTask.ExportTaskException("Task is not ready yet")
data = []

metadata_header = ExportTask._get_metadata_header(
self._task.client, self._task.uid, StreamType.RESULT)
if metadata_header is None:
return []
Stream(
_TaskContext(self._task.client, self._task.uid,
StreamType.RESULT, metadata_header),
_MultiGCSFileReader(),
JsonConverter(),
).start(stream_handler=lambda output: data.append(
json.loads(output.json_str)))
return data
return self._task.result_url

@property
Expand Down
19 changes: 12 additions & 7 deletions labelbox/schema/model_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import warnings
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Dict, Iterable, Union, List, Optional, Any
from typing import TYPE_CHECKING, Dict, Iterable, Union, Tuple, List, Optional, Any

import requests

Expand Down Expand Up @@ -521,33 +521,36 @@ def export(self,
>>> export_task = export("my_export_task", params={"media_attributes": True})

"""
task = self._export(task_name, params, streamable=True)
task, _ = self._export(task_name, params, streamable=True)
return ExportTask(task)

def export_v2(
self,
task_name: Optional[str] = None,
params: Optional[ModelRunExportParams] = None,
) -> Task:
) -> Union[Task, ExportTask]:
"""
Creates a model run export task with the given params and returns the task.

>>> export_task = export_v2("my_export_task", params={"media_attributes": True})

"""
return self._export(task_name, params)
task, is_streamable = self._export(task_name, params)
if (is_streamable):
return ExportTask(task, True)
return task

def _export(
self,
task_name: Optional[str] = None,
params: Optional[ModelRunExportParams] = None,
streamable: bool = False,
) -> Task:
) -> Tuple[Task, bool]:
mutation_name = "exportDataRowsInModelRun"
create_task_query_str = (
f"mutation {mutation_name}PyApi"
f"($input: ExportDataRowsInModelRunInput!)"
f"{{{mutation_name}(input: $input){{taskId}}}}")
f"{{{mutation_name}(input: $input){{taskId isStreamable}}}}")

_params = params or ModelRunExportParams()

Expand All @@ -557,6 +560,7 @@ def _export(
"filters": {
"modelRunId": self.uid
},
"isStreamableReady": True,
"params": {
"mediaTypeOverride":
_params.get('media_type_override', None),
Expand All @@ -579,7 +583,8 @@ def _export(
error_log_key="errors")
res = res[mutation_name]
task_id = res["taskId"]
return Task.get_task(self.client, task_id)
is_streamable = res["isStreamable"]
return Task.get_task(self.client, task_id), is_streamable

def send_to_annotate_from_model(
self, destination_project_id: str, task_queue_id: Optional[str],
Expand Down
Loading