diff --git a/examples/README.md b/examples/README.md
index b7f6e6d8..e23e3063 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -18,7 +18,11 @@ If your data is hosted in Google Cloud Storage (GCS), you can write a Python scr
 
 ## Export
 
-* [Export snapshots](export_snapshots.py) - This example shows how to export, check export status and download JSON shapshots from Label Studio.
+* [Export with filters](export_with_filters.py) - This example shows the simplest way to export data with filters.
+* [Export snapshots](export_snapshots.py) - This example shows in detail how to export, check the export status, and download JSON snapshots from Label Studio. It includes the following steps:
+  * Create a snapshot
+  * Check the snapshot status
+  * Download the snapshot
 
 ## Machine Learning
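
The three bullets above map one-to-one onto SDK calls used throughout this PR (`export_snapshot_create`, `export_snapshot_status`, `export_snapshot_download`). A minimal sketch of that flow, assuming a reachable instance; the URL, token, and project ID below are placeholders:

```python
import time
from label_studio_sdk import Client

# Placeholders: point these at your own instance and project
ls = Client(url='https://app.heartex.com', api_key='<your_token>')
project = ls.get_project(1)

# 1. Create a snapshot
export_result = project.export_snapshot_create(title='Example snapshot')
export_id = export_result['id']

# 2. Check the snapshot status until it is ready
while project.export_snapshot_status(export_id).is_in_progress():
    time.sleep(1.0)

# 3. Download the snapshot as JSON
status, file_name = project.export_snapshot_download(export_id, export_type='JSON')
print(f'Download finished with status {status}, saved as {file_name}')
```
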
diff --git a/examples/export_with_filters.py b/examples/export_with_filters.py
new file mode 100644
index 00000000..b9a1abc8
--- /dev/null
+++ b/examples/export_with_filters.py
@@ -0,0 +1,45 @@
+""" Export a snapshot with tasks filtered by ID range.
+
+Note: the Label Studio API does not yet support exporting snapshots
+with filters directly. However, the same result can be achieved by
+creating a view with a filter and then exporting a snapshot from that view.
+This approach is encapsulated in the `project.export()` method.
+"""
+
+from label_studio_sdk import Client
+from label_studio_sdk.data_manager import Filters, Operator, Type, Column
+
+
+# Usage example
+host = 'https://app.heartex.com'
+api_key = ''
+project_id = 14528
+start_id = 10000
+end_id = 20000
+
+# Create a filter for the task ID range
+filters = Filters.create(
+    Filters.AND,
+    [
+        Filters.item(
+            Column.inner_id,
+            Operator.GREATER_OR_EQUAL,
+            Type.Number,
+            Filters.value(start_id),
+        ),
+        Filters.item(
+            Column.inner_id,
+            Operator.LESS,
+            Type.Number,
+            Filters.value(end_id),
+        ),
+    ],
+)
+
+print('Export started ...')
+ls = Client(url=host, api_key=api_key)
+project = ls.get_project(project_id)
+result = project.export(filters=filters, export_type="JSON", output_dir='exported')
+print(
+    f"Export file saved as: exported/{result['filename']}, status: {result['status']}, export_id: {result['export_id']}"
+)
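
`Filters.create()` builds a plain dict in the format documented in the `Project.export()` docstring later in this PR, and `export()` also accepts such a dict directly. A small sketch reusing the docstring's own example filter (task ID equal to 1); `project` is assumed to be connected as in the example above:

```python
# Equivalent plain-dict filter, mirroring the JSON example documented
# in Project.export(); it selects the task with ID 1
filters_dict = {
    "conjunction": "and",
    "items": [
        {
            "filter": "filter:tasks:id",
            "operator": "equal",
            "type": "Number",
            "value": 1,
        }
    ],
}
result = project.export(filters=filters_dict, export_type="JSON", output_dir="exported")
```
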
diff --git a/examples/migrate_ls_to_ls/migrate-ls-to-ls.py b/examples/migrate_ls_to_ls/migrate-ls-to-ls.py
index b1fb65cb..932f786f 100644
--- a/examples/migrate_ls_to_ls/migrate-ls-to-ls.py
+++ b/examples/migrate_ls_to_ls/migrate-ls-to-ls.py
@@ -11,10 +11,13 @@ import time
 
 from label_studio_sdk import Client
+from label_studio_sdk.data_manager import Filters, Operator, Type, Column
 from label_studio_sdk.users import User
 
 logger = logging.getLogger("migration-ls-to-ls")
+logger.setLevel(logging.DEBUG)
 
+CHUNK_SIZE = int(os.getenv('CHUNK_SIZE', 1000))
 DEFAULT_STORAGE = os.getenv('DEFAULT_STORAGE', '')  # 's3', 'gcs' or 'azure'
 DEFAULT_STORAGE_REGEX = os.getenv(
     'DEFAULT_STORAGE_REGEX', '.*'
@@ -45,6 +48,7 @@ def __init__(self, src_url, src_key, dst_url, dst_key, dest_workspace):
         :param src_key: source Label Studio token
         :param dst_url: destination Label Studio instance
         :param dst_key: destination Label Studio token
+        :param dest_workspace: destination workspace ID
         """
         # Connect to the Label Studio API and check the connection
         self.src_ls = Client(url=src_url, api_key=src_key)
@@ -137,33 +141,47 @@
         self.create_users(users)
 
         # start exporting projects
+        success = 0
         for project in projects:
-            logger.info(
-                f'Going to create export snapshot for project {project.id} {project.params["title"]}'
-            )
-            status, filename = self.export_snapshot(project)
-            logger.info(
-                f"Snapshot for project {project.id} created with status {status} and filename {filename}"
-            )
-            self.patch_snapshot_users(filename)
+            success += self.migrate_project(project)
 
-            if status != 200:
-                logger.info(f"Skipping project {project.id} because of errors {status}")
-                continue
+        logger.info(
+            f"Projects are processed: {success} successful out of {len(projects)} total"
+        )
 
-            if self.dest_workspace is not None:
-                project.params["workspace"] = self.dest_workspace
-            new_project = self.create_project(project)
+    def migrate_project(self, project):
+        filenames = self.export_chunked_snapshots(project)
+        if not filenames:
+            logger.error(
+                f"No exported files found: skipping project {project.id}. Maybe the project is empty?"
+            )
+            return False
 
-            logger.info(f"Going to import {filename} to project {new_project.id}")
-            new_project.import_tasks(filename)
-            logger.info(f"Import {filename} finished for project {new_project.id}")
+        logger.info(f"Patching snapshot users for project {project.id}")
+        for filename in filenames:
+            self.patch_snapshot_users(filename)
 
-            self.add_default_import_storage(new_project)
-
-        logger.info("All projects are processed, finish")
+        logger.info(f"Creating a new project from project {project.id}")
+        label_config = str(project.label_config)
+        project.params["label_config"] = ''
+        new_project = self.create_project(project)
+
+        logger.info(f"Going to import {filenames} to project {new_project.id}")
+        for filename in filenames:
+            new_project.import_tasks(filename)
+            logger.info(f"Import {filename} finished for project {new_project.id}")
+            time.sleep(1)
+
+        project.set_params(label_config=label_config)
+        self.add_default_import_storage(new_project)
+        return True
 
     def create_project(self, project):
+        if self.dest_workspace is not None:
+            project.params["workspace"] = self.dest_workspace
+        else:
+            project.params.pop("workspace", None)
+
         logger.info(
             f'Going to create a new project "{project.params["title"]}" from old project {project.id}'
         )
@@ -273,30 +291,66 @@ def create_users(self, users: [User]):
         logger.info(f"Created users: {[u.email for u in new_users]}")
         return new_users
 
-    def export_snapshot(self, project):
-        """Export all tasks from the project"""
-        # create new export snapshot
-        export_result = project.export_snapshot_create(
-            title="Migration snapshot",
-            serialization_options_drafts=False,
-            serialization_options_annotations__completed_by=False,
-            serialization_options_predictions=False,
-        )
-        assert "id" in export_result
-        export_id = export_result["id"]
+    def export_chunked_snapshots(self, project):
+        """Export all tasks from the project in chunks and return the filenames of the chunks"""
 
-        # wait until snapshot is ready
-        while project.export_snapshot_status(export_id).is_in_progress():
-            time.sleep(1.0)
+        logger.info(f'Creating chunked snapshots for project {project.id}')
+        file_size, filenames, chunk_i = 100, [], 0
 
-        # download snapshot file
-        status, file_name = project.export_snapshot_download(
-            export_id, export_type="JSON"
+        while file_size > 2:  # an empty JSON export ('[]') is 2 bytes
+            start_id = CHUNK_SIZE * chunk_i
+            end_id = CHUNK_SIZE * (chunk_i + 1)
+            logger.info(
+                f"Exporting chunk {chunk_i} from {start_id} to {end_id} tasks for project {project.id}"
+            )
+
+            # create a filter for the inner ID range to split tasks into chunks
+            filters = self.inner_id_range_filter(start_id, end_id)
+
+            # create a new export and save it to disk
+            output_dir = "snapshots"
+            result = project.export(
+                filters=filters,
+                title=f"Migration snapshot for chunk {chunk_i}",
+                serialization_options_drafts=False,
+                serialization_options_annotations__completed_by=False,
+                serialization_options_predictions=False,
+                output_dir=output_dir,
+            )
+            status, filename = result["status"], result["filename"]
+            if status != 200:
+                logger.error(
+                    f"Error while exporting chunk {chunk_i}: {status}, skipping export"
+                )
+                return []
+
+            chunk_i += 1
+            filename = os.path.join(output_dir, filename)
+            file_size = os.path.getsize(filename)
+            if file_size > 2:
+                filenames.append(filename)
+
+        return filenames
+
+    def inner_id_range_filter(self, start_id, end_id):
+        filters = Filters.create(
+            Filters.AND,
+            [
+                Filters.item(
+                    Column.inner_id,
+                    Operator.GREATER_OR_EQUAL,
+                    Type.Number,
+                    Filters.value(start_id),
+                ),
+                Filters.item(
+                    Column.inner_id,
+                    Operator.LESS,
+                    Type.Number,
+                    Filters.value(end_id),
+                ),
+            ],
         )
-        assert status == 200
-        assert file_name is not None
-        logger.info(f"Status of the export is {status}. File name is {file_name}")
-        return status, file_name
+        return filters
 
     def patch_snapshot_users(self, filename):
         """
@@ -390,7 +444,6 @@ def run():
         dst_key=args.dst_key,
         dest_workspace=args.dest_workspace,
     )
-    logging.basicConfig(level=logging.INFO)
 
     project_ids = (
        [int(i) for i in args.project_ids.split(",")] if args.project_ids else None
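
For orientation, this is roughly how the migration class above is driven. The class name `Migration` is hypothetical (this diff never shows the class statement), and all URLs, tokens, and IDs are placeholders; `run()` at the bottom of the script builds the same call from argparse arguments:

```python
# Hypothetical driver for the migration class shown above; the class
# name and all credentials/IDs are placeholders
migration = Migration(
    src_url='https://source.example.com',
    src_key='<source_token>',
    dst_url='https://destination.example.com',
    dst_key='<destination_token>',
    dest_workspace=None,  # or a destination workspace ID
)
migration.run(project_ids=[14528])  # project_ids=None migrates all projects
```
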
diff --git a/label_studio_sdk/data_manager.py b/label_studio_sdk/data_manager.py
index ae23333f..0ac15732 100644
--- a/label_studio_sdk/data_manager.py
+++ b/label_studio_sdk/data_manager.py
@@ -176,6 +176,8 @@ class Column:
     id = "tasks:id"
     """Task ID"""
+    inner_id = "tasks:inner_id"
+    """Task inner ID; it starts from 1 within each project"""
     ground_truth = "tasks:ground_truth"
     """Ground truth status of the tasks"""
     annotations_results = "tasks:annotations_results"
@@ -191,9 +193,9 @@ class Column:
     file_upload = "tasks:file_upload"
     """Name of the file uploaded to create the tasks"""
     created_at = "tasks:created_at"
-    """Time the task was updated at (e.g. new annotation was created, review added, etc)"""
-    created_at = "tasks:updated_at"
     """Time the task was created at"""
+    updated_at = "tasks:updated_at"
+    """Time the task was updated at (e.g. a new annotation was created, a review was added, etc.)"""
     annotators = "tasks:annotators"
     """Annotators that completed the task (Community). Can include assigned annotators (Enterprise only)"""
     total_predictions = "tasks:total_predictions"
diff --git a/label_studio_sdk/project.py b/label_studio_sdk/project.py
index 2cafa712..12da4fa2 100644
--- a/label_studio_sdk/project.py
+++ b/label_studio_sdk/project.py
@@ -899,6 +899,23 @@ def create_view(self, filters, ordering=None, title="Tasks"):
         response = self.make_request("POST", "/api/dm/views", json=data)
         return response.json()
 
+    def delete_view(self, view_id):
+        """Delete a view
+
+        Parameters
+        ----------
+        view_id: int
+            View ID
+
+        Returns
+        -------
+        None
+            The view is deleted; nothing is returned
+
+        """
+        self.make_request("DELETE", f"/api/dm/views/{view_id}")
+        return
+
     @property
     def tasks(self):
         """Retrieve all tasks from the project.
         This call can be very slow if the project has a lot of tasks."""
@@ -1107,10 +1124,10 @@ def create_prediction(
         """
         data = {"task": task_id, "result": result, "score": score}
         if model_version is not None:
-            data['model_version'] = model_version
-        response = self.make_request('POST', "/api/predictions", json=data)
+            data["model_version"] = model_version
+        response = self.make_request("POST", "/api/predictions", json=data)
         json = response.json()
-        logger.debug(f'Response: {json}')
+        logger.debug(f"Response: {json}")
         return json
 
     def create_predictions(self, predictions):
@@ -2180,7 +2197,7 @@ def assign_annotators_by_sampling(
         overlap: int = 1,
     ):
         """
-        Behaves similarly like `assign_annotators()` but instead of specify tasks_ids explicitely,
+        Behaves similarly to `assign_annotators()`, but instead of specifying tasks_ids explicitly,
         it gets users' IDs list and optional view ID and splits all tasks across annotators.
         Fraction expresses the size of dataset to be assigned.
         Parameters
@@ -2312,6 +2329,101 @@ def export_snapshot_create(
         )
         return response.json()
 
+    def export(
+        self,
+        filters=None,
+        title="SDK Export",
+        export_type="JSON",
+        output_dir=".",
+        **kwargs,
+    ):
+        """
+        Export tasks from the project with optional filters,
+        and save the exported data to a specified directory.
+
+        This method:
+        (1) creates a temporary view with the specified filters if they are not None,
+        (2) creates a new export snapshot using the view ID,
+        (3) polls the snapshot status until creation finishes,
+        (4) downloads the snapshot file in the specified export format,
+        (5) and cleans up by removing the temporary view after the export.
+
+        Parameters
+        ----------
+        filters : data_manager.Filters, dict, optional
+            Filters to apply when exporting tasks.
+            If provided, a temporary view is created with these filters.
+            The format of the filters should match the Label Studio filter options.
+            Default is None, which means all tasks are exported.
+            Use label_studio_sdk.data_manager.Filters.create() to create filters.
+            Example of the filters JSON format:
+            ```json
+            {
+                "conjunction": "and",
+                "items": [
+                    {
+                        "filter": "filter:tasks:id",
+                        "operator": "equal",
+                        "type": "Number",
+                        "value": 1
+                    }
+                ]
+            }
+            ```
+        title : str, optional
+            The title of the export snapshot. Default is 'SDK Export'.
+        export_type : str, optional
+            The format of the exported data. It should be one of the formats supported by Label Studio ('JSON', 'CSV', etc.). Default is 'JSON'.
+        output_dir : str, optional
+            The directory where the exported file will be saved. Default is the current directory.
+        kwargs : kwargs, optional
+            Additional parameters passed through to the export_snapshot_create method.
+
+        Returns
+        -------
+        dict
+            Dict containing the status of the export, the filename of the exported file, and the export ID:
+
+            filename : str
+                Path to the downloaded export file
+            status : int
+                200 is ok
+            export_id : int
+                Export ID; you can retrieve more details about this export using this ID
+        """
+
+        # Create a temporary view with the specified filters
+        if filters:
+            view = self.create_view(title="Temp SDK export", filters=filters)
+            task_filter_options = {"view": view["id"]}
+        else:
+            task_filter_options = None
+            view = None
+
+        # Create a new export snapshot using the view ID
+        export_result = self.export_snapshot_create(
+            title=title,
+            task_filter_options=task_filter_options,
+            **kwargs,
+        )
+
+        # Check the status of the snapshot creation
+        export_id = export_result["id"]
+        while self.export_snapshot_status(export_id).is_in_progress():
+            time.sleep(1.0)  # wait until the snapshot is ready
+
+        os.makedirs(output_dir, exist_ok=True)
+
+        # Download the snapshot file once it's ready
+        status, filename = self.export_snapshot_download(
+            export_id, export_type=export_type, path=output_dir
+        )
+
+        # Clean up the temporary view
+        if view:
+            self.delete_view(view["id"])
+        return {"status": status, "filename": filename, "export_id": export_id}
+
     def export_snapshot_status(self, export_id: int) -> ExportSnapshotStatus:
         """
         Get export snapshot status by Export ID
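
Taken together, the new `Column.inner_id`, the existing `create_view()`, and the new `delete_view()` cover the view lifecycle that `export()` automates. A minimal sketch of driving that lifecycle by hand, assuming a connected `project` as in the earlier examples:

```python
from label_studio_sdk.data_manager import Filters, Operator, Type, Column

# Build a filter on the new inner_id column
filters = Filters.create(
    Filters.AND,
    [
        Filters.item(
            Column.inner_id,
            Operator.GREATER_OR_EQUAL,
            Type.Number,
            Filters.value(1),
        ),
    ],
)

# Create a temporary view, use it, then clean it up with the new delete_view()
view = project.create_view(title="Temp view", filters=filters)
try:
    # The view ID scopes the snapshot to the filtered tasks,
    # exactly as export() does internally
    export_result = project.export_snapshot_create(
        title="Manual export", task_filter_options={"view": view["id"]}
    )
    print(export_result["id"])
finally:
    project.delete_view(view["id"])
```
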
diff --git a/tests/requirements-test.txt b/tests/requirements-test.txt
index cd73eca0..7fafa61c 100644
--- a/tests/requirements-test.txt
+++ b/tests/requirements-test.txt
@@ -1,2 +1,3 @@
-pytest==6.2.5
-pytest-cov==3.0.0
\ No newline at end of file
+pytest==8.1.1
+pytest-cov==4.1.0
+requests_mock==1.11.0
\ No newline at end of file
diff --git a/tests/test_client.py b/tests/test_client.py
index 3bf3fca7..7a88f79b 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -1,4 +1,4 @@
-from unittest.mock import patch, Mock
+from unittest.mock import Mock
 
 from label_studio_sdk.client import Client
diff --git a/tests/test_export.py b/tests/test_export.py
new file mode 100644
index 00000000..d304d5ef
--- /dev/null
+++ b/tests/test_export.py
@@ -0,0 +1,105 @@
+from unittest.mock import Mock
+
+import requests_mock
+
+from label_studio_sdk.client import Client
+from label_studio_sdk.data_manager import Filters, Operator, Type, Column
+
+
+def test_client_headers():
+    mock_session = Mock(spec=["request"])
+    mock_session.request.return_value = Mock(status_code=200)
+    client = Client(
+        url="http://fake.url",
+        api_key="fake_key",
+        session=mock_session,
+        versions={"label-studio": "1.0.0"},
+        extra_headers={"Proxy-Authorization": "Bearer fake_bearer"},
+    )
+
+    client.check_connection()
+    args, kwargs = mock_session.request.call_args
+    assert kwargs["headers"] == {
+        "Authorization": "Token fake_key",
+        "Proxy-Authorization": "Bearer fake_bearer",
+    }
+
+
+def test_client_no_extra_headers():
+    mock_session = Mock(spec=["request"])
+    mock_session.request.return_value = Mock(status_code=200)
+    client = Client(
+        url="http://fake.url",
+        api_key="fake_key",
+        session=mock_session,
+        versions={"label-studio": "1.0.0"},
+    )
+
+    client.check_connection()
+    args, kwargs = mock_session.request.call_args
+    assert kwargs["headers"] == {"Authorization": "Token fake_key"}
+
+
+def test_project_export_with_filters():
+    with requests_mock.Mocker() as m:
+        m.get(
+            'http://fake.url/api/version',
+            json={"version": "1.0.0"},
+            status_code=200,
+        )
+        m.get(
+            'http://fake.url/api/projects/1',
+            json={"id": 1, "title": "fake_project"},
+            status_code=200,
+        )
+        m.post(
+            'http://fake.url/api/dm/views',
+            json={"id": 1, "title": "fake_project"},
+            status_code=200,
+        )
+        m.post(
+            'http://fake.url/api/projects/1/exports',
+            json={"id": 1, "title": "fake_project"},
+            status_code=200,
+        )
+        m.get(
+            'http://fake.url/api/projects/1/exports/1',
+            json={"id": 1, "title": "fake_project", "status": "completed"},
+            status_code=200,
+        )
+        m.get(
+            'http://fake.url/api/projects/1/exports/1/download?exportType=JSON',
+            headers={"Content-Disposition": "attachment; filename=fake_project.json"},
+            status_code=200,
+        )
+        m.delete(
+            'http://fake.url/api/dm/views/1',
+            status_code=200,
+        )
+
+        filters = Filters.create(
+            Filters.AND,
+            [
+                Filters.item(
+                    Column.inner_id,
+                    Operator.GREATER_OR_EQUAL,
+                    Type.Number,
+                    Filters.value(1),
+                ),
+                Filters.item(
+                    Column.inner_id,
+                    Operator.LESS,
+                    Type.Number,
+                    Filters.value(100),
+                ),
+            ],
+        )
+
+        ls = Client(url='http://fake.url', api_key='fake_key')
+        project = ls.get_project(1)
+        project.export(filters=filters)
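
Outside the mocked test, the usual follow-up to `project.export()` is reading the downloaded file. A short sketch, assuming the JSON export deserializes to a list of task dicts, reusing `project` and `filters` from the earlier examples:

```python
import json
import os

# Reuses `project` and `filters` from the earlier sketches; assumes the
# JSON export deserializes to a list of task dicts
result = project.export(filters=filters, export_type="JSON", output_dir="exported")
if result["status"] == 200:
    with open(os.path.join("exported", result["filename"])) as f:
        tasks = json.load(f)
    print(f"Exported {len(tasks)} tasks (export_id={result['export_id']})")
```
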