Skip to content

Commit

Permalink
work on task scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcel committed May 14, 2017
1 parent 4fdffca commit da62d56
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 32 deletions.
10 changes: 6 additions & 4 deletions gengine/app/alembic/versions/8b935f06690c_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
"""

# revision identifiers, used by Alembic.
from sqlalchemy.dialects.postgresql.base import TIMESTAMP

revision = '8b935f06690c'
down_revision = '2012674516fc'
branch_labels = None
Expand Down Expand Up @@ -35,10 +37,10 @@ def upgrade():
op.create_table('taskexecutions',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('task_id', sa.Integer(), nullable=False),
sa.Column('planned_at', sa.DateTime(), nullable=False),
sa.Column('locked_at', sa.DateTime(), nullable=True),
sa.Column('finished_at', sa.DateTime(), nullable=True),
sa.Column('canceled_at', sa.DateTime(), nullable=True),
sa.Column('planned_at', TIMESTAMP(timezone=True), nullable=False),
sa.Column('locked_at', TIMESTAMP(timezone=True), nullable=True),
sa.Column('finished_at', TIMESTAMP(timezone=True), nullable=True),
sa.Column('canceled_at', TIMESTAMP(timezone=True), nullable=True),
sa.Column('log', sa.String(), nullable=True),
sa.ForeignKeyConstraint(['task_id'], ['tasks.id'], name=op.f('fk_taskexecutions_task_id_tasks'), ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id', name=op.f('pk_taskexecutions'))
Expand Down
30 changes: 30 additions & 0 deletions gengine/app/alembic/versions/902a80a786e8_task_success.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""task_success
Revision ID: 902a80a786e8
Revises: cc8f9f50802f
Create Date: 2017-05-14 16:18:58.009078
"""

# revision identifiers, used by Alembic.
revision = '902a80a786e8'
down_revision = 'cc8f9f50802f'
branch_labels = None
depends_on = None

from alembic import op
import sqlalchemy as sa


def upgrade():
### commands auto generated by Alembic - please adjust! ###
op.add_column('taskexecutions', sa.Column('success', sa.Boolean(), nullable=True))
op.create_index(op.f('ix_taskexecutions_success'), 'taskexecutions', ['success'], unique=False)
### end Alembic commands ###


def downgrade():
### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_taskexecutions_success'), table_name='taskexecutions')
op.drop_column('taskexecutions', 'success')
### end Alembic commands ###
9 changes: 5 additions & 4 deletions gengine/app/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,12 @@ def get_default_token_valid_time():
t_taskexecutions = Table('taskexecutions', Base.metadata,
Column('id', ty.Integer, primary_key=True),
Column('task_id', ty.Integer, ForeignKey("tasks.id", ondelete="CASCADE"), index=True, nullable=False),
Column('planned_at', ty.DateTime(), nullable=False, default=None, index=True),
Column('locked_at', ty.DateTime(), nullable=True, default=None, index=True),
Column('finished_at', ty.DateTime(), nullable=True, default=None, index=True),
Column('canceled_at', ty.DateTime(), nullable=True, default=None, index=True),
Column('planned_at', TIMESTAMP(timezone=True), nullable=False, default=None, index=True),
Column('locked_at', TIMESTAMP(timezone=True), nullable=True, default=None, index=True),
Column('finished_at', TIMESTAMP(timezone=True), nullable=True, default=None, index=True),
Column('canceled_at', TIMESTAMP(timezone=True), nullable=True, default=None, index=True),
Column('log', ty.String),
Column('success', ty.Boolean, index=True, nullable=True, default=None),
)

class AuthUser(ABase):
Expand Down
9 changes: 7 additions & 2 deletions gengine/app/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
description="print sth",
config_scheme=None,
default_config=None,
default_cron=None
default_cron="* * * * *",
default_activated=True
)
def demo_task():
def demo_task(config):
print("TEST123")
return {
'log': None,
'success': True
}


22 changes: 17 additions & 5 deletions gengine/app/tasksystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import venusian
import zope.interface
from gengine.metadata import MySession

from sqlalchemy.sql.expression import and_
from zope.interface.declarations import implementer
Expand Down Expand Up @@ -34,13 +35,18 @@ def callback(scanner, _name, wrapped):
config = scanner.config

registry = config.registry
registry.getUtility(ITaskRegistry).register(self.name, self.description, self.config_scheme, self.default_config, self.default_cron)
registry.getUtility(ITaskRegistry).register(self.name, wrapped, self.description, self.config_scheme, self.default_config, self.default_cron)

import transaction
from .model import t_tasks
from ..metadata import DBSession

sess = DBSession.target()
sess = None
if hasattr(DBSession, "target"):
sess = DBSession
else:
sess = DBSession()

with transaction.manager:

db_task = sess.execute(t_tasks.select().where(and_(
Expand Down Expand Up @@ -80,7 +86,7 @@ def callback(scanner, _name, wrapped):
class ITaskRegistry(zope.interface.Interface):
registrations = zope.interface.Attribute("""blahblah""")

def register(name, description, config_scheme, default_config, default_cron):
def register(name, fun, description, config_scheme, default_config, default_cron):
"""bar blah blah"""


Expand All @@ -89,10 +95,16 @@ class TaskRegistry:
def __init__(self):
self.registrations = defaultdict(lambda: defaultdict(dict))

def register(self, name, description, config_scheme, default_config, default_cron):
def register(self, name, fun, description, config_scheme, default_config, default_cron):
self.registrations[name] = {
"fun": fun,
"description": description,
"config_scheme": config_scheme,
"default_config": default_config,
"default_cron": default_cron
}
}

def execute(self, name, config):
if not config:
config = self.registrations.get(name).get("default_config", None)
return self.registrations[name]["fun"](config=config)
12 changes: 11 additions & 1 deletion gengine/base/util.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import datetime
import pytz

class DictObjectProxy:

def __init__(self, obj={}):
Expand Down Expand Up @@ -27,4 +30,11 @@ def __setattr__(self, name, value):

def __call__(self, *args, **kwargs):
return self.target(*args, **kwargs)



def dt_now():
return datetime.datetime.utcnow().replace(tzinfo=pytz.utc)

def dt_ago(**kw):
return datetime.datetime.utcnow().replace(tzinfo=pytz.utc) - datetime.timedelta(**kw)

33 changes: 17 additions & 16 deletions gengine/maintenance/scripts/scheduler_beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

import datetime

from gengine.base.util import dt_ago, dt_now
from sqlalchemy.sql.expression import and_
from zope.sqlalchemy.datamanager import mark_changed

from gengine.metadata import MySession

log = logging.getLogger(__name__)
log.addHandler(logging.StreamHandler())

Expand Down Expand Up @@ -54,8 +53,6 @@ def main(argv=sys.argv):

config = Configurator(settings=settings)
pyramid_dogpile_cache.includeme(config)
config.include("gengine.app.tasksystem")
config.scan()

from gengine.metadata import (
init_session,
Expand All @@ -78,6 +75,8 @@ def main(argv=sys.argv):
import crontab
from gengine.app.tasksystem import ITaskRegistry

config.include("gengine.app.tasksystem")
config.scan("gengine")
enginetasks = config.registry.getUtility(ITaskRegistry).registrations

with transaction.manager:
Expand All @@ -92,15 +91,17 @@ def main(argv=sys.argv):

if cron:

now = dt_now().replace(second=0)

item = crontab.CronItem(line=cron)
s = item.schedule()
prev = s.get_next()
next = s.get_next()
s = item.schedule(date_from=now)
prev = s.get_next().replace(second=0)
next = s.get_next().replace(second=0)

execs = sess.execute(m.t_taskexecutions.select().where(and_(
m.t_taskexecutions.c.task_id == s["id"],
m.t_taskexecutions.c.canceled_at is None,
m.t_taskexecutions.c.finished_at is None,
m.t_taskexecutions.c.task_id == task["id"],
m.t_taskexecutions.c.canceled_at == None,
m.t_taskexecutions.c.finished_at == None,
)).order_by(m.t_taskexecutions.c.planned_at.desc())).fetchall()

found = False
Expand All @@ -111,23 +112,23 @@ def main(argv=sys.argv):
# The next execution is already planned
found = True

if exec["planned_at"] <= prev and prev < datetime.datetime.now() - datetime.timedelta(minutes=10) and not exec["locked_at"]:
if exec["planned_at"] <= prev and prev < dt_ago(minutes=10) and not exec["locked_at"]:
# The execution is more than 10 minutes in the past and not yet locked (worker not running / overloaded)
if next - datetime.timedelta(minutes=10) < datetime.datetime.now():
if next - datetime.timedelta(minutes=10) < dt_now():
# The next execution is planned in less than 10 minutes, cancel the other one
sess.execute(
m.t_taskexecutions.update().values({
'canceled_at': datetime.datetime.now()
'canceled_at': dt_now()
}).where({
'id': exec["id"]
})
)

if exec["locked_at"] and exec["locked_at"] < datetime.datetime.now() - datetime.timedelta(hours=24):
if exec["locked_at"] and exec["locked_at"] < dt_ago(hours=24):
# this task is running for more than 24 hours. probably crashed.... set it to canceled
sess.execute(
m.t_taskexecutions.update().values({
'canceled_at': datetime.datetime.now()
'canceled_at': dt_now()
}).where({
'id': exec["id"]
})
Expand All @@ -137,7 +138,7 @@ def main(argv=sys.argv):
# Plan next execution
sess.execute(
m.t_taskexecutions.insert().values({
'task_id': s["id"],
'task_id': task["id"],
'planned_at': next
})
)
Expand Down

0 comments on commit da62d56

Please sign in to comment.