Skip to content

Commit

Permalink
refactor(command): improve readability and maintainability
Browse files Browse the repository at this point in the history
Signed-off-by: Rongrong <i@rong.moe>
  • Loading branch information
Rongronggg9 committed Jul 28, 2024
1 parent bf571d0 commit 587c993
Show file tree
Hide file tree
Showing 7 changed files with 604 additions and 383 deletions.
160 changes: 103 additions & 57 deletions src/command/administration.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations
from typing import Union, Optional
from typing import Optional
from typing_extensions import Final

import asyncio
import re
from telethon import events, Button
from telethon.tl.patched import Message
from telethon import Button
from telethon.tl import types
from telethon.utils import get_peer_id

Expand All @@ -30,6 +29,7 @@
from ..parsing.post import get_post_from_entry
from .utils import command_gatekeeper, parse_command, logger, parse_customization_callback_data
from . import inner
from .types import *

SELECTED_EMOJI: Final = '🔘'
UNSELECTED_EMOJI: Final = '⚪️'
Expand All @@ -38,17 +38,19 @@


@command_gatekeeper(only_manager=True)
async def cmd_set_option(event: Union[events.NewMessage.Event, Message], *_, lang: Optional[str] = None, **__):
async def cmd_set_option(event: TypeEventMsgHint, *_, lang: Optional[str] = None, **__):
kv = parseKeyValuePair.match(event.raw_text)
if not kv: # return options info
options = db.EffectiveOptions.options
msg = (
f'<b>{i18n[lang]["current_options"]}</b>\n\n'
+ '\n'.join(f'<code>{key}</code> = <code>{value}</code> '
f'({i18n[lang]["option_value_type"]}: <code>{type(value).__name__}</code>)'
for key, value in options.items())
+ '\n\n' + i18n[lang]['cmd_set_option_usage_prompt_html']
)
msg = '\n\n'.join((
f'<b>{i18n[lang]["current_options"]}</b>',
'\n'.join(
f'<code>{key}</code> = <code>{value}</code> '
f'({i18n[lang]["option_value_type"]}: <code>{type(value).__name__}</code>)'
for key, value in options.items()
),
i18n[lang]['cmd_set_option_usage_prompt_html'],
))
await event.respond(msg, parse_mode='html')
return
key, value = kv.groups()
Expand All @@ -72,18 +74,21 @@ async def cmd_set_option(event: Union[events.NewMessage.Event, Message], *_, lan
env.loop.create_task(inner.utils.update_interval(feed))
logger.info("Flushed the interval of all feeds")

await event.respond(f'<b>{i18n[lang]["option_updated"]}</b>\n'
f'<code>{key}</code> = <code>{value}</code>',
parse_mode='html')
await event.respond(
f'<b>{i18n[lang]["option_updated"]}</b>\n'
f'<code>{key}</code> = <code>{value}</code>',
parse_mode='html',
)


@command_gatekeeper(only_manager=True, only_in_private_chat=False, timeout=None if env.DEBUG else 300)
async def cmd_test(
event: Union[events.NewMessage.Event, Message],
event: TypeEventMsgHint,
*_,
lang: Optional[str] = None,
chat_id: Optional[int] = None,
**__):
**__,
):
chat_id = chat_id or event.chat_id

args = parse_command(event.raw_text)
Expand Down Expand Up @@ -123,7 +128,10 @@ async def cmd_test(
return

await asyncio.gather(
*(__send(chat_id, entry, rss_d.feed.title, wf.url) for entry in entries_to_send)
*(
__send(chat_id, entry, rss_d.feed.title, wf.url)
for entry in entries_to_send
)
)

except Exception as e:
Expand All @@ -139,26 +147,38 @@ async def __send(chat_id, entry, feed_title, link):


@command_gatekeeper(only_manager=True)
async def cmd_user_info_or_callback_set_user(event: Union[events.NewMessage.Event, Message, events.CallbackQuery.Event],
*_,
lang: Optional[str] = None,
user_id: Optional[int] = None,
**__):
async def cmd_user_info_or_callback_set_user(
event: TypeEventCollectionMsgOrCb,
*_,
lang: Optional[str] = None,
user_id: Optional[int] = None,
**__,
):
"""
command = `/user_info user_id` or `/user_info @username` or `/user_info`
callback data = set_user={user_id},{state}
"""
is_callback = isinstance(event, events.CallbackQuery.Event)
is_callback = isinstance(event, TypeEventCb)
if user_id:
state = None
user_entity_like = user_id
elif is_callback:
user_entity_like, state, _, _ = parse_customization_callback_data(event.data)
assert user_entity_like is not None
state = int(state)
else:
state = None
args = parse_command(event.raw_text, strip_target_chat=False)
if len(args) < 2 or (not args[1].lstrip('-').isdecimal() and not args[1].startswith('@')):
if (
len(args) < 2
or
not (
args[1].lstrip('-').isdecimal()
or
args[1].startswith('@')
)

):
await event.respond(i18n[lang]['cmd_user_info_usage_prompt_html'], parse_mode='html')
return
user_entity_like = int(args[1]) if args[1].lstrip('-').isdecimal() else args[1].lstrip('@')
Expand Down Expand Up @@ -192,49 +212,75 @@ async def cmd_user_info_or_callback_set_user(event: Union[events.NewMessage.Even
user.state = state
await user.save()
state = None if user_id in env.MANAGER else user.state
default_sub_limit = (db.EffectiveOptions.user_sub_limit
if user_id > 0
else db.EffectiveOptions.channel_or_group_sub_limit)
default_sub_limit = (
db.EffectiveOptions.user_sub_limit
if user_id > 0
else db.EffectiveOptions.channel_or_group_sub_limit
)
if user_created:
sub_count = 0
sub_limit = default_sub_limit
is_default_limit = True
else:
_, sub_count, sub_limit, is_default_limit = await inner.utils.check_sub_limit(user_id, force_count_current=True)

msg_text = (
f"<b>{i18n[lang]['user_info']}</b>\n\n"
+ (f"{name}\n" if name else '')
+ (f"{user_type} " if user_type else '') + f"<code>{user_id}</code>\n"
+ (f"@{username}\n" if username else '')
+ f"\n{i18n[lang]['sub_count']}: {sub_count}"
+ f"\n{i18n[lang]['sub_limit']}: {sub_limit if sub_limit > 0 else i18n[lang]['sub_limit_unlimited']}"
+ (f" ({i18n[lang]['sub_limit_default']})" if is_default_limit else '')
+ (f"\n{i18n[lang]['participant_count']}: {participant_count}" if participant_count else '')
+ (f"\n\n{i18n[lang]['user_state']}: {i18n[lang][f'user_state_{state}']} "
f"({i18n[lang][f'user_state_description_{state}']})" if state is not None else '')
)
buttons = None if user_id in env.MANAGER else tuple(filter(None, (
*(
(Button.inline(
(SELECTED_EMOJI if user.state == btn_state else UNSELECTED_EMOJI)
+ '{prompt} "{state}"'.format(prompt=i18n[lang]['set_user_state_as'],
state=i18n[lang][f'user_state_{btn_state}']),
data='null' if user.state == btn_state else f"set_user={user_id},{btn_state}"
),)
for btn_state in range(-1, 2)
),
(Button.inline(f"{i18n[lang]['reset_sub_limit_to_default']} "
f"({default_sub_limit if default_sub_limit > 0 else i18n[lang]['sub_limit_unlimited']})",
data=f"reset_sub_limit={user_id}"),) if not is_default_limit else None,
(Button.switch_inline(i18n[lang]['set_sub_limit_to'], query=f'/set_sub_limit {user_id} ', same_peer=True),),
msg_text = '\n\n'.join(filter(None, (
f"<b>{i18n[lang]['user_info']}</b>",
'\n'.join(filter(None, (
name,
(f'{user_type} ' if user_type else '') + f'<code>{user_id}</code>',
f'@{username}' if username else '',
))),
'\n'.join(filter(None, (
f"{i18n[lang]['sub_count']}: {sub_count}",
f"{i18n[lang]['sub_limit']}: {sub_limit if sub_limit > 0 else i18n[lang]['sub_limit_unlimited']}" + (
f" ({i18n[lang]['sub_limit_default']})" if is_default_limit else ''
),
f"{i18n[lang]['participant_count']}: {participant_count}" if participant_count else '',
))),
''
if state is None
else f"{i18n[lang]['user_state']}: {i18n[lang][f'user_state_{state}']} "
f"({i18n[lang][f'user_state_description_{state}']})",
)))
await event.edit(msg_text, parse_mode='html', buttons=buttons) if is_callback \
else await event.respond(msg_text, parse_mode='html', buttons=buttons)
buttons = (
None
if user_id in env.MANAGER
else tuple(filter(None, (
*(
(Button.inline(
'{emoji}{prompt} "{state}"'.format(
emoji=SELECTED_EMOJI if user.state == btn_state else UNSELECTED_EMOJI,
prompt=i18n[lang]['set_user_state_as'],
state=i18n[lang][f'user_state_{btn_state}'],
),
data='null' if user.state == btn_state else f"set_user={user_id},{btn_state}"
),)
for btn_state in range(-1, 2)
),
None
if is_default_limit
else (Button.inline(
f"{i18n[lang]['reset_sub_limit_to_default']} "
f"({default_sub_limit if default_sub_limit > 0 else i18n[lang]['sub_limit_unlimited']})",
data=f"reset_sub_limit={user_id}",
),),
(Button.switch_inline(
i18n[lang]['set_sub_limit_to'],
query=f'/set_sub_limit {user_id} ',
same_peer=True,
),),
)))
)
await (
event.edit(msg_text, parse_mode='html', buttons=buttons)
if is_callback
else event.respond(msg_text, parse_mode='html', buttons=buttons)
)


@command_gatekeeper(only_manager=True)
async def callback_reset_sub_limit(event: events.CallbackQuery.Event, *_, lang: Optional[str] = None, **__):
async def callback_reset_sub_limit(event: TypeEventCb, *_, lang: Optional[str] = None, **__):
"""
callback data = reset_sub_limit={user_id}
"""
Expand All @@ -248,7 +294,7 @@ async def callback_reset_sub_limit(event: events.CallbackQuery.Event, *_, lang:


@command_gatekeeper(only_manager=True)
async def cmd_set_sub_limit(event: Union[events.NewMessage.Event, Message], *_, lang: Optional[str] = None, **__):
async def cmd_set_sub_limit(event: TypeEventMsgHint, *_, lang: Optional[str] = None, **__):
"""
command = `/set_sub_limit user_id sub_limit`
"""
Expand Down
Loading

0 comments on commit 587c993

Please sign in to comment.