# Enable status tracking for business-critical applications using sqlalchemy_mate

In this example, we introduce a pattern for tracking the status of business-critical jobs using a relational database as the backend. It lets you track the status of each job and provides error handling, retries, and concurrency control out of the box.

NOTE: this solution is based on the sqlalchemy_mate Python library.

When managing a large number of business-critical jobs, it is crucial to know which jobs have succeeded, which have failed, and which are still in progress. If the business logic is a pipeline consisting of a sequence of jobs, it is important to keep track of the pipeline's current status and to be able to recover from any failed job. We have also seen some advanced requirements, such as:

- Each job should be consumed exactly once.
- Each job should be handled by only one worker, so you need a concurrency lock mechanism to avoid double consumption.
- For succeeded jobs, you want to store additional information such as the output, statistics, and metadata of the job, and log the success time.
- For failed jobs, you want to log the detailed error message for debugging.
- You want to get all of the failed jobs with one simple query and rerun them with the updated business logic.
- A job might be impossible to complete. To avoid falling into an endless retry loop, you want to ignore jobs that fail too many times.
- You want to run custom queries based on job status for analytics purposes.
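
Two of these requirements, the concurrency lock and the "one simple query" for failed jobs, can be illustrated with plain SQL. The following is a minimal sketch using the standard library ``sqlite3`` module. The table layout (``job``, ``locked``) and the ``try_lock`` helper are hypothetical and exist only for illustration; this is not the schema or the locking code that sqlalchemy_mate uses.

```python
import sqlite3

# Status codes mirroring the ones used later in this example.
PENDING, IN_PROGRESS, FAILED = 10, 20, 30

# Hypothetical, simplified table: one row per job.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE job (id TEXT PRIMARY KEY, status INTEGER, locked INTEGER DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO job (id, status) VALUES (?, ?)",
    [("job-1", PENDING), ("job-2", FAILED), ("job-3", FAILED)],
)


def try_lock(job_id: str) -> bool:
    """Claim a job only if no one else holds it.

    The WHERE clause makes the check and the update a single statement,
    so two workers cannot both see "unlocked" and both proceed.
    """
    cur = conn.execute(
        "UPDATE job SET locked = 1, status = ? WHERE id = ? AND locked = 0",
        (IN_PROGRESS, job_id),
    )
    return cur.rowcount == 1  # exactly one row changed means we got the lock


first_claim = try_lock("job-1")   # the first worker gets the lock
second_claim = try_lock("job-1")  # a second worker is turned away

# All failed jobs in one query, ready to be rerun.
failed_ids = [
    row[0]
    for row in conn.execute(
        "SELECT id FROM job WHERE status = ? ORDER BY id", (FAILED,)
    )
]
```

Because the status is an ordinary indexed-friendly integer column, the same kind of ``WHERE status = ...`` filter also serves ad hoc analytics queries.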

With the sqlalchemy_mate Python library, you can enable these features without refactoring your existing application code: wrap your business logic in a context manager and you get all of the features above.

## Declare Your Status Tracking Table

1. First, we define some status codes using the ``enum`` module from the Python standard library. This improves code readability and avoids hard-coding meaningless integers throughout the code base.
2. We declare a SQLAlchemy ORM data model.

In [33]:
import enum

import sqlalchemy as sa
import sqlalchemy.orm as orm
import sqlalchemy_mate.api as sm
from sqlalchemy_mate.tests.api import engine_psql as engine

from rich import print as rprint

In [34]:
class StatusEnum(int, enum.Enum):
    pending = 10
    in_progress = 20
    failed = 30
    succeeded = 40
    ignored = 50
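
As a side note on the ``int`` mixin, the short standalone sketch below repeats the enum definition and shows how a raw integer (for example, one read back from a status column) maps to a named member and compares equal to it. The ``raw_status`` variable is a made-up stand-in for a value loaded from the database.

```python
import enum


class StatusEnum(int, enum.Enum):
    pending = 10
    in_progress = 20
    failed = 30
    succeeded = 40
    ignored = 50


raw_status = 30  # stand-in for an integer read from the database

# Look up the enum member by its value.
status = StatusEnum(raw_status)

is_failed = status is StatusEnum.failed
# Thanks to the int mixin, members compare equal to plain integers.
equals_raw_int = StatusEnum.failed == 30
# The member name is a readable label for logs and reports.
label = status.name
```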

In [35]:
Base = orm.declarative_base()

class Job(Base, sm.ExtendedBase, sm.patterns.status_tracker.JobMixin):
    __tablename__ = "sqlalchemy_mate_status_tracker_job"

    @classmethod
    def start_job(
        cls,
        id: str,
        skip_error: bool = False,
        debug: bool = False,
    ):
        return cls.start(
            engine=engine,
            id=id,
            in_process_status=StatusEnum.in_progress.value,
            failed_status=StatusEnum.failed.value,
            success_status=StatusEnum.succeeded.value,
            ignore_status=StatusEnum.ignored.value,
            expire=15,
            max_retry=3,
            skip_error=skip_error,
            debug=debug,
        )


Base.metadata.create_all(engine)

## Initialize Some Jobs

First, let's initialize some jobs. At the beginning, all jobs are in ``pending`` status.

In [36]:
with engine.connect() as conn:
    conn.execute(Job.__table__.delete())
    conn.commit()

In [37]:
with orm.Session(engine) as ses:
    job = Job.create_and_save(
        engine_or_session=ses,
        id="job-1",
        status=StatusEnum.pending.value,
        data={"version": 0},
    )
    job = Job.create_and_save(
        engine_or_session=ses,
        id="job-2",
        status=StatusEnum.pending.value,
        data={"version": 0},
    )
    job = Job.create_and_save(
        engine_or_session=ses,
        id="job-3",
        status=StatusEnum.pending.value,
        data={"version": 0},
    )
    rprint(job)

In [38]:
# You can also do this to initialize many jobs in batch
# with orm.Session(engine) as ses:
#     for job_id in ["job-1", "job-2", "job-3"]:
#         job = Job.create(
#             id=job_id,
#             status=StatusEnum.pending.value,
#             data={"version": 0},
#         )
#         ses.add(job)
#     ses.commit()
#     rprint(job)

## Job Succeeded

The ``Job.start()`` class method is a context manager that does a lot of things:

1. It tries to obtain a lock before the job begins. Once the lock is obtained, other workers can't update this item (they will see that it is locked).
2. Any raised exception is captured by the context manager, which sets the status to ``failed``, increments the retry count, logs the error (and saves the error information to the DB), and releases the lock.
3. If the job has failed too many times, it sets the status to ``ignored``.
4. If everything goes well, it sets the status to ``succeeded`` and applies the updates.
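
To make the four steps above concrete, here is a minimal in-memory sketch of the same lifecycle written with ``contextlib``. It is not the sqlalchemy_mate implementation: the ``JobRecord`` class, the ``start`` function, the ``JobLockedError`` class, and the plain ``dict`` used for updates are all assumptions made for illustration, and a real implementation would persist every state change to the database.

```python
import contextlib
import dataclasses
import time

PENDING, IN_PROGRESS, FAILED, SUCCEEDED, IGNORED = 10, 20, 30, 40, 50


class JobLockedError(Exception):
    """Another worker currently holds the lock (hypothetical name)."""


class JobIgnoredError(Exception):
    """The job has failed too many times and is skipped."""


@dataclasses.dataclass
class JobRecord:
    """Hypothetical in-memory stand-in for one row of the job table."""
    id: str
    status: int = PENDING
    retry: int = 0
    lock_time: float = 0.0  # 0.0 means "not locked"
    data: dict = dataclasses.field(default_factory=dict)
    errors: list = dataclasses.field(default_factory=list)


@contextlib.contextmanager
def start(job: JobRecord, expire: float = 15.0, max_retry: int = 3):
    now = time.time()
    if job.status == IGNORED:
        raise JobIgnoredError(f"Job {job.id!r} is ignored.")
    # Step 1: obtain the lock; a lock older than `expire` seconds is stale.
    if job.lock_time and now - job.lock_time < expire:
        raise JobLockedError(f"Job {job.id!r} is locked.")
    job.status, job.lock_time = IN_PROGRESS, now
    updates = {}  # attribute name -> new value, applied only on success
    try:
        yield job, updates
    except Exception as e:
        # Step 2: record the failure, then release the lock.
        job.retry += 1
        job.errors.append(repr(e))
        # Step 3: too many failures means the job is ignored from now on.
        job.status = IGNORED if job.retry >= max_retry else FAILED
        job.lock_time = 0.0
        raise
    else:
        # Step 4: apply the pending updates and mark the job as succeeded.
        for key, value in updates.items():
            setattr(job, key, value)
        job.status = SUCCEEDED
        job.lock_time = 0.0


job = JobRecord(id="demo-job", data={"version": 0})
with start(job) as (j, updates):
    updates["data"] = {"version": j.data["version"] + 1}
```

Collecting changes in ``updates`` and applying them only in the success branch means a job that fails halfway leaves its stored data untouched.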

In [39]:
with Job.start_job(id="job-1", debug=True) as (
    job,
    updates,
):
    # run your job logic here ...
    updates.set(key="data", value={"version": job.data["version"] + 1})

----------------------------- ▶️ start Job 'job-1'------------------------------
🔓Try to set status = 20 and lock the job 'job-1' ...
  Successfully lock the job!
✅ 🔐 job succeeded, set status = 40 and unlock the job.
------------------------ ⏹️ end Job 'job-1' status = 40)------------------------


In [40]:
with orm.Session(engine) as ses:
    job = ses.get(Job, "job-1")
    rprint(job)

## Job Failed

In [41]:
class CustomError(Exception):
    pass


with Job.start_job(id="job-2", debug=True) as (
    job,
    updates,
):
    updates.set(key="data", value={"version": job.data["version"] + 1})
    # intentionally raise an error to simulate a failed job
    raise CustomError("something wrong in job-2") 

----------------------------- ▶️ start Job 'job-2'------------------------------
🔓Try to set status = 20 and lock the job 'job-2' ...
  Successfully lock the job!
❌ 🔐 job failed, set status = 30 and unlock the job.
------------------------ ⏹️ end Job 'job-2' status = 20)------------------------


CustomError: something wrong in job-2

In [42]:
with orm.Session(engine) as ses:
    job = ses.get(Job, "job-2")
    rprint(job)

## Ignore If Job Fails Too Many Times

You don't want a job that logically can never succeed to fall into an endless retry loop. In this example, we set the maximum number of retries to 3 (see ``max_retry=3`` in the ORM data model). If a job fails 3 times in a row, it is ignored. If you then try to start an ignored job, you will see a ``JobIgnoredError``.

In [43]:
with Job.start_job(id="job-3", debug=True) as (job, updates):
    raise CustomError("something wrong in job-3 first attempts")

----------------------------- ▶️ start Job 'job-3'------------------------------
🔓Try to set status = 20 and lock the job 'job-3' ...
  Successfully lock the job!
❌ 🔐 job failed, set status = 30 and unlock the job.
------------------------ ⏹️ end Job 'job-3' status = 20)------------------------


CustomError: something wrong in job-3 first attempts

In [44]:
with Job.start_job(id="job-3", debug=True) as (job, updates):
    raise CustomError("something wrong in job-3 second attempts")

----------------------------- ▶️ start Job 'job-3'------------------------------
🔓Try to set status = 20 and lock the job 'job-3' ...
  Successfully lock the job!
❌ 🔐 job failed, set status = 30 and unlock the job.
------------------------ ⏹️ end Job 'job-3' status = 20)------------------------


CustomError: something wrong in job-3 second attempts

In [45]:
with Job.start_job(id="job-3", debug=True) as (job, updates):
    raise CustomError("something wrong in job-3 third attempts")

----------------------------- ▶️ start Job 'job-3'------------------------------
🔓Try to set status = 20 and lock the job 'job-3' ...
  Successfully lock the job!
❌ 🔐 job failed 3 times already, set status = 50 and unlock the job.
------------------------ ⏹️ end Job 'job-3' status = 20)------------------------


CustomError: something wrong in job-3 third attempts

In [46]:
# The 4th attempt will raise ``JobIgnoredError``
with Job.start_job(id="job-3", debug=True) as (job, updates):
    updates.set(key="data", value={"version": job.data["version"] + 1})

----------------------------- ▶️ start Job 'job-3'------------------------------
↪️ the job is ignored, do nothing!


JobIgnoredError: Job 'job-3' retry count already exceeded 3, ignore it.

In [47]:
with orm.Session(engine) as ses:
    job = ses.get(Job, "job-3")
    rprint(job)

## Recap

A relational database is a good fit for this status tracking use case. However, if you don't want to manage a database cluster or you have a very high volume of concurrent jobs, consider using Amazon DynamoDB. Amazon DynamoDB is a serverless, highly scalable key-value store that also suits this use case. The [pynamodb_mate](https://github.com/MacHu-GWU/pynamodb_mate-project) Python library has built-in support for this status tracking pattern. See the example at [Enable status tracking for business critical application using Amazon DynamoDB](https://github.com/MacHu-GWU/pynamodb_mate-project/blob/master/examples/patterns/status-tracker.ipynb).