JobMaster is a lightweight job-queue system that lets Python back-end tasks be triggered or scheduled via a webhook. It works with any PostgreSQL database and is designed to be simple to use and easy to integrate into your existing codebase.
The deploy() method creates a schema with all necessary tables and procedures in any PostgreSQL database.
Then, add the @task decorator to any Python function — for example a function named foo() in a module named my_tasks.
In your web app, you might have a button which passes the following procedure call to your PostgreSQL database:
```sql
call jobmaster.insert_job('my_tasks', 'foo', 10, '{"a": 1, "b": 2}'::json)
```
Whenever your Python script runs (for example, on a cron schedule), the job will be retrieved from the database and will run `my_tasks.foo(a=1, b=2)`.
There is a lot of additional functionality to explore using optional arguments of the @task() decorator.
Published to PyPI at https://pypi.org/project/jobmaster/.
Simply use pip:

```shell
pip install jobmaster
```

Any function in your project can become a JobMaster Task by using the @task decorator.
For example, you may have a function like this:
```python
# jmtests/awesome_things/things1.py
from jobmaster import task

@task
def foo(file_path: str, number: [1, 2, 3]):
    """
    Write a number to a file.
    :param file_path: the file to write to
    :param number: the number to write
    """
    with open(file_path, 'w') as f:
        f.write(str(number))
```

Think of "tasks" as functions and "jobs" as instances of those functions with specific arguments.
Once you have properly configured JobMaster, this task will be registered with
- `type_key='things1'` (the name of the module)
- `task_key='foo'` (the name of the function)
You can add a job to the queue by calling the procedure
```sql
call jobmaster.insert_job('things1', 'foo', 10, '{"file_path": "/tmp/sum.txt", "number": 2}'::json)
```
from wherever you have a connection to your database: a different Python script, a web application, etc.
jobmaster.insert_job takes 4 arguments:
- The type key of the task
- The task key of the task
- The priority of the job
- The arguments to pass to the task, JSON-formatted
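For illustration, here is a sketch of building that CALL statement from Python values. The `insert_job_sql` helper is hypothetical, not part of JobMaster's API; in practice you would execute the statement over any PostgreSQL connection:

```python
import json

def insert_job_sql(type_key: str, task_key: str, priority: int, args: dict) -> str:
    # Build the CALL statement from the four insert_job arguments.
    # (Hypothetical helper for illustration, not part of JobMaster.)
    return (
        f"call jobmaster.insert_job('{type_key}', '{task_key}', "
        f"{priority}, '{json.dumps(args)}'::json)"
    )

print(insert_job_sql('things1', 'foo', 10,
                     {"file_path": "/tmp/sum.txt", "number": 2}))
```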
Somewhere in your python project, you will have a script that pops jobs from the queue and executes them:
```python
# jmtests/__init__.py
import sqlalchemy

db_engine = sqlalchemy.create_engine("postgresql+pg8000://", ...)
```

```python
# jmtests/utils/jmutils.py
from jobmaster import JobMaster
from .. import db_engine
from '<any module with tasks>' import *

jobmaster = JobMaster(db_engine=db_engine, _validate_dependencies=True)

# Run jobs from the queue until the queue is empty
if __name__ == '__main__':
    jobs = jobmaster.run()
```

This could be run in a loop, in a cron job, in a web server, etc.
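For example, the worker script could be driven by cron. The schedule and project path below are illustrative assumptions, not prescribed by JobMaster:

```shell
# Hypothetical crontab entry: drain the queue every 5 minutes
*/5 * * * * cd /path/to/project && python3 -m jmtests.utils.jmutils
```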
Tasks can depend on other tasks, and JobMaster will automatically run them in the correct order.
In one file you might have:
```python
# module nice_tasks.py
from jobmaster import task, Dependency, same

@task
def foo(file_path: str, number: int):
    with open(file_path, 'w') as f:
        f.write(str(number))

@task(dependencies=Dependency(foo, 6, file_path=same))
def bar(file_path: str, number: int, letter: ['A', 'B', 'C'] = 'A'):
    with open(file_path, 'r') as f:
        _n = int(f.read())
    with open(file_path, 'w') as f:
        f.write(f"{number + _n}{letter}")

@task(
    process_limit=3,
    dependencies=[
        Dependency(foo, 2, number=10, file_path=same),
        Dependency(bar, 2, file_path=same)
    ]
)
def baz(file_path: str):
    with open(file_path, 'r') as f:
        s = f.read()
```

And in another file you might have:
```python
# module main.py
import sqlalchemy
from jobmaster import JobMaster

# Create a SQLAlchemy engine for your PostgreSQL database
my_database_engine = sqlalchemy.create_engine("postgresql+pg8000://", ...)

# Create a JobMaster instance
jobmaster = JobMaster(db_engine=my_database_engine)
```

The first time you run your code, you'll need to create the necessary tables in your database. You can do this by using the JobMaster.deploy() method:
```python
# module deploy_jobmaster.py
from .main import jobmaster

# Deploy the necessary tables
jobmaster.deploy()
```

```shell
python3 deploy_jobmaster.py
```

This will create a schema in your database called `jobmaster` and create the necessary tables and functions for JobMaster to work.
If you already have a schema called jobmaster in your database, you can specify a different schema name when creating the JobMaster instance:
```python
jobmaster = JobMaster(db_engine=my_database_engine, schema="job_mistress")
```

If you change any of the task definitions, you must update the database by running jobmaster.deploy() again. Running jobmaster.deploy(_reset=True) will drop the schema and all tables, then recreate them from scratch, losing any jobs you had in your queue.
We have already introduced the @task decorator.
This can be used without arguments:

```python
# module my_tasks.py
from jobmaster import task

@task
def foo(a: int, b: int):
    # do something
```

This will register the function foo as a task with type key 'my_tasks' and task key 'foo'.
JobMaster will register the parameters a and b; their types (both int) are inferred from the function signature, so it is important to use Python type hints.
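To see what a decorator can learn from a signature, you can inspect one yourself. The sketch below uses the standard-library inspect module and only illustrates why annotations matter; it is not JobMaster's actual registration code:

```python
import inspect

def foo(a: int, b: int = 2):
    pass  # do something

# Read each parameter's name, annotation and default from the
# signature, as any decorator-based registry could.
for name, p in inspect.signature(foo).parameters.items():
    default = None if p.default is inspect.Parameter.empty else p.default
    print(name, p.annotation.__name__, default)
```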
If a parameter is optional, you can specify a default value for it:

```python
@task
def foo(a: int = 1, b: int = 2):
    # do something
```

If a parameter has a number of possible options, this should be specified in the type hint:
```python
@task
def foo(a: int, b: [1, 2, 3]):
    # do something
```

Here the values 1, 2, and 3 are the only valid values for b.
For such parameters, it may be desirable to insert a job into the queue which performs the task for all possible values of the parameter.
This can be achieved by specifying the relevant parameters in the write_all argument of the @task decorator:
```python
@task(write_all=['b'])
def foo(a: int, b: [1, 2, 3]):
    # do something
```

then calling

```sql
call jobmaster.insert_job('my_tasks', 'foo', 10, '{"a": 1, "b": "ALL"}'::json)
```
('ALL' in upper case). When this job is executed, instead of actually executing the task foo, JobMaster will insert a new job into the queue for each possible value of b.
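That fan-out can be sketched as follows. This is an assumed reconstruction of the behaviour described above, and `expand_write_all` is a hypothetical name, not JobMaster's code:

```python
def expand_write_all(args, write_all, options):
    # Replace each 'ALL' placeholder with one job per allowed value.
    # (Illustrative sketch of the described behaviour, not JobMaster's code.)
    jobs = [dict(args)]
    for param in write_all:
        if args.get(param) == "ALL":
            jobs = [dict(j, **{param: v}) for j in jobs for v in options[param]]
    return jobs

print(expand_write_all({"a": 1, "b": "ALL"}, ["b"], {"b": [1, 2, 3]}))
# → [{'a': 1, 'b': 1}, {'a': 1, 'b': 2}, {'a': 1, 'b': 3}]
```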
The type key is the name of the module by default, but you can specify a different type key by passing it as an argument to the decorator:
```python
@task(type_key='cool_tasks', write_all=['b'])
def foo(a: int, b: [1, 2, 3]):
    # do something
```

Tasks can depend on other tasks. This is specified by passing a Dependency object (or a list of Dependency objects) to the dependencies argument of the @task decorator:
```python
from jobmaster import task, Dependency, same

@task(type_key='cool_tasks', write_all=['b'])
def foo(a: int, b: [1, 2, 3]):
    # do something

@task(
    type_key='cool_tasks',
    write_all=['b'],
    dependencies=Dependency(foo, 2, a=1, b=same)
)
def bar(a: int, b: [1, 2, 3], c: str):
    # do something
```

The Dependency object takes the task function, a time window (in hours), and the arguments to pass to the task.
In this example, the task bar depends on the task foo with a=1 and b the same as the b of the job for bar.
When a job with task-type "bar" is popped from the queue, JobMaster will first check if there is a job for task-type "foo" with a=1 and b the same as the b of the job for bar, which has been completed in the past 2 hours.
If there isn't, it will insert a job for task-type "foo" with a=1 and b the same as the b of the job for bar into the queue, with a higher priority than the job for bar, then re-insert the job for bar into the queue.
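The check itself can be sketched in Python. This is an assumed reconstruction of the behaviour described above, with hypothetical names; it is not JobMaster's actual implementation:

```python
def dependency_satisfied(completed_jobs, dep_args, window_hours, now):
    # True if a job with matching arguments completed within the window.
    # (Illustrative sketch; JobMaster does this in the database.)
    horizon = now - window_hours * 3600
    return any(
        job["args"] == dep_args and job["finished_at"] >= horizon
        for job in completed_jobs
    )

completed = [{"args": {"a": 1, "b": 2}, "finished_at": 9_000.0}]
# Finished 1000 s before 'now': inside a 2 h window, outside a 0.25 h window.
print(dependency_satisfied(completed, {"a": 1, "b": 2}, 2, now=10_000.0))     # → True
print(dependency_satisfied(completed, {"a": 1, "b": 2}, 0.25, now=10_000.0))  # → False
```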
JobMaster can limit jobs using process units.
When a JobMaster object is initialised, the number of process units available on the system is passed as an argument:
```python
jobmaster = JobMaster(db_engine=my_database_engine, system_process_units=1_000)
```

If this argument is not specified, JobMaster will check the environment variable JOBMASTER_SYSTEM_PROCESS_UNITS for the number of process units available. If this is not set, or is not formatted like an integer, it will default to 10,000.
You can then specify the number of process units a task requires by passing an integer to the process_units argument of the @task decorator. If this argument is not specified, the task will require 1 process unit by default.
You can think of process units as megabytes of RAM: to restrict JobMaster to 1 GB, set system_process_units=1_000 and set process_units for each task according to how much RAM you expect it to use in MB.
When JobMaster attempts to pop a job from the queue, it checks the queue for all "running" jobs on the same system and sums their process units. This is subtracted from the system process units to get the available process units.
Then, when checking the queue for "waiting" jobs to pop, there is a `WHERE process_units <= available_process_units` clause.
Example:
```python
from jobmaster import task, Dependency, same

@task(process_units=10, write_all=['b'])
def foo(a: int, b: [1, 2, 3]):
    # do something

@task(
    type_key='cool_tasks',
    write_all=['b'],
    dependencies=Dependency(foo, 2, a=1, b=same),
    process_units=20
)
def bar(a: int, b: [1, 2, 3], c: str):
    # do something
```
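The accounting described above can be sketched like this. It is an assumed reconstruction of the logic, not JobMaster's real queries, and the helper names are hypothetical:

```python
def available_units(system_units, running_jobs):
    # Subtract the units of all running jobs from the system total.
    return system_units - sum(j["process_units"] for j in running_jobs)

def poppable(waiting_jobs, avail):
    # Mirrors the WHERE process_units <= available_process_units clause.
    return [j for j in waiting_jobs if j["process_units"] <= avail]

running = [{"process_units": 300}, {"process_units": 500}]
waiting = [{"id": 1, "process_units": 150}, {"id": 2, "process_units": 400}]

avail = available_units(1_000, running)
print(avail)                                   # → 200
print([j["id"] for j in poppable(waiting, avail)])  # → [1]
```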