Skip to content

Commit

Permalink
Fixed finished schedule cleanup in the SQLAlchemy and MongoDB stores
Browse files Browse the repository at this point in the history
Co-authored-by: s_kovalev <s_kovalev@wargaming.net>
  • Loading branch information
Reskov and s_kovalev committed Apr 17, 2024
1 parent 65b7606 commit 3e42a1b
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 13 deletions.
30 changes: 22 additions & 8 deletions src/apscheduler/datastores/mongodb.py
Expand Up @@ -652,35 +652,49 @@ async def cleanup(self) -> None:
with attempt, self.client.start_session() as session:
# Purge expired job results
now = datetime.now(timezone.utc).timestamp()

await to_thread.run_sync(
lambda: self._jobs_results.delete_many(
{"expired_at": {"$lte": now}}, session=session
{"expires_at": {"$lte": now}}, session=session
)
)

# Find finished schedules
async with await AsyncCursor.create(
lambda: self._schedules.find(
{"next_fire_time": None}, projection=["_id"], session=session
{"next_fire_time": None},
projection=["_id", "task_id"],
session=session,
)
) as cursor:
if finished_schedule_ids := {item["_id"] async for item in cursor}:
if finished_schedules := {
item["_id"]: item["task_id"] async for item in cursor
}:
# Find distinct schedule IDs of jobs associated with these
# schedules
for schedule_id in await to_thread.run_sync(
lambda: self._jobs.distinct(
"schedule_id",
{"schedule_id": {"$in": list(finished_schedule_ids)}},
{"schedule_id": {"$in": list(finished_schedules)}},
session=session,
)
):
finished_schedule_ids.discard(schedule_id)
finished_schedules.pop(schedule_id)

# Delete finished schedules that do not have any associated jobs
if finished_schedule_ids:
if finished_schedules:
await to_thread.run_sync(
lambda: self._jobs_results.delete_many(
{"schedule_id": {"$in": list(finished_schedule_ids)}},
lambda: self._schedules.delete_many(
{"_id": {"$in": list(finished_schedules)}},
session=session,
)
)

for schedule_id, task_id in finished_schedules.items():
await self._event_broker.publish(
ScheduleRemoved(
schedule_id=schedule_id,
task_id=task_id,
finished=True,
)
)
2 changes: 1 addition & 1 deletion src/apscheduler/datastores/sqlalchemy.py
Expand Up @@ -963,7 +963,7 @@ async def cleanup(self) -> None:
results = await self._execute(conn, query)
if finished_schedule_ids := dict(results.all()):
delete = self._t_schedules.delete().where(
~self._t_schedules.c.id.in_(finished_schedule_ids)
self._t_schedules.c.id.in_(finished_schedule_ids)
)
await self._execute(conn, delete)

Expand Down
12 changes: 8 additions & 4 deletions tests/test_schedulers.py
Expand Up @@ -689,13 +689,16 @@ def check_contextvars() -> None:

async def test_explicit_cleanup(self, raw_datastore: DataStore) -> None:
send, receive = create_memory_object_stream[Event](1)
async with AsyncScheduler(cleanup_interval=None) as scheduler:
async with AsyncScheduler(raw_datastore, cleanup_interval=None) as scheduler:
scheduler.subscribe(send.send, {ScheduleRemoved})
event = anyio.Event()
scheduler.subscribe(lambda _: event.set(), {JobReleased}, one_shot=True)
await scheduler.start_in_background()

# Add a job whose result expires after 1 ms
event = anyio.Event()
job_id = await scheduler.add_job(event.set, result_expiration_time=0.001)
job_id = await scheduler.add_job(
dummy_async_job, result_expiration_time=0.001
)
with fail_after(3):
await event.wait()

Expand All @@ -708,8 +711,9 @@ async def test_explicit_cleanup(self, raw_datastore: DataStore) -> None:

# Add a schedule to immediately set the event
event = anyio.Event()
scheduler.subscribe(lambda _: event.set(), {JobReleased}, one_shot=True)
await scheduler.add_schedule(
event.set, DateTrigger(datetime.now(timezone.utc)), id="event_set"
dummy_async_job, DateTrigger(datetime.now(timezone.utc)), id="event_set"
)
with fail_after(3):
await event.wait()
Expand Down

0 comments on commit 3e42a1b

Please sign in to comment.