diff --git a/scheduler/admin/task_admin.py b/scheduler/admin/task_admin.py index 7786d0c..597df23 100644 --- a/scheduler/admin/task_admin.py +++ b/scheduler/admin/task_admin.py @@ -27,7 +27,7 @@ def get_job_executions_for_task(queue_name: str, scheduled_task: Task) -> List[J ) res = sorted( - list(filter(lambda j: job_execution_of(j, scheduled_task), job_list)), key=lambda j: j.created_at, reverse=True + filter(lambda j: job_execution_of(j, scheduled_task), job_list), key=lambda j: j.created_at, reverse=True ) return res @@ -35,13 +35,13 @@ def get_job_executions_for_task(queue_name: str, scheduled_task: Task) -> List[J class JobArgInline(GenericStackedInline): model = TaskArg extra = 0 - fieldsets = ((None, dict(fields=("arg_type", "val"))),) + fieldsets = ((None, {"fields": ("arg_type", "val")}),) class JobKwargInline(GenericStackedInline): model = TaskKwarg extra = 0 - fieldsets = ((None, dict(fields=("key", ("arg_type", "val")))),) + fieldsets = ((None, {"fields": ("key", ("arg_type", "val"))}),) def get_message_bit(rows_updated: int) -> str: @@ -86,40 +86,40 @@ class Media: fieldsets = ( ( None, - dict( - fields=( + { + "fields": ( "name", "callable", ("enabled", "timeout", "result_ttl"), "task_type", ) - ), + }, ), ( None, - dict(fields=("scheduled_time",), classes=("tasktype-OnceTaskType",)), + {"fields": ("scheduled_time",), "classes": ("tasktype-OnceTaskType",)}, ), ( None, - dict(fields=("cron_string",), classes=("tasktype-CronTaskType",)), + {"fields": ("cron_string",), "classes": ("tasktype-CronTaskType",)}, ), ( None, - dict( - fields=( + { + "fields": ( ( "interval", "interval_unit", ), "repeat", ), - classes=("tasktype-RepeatableTaskType",), - ), + "classes": ("tasktype-RepeatableTaskType",), + }, ), - (_("Queue settings"), dict(fields=(("queue", "at_front"), "job_name"))), + (_("Queue settings"), {"fields": (("queue", "at_front"), "job_name")}), ( _("Previous runs info"), - dict(fields=(("successful_runs", "last_successful_run"), ("failed_runs", 
"last_failed_run"))), + {"fields": (("successful_runs", "last_successful_run"), ("failed_runs", "last_failed_run"))}, ), ) @@ -157,7 +157,7 @@ def change_view(self, request: HttpRequest, object_id, form_url="", extra_contex execution_list = get_job_executions_for_task(obj.queue, obj) except ConnectionErrorTypes as e: logger.warn(f"Could not get job executions: {e}") - execution_list = list() + execution_list = [] paginator = self.get_paginator(request, execution_list, SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE) page_number = request.GET.get("p", 1) page_obj = paginator.get_page(page_number) diff --git a/scheduler/decorators.py b/scheduler/decorators.py index d9d1339..0b51594 100644 --- a/scheduler/decorators.py +++ b/scheduler/decorators.py @@ -4,7 +4,7 @@ from scheduler.helpers.callback import Callback from scheduler.types import ConnectionType -JOB_METHODS_LIST: List[str] = list() +JOB_METHODS_LIST: List[str] = [] class job: diff --git a/scheduler/helpers/queues/queue_logic.py b/scheduler/helpers/queues/queue_logic.py index 46f61bd..4af73d6 100644 --- a/scheduler/helpers/queues/queue_logic.py +++ b/scheduler/helpers/queues/queue_logic.py @@ -63,14 +63,14 @@ def queue_perform_job(job_model: JobModel, connection: ConnectionType) -> Any: class Queue: - REGISTRIES = dict( - finished="finished_job_registry", - failed="failed_job_registry", - scheduled="scheduled_job_registry", - active="active_job_registry", - canceled="canceled_job_registry", - queued="queued_job_registry", - ) + REGISTRIES = { + "finished": "finished_job_registry", + "failed": "failed_job_registry", + "scheduled": "scheduled_job_registry", + "active": "active_job_registry", + "canceled": "canceled_job_registry", + "queued": "queued_job_registry", + } def __init__(self, connection: ConnectionType, name: str, is_async: bool = True) -> None: """Initializes a Queue object. 
@@ -150,7 +150,7 @@ def get_registry(self, name: str) -> JobNamesRegistry: raise NoSuchRegistryError(f"Unknown registry name {name}") def get_all_job_names(self) -> List[str]: - all_job_names = list() + all_job_names = [] all_job_names.extend(self.queued_job_registry.all(self.connection)) all_job_names.extend(self.finished_job_registry.all(self.connection)) all_job_names.extend(self.active_job_registry.all(self.connection)) diff --git a/scheduler/management/commands/export.py b/scheduler/management/commands/export.py index 3571686..0a89e20 100644 --- a/scheduler/management/commands/export.py +++ b/scheduler/management/commands/export.py @@ -40,13 +40,11 @@ def add_arguments(self, parser: CommandParser) -> None: def handle(self, *args: Any, **options: Any) -> None: file = open(options.get("filename"), "w") if options.get("filename") else sys.stdout - res = list() tasks = Task.objects.all() if options.get("enabled"): tasks = tasks.filter(enabled=True) - for task in tasks: - res.append(task.to_dict()) + res = [task.to_dict() for task in tasks] if options.get("format") == "json": import json diff --git a/scheduler/management/commands/import.py b/scheduler/management/commands/import.py index 448913e..b5a5f5c 100644 --- a/scheduler/management/commands/import.py +++ b/scheduler/management/commands/import.py @@ -52,9 +52,7 @@ def create_task_from_dict(task_dict: Dict[str, Any], update: bool) -> Optional[T if not settings.USE_TZ and not timezone.is_naive(target): target = timezone.make_naive(target) kwargs["scheduled_time"] = target - model_fields = set( - map(lambda field: field.attname, filter(lambda field: hasattr(field, "attname"), Task._meta.get_fields())) - ) + model_fields = {field.attname for field in filter(lambda field: hasattr(field, "attname"), Task._meta.get_fields())} keys_to_ignore = list(filter(lambda _k: _k not in model_fields, kwargs.keys())) for k in keys_to_ignore: del kwargs[k] @@ -116,7 +114,7 @@ def add_arguments(self, parser: CommandParser) -> None: 
def handle(self, *args: Any, **options: Any) -> None: file = open(options.get("filename")) if options.get("filename") else sys.stdin # type: ignore[arg-type] - jobs = list() + jobs = [] if options.get("format") == "json": import json diff --git a/scheduler/management/commands/scheduler_stats.py b/scheduler/management/commands/scheduler_stats.py index 3f19f08..653737e 100644 --- a/scheduler/management/commands/scheduler_stats.py +++ b/scheduler/management/commands/scheduler_stats.py @@ -66,7 +66,7 @@ def _print_stats_dashboard( click.echo(f"| {'Name':<16} | Queued | Active | Finished | Canceled | Workers |") self._print_separator() for ind, queue in enumerate(statistics["queues"]): - vals = list((queue[k] for k in KEYS)) + vals = [queue[k] for k in KEYS] # Deal with colors if not with_color: colors = ["" for _ in KEYS] diff --git a/scheduler/management/commands/scheduler_worker.py b/scheduler/management/commands/scheduler_worker.py index c193363..1ba5dcd 100644 --- a/scheduler/management/commands/scheduler_worker.py +++ b/scheduler/management/commands/scheduler_worker.py @@ -150,7 +150,7 @@ def handle(self, **options: Any) -> None: # Check whether sentry is enabled if options.get("sentry_dsn") is not None: - sentry_opts = dict(ca_certs=options.get("sentry_ca_certs"), debug=options.get("sentry_debug")) + sentry_opts = {"ca_certs": options.get("sentry_ca_certs"), "debug": options.get("sentry_debug")} dsn: str = options.get("sentry_dsn") # type: ignore register_sentry(dsn, **sentry_opts) diff --git a/scheduler/models/task.py b/scheduler/models/task.py index f8f821a..30ee6b1 100644 --- a/scheduler/models/task.py +++ b/scheduler/models/task.py @@ -191,7 +191,7 @@ def is_scheduled(self) -> bool: self.rqueue.queued_job_registry.exists(pipeline, self.job_name) self.rqueue.active_job_registry.exists(pipeline, self.job_name) results = pipeline.execute() - res = any([item is not None for item in results]) + res = any(item is not None for item in results) # If the job_name is 
not scheduled/queued/started, # update the job_id to None. (The job_id belongs to a previous run which is completed) @@ -230,14 +230,14 @@ def _enqueue_args(self) -> Dict[str, Any]: - Set job-id to proper format - set job meta """ - res = dict( - meta=dict(), - task_type=self.task_type, - scheduled_task_id=self.id, - on_success=Callback(success_callback), - on_failure=Callback(failure_callback), - name=self._next_job_id(), - ) + res = { + "meta": {}, + "task_type": self.task_type, + "scheduled_task_id": self.id, + "on_success": Callback(success_callback), + "on_failure": Callback(failure_callback), + "name": self._next_job_id(), + } if self.at_front: res["at_front"] = self.at_front if self.timeout: @@ -287,29 +287,29 @@ def _schedule_time(self) -> datetime: def to_dict(self) -> Dict[str, Any]: """Export model to dictionary, so it can be saved as external file backup""" interval_unit = str(self.interval_unit) if self.interval_unit else None - res = dict( - model=str(self.task_type), - name=self.name, - callable=self.callable, - callable_args=[dict(arg_type=arg.arg_type, val=arg.val) for arg in self.callable_args.all()], - callable_kwargs=[ - dict(arg_type=arg.arg_type, key=arg.key, val=arg.val) for arg in self.callable_kwargs.all() + res = { + "model": str(self.task_type), + "name": self.name, + "callable": self.callable, + "callable_args": [{"arg_type": arg.arg_type, "val": arg.val} for arg in self.callable_args.all()], + "callable_kwargs": [ + {"arg_type": arg.arg_type, "key": arg.key, "val": arg.val} for arg in self.callable_kwargs.all() ], - enabled=self.enabled, - queue=self.queue, - repeat=getattr(self, "repeat", None), - at_front=self.at_front, - timeout=self.timeout, - result_ttl=self.result_ttl, - cron_string=getattr(self, "cron_string", None), - scheduled_time=self._schedule_time().isoformat(), - interval=getattr(self, "interval", None), - interval_unit=interval_unit, - successful_runs=getattr(self, "successful_runs", None), - failed_runs=getattr(self, 
"failed_runs", None), - last_successful_run=getattr(self, "last_successful_run", None), - last_failed_run=getattr(self, "last_failed_run", None), - ) + "enabled": self.enabled, + "queue": self.queue, + "repeat": getattr(self, "repeat", None), + "at_front": self.at_front, + "timeout": self.timeout, + "result_ttl": self.result_ttl, + "cron_string": getattr(self, "cron_string", None), + "scheduled_time": self._schedule_time().isoformat(), + "interval": getattr(self, "interval", None), + "interval_unit": interval_unit, + "successful_runs": getattr(self, "successful_runs", None), + "failed_runs": getattr(self, "failed_runs", None), + "last_successful_run": getattr(self, "last_successful_run", None), + "last_failed_run": getattr(self, "last_failed_run", None), + } return res def get_absolute_url(self) -> str: diff --git a/scheduler/settings.py b/scheduler/settings.py index 4be61f0..55c5e3c 100644 --- a/scheduler/settings.py +++ b/scheduler/settings.py @@ -13,7 +13,7 @@ logger = logging.getLogger("scheduler") -_QUEUES: Dict[str, QueueConfiguration] = dict() +_QUEUES: Dict[str, QueueConfiguration] = {} SCHEDULER_CONFIG: SchedulerConfiguration = SchedulerConfiguration() diff --git a/scheduler/tests/test_mgmt_commands/test_export.py b/scheduler/tests/test_mgmt_commands/test_export.py index 8b3d8f8..759c2dd 100644 --- a/scheduler/tests/test_mgmt_commands/test_export.py +++ b/scheduler/tests/test_mgmt_commands/test_export.py @@ -22,7 +22,7 @@ def tearDown(self) -> None: os.remove(self.tmpfile.name) def test_export__should_export_job(self): - tasks = list() + tasks = [] tasks.append(task_factory(TaskType.ONCE, enabled=True)) tasks.append(task_factory(TaskType.REPEATABLE, enabled=True)) @@ -35,7 +35,7 @@ def test_export__should_export_job(self): self.assertEqual(result[1], tasks[1].to_dict()) def test_export__should_export_enabled_jobs_only(self): - tasks = list() + tasks = [] tasks.append(task_factory(TaskType.ONCE, enabled=True)) tasks.append(task_factory(TaskType.REPEATABLE, 
enabled=False)) @@ -47,7 +47,7 @@ def test_export__should_export_enabled_jobs_only(self): self.assertEqual(result[0], tasks[0].to_dict()) def test_export__should_export_job_yaml_without_yaml_lib(self): - tasks = list() + tasks = [] tasks.append(task_factory(TaskType.ONCE, enabled=True)) tasks.append(task_factory(TaskType.REPEATABLE, enabled=True)) @@ -58,7 +58,7 @@ def test_export__should_export_job_yaml_without_yaml_lib(self): self.assertEqual(cm.exception.code, 1) def test_export__should_export_job_yaml_green(self): - tasks = list() + tasks = [] tasks.append(task_factory(TaskType.ONCE, enabled=True)) tasks.append(task_factory(TaskType.REPEATABLE, enabled=True)) tasks.append(task_factory(TaskType.CRON, enabled=True)) diff --git a/scheduler/tests/test_mgmt_commands/test_import.py b/scheduler/tests/test_mgmt_commands/test_import.py index b37a407..8560486 100644 --- a/scheduler/tests/test_mgmt_commands/test_import.py +++ b/scheduler/tests/test_mgmt_commands/test_import.py @@ -21,7 +21,7 @@ def tearDown(self) -> None: os.remove(self.tmpfile.name) def test_import__should_schedule_job(self): - tasks = list() + tasks = [] tasks.append(task_factory(TaskType.ONCE, enabled=True, instance_only=True)) tasks.append(task_factory(TaskType.REPEATABLE, enabled=True, instance_only=True)) res = json.dumps([j.to_dict() for j in tasks]) @@ -38,7 +38,7 @@ def test_import__should_schedule_job(self): self.assertEqual(getattr(tasks[0], attr), getattr(db_task, attr)) def test_import__should_schedule_job_yaml(self): - tasks = list() + tasks = [] tasks.append(task_factory(TaskType.ONCE, enabled=True, instance_only=True)) tasks.append(task_factory(TaskType.REPEATABLE, enabled=True, instance_only=True)) res = yaml.dump([j.to_dict() for j in tasks], default_flow_style=False) @@ -55,7 +55,7 @@ def test_import__should_schedule_job_yaml(self): self.assertEqual(getattr(tasks[0], attr), getattr(task, attr)) def test_import__should_schedule_job_yaml_without_yaml_lib(self): - tasks = list() + tasks = 
[] tasks.append(task_factory(TaskType.ONCE, enabled=True, instance_only=True)) tasks.append(task_factory(TaskType.REPEATABLE, enabled=True, instance_only=True)) res = yaml.dump([j.to_dict() for j in tasks], default_flow_style=False) @@ -68,7 +68,7 @@ def test_import__should_schedule_job_yaml_without_yaml_lib(self): self.assertEqual(cm.exception.code, 1) def test_import__should_schedule_job_reset(self): - tasks = list() + tasks = [] task_factory(TaskType.ONCE, enabled=True) task_factory(TaskType.ONCE, enabled=True) tasks.append(task_factory(TaskType.ONCE, enabled=True)) @@ -91,7 +91,7 @@ def test_import__should_schedule_job_reset(self): self.assertEqual(getattr(tasks[1], attr), getattr(task, attr)) def test_import__should_schedule_job_update_existing(self): - tasks = list() + tasks = [] tasks.append(task_factory(TaskType.ONCE, enabled=True)) tasks.append(task_factory(TaskType.ONCE, enabled=True)) res = json.dumps([j.to_dict() for j in tasks]) @@ -107,7 +107,7 @@ def test_import__should_schedule_job_update_existing(self): self.assertEqual(getattr(tasks[0], attr), getattr(task, attr)) def test_import__should_schedule_job_without_update_existing(self): - tasks = list() + tasks = [] tasks.append(task_factory(TaskType.ONCE, enabled=True)) tasks.append(task_factory(TaskType.ONCE, enabled=True)) res = json.dumps([j.to_dict() for j in tasks]) diff --git a/scheduler/tests/test_mgmt_commands/test_scheduler_stats.py b/scheduler/tests/test_mgmt_commands/test_scheduler_stats.py index 1a04648..165292c 100644 --- a/scheduler/tests/test_mgmt_commands/test_scheduler_stats.py +++ b/scheduler/tests/test_mgmt_commands/test_scheduler_stats.py @@ -10,7 +10,7 @@ from scheduler.helpers.queues import get_queue -@override_settings(SCHEDULER_QUEUES=dict(default={"HOST": "localhost", "PORT": 6379, "DB": 0})) +@override_settings(SCHEDULER_QUEUES={"default": {"HOST": "localhost", "PORT": 6379, "DB": 0}}) class SchedulerStatsTest(TestCase): EXPECTED_OUTPUT = { "queues": [ @@ -33,7 +33,7 @@ class 
SchedulerStatsTest(TestCase): def setUp(self): super(SchedulerStatsTest, self).setUp() SchedulerStatsTest.OLD_QUEUES = settings._QUEUES - settings._QUEUES = dict() + settings._QUEUES = {} settings.conf_settings() get_queue("default").connection.flushall() diff --git a/scheduler/tests/test_settings.py b/scheduler/tests/test_settings.py index 4d0e957..9d3da00 100644 --- a/scheduler/tests/test_settings.py +++ b/scheduler/tests/test_settings.py @@ -22,21 +22,21 @@ def tearDown(self): def test_scheduler_config_as_dict(self): from scheduler.settings import SCHEDULER_CONFIG - settings.SCHEDULER_CONFIG = dict( - EXECUTIONS_IN_PAGE=SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE + 1, - SCHEDULER_INTERVAL=SCHEDULER_CONFIG.SCHEDULER_INTERVAL + 1, - BROKER=Broker.REDIS, - CALLBACK_TIMEOUT=SCHEDULER_CONFIG.SCHEDULER_INTERVAL + 1, - DEFAULT_SUCCESS_TTL=SCHEDULER_CONFIG.DEFAULT_SUCCESS_TTL + 1, - DEFAULT_FAILURE_TTL=SCHEDULER_CONFIG.DEFAULT_FAILURE_TTL + 1, - DEFAULT_JOB_TTL=SCHEDULER_CONFIG.DEFAULT_JOB_TTL + 1, - DEFAULT_JOB_TIMEOUT=SCHEDULER_CONFIG.DEFAULT_JOB_TIMEOUT + 1, + settings.SCHEDULER_CONFIG = { + "EXECUTIONS_IN_PAGE": SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE + 1, + "SCHEDULER_INTERVAL": SCHEDULER_CONFIG.SCHEDULER_INTERVAL + 1, + "BROKER": Broker.REDIS, + "CALLBACK_TIMEOUT": SCHEDULER_CONFIG.SCHEDULER_INTERVAL + 1, + "DEFAULT_SUCCESS_TTL": SCHEDULER_CONFIG.DEFAULT_SUCCESS_TTL + 1, + "DEFAULT_FAILURE_TTL": SCHEDULER_CONFIG.DEFAULT_FAILURE_TTL + 1, + "DEFAULT_JOB_TTL": SCHEDULER_CONFIG.DEFAULT_JOB_TTL + 1, + "DEFAULT_JOB_TIMEOUT": SCHEDULER_CONFIG.DEFAULT_JOB_TIMEOUT + 1, # General configuration values - DEFAULT_WORKER_TTL=SCHEDULER_CONFIG.DEFAULT_WORKER_TTL + 1, - DEFAULT_MAINTENANCE_TASK_INTERVAL=SCHEDULER_CONFIG.DEFAULT_MAINTENANCE_TASK_INTERVAL + 1, - DEFAULT_JOB_MONITORING_INTERVAL=SCHEDULER_CONFIG.DEFAULT_JOB_MONITORING_INTERVAL + 1, - SCHEDULER_FALLBACK_PERIOD_SECS=SCHEDULER_CONFIG.SCHEDULER_FALLBACK_PERIOD_SECS + 1, - ) + "DEFAULT_WORKER_TTL": SCHEDULER_CONFIG.DEFAULT_WORKER_TTL 
+ 1, + "DEFAULT_MAINTENANCE_TASK_INTERVAL": SCHEDULER_CONFIG.DEFAULT_MAINTENANCE_TASK_INTERVAL + 1, + "DEFAULT_JOB_MONITORING_INTERVAL": SCHEDULER_CONFIG.DEFAULT_JOB_MONITORING_INTERVAL + 1, + "SCHEDULER_FALLBACK_PERIOD_SECS": SCHEDULER_CONFIG.SCHEDULER_FALLBACK_PERIOD_SECS + 1, + } conf_settings() from scheduler.settings import SCHEDULER_CONFIG @@ -69,20 +69,20 @@ def test_scheduler_config_as_data_class(self): self.assertEqual(getattr(SCHEDULER_CONFIG, key), value) def test_scheduler_config_as_dict_bad_param(self): - settings.SCHEDULER_CONFIG = dict( - EXECUTIONS_IN_PAGE=1, - SCHEDULER_INTERVAL=60, - BROKER=Broker.REDIS, - CALLBACK_TIMEOUT=1111, - DEFAULT_SUCCESS_TTL=1111, - DEFAULT_FAILURE_TTL=111111, - DEFAULT_JOB_TTL=1111, - DEFAULT_JOB_TIMEOUT=11111, + settings.SCHEDULER_CONFIG = { + "EXECUTIONS_IN_PAGE": 1, + "SCHEDULER_INTERVAL": 60, + "BROKER": Broker.REDIS, + "CALLBACK_TIMEOUT": 1111, + "DEFAULT_SUCCESS_TTL": 1111, + "DEFAULT_FAILURE_TTL": 111111, + "DEFAULT_JOB_TTL": 1111, + "DEFAULT_JOB_TIMEOUT": 11111, # General configuration values - DEFAULT_WORKER_TTL=11111, - DEFAULT_MAINTENANCE_TASK_INTERVAL=111, - DEFAULT_JOB_MONITORING_INTERVAL=1111, - SCHEDULER_FALLBACK_PERIOD_SECS=1111, - BAD_PARAM="bad_value", # This should raise an error - ) + "DEFAULT_WORKER_TTL": 11111, + "DEFAULT_MAINTENANCE_TASK_INTERVAL": 111, + "DEFAULT_JOB_MONITORING_INTERVAL": 1111, + "SCHEDULER_FALLBACK_PERIOD_SECS": 1111, + "BAD_PARAM": "bad_value", # This should raise an error + } self.assertRaises(ImproperlyConfigured, conf_settings) diff --git a/scheduler/tests/test_task_types/test_task_model.py b/scheduler/tests/test_task_types/test_task_model.py index 2b28bbc..fcab1ec 100644 --- a/scheduler/tests/test_task_types/test_task_model.py +++ b/scheduler/tests/test_task_types/test_task_model.py @@ -231,7 +231,7 @@ def test_parse_kwargs(self): taskarg_factory(TaskKwarg, key="key3", arg_type="bool", val=True, content_object=task) taskarg_factory(TaskKwarg, key="key4", arg_type="datetime", 
val=date, content_object=task) kwargs = task.parse_kwargs() - self.assertEqual(kwargs, dict(key1="one", key2=2, key3=True, key4=date)) + self.assertEqual(kwargs, {"key1": "one", "key2": 2, "key3": True, "key4": date}) def test_callable_args_and_kwargs(self): task = task_factory(self.task_type, callable="scheduler.tests.jobs.test_args_kwargs") @@ -418,7 +418,7 @@ def test_admin_single_delete(self): self.assertTrue(task.is_scheduled()) prev_executions_count = len(_get_executions(task)) url = reverse("admin:scheduler_task_delete", args=[task.pk]) - data = dict(post="yes") + data = {"post": "yes"} # act res = self.client.post(url, data=data, follow=True) # assert diff --git a/scheduler/tests/test_worker/test_worker_commands.py b/scheduler/tests/test_worker/test_worker_commands.py index 5720a89..b6426d6 100644 --- a/scheduler/tests/test_worker/test_worker_commands.py +++ b/scheduler/tests/test_worker/test_worker_commands.py @@ -71,7 +71,7 @@ def test_stop_job_command__success(self, mock_stopped_callback): sleep(0.1) command = StopJobCommand(worker_name=worker_name, job_name=job.name) command_payload = json.dumps(command.command_payload()) - worker._command_listener.handle_payload(dict(data=command_payload)) + worker._command_listener.handle_payload({"data": command_payload}) worker.monitor_job_execution_process(job, queue) # Assert diff --git a/scheduler/tests/testtools.py b/scheduler/tests/testtools.py index ddc18e1..07e9e5d 100644 --- a/scheduler/tests/testtools.py +++ b/scheduler/tests/testtools.py @@ -26,7 +26,7 @@ def _run_worker_process(worker: Worker, **kwargs): def run_worker_in_process(*args, name="test-worker") -> Tuple[multiprocessing.Process, str]: worker = create_worker(*args, name=name, fork_job_execution=False, with_scheduler=False) - process = multiprocessing.Process(target=_run_worker_process, args=(worker,), kwargs=dict()) + process = multiprocessing.Process(target=_run_worker_process, args=(worker,), kwargs={}) process.start() return process, name @@ 
-49,32 +49,32 @@ def sequence_gen(): def task_factory( task_type: TaskType, callable_name: str = "scheduler.tests.jobs.test_job", instance_only=False, **kwargs ): - values = dict( - name="Scheduled Job %d" % next(seq), - queue=list(settings._QUEUES.keys())[0], - callable=callable_name, - enabled=True, - timeout=None, - ) + values = { + "name": "Scheduled Job %d" % next(seq), + "queue": list(settings._QUEUES.keys())[0], + "callable": callable_name, + "enabled": True, + "timeout": None, + } if task_type == TaskType.ONCE: values.update( - dict( - result_ttl=None, - scheduled_time=timezone.now() + timedelta(days=1), - ) + { + "result_ttl": None, + "scheduled_time": timezone.now() + timedelta(days=1), + } ) elif task_type == TaskType.REPEATABLE: values.update( - dict( - result_ttl=None, - interval=1, - interval_unit="hours", - repeat=None, - scheduled_time=timezone.now() + timedelta(days=1), - ) + { + "result_ttl": None, + "interval": 1, + "interval_unit": "hours", + "repeat": None, + "scheduled_time": timezone.now() + timedelta(days=1), + } ) elif task_type == TaskType.CRON: - values.update(dict(cron_string="0 0 * * *")) + values.update({"cron_string": "0 0 * * *"}) values.update(kwargs) if instance_only: instance = Task(task_type=task_type, **values) @@ -87,13 +87,13 @@ def taskarg_factory(cls, **kwargs): content_object = kwargs.pop("content_object", None) if content_object is None: content_object = task_factory(TaskType.ONCE) - values = dict( - arg_type="str", - val="", - object_id=content_object.id, - content_type=ContentType.objects.get_for_model(content_object), - content_object=content_object, - ) + values = { + "arg_type": "str", + "val": "", + "object_id": content_object.id, + "content_type": ContentType.objects.get_for_model(content_object), + "content_object": content_object, + } if cls == TaskKwarg: values["key"] = ("key%d" % next(seq),) values.update(kwargs) diff --git a/scheduler/views/queue_views.py b/scheduler/views/queue_views.py index 82c06bf..f97547f 
100644 --- a/scheduler/views/queue_views.py +++ b/scheduler/views/queue_views.py @@ -20,7 +20,7 @@ def _get_registry_job_list(queue: Queue, registry: JobNamesRegistry, page: int) -> Tuple[List[JobModel], int, range]: items_per_page = settings.SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE num_jobs = registry.count(queue.connection) - job_list: List[JobModel] = list() + job_list: List[JobModel] = [] if num_jobs == 0: return job_list, num_jobs, range(1, 1) @@ -114,7 +114,7 @@ class QueueData: def get_statistics(run_maintenance_tasks: bool = False) -> Dict[str, List[Dict[str, Any]]]: queue_names = get_queue_names() queues: List[QueueData] = [] - queue_workers_count: Dict[str, int] = {queue_name: 0 for queue_name in queue_names} + queue_workers_count: Dict[str, int] = dict.fromkeys(queue_names, 0) workers = get_all_workers() for worker in workers: for queue_name in worker.queue_names: diff --git a/scheduler/views/worker_views.py b/scheduler/views/worker_views.py index 2736730..da0457d 100644 --- a/scheduler/views/worker_views.py +++ b/scheduler/views/worker_views.py @@ -14,7 +14,7 @@ def get_worker_executions(worker: WorkerModel) -> List[JobModel]: - res = list() + res = [] for queue_name in worker.queue_names: queue = get_queue(queue_name) curr_jobs = queue.get_all_jobs() @@ -57,7 +57,7 @@ def worker_details(request: HttpRequest, name: str) -> HttpResponse: @staff_member_required # type: ignore def workers_list(request: HttpRequest) -> HttpResponse: all_workers = get_all_workers() - worker_list = [worker for worker in all_workers] + worker_list = list(all_workers) context_data = { **admin.site.each_context(request), diff --git a/scheduler/worker/commands/worker_commands.py b/scheduler/worker/commands/worker_commands.py index 649b054..2744a61 100644 --- a/scheduler/worker/commands/worker_commands.py +++ b/scheduler/worker/commands/worker_commands.py @@ -7,7 +7,7 @@ from scheduler.types import ConnectionType, Self _PUBSUB_CHANNEL_TEMPLATE: str = ":workers:pubsub:{}" 
-_WORKER_COMMANDS_REGISTRY: Dict[str, Type["WorkerCommand"]] = dict() +_WORKER_COMMANDS_REGISTRY: Dict[str, Type["WorkerCommand"]] = {} class WorkerCommandError(Exception): diff --git a/scheduler/worker/scheduler.py b/scheduler/worker/scheduler.py index a89b911..c7a30e9 100644 --- a/scheduler/worker/scheduler.py +++ b/scheduler/worker/scheduler.py @@ -38,7 +38,7 @@ def __init__(self, queues: Sequence[Queue], worker_name: str, interval: Optional raise ValueError("At least one queue must be provided to WorkerScheduler") self._scheduled_job_registries: List[ScheduledJobRegistry] = [] self.lock_acquisition_time: Optional[datetime] = None - self._locks: Dict[str, SchedulerLock] = dict() + self._locks: Dict[str, SchedulerLock] = {} self.connection = get_queue_connection(queues[0].name) self.interval = interval or SCHEDULER_CONFIG.SCHEDULER_INTERVAL self._stop_requested = False diff --git a/scheduler/worker/worker.py b/scheduler/worker/worker.py index 48c1d3e..3a8a031 100644 --- a/scheduler/worker/worker.py +++ b/scheduler/worker/worker.py @@ -58,9 +58,9 @@ class QueueConnectionDiscrepancyError(Exception): pass -_signames = dict( - (getattr(signal, signame), signame) for signame in dir(signal) if signame.startswith("SIG") and "_" not in signame -) +_signames = { + getattr(signal, signame): signame for signame in dir(signal) if signame.startswith("SIG") and "_" not in signame +} def signal_name(signum: int) -> str: