Skip to content

Commit

Permalink
regular_worker: simplified selection of work to do
Browse files Browse the repository at this point in the history
The new single query locks and returns an Operation ready to
be executed (all its inputs are in the 'present' state).
In itself, this would be interesting to upstream to AWB, but
it should also be noted that relying instead on the reservations
should instead provide a simpler way to do, more in line with our
slogans (we're doing it for the planner only now). This is more
complicated than for concrete applications, because the regular
worker would not be daemon for most of them, and instead would
react to operator requests.
  • Loading branch information
gracinet committed Nov 6, 2018
1 parent 8320419 commit 2544046
Showing 1 changed file with 15 additions and 30 deletions.
45 changes: 15 additions & 30 deletions anyblok_wms_examples/basic/regular_worker.py
Expand Up @@ -8,7 +8,7 @@
# obtain one at http://mozilla.org/MPL/2.0/.
import logging
from datetime import datetime, timedelta

from sqlalchemy import func
from anyblok import Declarations

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -170,11 +170,19 @@ def planned_op_lock_query(self):
if query is not None:
return query
logger.warning("Lock query not found in cache")
Operation = self.registry.Wms.Operation
Wms = self.registry.Wms
Operation = Wms.Operation
HI = Operation.HistoryInput
Avatar = Wms.PhysObj.Avatar
all_inputs_present = Avatar.query(
func.bool_and(Avatar.state == 'present')).join(
HI.avatar).filter(HI.operation_id == Operation.id)
query = Operation.query(Operation.id).filter(
Operation.type != 'wms_arrival',
all_inputs_present.as_scalar(),
Operation.state == 'planned').order_by(
Operation.dt_execution).with_for_update(
of=Operation,
key_share=True,
skip_locked=True)
self._planned_lock_query = query
Expand All @@ -183,45 +191,22 @@ def planned_op_lock_query(self):
def select_ready_operation(self):
"""Find an operation ready to be processed (and lock it)
:return: the operation or None and boolean telling if no operation was
found if that's definitive
:return: the operation or None
"""
Operation = self.registry.Wms.Operation
# starting with a fresh MVCC snapshot
self.registry.commit()
# TODO this is too complicated: locking an op then climbing along
# the 'follows' relation to find an executable one.
# it'd be much simpler to look for an Operation whose inputs are
# all present.
planned_id = self.planned_op_lock_query().first()
if planned_id is None:
return None, True
return None
planned = Operation.query().get(planned_id)
previous_planned = True
while previous_planned:
previous_planned = [
op.id for op in planned.follows if op.state == 'planned']
if not previous_planned:
break
planned = Operation.query().filter(
Operation.id.in_(previous_planned)).with_for_update(
key_share=True,
skip_locked=True).first()
if planned is None:
return None, False
return planned, None
return planned

def process_one(self):
"""Find any Operation that can be done, and execute it."""
# first alternative: climbing up planned operations, without
# complicated outer join to avatars that are conflict sources
# for PG
op, stop = self.select_ready_operation()
op = self.select_ready_operation()
if op is None:
if stop:
return
else:
return True
return

logger.info("%s, found op ready to be executed: %r, doing it now.",
self, op)
Expand Down

0 comments on commit 2544046

Please sign in to comment.