Skip to content

Commit

Permalink
feat: Add export with filters (#198)
Browse files Browse the repository at this point in the history
* feat: Add export with filters

* Add chunked export and import to migrate script

* Fix with workspace

* Fix empty tasks on import

* Set empty label config, import, then copy label config

* Black
  • Loading branch information
makseq committed Apr 29, 2024
1 parent ecd529c commit a4ac1c3
Show file tree
Hide file tree
Showing 8 changed files with 371 additions and 49 deletions.
6 changes: 5 additions & 1 deletion examples/README.md
Expand Up @@ -18,7 +18,11 @@ If your data is hosted in Google Cloud Storage (GCS), you can write a Python scr

## Export

* [Export snapshots](export_snapshots.py) - This example shows how to export, check export status and download JSON snapshots from Label Studio.
* [Export with filters](export_with_filters.py) - This example shows how to use the simplest version of exporting data with filters.
* [Export snapshots](export_snapshots.py) - This example shows how to export, check export status and download JSON snapshots from Label Studio. It is a detailed walkthrough of working with snapshots and includes the following steps:
* Create a snapshot
* Check the snapshot status
* Download the snapshot


## Machine Learning
Expand Down
45 changes: 45 additions & 0 deletions examples/export_with_filters.py
@@ -0,0 +1,45 @@
""" Export a snapshot with tasks filtered by ID range.
Note: at this moment it's not possible to export snapshots with filters,
LS API doesn't support it yet. However, it's achievable by creating a view
with a filter and then exporting a snapshot using this view.
This approach is hidden behind the `project.export()` method.
"""

from label_studio_sdk import Client
from label_studio_sdk.data_manager import Filters, Operator, Type, Column


# Usage example: connection settings and the inner-ID window to export.
# Replace these values with the ones for your own Label Studio instance.
host = 'https://app.heartex.com'
api_key = '<your_api_key>'
project_id = 14528
start_id = 10000
end_id = 20000

# Lower bound of the window: inner_id >= start_id
lower_bound = Filters.item(
    Column.inner_id,
    Operator.GREATER_OR_EQUAL,
    Type.Number,
    Filters.value(start_id),
)
# Upper bound of the window: inner_id < end_id
upper_bound = Filters.item(
    Column.inner_id,
    Operator.LESS,
    Type.Number,
    Filters.value(end_id),
)
# A task is selected only when both conditions hold
filters = Filters.create(Filters.AND, [lower_bound, upper_bound])

print('Export started ...')
ls = Client(url=host, api_key=api_key)
project = ls.get_project(project_id)
# Export the filtered tasks as JSON into the local "exported" directory
result = project.export(filters=filters, export_type="JSON", output_dir='exported')
print(
    f"Export file saved as: exported/{result['filename']}, status: {result['status']}, export_id: {result['export_id']}"
)
131 changes: 92 additions & 39 deletions examples/migrate_ls_to_ls/migrate-ls-to-ls.py
Expand Up @@ -11,10 +11,13 @@
import time

from label_studio_sdk import Client
from label_studio_sdk.data_manager import Filters, Operator, Type, Column
from label_studio_sdk.users import User

logger = logging.getLogger("migration-ls-to-ls")
logger.setLevel(logging.DEBUG)

CHUNK_SIZE = int(os.getenv('CHUNK_SIZE', 1000))
DEFAULT_STORAGE = os.getenv('DEFAULT_STORAGE', '') # 's3', 'gcs' or 'azure'
DEFAULT_STORAGE_REGEX = os.getenv(
'DEFAULT_STORAGE_REGEX', '.*'
Expand Down Expand Up @@ -45,6 +48,7 @@ def __init__(self, src_url, src_key, dst_url, dst_key, dest_workspace):
:param src_key: source Label Studio token
:param dst_url: destination Label Studio instance
:param dst_key: destination Label Studio token
:param dest_workspace: destination workspace id
"""
# Connect to the Label Studio API and check the connection
self.src_ls = Client(url=src_url, api_key=src_key)
Expand Down Expand Up @@ -137,33 +141,47 @@ def run(self, project_ids=None):
self.create_users(users)

# start exporting projects
success = 0
for project in projects:
logger.info(
f'Going to create export snapshot for project {project.id} {project.params["title"]}'
)
status, filename = self.export_snapshot(project)
logger.info(
f"Snapshot for project {project.id} created with status {status} and filename {filename}"
success += self.migrate_project(project)

logger.info(
f"Projects are processed, finishing with {success} successful and {len(projects)} total projects"
)

def migrate_project(self, project):
filenames = self.export_chunked_snapshots(project)
if not filenames:
logger.error(
f"No exported files found: skipping project {project.id}. Maybe project is empty?"
)
self.patch_snapshot_users(filename)
return False

if status != 200:
logger.info(f"Skipping project {project.id} because of errors {status}")
continue
logger.info(f"Patching snapshot users for project {project.id}")
for filename in filenames:
self.patch_snapshot_users(filename)

if self.dest_workspace is not None:
project.params["workspace"] = self.dest_workspace
new_project = self.create_project(project)
logger.info(f"New project creation for project {project.id}")
label_config = str(project.label_config)
project.params["label_config"] = '<View></View>'
new_project = self.create_project(project)

logger.info(f"Going to import {filename} to project {new_project.id}")
logger.info(f"Going to import {filenames} to project {new_project.id}")
for filename in filenames:
new_project.import_tasks(filename)
logger.info(f"Import {filename} finished for project {new_project.id}")
time.sleep(1)

self.add_default_import_storage(new_project)

logger.info("All projects are processed, finish")
project.set_params(label_config=label_config)
self.add_default_import_storage(new_project)
return True

def create_project(self, project):
if self.dest_workspace is not None:
project.params["workspace"] = self.dest_workspace
else:
project.params.pop("workspace", None)

logger.info(
f'Going to create a new project "{project.params["title"]}" from old project {project.id}'
)
Expand Down Expand Up @@ -273,30 +291,66 @@ def create_users(self, users: [User]):
logger.info(f"Created users: {[u.email for u in new_users]}")
return new_users

def export_snapshot(self, project):
"""Export all tasks from the project"""
# create new export snapshot
export_result = project.export_snapshot_create(
title="Migration snapshot",
serialization_options_drafts=False,
serialization_options_annotations__completed_by=False,
serialization_options_predictions=False,
)
assert "id" in export_result
export_id = export_result["id"]
def export_chunked_snapshots(self, project):
"""Export all tasks from the project in chunks and return filenames for each chunk"""

# wait until snapshot is ready
while project.export_snapshot_status(export_id).is_in_progress():
time.sleep(1.0)
logger.info(f'Creating chunked snapshots for project {project.id}')
file_size, filenames, chunk_i = 100, [], 0

# download snapshot file
status, file_name = project.export_snapshot_download(
export_id, export_type="JSON"
while file_size > 2: # 2 bytes is an empty json file
start_id = CHUNK_SIZE * chunk_i
end_id = CHUNK_SIZE * (chunk_i + 1)
logger.info(
f"Exporting chunk {chunk_i} from {start_id} to {end_id} tasks for project {project.id}"
)

# create a filter on the inner ID range to split tasks into chunks
filters = self.inner_id_range_filter(start_id, end_id)

# create new export and save to disk
output_dir = "snapshots"
result = project.export(
filters=filters,
title=f"Migration snapshot for chunk {chunk_i}",
serialization_options_drafts=False,
serialization_options_annotations__completed_by=False,
serialization_options_predictions=False,
output_dir=output_dir,
)
status, filename = result["status"], result["filename"]
if status != 200:
logger.info(
f"Error while exporting chunk {chunk_i}: {status}, skipping export"
)
return []

chunk_i += 1
filename = os.path.join(output_dir, filename)
file_size = os.path.getsize(filename)
if file_size > 2:
filenames.append(filename)

return filenames

def inner_id_range_filter(self, start_id, end_id):
filters = Filters.create(
Filters.AND,
[
Filters.item(
Column.inner_id,
Operator.GREATER_OR_EQUAL,
Type.Number,
Filters.value(start_id),
),
Filters.item(
Column.inner_id,
Operator.LESS,
Type.Number,
Filters.value(end_id),
),
],
)
assert status == 200
assert file_name is not None
logger.info(f"Status of the export is {status}. File name is {file_name}")
return status, file_name
return filters

def patch_snapshot_users(self, filename):
"""
Expand Down Expand Up @@ -390,7 +444,6 @@ def run():
dst_key=args.dst_key,
dest_workspace=args.dest_workspace,
)
logging.basicConfig(level=logging.INFO)

project_ids = (
[int(i) for i in args.project_ids.split(",")] if args.project_ids else None
Expand Down
6 changes: 4 additions & 2 deletions label_studio_sdk/data_manager.py
Expand Up @@ -176,6 +176,8 @@ class Column:

id = "tasks:id"
"""Task ID"""
inner_id = "tasks:inner_id"
"""Task Inner ID, it starts from 1 for all projects"""
ground_truth = "tasks:ground_truth"
"""Ground truth status of the tasks"""
annotations_results = "tasks:annotations_results"
Expand All @@ -191,9 +193,9 @@ class Column:
file_upload = "tasks:file_upload"
"""Name of the file uploaded to create the tasks"""
created_at = "tasks:created_at"
"""Time the task was updated at (e.g. new annotation was created, review added, etc)"""
created_at = "tasks:updated_at"
"""Time the task was created at"""
updated_at = "tasks:updated_at"
"""Time the task was updated at (e.g. new annotation was created, review added, etc)"""
annotators = "tasks:annotators"
"""Annotators that completed the task (Community). Can include assigned annotators (Enterprise only)"""
total_predictions = "tasks:total_predictions"
Expand Down

0 comments on commit a4ac1c3

Please sign in to comment.