Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 24 additions & 29 deletions src/datajoint/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import platform
import subprocess

from .condition import AndList, Not
from .condition import AndList, Not, make_condition
from .errors import DataJointError, DuplicateError
from .heading import Heading
from .table import Table
Expand Down Expand Up @@ -431,8 +431,10 @@ def reserve(self, key: dict) -> bool:
"""
Attempt to reserve a pending job for processing.

Updates status to ``'reserved'`` if currently ``'pending'`` and
``scheduled_time <= now``.
Atomically updates status to ``'reserved'`` if currently ``'pending'``
and ``scheduled_time <= now``, using a single UPDATE with a WHERE clause
that includes the status check. This prevents race conditions where
multiple workers could reserve the same job simultaneously.

Parameters
----------
Expand All @@ -444,33 +446,26 @@ def reserve(self, key: dict) -> bool:
bool
True if reservation successful, False if job not available.
"""
# Check if job is pending and scheduled (use CURRENT_TIMESTAMP(3) for datetime(3) precision)
job = (self & key & "status='pending'" & "scheduled_time <= CURRENT_TIMESTAMP(3)").to_dicts()

if not job:
return False

# Get server time for reserved_time
server_now = self.connection.query("SELECT CURRENT_TIMESTAMP").fetchone()[0]

# Build update row with primary key and new values
pk = self._get_pk(key)
update_row = {
**pk,
"status": "reserved",
"reserved_time": server_now,
"host": platform.node(),
"pid": os.getpid(),
"connection_id": self.connection.connection_id,
"user": self.connection.get_user(),
"version": _get_job_version(),
}

try:
self.update1(update_row)
return True
except Exception:
return False
where = make_condition(self, pk, set())
qi = self.adapter.quote_identifier
assignments = ", ".join(f"{qi(k)}=%s" for k in ("status", "host", "pid", "connection_id", "user", "version"))
query = (
f"UPDATE {self.full_table_name} "
f"SET {assignments}, {qi('reserved_time')}=CURRENT_TIMESTAMP(3) "
f"WHERE {where} AND {qi('status')}='pending' "
f"AND {qi('scheduled_time')} <= CURRENT_TIMESTAMP(3)"
)
args = [
"reserved",
platform.node(),
os.getpid(),
self.connection.connection_id,
self.connection.get_user(),
_get_job_version(),
]
cursor = self.connection.query(query, args=args)
return cursor.rowcount == 1

def complete(self, key: dict, duration: float | None = None) -> None:
"""
Expand Down
Loading