From 28f099e49bbb902fb1c3d4bd92a4d77e2d75a4c3 Mon Sep 17 00:00:00 2001 From: Matt Sokoloff Date: Wed, 22 Feb 2023 16:11:38 -0600 Subject: [PATCH 1/4] exports v2 task support --- labelbox/schema/task.py | 35 ++++++++++++++++++------------- tests/integration/test_project.py | 12 ++--------- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/labelbox/schema/task.py b/labelbox/schema/task.py index 31f3390ee..0bf99aec3 100644 --- a/labelbox/schema/task.py +++ b/labelbox/schema/task.py @@ -39,6 +39,7 @@ class Task(DbObject): status = Field.String("status") completion_percentage = Field.Float("completion_percentage") result_url = Field.String("result_url", "result") + errors_url = Field.String("errors_url", "errors") type = Field.String("type") _user: Optional["User"] = None @@ -65,7 +66,9 @@ def wait_till_done(self, timeout_seconds=300) -> None: check_frequency = 2 # frequency of checking, in seconds while True: if self.status != "IN_PROGRESS": - if self.errors is not None: + # self.errors fetches the error content. + # This first condition prevents us from downloading the content for v2 exports + if self.errors_url is not None or self.errors is not None: logger.warning( "There are errors present. Please look at `task.errors` for more details" ) @@ -83,14 +86,15 @@ def wait_till_done(self, timeout_seconds=300) -> None: def errors(self) -> Optional[Dict[str, Any]]: """ Fetch the error associated with an import task. """ - # TODO: We should handle error messages for export v2 tasks in the future. - if self.name != 'JSON Import': - return None - if self.status == "FAILED": - result = self._fetch_remote_json() - return result["error"] - elif self.status == "COMPLETE": - return self.failed_data_rows + if self.name == 'JSON Import': + if self.status == "FAILED": + result = self._fetch_remote_json() + return result["error"] + elif self.status == "COMPLETE": + return self.failed_data_rows + elif self.type == "export-data-rows": + if self.errors_url: + return self._fetch_remote_json(url=self.errors_url) return None @property @@ -122,12 +126,15 @@ def failed_data_rows(self) -> Optional[Dict[str, Any]]: return None @lru_cache() - def _fetch_remote_json(self) -> Dict[str, Any]: + def _fetch_remote_json(self, url: Optional[str] = None) -> Dict[str, Any]: """ Function for fetching and caching the result data. """ + if url is None: + # for backwards compatability + url = self.result_url - def download_result(): - response = requests.get(self.result_url) + def _download_file(url): + response = requests.get(url) response.raise_for_status() try: return response.json() @@ -145,11 +152,11 @@ def download_result(): ) if self.status != "IN_PROGRESS": - return download_result() + return _download_file(url) else: self.wait_till_done(timeout_seconds=600) if self.status == "IN_PROGRESS": raise ValueError( "Job status still in `IN_PROGRESS`. The result is not available. Call task.wait_till_done() with a larger timeout or contact support." ) - return download_result() + return _download_file(url) diff --git a/tests/integration/test_project.py b/tests/integration/test_project.py index 68483b8d8..17b661cea 100644 --- a/tests/integration/test_project.py +++ b/tests/integration/test_project.py @@ -60,17 +60,9 @@ def test_project_export_v2(configured_project_with_label): assert task.name == task_name task.wait_till_done() assert task.status == "COMPLETE" + assert task.errors is None - def download_result(result_url): - response = requests.get(result_url) - response.raise_for_status() - data = [json.loads(line) for line in response.text.splitlines()] - return data - - task_results = download_result(task.result_url) - - for task_result in task_results: - assert len(task_result['errors']) == 0 + for task_result in task.result: task_project = task_result['projects'][project.uid] task_project_label_ids_set = set( map(lambda prediction: prediction['id'], task_project['labels'])) From 4882e676ee7468186d6d7d8e23289e52b112e56d Mon Sep 17 00:00:00 2001 From: Matt Sokoloff Date: Wed, 22 Feb 2023 17:57:19 -0600 Subject: [PATCH 2/4] improve logic for downloading ndjson vs json --- labelbox/schema/task.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/labelbox/schema/task.py b/labelbox/schema/task.py index 00ddc46b1..f10f58089 100644 --- a/labelbox/schema/task.py +++ b/labelbox/schema/task.py @@ -138,33 +138,37 @@ def _fetch_remote_json(self, url: Optional[str] = None) -> Dict[str, Any]: # for backwards compatability url = self.result_url - def download_result(url): + def download_result(url, format: str): response = requests.get(url) response.raise_for_status() - try: + if format == 'json': return response.json() - except Exception as e: - pass - try: + elif format == 'ndjson': return ndjson.loads(response.text) - except Exception as e: - raise ValueError("Failed to parse task JSON/NDJSON result.") + else: + raise ValueError( + "Expected the result format to be either `ndjson` or `json`." + ) - if self.name != 'JSON Import' and self.type != 'export-data-rows': + if self.name == 'JSON Import': + format = 'json' + elif self.type == 'export-data-rows': + format = 'ndjson' + else: raise ValueError( "Task result is only supported for `JSON Import` and `export` tasks." " Download task.result_url manually to access the result for other tasks." ) if self.status != "IN_PROGRESS": - return download_result(url) + return download_result(url, format) else: self.wait_till_done(timeout_seconds=600) if self.status == "IN_PROGRESS": raise ValueError( "Job status still in `IN_PROGRESS`. The result is not available. Call task.wait_till_done() with a larger timeout or contact support." ) - return download_result(url) + return download_result(url, format) @staticmethod def get_task(client, task_id): From 3a958903e66bbd87988a63d61596516e42924737 Mon Sep 17 00:00:00 2001 From: Matt Sokoloff Date: Wed, 22 Feb 2023 19:15:21 -0600 Subject: [PATCH 3/4] refresh url before downloading --- labelbox/schema/task.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/labelbox/schema/task.py b/labelbox/schema/task.py index f10f58089..e0caf1396 100644 --- a/labelbox/schema/task.py +++ b/labelbox/schema/task.py @@ -94,8 +94,9 @@ def errors(self) -> Optional[Dict[str, Any]]: elif self.status == "COMPLETE": return self.failed_data_rows elif self.type == "export-data-rows": + self.wait_till_done(timeout_seconds=600) if self.errors_url: - return self._fetch_remote_json(url=self.errors_url) + return self._fetch_remote_json(remote_json_field='errors_url') elif self.type == "add-data-rows-to-batch" or self.type == "send-to-task-queue": if self.status == "FAILED": # for these tasks, the error is embedded in the result itself @@ -131,14 +132,14 @@ def failed_data_rows(self) -> Optional[Dict[str, Any]]: return None @lru_cache() - def _fetch_remote_json(self, url: Optional[str] = None) -> Dict[str, Any]: + def _fetch_remote_json(self, + remote_json_field: Optional[str] = None + ) -> Dict[str, Any]: """ Function for fetching and caching the result data. """ - if url is None: - # for backwards compatability - url = self.result_url - def download_result(url, format: str): + def download_result(remote_json_field: Optional[str], format: str): + url = getattr(self, remote_json_field or 'result_url') response = requests.get(url) response.raise_for_status() if format == 'json': @@ -161,14 +162,14 @@ def download_result(url, format: str): ) if self.status != "IN_PROGRESS": - return download_result(url, format) + return download_result(remote_json_field, format) else: self.wait_till_done(timeout_seconds=600) if self.status == "IN_PROGRESS": raise ValueError( "Job status still in `IN_PROGRESS`. The result is not available. Call task.wait_till_done() with a larger timeout or contact support." ) - return download_result(url, format) + return download_result(remote_json_field, format) @staticmethod def get_task(client, task_id): From 1df8e69a23cf056d2a291e0b7965f265365e304b Mon Sep 17 00:00:00 2001 From: Matt Sokoloff Date: Wed, 22 Feb 2023 19:25:19 -0600 Subject: [PATCH 4/4] cleaner --- labelbox/schema/task.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/labelbox/schema/task.py b/labelbox/schema/task.py index e0caf1396..ea4b83dfe 100644 --- a/labelbox/schema/task.py +++ b/labelbox/schema/task.py @@ -94,9 +94,7 @@ def errors(self) -> Optional[Dict[str, Any]]: elif self.status == "COMPLETE": return self.failed_data_rows elif self.type == "export-data-rows": - self.wait_till_done(timeout_seconds=600) - if self.errors_url: - return self._fetch_remote_json(remote_json_field='errors_url') + return self._fetch_remote_json(remote_json_field='errors_url') elif self.type == "add-data-rows-to-batch" or self.type == "send-to-task-queue": if self.status == "FAILED": # for these tasks, the error is embedded in the result itself @@ -140,6 +138,10 @@ def _fetch_remote_json(self, def download_result(remote_json_field: Optional[str], format: str): url = getattr(self, remote_json_field or 'result_url') + + if url is None: + return None + response = requests.get(url) response.raise_for_status() if format == 'json':