# File Tracker

## Declare Your Own Tracker

In [1]:
import shutil
import random
import dataclasses
from pathlib import Path

from abstract_tracker.api import logger, FileTracker, TaskLockedError

from rich import print as rprint

dir_data = Path.cwd() / "data"

@dataclasses.dataclass
class MyTracker(FileTracker):
    @classmethod
    def get_path(cls, id) -> Path:
        """
        The path of the tracker file.
        """
        return dir_data.joinpath(f"{id}.json")

    @classmethod
    def get_expire(cls) -> int:
        """
        Number of seconds before a lock expires for this tracker.

        If you don't declare this, then it will use default = 900 (15 minutes)
        """
        return 10

    @classmethod
    def get_max_attempts(cls) -> int:
        """
        Maximum number of attempts before this task is considered exhaused.

        If you don't declare this, then it will use default = 3
        """
        return 3


class TaskError(Exception):
    pass


# define some dummy task
def run_good_task():
    logger.info("run good task")


def run_bad_task():
    logger.info("run bad task")
    raise TaskError("task failed")

In [2]:
# clean up existing data, let's start from scratch
shutil.rmtree(dir_data, ignore_errors=True)
dir_data.mkdir(parents=True, exist_ok=True)

## Create a new Tracker

In [3]:
# try to load the existing tracker from file backend, if it doesn't exist, then it will be None
tracker = MyTracker.load(id=1)
if tracker is None: # if not exist, create a new tracker with pending status
    tracker = MyTracker.new(id=1)
rprint(tracker)

## Use Context Manager to Manage Lock and Status Automatically

In [4]:
with tracker.start(verbose=True):
    run_good_task()

+----- ⏱ ⏩ start task(id=1, status=0 (pending), attempts=1) -------------------+
| set status = 10 (⏳ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| ⏳ run good task
| +----- end task logging -----------------------------------------------------+
| task succeeded, set status = 40 (✅ succeeded) and 🔐 unlock the task.
+----- ⏰ ⏹️ end task(id=1 status=40)) -----------------------------------------+


In [5]:
tracker = MyTracker.load(id=1)
rprint(tracker)
print(f"{tracker.status_name = }")

tracker.status_name = 'succeeded'


## Automatically Log Error Message

In [6]:
# test on another task (id=2)
tracker = MyTracker.new(id=2)

In [7]:
with tracker.start(verbose=True):
    run_bad_task()

+----- ⏱ ⏩ start task(id=2, status=0 (pending), attempts=1) -------------------+
| set status = 10 (⏳ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| ⏳ run bad task
| +----- end task logging -----------------------------------------------------+
| ❌ task failed, set status = 20 (❌ failed) and 🔐 unlock the task.
+----- ⏰ ⏹️ end task(id=2 status=20)) -----------------------------------------+


TaskError: task failed

In [8]:
tracker = MyTracker.load(id=2)
rprint(tracker)
print(f"{tracker.status_name = }")

tracker.status_name = 'failed'


## Automatically Set Status as Exhausted When Reach Max Attempts

In [9]:
with tracker.start(verbose=True):
    run_bad_task()

+----- ⏱ ⏩ start task(id=2, status=20 (failed), attempts=2) -------------------+
| set status = 10 (⏳ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| ⏳ run bad task
| +----- end task logging -----------------------------------------------------+
| ❌ task failed, set status = 20 (❌ failed) and 🔐 unlock the task.
+----- ⏰ ⏹️ end task(id=2 status=20)) -----------------------------------------+


TaskError: task failed

In [10]:
with tracker.start(verbose=True):
    run_bad_task()

+----- ⏱ ⏩ start task(id=2, status=20 (failed), attempts=3) -------------------+
| set status = 10 (⏳ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| ⏳ run bad task
| +----- end task logging -----------------------------------------------------+
| ❌ task failed 3 times already, set status = 30 (🚫 ignored) and 🔐 unlock the task.
+----- ⏰ ⏹️ end task(id=2 status=30)) -----------------------------------------+


TaskError: task failed

In [11]:
with tracker.start(verbose=True):
    run_bad_task()

+----- ⏱ ⏩ start task(id=2, status=30 (exhausted), attempts=4) ----------------+
| the task is 🚫 exhausted, do nothing!
+----- ⏰ ⏹️ end task(id=2 status=30)) -----------------------------------------+


NoMoreRetryError: Already tried 3 times, No more retry for task 2.

## Automatically Set Concurrency Lock To Prevent Double Consumption

In [12]:
tracker1 = MyTracker.new(id=3)
with tracker1.start(verbose=True):
    # Another worker load the tracker data
    tracker2 = MyTracker.load(id=3)
    with logger.nested(): # just make the logging nicer
        try:
            # Worker 2 try to run task
            with tracker2.start(verbose=True):
                run_good_task() # worker 2 won't be able to run task
        except TaskLockedError as e:
            pass
    run_good_task() # worker 1 then can finish the task
tracker = MyTracker.load(id=3)
rprint(tracker)
print(f"{tracker.status_name = }")

+----- ⏱ ⏩ start task(id=3, status=0 (pending), attempts=1) -------------------+
| set status = 10 (⏳ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| ⏳ +----- ⏱ ⏩ start task(id=3, status=10 (in_progress), attempts=2) ----------+
| ⏳ | 🔓 the task is locked, do nothing!
| ⏳ +----- ⏰ ⏹️ end task(id=3 status=10)) -------------------------------------+
| ⏳ run good task
| +----- end task logging -----------------------------------------------------+
| task succeeded, set status = 40 (✅ succeeded) and 🔐 unlock the task.
+----- ⏰ ⏹️ end task(id=3 status=40)) -----------------------------------------+


tracker.status_name = 'succeeded'
