Skip to content

Commit

Permalink
Using dedicated event to store working time (#7958)
Browse files Browse the repository at this point in the history
- Parsing JSON payloads to get `working_time` generally leads to poor
performance in ClickHouse queries. This patch does not fix that right
away, but once it has been in place for some time we can switch to a new,
faster approach to calculating working time.
- There will not be many `send:working_time` events, so we can store
events of this scope for a longer time (e.g. 5 years instead of the
default one year).
- Finally, storing working time in events such as `click:element`,
`send:exception`, or `debug:info` does not seem logical.
- Also, history has shown that, as a result of various bugs, these
events may sometimes lose information about `job_id`, `task_id`, etc.

Resolved #7884
  • Loading branch information
bsekachev committed Jun 3, 2024
1 parent dfb9ecf commit 7f4be9c
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 94 deletions.
4 changes: 4 additions & 0 deletions changelog.d/20240529_154845_boris_dedicated_event_for_wt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Changed

- CVAT now stores users' working time in events of a dedicated type
(<https://github.com/cvat-ai/cvat/pull/7958>)
16 changes: 7 additions & 9 deletions cvat-ui/src/actions/annotation-actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -925,14 +925,7 @@ export function getJobAsync({
throw new Error('Requested resource id is not valid');
}

const loadJobEvent = await logger.log(
EventScope.loadJob,
{
task_id: taskID,
job_id: jobID,
},
true,
);
const loadJobEvent = await logger.log(EventScope.loadJob, {}, true);

getCore().config.globalObjectsCounter = 0;
const [job] = await cvat.jobs.get({ jobID });
Expand Down Expand Up @@ -979,7 +972,12 @@ export function getJobAsync({
}
}

loadJobEvent.close(await jobInfoGenerator(job));
loadJobEvent.close({
...await jobInfoGenerator(job),
jobID: job.id,
taskID: job.taskId,
projectID: job.projectId,
});

const openTime = Date.now();
dispatch({
Expand Down
18 changes: 9 additions & 9 deletions cvat/apps/engine/tests/test_rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,25 +550,25 @@ def setUpTestData(cls):
create_db_users(cls)
cls.data = {
"events": [{
"scope": "test:scope1",
"timestamp": "2019-01-29T12:34:56.000000Z",
"task": 1,
"job": 1,
"proj": 2,
"scope": "debug:info",
"timestamp": "2024-05-30T17:05:13.776Z",
"task_id": 1,
"job_id": 1,
"project_id": 2,
"organization": 2,
"count": 1,
"payload": json.dumps({
"client_id": 12321235123,
"client_id": 123456,
"message": "just test message",
"name": "add point",
"is_active": True,
}),
},
{
"timestamp": "2019-02-24T12:34:56.000000Z",
"scope": "test:scope2",
"timestamp": "2024-05-30T17:05:14.776Z",
"scope": "debug:info",
}],
"timestamp": "2019-02-24T12:34:58.000000Z",
"timestamp": "2024-05-30T17:05:15.776Z",
}


Expand Down
133 changes: 101 additions & 32 deletions cvat/apps/events/handlers.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,35 @@
# Copyright (C) 2023 CVAT.ai Corporation
# Copyright (C) 2023-2024 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

import datetime
import traceback
from copy import deepcopy
from typing import Optional, Union
import traceback
import rq

from rest_framework.views import exception_handler
from rest_framework.exceptions import NotAuthenticated
import rq
from crum import get_current_request, get_current_user
from rest_framework import status
from crum import get_current_user, get_current_request

from cvat.apps.engine.models import (
Project,
Task,
Job,
User,
CloudStorage,
Issue,
Comment,
Label,
)
from cvat.apps.engine.serializers import (
ProjectReadSerializer,
TaskReadSerializer,
JobReadSerializer,
BasicUserSerializer,
CloudStorageReadSerializer,
IssueReadSerializer,
CommentReadSerializer,
LabelSerializer,
)
from cvat.apps.engine.models import ShapeType
from cvat.apps.organizations.models import Membership, Organization, Invitation
from cvat.apps.organizations.serializers import OrganizationReadSerializer, MembershipReadSerializer, InvitationReadSerializer
from rest_framework.exceptions import NotAuthenticated
from rest_framework.views import exception_handler

from cvat.apps.engine.models import (CloudStorage, Comment, Issue, Job, Label,
Project, ShapeType, Task, User)
from cvat.apps.engine.serializers import (BasicUserSerializer,
CloudStorageReadSerializer,
CommentReadSerializer,
IssueReadSerializer,
JobReadSerializer, LabelSerializer,
ProjectReadSerializer,
TaskReadSerializer)
from cvat.apps.organizations.models import Invitation, Membership, Organization
from cvat.apps.organizations.serializers import (InvitationReadSerializer,
MembershipReadSerializer,
OrganizationReadSerializer)

from .event import event_scope, record_server_event
from .cache import get_cache
from .event import event_scope, record_server_event


def project_id(instance):
if isinstance(instance, Project):
Expand Down Expand Up @@ -141,7 +133,7 @@ def user_name(instance=None):

def user_email(instance=None):
current_user = get_user(instance)
return _get_value(current_user, "email")
return _get_value(current_user, "email") or None

def organization_slug(instance):
if isinstance(instance, Organization):
Expand Down Expand Up @@ -594,3 +586,80 @@ def handle_viewset_exception(exc, context):
)

return response

def handle_client_events_push(request, data: dict):
TIME_THRESHOLD = datetime.timedelta(seconds=100)
WORKING_TIME_SCOPE = 'send:working_time'
WORKING_TIME_RESOLUTION = datetime.timedelta(milliseconds=1)
COLLAPSED_EVENT_SCOPES = frozenset(("change:frame",))
org = request.iam_context["organization"]

def read_ids(event: dict) -> tuple[int | None, int | None, int | None]:
return event.get("job_id"), event.get("task_id"), event.get("project_id")

def get_end_timestamp(event: dict) -> datetime.datetime:
if event["scope"] in COLLAPSED_EVENT_SCOPES:
return event["timestamp"] + datetime.timedelta(milliseconds=event["duration"])
return event["timestamp"]

if previous_event := data["previous_event"]:
previous_end_timestamp = get_end_timestamp(previous_event)
previous_ids = read_ids(previous_event)
elif data["events"]:
previous_end_timestamp = data["events"][0]["timestamp"]
previous_ids = read_ids(data["events"][0])

working_time_per_ids = {}
for event in data["events"]:
working_time = datetime.timedelta()
timestamp = event["timestamp"]

if timestamp > previous_end_timestamp:
t_diff = timestamp - previous_end_timestamp
if t_diff < TIME_THRESHOLD:
working_time += t_diff

previous_end_timestamp = timestamp

end_timestamp = get_end_timestamp(event)
if end_timestamp > previous_end_timestamp:
working_time += end_timestamp - previous_end_timestamp
previous_end_timestamp = end_timestamp

if previous_ids not in working_time_per_ids:
working_time_per_ids[previous_ids] = {
"value": datetime.timedelta(),
"timestamp": timestamp,
}

working_time_per_ids[previous_ids]["value"] += working_time
previous_ids = read_ids(event)

if data["events"]:
common = {
"user_id": request.user.id,
"user_name": request.user.username,
"user_email": request.user.email or None,
"org_id": getattr(org, "id", None),
"org_slug": getattr(org, "slug", None),
}

for ids, working_time in working_time_per_ids.items():
job_id, task_id, project_id = ids
if working_time["value"].total_seconds():
value = working_time["value"] // WORKING_TIME_RESOLUTION
record_server_event(
scope=WORKING_TIME_SCOPE,
request_id=request_id(),
# keep it in payload for backward compatibility
# but in the future it is much better to use a "duration" field
# because parsing JSON in SQL query is very slow
payload={"working_time": value},
timestamp=str(working_time["timestamp"].timestamp()),
duration=value,
project_id=project_id,
task_id=task_id,
job_id=job_id,
count=1,
**common,
)
56 changes: 22 additions & 34 deletions cvat/apps/events/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from rest_framework import serializers


class EventSerializer(serializers.Serializer):
scope = serializers.CharField(required=True)
obj_name = serializers.CharField(required=False, allow_null=True)
Expand All @@ -27,19 +28,20 @@ class EventSerializer(serializers.Serializer):
payload = serializers.CharField(required=False, allow_null=True)

class ClientEventsSerializer(serializers.Serializer):
ALLOWED_SCOPES = frozenset((
'load:cvat', 'load:job', 'save:job', 'restore:job',
'upload:annotations', 'send:exception', 'send:task_info',
'draw:object', 'paste:object', 'copy:object', 'propagate:object',
'drag:object', 'resize:object', 'delete:object', 'lock:object',
'merge:objects', 'split:objects', 'group:objects', 'slice:object',
'join:objects', 'change:attribute', 'change:label', 'change:frame',
'zoom:image', 'fit:image', 'rotate:image', 'action:undo', 'action:redo',
'debug:info', 'run:annotations_action', 'click:element'
))

events = EventSerializer(many=True, default=[])
previous_event = EventSerializer(default=None, allow_null=True, write_only=True)
timestamp = serializers.DateTimeField()
_TIME_THRESHOLD = datetime.timedelta(seconds=100)
_WORKING_TIME_RESOLUTION = datetime.timedelta(milliseconds=1)
_COLLAPSED_EVENT_SCOPES = frozenset(("change:frame",))

@classmethod
def _end_timestamp(cls, event: dict) -> datetime.datetime:
if event["scope"] in cls._COLLAPSED_EVENT_SCOPES:
return event["timestamp"] + datetime.timedelta(milliseconds=event["duration"])

return event["timestamp"]

def to_internal_value(self, data):
data = super().to_internal_value(data)
Expand All @@ -52,35 +54,21 @@ def to_internal_value(self, data):
receive_time = datetime.datetime.now(datetime.timezone.utc)
time_correction = receive_time - send_time

if previous_event := data["previous_event"]:
previous_end_timestamp = self._end_timestamp(previous_event)
elif data["events"]:
previous_end_timestamp = data["events"][0]["timestamp"]
if data["previous_event"]:
data["previous_event"]["timestamp"] += time_correction

for event in data["events"]:
working_time = datetime.timedelta()
scope = event["scope"]
if scope not in ClientEventsSerializer.ALLOWED_SCOPES:
raise serializers.ValidationError({ "scope": f"Event scope **{scope}** is not allowed from client" })

timestamp = event["timestamp"]
if timestamp > previous_end_timestamp:
t_diff = timestamp - previous_end_timestamp
if t_diff < self._TIME_THRESHOLD:
working_time += t_diff

previous_end_timestamp = timestamp

end_timestamp = self._end_timestamp(event)
if end_timestamp > previous_end_timestamp:
working_time += end_timestamp - previous_end_timestamp
previous_end_timestamp = end_timestamp

payload = json.loads(event.get("payload", "{}"))
payload.update({
"working_time": working_time // self._WORKING_TIME_RESOLUTION,
"username": request.user.username,
})
try:
payload = json.loads(event.get("payload", "{}"))
except json.JSONDecodeError:
raise serializers.ValidationError({ "payload": "JSON payload is not valid in passed event" })

event.update({
"timestamp": str((timestamp + time_correction).timestamp()),
"timestamp": event["timestamp"] + time_correction,
"source": "client",
"org_id": org_id,
"org_slug": org_slug,
Expand Down
27 changes: 17 additions & 10 deletions cvat/apps/events/views.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
# Copyright (C) 2023 CVAT.ai Corporation
# Copyright (C) 2023-2024 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

from django.conf import settings
from rest_framework import status, viewsets
from rest_framework.response import Response
from drf_spectacular.utils import OpenApiResponse, OpenApiParameter, extend_schema
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import (OpenApiParameter, OpenApiResponse,
extend_schema)
from rest_framework import status, viewsets
from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response


from cvat.apps.iam.filters import ORGANIZATION_OPEN_API_PARAMETERS
from cvat.apps.engine.log import vlogger
from cvat.apps.events.permissions import EventsPermission
from cvat.apps.events.serializers import ClientEventsSerializer
from cvat.apps.engine.log import vlogger
from cvat.apps.iam.filters import ORGANIZATION_OPEN_API_PARAMETERS

from .export import export
from .handlers import handle_client_events_push


class EventsViewSet(viewsets.ViewSet):
serializer_class = None
Expand All @@ -31,11 +34,15 @@ def create(self, request):
serializer = ClientEventsSerializer(data=request.data, context={"request": request})
serializer.is_valid(raise_exception=True)

for event in serializer.data["events"]:
message = JSONRenderer().render(event).decode('UTF-8')
handle_client_events_push(request, serializer.validated_data)
for event in serializer.validated_data["events"]:
message = JSONRenderer().render({
**event,
'timestamp': str(event["timestamp"].timestamp())
}).decode('UTF-8')
vlogger.info(message)

return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.validated_data, status=status.HTTP_201_CREATED)

@extend_schema(summary='Get an event log',
methods=['GET'],
Expand Down
Loading

0 comments on commit 7f4be9c

Please sign in to comment.