Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(#64) Support tasks retry & propagate raised exception #65

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@ source = src/
parallel = True
concurrency = multiprocessing
sigterm = True

[report]
exclude_lines =
pragma: no cover
if TYPE_CHECKING:
if t.TYPE_CHECKING:
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- run: docker-compose -f ./docker-compose.yml up --detach redis
- run: docker compose -f ./docker-compose.yml up --detach redis
- uses: actions/setup-python@v3
with:
python-version: "3.10.x"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- run: docker-compose -f ./docker-compose.yml up --detach redis
- run: docker compose -f ./docker-compose.yml up --detach redis
- uses: actions/setup-python@v3
with:
python-version: "3.10.x"
Expand Down
12 changes: 8 additions & 4 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -257,16 +257,16 @@ ignored-parents=
max-args=10

# Maximum number of attributes for a class (see R0902).
max-attributes=7
max-attributes=10

# Maximum number of boolean expressions in an if statement (see R0916).
max-bool-expr=5

# Maximum number of branch for function / method body.
max-branches=12
max-branches=15

# Maximum number of locals for function / method body.
max-locals=15
max-locals=20

# Maximum number of parents for a class (see R0901).
max-parents=7
Expand Down Expand Up @@ -458,7 +458,11 @@ good-names=i,
a,
b,
c,
n
n,
id,
e,
s,
on

# Good variable names regexes, separated by a comma. If names match any regex,
# they will always be accepted
Expand Down
6 changes: 5 additions & 1 deletion .pylintrc.tests
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,12 @@ good-names=i,
a,
b,
c,
e,
n,
ls
t,
ls,
fo,
fi

# Good variable names regexes, separated by a comma. If names match any regex,
# they will always be accepted
Expand Down
13 changes: 12 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Attach",
"type": "python",
"request": "attach",
"connect": {
"host": "localhost",
"port": 5678
},
"justMyCode": false
},
{
"name": "Main",
"type": "python",
Expand Down Expand Up @@ -31,7 +41,8 @@
"-s",
],
"request": "launch",
"console": "integratedTerminal"
"console": "integratedTerminal",
"justMyCode": false
},
{
"name": "Sample Worker (Simple App)",
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import asyncio
import aiotaskq


@aiotaskq.task
@aiotaskq.task()
def some_task(b: int) -> int:
# Some task with high cpu usage
def _naive_fib(n: int) -> int:
Expand Down Expand Up @@ -132,22 +132,22 @@ import asyncio
from aiotaskq import task


@task
@task()
def task_1(*args, **kwargs):
pass


@task
@task()
def task_2(*args, **kwargs):
pass


@task
@task()
def task_3(*args, **kwargs):
pass


@task
@task()
def task_4(*args, **kwargs):
pass

Expand Down
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ build-backend = "setuptools.build_meta"
requires-python = ">=3.9"
dependencies = [
"aioredis >= 2.0.0, < 2.1.0",
"jsonpickle >= 3.0.0, < 3.1.0",
"tomlkit >= 0.11.0, < 0.12.0",
"typer >= 0.4.0, < 0.5.0",
]
name = "aiotaskq"
version = "0.0.12"
version = "0.0.13"
readme = "README.md"
description = "A simple asynchronous task queue"
authors = [
Expand All @@ -28,7 +29,7 @@ license = { file = "LICENSE" }

[project.optional-dependencies]
dev = [
"black >= 22.1.0, < 22.2.0",
"black >= 22.2.0, < 23.0.0",
"coverage >= 6.4.0, < 6.5.0",
"mypy >= 0.931, < 1.0",
"mypy-extensions >= 0.4.0, < 0.5.0",
Expand Down
2 changes: 1 addition & 1 deletion src/aiotaskq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import aiotaskq


@aiotaskq.task
@aiotaskq.task()
def some_task(b: int) -> int:
# Some task with high cpu usage
def _naive_fib(n: int) -> int:
Expand Down
5 changes: 4 additions & 1 deletion src/aiotaskq/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

#!/usr/bin/env python

import logging
import typing as t

import typer

from . import __version__
from .config import Config
from .interfaces import ConcurrencyType
from .worker import Defaults, run_worker_forever
from . import __version__

cli = typer.Typer()
logging.basicConfig(level=Config.log_level())


def _version_callback(value: bool):
Expand Down
47 changes: 47 additions & 0 deletions src/aiotaskq/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""
Module to define and store all configuration values used across the library.

The public object from this module is `Config`. This object wraps
all the configuration values, which include:
- Variables
- Environment variables
"""

import logging
from os import environ

from .interfaces import SerializationType

_REDIS_URL = "redis://127.0.0.1:6379"


class Config:
"""
Provide configuration values.

These include:
- Variables
- Environment variables
"""

@staticmethod
def serialization_type() -> SerializationType:
"""Return the serialization type as provided via env var AIOTASKQ_SERIALIZATION."""
s: str = environ.get("AIOTASKQ_SERIALIZATION", SerializationType.DEFAULT.value)
return SerializationType[s.upper()]

@staticmethod
def log_level() -> int:
"""Return the log level as provided via env var LOG_LEVEL."""
level: int = int(environ.get("AIOTASKQ_LOG_LEVEL", logging.DEBUG))
return level

@staticmethod
def broker_url() -> str:
"""
Return the broker url as provided via env var BROKER_URL.

Defaults to "redis://127.0.0.1:6379".
"""
broker_url: str = environ.get("BROKER_URL", _REDIS_URL)
return broker_url
33 changes: 29 additions & 4 deletions src/aiotaskq/constants.py
imranariffin marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,5 +1,30 @@
"""Module to define and store all constants used across the library."""
"""
Module to define and store all constants used across the library.

REDIS_URL = "redis://127.0.0.1:6379"
TASKS_CHANNEL = "channel:tasks"
RESULTS_CHANNEL_TEMPLATE = "channel:results:{task_id}"
The public object from this module is `Constants`. This object wraps
all the constants, which include:
- Static methods that return constant values
"""


_TASKS_CHANNEL = "channel:tasks"
_RESULTS_CHANNEL_TEMPLATE = "channel:results:{task_id}"


class Constants:
"""
Provide all the constants.

These include:
- Static methods that return constant values
"""

@staticmethod
def tasks_channel() -> str:
"""Return the channel name used for transporting task requests on the broker."""
return _TASKS_CHANNEL

@staticmethod
def results_channel_template() -> str:
"""Return the template chnnale name used for transporting task results on the broker."""
imranariffin marked this conversation as resolved.
Show resolved Hide resolved
return _RESULTS_CHANNEL_TEMPLATE
4 changes: 4 additions & 0 deletions src/aiotaskq/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ class ConcurrencyTypeNotSupported(Exception):

class InvalidArgument(Exception):
"""A task is applied with invalid arguments."""


class InvalidRetryOptions(Exception):
"""A task is defined with invalid retry options."""
58 changes: 58 additions & 0 deletions src/aiotaskq/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,61 @@ class IWorkerManager(IWorker):
"""

concurrency_manager: IConcurrencyManager


class SerializationType(str, enum.Enum):
"""Specify the types of serialization supported."""

JSON = "json"
DEFAULT = JSON


T = t.TypeVar("T")


class ISerialization(t.Protocol, t.Generic[T]):
"""Define the interface required to serialize and deserialize a generic object."""

@classmethod
def serialize(cls, obj: T) -> bytes:
"""Serialize any object into bytes."""

@classmethod
def deserialize(cls, klass: type[T], s: bytes) -> T:
"""Deserialize bytes into any object."""


class RetryOptions(t.TypedDict):
"""
Specify the available retry options.

max_retries int | None: The number times to keep retrying the execution of the task
until the task executes successfully. Counting starts from
0 so if max_retries = 2 for example, then the task will execute
1 + 2 times (1 time for first execution, 2 times for re-try) in the
worst case scenario.
on tuple[type[Exception], ...]: The tuple of exception classes to retry on. The task will
will only be retried if that exception that is raised
during task execution is an instance of one of the listed
exception classes.

Examples:

If `on=(Exception,)` then any kind of exception will trigger
a retry.

If `on=(ExceptionA, ExceptionB,)` and during task
execution ExceptionC was raised, then retry is not triggered.

If `on=tuple()` then during task definition aiotaskq will raise
`InvalidRetryOptions`
"""

max_retries: int | None
on: tuple[type[Exception], ...]


class TaskOptions(t.TypedDict):
"""Specify the options available for a task."""

retry: RetryOptions | None
Loading
Loading