Skip to content

Commit

Permalink
Create a dataclass for indexer worker TaskStatus (#4601)
Browse files Browse the repository at this point in the history
* #4534 -Created a dataclass for indexer worker TaskStatus

* Made types more precise in dataclass for indexer worker TaskStatus as suggested

* Changed timestamps to float and type of progress, finish_time to Synchronized

* Use `Synchronized[float]` type

---------

Co-authored-by: Krystle Salazar <github@krysal.co>
  • Loading branch information
akshay-km and krysal committed Jul 19, 2024
1 parent 6a70050 commit 8c1bc5b
Showing 1 changed file with 27 additions and 10 deletions.
37 changes: 27 additions & 10 deletions indexer_worker/indexer_worker/tasks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
"""Simple in-memory tracking of executed tasks."""

from __future__ import annotations

import datetime
from dataclasses import dataclass
from multiprocessing.sharedctypes import Synchronized
from typing import Any


def _time_fmt(timestamp: int) -> str | None:
def _time_fmt(timestamp: float) -> str | None:
"""
Format the timestamp into a human-readable date and time notation.
Expand All @@ -16,6 +21,16 @@ def _time_fmt(timestamp: int) -> str | None:
return str(datetime.datetime.utcfromtimestamp(timestamp))


@dataclass
class TaskInfo:
task: Any
start_time: float
model: str
target_index: str
finish_time: Synchronized[float]
progress: Synchronized[float]


class TaskTracker:
def __init__(self):
self.tasks = {}
Expand All @@ -27,9 +42,11 @@ def add_task(self, task_id: str, **kwargs):
:param task: the task being performed
:param task_id: the UUID of the task
"""
self.tasks[task_id] = {
"start_time": datetime.datetime.utcnow().timestamp(),
} | kwargs
task_info = TaskInfo(
start_time=datetime.datetime.utcnow().timestamp(), **kwargs
)

self.tasks[task_id] = task_info

def get_task_status(self, task_id: str) -> dict:
"""
Expand All @@ -39,12 +56,12 @@ def get_task_status(self, task_id: str) -> dict:
:return: response dictionary containing all relevant info about the task
"""
task_info = self.tasks[task_id]
active = task_info["task"].is_alive()
model = task_info["model"]
target_index = task_info["target_index"]
start_time = task_info["start_time"]
finish_time = task_info["finish_time"].value
progress = task_info["progress"].value
active = task_info.task.is_alive()
model = task_info.model
target_index = task_info.target_index
start_time = task_info.start_time
finish_time = task_info.finish_time.value
progress = task_info.progress.value

return {
"task_id": task_id,
Expand Down

0 comments on commit 8c1bc5b

Please sign in to comment.