From 1b4342c94ea829653fa68e548a87fc0fbafa8e49 Mon Sep 17 00:00:00 2001 From: Weather Date: Fri, 1 May 2026 23:10:42 -0400 Subject: [PATCH] feat: Caches Slack Events --- src/api/endpoints.py | 93 +-------------------- src/core/cshcalendar.py | 8 +- src/core/slack.py | 161 ++++++++++++++++++++++++++++++++++++- src/modules/taskmanager.py | 43 ++++++++++ 4 files changed, 212 insertions(+), 93 deletions(-) create mode 100644 src/modules/taskmanager.py diff --git a/src/api/endpoints.py b/src/api/endpoints.py index f60e70f..96b2975 100644 --- a/src/api/endpoints.py +++ b/src/api/endpoints.py @@ -1,21 +1,13 @@ from logging import getLogger, Logger -import json -import httpx -import asyncio - from fastapi import APIRouter, Request, Form from fastapi.responses import JSONResponse from core import slack, wikithoughts, cshcalendar -from config import WATCHED_CHANNELS logger: Logger = getLogger(__name__) router: APIRouter = APIRouter() -ACCEPT_MESSAGE: str = "Posting right now :^)" -DENY_MESSAGE: str = "Okay :( maybe next time" - @router.get("/calendar") async def get_calendar() -> JSONResponse: @@ -64,46 +56,7 @@ async def slack_events(request: Request) -> JSONResponse: JSONResponse: A JSON response indicating the result of the event handling. """ - try: - body: dict = await request.json() - logger.info(f"Received Slack event: {body}") - - if request.headers.get("x-slack-retry-num"): - logger.info("SLACK EVENT: Ignoring Slack retry") - return JSONResponse({"status": "ignored"}) - - if request.headers.get("content-type") == "application/json": - if body.get("type") == "url_verification": - logger.info("SLACK EVENT: Was a challenge!") - return JSONResponse({"challenge": body.get("challenge")}) - - if not body: - logger.debug("SLACK EVENT: Was a challenge, with no body") - return JSONResponse({"challenge": body.get("challenge")}) - - event: dict = body.get("event", {}) - cleaned_text: str = slack.clean_text(event.get("text", "")) - - if event.get("subtype", None) is not None: - logger.info("SLACK EVENT: Had no subtype, ignoring it") - return JSONResponse({"status": "ignored"}) - - if event.get("channel", None) not in WATCHED_CHANNELS: - logger.info( - "SLACK EVENT: Message was not in a Watched Channel, ignoring it" - ) - return JSONResponse({"status": "ignored"}) - - logger.info("SLACK EVENT: Requesting upload via dm!") - - asyncio.create_task( - slack.request_upload_via_dm(event.get("user", ""), cleaned_text) - ) - except Exception as e: - logger.error(f"Error handling Slack event: {e}") - return JSONResponse({"status": "error", "message": str(e)}) - - return JSONResponse({"status": "success"}) + return JSONResponse(slack.process_slack_events(request)) @router.post("/slack/message_actions") @@ -118,48 +71,8 @@ async def message_actions(payload: str = Form(...)) -> JSONResponse: JSONResponse: A JSON response indicating the result of the action. """ - try: - form_json: dict = json.loads(payload) - response_url = form_json.get("response_url") - - if form_json.get("type") != "block_actions": - return JSONResponse({}, status_code=200) - - if slack.convert_user_response_to_bool(form_json): - logger.info( - "User approved the announcement, Adding it to the announcement list!" - ) - - message_object: dict[str, dict] = json.loads( - form_json.get("actions", [{}])[0].get("value", '{text:""}') - ).get("text", None) - - user_id = form_json.get("user", {}).get("id") - - username: str = await slack.get_username(user_id=user_id) - username = username[:40] - - slack.add_announcement(message_object, username) - - if response_url: - async with httpx.AsyncClient() as client: - await client.post( - response_url, - json={"text": ACCEPT_MESSAGE, "replace_original": True}, - ) - else: - if response_url: - async with httpx.AsyncClient() as client: - await client.post( - response_url, - json={"text": DENY_MESSAGE, "replace_original": True}, - ) - - except Exception as e: - logger.error(f"Error in message_actions: {e}") - return JSONResponse({"status": "error", "message": str(e)}, status_code=500) - - return JSONResponse({"status": "success"}, status_code=200) + response_dict, status_code = slack.process_slack_message_actions(payload) + return JSONResponse(response_dict, status_code=status_code) @router.get("/wikithought") diff --git a/src/core/cshcalendar.py b/src/core/cshcalendar.py index a5788cc..2e16cae 100644 --- a/src/core/cshcalendar.py +++ b/src/core/cshcalendar.py @@ -7,6 +7,9 @@ import recurring_ical_events import arrow import re +import asyncio + +from modules import taskmanager from config import ( CALENDAR_CACHE_REFRESH, @@ -15,7 +18,6 @@ CALENDAR_TIMEZONE, CALENDAR_URL, ) -import asyncio calendar_cache: list[CalendarInfo] = [] # The current cache of the calendar cal_last_update: date | None = ( @@ -292,9 +294,11 @@ async def get_future_events() -> list[CalendarInfo]: if cal_correct_length: logger.info("Calendar cache is full length, rebuilding async!") - asyncio.create_task( + + taskmanager.create_background_task( rebuild_calendar() ) # Calendar is correct length, we can just run this in the background + else: logger.info("Calendar cache is NOT full length, yielding rebuild!") await rebuild_calendar() diff --git a/src/core/slack.py b/src/core/slack.py index 699d6ca..544d21a 100644 --- a/src/core/slack.py +++ b/src/core/slack.py @@ -8,19 +8,34 @@ from slack_sdk.web.slack_response import SlackResponse from slack_sdk.errors import SlackApiError +from modules import taskmanager + from config import ( SLACK_API_TOKEN, SLACK_JUMPSTART_MESSAGE, SLACK_DM_TEMPLATE, CALENDAR_TIMEZONE, + WATCHED_CHANNELS, ) + from datetime import datetime from zoneinfo import ZoneInfo +from fastapi import Request + +import httpx logger: Logger = getLogger(__name__) +client: AsyncWebClient | None = None +event_id_cache: dict[str, str] = {} +EVENT_CACHE_DEBOUNCE = ( + 60 # Hold event in for one minute? I think its fine genuiflowkirkenuinelowskinly +) -client: AsyncWebClient | None = None +ACCEPT_MESSAGE: str = "Posting right now :^)" +DENY_MESSAGE: str = ( + "RAHHHHHHHHHHHHHHHHHHHHHHHH HOW DARE YOU :skeleton-shield-banging-here:" +) try: client = AsyncWebClient(token=SLACK_API_TOKEN) @@ -52,6 +67,40 @@ def clean_text(raw: str) -> str: return text.replace("*", "").replace("_", "").replace("`", "").strip() +async def reset_event_from_cache(event_id: str) -> None: + """ + Removes an event from the cache + + Arguments: + event_id (str): The id of the slack event + """ + global event_id_cache + + event_id_cache[event_id] = None + return + + +async def get_event_retry_amount(event_id: str) -> int: + """ + Returns the amount of times a event has been retried + + Arguments: + event_id (str): The id of the slack event + + Returns: + int: The amount of times the event has been retried + """ + + global event_id_cache + + if event_id in event_id_cache: + event_id_cache += 1 + return event_id_cache + + event_id_cache[event_id] = 0 + taskmanager.create_background_task(reset_event_from_cache(event_id)) + + async def gather_emojis() -> dict: """ Gathers emojis from Slack and returns a mapping of emoji names to their URLs. @@ -134,6 +183,116 @@ async def request_upload_via_dm(user_id: str, announcement_text: str) -> None: logger.error(f"Error messaging user {user_id}: {e}") +async def process_slack_events(request: Request) -> dict[str, str]: + """ + Processes a slack event, logging and returning the result from the event + + Arguments: + request (Request): The slack event to be processed + + Returns: + dict[str, str]: The dictionary to be responded to. + """ + + try: + body: dict = await request.json() + logger.info(f"Received Slack event: {body}") + + event_amounts: int = get_event_retry_amount(body.get("event_id"), None) + if event_amounts > 0: + logger.info( + f"SLACK EVENT: Retried event for {body.get('event_id', None)} {event_amounts} time(s)!" + ) + return + + # Challenge from Bot Authentication + if request.headers.get("content-type") == "application/json": + if body.get("type") == "url_verification": + logger.info("SLACK EVENT: Was a challenge!") + return {"challenge": body.get("challenge")} + + if not body: + logger.debug("SLACK EVENT: Was a challenge, with no body") + return {"challenge": body.get("challenge")} + + event: dict = body.get("event", {}) + + if event.get("subtype", None) is not None: + logger.info("SLACK EVENT: Had a subtype, ignoring it") + return {"status": "ignored"} + + if event.get("channel", None) not in WATCHED_CHANNELS: + logger.info( + "SLACK EVENT: Message was not in a Watched Channel, ignoring it" + ) + return {"status": "ignored"} + + logger.info("SLACK EVENT: Requesting upload via dm!") + cleaned_text: str = clean_text(event.get("text", "")) + + taskmanager.create_background_task( + request_upload_via_dm(event.get("user", ""), cleaned_text) + ) + except Exception as e: + logger.error(f"Error handling Slack event: {e}") + return {"status": "error", "message": str(e)} + + return {"status": "success"} + + +async def process_slack_message_actions(payload: str): + + try: + form_json: dict = json.loads(payload) + response_url = form_json.get("response_url") + + event_amounts: int = get_event_retry_amount(form_json.get("event_id"), None) + if event_amounts > 0: + logger.info( + f"SLACK MESSAGE ACTION: Retried event for {form_json.get('event_id', None)} {event_amounts} time(s)!" + ) + return + + if form_json.get("type") != "block_actions": + return ({}, 200) + + if convert_user_response_to_bool(form_json): + logger.info( + "User approved the announcement, Adding it to the announcement list!" + ) + + message_object: dict[str, dict] = json.loads( + form_json.get("actions", [{}])[0].get("value", '{text:""}') + ).get("text", None) + + user_id = form_json.get("user", {}).get("id") + + username: str = await get_username(user_id) + username = username[:40] + + add_announcement(message_object, username) + + if response_url: + async with httpx.AsyncClient() as client: + await client.post( + response_url, + json={"text": ACCEPT_MESSAGE, "replace_original": True}, + ) + else: + if response_url: + async with httpx.AsyncClient() as client: + await client.post( + response_url, + json={"text": DENY_MESSAGE, "replace_original": True}, + ) + + except Exception as e: + logger.error(f"Error in message_actions: {e}") + return ({"status": "error", "message": str(e)}, 500) + + return ({"status": "success"}, 200) + + def convert_user_response_to_bool(message_data: dict) -> bool: """ Converts a Slack message action response to a boolean indicating whether the user approved the announcement. diff --git a/src/modules/taskmanager.py b/src/modules/taskmanager.py new file mode 100644 index 0000000..2dd58b0 --- /dev/null +++ b/src/modules/taskmanager.py @@ -0,0 +1,43 @@ +import asyncio + +from logging import getLogger, Logger +from collections.abc import Coroutine + +logger: Logger = getLogger(__name__) + +running_background_tasks: set[asyncio.Task] = set() + + +def handle_task_exception(task: asyncio.Task) -> None: + """ + Views ended tasks result, throwing the end error if applicable + + Arguments: + Task (asyncio.Task): The task to be + """ + try: + task.result() + except asyncio.CancelledError: + logger.info("Background task was cancelled") + except Exception as e: + logger.error(f"Background task failed: {e}") + + return + + +def create_background_task(coroutine: Coroutine) -> asyncio.Task: + """ + Creates and executes a background task, holding a strong reference to avoid GC + + Arguments: + coroutine (Coroutine): The coroutine object that was created + + Returns: + asyncio.Task: The task object that was created and executing + """ + + task: asyncio.Task = asyncio.create_task(coroutine) + running_background_tasks.add(task) + task.add_done_callback(running_background_tasks.discard) + task.add_done_callback(handle_task_exception) + return task