Skip to content

Commit

Permalink
Merge 11f995c into 9446caa
Browse files Browse the repository at this point in the history
  • Loading branch information
Flix6x committed Apr 12, 2024
2 parents 9446caa + 11f995c commit bf09983
Showing 1 changed file with 62 additions and 25 deletions.
87 changes: 62 additions & 25 deletions flexmeasures/cli/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from flask.cli import with_appcontext
from rq import Queue, Worker
from sqlalchemy.orm import configure_mappers
from tabulate import tabulate

from flexmeasures.data.services.scheduling import handle_scheduling_exception
from flexmeasures.data.services.forecasting import handle_forecasting_exception
Expand Down Expand Up @@ -98,10 +99,23 @@ def show_queues():
"""

configure_mappers()
for q in app.queues.values():
click.echo(
f"Queue {q.name} has {q.count} jobs (and {q.failed_job_registry.count} jobs have failed)."
queue_data = [
(
q.name,
q.started_job_registry.count,
q.count,
q.deferred_job_registry.count,
q.scheduled_job_registry.count,
q.failed_job_registry.count,
)
for q in app.queues.values()
]
click.echo(
tabulate(
queue_data,
headers=["Queue", "Started", "Queued", "Deferred", "Scheduled", "Failed"],
)
)


@fm_jobs.command("clear-queue")
Expand All @@ -110,34 +124,53 @@ def show_queues():
"--queue",
default=None,
required=True,
help="State which queue(s) to clear (using '|' as separator), e.g. 'forecasting', 'scheduling' or 'forecasting|scheduling'. 'failed' is also supported.",
help="State which queue(s) to clear (using '|' as separator), e.g. 'forecasting', 'scheduling' or 'forecasting|scheduling'.",
)
@click.option(
"--deferred",
is_flag=True,
default=False,
help="If True, the deferred registry of the queue(s) will be cleared (and not the jobs currently in queue to be done).",
)
@click.option(
"--scheduled",
is_flag=True,
default=False,
help="If True, the scheduled registry of the queue(s) will be cleared (and not the jobs currently in queue to be done).",
)
@click.option(
"--failed",
is_flag=True,
default=False,
help="If True, the failed registry of the queue(s) will be cleared (and not the jobs to be done).",
help="If True, the failed registry of the queue(s) will be cleared (and not the jobs currently in queue to be done).",
)
def clear_queue(queue: str, failed: bool):
def clear_queue(queue: str, deferred: bool, scheduled: bool, failed: bool):
"""
Clear a job queue (or its registry of failed jobs).
Clear a job queue (or its registry of deferred/scheduled/failed jobs).
We use the app context to find out which redis queues to use.
"""
q_list = parse_queue_list(queue)
registries = dict(
deferred=("deferred_job_registry", deferred),
scheduled=("scheduled_job_registry", scheduled),
failed=("failed_job_registry", failed),
)
configure_mappers()
for the_queue in q_list:
if failed:
reg = the_queue.failed_job_registry
count_before = reg.count
for job_id in reg.get_job_ids():
reg.remove(job_id) # not actually deleting the job
count_after = reg.count
click.secho(
f"Cleared {count_before - count_after} failed jobs from the registry at {the_queue}.",
**MsgStyle.WARN,
)
else:
for _type, (registry, needs_clearing) in registries.items():
if needs_clearing:
reg = getattr(the_queue, registry)
count_before = reg.count
for job_id in reg.get_job_ids():
reg.remove(job_id) # not actually deleting the job
count_after = reg.count
click.secho(
f"Cleared {count_before - count_after} {_type} jobs from the {registry} at {the_queue}.",
**MsgStyle.WARN,
)
wrap_up_message(count_after)
if not any([deferred, scheduled, failed]):
count_before = the_queue.count
if count_before > 0:
the_queue.empty()
Expand All @@ -146,13 +179,17 @@ def clear_queue(queue: str, failed: bool):
f"Cleared {count_before - count_after} jobs from {the_queue}.",
**MsgStyle.SUCCESS,
)
if count_after > 0:
click.secho(
f"There are {count_after} jobs which could not be removed for some reason.",
**MsgStyle.WARN,
)
else:
click.echo("No jobs left.")
wrap_up_message(count_after)


def wrap_up_message(count_after: int):
if count_after > 0:
click.secho(
f"There are {count_after} jobs which could not be removed for some reason.",
**MsgStyle.WARN,
)
else:
click.echo("No jobs left.")


def handle_worker_exception(job, exc_type, exc_value, traceback):
Expand Down

0 comments on commit bf09983

Please sign in to comment.