Skip to content

Commit

Permalink
Make endless long-polling
Browse files Browse the repository at this point in the history
  • Loading branch information
JrooTJunior committed Jun 18, 2021
1 parent 5296724 commit ac1f0ef
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 22 deletions.
18 changes: 12 additions & 6 deletions aiogram/client/session/aiohttp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -14,11 +15,12 @@
cast,
)

from aiohttp import BasicAuth, ClientSession, FormData, TCPConnector
from aiohttp import BasicAuth, ClientError, ClientSession, FormData, TCPConnector

from aiogram.methods import Request, TelegramMethod

from ...methods.base import TelegramType
from ...utils.exceptions.network import NetworkError
from .base import UNSET, BaseSession

if TYPE_CHECKING: # pragma: no cover
Expand Down Expand Up @@ -139,11 +141,15 @@ async def make_request(
url = self.api.api_url(token=bot.token, method=request.method)
form = self.build_form_data(request)

async with session.post(
url, data=form, timeout=self.timeout if timeout is None else timeout
) as resp:
raw_result = await resp.text()

try:
async with session.post(
url, data=form, timeout=self.timeout if timeout is None else timeout
) as resp:
raw_result = await resp.text()
except asyncio.TimeoutError:
raise NetworkError(method=call, message="Request timeout error")
except ClientError as e:
raise NetworkError(method=call, message=f"{type(e).__name__}: {e}")
response = self.check_response(method=call, status_code=resp.status, content=raw_result)
return cast(TelegramType, response.result)

Expand Down
78 changes: 68 additions & 10 deletions aiogram/dispatcher/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
from ..client.bot import Bot
from ..methods import GetUpdates, TelegramMethod
from ..types import TelegramObject, Update, User
from ..utils.backoff import Backoff, BackoffConfig
from ..utils.exceptions.base import TelegramAPIError
from ..utils.exceptions.network import NetworkError
from ..utils.exceptions.server import ServerError
from .event.bases import UNHANDLED, SkipHandler
from .event.telegram import TelegramEventObserver
from .fsm.middleware import FSMContextMiddleware
Expand All @@ -21,6 +24,8 @@
from .middlewares.user_context import UserContextMiddleware
from .router import Router

DEFAULT_BACKOFF_CONFIG = BackoffConfig(min_delay=1.0, max_delay=5.0, factor=1.3, jitter=0.1)


class Dispatcher(Router):
"""
Expand Down Expand Up @@ -63,7 +68,7 @@ def __init__(
@property
def parent_router(self) -> None:
"""
Dispatcher has no parent router
Dispatcher has no parent router and can't be included to any other routers or dispatchers
:return:
"""
Expand All @@ -82,6 +87,7 @@ def parent_router(self, value: Router) -> None:
async def feed_update(self, bot: Bot, update: Update, **kwargs: Any) -> Any:
"""
Main entry point for incoming updates
Response of this method can be used as Webhook response
:param bot:
:param update:
Expand All @@ -90,7 +96,7 @@ async def feed_update(self, bot: Bot, update: Update, **kwargs: Any) -> Any:
handled = False
start_time = loop.time()

Bot.set_current(bot)
token = Bot.set_current(bot)
try:
response = await self.update.trigger(update, bot=bot, **kwargs)
handled = response is not UNHANDLED
Expand All @@ -105,6 +111,7 @@ async def feed_update(self, bot: Bot, update: Update, **kwargs: Any) -> Any:
duration,
bot.id,
)
Bot.reset_current(token)

async def feed_raw_update(self, bot: Bot, update: Dict[str, Any], **kwargs: Any) -> Any:
"""
Expand All @@ -119,20 +126,50 @@ async def feed_raw_update(self, bot: Bot, update: Dict[str, Any], **kwargs: Any)

@classmethod
async def _listen_updates(
cls, bot: Bot, polling_timeout: int = 30
cls,
bot: Bot,
polling_timeout: int = 30,
backoff_config: BackoffConfig = DEFAULT_BACKOFF_CONFIG,
) -> AsyncGenerator[Update, None]:
"""
Infinity updates reader
Endless updates reader with correctly handling any server-side or connection errors.
So you may not worry that the polling will stop working.
"""
backoff = Backoff(config=backoff_config)
get_updates = GetUpdates(timeout=polling_timeout)
kwargs = {}
if bot.session.timeout:
# Request timeout can be lower than session timeout ant that's OK.
# To prevent false-positive TimeoutError we should wait longer than polling timeout
kwargs["request_timeout"] = int(bot.session.timeout + polling_timeout)
while True:
# TODO: Skip restarting telegram error
updates = await bot(get_updates, **kwargs)
try:
updates = await bot(get_updates, **kwargs)
except (NetworkError, ServerError) as e:
# In cases when Telegram Bot API was inaccessible don't need to stop polling process
# because some of developers can't make auto-restarting of the script
loggers.dispatcher.error("Failed to fetch updates - %s: %s", type(e).__name__, e)
# And also backoff timeout is best practice to retry any network activity
loggers.dispatcher.warning(
"Sleep for %f seconds and try again... (tryings = %d, bot id = %d)",
backoff.next_delay,
backoff.counter,
bot.id,
)
await backoff.asleep()
continue

# In case when network connection was fixed let's reset the backoff
# to initial value and then process updates
backoff.reset()

for update in updates:
yield update
# The getUpdates method returns the earliest 100 unconfirmed updates.
# To confirm an update, use the offset parameter when calling getUpdates
# All updates with update_id less than or equal to offset will be marked as confirmed on the server
# and will no longer be returned.
get_updates.offset = update.update_id + 1

async def _listen_update(self, update: Update, **kwargs: Any) -> Any:
Expand Down Expand Up @@ -255,7 +292,12 @@ async def _process_update(
return True # because update was processed but unsuccessful

async def _polling(
self, bot: Bot, polling_timeout: int = 30, handle_as_tasks: bool = True, **kwargs: Any
self,
bot: Bot,
polling_timeout: int = 30,
handle_as_tasks: bool = True,
backoff_config: BackoffConfig = DEFAULT_BACKOFF_CONFIG,
**kwargs: Any,
) -> None:
"""
Internal polling process
Expand All @@ -264,7 +306,9 @@ async def _polling(
:param kwargs:
:return:
"""
async for update in self._listen_updates(bot, polling_timeout=polling_timeout):
async for update in self._listen_updates(
bot, polling_timeout=polling_timeout, backoff_config=backoff_config
):
handle_update = self._process_update(bot=bot, update=update, **kwargs)
if handle_as_tasks:
asyncio.create_task(handle_update)
Expand Down Expand Up @@ -348,7 +392,12 @@ def process_response(task: Future[Any]) -> None:
return None

async def start_polling(
self, *bots: Bot, polling_timeout: int = 10, handle_as_tasks: bool = True, **kwargs: Any
self,
*bots: Bot,
polling_timeout: int = 10,
handle_as_tasks: bool = True,
backoff_config: BackoffConfig = DEFAULT_BACKOFF_CONFIG,
**kwargs: Any,
) -> None:
"""
Polling runner
Expand All @@ -357,6 +406,7 @@ async def start_polling(
:param polling_timeout:
:param handle_as_tasks:
:param kwargs:
:param backoff_config:
:return:
"""
async with self._running_lock: # Prevent to run this method twice at a once
Expand All @@ -376,6 +426,7 @@ async def start_polling(
bot=bot,
handle_as_tasks=handle_as_tasks,
polling_timeout=polling_timeout,
backoff_config=backoff_config,
**kwargs,
)
)
Expand All @@ -387,13 +438,19 @@ async def start_polling(
await self.emit_shutdown(**workflow_data)

def run_polling(
self, *bots: Bot, polling_timeout: int = 30, handle_as_tasks: bool = True, **kwargs: Any
self,
*bots: Bot,
polling_timeout: int = 30,
handle_as_tasks: bool = True,
backoff_config: BackoffConfig = DEFAULT_BACKOFF_CONFIG,
**kwargs: Any,
) -> None:
"""
Run many bots with polling
:param bots: Bot instances
:param polling_timeout: Poling timeout
:param backoff_config:
:param handle_as_tasks: Run task for each event and no wait result
:param kwargs: contextual data
:return:
Expand All @@ -405,6 +462,7 @@ def run_polling(
**kwargs,
polling_timeout=polling_timeout,
handle_as_tasks=handle_as_tasks,
backoff_config=backoff_config,
)
)
except (KeyboardInterrupt, SystemExit): # pragma: no cover
Expand Down
6 changes: 2 additions & 4 deletions aiogram/dispatcher/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,7 @@ def parent_router(self, router: Router) -> None:
:param router:
"""
if not isinstance(router, Router):
raise ValueError(
f"router should be instance of Router not {type(router).__class__.__name__}"
)
raise ValueError(f"router should be instance of Router not {type(router).__name__!r}")
if self._parent_router:
raise RuntimeError(f"Router is already attached to {self._parent_router!r}")
if self == router:
Expand All @@ -133,7 +131,7 @@ def parent_router(self, router: Router) -> None:

if not self.use_builtin_filters and parent.use_builtin_filters:
warnings.warn(
f"{self.__class__.__name__}(use_builtin_filters=False) has no effect"
f"{type(self).__name__}(use_builtin_filters=False) has no effect"
f" for router {self} in due to builtin filters is already registered"
f" in parent router",
CodeHasNoEffect,
Expand Down
80 changes: 80 additions & 0 deletions aiogram/utils/backoff.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import asyncio
import time
from dataclasses import dataclass
from random import normalvariate


@dataclass(frozen=True)
class BackoffConfig:
min_delay: float
max_delay: float
factor: float
jitter: float

def __post_init__(self):
if self.max_delay <= self.min_delay:
raise ValueError("`max_delay` should be greater than `min_delay`")
if self.factor <= 1:
raise ValueError("`factor` should be greater than 1")


class Backoff:
def __init__(self, config: BackoffConfig) -> None:
self.config = config
self._next_delay = config.min_delay
self._current_delay = 0.0
self._counter = 0

def __iter__(self):
return self

@property
def min_delay(self) -> float:
return self.config.min_delay

@property
def max_delay(self) -> float:
return self.config.max_delay

@property
def factor(self) -> float:
return self.config.factor

@property
def jitter(self) -> float:
return self.config.jitter

@property
def next_delay(self) -> float:
return self._next_delay

@property
def current_delay(self) -> float:
return self._current_delay

@property
def counter(self) -> int:
return self._counter

def sleep(self) -> None:
time.sleep(next(self))

async def asleep(self) -> None:
await asyncio.sleep(next(self))

def _calculate_next(self, value: float) -> float:
return normalvariate(min(value * self.factor, self.max_delay), self.jitter)

def __next__(self) -> float:
self._current_delay = self._next_delay
self._next_delay = self._calculate_next(self._next_delay)
self._counter += 1
return self._current_delay

def reset(self) -> None:
self._current_delay = 0.0
self._counter = 0
self._next_delay = self.min_delay

def __str__(self) -> str:
return f"Backoff(tryings={self._counter}, current_delay={self._current_delay}, next_delay={self._next_delay})"
4 changes: 2 additions & 2 deletions aiogram/utils/exceptions/network.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from aiogram.utils.exceptions.base import DetailedTelegramAPIError
from aiogram.utils.exceptions.base import TelegramAPIError


class NetworkError(DetailedTelegramAPIError):
class NetworkError(TelegramAPIError):
pass
5 changes: 5 additions & 0 deletions aiogram/utils/exceptions/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from aiogram.utils.exceptions.base import TelegramAPIError


class ServerError(TelegramAPIError):
pass

0 comments on commit ac1f0ef

Please sign in to comment.