# Enable status tracking for business-critical applications using Amazon DynamoDB

- [Initialize a Task](#Initialize-a-Task)

In this example, we introduce a pattern for tracking the status of business-critical tasks using Amazon DynamoDB as the backend. The pattern lets you track the status of each task and provides error handling, retries, and concurrency control out of the box.

NOTE: this solution is based on the [pynamodb_mate](https://github.com/MacHu-GWU/pynamodb_mate-project) Python library.

When managing a large number of business-critical tasks, it is crucial to know which tasks have succeeded, which have failed, and which are still in progress. If the business logic is a pipeline consisting of a sequence of tasks, it is important to keep track of its current status and to be able to recover from any failed task. We have also seen some advanced requirements from AWS customers, including:

- Each task should be consumed exactly once.
- Each task should be handled by only one worker, so you need a concurrency lock to avoid double consumption.
- For tasks that succeeded, you want to store additional information such as the output, statistics, and metadata of the task, and log the success time.
- For tasks that failed, you want to log the detailed error message for debugging.
- You want to get all failed tasks with one simple query and rerun them with updated business logic.
- A task might be impossible to complete. To avoid an endless retry loop, you want to ignore tasks that fail too many times.
- You want to run custom queries based on task status for analytics purposes.

With the [pynamodb_mate](https://github.com/MacHu-GWU/pynamodb_mate-project) Python library, you can enable these features without refactoring your existing application code: you wrap your business logic in a context manager and get all of the features above.

## Declare Your DynamoDB Status Tracking Table

1. First, we define some status codes using the ``enum`` module from the Python standard library. This improves readability and avoids hard-coding meaningless integers throughout the code base.
2. Then we declare a DynamoDB table data model and the status GSI (Global Secondary Index) schema.
3. Finally, we declare some configuration values.

In [1]:
import pynamodb_mate as pm
from rich import print as rprint

In [2]:
# inherit from the base status enum class and give your status
# a human-readable name and a machine-readable integer
# usually the closer to success, the bigger the integer is
class StatusEnum(pm.patterns.status_tracker.BaseStatusEnum):
    s00_todo = 0
    s03_in_progress = 3
    s06_failed = 6
    s09_success = 9
    s10_ignore = 10


class Tracker(pm.patterns.status_tracker.BaseStatusTracker):
    class Meta:
        # define the table name
        table_name = "pynamodb-mate-example-status-tracker"
        # define the AWS region
        region = "us-east-1"
        # define the billing mode, pay-as-you-go or provisioned
        billing_mode = pm.PAY_PER_REQUEST_BILLING_MODE

    # if you want to query your task by status, you have to define this index
    # the name of this attribute doesn't matter
    status_and_update_time_index = pm.patterns.status_tracker.StatusAndUpdateTimeIndex()

    # one DynamoDB table can serve multiple jobs
    # if you defined a default job id for the table
    # you don't need to explicitly specify the job id in many API
    # in this specific example, we only have one job called "test-job"
    JOB_ID: str = "test-job"
    # number of digits used to zero-pad the status code, so that
    # status values can be used in comparisons
    STATUS_ZERO_PAD = 3
    # how many retries are allowed before the task is ignored
    MAX_RETRY = 3
    # how many seconds before the lock expires
    LOCK_EXPIRE_SECONDS = 900
    # the default status code, means "to do", usually start from 0
    DEFAULT_STATUS = StatusEnum.s00_todo.value
    # the status enum class for this tracker
    STATUS_ENUM = StatusEnum

    def start_job(
        self,
        debug=True,
    ) -> "Tracker":
        """
        This is just an example of how to use :meth:`BaseStatusTracker.start`.

        A job should always have four related status codes:

        - in process status
        - failed status
        - success status
        - ignore status

        If you have multiple types of jobs, I recommend creating one
        wrapper method like this for each type of job. Ensure that
        the "ignore" status value is the largest status value of all,
        and use the same "ignore" status value for all types of jobs.
        """
        return self.start(
            in_process_status=StatusEnum.s03_in_progress,
            failed_status=StatusEnum.s06_failed,
            success_status=StatusEnum.s09_success,
            ignore_status=StatusEnum.s10_ignore,
            debug=debug,
        )

In [3]:
# Create the table if it doesn't exist
Tracker.create_table(wait=True)
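
The comment on ``STATUS_ZERO_PAD`` says that padding makes the status usable in comparisons. The sketch below illustrates why, assuming the status code ends up inside a string value that is compared or sorted as text. That is an assumption for illustration, and ``pad_status`` is a hypothetical helper, not part of pynamodb_mate. The enum is redefined with the standard library so the snippet runs on its own.

```python
from enum import IntEnum


class StatusEnum(IntEnum):
    """Same codes as the notebook, redefined here so the sketch is standalone."""

    s00_todo = 0
    s03_in_progress = 3
    s06_failed = 6
    s09_success = 9
    s10_ignore = 10


def pad_status(status: int, width: int = 3) -> str:
    """Hypothetical helper: render a status code as a fixed-width string."""
    return str(status).zfill(width)


codes = [s.value for s in StatusEnum]

# Without padding, text ordering disagrees with numeric ordering.
unpadded = sorted(str(c) for c in codes)
# With padding, text ordering and numeric ordering agree.
padded = sorted(pad_status(c) for c in codes)

print(unpadded)  # ['0', '10', '3', '6', '9']
print(padded)    # ['000', '003', '006', '009', '010']
```

With a width of 3, any status code from 0 to 999 keeps its numeric order when compared as a string.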

## Initialize a Task

The ``Tracker.new(task_id, data)`` method initializes a task and saves it to DynamoDB with the ``DEFAULT_STATUS``.

In [4]:
task_id = "t-1"

# create a new task
tracker = Tracker.new(task_id, data={"version": 1})
rprint(tracker.to_dict())

The ``.start(in_process_status, failed_status, success_status, ignore_status)`` method is a context manager that automatically updates the status at the beginning and at the end of the task, and locks the task to prevent concurrent access. We declared a ``.start_job()`` method that wraps the original ``.start(...)`` method so that we don't have to pass the same four arguments every time.

In [5]:
print(f"before the job started, the lock status is {tracker.is_locked()}")

# start the job, it will succeed
with tracker.start_job(debug=True):
    print(f"at begin, the status became {tracker.status_name!r}")
    print("and you can see that the task is locked")
    rprint(tracker.to_dict())

    # do some work
    tracker.set_data({"version": 2})

print(f"at the end, the status became {tracker.status_name!r}")
print("and the lock is released")
rprint(tracker.to_dict())

before the job started, the lock status is False
------ ▶️ start task(job_id='test-job', task_id='t-1', status='s00_todo') ------
🔓 set status 's03_in_progress' and lock the task.
at begin, the status became 's03_in_progress'
and you can see that the task is locked


✅ 🔐 task succeeded, set status 's09_success' and unlock the task.
----- ⏹️ end task(job_id='test-job', task_id='t-1', status='s09_success') ------
at the end, the status became 's09_success'
and the lock is released
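
To make the lifecycle above concrete, here is a minimal in-memory sketch of the same idea: a context manager that locks a task, marks it in progress, and records success or failure on exit. It is not the pynamodb_mate implementation. ``SketchTask``, ``start_sketch``, and ``TaskLockedError`` are names invented for this illustration, and the lock check here is not atomic. Against a real DynamoDB table, the check and the update would need to happen in a single conditional write.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import List, Optional


class TaskLockedError(Exception):
    """Raised when another worker still holds an unexpired lock (sketch only)."""


@dataclass
class SketchTask:
    task_id: str
    status: str = "todo"
    lock_time: Optional[float] = None  # when the lock was taken; None if unlocked
    errors: List[str] = field(default_factory=list)

    def is_locked(self, expire_seconds: float) -> bool:
        # A lock older than expire_seconds is treated as released.
        if self.lock_time is None:
            return False
        return (time.time() - self.lock_time) < expire_seconds


@contextmanager
def start_sketch(task: SketchTask, expire_seconds: float = 900.0):
    if task.is_locked(expire_seconds):
        raise TaskLockedError(task.task_id)
    # Lock the task and mark it as in progress.
    task.status, task.lock_time = "in_progress", time.time()
    try:
        yield task
    except Exception as e:
        # Record the error, mark as failed, release the lock, then re-raise.
        task.errors.append(repr(e))
        task.status, task.lock_time = "failed", None
        raise
    else:
        # The body finished normally: mark as succeeded and release the lock.
        task.status, task.lock_time = "success", None


ok_task = SketchTask("t-1")
with start_sketch(ok_task):
    print(ok_task.status)  # in_progress
print(ok_task.status)  # success

bad_task = SketchTask("t-2")
try:
    with start_sketch(bad_task):
        raise ValueError("something went wrong")
except ValueError:
    pass
print(bad_task.status, bad_task.errors)
```

The expiring lock matters when a worker crashes without releasing it: once the expiry window has passed, another worker can pick the task up again.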


## Error Handling

Let's reset the task and run it one more time. This time the job logic will fail.

- Before the task starts, the status is still ``s00_todo``.
- At the beginning of the task, the status becomes ``s03_in_progress``.
- At the end of the task, the status becomes ``s06_failed``.
- The task data remains unchanged and the error is logged.

In [6]:
tracker = Tracker.new(task_id, data={"version": 1})

# start the job, this time it will fail
with tracker.start_job(debug=True):
    print(f"at begin, the status became {tracker.status_name!r}")
    rprint(tracker.to_dict())

    # do some work
    raise ValueError("something went wrong")
    # the real work will never succeed
    print("doing some work ...")

------ ▶️ start task(job_id='test-job', task_id='t-1', status='s00_todo') ------
🔓 set status 's03_in_progress' and lock the task.
at begin, the status became 's03_in_progress'


❌ 🔐 task failed, set stats 's06_failed' and unlock the task.
------ ⏹️ end task(job_id='test-job', task_id='t-1', status='s06_failed') ------


ValueError: something went wrong

In [7]:
print(f"at the end, the status became {tracker.status_name!r}")
print("and the error is logged")
rprint(tracker.to_dict())

at the end, the status became 's06_failed'
and the error is logged


## Ignore Task if Failed Too Many Times

You don't want a task that can never succeed to fall into an endless retry loop. In this example, we set the maximum number of retries to 3. If a task fails 3 times in a row, it is marked as ignored. If you then try to start an ignored task, you will see a ``TaskIgnoredError``.

In [8]:
# reset the task
tracker = Tracker.new(task_id)

print("at the 0th attempt, the task is:")
rprint(tracker.to_dict())

at the 0th attempt, the task is:


In [9]:
with tracker.start_job():
    raise Exception

------ ▶️ start task(job_id='test-job', task_id='t-1', status='s00_todo') ------
🔓 set status 's03_in_progress' and lock the task.
❌ 🔐 task failed, set stats 's06_failed' and unlock the task.
------ ⏹️ end task(job_id='test-job', task_id='t-1', status='s06_failed') ------


Exception: 

In [10]:
print("at the 1th attempt, the task is:")
print(f"status = {tracker.status_name}")
rprint(tracker.to_dict())

at the 1th attempt, the task is:
status = s06_failed


In [11]:
with tracker.start_job():
    raise Exception

----- ▶️ start task(job_id='test-job', task_id='t-1', status='s06_failed') -----
🔓 set status 's03_in_progress' and lock the task.
❌ 🔐 task failed, set stats 's06_failed' and unlock the task.
------ ⏹️ end task(job_id='test-job', task_id='t-1', status='s06_failed') ------


Exception: 

In [12]:
print("at the 2th attempt, the task is:")
print(f"status = {tracker.status_name}")
rprint(tracker.to_dict())

at the 2th attempt, the task is:
status = s06_failed


In [13]:
with tracker.start_job():
    raise Exception

----- ▶️ start task(job_id='test-job', task_id='t-1', status='s06_failed') -----
🔓 set status 's03_in_progress' and lock the task.
❌ 🔐 task failed 3 times already, set status 's10_ignore' and unlock the task.
------ ⏹️ end task(job_id='test-job', task_id='t-1', status='s10_ignore') ------


Exception: 

In [14]:
print("at the 3th attempt, the task is:")
print(f"status = {tracker.status_name}")
rprint(tracker.to_dict())

at the 3th attempt, the task is:
status = s10_ignore


In [15]:
print("You will see a TaskIgnoredError if you try to start the task again")
with tracker.start_job():
    # since this is in ignore status
    # the task logic will never be executed
    print("do something ...")
    pass

You will see a TaskIgnoredError if you try to start the task again
----- ▶️ start task(job_id='test-job', task_id='t-1', status='s10_ignore') -----
↪️ the task is ignored, do nothing!


TaskIgnoredError: Task test-job____t-1 retry count already exceeded 3, ignore it.
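
The retry cap shown above can be sketched with plain Python. This is an illustration under assumptions, not the library's code: ``RetryTask``, ``run_with_retry_cap``, and ``TaskIgnored`` are hypothetical names, and the state lives in memory rather than in DynamoDB. The behavior mirrors the output above: the third failure moves the task to the ignore status, and any later attempt is rejected before the work runs.

```python
from dataclasses import dataclass
from typing import Callable


class TaskIgnored(Exception):
    """Stand-in for the library's TaskIgnoredError, defined for this sketch only."""


@dataclass
class RetryTask:
    task_id: str
    status: str = "todo"
    retry: int = 0  # number of failed attempts so far


def run_with_retry_cap(task: RetryTask, work: Callable[[], None], max_retry: int = 3) -> None:
    # An ignored task is rejected before any work is executed.
    if task.status == "ignore":
        raise TaskIgnored(f"{task.task_id} already failed {max_retry} times")
    task.status = "in_progress"
    try:
        work()
    except Exception:
        task.retry += 1
        # Once the cap is reached, park the task instead of marking it failed.
        task.status = "ignore" if task.retry >= max_retry else "failed"
        raise
    task.status = "success"


def always_fails() -> None:
    raise RuntimeError("boom")


task = RetryTask("t-1")
history = []
for _ in range(4):
    try:
        run_with_retry_cap(task, always_fails)
    except TaskIgnored:
        history.append("skipped")
    except RuntimeError:
        history.append(task.status)

print(history)  # ['failed', 'failed', 'ignore', 'skipped']
```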

## Save Custom Metadata to the Tracker

You may want to store custom metadata with the task. Because DynamoDB does not enforce a schema on non-key attributes, you can store arbitrary data in the tracker's ``data`` attribute.

In [16]:
# reset the task
tracker = Tracker.new(task_id)

with tracker.start_job():
    print("do some work")
    print("save custom metadata to the tracker")
    tracker.data["custom_metrics"] = {"number of file processed": 100}
    tracker.set_data(tracker.data)

print("verify the data attribute")
tracker.refresh()
rprint(tracker.to_dict())

------ ▶️ start task(job_id='test-job', task_id='t-1', status='s00_todo') ------
🔓 set status 's03_in_progress' and lock the task.
do some work
save custom metadata to the tracker
✅ 🔐 task succeeded, set status 's09_success' and unlock the task.
----- ⏹️ end task(job_id='test-job', task_id='t-1', status='s09_success') ------
verify the data attribute


## Query Tasks by Status

To rerun failed tasks, you need to be able to query tasks by status. The ``Tracker.query_by_status()`` method allows you to get tasks by one or more status codes. By default, it returns tasks ordered by ``update_time``.

In [17]:
# create some test data
for ith, status in enumerate(StatusEnum, start=1):
    tracker = Tracker.make(
        task_id=f"t-{ith}",
        status=status.value,
        data={"status_code": status.value},
    )
    tracker.save()
    print(tracker)

pynamodb-mate-example-status-tracker<test-job____t-1>
pynamodb-mate-example-status-tracker<test-job____t-2>
pynamodb-mate-example-status-tracker<test-job____t-3>
pynamodb-mate-example-status-tracker<test-job____t-4>
pynamodb-mate-example-status-tracker<test-job____t-5>


By default, ``query_by_status`` uses the Global Secondary Index (GSI) under the hood, so it returns only a subset of the original attributes. Call the ``Tracker.refresh()`` method to load the values of all attributes.

In [18]:
for tracker in Tracker.query_by_status(StatusEnum.s09_success):
    print("by default, the `data` attributes is the default value ")
    rprint(tracker.to_dict())
    print("after refresh, the `data` attributes is the real value ")
    tracker.refresh()
    rprint(tracker.to_dict())

by default, the `data` attributes is the default value 


after refresh, the `data` attributes is the real value 


You can pass ``auto_refresh=True`` to have the query yield fully loaded items instead.

In [19]:
print("With auto_refresh = True, the `data` attributes is the real value ")
for tracker in Tracker.query_by_status(StatusEnum.s10_ignore, auto_refresh=True):
    rprint(tracker.to_dict())

With auto_refresh = True, the `data` attributes is the real value 
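
The difference between the index view and the refreshed view can be illustrated with an in-memory sketch. Everything here is hypothetical (``Record``, ``TABLE``, ``query_by_status_sketch``). It assumes the index carries only the key, status, and update time, and that results are ordered by ascending ``update_time``. The notebook does not state the sort direction, so that part is a guess.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Iterator


@dataclass
class Record:
    task_id: str
    status: int
    update_time: float
    data: dict = field(default_factory=dict)


# The "table": full items, keyed by task id.
TABLE: Dict[str, Record] = {
    "t-1": Record("t-1", 6, 30.0, {"error": "timeout"}),
    "t-2": Record("t-2", 9, 10.0, {"rows": 100}),
    "t-3": Record("t-3", 6, 20.0, {"error": "bad input"}),
    "t-4": Record("t-4", 10, 5.0, {"error": "gave up"}),
}


def query_by_status_sketch(
    statuses: Iterable[int], auto_refresh: bool = False
) -> Iterator[Record]:
    """Yield records whose status matches, oldest update first."""
    wanted = set(statuses)
    hits = sorted(
        (r for r in TABLE.values() if r.status in wanted),
        key=lambda r: r.update_time,
    )
    for r in hits:
        if auto_refresh:
            # Second lookup against the table returns the full item.
            yield TABLE[r.task_id]
        else:
            # Index view: `data` is not projected, so it holds its default value.
            yield Record(r.task_id, r.status, r.update_time)


print([(r.task_id, r.data) for r in query_by_status_sketch([6])])
# [('t-3', {}), ('t-1', {})]
print([(r.task_id, r.data) for r in query_by_status_sketch([6], auto_refresh=True)])
# [('t-3', {'error': 'bad input'}), ('t-1', {'error': 'timeout'})]
```

The trade-off is one extra read per item when refreshing, so the index-only view is the cheaper choice when you only need task ids and statuses.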


## Conclusion

In general, this solution can improve the visibility, resilience, and reliability of a business-critical application. Because DynamoDB is a fully managed service, there is no infrastructure to provision or maintain before you can use it, and it scales to very high or unpredictable workloads.

The usage of this solution is not limited to the above examples. If you see potential to use this solution in your business problems, please don’t hesitate to let us know by creating an [issue](https://github.com/MacHu-GWU/pynamodb_mate-project/issues).