From 48a559d30d094195ed7e3f8106d1c62142db0411 Mon Sep 17 00:00:00 2001 From: Klaus Opreschko Date: Wed, 8 Nov 2023 22:08:51 -0700 Subject: [PATCH 1/3] Add new classes/interfaces to support streaming export task results --- labelbox/__init__.py | 1 + labelbox/schema/data_row.py | 48 +-- labelbox/schema/dataset.py | 42 +-- labelbox/schema/export_task.py | 605 +++++++++++++++++++++++++++++++++ labelbox/schema/model_run.py | 47 +-- labelbox/schema/project.py | 42 ++- labelbox/schema/slice.py | 36 +- labelbox/schema/task.py | 13 +- 8 files changed, 729 insertions(+), 105 deletions(-) create mode 100644 labelbox/schema/export_task.py diff --git a/labelbox/__init__.py b/labelbox/__init__.py index 31c6631b5..0a4bc77e3 100644 --- a/labelbox/__init__.py +++ b/labelbox/__init__.py @@ -16,6 +16,7 @@ from labelbox.schema.user import User from labelbox.schema.organization import Organization from labelbox.schema.task import Task +from labelbox.schema.export_task import StreamType, ExportTask, JsonConverter, FileConverter from labelbox.schema.labeling_frontend import LabelingFrontend, LabelingFrontendOptions from labelbox.schema.asset_attachment import AssetAttachment from labelbox.schema.webhook import Webhook diff --git a/labelbox/schema/data_row.py b/labelbox/schema/data_row.py index 9950aefa5..9956697aa 100644 --- a/labelbox/schema/data_row.py +++ b/labelbox/schema/data_row.py @@ -9,6 +9,7 @@ from labelbox.schema.data_row_metadata import DataRowMetadataField # type: ignore from labelbox.schema.export_filters import DatarowExportFilters, build_filters, validate_at_least_one_of_data_row_ids_or_global_keys from labelbox.schema.export_params import CatalogExportParams, validate_catalog_export_params +from labelbox.schema.export_task import ExportTask from labelbox.schema.task import Task from labelbox.schema.user import User # type: ignore @@ -157,11 +158,26 @@ def create_attachment(self, res["createDataRowAttachment"]) @staticmethod - def export_v2(client: 'Client', - data_rows: List[Union[str, 'DataRow']] = None, - global_keys: List[str] = None, - task_name: Optional[str] = None, - params: Optional[CatalogExportParams] = None) -> Task: + def export( + client: "Client", + data_rows: List[Union[str, "DataRow"]] = None, + global_keys: List[str] = None, + task_name: Optional[str] = None, + params: Optional[CatalogExportParams] = None, + ) -> ExportTask: + task = DataRow.export_v2(client, data_rows, global_keys, task_name, + params, True) + return ExportTask(task) + + @staticmethod + def export_v2( + client: "Client", + data_rows: List[Union[str, "DataRow"]] = None, + global_keys: List[str] = None, + task_name: Optional[str] = None, + params: Optional[CatalogExportParams] = None, + streamable: bool = False, + ) -> Task: """ Creates a data rows export task with the given list, params and returns the task. 
Args: @@ -202,9 +218,10 @@ def export_v2(client: 'Client', validate_catalog_export_params(_params) mutation_name = "exportDataRowsInCatalog" - create_task_query_str = """mutation exportDataRowsInCatalogPyApi($input: ExportDataRowsInCatalogInput!){ - %s(input: $input) {taskId} } - """ % (mutation_name) + create_task_query_str = ( + f"mutation {mutation_name}PyApi" + f"($input: ExportDataRowsInCatalogInput!)" + f"{{{mutation_name}(input: $input){{taskId}}}}") data_row_ids = [] if data_rows is not None: @@ -227,7 +244,7 @@ def export_v2(client: 'Client', media_type_override = _params.get('media_type_override', None) if task_name is None: - task_name = f"Export v2: data rows (%s)" % len(data_row_ids) + task_name = f"Export v2: data rows {len(data_row_ids)}" query_params = { "input": { "taskName": task_name, @@ -260,6 +277,7 @@ def export_v2(client: 'Client', "modelRunIds": _params.get('model_run_ids', None), }, + "streamable": streamable } } @@ -269,14 +287,4 @@ def export_v2(client: 'Client', print(res) res = res[mutation_name] task_id = res["taskId"] - user: User = client.get_user() - tasks: List[Task] = list( - user.created_tasks(where=Entity.Task.uid == task_id)) - # Cache user in a private variable as the relationship can't be - # resolved due to server-side limitations (see Task.created_by) - # for more info. - if len(tasks) != 1: - raise ResourceNotFoundError(Entity.Task, task_id) - task: Task = tasks[0] - task._user = user - return task + return Task.get_task(client, task_id) diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index 1afc482b3..fd9cc105d 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -25,6 +25,7 @@ from labelbox.schema.data_row import DataRow from labelbox.schema.export_filters import DatasetExportFilters, build_filters from labelbox.schema.export_params import CatalogExportParams, validate_catalog_export_params +from labelbox.schema.export_task import ExportTask from labelbox.schema.task import Task from labelbox.schema.user import User @@ -601,10 +602,22 @@ def export_data_rows(self, self.uid) time.sleep(sleep_time) - def export_v2(self, - task_name: Optional[str] = None, - filters: Optional[DatasetExportFilters] = None, - params: Optional[CatalogExportParams] = None) -> Task: + def export( + self, + task_name: Optional[str] = None, + filters: Optional[DatasetExportFilters] = None, + params: Optional[CatalogExportParams] = None, + ) -> ExportTask: + task = self.export_v2(task_name, filters, params, True) + return ExportTask(task) + + def export_v2( + self, + task_name: Optional[str] = None, + filters: Optional[DatasetExportFilters] = None, + params: Optional[CatalogExportParams] = None, + streamable: bool = False, + ) -> Task: """ Creates a dataset export task with the given params and returns the task. 
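[Editor's note] The pattern above — `export()` wrapping `export_v2()` with `streamable=True` and returning an `ExportTask` — repeats across `DataRow`, `Dataset`, `Project`, `ModelRun`, and `Slice`. A minimal sketch of how a caller might consume such an export, based only on the classes this patch introduces; `DATASET_ID` and `LB_API_KEY` are placeholders:

```python
# Sketch only: assumes the streaming export API added by this patch.
import labelbox as lb

client = lb.Client(api_key="LB_API_KEY")
dataset = client.get_dataset("DATASET_ID")

# export() kicks off a streamable export task and wraps it in an ExportTask.
export_task = dataset.export(params={"label_details": True})
export_task.wait_till_done()

if export_task.has_result():
    # The default JsonConverter yields one output per exported JSON line,
    # together with its cumulative offset and line number across files.
    for output in export_task.get_stream():
        print(output.current_line, output.json_str)
```

The remaining hunks apply the same mechanics to the other exportable entities.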
@@ -645,10 +658,10 @@ def export_v2(self, }) mutation_name = "exportDataRowsInCatalog" - create_task_query_str = """mutation exportDataRowsInCatalogPyApi($input: ExportDataRowsInCatalogInput!){ - %s(input: $input) {taskId} } - """ % (mutation_name) - + create_task_query_str = ( + f"mutation {mutation_name}PyApi" + f"($input: ExportDataRowsInCatalogInput!)" + f"{{{mutation_name}(input: $input){{taskId}}}}") media_type_override = _params.get('media_type_override', None) if task_name is None: @@ -685,6 +698,7 @@ def export_v2(self, "modelRunIds": _params.get('model_run_ids', None), }, + "streamable": streamable, } } @@ -702,14 +716,4 @@ def export_v2(self, error_log_key="errors") res = res[mutation_name] task_id = res["taskId"] - user: User = self.client.get_user() - tasks: List[Task] = list( - user.created_tasks(where=Entity.Task.uid == task_id)) - # Cache user in a private variable as the relationship can't be - # resolved due to server-side limitations (see Task.created_by) - # for more info. - if len(tasks) != 1: - raise ResourceNotFoundError(Entity.Task, task_id) - task: Task = tasks[0] - task._user = user - return task + return Task.get_task(self.client, task_id) diff --git a/labelbox/schema/export_task.py b/labelbox/schema/export_task.py new file mode 100644 index 000000000..eecb4cb1e --- /dev/null +++ b/labelbox/schema/export_task.py @@ -0,0 +1,605 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass +from enum import Enum +from functools import lru_cache +from pathlib import Path +from typing import ( + Callable, + Generic, + Iterator, + List, + Optional, + Tuple, + TypeVar, + Union, + TYPE_CHECKING, + overload, +) + +import requests +from pydantic import BaseModel + +from labelbox.schema.task import Task +from labelbox.utils import _CamelCaseMixin + +if TYPE_CHECKING: + from labelbox import Client + +OutputT = TypeVar("OutputT") + + +class StreamType(Enum): + """The type of the stream.""" + + RESULT = "RESULT" + ERRORS = "ERRORS" + + +class Range(_CamelCaseMixin, BaseModel): # pylint: disable=too-few-public-methods + """Represents a range.""" + + start: int + end: int + + +class _MetadataHeader(_CamelCaseMixin, BaseModel): # pylint: disable=too-few-public-methods + total_size: int + total_lines: int + + +class _MetadataFileInfo(_CamelCaseMixin, BaseModel): # pylint: disable=too-few-public-methods + offsets: Range + lines: Range + file: str + + +@dataclass +class _TaskContext: + client: "Client" + task_id: str + stream_type: StreamType + metadata_header: _MetadataHeader + + +class Converter(ABC, Generic[OutputT]): + """Abstract class for transforming data.""" + + @dataclass + class ConverterInputArgs: + """Input for the converter.""" + + ctx: _TaskContext + file_info: _MetadataFileInfo + raw_data: str + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + return False + + @abstractmethod + def convert(self, input_args: ConverterInputArgs) -> Iterator[OutputT]: + """Converts the data. + Returns an iterator that yields the converted data. + + Args: + current_offset: The global offset indicating the position of the data within the + exported files. It represents a cumulative offset across multiple + files. + raw_data: The raw data to convert. + + Yields: + Iterator[OutputT]: The converted data. 
+ """ + + +class JsonConverter(Converter["JsonConverter.Output"]): # pylint: disable=too-few-public-methods + """Converts JSON data.""" + + @dataclass + class Output: + """Output with the JSON string.""" + + current_offset: int + current_line: int + json_str: str + + def _find_json_object_offsets(self, data: str) -> List[Tuple[int, int]]: + object_offsets: List[Tuple[int, int]] = [] + stack = [] + current_object_start = None + + for index, char in enumerate(data): + if char == "{": + stack.append(char) + if len(stack) == 1: + current_object_start = index + # we need to account for scenarios where data lands in the middle of an object + # and the object is not the last one in the data + if index > 0 and data[index - + 1] == "\n" and not object_offsets: + object_offsets.append((0, index - 1)) + elif char == "}" and stack: + stack.pop() + if len(stack) == 0 and data[ + index + 1] == "\n" and current_object_start is not None: + object_offsets.append((current_object_start, index + 1)) + current_object_start = None + + # we also need to account for scenarios where data lands in the middle of the last object + return object_offsets if object_offsets else [(0, len(data) - 1)] + + def convert( + self, input_args: Converter.ConverterInputArgs + ) -> Iterator["JsonConverter.Output"]: + current_offset, current_line, raw_data = ( + input_args.file_info.offsets.start, + input_args.file_info.lines.start, + input_args.raw_data, + ) + offsets = self._find_json_object_offsets(raw_data) + for line, (offset_start, offset_end) in enumerate(offsets): + yield JsonConverter.Output( + current_offset + offset_start, + current_line + line, + json_str=raw_data[offset_start:offset_end + 1].strip(), + ) + + +class FileConverter(Converter["FileConverter.Output"]): + """Converts data to a file.""" + + @dataclass + class Output: + """Output with statistics about the written file.""" + + file_path: Path + total_size: int + total_lines: int + current_offset: int + current_line: int + bytes_written: int + + def __init__(self, file_path: str) -> None: + super().__init__() + self._file = None + self._file_path = file_path + + def __enter__(self): + self._file = open(self._file_path, "w", encoding="utf-8") + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._file.close() + return False + + def convert( + self, input_args: Converter.ConverterInputArgs + ) -> Iterator["FileConverter.Output"]: + # appends data to the file + assert self._file is not None + self._file.write(input_args.raw_data) + yield FileConverter.Output( + file_path=Path(self._file_path), + total_size=input_args.ctx.metadata_header.total_size, + total_lines=input_args.ctx.metadata_header.total_lines, + current_offset=input_args.file_info.offsets.start, + current_line=input_args.file_info.lines.start, + bytes_written=len(input_args.raw_data), + ) + + +class FileRetrieverStrategy(ABC): # pylint: disable=too-few-public-methods + """Abstract class for retrieving files.""" + + def __init__(self, ctx: _TaskContext) -> None: + super().__init__() + self._ctx = ctx + + @abstractmethod + def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]: + """Retrieves the file.""" + + def _get_file_content( + self, query: str, variables: dict, + result_field_name: str) -> Tuple[_MetadataFileInfo, str]: + """Runs the query.""" + res = self._ctx.client.execute(query, variables, error_log_key="errors") + res = res["task"][result_field_name] + file_info = _MetadataFileInfo(**res) if res else None + if not file_info: + raise ValueError( + f"Task 
{self._ctx.task_id} does not have a metadata file for the " + f"{self._ctx.stream_type.value} stream") + response = requests.get(file_info.file, timeout=30) + response.raise_for_status() + assert len( + response.text + ) == file_info.offsets.end - file_info.offsets.start + 1, ( + f"expected {file_info.offsets.end - file_info.offsets.start + 1} bytes, " + f"got {len(response.text)} bytes") + return file_info, response.text + + +class FileRetrieverByOffset(FileRetrieverStrategy): # pylint: disable=too-few-public-methods + """Retrieves files by offset.""" + + def __init__( + self, + ctx: _TaskContext, + offset: int, + ) -> None: + super().__init__(ctx) + self._current_offset = offset + self._current_line: Optional[int] = None + if self._current_offset >= self._ctx.metadata_header.total_size: + raise ValueError( + f"offset is out of range, max offset is {self._ctx.metadata_header.total_size - 1}" + ) + + def _find_line_at_offset(self, file_content: str, + target_offset: int) -> int: + stack = [] + line_number = 0 + + for index, char in enumerate(file_content): + if char == "{": + stack.append(char) + if len(stack) == 1 and index > 0: + line_number += 1 + elif char == "}" and stack: + stack.pop() + + if index == target_offset: + break + + return line_number + + def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]: + if self._current_offset >= self._ctx.metadata_header.total_size: + return None + query = ( + f"query GetExportFileFromOffsetPyApi" + f"($where: WhereUniqueIdInput, $streamType: TaskStreamType!, $offset: UInt64!)" + f"{{task(where: $where)" + f"{{{'exportFileFromOffset'}(streamType: $streamType, offset: $offset)" + f"{{offsets {{start end}} lines {{start end}} file}}" + f"}}}}") + variables = { + "where": { + "id": self._ctx.task_id + }, + "streamType": self._ctx.stream_type.value, + "offset": str(self._current_offset), + } + file_info, file_content = self._get_file_content( + query, variables, "exportFileFromOffset") + if self._current_line is None: + self._current_line = self._find_line_at_offset( + file_content, self._current_offset - file_info.offsets.start) + self._current_line += file_info.lines.start + file_content = file_content[self._current_offset - + file_info.offsets.start:] + file_info.offsets.start = self._current_offset + file_info.lines.start = self._current_line + self._current_offset = file_info.offsets.end + 1 + self._current_line = file_info.lines.end + 1 + return file_info, file_content + + +class FileRetrieverByLine(FileRetrieverStrategy): # pylint: disable=too-few-public-methods + """Retrieves files by line.""" + + def __init__( + self, + ctx: _TaskContext, + line: int, + ) -> None: + super().__init__(ctx) + self._current_line = line + self._current_offset: Optional[int] = None + if self._current_line >= self._ctx.metadata_header.total_lines: + raise ValueError( + f"line is out of range, max line is {self._ctx.metadata_header.total_lines - 1}" + ) + + def _find_offset_of_line(self, file_content: str, target_line: int): + start_offset = None + stack = [] + line_number = 0 + + for index, char in enumerate(file_content): + if char == "{": + stack.append(char) + if len(stack) == 1: + if line_number == target_line: + start_offset = index + line_number += 1 + elif char == "}" and stack: + stack.pop() + + if line_number > target_line: + break + + return start_offset + + def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]: + if self._current_line >= self._ctx.metadata_header.total_lines: + return None + query = ( + f"query 
GetExportFileFromLinePyApi" + f"($where: WhereUniqueIdInput, $streamType: TaskStreamType!, $line: UInt64!)" + f"{{task(where: $where)" + f"{{{'exportFileFromLine'}(streamType: $streamType, line: $line)" + f"{{offsets {{start end}} lines {{start end}} file}}" + f"}}}}") + variables = { + "where": { + "id": self._ctx.task_id + }, + "streamType": self._ctx.stream_type.value, + "line": self._current_line, + } + file_info, file_content = self._get_file_content( + query, variables, "exportFileFromLine") + if self._current_offset is None: + self._current_offset = self._find_offset_of_line( + file_content, self._current_line - file_info.lines.start) + self._current_offset += file_info.offsets.start + file_content = file_content[self._current_offset - + file_info.offsets.start:] + file_info.offsets.start = self._current_offset + file_info.lines.start = self._current_line + self._current_offset = file_info.offsets.end + 1 + self._current_line = file_info.lines.end + 1 + return file_info, file_content + + +class _Reader(ABC): # pylint: disable=too-few-public-methods + """Abstract class for reading data from a source.""" + + @abstractmethod + def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None: + """Sets the retrieval strategy.""" + + @abstractmethod + def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: + """Reads data from the source.""" + + +class _MultiGCSFileReader(_Reader): # pylint: disable=too-few-public-methods + """Reads data from multiple GCS files in a seamless way.""" + + def __init__(self): + super().__init__() + self._retrieval_strategy = None + + def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None: + """Sets the retrieval strategy.""" + self._retrieval_strategy = strategy + + def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: + result = self._retrieval_strategy.get_next_chunk() + while result: + file_info, raw_data = result + yield file_info, raw_data + result = self._retrieval_strategy.get_next_chunk() + + +class Stream(Generic[OutputT]): + """Streams data from a Reader.""" + + def __init__( + self, + ctx: _TaskContext, + reader: _Reader, + converter: Converter, + ): + self._ctx = ctx + self._reader = reader + self._converter = converter + # default strategy is to retrieve files by offset, starting from 0 + self.with_offset(0) + + def __iter__(self): + yield from self._fetch() + + def _fetch(self,) -> Iterator[OutputT]: + """Fetches the result data. + Returns an iterator that yields the offset and the data. + """ + if self._ctx.metadata_header.total_size is None: + return + + stream = self._reader.read() + with self._converter as converter: + for file_info, raw_data in stream: + for output in converter.convert( + Converter.ConverterInputArgs(self._ctx, file_info, + raw_data)): + yield output + + def with_offset(self, offset: int) -> "Stream[OutputT]": + """Sets the offset for the stream.""" + self._reader.set_retrieval_strategy( + FileRetrieverByOffset(self._ctx, offset)) + return self + + def with_line(self, line: int) -> "Stream[OutputT]": + """Sets the line number for the stream.""" + self._reader.set_retrieval_strategy(FileRetrieverByLine( + self._ctx, line)) + return self + + def start( + self, + stream_handler: Optional[Callable[[OutputT], None]] = None) -> None: + """Starts streaming the result data. + Calls the stream_handler for each result. 
+ """ + # this calls the __iter__ method, which in turn calls the _fetch method + for output in self: + if stream_handler: + stream_handler(output) + + +class ExportTask: + """ + An adapter class for working with task objects, providing extended functionality + and convenient access to task-related information. + + This class wraps a `Task` object, allowing you to interact with tasks of this type. + It offers methods to retrieve task results, errors, and metadata, as well as properties + for accessing task details such as UID, status, and creation time. + """ + + class TaskNotReadyException(Exception): + """Raised when the task is not ready yet.""" + + def __init__(self, task: Task) -> None: + self._task = task + + def __repr__(self): + return self._task.__repr__() + + def __str__(self): + return self._task.__str__() + + def __eq__(self, other): + return self._task.__eq__(other) + + def __hash__(self): + return self._task.__hash__() + + @property + def uid(self): + """Returns the uid of the task.""" + return self._task.uid + + @property + def deleted(self): + """Returns whether the task is deleted.""" + return self._task.deleted + + @property + def updated_at(self): + """Returns the last time the task was updated.""" + return self._task.updated_at + + @property + def created_at(self): + """Returns the time the task was created.""" + return self._task.created_at + + @property + def name(self): + """Returns the name of the task.""" + return self._task.name + + @property + def status(self): + """Returns the status of the task.""" + return self._task.status + + @property + def completion_percentage(self): + """Returns the completion percentage of the task.""" + return self._task.completion_percentage + + @property + def type(self): + """Returns the type of the task.""" + return self._task.type + + def wait_till_done(self, timeout_seconds: int = 300) -> None: + """Waits until the task is done.""" + return self._task.wait_till_done(timeout_seconds) + + @staticmethod + @lru_cache(maxsize=5) + def _get_metadata_header( + client, task_id: str, + stream_type: StreamType) -> Union[_MetadataHeader, None]: + """Returns the total file size for a specific task.""" + query = (f"query GetExportMetadataHeaderPyApi" + f"($where: WhereUniqueIdInput, $streamType: TaskStreamType!)" + f"{{task(where: $where)" + f"{{{'exportMetadataHeader'}(streamType: $streamType)" + f"{{totalSize totalLines}}" + f"}}}}") + variables = {"where": {"id": task_id}, "streamType": stream_type.value} + res = client.execute(query, variables, error_log_key="errors") + res = res["task"]["exportMetadataHeader"] + return _MetadataHeader(**res) if res else None + + @staticmethod + def get_total_file_size(client, task_id: str, + stream_type: StreamType) -> Union[int, None]: + """Returns the total file size for a specific task.""" + header = ExportTask._get_metadata_header(client, task_id, stream_type) + return header.total_size if header else None + + @staticmethod + def get_total_lines(client, task_id: str, + stream_type: StreamType) -> Union[int, None]: + """Returns the total file size for a specific task.""" + header = ExportTask._get_metadata_header(client, task_id, stream_type) + return header.total_lines if header else None + + def has_result(self) -> bool: + """Returns whether the task has a result.""" + total_size = ExportTask.get_total_file_size(self._task.client, + self._task.uid, + StreamType.RESULT) + return total_size is not None and total_size > 0 + + def has_errors(self) -> bool: + """Returns whether the task has errors.""" + 
total_size = ExportTask.get_total_file_size(self._task.client, + self._task.uid, + StreamType.ERRORS) + return total_size is not None and total_size > 0 + + @overload + def get_stream( + self, + converter: JsonConverter = JsonConverter(), + stream_type: StreamType = StreamType.RESULT, + ) -> Stream[JsonConverter.Output]: + """Overload for getting the right typing hints when using a JsonConverter.""" + + @overload + def get_stream( + self, + converter: FileConverter, + stream_type: StreamType = StreamType.RESULT, + ) -> Stream[FileConverter.Output]: + """Overload for getting the right typing hints when using a FileConverter.""" + + def get_stream( + self, + converter: Converter = JsonConverter(), + stream_type: StreamType = StreamType.RESULT, + ) -> Stream: + """Returns the result of the task.""" + if not self._task.status in ["COMPLETE", "FAILED"]: + raise ExportTask.TaskNotReadyException("Task is not ready yet") + + metadata_header = self._get_metadata_header(self._task.client, + self._task.uid, stream_type) + if metadata_header is None: + raise ValueError( + f"Task {self._task.uid} does not have a {stream_type.value} stream" + ) + return Stream( + _TaskContext(self._task.client, self._task.uid, stream_type, + metadata_header), + _MultiGCSFileReader(), + converter, + ) + + @staticmethod + def get_task(client, task_id): + """Returns the task with the given id.""" + return ExportTask(Task.get_task(client, task_id)) diff --git a/labelbox/schema/model_run.py b/labelbox/schema/model_run.py index 3988daa59..8ad29d884 100644 --- a/labelbox/schema/model_run.py +++ b/labelbox/schema/model_run.py @@ -14,6 +14,7 @@ from labelbox.orm.model import Field, Relationship, Entity from labelbox.orm.db_object import DbObject, experimental from labelbox.schema.export_params import ModelRunExportParams +from labelbox.schema.export_task import ExportTask from labelbox.schema.task import Task from labelbox.schema.user import User @@ -503,20 +504,29 @@ def export_labels( self.uid) time.sleep(sleep_time) - """ - Creates a model run export task with the given params and returns the task. - - >>> export_task = export_v2("my_export_task", params={"media_attributes": True}) - - """ + def export(self, + task_name: Optional[str] = None, + params: Optional[ModelRunExportParams] = None) -> ExportTask: + task = self.export_v2(task_name, params, True) + return ExportTask(task) - def export_v2(self, - task_name: Optional[str] = None, - params: Optional[ModelRunExportParams] = None) -> Task: + def export_v2( + self, + task_name: Optional[str] = None, + params: Optional[ModelRunExportParams] = None, + streamable: bool = False, + ) -> Task: + """ + Creates a model run export task with the given params and returns the task. 
+ + >>> export_task = export_v2("my_export_task", params={"media_attributes": True}) + + """ mutation_name = "exportDataRowsInModelRun" - create_task_query_str = """mutation exportDataRowsInModelRunPyApi($input: ExportDataRowsInModelRunInput!){ - %s(input: $input) {taskId} } - """ % (mutation_name) + create_task_query_str = ( + f"mutation {mutation_name}PyApi" + f"($input: ExportDataRowsInModelRunInput!)" + f"{{{mutation_name}(input: $input){{taskId}}}}") _params = params or ModelRunExportParams() @@ -538,6 +548,7 @@ def export_v2(self, "includePredictions": _params.get('predictions', False), }, + "streamable": streamable } } res = self.client.execute(create_task_query_str, @@ -545,17 +556,7 @@ def export_v2(self, error_log_key="errors") res = res[mutation_name] task_id = res["taskId"] - user: User = self.client.get_user() - tasks: List[Task] = list( - user.created_tasks(where=Entity.Task.uid == task_id)) - # Cache user in a private variable as the relationship can't be - # resolved due to server-side limitations (see Task.created_by) - # for more info. - if len(tasks) != 1: - raise ResourceNotFoundError(Entity.Task, task_id) - task: Task = tasks[0] - task._user = user - return task + return Task.get_task(self.client, task_id) class ModelRunDataRow(DbObject): diff --git a/labelbox/schema/project.py b/labelbox/schema/project.py index 0ccf8b37b..7cf16ec28 100644 --- a/labelbox/schema/project.py +++ b/labelbox/schema/project.py @@ -24,6 +24,7 @@ from labelbox.schema.data_row import DataRow from labelbox.schema.export_filters import ProjectExportFilters, validate_datetime, build_filters from labelbox.schema.export_params import ProjectExportParams +from labelbox.schema.export_task import ExportTask from labelbox.schema.media_type import MediaType from labelbox.schema.queue_mode import QueueMode from labelbox.schema.resource_tag import ResourceTag @@ -415,10 +416,23 @@ def _string_from_dict(dictionary: dict, value_with_quotes=False) -> str: self.uid) time.sleep(sleep_time) - def export_v2(self, - task_name: Optional[str] = None, - filters: Optional[ProjectExportFilters] = None, - params: Optional[ProjectExportParams] = None) -> Task: + def export( + self, + task_name: Optional[str] = None, + filters: Optional[ProjectExportFilters] = None, + params: Optional[ProjectExportParams] = None, + ) -> ExportTask: + """Creates a project export task with the given params and returns the task.""" + task = self.export_v2(task_name, filters, params, True) + return ExportTask(task) + + def export_v2( + self, + task_name: Optional[str] = None, + filters: Optional[ProjectExportFilters] = None, + params: Optional[ProjectExportParams] = None, + streamable: bool = False, + ) -> Task: """ Creates a project export task with the given params and returns the task. 
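[Editor's note] Beyond iterating in memory, the stream can be routed through the `FileConverter` added in `export_task.py` to persist the raw NDJSON, and the `ERRORS` stream can be inspected the same way. A hedged sketch of that usage; `PROJECT_ID`, `LB_API_KEY`, and the output path are placeholders:

```python
# Sketch: persisting a RESULT stream with FileConverter, then checking ERRORS.
import labelbox as lb
from labelbox import FileConverter, StreamType

client = lb.Client(api_key="LB_API_KEY")
project = client.get_project("PROJECT_ID")

export_task = project.export(params={"label_details": True})
export_task.wait_till_done()

# FileConverter opens its file on __enter__ and appends every chunk the
# reader fetches, yielding running statistics useful for progress reporting.
for output in export_task.get_stream(converter=FileConverter("export.ndjson")):
    print(f"wrote {output.bytes_written} bytes at offset {output.current_offset}")

if export_task.has_errors():
    export_task.get_stream(stream_type=StreamType.ERRORS).start(
        stream_handler=lambda out: print(out.json_str))
```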
@@ -460,9 +474,10 @@ def export_v2(self, }) mutation_name = "exportDataRowsInProject" - create_task_query_str = """mutation exportDataRowsInProjectPyApi($input: ExportDataRowsInProjectInput!){ - %s(input: $input) {taskId} } - """ % (mutation_name) + create_task_query_str = ( + f"mutation {mutation_name}PyApi" + f"($input: ExportDataRowsInProjectInput!)" + f"{{{mutation_name}(input: $input){{taskId}}}}") media_type_override = _params.get('media_type_override', None) query_params: Dict[str, Any] = { @@ -494,6 +509,7 @@ def export_v2(self, "includeInterpolatedFrames": _params.get('interpolated_frames', False), }, + "streamable": streamable, } } @@ -505,17 +521,7 @@ def export_v2(self, error_log_key="errors") res = res[mutation_name] task_id = res["taskId"] - user: User = self.client.get_user() - tasks: List[Task] = list( - user.created_tasks(where=Entity.Task.uid == task_id)) - # Cache user in a private variable as the relationship can't be - # resolved due to server-side limitations (see Task.created_by) - # for more info. - if len(tasks) != 1: - raise ResourceNotFoundError(Entity.Task, task_id) - task: Task = tasks[0] - task._user = user - return task + return Task.get_task(self.client, task_id) def export_issues(self, status=None) -> str: """ Calls the server-side Issues exporting that diff --git a/labelbox/schema/slice.py b/labelbox/schema/slice.py index 3364a878c..6eef46aff 100644 --- a/labelbox/schema/slice.py +++ b/labelbox/schema/slice.py @@ -4,6 +4,7 @@ from labelbox.orm.model import Entity, Field from labelbox.pagination import PaginatedCollection from labelbox.schema.export_params import CatalogExportParams, validate_catalog_export_params +from labelbox.schema.export_task import ExportTask from labelbox.schema.task import Task from labelbox.schema.user import User @@ -64,9 +65,18 @@ def get_data_row_ids(self) -> PaginatedCollection: obj_class=lambda _, data_row_id: data_row_id, cursor_path=['getDataRowIdsBySavedQuery', 'pageInfo', 'endCursor']) - def export_v2(self, - task_name: Optional[str] = None, - params: Optional[CatalogExportParams] = None) -> Task: + def export(self, + task_name: Optional[str] = None, + params: Optional[CatalogExportParams] = None) -> ExportTask: + task = self.export_v2(task_name, params, True) + return ExportTask(task) + + def export_v2( + self, + task_name: Optional[str] = None, + params: Optional[CatalogExportParams] = None, + streamable: bool = False, + ) -> Task: """ Creates a slice export task with the given params and returns the task. 
>>> slice = client.get_catalog_slice("SLICE_ID") @@ -92,9 +102,10 @@ def export_v2(self, validate_catalog_export_params(_params) mutation_name = "exportDataRowsInSlice" - create_task_query_str = """mutation exportDataRowsInSlicePyApi($input: ExportDataRowsInSliceInput!){ - %s(input: $input) {taskId} } - """ % (mutation_name) + create_task_query_str = ( + f"mutation {mutation_name}PyApi" + f"($input: ExportDataRowsInSliceInput!)" + f"{{{mutation_name}(input: $input){{taskId}}}}") media_type_override = _params.get('media_type_override', None) query_params = { @@ -126,6 +137,7 @@ def export_v2(self, "modelRunIds": _params.get('model_run_ids', None), }, + "streamable": streamable, } } @@ -134,17 +146,7 @@ def export_v2(self, error_log_key="errors") res = res[mutation_name] task_id = res["taskId"] - user: User = self.client.get_user() - tasks: List[Task] = list( - user.created_tasks(where=Entity.Task.uid == task_id)) - # Cache user in a private variable as the relationship can't be - # resolved due to server-side limitations (see Task.created_by) - # for more info. - if len(tasks) != 1: - raise ResourceNotFoundError(Entity.Task, task_id) - task: Task = tasks[0] - task._user = user - return task + return Task.get_task(self.client, task_id) class ModelSlice(Slice): diff --git a/labelbox/schema/task.py b/labelbox/schema/task.py index 410633bb7..db7c4dd00 100644 --- a/labelbox/schema/task.py +++ b/labelbox/schema/task.py @@ -62,10 +62,10 @@ def wait_till_done(self, timeout_seconds: int = 300) -> None: to update the task attributes. Args: - timeout_seconds (float): Maximum time this method can block, in seconds. Defaults to one minute. + timeout_seconds (float): Maximum time this method can block, in seconds. Defaults to five minutes. """ check_frequency = 2 # frequency of checking, in seconds - while True: + while timeout_seconds: if self.status != "IN_PROGRESS": # self.errors fetches the error content. # This first condition prevents us from downloading the content for v2 exports @@ -74,13 +74,10 @@ def wait_till_done(self, timeout_seconds: int = 300) -> None: "There are errors present. 
Please look at `task.errors` for more details" ) return - sleep_time_seconds = min(check_frequency, timeout_seconds) - logger.debug("Task.wait_till_done sleeping for %.2f seconds" % - sleep_time_seconds) - if sleep_time_seconds <= 0: - break + logger.debug("Task.wait_till_done sleeping for %d seconds", + check_frequency) + time.sleep(check_frequency) timeout_seconds -= check_frequency - time.sleep(sleep_time_seconds) self.refresh() @property From a64970d12071b46a45b241ce67a1c4e028454bfd Mon Sep 17 00:00:00 2001 From: Klaus Opreschko Date: Tue, 14 Nov 2023 14:50:20 -0700 Subject: [PATCH 2/3] Add suggested changes --- labelbox/__init__.py | 2 +- labelbox/schema/data_row.py | 38 ++++++++++++--- labelbox/schema/dataset.py | 19 +++++++- labelbox/schema/export_task.py | 84 ++++++++++++++++++---------------- labelbox/schema/model_run.py | 8 +++- labelbox/schema/project.py | 20 +++++++- labelbox/schema/slice.py | 11 ++++- labelbox/schema/task.py | 2 +- 8 files changed, 132 insertions(+), 52 deletions(-) diff --git a/labelbox/__init__.py b/labelbox/__init__.py index 0a4bc77e3..2dffbfcb5 100644 --- a/labelbox/__init__.py +++ b/labelbox/__init__.py @@ -16,7 +16,7 @@ from labelbox.schema.user import User from labelbox.schema.organization import Organization from labelbox.schema.task import Task -from labelbox.schema.export_task import StreamType, ExportTask, JsonConverter, FileConverter +from labelbox.schema.export_task import StreamType, ExportTask, JsonConverter, JsonConverterOutput, FileConverter, FileConverterOutput from labelbox.schema.labeling_frontend import LabelingFrontend, LabelingFrontendOptions from labelbox.schema.asset_attachment import AssetAttachment from labelbox.schema.webhook import Webhook diff --git a/labelbox/schema/data_row.py b/labelbox/schema/data_row.py index 9956697aa..ecc0f0255 100644 --- a/labelbox/schema/data_row.py +++ b/labelbox/schema/data_row.py @@ -160,20 +160,46 @@ def create_attachment(self, @staticmethod def export( client: "Client", - data_rows: List[Union[str, "DataRow"]] = None, - global_keys: List[str] = None, + data_rows: Optional[List[Union[str, "DataRow"]]] = None, + global_keys: Optional[List[str]] = None, task_name: Optional[str] = None, params: Optional[CatalogExportParams] = None, ) -> ExportTask: - task = DataRow.export_v2(client, data_rows, global_keys, task_name, - params, True) + """ + Creates a data rows export task with the given list, params and returns the task. 
+ Args: + client (Client): client to use to make the export request + data_rows (list of DataRow or str): list of data row objects or data row ids to export + task_name (str): name of remote task + params (CatalogExportParams): export params + + >>> dataset = client.get_dataset(DATASET_ID) + >>> task = DataRow.export( + >>> data_rows=[data_row.uid for data_row in dataset.data_rows.list()], + >>> # or a list of DataRow objects: data_rows = data_set.data_rows.list() + >>> # or a list of global_keys=["global_key_1", "global_key_2"], + >>> # Note that exactly one of: data_rows or global_keys parameters can be passed in at a time + >>> # and if data rows ids is present, global keys will be ignored + >>> params={ + >>> "performance_details": False, + >>> "label_details": True + >>> }) + >>> task.wait_till_done() + >>> task.result + """ + task = DataRow.export_v2(client, + data_rows, + global_keys, + task_name, + params, + streamable=True) return ExportTask(task) @staticmethod def export_v2( client: "Client", - data_rows: List[Union[str, "DataRow"]] = None, - global_keys: List[str] = None, + data_rows: Optional[List[Union[str, "DataRow"]]] = None, + global_keys: Optional[List[str]] = None, task_name: Optional[str] = None, params: Optional[CatalogExportParams] = None, streamable: bool = False, diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index fd9cc105d..3163b5488 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -608,7 +608,24 @@ def export( filters: Optional[DatasetExportFilters] = None, params: Optional[CatalogExportParams] = None, ) -> ExportTask: - task = self.export_v2(task_name, filters, params, True) + """ + Creates a dataset export task with the given params and returns the task. + + >>> dataset = client.get_dataset(DATASET_ID) + >>> task = dataset.export( + >>> filters={ + >>> "last_activity_at": ["2000-01-01 00:00:00", "2050-01-01 00:00:00"], + >>> "label_created_at": ["2000-01-01 00:00:00", "2050-01-01 00:00:00"], + >>> "data_row_ids": [DATA_ROW_ID_1, DATA_ROW_ID_2, ...] # or global_keys: [DATA_ROW_GLOBAL_KEY_1, DATA_ROW_GLOBAL_KEY_2, ...] + >>> }, + >>> params={ + >>> "performance_details": False, + >>> "label_details": True + >>> }) + >>> task.wait_till_done() + >>> task.result + """ + task = self.export_v2(task_name, filters, params, streamable=True) return ExportTask(task) def export_v2( diff --git a/labelbox/schema/export_task.py b/labelbox/schema/export_task.py index eecb4cb1e..9a3888fc3 100644 --- a/labelbox/schema/export_task.py +++ b/labelbox/schema/export_task.py @@ -2,6 +2,7 @@ from dataclasses import dataclass from enum import Enum from functools import lru_cache +from io import TextIOWrapper from pathlib import Path from typing import ( Callable, @@ -85,8 +86,8 @@ def convert(self, input_args: ConverterInputArgs) -> Iterator[OutputT]: Args: current_offset: The global offset indicating the position of the data within the - exported files. It represents a cumulative offset across multiple - files. + exported files. It represents a cumulative offset in characters + across multiple files. raw_data: The raw data to convert. 
Yields: @@ -94,16 +95,17 @@ def convert(self, input_args: ConverterInputArgs) -> Iterator[OutputT]: """ -class JsonConverter(Converter["JsonConverter.Output"]): # pylint: disable=too-few-public-methods - """Converts JSON data.""" +@dataclass +class JsonConverterOutput: + """Output with the JSON string.""" - @dataclass - class Output: - """Output with the JSON string.""" + current_offset: int + current_line: int + json_str: str - current_offset: int - current_line: int - json_str: str + +class JsonConverter(Converter[JsonConverterOutput]): # pylint: disable=too-few-public-methods + """Converts JSON data.""" def _find_json_object_offsets(self, data: str) -> List[Tuple[int, int]]: object_offsets: List[Tuple[int, int]] = [] @@ -132,7 +134,7 @@ def _find_json_object_offsets(self, data: str) -> List[Tuple[int, int]]: def convert( self, input_args: Converter.ConverterInputArgs - ) -> Iterator["JsonConverter.Output"]: + ) -> Iterator[JsonConverterOutput]: current_offset, current_line, raw_data = ( input_args.file_info.offsets.start, input_args.file_info.lines.start, @@ -140,30 +142,31 @@ def convert( ) offsets = self._find_json_object_offsets(raw_data) for line, (offset_start, offset_end) in enumerate(offsets): - yield JsonConverter.Output( + yield JsonConverterOutput( current_offset + offset_start, current_line + line, json_str=raw_data[offset_start:offset_end + 1].strip(), ) -class FileConverter(Converter["FileConverter.Output"]): - """Converts data to a file.""" +@dataclass +class FileConverterOutput: + """Output with statistics about the written file.""" + + file_path: Path + total_size: int + total_lines: int + current_offset: int + current_line: int + bytes_written: int - @dataclass - class Output: - """Output with statistics about the written file.""" - file_path: Path - total_size: int - total_lines: int - current_offset: int - current_line: int - bytes_written: int +class FileConverter(Converter[FileConverterOutput]): + """Converts data to a file.""" def __init__(self, file_path: str) -> None: super().__init__() - self._file = None + self._file: Optional[TextIOWrapper] = None self._file_path = file_path def __enter__(self): @@ -171,16 +174,17 @@ def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): - self._file.close() + if self._file: + self._file.close() return False def convert( self, input_args: Converter.ConverterInputArgs - ) -> Iterator["FileConverter.Output"]: + ) -> Iterator[FileConverterOutput]: # appends data to the file assert self._file is not None self._file.write(input_args.raw_data) - yield FileConverter.Output( + yield FileConverterOutput( file_path=Path(self._file_path), total_size=input_args.ctx.metadata_header.total_size, total_lines=input_args.ctx.metadata_header.total_lines, @@ -380,6 +384,8 @@ def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None: self._retrieval_strategy = strategy def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: + if not self._retrieval_strategy: + raise ValueError("retrieval strategy not set") result = self._retrieval_strategy.get_next_chunk() while result: file_info, raw_data = result @@ -533,32 +539,32 @@ def _get_metadata_header( res = res["task"]["exportMetadataHeader"] return _MetadataHeader(**res) if res else None - @staticmethod - def get_total_file_size(client, task_id: str, + def get_total_file_size(self, client, task_id: str, stream_type: StreamType) -> Union[int, None]: """Returns the total file size for a specific task.""" + if not self._task.status in ["COMPLETE", "FAILED"]: + 
            raise ExportTask.TaskNotReadyException("Task is not ready yet")
         header = ExportTask._get_metadata_header(client, task_id, stream_type)
         return header.total_size if header else None

-    @staticmethod
-    def get_total_lines(client, task_id: str,
+    def get_total_lines(self, client, task_id: str,
                         stream_type: StreamType) -> Union[int, None]:
         """Returns the total number of lines for a specific task."""
+        if not self._task.status in ["COMPLETE", "FAILED"]:
+            raise ExportTask.TaskNotReadyException("Task is not ready yet")
         header = ExportTask._get_metadata_header(client, task_id, stream_type)
         return header.total_lines if header else None

     def has_result(self) -> bool:
         """Returns whether the task has a result."""
-        total_size = ExportTask.get_total_file_size(self._task.client,
-                                                    self._task.uid,
-                                                    StreamType.RESULT)
+        total_size = self.get_total_file_size(self._task.client, self._task.uid,
+                                              StreamType.RESULT)
         return total_size is not None and total_size > 0

     def has_errors(self) -> bool:
         """Returns whether the task has errors."""
-        total_size = ExportTask.get_total_file_size(self._task.client,
-                                                    self._task.uid,
-                                                    StreamType.ERRORS)
+        total_size = self.get_total_file_size(self._task.client, self._task.uid,
+                                              StreamType.ERRORS)
         return total_size is not None and total_size > 0

     @overload
     def get_stream(
         self,
         converter: JsonConverter = JsonConverter(),
         stream_type: StreamType = StreamType.RESULT,
-    ) -> Stream[JsonConverter.Output]:
+    ) -> Stream[JsonConverterOutput]:
         """Overload for getting the right typing hints when using a JsonConverter."""

     @overload
     def get_stream(
         self,
         converter: FileConverter,
         stream_type: StreamType = StreamType.RESULT,
-    ) -> Stream[FileConverter.Output]:
+    ) -> Stream[FileConverterOutput]:
         """Overload for getting the right typing hints when using a FileConverter."""

     def get_stream(
diff --git a/labelbox/schema/model_run.py b/labelbox/schema/model_run.py
index 8ad29d884..622f50804 100644
--- a/labelbox/schema/model_run.py
+++ b/labelbox/schema/model_run.py
@@ -507,7 +507,13 @@ def export_labels(
     def export(self,
                task_name: Optional[str] = None,
                params: Optional[ModelRunExportParams] = None) -> ExportTask:
-        task = self.export_v2(task_name, params, True)
+        """
+        Creates a model run export task with the given params and returns the task.
+
+        >>> export_task = export("my_export_task", params={"media_attributes": True})
+
+        """
+        task = self.export_v2(task_name, params, streamable=True)
         return ExportTask(task)

     def export_v2(
diff --git a/labelbox/schema/project.py b/labelbox/schema/project.py
index 7cf16ec28..55d0dc6a9 100644
--- a/labelbox/schema/project.py
+++ b/labelbox/schema/project.py
@@ -422,8 +422,24 @@ def export(
         filters: Optional[ProjectExportFilters] = None,
         params: Optional[ProjectExportParams] = None,
     ) -> ExportTask:
-        """Creates a project export task with the given params and returns the task."""
-        task = self.export_v2(task_name, filters, params, True)
+        """
+        Creates a project export task with the given params and returns the task.
+
+        >>> task = project.export(
+        >>>     filters={
+        >>>         "last_activity_at": ["2000-01-01 00:00:00", "2050-01-01 00:00:00"],
+        >>>         "label_created_at": ["2000-01-01 00:00:00", "2050-01-01 00:00:00"],
+        >>>         "data_row_ids": [DATA_ROW_ID_1, DATA_ROW_ID_2, ...] # or global_keys: [DATA_ROW_GLOBAL_KEY_1, DATA_ROW_GLOBAL_KEY_2, ...]
+        >>>         "batch_ids": [BATCH_ID_1, BATCH_ID_2, ...]
+ >>> }, + >>> params={ + >>> "performance_details": False, + >>> "label_details": True + >>> }) + >>> task.wait_till_done() + >>> task.result + """ + task = self.export_v2(task_name, filters, params, streamable=True) return ExportTask(task) def export_v2( diff --git a/labelbox/schema/slice.py b/labelbox/schema/slice.py index 6eef46aff..651df56f9 100644 --- a/labelbox/schema/slice.py +++ b/labelbox/schema/slice.py @@ -68,7 +68,16 @@ def get_data_row_ids(self) -> PaginatedCollection: def export(self, task_name: Optional[str] = None, params: Optional[CatalogExportParams] = None) -> ExportTask: - task = self.export_v2(task_name, params, True) + """ + Creates a slice export task with the given params and returns the task. + >>> slice = client.get_catalog_slice("SLICE_ID") + >>> task = slice.export( + >>> params={"performance_details": False, "label_details": True} + >>> ) + >>> task.wait_till_done() + >>> task.result + """ + task = self.export_v2(task_name, params, streamable=True) return ExportTask(task) def export_v2( diff --git a/labelbox/schema/task.py b/labelbox/schema/task.py index db7c4dd00..762845128 100644 --- a/labelbox/schema/task.py +++ b/labelbox/schema/task.py @@ -65,7 +65,7 @@ def wait_till_done(self, timeout_seconds: int = 300) -> None: timeout_seconds (float): Maximum time this method can block, in seconds. Defaults to five minutes. """ check_frequency = 2 # frequency of checking, in seconds - while timeout_seconds: + while timeout_seconds > 0: if self.status != "IN_PROGRESS": # self.errors fetches the error content. # This first condition prevents us from downloading the content for v2 exports From 4cb0cc9a09ee5fab2b1d3d5fc42391e6456abece Mon Sep 17 00:00:00 2001 From: Klaus Opreschko Date: Tue, 14 Nov 2023 15:37:18 -0700 Subject: [PATCH 3/3] Add @experimental decorator to all export methods --- labelbox/schema/data_row.py | 3 ++- labelbox/schema/dataset.py | 3 ++- labelbox/schema/model_run.py | 1 + labelbox/schema/project.py | 3 ++- labelbox/schema/slice.py | 3 ++- 5 files changed, 9 insertions(+), 4 deletions(-) diff --git a/labelbox/schema/data_row.py b/labelbox/schema/data_row.py index ecc0f0255..088d04efa 100644 --- a/labelbox/schema/data_row.py +++ b/labelbox/schema/data_row.py @@ -4,7 +4,7 @@ from labelbox.exceptions import ResourceNotFoundError from labelbox.orm import query -from labelbox.orm.db_object import DbObject, Updateable, BulkDeletable +from labelbox.orm.db_object import DbObject, Updateable, BulkDeletable, experimental from labelbox.orm.model import Entity, Field, Relationship from labelbox.schema.data_row_metadata import DataRowMetadataField # type: ignore from labelbox.schema.export_filters import DatarowExportFilters, build_filters, validate_at_least_one_of_data_row_ids_or_global_keys @@ -157,6 +157,7 @@ def create_attachment(self, return Entity.AssetAttachment(self.client, res["createDataRowAttachment"]) + @experimental @staticmethod def export( client: "Client", diff --git a/labelbox/schema/dataset.py b/labelbox/schema/dataset.py index 3163b5488..69c37dfaf 100644 --- a/labelbox/schema/dataset.py +++ b/labelbox/schema/dataset.py @@ -17,7 +17,7 @@ from labelbox import pagination from labelbox.exceptions import InvalidQueryError, LabelboxError, ResourceNotFoundError, InvalidAttributeError from labelbox.orm.comparison import Comparison -from labelbox.orm.db_object import DbObject, Updateable, Deletable +from labelbox.orm.db_object import DbObject, Updateable, Deletable, experimental from labelbox.orm.model import Entity, Field, Relationship from 
labelbox.orm import query from labelbox.exceptions import MalformedQueryException @@ -602,6 +602,7 @@ def export_data_rows(self, self.uid) time.sleep(sleep_time) + @experimental def export( self, task_name: Optional[str] = None, diff --git a/labelbox/schema/model_run.py b/labelbox/schema/model_run.py index 622f50804..62b11db36 100644 --- a/labelbox/schema/model_run.py +++ b/labelbox/schema/model_run.py @@ -504,6 +504,7 @@ def export_labels( self.uid) time.sleep(sleep_time) + @experimental def export(self, task_name: Optional[str] = None, params: Optional[ModelRunExportParams] = None) -> ExportTask: diff --git a/labelbox/schema/project.py b/labelbox/schema/project.py index 55d0dc6a9..ca14824b8 100644 --- a/labelbox/schema/project.py +++ b/labelbox/schema/project.py @@ -16,7 +16,7 @@ ProcessingWaitTimeout, ResourceConflict, ResourceNotFoundError) from labelbox.orm import query -from labelbox.orm.db_object import DbObject, Deletable, Updateable +from labelbox.orm.db_object import DbObject, Deletable, Updateable, experimental from labelbox.orm.model import Entity, Field, Relationship from labelbox.pagination import PaginatedCollection from labelbox.schema.consensus_settings import ConsensusSettings @@ -416,6 +416,7 @@ def _string_from_dict(dictionary: dict, value_with_quotes=False) -> str: self.uid) time.sleep(sleep_time) + @experimental def export( self, task_name: Optional[str] = None, diff --git a/labelbox/schema/slice.py b/labelbox/schema/slice.py index 651df56f9..20e40f1c6 100644 --- a/labelbox/schema/slice.py +++ b/labelbox/schema/slice.py @@ -1,6 +1,6 @@ from typing import Optional, List from labelbox.exceptions import ResourceNotFoundError -from labelbox.orm.db_object import DbObject +from labelbox.orm.db_object import DbObject, experimental from labelbox.orm.model import Entity, Field from labelbox.pagination import PaginatedCollection from labelbox.schema.export_params import CatalogExportParams, validate_catalog_export_params @@ -65,6 +65,7 @@ def get_data_row_ids(self) -> PaginatedCollection: obj_class=lambda _, data_row_id: data_row_id, cursor_path=['getDataRowIdsBySavedQuery', 'pageInfo', 'endCursor']) + @experimental def export(self, task_name: Optional[str] = None, params: Optional[CatalogExportParams] = None) -> ExportTask:
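[Editor's note] Taken together, the three patches expose a resumable stream: `FileRetrieverByOffset` and `FileRetrieverByLine` let a consumer pick up mid-export instead of re-reading from the start. A final sketch of that usage, assuming an already completed export; `TASK_ID` and `LB_API_KEY` are placeholders:

```python
# Sketch: resuming a stream part-way through, using the retrieval strategies
# defined in export_task.py.
import labelbox as lb
from labelbox import ExportTask

client = lb.Client(api_key="LB_API_KEY")
export_task = ExportTask.get_task(client, "TASK_ID")

# with_line()/with_offset() replace the reader's retrieval strategy, so a
# consumer that already processed N lines can continue from line N.
stream = export_task.get_stream().with_line(100)
# Alternatively, resume from a byte offset:
# stream = export_task.get_stream().with_offset(4096)

for output in stream:
    print(output.current_line, output.json_str)
```

Note that both strategies validate their starting point against the cached metadata header, so an out-of-range line or offset fails fast with a `ValueError`.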