ci: Run pre-commit --all-files on ci to lint project
jairhenrique committed Dec 13, 2023
1 parent 2de9251 commit b9d0630
Showing 9 changed files with 49 additions and 40 deletions.
17 changes: 17 additions & 0 deletions .github/workflows/pre-commit.yml
@@ -0,0 +1,17 @@
+name: Run pre-commit
+
+on:
+  - pull_request
+  - push
+  - workflow_dispatch
+
+jobs:
+  pre-commit:
+    name: Run pre-commit
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - uses: actions/setup-python@v5
+        with:
+          python-version: 3.11
+      - uses: pre-commit/action@v3.0.0
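The workflow mirrors the commit title: pre-commit/action checks out the pinned hooks and runs them across the whole tree. For local parity before pushing, the same pass can be invoked from Python; a minimal sketch (assumes the pre-commit package is installed in the active environment, and that `python -m pre_commit` is its module entry point):

import subprocess

# Run the same lint pass the CI job performs; check=True turns any failing
# hook (non-zero exit) into a CalledProcessError, mirroring a red build.
subprocess.run(
    ["python", "-m", "pre_commit", "run", "--all-files"],
    check=True,
)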
2 changes: 1 addition & 1 deletion .gitignore
@@ -104,4 +104,4 @@ venv.bak/
 .mypy_cache/
 
 # PyCharm
-.idea/
\ No newline at end of file
+.idea/
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
@@ -3,7 +3,7 @@ default_language_version:
 
 repos:
   - repo: https://github.com/pre-commit/pre-commit-hooks
-    rev: v4.2.0
+    rev: v4.5.0
     hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
@@ -13,7 +13,7 @@ repos:
      - id: debug-statements
 
  - repo: https://github.com/PyCQA/flake8
-    rev: 4.0.1
+    rev: 6.1.0
    hooks:
      - id: flake8
 
@@ -23,14 +23,14 @@ repos:
      - id: black
 
  - repo: https://github.com/pre-commit/mirrors-mypy
-    rev: v0.942
+    rev: v1.7.1
    hooks:
      - id: mypy
        additional_dependencies:
          - types-attrs
          - types-boto3
 
  - repo: https://github.com/PyCQA/isort
-    rev: 5.12.0
+    rev: 5.13.1
    hooks:
      - id: isort
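The four rev bumps above are the kind of pins `pre-commit autoupdate` emits. A quick post-update sanity check, sketched in Python (assumes PyYAML is importable, which it is wherever pre-commit itself is installed, since pre-commit depends on it):

import yaml

# List each hook repository with its pinned revision, e.g.
# "https://github.com/pre-commit/mirrors-mypy v1.7.1".
with open(".pre-commit-config.yaml") as f:
    config = yaml.safe_load(f)

for repo in config["repos"]:
    print(repo["repo"], repo["rev"])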
35 changes: 14 additions & 21 deletions sqs_workers/async_task.py
@@ -1,16 +1,8 @@
 from __future__ import annotations
 
 from contextlib import contextmanager
-from typing import (
-    TYPE_CHECKING,
-    Any,
-    Callable,
-    Generator,
-    Generic,
-    NoReturn,
-    Optional,
-    TypeVar,
-)
+from typing import TYPE_CHECKING, Any, Callable, Generator, Generic, NoReturn, Optional
 
 from typing_extensions import ParamSpec
 
 from sqs_workers.utils import bind_arguments
@@ -33,15 +25,16 @@ def __init__(
 
     def __call__(self, *args: P.args, **kwargs: P.kwargs) -> NoReturn:
         raise RuntimeError(
-            f"Async task {self.queue.name}.{self.job_name} called synchronously (probably, "
-            "by mistake). Use either AsyncTask.run(...) to run the task synchronously "
-            "or AsyncTask.delay(...) to add it to the queue"
+            f"Async task {self.queue.name}.{self.job_name} called synchronously "
+            "(probably, by mistake). Use either AsyncTask.run(...) "
+            "to run the task synchronously or AsyncTask.delay(...) "
+            "to add it to the queue"
         )
 
     def __repr__(self) -> str:
         return "<%s %s.%s>" % (self.__class__.__name__, self.queue.name, self.job_name)
 
-    def run(self, *args: P.args, **kwargs: P.kwargs) -> NoReturn:
+    def run(self, *args: P.args, **kwargs: P.kwargs) -> Any:
         """
         Run the task synchronously.
         """
@@ -66,10 +59,10 @@ def delay(self, *args: P.args, **kwargs: P.kwargs) -> Optional[str]:
         """
         Run the task asynchronously.
         """
-        _content_type = kwargs.pop("_content_type", self.queue.env.codec)
-        _delay_seconds = kwargs.pop("_delay_seconds", None)
-        _deduplication_id = kwargs.pop("_deduplication_id", None)
-        _group_id = kwargs.pop("_group_id", None)
+        _content_type = kwargs.pop("_content_type", self.queue.env.codec)  # type: ignore # noqa
+        _delay_seconds = kwargs.pop("_delay_seconds", None)  # type: ignore
+        _deduplication_id = kwargs.pop("_deduplication_id", None)  # type: ignore
+        _group_id = kwargs.pop("_group_id", None)  # type: ignore
 
         if self.queue.batching_policy.batching_enabled:
             if len(args) > 0:
@@ -79,10 +72,10 @@ def delay(self, *args: P.args, **kwargs: P.kwargs) -> Optional[str]:
 
         return self.queue.add_job(
             self.job_name,
-            _content_type=_content_type,
-            _delay_seconds=_delay_seconds,
+            _content_type=_content_type,  # type: ignore
+            _delay_seconds=_delay_seconds,  # type: ignore
             _deduplication_id=_deduplication_id,
-            _group_id=_group_id,
+            _group_id=_group_id,  # type: ignore
             **kwargs,
         )
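Net effect of the async_task.py hunks: run() is now annotated Any because it returns whatever the processor returns, delay() enqueues the job and returns the SQS message id, and calling the task object directly raises the RuntimeError shown above. A usage sketch (the queue and job names are hypothetical, enqueueing assumes AWS credentials, and the processor decorator is the usual way sqs-workers hands out AsyncTask instances):

from sqs_workers import SQSEnv

sqs = SQSEnv()
queue = sqs.queue("emails")  # hypothetical queue name

@queue.processor("send_email")
def send_email(to: str) -> str:
    return f"sent to {to}"

result = send_email.run(to="user@example.com")        # synchronous; returns the result, hence Any
message_id = send_email.delay(to="user@example.com")  # enqueued; returns Optional[str]
# send_email(to="user@example.com")                   # raises the RuntimeError from __call__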
4 changes: 2 additions & 2 deletions sqs_workers/core.py
@@ -1,6 +1,6 @@
 import json
 import logging
-from typing import Any, Optional
+from typing import Any
 
 logger = logging.getLogger(__name__)
 
@@ -64,6 +64,6 @@ def __json__(self) -> str:
         )
 
 
-def get_job_name(message) -> Optional[str]:
+def get_job_name(message) -> str | None:
     attrs = message.message_attributes or {}
     return (attrs.get("JobName") or {}).get("StringValue")
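One caveat on the new spelling: a PEP 604 union in a signature is evaluated when the function is defined, so `str | None` needs Python 3.10+ unless postponed annotation evaluation is enabled. A minimal illustration (the function here is hypothetical):

from __future__ import annotations  # makes `str | None` safe on Python < 3.10

def lookup(key: str) -> str | None:
    # Returns None when the key is absent, matching get_job_name's contract.
    return {"JobName": "send_email"}.get(key)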
1 change: 0 additions & 1 deletion sqs_workers/deadletter_queue.py
@@ -48,7 +48,6 @@ def maker(cls, upstream_queue, **kwargs):
 
 @attr.s
 class PushBackSender(object):
-
     upstream_queue: "GenericQueue" = attr.ib(default=None)
 
     def __call__(self, message):
15 changes: 7 additions & 8 deletions sqs_workers/queue.py
@@ -14,10 +14,9 @@
     Optional,
     TypeVar,
 )
-from typing_extensions import ParamSpec
-
 
 import attr
+from typing_extensions import ParamSpec
 
 from sqs_workers import DEFAULT_BACKOFF, codecs
 from sqs_workers.async_task import AsyncTask
@@ -424,7 +423,7 @@ def connect_processor(
         )
         return AsyncTask(self, job_name, processor)
 
-    def get_processor(self, job_name: str) -> Optional[Processor]:
+    def get_processor(self, job_name: str) -> Processor | None:
         """
         Helper function to return a processor for the queue
         """
@@ -503,7 +502,7 @@ def add_job(
         _delay_seconds: Optional[int] = None,
         _deduplication_id=None,
         _group_id: Optional[str] = None,
-        **job_kwargs
+        **job_kwargs,
     ) -> Optional[str]:
         """
         Add job to the queue. The body of the job will be converted to the text
@@ -574,11 +573,11 @@ def process_message(self, message: Any) -> bool:
         Return True if processing went successful
         """
         job_name = get_job_name(message)
-        processor = self.get_processor(job_name)
+        processor = self.get_processor(job_name)  # type: ignore
         if processor:
             return processor.process_message(message)
         else:
-            return self.process_message_fallback(job_name)
+            return self.process_message_fallback(job_name)  # type: ignore
 
     def process_messages(self, messages: List[Any]) -> bool:
         """
@@ -594,12 +593,12 @@ def process_messages(self, messages: List[Any]) -> bool:
         results = []
 
         for job_name, grouped_messages in messages_by_job_name.items():
-            processor = self.get_processor(job_name)
+            processor = self.get_processor(job_name)  # type: ignore
             if processor:
                 result = processor.process_messages(grouped_messages)
                 results.append(result)
             else:
-                result = self.process_message_fallback(job_name)
+                result = self.process_message_fallback(job_name)  # type: ignore
                 results.append(result)
 
         return all(results)
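The new ignore comments follow from the core.py change above: get_job_name now returns `str | None`, while get_processor and process_message_fallback are declared to take `str`, so passing the possibly-None job_name straight through no longer type-checks. A reduced sketch of the mismatch (standalone stand-ins, not the real classes):

from typing import Optional

def get_processor(job_name: str) -> Optional[object]:
    return None  # stand-in for the real lookup

job_name: Optional[str] = None  # what get_job_name can return
processor = get_processor(job_name)  # type: ignore  # mypy: incompatible type "str | None"

An explicit `if job_name is None` guard would narrow the type instead; the commit opts for the ignore comments.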
4 changes: 2 additions & 2 deletions sqs_workers/sqs_env.py
@@ -12,18 +12,18 @@
     Union,
     overload,
 )
-from typing_extensions import ParamSpec
 
 import attr
 import boto3
+from typing_extensions import ParamSpec
 
 from sqs_workers import DEFAULT_BACKOFF, RawQueue, codecs, context, processors
+from sqs_workers.async_task import AsyncTask
 from sqs_workers.batching import BatchingConfiguration, NoBatching
 from sqs_workers.core import RedrivePolicy
 from sqs_workers.processors import DEFAULT_CONTEXT_VAR
 from sqs_workers.queue import GenericQueue, JobQueue
 from sqs_workers.shutdown_policies import NeverShutdown
-from sqs_workers.async_task import AsyncTask
 
 if TYPE_CHECKING:
     from sqs_workers.backoff_policies import BackoffPolicy
3 changes: 2 additions & 1 deletion sqs_workers/sqs_manage.py
@@ -1,6 +1,7 @@
 """
 Helper functions to create and delete queues on SQS.
 """
+from typing import Any, Dict
 
 
 def create_standard_queue(
@@ -13,7 +14,7 @@ def create_standard_queue(
     """
     Create a new standard queue
     """
-    attrs = {}
+    attrs: Dict[str, Any] = {}
     kwargs = {"QueueName": env.get_sqs_queue_name(queue_name), "Attributes": attrs}
     if message_retention_period is not None:
         attrs["MessageRetentionPeriod"] = str(message_retention_period)
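The annotation on attrs is what lets mypy check this function: an empty dict literal gives the checker nothing to infer from, so a bare `attrs = {}` trips a need-type-annotation error under the settings this repo presumably runs. A minimal reproduction (variable names hypothetical):

from typing import Any, Dict

# attrs = {}  # mypy: Need type annotation for "attrs"
attrs: Dict[str, Any] = {}  # the fix applied in the hunk above
attrs["MessageRetentionPeriod"] = str(3600)
print(attrs)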
