Skip to content

Commit

Permalink
Merge branch 'webhook' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
JrooTJunior committed Aug 9, 2017
2 parents 1173598 + 326f82f commit 882d60f
Show file tree
Hide file tree
Showing 13 changed files with 1,859 additions and 38 deletions.
7 changes: 4 additions & 3 deletions aiogram/bot/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ async def send_file(self, file_type, method, file, payload) -> Union[Dict, Boole
:param payload: request payload
:return: resonse
"""
if isinstance(file, str):
if file is None:
files = {}
elif isinstance(file, str):
# You can use file ID or URL in the most of requests
payload[file_type] = file
files = None
Expand Down Expand Up @@ -1076,8 +1078,7 @@ async def answer_callback_query(self, callback_query_id: String,
If you have created a Game and accepted the conditions via @Botfather,
specify the URL that opens your game – note that this will only work
if the query comes from a callback_game button.
Otherwise, you may use links like t.me/your_bot?start=XXXX that open your bot with a parameter.
Otherwise, you may use links like t.me/your_bot?start=XXXX that open your bot with a parameter.
:param cache_time: Integer (Optional) - The maximum amount of time in seconds that the result
of the callback query may be cached client-side. Telegram apps will support
caching starting in version 3.14. Defaults to 0.
Expand Down
6 changes: 6 additions & 0 deletions aiogram/contrib/fsm_storage/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ class MemoryStorage(BaseStorage):
This type of storage is not recommended for usage in bots, because you will lost all states after restarting.
"""

async def wait_closed(self):
pass

def close(self):
self.data.clear()

def __init__(self):
self.data = {}

Expand Down
34 changes: 28 additions & 6 deletions aiogram/contrib/fsm_storage/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
This module has redis storage for finite-state machine based on `aioredis <https://github.com/aio-libs/aioredis>`_ driver
"""

import asyncio
import typing

import aioredis
Expand All @@ -21,6 +22,13 @@ class RedisStorage(BaseStorage):
storage = RedisStorage('localhost', 6379, db=5)
dp = Dispatcher(bot, storage=storage)
And need to close Redis connection when shutdown
.. code-block:: python3
dp.storage.close()
await dp.storage.wait_closed()
"""

def __init__(self, host, port, db=None, password=None, ssl=None, loop=None, **kwargs):
Expand All @@ -29,10 +37,22 @@ def __init__(self, host, port, db=None, password=None, ssl=None, loop=None, **kw
self._db = db
self._password = password
self._ssl = ssl
self._loop = loop
self._loop = loop or asyncio.get_event_loop()
self._kwargs = kwargs

self._redis: aioredis.RedisConnection = None
self._connection_lock = asyncio.Lock(loop=self._loop)

def close(self):
if self._redis and not self._redis.closed:
self._redis.close()
del self._redis
self._redis = None

async def wait_closed(self):
if self._redis:
return await self._redis.wait_closed()
return True

@property
async def redis(self) -> aioredis.RedisConnection:
Expand All @@ -41,11 +61,13 @@ async def redis(self) -> aioredis.RedisConnection:
This property is awaitable.
"""
if self._redis is None:
self._redis = await aioredis.create_connection((self._host, self._port),
db=self._db, password=self._password, ssl=self._ssl,
loop=self._loop,
**self._kwargs)
# Use thread-safe asyncio Lock because this method without that is not safe
async with self._connection_lock:
if self._redis is None:
self._redis = await aioredis.create_connection((self._host, self._port),
db=self._db, password=self._password, ssl=self._ssl,
loop=self._loop,
**self._kwargs)
return self._redis

async def get_record(self, *,
Expand Down
53 changes: 39 additions & 14 deletions aiogram/dispatcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .filters import CommandsFilter, RegexpFilter, ContentTypeFilter, generate_default_filters
from .handler import Handler
from .storage import DisabledStorage, BaseStorage, FSMContext
from .webhook import BaseResponse
from ..bot import Bot
from ..types.message import ContentType

Expand Down Expand Up @@ -74,8 +75,10 @@ async def process_updates(self, updates):
:param updates:
:return:
"""
tasks = []
for update in updates:
self.loop.create_task(self.updates_handler.notify(update))
tasks.append(self.loop.create_task(self.updates_handler.notify(update)))
return await asyncio.gather(*tasks)

async def process_update(self, update):
"""
Expand All @@ -86,30 +89,31 @@ async def process_update(self, update):
"""
self.last_update_id = update.update_id
if update.message:
await self.message_handlers.notify(update.message)
return await self.message_handlers.notify(update.message)
if update.edited_message:
await self.edited_message_handlers.notify(update.edited_message)
return await self.edited_message_handlers.notify(update.edited_message)
if update.channel_post:
await self.channel_post_handlers.notify(update.channel_post)
return await self.channel_post_handlers.notify(update.channel_post)
if update.edited_channel_post:
await self.edited_channel_post_handlers.notify(update.edited_channel_post)
return await self.edited_channel_post_handlers.notify(update.edited_channel_post)
if update.inline_query:
await self.inline_query_handlers.notify(update.inline_query)
return await self.inline_query_handlers.notify(update.inline_query)
if update.chosen_inline_result:
await self.chosen_inline_result_handlers.notify(update.chosen_inline_result)
return await self.chosen_inline_result_handlers.notify(update.chosen_inline_result)
if update.callback_query:
await self.callback_query_handlers.notify(update.callback_query)
return await self.callback_query_handlers.notify(update.callback_query)
if update.shipping_query:
await self.shipping_query_handlers.notify(update.shipping_query)
return await self.shipping_query_handlers.notify(update.shipping_query)
if update.pre_checkout_query:
await self.pre_checkout_query_handlers.notify(update.pre_checkout_query)
return await self.pre_checkout_query_handlers.notify(update.pre_checkout_query)

async def start_pooling(self, timeout=20, relax=0.1):
async def start_pooling(self, timeout=20, relax=0.1, limit=None):
"""
Start long-pooling
:param timeout:
:param relax:
:param limit:
:return:
"""
if self._pooling:
Expand All @@ -120,21 +124,42 @@ async def start_pooling(self, timeout=20, relax=0.1):
offset = None
while self._pooling:
try:
updates = await self.bot.get_updates(offset=offset, timeout=timeout)
updates = await self.bot.get_updates(limit=limit, offset=offset, timeout=timeout)
except Exception as e:
log.exception('Cause exception while getting updates')
await asyncio.sleep(relax)
if relax:
await asyncio.sleep(relax)
continue

if updates:
log.info("Received {0} updates.".format(len(updates)))
offset = updates[-1].update_id + 1
await self.process_updates(updates)

self.loop.create_task(self._process_pooling_updates(updates))

await asyncio.sleep(relax)

log.warning('Pooling is stopped.')

async def _process_pooling_updates(self, updates):
"""
Process updates received from long-pooling.
:param updates: list of updates.
"""
need_to_call = []
for update in await self.process_updates(updates):
for responses in update:
for response in responses:
if not isinstance(response, BaseResponse):
continue
need_to_call.append(response.execute_response(self.bot))
if need_to_call:
try:
asyncio.gather(*need_to_call)
except Exception as e:
log.exception('Cause exception while processing updates.')

def stop_pooling(self):
"""
Break long-pooling process.
Expand Down
8 changes: 7 additions & 1 deletion aiogram/dispatcher/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,19 @@ def unregister(self, handler):
raise ValueError('This handler is not registered!')

async def notify(self, *args, **kwargs):
results = []

for filters, handler in self.handlers:
if await check_filters(filters, args, kwargs):
try:
await handler(*args, **kwargs)
response = await handler(*args, **kwargs)
if results is not None:
results.append(response)
if self.once:
break
except SkipHandler:
continue
except CancelHandler:
break

return results
23 changes: 23 additions & 0 deletions aiogram/dispatcher/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@ class BaseStorage:
In states-storage you can save current user state and data for all steps
"""

def close(self):
"""
Need override this method and use when application is shutdowns.
You can save data or etc.
:return:
"""
raise NotImplementedError

async def wait_closed(self):
"""
You need override this method for all asynchronously storage's like Redis.
:return:
"""
raise NotImplementedError

@classmethod
def check_address(cls, *,
chat: typing.Union[str, int, None] = None,
Expand Down Expand Up @@ -209,6 +226,12 @@ class DisabledStorage(BaseStorage):
Empty storage. Use it if you don't want to use Finite-State Machine
"""

def close(self):
pass

async def wait_closed(self):
pass

async def get_state(self, *,
chat: typing.Union[str, int, None] = None,
user: typing.Union[str, int, None] = None,
Expand Down

0 comments on commit 882d60f

Please sign in to comment.