Skip to content

Commit a363618

Browse files
committed
refactor(notification): use job to send discord and telegram notifications
1 parent 87a69c1 commit a363618

File tree

4 files changed

+215
-31
lines changed

4 files changed

+215
-31
lines changed

app/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ async def lifespan(app: FastAPI):
6666
openapi_url="/openapi.json" if DOCS else None,
6767
)
6868

69-
scheduler = AsyncIOScheduler(job_defaults={"max_instances": 20}, timezone="UTC")
69+
scheduler = AsyncIOScheduler(job_defaults={"max_instances": 30}, timezone="UTC")
7070
logger = get_logger()
7171

7272

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import asyncio
2+
3+
from app import scheduler
4+
from app.notification.client import process_telegram_queue, process_discord_queue
5+
from app.utils.logger import get_logger
6+
from config import JOB_SEND_NOTIFICATIONS_INTERVAL
7+
8+
logger = get_logger("process-notification-queues")
9+
10+
11+
async def process_all_notification_queues():
12+
"""
13+
Process both Telegram and Discord notification queues concurrently.
14+
Each queue sends messages one by one internally.
15+
"""
16+
logger.debug("Processing notification queues")
17+
18+
await asyncio.gather(
19+
process_telegram_queue(),
20+
process_discord_queue(),
21+
return_exceptions=True
22+
)
23+
24+
25+
# Schedule the job to run at the same interval as webhook notifications
26+
scheduler.add_job(
27+
process_all_notification_queues,
28+
"interval",
29+
seconds=JOB_SEND_NOTIFICATIONS_INTERVAL,
30+
max_instances=1,
31+
coalesce=True
32+
)

app/notification/client.py

Lines changed: 133 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@
55
from app.settings import notification_settings
66
from app.utils.logger import get_logger
77
from app import on_startup
8+
from app.notification.queue_manager import (
9+
telegram_queue,
10+
discord_queue,
11+
enqueue_telegram,
12+
enqueue_discord,
13+
TelegramNotification,
14+
DiscordNotification,
15+
)
816

917

1018
client = None
@@ -30,51 +38,59 @@ async def define_client():
3038
logger = get_logger("Notification")
3139

3240

33-
async def send_discord_webhook(json_data, webhook):
34-
max_retries = (await notification_settings()).max_retries
41+
async def _send_discord_webhook_direct(json_data, webhook, max_retries: int) -> bool:
42+
"""
43+
Internal function to send Discord webhook with proper retry_after handling.
44+
Returns True if successful, False otherwise.
45+
"""
3546
retries = 0
3647
while retries < max_retries:
3748
try:
3849
response = await client.post(webhook, json=json_data)
3950
if response.status_code in [200, 204]:
4051
logger.debug(f"Discord webhook payload delivered successfully, code {response.status_code}.")
41-
return
52+
return True
4253
elif response.status_code == 429:
4354
retries += 1
4455
if retries < max_retries:
45-
await asyncio.sleep(0.5)
56+
# Extract retry_after from response
57+
try:
58+
retry_after = response.json().get("retry_after", 0.5)
59+
except Exception:
60+
retry_after = 0.5
61+
logger.warning(f"Discord rate limit hit, waiting {retry_after}s (attempt {retries}/{max_retries})")
62+
await asyncio.sleep(retry_after)
4663
continue
4764
else:
4865
response_text = response.text
4966
logger.error(f"Discord webhook failed: {response.status_code} - {response_text}")
50-
return
67+
return False
5168
except Exception as err:
5269
logger.error(f"Discord webhook failed Exception: {str(err)}")
53-
return
70+
return False
5471

5572
logger.error(f"Discord webhook failed after {max_retries} retries")
73+
return False
5674

5775

58-
async def send_telegram_message(
59-
message, chat_id: int | None = None, channel_id: int | None = None, topic_id: int | None = None
60-
):
76+
async def send_discord_webhook(json_data, webhook):
77+
"""Enqueue Discord notification for processing"""
78+
await enqueue_discord(json_data, webhook)
79+
80+
81+
async def _send_telegram_message_direct(
82+
message: str,
83+
chat_id: int | None,
84+
channel_id: int | None,
85+
topic_id: int | None,
86+
max_retries: int,
87+
telegram_api_token: str
88+
) -> bool:
6189
"""
62-
Send a message to Telegram based on the available IDs.
63-
Args:
64-
message (str): The message to send
65-
chat_id (int, optional): The chat ID for direct messages
66-
channel_id (int, optional): The channel ID for channel messages
67-
topic_id (int, optional): The topic ID for forum topics in channels
68-
Returns:
69-
bool: True if message was sent successfully, False otherwise
90+
Internal function to send Telegram message with proper retry_after handling.
91+
Returns True if successful, False otherwise.
7092
"""
71-
# Ensure TELEGRAM_API_TOKEN is available
72-
settings: NotificationSettings = await notification_settings()
73-
if not settings.telegram_api_token:
74-
logger.error("TELEGRAM_API_TOKEN is not defined")
75-
return
76-
77-
base_url = f"https://api.telegram.org/bot{settings.telegram_api_token}/sendMessage"
93+
base_url = f"https://api.telegram.org/bot{telegram_api_token}/sendMessage"
7894
payload = {"parse_mode": "HTML", "text": message}
7995

8096
# Determine the target chat/channel/topic
@@ -87,27 +103,114 @@ async def send_telegram_message(
87103
payload["chat_id"] = chat_id
88104
else:
89105
logger.error("At least one of chat_id, channel_id must be provided")
90-
return
106+
return False
91107

92-
max_retries = settings.max_retries
93108
retries = 0
94109
while retries < max_retries:
95110
try:
96111
response = await client.post(base_url, data=payload)
97112
if response.status_code == 200:
98113
logger.debug(f"Telegram message sent successfully, code {response.status_code}.")
99-
return
114+
return True
100115
elif response.status_code == 429:
101116
retries += 1
102117
if retries < max_retries:
103-
await asyncio.sleep(0.5)
118+
# Extract retry_after from Telegram response
119+
try:
120+
retry_after = response.json().get("parameters", {}).get("retry_after", 0.5)
121+
except Exception:
122+
retry_after = 0.5
123+
logger.warning(f"Telegram rate limit hit, waiting {retry_after}s (attempt {retries}/{max_retries})")
124+
await asyncio.sleep(retry_after)
104125
continue
105126
else:
106127
response_text = response.text
107128
logger.error(f"Telegram message failed: {response.status_code} - {response_text}")
108-
return
129+
return False
109130
except Exception as err:
110131
logger.error(f"Telegram message failed: {str(err)}")
111-
return
132+
return False
112133

113134
logger.error(f"Telegram message failed after {max_retries} retries")
135+
return False
136+
137+
138+
async def send_telegram_message(
139+
message, chat_id: int | None = None, channel_id: int | None = None, topic_id: int | None = None
140+
):
141+
"""
142+
Enqueue a Telegram message for processing.
143+
Args:
144+
message (str): The message to send
145+
chat_id (int, optional): The chat ID for direct messages
146+
channel_id (int, optional): The channel ID for channel messages
147+
topic_id (int, optional): The topic ID for forum topics in channels
148+
"""
149+
await enqueue_telegram(message, chat_id, channel_id, topic_id)
150+
151+
152+
async def process_telegram_queue():
153+
"""
154+
Process Telegram notification queue, sending messages one by one.
155+
"""
156+
settings: NotificationSettings = await notification_settings()
157+
if not settings.telegram_api_token:
158+
return
159+
160+
processed = 0
161+
failed = 0
162+
163+
while not telegram_queue.empty():
164+
try:
165+
notification: TelegramNotification = await telegram_queue.get()
166+
167+
success = await _send_telegram_message_direct(
168+
message=notification.message,
169+
chat_id=notification.chat_id,
170+
channel_id=notification.channel_id,
171+
topic_id=notification.topic_id,
172+
max_retries=settings.max_retries,
173+
telegram_api_token=settings.telegram_api_token
174+
)
175+
176+
if success:
177+
processed += 1
178+
else:
179+
failed += 1
180+
except Exception as err:
181+
logger.error(f"Error processing Telegram notification: {str(err)}")
182+
failed += 1
183+
184+
if processed > 0 or failed > 0:
185+
logger.info(f"Telegram queue processed: {processed} sent, {failed} failed")
186+
187+
188+
async def process_discord_queue():
189+
"""
190+
Process Discord notification queue, sending webhooks one by one.
191+
"""
192+
settings: NotificationSettings = await notification_settings()
193+
194+
processed = 0
195+
failed = 0
196+
197+
while not discord_queue.empty():
198+
try:
199+
notification: DiscordNotification = await discord_queue.get()
200+
201+
success = await _send_discord_webhook_direct(
202+
json_data=notification.json_data,
203+
webhook=notification.webhook,
204+
max_retries=settings.max_retries
205+
)
206+
207+
if success:
208+
processed += 1
209+
else:
210+
failed += 1
211+
except Exception as err:
212+
logger.error(f"Error processing Discord notification: {str(err)}")
213+
failed += 1
214+
215+
if processed > 0 or failed > 0:
216+
logger.info(f"Discord queue processed: {processed} sent, {failed} failed")

app/notification/queue_manager.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import asyncio
2+
from typing import Optional
3+
from pydantic import BaseModel, Field
4+
5+
6+
class TelegramNotification(BaseModel):
7+
"""Model for Telegram notification queue items"""
8+
message: str
9+
chat_id: Optional[int] = Field(default=None)
10+
channel_id: Optional[int] = Field(default=None)
11+
topic_id: Optional[int] = Field(default=None)
12+
tries: int = Field(default=0)
13+
14+
15+
class DiscordNotification(BaseModel):
16+
"""Model for Discord notification queue items"""
17+
json_data: dict
18+
webhook: str
19+
tries: int = Field(default=0)
20+
21+
22+
# Global queues for Telegram and Discord notifications
23+
telegram_queue: asyncio.Queue[TelegramNotification] = asyncio.Queue()
24+
discord_queue: asyncio.Queue[DiscordNotification] = asyncio.Queue()
25+
26+
27+
async def enqueue_telegram(
28+
message: str,
29+
chat_id: Optional[int] = None,
30+
channel_id: Optional[int] = None,
31+
topic_id: Optional[int] = None
32+
) -> None:
33+
"""Add a Telegram notification to the queue"""
34+
notification = TelegramNotification(
35+
message=message,
36+
chat_id=chat_id,
37+
channel_id=channel_id,
38+
topic_id=topic_id
39+
)
40+
await telegram_queue.put(notification)
41+
42+
43+
async def enqueue_discord(json_data: dict, webhook: str) -> None:
44+
"""Add a Discord notification to the queue"""
45+
notification = DiscordNotification(
46+
json_data=json_data,
47+
webhook=webhook
48+
)
49+
await discord_queue.put(notification)

0 commit comments

Comments
 (0)