fix: LSDV-5289-1: Load stats recalculation function from string, call after update task states (#4676)

* fix: LSDV-5289-1: move stats recalculation to string import

* clarify log messages

* fix confusing log message

---------

Co-authored-by: jombooth <jombooth@users.noreply.github.com>
jombooth and jombooth committed Aug 24, 2023
1 parent d928537 commit 9302ef7
Showing 5 changed files with 77 additions and 32 deletions.
1 change: 1 addition & 0 deletions label_studio/core/settings/base.py
@@ -522,6 +522,7 @@
ANNOTATION_MIXIN = 'tasks.mixins.AnnotationMixin'
ORGANIZATION_MIXIN = 'organizations.mixins.OrganizationMixin'
USER_MIXIN = 'users.mixins.UserMixin'
+ RECALCULATE_ALL_STATS = None
GET_STORAGE_LIST = 'io_storages.functions.get_storage_list'
STORAGE_ANNOTATION_SERIALIZER = 'io_storages.serializers.StorageAnnotationSerializer'
TASK_SERIALIZER_BULK = 'tasks.serializers.BaseTaskSerializerBulk'
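`RECALCULATE_ALL_STATS` defaults to `None` in the open-source settings, so the loader resolves it to nothing and the new hook is a no-op outside editions (such as LSE) that point it at a dotted path. A minimal sketch of a string-to-callable loader in the spirit of `load_func` — the project's actual helper may differ:

```python
from importlib import import_module
from typing import Any, Callable, Optional

def load_func(func_string: Optional[str]) -> Optional[Callable[..., Any]]:
    """Resolve a dotted path such as 'stats.functions.recalculate_all_stats'
    to a callable; return None when the setting is unset."""
    if func_string is None:
        return None
    module_name, _, attr = func_string.rpartition('.')
    return getattr(import_module(module_name), attr)
```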
52 changes: 33 additions & 19 deletions label_studio/data_import/api.py
@@ -122,20 +122,20 @@
],
operation_summary='Import tasks',
operation_description="""
Import data as labeling tasks in bulk using this API endpoint. You can use this API endpoint to import multiple tasks.
One POST request is limited to 250K tasks and 200 MB.
**Note:** Imported data is verified against a project *label_config* and must
include all variables that were used in the *label_config*. For example,
if the label configuration has a *$text* variable, then each item in a data object
must include a "text" field.
<br>
## POST requests
<hr style="opacity:0.3">
There are three possible ways to import tasks with this endpoint:
### 1\. **POST with data**
Send JSON tasks as POST data. Only JSON is supported for POSTing files directly.
Update this example to specify your authorization token and Label Studio instance host, then run the following from
@@ -145,32 +145,32 @@
curl -H 'Content-Type: application/json' -H 'Authorization: Token abc123' \\
-X POST '{host}/api/projects/1/import' --data '[{{"text": "Some text 1"}}, {{"text": "Some text 2"}}]'
```
### 2\. **POST with files**
Send tasks as files. You can attach multiple files with different names.
- **JSON**: text files in JavaScript object notation format
- **CSV**: text files with tables in Comma Separated Values format
- **TSV**: text files with tables in Tab Separated Values format
- **TXT**: simple text files, similar to CSV with one column and no header; supported only for projects with a single source
Update this example to specify your authorization token, Label Studio instance host, and file name and path,
then run the following from the command line:
```bash
curl -H 'Authorization: Token abc123' \\
-X POST '{host}/api/projects/1/import' -F 'file=@path/to/my_file.csv'
```
### 3\. **POST with URL**
You can also provide a URL to a file with labeling tasks. Supported file formats are the same as in option 2.
```bash
curl -H 'Content-Type: application/json' -H 'Authorization: Token abc123' \\
-X POST '{host}/api/projects/1/import' \\
--data '[{{"url": "http://example.com/test1.csv"}}, {{"url": "http://example.com/test2.csv"}}]'
```
<br>
""".format(host=(settings.HOSTNAME or 'https://localhost:8080'))
))
@@ -217,12 +217,19 @@ def sync_import(self, request, project, preannotated_from_fields, commit_to_proj
task_count = len(tasks)
annotation_count = len(serializer.db_annotations)
prediction_count = len(serializer.db_predictions)

+ recalculate_stats_counts = {
+     'task_count': task_count,
+     'annotation_count': annotation_count,
+     'prediction_count': prediction_count,
+ }

# Update counters (like total_annotations) for the new tasks, then bulk-update task states. This must be a
# single operation, since the counters affect the bulk is_labeled update
project.update_tasks_counters_and_task_states(tasks_queryset=tasks, maximum_annotations_changed=False,
overlap_cohort_percentage_changed=False,
- tasks_number_changed=True)
- logger.info('Tasks bulk_update finished')
+ tasks_number_changed=True, recalculate_stats_counts=recalculate_stats_counts)
+ logger.info('Tasks bulk_update finished (sync import)')

project.summary.update_data_columns(parsed_data)
# TODO: project.summary.update_created_annotations_and_labels
@@ -368,24 +375,33 @@ def sync_reimport(self, project, file_upload_ids, files_as_tasks_list):
tasks, serializer = self._save(tasks)
duration = time.time() - start

+ task_count = len(tasks)
+ annotation_count = len(serializer.db_annotations)
+ prediction_count = len(serializer.db_predictions)

# Update counters (like total_annotations) for the new tasks, then bulk-update task states. This must be a
# single operation, since the counters affect the bulk is_labeled update
project.update_tasks_counters_and_task_states(
tasks_queryset=tasks,
maximum_annotations_changed=False,
overlap_cohort_percentage_changed=False,
tasks_number_changed=True,
+ recalculate_stats_counts={
+     'task_count': task_count,
+     'annotation_count': annotation_count,
+     'prediction_count': prediction_count,
+ },
)
- logger.info('Tasks bulk_update finished')
+ logger.info('Tasks bulk_update finished (sync reimport)')

project.summary.update_data_columns(tasks)
# TODO: project.summary.update_created_annotations_and_labels

return Response(
{
- 'task_count': len(tasks),
- 'annotation_count': len(serializer.db_annotations),
- 'prediction_count': len(serializer.db_predictions),
+ 'task_count': task_count,
+ 'annotation_count': annotation_count,
+ 'prediction_count': prediction_count,
'duration': duration,
'file_upload_ids': file_upload_ids,
'found_formats': found_formats,
@@ -677,5 +693,3 @@ def get(self, request, *args, **kwargs):
response.headers['Cache-Control'] = f"no-store, max-age={max_age}"

return response
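All four import paths — sync import, sync reimport, and the two async background jobs in the next file — assemble the same three-key counts dict before calling `update_tasks_counters_and_task_states`. A hypothetical helper (not part of this commit) showing the shape the hook receives:

```python
from typing import Dict, Sequence

def stats_counts(tasks: Sequence, serializer) -> Dict[str, int]:
    # The three keys below match what the call sites in this diff build inline.
    return {
        'task_count': len(tasks),
        'annotation_count': len(serializer.db_annotations),
        'prediction_count': len(serializer.db_predictions),
    }
```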


34 changes: 26 additions & 8 deletions label_studio/data_import/functions.py
@@ -1,3 +1,4 @@
+ from typing import Optional, Callable
import logging
import traceback
import time
@@ -17,7 +18,7 @@
logger = logging.getLogger(__name__)


- def async_import_background(import_id, user_id, **kwargs):
+ def async_import_background(import_id, user_id, recalculate_stats_func: Optional[Callable[..., None]] = None, **kwargs):
with transaction.atomic():
try:
project_import = ProjectImport.objects.get(id=import_id)
@@ -55,10 +56,17 @@ def async_import_background(import_id, user_id, **kwargs):
prediction_count = len(serializer.db_predictions)
# Update counters (like total_annotations) for the new tasks, then bulk-update task states. This must be a
# single operation, since the counters affect the bulk is_labeled update

+ recalculate_stats_counts = {
+     'task_count': task_count,
+     'annotation_count': annotation_count,
+     'prediction_count': prediction_count,
+ }

project.update_tasks_counters_and_task_states(tasks_queryset=tasks, maximum_annotations_changed=False,
overlap_cohort_percentage_changed=False,
- tasks_number_changed=True)
- logger.info('Tasks bulk_update finished')
+ tasks_number_changed=True, recalculate_stats_counts=recalculate_stats_counts)
+ logger.info('Tasks bulk_update finished (async import)')

project.summary.update_data_columns(tasks)
# TODO: project.summary.update_created_annotations_and_labels
Expand Down Expand Up @@ -144,20 +152,30 @@ def async_reimport_background(reimport_id, organization_id, user, **kwargs):
tasks = serializer.save(project_id=project.id)
emit_webhooks_for_instance(organization_id, project, WebhookAction.TASKS_CREATED, tasks)

+ task_count = len(tasks)
+ annotation_count = len(serializer.db_annotations)
+ prediction_count = len(serializer.db_predictions)

+ recalculate_stats_counts = {
+     'task_count': task_count,
+     'annotation_count': annotation_count,
+     'prediction_count': prediction_count,
+ }

# Update counters (like total_annotations) for the new tasks, then bulk-update task states. This must be a
# single operation, since the counters affect the bulk is_labeled update
project.update_tasks_counters_and_task_states(
tasks_queryset=tasks, maximum_annotations_changed=False,
overlap_cohort_percentage_changed=False,
- tasks_number_changed=True)
- logger.info('Tasks bulk_update finished')
+ tasks_number_changed=True, recalculate_stats_counts=recalculate_stats_counts)
+ logger.info('Tasks bulk_update finished (async reimport)')

project.summary.update_data_columns(tasks)
# TODO: project.summary.update_created_annotations_and_labels

- reimport.task_count = len(tasks)
- reimport.annotation_count = len(serializer.db_annotations)
- reimport.prediction_count = len(serializer.db_predictions)
+ reimport.task_count = task_count
+ reimport.annotation_count = annotation_count
+ reimport.prediction_count = prediction_count
reimport.found_formats = found_formats
reimport.data_columns = list(data_columns)
reimport.status = ProjectReimport.Status.COMPLETED
7 changes: 5 additions & 2 deletions label_studio/projects/mixins.py
@@ -1,3 +1,4 @@
+ from typing import Mapping, Optional
from core.redis import start_job_async_or_sync
from django.utils.functional import cached_property

@@ -29,7 +30,8 @@ def update_tasks_counters_and_is_labeled(self, tasks_queryset, from_scratch=True
start_job_async_or_sync(self._update_tasks_counters_and_is_labeled, list(tasks_queryset), from_scratch=from_scratch)

def update_tasks_counters_and_task_states(self, tasks_queryset, maximum_annotations_changed,
- overlap_cohort_percentage_changed, tasks_number_changed, from_scratch=True):
+ overlap_cohort_percentage_changed, tasks_number_changed, from_scratch=True,
+ recalculate_stats_counts: Optional[Mapping[str, int]] = None):
"""
Asynchronously start updating task counters, then rearrange task states
:param tasks_queryset: Tasks to update queryset
@@ -42,7 +44,8 @@ def update_tasks_counters_and_task_states(self, maximum_annotati
if not (isinstance(tasks_queryset, set) or isinstance(tasks_queryset, list)):
tasks_queryset = set(tasks_queryset.values_list('id', flat=True))
start_job_async_or_sync(self._update_tasks_counters_and_task_states, tasks_queryset, maximum_annotations_changed,
- overlap_cohort_percentage_changed, tasks_number_changed, from_scratch=from_scratch)
+ overlap_cohort_percentage_changed, tasks_number_changed, from_scratch=from_scratch,
+ recalculate_stats_counts=recalculate_stats_counts)

def update_tasks_states(self,
maximum_annotations_changed,
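The new kwarg rides through `start_job_async_or_sync` (imported above from `core.redis`), so it reaches `_update_tasks_counters_and_task_states` whether the update runs inline or on a worker. A rough sketch of that dispatch pattern, assuming an RQ-style queue; the real helper handles more cases:

```python
def start_job_async_or_sync(job, *args, queue=None, **kwargs):
    # Enqueue on a worker when a queue is configured, otherwise run inline;
    # keyword arguments such as recalculate_stats_counts pass through unchanged.
    if queue is not None:
        return queue.enqueue(job, *args, **kwargs)
    return job(*args, **kwargs)
```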
15 changes: 12 additions & 3 deletions label_studio/projects/models.py
@@ -1,5 +1,6 @@
"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
"""
+ from typing import Optional, Mapping
import json
import logging

@@ -92,6 +93,10 @@ def with_counts_annotate(queryset, fields=None):
ProjectMixin = load_func(settings.PROJECT_MIXIN)


+ # LSE recalculate all stats
+ recalculate_all_stats = load_func(settings.RECALCULATE_ALL_STATS)


class Project(ProjectMixin, models.Model):
class SkipQueue(models.TextChoices):
# requeue to the end of the same annotator’s queue => annotator gets this task at the end of the queue
@@ -194,7 +199,7 @@ class SkipQueue(models.TextChoices):
)

control_weights = JSONField(_('control weights'), null=True, default=dict, help_text="Dict of weights for each control tag in metric calculation. Each control tag (e.g. label or choice) will "
"have its own key in the control weights dict, with a weight for each label and an overall weight. "
"For example, if a bounding box annotation with a control tag named my_bbox should be included with weight 0.33 in the agreement calculation, "
"and the first label Car should be twice as important as Airplane, then you need to specify: "
"{'my_bbox': {'type': 'RectangleLabels', 'labels': {'Car': 1.0, 'Airplane': 0.5}, 'overall': 0.33}}")
@@ -406,7 +411,7 @@ def _rearrange_overlap_cohort(self):
all_project_tasks = Task.objects.filter(project=self)
max_annotations = self.maximum_annotations
must_tasks = int(self.tasks.count() * self.overlap_cohort_percentage / 100 + 0.5)
logger.info(f"Starting _update_tasks_states with params: Project {str(self)} maximum_annotations "
logger.info(f"Starting _rearrange_overlap_cohort with params: Project {str(self)} maximum_annotations "
f"{max_annotations} and percentage {self.overlap_cohort_percentage}")
tasks_with_max_annotations = all_project_tasks.annotate(
anno=Count('annotations', filter=Q_task_finished_annotations & Q(annotations__ground_truth=False))
@@ -903,7 +908,7 @@ def _update_tasks_counters_and_is_labeled(self, task_ids, from_scratch=True):

def _update_tasks_counters_and_task_states(self, queryset, maximum_annotations_changed,
overlap_cohort_percentage_changed, tasks_number_changed,
- from_scratch=True):
+ from_scratch=True, recalculate_stats_counts: Optional[Mapping[str, int]] = None):
"""
Update task counters and task states (rearrange and/or is_labeled)
:param queryset: Tasks to update queryset
@@ -913,6 +918,10 @@ def _update_tasks_counters_and_task_states(self, queryset, maximum_annotations_c
queryset = make_queryset_from_iterable(queryset)
objs = self._update_tasks_counters(queryset, from_scratch)
self._update_tasks_states(maximum_annotations_changed, overlap_cohort_percentage_changed, tasks_number_changed)

+ if recalculate_all_stats and recalculate_stats_counts:
+     recalculate_all_stats(self.id, **recalculate_stats_counts)

return objs

def __str__(self):
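With the loader and the `if recalculate_all_stats and recalculate_stats_counts` guard in place, an enterprise build only needs to expose a callable matching the keyword names the callers send. A hypothetical stub; the dotted path and signature defaults are illustrative, not taken from this commit:

```python
# Hypothetical settings override in an edition that provides the hook:
# RECALCULATE_ALL_STATS = 'stats.functions.recalculate_all_stats'

def recalculate_all_stats(project_id: int, task_count: int = 0,
                          annotation_count: int = 0, prediction_count: int = 0) -> None:
    """Receives the counts gathered at import time; what gets recalculated
    is edition-specific and outside this commit."""
    ...
```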
