feat: LEAP-824: Remove duplicated tasks with moving of annotations #5572

Merged
merged 12 commits into from Mar 18, 2024
114 changes: 9 additions & 105 deletions label_studio/data_manager/actions/experimental.py
@@ -1,19 +1,13 @@
"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
"""

import logging
import random
from collections import defaultdict

import ujson as json
from core.label_config import replace_task_data_undefined_with_config_field
from core.permissions import AllPermissions
from data_manager.actions.basic import delete_tasks
from data_manager.functions import DataManagerException
from django.conf import settings
from io_storages.azure_blob.models import AzureBlobImportStorageLink
from io_storages.gcs.models import GCSImportStorageLink
from io_storages.localfiles.models import LocalFilesImportStorage
from io_storages.s3.models import S3ImportStorageLink
from tasks.models import Annotation, Task
from tasks.serializers import TaskSerializerBulk

@@ -54,7 +48,10 @@ def propagate_annotations(project, queryset, **kwargs):
TaskSerializerBulk.post_process_annotations(user, db_annotations, 'propagated_annotation')
# Update counters for tasks and is_labeled. It should be a single operation as counters affect bulk is_labeled update
project.update_tasks_counters_and_is_labeled(tasks_queryset=Task.objects.filter(id__in=tasks))
return {'response_code': 200, 'detail': f'Created {len(db_annotations)} annotations'}
return {
'response_code': 200,
'detail': f'Created {len(db_annotations)} annotations',
}


def propagate_annotations_form(user, project):
@@ -68,90 +65,6 @@ def propagate_annotations_form(user, project):
return [{'columnCount': 1, 'fields': [field]}]


def remove_duplicates(project, queryset, **kwargs):
# get io_storages_* link fields for tasks; we need to copy them
storages = []
for field in dir(Task):
if field.startswith('io_storages_'):
storages += [field]

tasks = list(queryset.values('data', 'id', 'total_annotations', *storages))
duplicates = defaultdict(list)
for task in list(tasks):
replace_task_data_undefined_with_config_field(task['data'], project)
task['data'] = json.dumps(task['data'])
duplicates[task['data']].append(task)

### build storage links for duplicates ###
classes = {
'io_storages_s3importstoragelink': S3ImportStorageLink,
'io_storages_gcsimportstoragelink': GCSImportStorageLink,
'io_storages_azureblobimportstoragelink': AzureBlobImportStorageLink,
'io_storages_localfilesimportstoragelink': LocalFilesImportStorage,
# 'io_storages_redisimportstoragelink',
# 'lse_io_storages_lses3importstoragelink' # not supported yet
}
for data in list(duplicates):
tasks = duplicates[data]
source = None

# find first task with existing storage link
for task in tasks:
for link in classes:
if link in task and task[link] is not None:
# we don't support the case where duplicated tasks have multiple storage links
if source is not None:
source = None
break
source = (task, classes[link], task[link]) # last arg is a storage link id

# add storage links to duplicates
if source:
_class = source[1]  # get the storage link class
for task in tasks:
if task['id'] != source[0]['id']:
link_instance = _class.objects.get(id=source[2])
_class.create(
task=Task.objects.get(id=task['id']), key=link_instance.key, storage=link_instance.storage
)

### remove duplicates ###
removing = []

# prepare main tasks which won't be deleted
for data in duplicates:
root = duplicates[data]
if len(root) == 1:
continue

one_task_saved = False
new_root = []
for task in root:
# keep all tasks with annotations safe
if task['total_annotations'] > 0:
one_task_saved = True
else:
new_root.append(task)

for task in new_root:
# keep the first task safe
if not one_task_saved:
one_task_saved = True
# remove all other tasks
else:
removing.append(task['id'])

# remove tasks
queryset = queryset.filter(id__in=removing, annotations__isnull=True)
assert queryset.count() == len(removing), (
f'Remove duplicates failed, operation is not finished: '
f'queryset count {queryset.count()} != removing {len(removing)}'
)

delete_tasks(project, queryset)
return {'response_code': 200, 'detail': f'Removed {len(removing)} tasks'}


def rename_labels(project, queryset, **kwargs):
request = kwargs['request']

@@ -202,7 +115,10 @@ def rename_labels(project, queryset, **kwargs):
annotations = Annotation.objects.filter(project=project)
project.summary.update_created_annotations_and_labels(annotations)

return {'response_code': 200, 'detail': f'Updated {label_count} labels in {annotation_count}'}
return {
'response_code': 200,
'detail': f'Updated {label_count} labels in {annotation_count}',
}


def rename_labels_form(user, project):
@@ -401,18 +317,6 @@ def add_data_field_form(user, project):
'form': propagate_annotations_form,
},
},
{
'entry_point': remove_duplicates,
'permission': all_permissions.projects_change,
'title': 'Remove Duplicated Tasks',
'order': 1,
'experimental': True,
'dialog': {
'text': 'Confirm that you want to remove duplicated tasks with the same data fields. '
'Only tasks without annotations will be deleted.',
'type': 'confirm',
},
},
{
'entry_point': rename_labels,
'permission': all_permissions.tasks_change,
205 changes: 205 additions & 0 deletions label_studio/data_manager/actions/remove_duplicates.py
@@ -0,0 +1,205 @@
"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
"""
import logging
from collections import defaultdict

import ujson as json
from core.label_config import replace_task_data_undefined_with_config_field
from core.permissions import AllPermissions
from core.redis import start_job_async_or_sync
from data_manager.actions.basic import delete_tasks
from io_storages.azure_blob.models import AzureBlobImportStorageLink
from io_storages.gcs.models import GCSImportStorageLink
from io_storages.localfiles.models import LocalFilesImportStorage
from io_storages.s3.models import S3ImportStorageLink
from tasks.models import Task

logger = logging.getLogger(__name__)
all_permissions = AllPermissions()


def remove_duplicates(project, queryset, **kwargs):
"""Remove duplicated tasks with the same data fields:
Duplicated tasks will be deleted and all annotations will be moved to the first of the duplicated tasks.
Storage links will be restored for the first task.
"""
start_job_async_or_sync(
remove_duplicates_job,
project,
queryset,
organization_id=project.organization_id,
)
return {'response_code': 200}


def remove_duplicates_job(project, queryset, **kwargs):
"""Job for start_job_async_or_sync"""
duplicates = find_duplicated_tasks_by_data(project, queryset)
restore_storage_links_for_duplicated_tasks(duplicates)
move_annotations(duplicates)
remove_duplicated_tasks(duplicates, project, queryset)


def remove_duplicated_tasks(duplicates, project, queryset):
"""Remove duplicated tasks from queryset with condition that they don't have annotations"""
removing = []
# prepare main tasks which won't be deleted
for data in duplicates:
root = duplicates[data]
if len(root) == 1:
continue

one_task_saved = False
new_root = []
for task in root:
# keep all tasks with annotations safe
if task['total_annotations'] > 0:
one_task_saved = True
else:
new_root.append(task)

for task in new_root:
# keep the first task safe
if not one_task_saved:
one_task_saved = True
# remove all other tasks
else:
removing.append(task['id'])

# get the final queryset for removing tasks
queryset = queryset.filter(id__in=removing, annotations__isnull=True)

# check that we don't remove tasks with annotations
if queryset.count() != len(removing):
raise Exception(
f'Remove duplicates failed, operation is not finished: '
f'queryset count {queryset.count()} != removing {len(removing)}. '
'It means that some of the duplicated tasks have been annotated twice or more.'
)

delete_tasks(project, queryset)
logger.info(f'Removed {len(removing)} duplicated tasks')
return removing


def move_annotations(duplicates):
"""Move annotations to the first task from duplicated tasks"""
total_moved_annotations = 0

for data in duplicates:
root = duplicates[data]
if len(root) == 1:
continue

# find a task with annotations and make it the main ("first") one
i, first = 0, root[0]
for i, task in enumerate(root):
first = task
if task['total_annotations'] > 0:
break
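# after this loop `i` points at the first annotated task,
# or at the last task if none of them have annotations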

# move annotations to the first task
for task in root[i + 1 :]:
if task['total_annotations'] > 0:
Task.objects.get(id=task['id']).annotations.update(task_id=first['id'])
total_moved_annotations += task['total_annotations']
logger.info(
f"Moved {task['total_annotations']} annotations " f"from task {task['id']} to task {first['id']}"
)
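# zero the counter so remove_duplicated_tasks() treats this task as unannotated and can delete it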
task['total_annotations'] = 0


def restore_storage_links_for_duplicated_tasks(duplicates) -> None:
"""Build storage links for duplicated tasks and save them to Task in DB"""

# storage classes
classes = {
'io_storages_s3importstoragelink': S3ImportStorageLink,
'io_storages_gcsimportstoragelink': GCSImportStorageLink,
'io_storages_azureblobimportstoragelink': AzureBlobImportStorageLink,
'io_storages_localfilesimportstoragelink': LocalFilesImportStorage,
# 'io_storages_redisimportstoragelink',
# 'lse_io_storages_lses3importstoragelink' # not supported yet
}

total_restored_links = 0
for data in list(duplicates):
tasks = duplicates[data]
source = None

# find the first task with an existing StorageLink
for task in tasks:
for link in classes:
if link in task and task[link] is not None:
# we don't support the case where duplicated tasks have multiple storage links
if source is not None:
source = None
break
source = (
task,
classes[link],
task[link],
) # last arg is a storage link id

# add storage links to duplicates
if source:
storage_link_class = source[1]  # get the StorageLink class
for task in tasks:
if task['id'] != source[0]['id']:
# get already existing StorageLink
link_instance = storage_link_class.objects.get(id=source[2])

# assign existing StorageLink to other duplicated tasks
link = storage_link_class(task_id=task['id'], key=link_instance.key, storage=link_instance.storage)
link.save()
total_restored_links += 1
logger.info(f"Restored storage link for task {task['id']} from source task {source[0]['id']}")

logger.info(f'Restored {total_restored_links} storage links for duplicated tasks')


def find_duplicated_tasks_by_data(project, queryset):
"""Find duplicated tasks by `task.data` and return them as a dict"""

# get io_storages_* link fields for tasks; we need to copy them
storages = []
for field in dir(Task):
if field.startswith('io_storages_'):
storages += [field]
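# each entry is a reverse one-to-one relation name such as 'io_storages_s3importstoragelink';
# selecting it in .values() yields the id of the related storage link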

groups = defaultdict(list)
tasks = list(queryset.values('data', 'id', 'total_annotations', *storages))
logger.info(f'Retrieved {len(tasks)} tasks from queryset')

for task in list(tasks):
replace_task_data_undefined_with_config_field(task['data'], project)
task['data'] = json.dumps(task['data'])
groups[task['data']].append(task)

# group duplicated task ids for logging
duplicates = {d: groups[d] for d in groups if len(groups[d]) > 1}
info = {d: [task['id'] for task in duplicates[d]] for d in duplicates}

logger.info(f'Found {len(duplicates)} groups of duplicated tasks')
logger.info(f'Duplicated tasks: {info}')
return duplicates


actions = [
{
'entry_point': remove_duplicates,
'permission': all_permissions.projects_change,
'title': 'Remove Duplicated Tasks',
'order': 95,
'experimental': False,
'dialog': {
'text': (
'Confirm that you want to remove duplicated tasks with the same data fields. '
'Duplicated tasks will be deleted and all their annotations will be moved to the first of the duplicated tasks. '
'Also, Source Storage Links will be restored if at least one of the duplicated tasks has a storage link. '
"Warning: Task assignments (enterprise only) won't be saved."
),
'type': 'confirm',
},
},
]
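For reference, here is a minimal sketch of triggering the new action over the HTTP API. It assumes the Data Manager actions endpoint (POST /api/dm/actions) and that the action id matches the entry point name ('remove_duplicates'); the URL, token, and project id are placeholders, so check the API reference for your Label Studio version before relying on the exact parameter names.

import requests

LABEL_STUDIO_URL = 'http://localhost:8080'  # assumption: local instance
API_TOKEN = 'YOUR_API_TOKEN'  # assumption: personal access token
PROJECT_ID = 1  # assumption: target project id

# trigger the 'remove_duplicates' action for all tasks in the project
response = requests.post(
    f'{LABEL_STUDIO_URL}/api/dm/actions',
    headers={'Authorization': f'Token {API_TOKEN}'},
    params={'id': 'remove_duplicates', 'project': PROJECT_ID},
    json={'selectedItems': {'all': True, 'excluded': []}},
)
response.raise_for_status()
print(response.json())  # expected: {'response_code': 200}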