-
-
Notifications
You must be signed in to change notification settings - Fork 806
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
6253b25
commit 79f2141
Showing
14 changed files
with
586 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
from __future__ import annotations | ||
|
||
import re | ||
from typing import TYPE_CHECKING, List, Type | ||
|
||
from aiogram.methods import Response, TelegramMethod | ||
from aiogram.types import TelegramObject | ||
from aiogram.utils.exceptions.base import TelegramAPIError | ||
from aiogram.utils.exceptions.exceptions import ( | ||
CantParseEntitiesStartTag, | ||
CantParseEntitiesUnclosed, | ||
CantParseEntitiesUnmatchedTags, | ||
CantParseEntitiesUnsupportedTag, | ||
DetailedTelegramAPIError, | ||
) | ||
|
||
if TYPE_CHECKING: | ||
from aiogram.client.bot import Bot | ||
from aiogram.client.session.base import NextRequestMiddlewareType | ||
|
||
|
||
class RequestErrorMiddleware: | ||
def __init__(self) -> None: | ||
self._registry: List[Type[DetailedTelegramAPIError]] = [ | ||
CantParseEntitiesStartTag, | ||
CantParseEntitiesUnmatchedTags, | ||
CantParseEntitiesUnclosed, | ||
CantParseEntitiesUnsupportedTag, | ||
] | ||
|
||
def mount(self, error: Type[DetailedTelegramAPIError]) -> Type[DetailedTelegramAPIError]: | ||
if error in self: | ||
raise ValueError(f"{error!r} is already registered") | ||
if not hasattr(error, "patterns"): | ||
raise ValueError(f"{error!r} has no attribute 'patterns'") | ||
self._registry.append(error) | ||
return error | ||
|
||
def detect_error(self, err: TelegramAPIError) -> TelegramAPIError: | ||
message = err.message | ||
for variant in self._registry: | ||
for pattern in variant.patterns: | ||
if match := re.match(pattern, message): | ||
return variant( | ||
method=err.method, | ||
message=err.message, | ||
match=match, | ||
) | ||
return err | ||
|
||
def __contains__(self, item: Type[DetailedTelegramAPIError]) -> bool: | ||
return item in self._registry | ||
|
||
async def __call__( | ||
self, | ||
bot: Bot, | ||
method: TelegramMethod[TelegramObject], | ||
make_request: NextRequestMiddlewareType, | ||
) -> Response[TelegramObject]: | ||
try: | ||
return await make_request(bot, method) | ||
except TelegramAPIError as e: | ||
detected_err = self.detect_error(err=e) | ||
raise detected_err from e |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
from __future__ import annotations | ||
|
||
from decimal import Decimal | ||
from enum import Enum | ||
from fractions import Fraction | ||
from typing import TYPE_CHECKING, Any, Dict, Optional, Type, TypeVar, Union | ||
from uuid import UUID | ||
|
||
from magic_filter import MagicFilter | ||
from pydantic import BaseModel | ||
|
||
from aiogram.dispatcher.filters import BaseFilter | ||
from aiogram.types import CallbackQuery | ||
|
||
T = TypeVar("T", bound="CallbackData") | ||
|
||
MAX_CALLBACK_LENGTH: int = 64 | ||
|
||
|
||
class CallbackDataException(Exception): | ||
pass | ||
|
||
|
||
class CallbackData(BaseModel): | ||
if TYPE_CHECKING: # pragma: no cover | ||
sep: str | ||
prefix: str | ||
|
||
def __init_subclass__(cls, **kwargs: Any) -> None: | ||
if "prefix" not in kwargs: | ||
raise ValueError( | ||
f"prefix required, usage example: " | ||
f"`class {cls.__name__}(CallbackData, prefix='my_callback'): ...`" | ||
) | ||
cls.sep = kwargs.pop("sep", ":") | ||
cls.prefix = kwargs.pop("prefix") | ||
if cls.sep in cls.prefix: | ||
raise ValueError( | ||
f"Separator symbol {cls.sep!r} can not be used inside prefix {cls.prefix!r}" | ||
) | ||
|
||
def _encode_value(self, key: str, value: Any) -> str: | ||
if value is None: | ||
return "" | ||
if isinstance(value, Enum): | ||
return str(value.value) | ||
if isinstance(value, (int, str, float, Decimal, Fraction, UUID)): | ||
return str(value) | ||
raise ValueError( | ||
f"Attribute {key}={value!r} of type {type(value).__name__!r}" | ||
f" can not be packed to callback data" | ||
) | ||
|
||
def pack(self) -> str: | ||
result = [self.prefix] | ||
for key, value in self.dict().items(): | ||
encoded = self._encode_value(key, value) | ||
if self.sep in encoded: | ||
raise ValueError( | ||
f"Separator symbol {self.sep!r} can not be used in value {key}={encoded!r}" | ||
) | ||
result.append(encoded) | ||
callback_data = self.sep.join(result) | ||
if len(callback_data.encode()) > MAX_CALLBACK_LENGTH: | ||
raise ValueError( | ||
f"Resulted callback data is too long! len({callback_data!r}.encode()) > {MAX_CALLBACK_LENGTH}" | ||
) | ||
return callback_data | ||
|
||
@classmethod | ||
def unpack(cls: Type[T], value: str) -> T: | ||
prefix, *parts = value.split(cls.sep) | ||
names = cls.__fields__.keys() | ||
if len(parts) != len(names): | ||
raise TypeError( | ||
f"Callback data {cls.__name__!r} takes {len(names)} arguments but {len(parts)} were given" | ||
) | ||
if prefix != cls.prefix: | ||
raise ValueError(f"Bad prefix ({prefix!r} != {cls.prefix!r})") | ||
payload = {} | ||
for k, v in zip(names, parts): # type: str, Optional[str] | ||
if field := cls.__fields__.get(k): | ||
if v == "" and not field.required: | ||
v = None | ||
payload[k] = v | ||
return cls(**payload) | ||
|
||
@classmethod | ||
def filter(cls, rule: MagicFilter) -> CallbackQueryFilter: | ||
return CallbackQueryFilter(callback_data=cls, rule=rule) | ||
|
||
class Config: | ||
use_enum_values = True | ||
|
||
|
||
class CallbackQueryFilter(BaseFilter): | ||
callback_data: Type[CallbackData] | ||
rule: MagicFilter | ||
|
||
async def __call__(self, query: CallbackQuery) -> Union[bool, Dict[str, Any]]: | ||
if not isinstance(query, CallbackQuery) or not query.data: | ||
return False | ||
try: | ||
callback_data = self.callback_data.unpack(query.data) | ||
except (TypeError, ValueError): | ||
return False | ||
|
||
if self.rule.resolve(callback_data): | ||
return {"callback_data": callback_data} | ||
return False | ||
|
||
class Config: | ||
arbitrary_types_allowed = True |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from aiogram.utils.exceptions.base import DetailedTelegramAPIError | ||
|
||
|
||
class BadRequest(DetailedTelegramAPIError): | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
from typing import ClassVar, List, Match, Optional, TypeVar | ||
|
||
from aiogram.methods import TelegramMethod | ||
from aiogram.methods.base import TelegramType | ||
|
||
ErrorType = TypeVar("ErrorType") | ||
|
||
|
||
class TelegramAPIError(Exception): | ||
url: Optional[str] = None | ||
|
||
def __init__( | ||
self, | ||
method: TelegramMethod[TelegramType], | ||
message: str, | ||
) -> None: | ||
self.method = method | ||
self.message = message | ||
|
||
def render_description(self) -> str: | ||
return self.message | ||
|
||
def __str__(self) -> str: | ||
message = [self.render_description()] | ||
if self.url: | ||
message.append(f"(background on this error at: {self.url})") | ||
return "\n".join(message) | ||
|
||
|
||
class DetailedTelegramAPIError(TelegramAPIError): | ||
patterns: ClassVar[List[str]] | ||
|
||
def __init__( | ||
self, | ||
method: TelegramMethod[TelegramType], | ||
message: str, | ||
match: Match[str], | ||
) -> None: | ||
super().__init__(method=method, message=message) | ||
self.match: Match[str] = match |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from aiogram.utils.exceptions.base import DetailedTelegramAPIError | ||
|
||
|
||
class NetworkError(DetailedTelegramAPIError): | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from aiogram.utils.exceptions.base import DetailedTelegramAPIError | ||
|
||
|
||
class NotFound(DetailedTelegramAPIError): | ||
pass |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
from typing import Optional | ||
|
||
from aiogram.methods import TelegramMethod | ||
from aiogram.methods.base import TelegramType | ||
from aiogram.utils.exceptions.base import TelegramAPIError | ||
|
||
|
||
class RetryAfter(TelegramAPIError): | ||
url = "https://core.telegram.org/bots/faq#my-bot-is-hitting-limits-how-do-i-avoid-this" | ||
|
||
def __init__( | ||
self, | ||
method: TelegramMethod[TelegramType], | ||
message: str, | ||
retry_after: int, | ||
) -> None: | ||
super().__init__(method=method, message=message) | ||
self.retry_after = retry_after | ||
|
||
def render_description(self) -> str: | ||
description = f"Flood control exceeded on method {type(self.method).__name__!r}" | ||
if chat_id := getattr(self.method, "chat_id", None): | ||
description += f" in chat {chat_id}" | ||
description += f". Retry in {self.retry_after} seconds." | ||
return description | ||
|
||
|
||
class MigrateToChat(TelegramAPIError): | ||
url = "https://core.telegram.org/bots/api#responseparameters" | ||
|
||
def __init__( | ||
self, | ||
method: TelegramMethod[TelegramType], | ||
message: str, | ||
migrate_to_chat_id: int, | ||
) -> None: | ||
super().__init__(method=method, message=message) | ||
self.migrate_to_chat_id = migrate_to_chat_id | ||
|
||
def render_message(self) -> Optional[str]: | ||
description = ( | ||
f"The group has been migrated to a supergroup with id {self.migrate_to_chat_id}" | ||
) | ||
if chat_id := getattr(self.method, "chat_id", None): | ||
description += f" from {chat_id}" | ||
return description |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
def mark_line(text: str, offset: int, length: int = 1) -> str: | ||
try: | ||
if offset > 0 and (new_line_pos := text[:offset].rindex("\n")): | ||
text = "..." + text[:new_line_pos] | ||
offset -= new_line_pos - 3 | ||
except ValueError: | ||
pass | ||
|
||
if offset > 10: | ||
text = "..." + text[offset - 10 :] | ||
offset = 13 | ||
|
||
mark = " " * offset | ||
mark += "^" * length | ||
try: | ||
if new_line_pos := text[len(mark) :].index("\n"): | ||
text = text[:new_line_pos].rstrip() + "..." | ||
except ValueError: | ||
pass | ||
return text + "\n" + mark |
Oops, something went wrong.