diff --git a/README.md b/README.md index cff8240..10685d9 100644 --- a/README.md +++ b/README.md @@ -36,9 +36,9 @@ dispatcher = discatcore.Dispatcher() intents = 3243773 gateway = discatcore.GatewayClient(http, dispatcher, intents=intents.value) -@dispatcher.new_event("ready").callback -async def ready(event: discord_typings.ReadyData): - print(event) +@dispatcher.listen_to(discatcore.gateway.ReadyEvent) +async def ready(event): + print(event.data) async def main(): url: str | None = None diff --git a/discatcore/gateway/__init__.py b/discatcore/gateway/__init__.py index 513714b..0cfc1b7 100644 --- a/discatcore/gateway/__init__.py +++ b/discatcore/gateway/__init__.py @@ -6,8 +6,10 @@ """ from .client import * +from .events import * from .ratelimiter import * __all__ = () __all__ += client.__all__ +__all__ += events.__all__ __all__ += ratelimiter.__all__ diff --git a/discatcore/gateway/client.py b/discatcore/gateway/client.py index c6a86a0..8509b17 100644 --- a/discatcore/gateway/client.py +++ b/discatcore/gateway/client.py @@ -16,8 +16,9 @@ from ..errors import GatewayReconnect from ..http import HTTPClient +from ..utils import json from ..utils.dispatcher import Dispatcher -from ..utils.json import dumps, loads +from . import events from .ratelimiter import Ratelimiter from .types import BaseTypedWSMessage, is_binary, is_text @@ -183,7 +184,7 @@ async def send(self, data: Mapping[str, t.Any]) -> None: return await self.ratelimiter.acquire() - await self._ws.send_json(data, dumps=dumps) + await self._ws.send_json(data, dumps=json.dumps) _log.debug("Sent JSON payload %s to the Gateway.", data) async def receive(self) -> t.Optional[bool]: @@ -214,7 +215,7 @@ async def receive(self) -> t.Optional[bool]: else: received_msg = t.cast(str, typed_msg.data) - self.recent_payload = t.cast(dt.GatewayEvent, loads(received_msg)) + self.recent_payload = t.cast(dt.GatewayEvent, json.loads(received_msg)) _log.debug("Received payload from the Gateway: %s", self.recent_payload) self.sequence = self.recent_payload.get("s") return True @@ -279,32 +280,33 @@ async def connection_loop(self) -> None: if res and self.recent_payload is not None: op = int(self.recent_payload["op"]) - if op == DISPATCH and self.recent_payload.get("t") is not None: - event_name = str(self.recent_payload.get("t")).lower() - data = self.recent_payload.get("d") + if op == DISPATCH and (event_name := self.recent_payload.get("t")) is not None: + data = t.cast(json.JSONObject, self.recent_payload.get("d")) - if event_name == "ready": + self._dispatcher.consume(event_name, self, data) + await self._dispatcher.dispatch( + events.DispatchEvent(t.cast(t.Mapping[str, t.Any], data)) + ) + + if event_name == "READY": ready_data = t.cast(dt.ReadyData, data) self.session_id = ready_data["session_id"] self.resume_url = ready_data["resume_gateway_url"] - args = (data,) - if data is None: - args = () - self._dispatcher.dispatch(event_name, *args) + await self._dispatcher.dispatch(events.ReadyEvent(ready_data)) # these should be rare, but it's better to be safe than sorry elif op == HEARTBEAT: await self.heartbeat() elif op == RECONNECT: - self._dispatcher.dispatch("reconnect") + await self._dispatcher.dispatch(events.ReconnectEvent()) await self.close(code=1012) return elif op == INVALID_SESSION: self.can_resume = bool(self.recent_payload.get("d")) - self._dispatcher.dispatch("invalid_session", self.can_resume) + await self._dispatcher.dispatch(events.InvalidSessionEvent(self.can_resume)) await self.close(code=1012) return diff --git a/discatcore/gateway/events.py b/discatcore/gateway/events.py new file mode 100644 index 0000000..a15e36e --- /dev/null +++ b/discatcore/gateway/events.py @@ -0,0 +1,48 @@ +# SPDX-License-Identifier: MIT +from __future__ import annotations + +import typing as t + +import attr +import discord_typings as dt + +from ..utils.event import Event + +__all__ = ( + "GatewayEvent", + "DispatchEvent", + "InvalidSessionEvent", + "ReadyEvent", + "ReconnectEvent", + "ResumedEvent", +) + + +@attr.define +class GatewayEvent(Event): + pass + + +@attr.define +class DispatchEvent(GatewayEvent): + data: t.Mapping[str, t.Any] + + +@attr.define +class ReadyEvent(GatewayEvent): + data: dt.ReadyData + + +@attr.define +class ResumedEvent(GatewayEvent): + pass + + +@attr.define +class ReconnectEvent(GatewayEvent): + pass + + +@attr.define +class InvalidSessionEvent(GatewayEvent): + resumable: bool diff --git a/discatcore/utils/dispatcher.py b/discatcore/utils/dispatcher.py index 67d1c1a..af028c0 100644 --- a/discatcore/utils/dispatcher.py +++ b/discatcore/utils/dispatcher.py @@ -1,162 +1,192 @@ # SPDX-License-Identifier: MIT +from __future__ import annotations + import asyncio import inspect import logging import traceback import typing as t -from collections.abc import Callable, Coroutine - -from .event import Event - -_log = logging.getLogger(__name__) - -__all__ = ("Dispatcher",) - -T = t.TypeVar("T") -Func = Callable[..., T] -CoroFunc = Func[Coroutine[t.Any, t.Any, t.Any]] - - -class Dispatcher: - """A class that helps manage events. - - Attributes: - events (dict[str, Event]): The callbacks for each event. - """ - - __slots__ = ("events",) - - def __init__(self) -> None: - self.events: dict[str, Event] = {} +from collections import defaultdict - def get_event(self, name: str) -> t.Optional[Event]: - """Returns an event with the name provided. +import attr +from typing_extensions import Self, TypeGuard - Args: - name (str): The name of the event that will be returned. +from .event import Event, EventT, ExceptionEvent +from .json import JSONObject - Returns: - The event, none if not found. - """ - return self.events.get(name) +if t.TYPE_CHECKING: + from ..gateway import GatewayClient - def new_event(self, name: str) -> Event: - """Creates a new event. Returns this new event after creation. +_log = logging.getLogger(__name__) - Args: - name (str): The name of the new event. +__all__ = ( + "Consumer", + "consumer_for", + "Dispatcher", +) - Returns: - The new event created. - """ - new_event = Event(name, self) - self.events[name] = new_event - return new_event +T = t.TypeVar("T") +DispatcherT = t.TypeVar("DispatcherT", bound="Dispatcher") +ListenerCallbackT = t.TypeVar("ListenerCallbackT", bound="ListenerCallback[Event]") +Coro = t.Coroutine[T, t.Any, t.Any] - def add_event(self, event: Event, *, override: bool = True) -> None: - """Adds a new pre-existing event. +ListenerCallback = t.Callable[[EventT], Coro[None]] +ConsumerCallback = t.Callable[[DispatcherT, "GatewayClient", JSONObject], Coro[None]] - Args: - event (Event): The event to add. - """ - if self.has_event(event.name) and not override: - return - self.events[event.name] = event +@attr.define +class Consumer(t.Generic[DispatcherT]): + """Represents a dispatcher consumer. A consumer consumes a raw event and performs actions based on the raw event.""" - def remove_event(self, name: str) -> None: - """Removes an event. + callback: ConsumerCallback[DispatcherT] + events: tuple[str, ...] - Args: - name (str): The name of the event to remove. - """ - if name not in self.events: - raise ValueError(f"There is no event with name {name}!") - del self.events[name] - _log.debug("Removed event with name %s", name) +def consumer_for( + *events: str, +) -> t.Callable[[ConsumerCallback[DispatcherT]], Consumer[DispatcherT]]: + def wrapper(func: ConsumerCallback[DispatcherT]) -> Consumer[DispatcherT]: + return Consumer(func, events) - def has_event(self, name: str) -> bool: - """Check if this dispatcher already has a event. + return wrapper - Args: - name (str): The name of the event to find. - Returns: - A bool correlating to if there is a event with that name or not. - """ - return name in self.events +def _is_exception_event(e: EventT) -> TypeGuard[ExceptionEvent[EventT]]: + return isinstance(e, ExceptionEvent) - def callback_for( - self, event: str, *, one_shot: bool = False, force_parent: bool = False - ) -> Callable[[CoroFunc], Event]: - """A shortcut decorator to add a callback to an event. - If the event does not exist already, then a new one will be created. - Args: - event: The name of the event to get or create. - one_shot: Whether or not the callback should be a one shot (which means the callback will be removed after running). Defaults to False. - force_parent: Whether or not this callback contains a self parameter. Defaults to False. +class Dispatcher: + """A class that helps manage events.""" - Returns: - A wrapper function that acts as the actual decorator. - """ + __slots__ = ("_listeners", "_consumers") - def wrapper(coro: CoroFunc): - if not self.has_event(event): - event_cls = self.new_event(event) + def __init__(self) -> None: + self._listeners: defaultdict[type[Event], list[ListenerCallback[Event]]] = defaultdict(list) + self._consumers: dict[str, Consumer[Self]] = {} + + for name, value in inspect.getmembers(self): + if not isinstance(value, Consumer): + continue + + self._consumers[name.lower()] = value + + for event_name in value.events: + self._consumers[event_name.lower()] = value + + async def _run_listener(self, event: EventT, listener: ListenerCallback[EventT]) -> None: + try: + await listener(event) + except asyncio.CancelledError: + pass + except Exception as e: + if _is_exception_event(event): + _log.error( + "There was an error while running the listener callback (%s%s) under exception event %s.%s: %s", + listener.__name__, + inspect.signature(listener), + type(event).__module__, + type(event).__qualname__, + traceback.format_exception(type(e), e, e.__traceback__), + ) else: - event_cls = self.events[event] - - event_cls.add_callback(coro, one_shot=one_shot, force_parent=force_parent) - return event_cls + exec_event = ExceptionEvent( + exception=e, failed_event=event, failed_listener=listener + ) + + _log.info( + "An exception occured while handling %s.%s.", + type(event).__module__, + type(event).__qualname__, + ) + await self.dispatch(exec_event) + + async def _handle_consumer( + self, consumer: ConsumerCallback[Self], gateway: GatewayClient, payload: JSONObject + ): + try: + await consumer(self, gateway, payload) + except asyncio.CancelledError: + pass + except Exception as e: + asyncio.get_running_loop().call_exception_handler( + { + "message": "An exception occured while consuming a raw event.", + "exception": e, + "task": asyncio.current_task(), + } + ) - return wrapper + def subscribe(self, event: type[EventT], func: ListenerCallback[EventT]) -> None: + if not asyncio.iscoroutinefunction(func): + raise TypeError(f"listener callback {func.__name__!r} has to be a coroutine function!") + + _log.debug( + "Subscribing listener callback (%s%s) to event %s.%s", + func.__name__, + inspect.signature(func), + event.__module__, + event.__qualname__, + ) + self._listeners[event].append(func) # pyright: ignore + + def unsubscribe(self, event: type[EventT], func: ListenerCallback[EventT]) -> None: + listeners = self._listeners.get(event) + if not listeners: + return - # global error handler + _log.debug( + "Unsubscribing listener callback (%s%s) from event %s.%s", + func.__name__, + inspect.signature(func), + event.__module__, + event.__qualname__, + ) + listeners.remove(func) # pyright: ignore - async def error_handler(self, exception: Exception) -> None: - """Basic error handler for dispatched events. + if not listeners: + del self._listeners[event] - Args: - exception (Exception): The exception from the dispatched event. - """ - traceback.print_exception(type(exception), exception, exception.__traceback__) + def listen_to( + self, *, events: list[type[EventT]] + ) -> t.Callable[[ListenerCallback[EventT]], ListenerCallback[EventT]]: + def wrapper(func: ListenerCallback[EventT]) -> ListenerCallback[EventT]: + for event in events: + self.subscribe(event, func) - def override_error_handler(self, func: CoroFunc) -> None: - """Overrides a new error handler for dispatched events. + return func - Args: - func (CoroFunc): The new error handler. - """ - if not asyncio.iscoroutinefunction(func): - raise TypeError("Callback provided is not a coroutine.") + return wrapper - orig_handler_sig = inspect.signature(self.error_handler) - new_handler_sig = inspect.signature(func) + def dispatch(self, event: Event) -> asyncio.Future[t.Any]: + _log.debug( + "Dispatching event %s.%s (which dispatches event(s) %r).", + type(event).__module__, + type(event).__qualname__, + [f"{e.__module__}.{e.__qualname__}" for e in event.dispatches], + ) + dispatched: t.List[Coro[None]] = [] - if orig_handler_sig.parameters != new_handler_sig.parameters: - raise TypeError( - "Overloaded error handler does not have the same parameters as original error handler." - ) + for event_type in event.dispatches: + for listener in self._listeners.get(event_type, []): + dispatched.append(self._run_listener(event, listener)) - setattr(self, "error_handler", func) - _log.debug("Registered new error handler") + def _completed_future() -> asyncio.Future[None]: + future = asyncio.get_running_loop().create_future() + future.set_result(None) + return future - # dispatch + return asyncio.gather(*dispatched) if dispatched else _completed_future() - def dispatch(self, name: str, *args: t.Any, **kwargs: t.Any) -> None: - """Dispatches a event. This will trigger the all of the event's - callbacks. + def consume(self, event: str, gateway: GatewayClient, payload: JSONObject): + consumer = self._consumers.get(event) - Args: - name (str): The name of the event to dispatch. - *args (t.Any): Arguments to pass into the event. - **kwargs (t.Any): Keyword arguments to pass into the event. - """ - _log.debug("Dispatching event %s", name) - event = self.events.get(name) + if not consumer: + _log.info("Consumer %s does not exist. Skipping consumption.", event) + return - if event is not None: - event.dispatch(*args, **kwargs) + _log.debug("Consuming raw event %s.", event) + asyncio.create_task( + self._handle_consumer(consumer.callback, gateway, payload), + name=f"DisCatCore Consumer {event}", + ) diff --git a/discatcore/utils/event.py b/discatcore/utils/event.py index fa4905c..e5b3d00 100644 --- a/discatcore/utils/event.py +++ b/discatcore/utils/event.py @@ -2,285 +2,42 @@ from __future__ import annotations -import asyncio -import inspect -import logging import typing as t -from collections.abc import Callable, Coroutine -from dataclasses import dataclass -if t.TYPE_CHECKING: - from .dispatcher import Dispatcher - -_log = logging.getLogger(__name__) +import attr -__all__ = ("Event",) +from .functools import classproperty -T = t.TypeVar("T") -Func = Callable[..., T] -CoroFunc = Func[Coroutine[t.Any, t.Any, t.Any]] +if t.TYPE_CHECKING: + from .dispatcher import ListenerCallback +__all__ = ("Event", "ExceptionEvent") -@dataclass -class _EventCallbackMetadata: - one_shot: bool = False - parent: bool = False +EventT = t.TypeVar("EventT", bound="Event") class Event: - """Represents an event for a dispatcher. - - Args: - name (str): The name of this event. - parent (Dispatcher): The parent dispatcher of this event. - - Attributes: - name (str): The name of this event. - parent (Dispatcher): The parent dispatcher of this event. - callbacks (list[Callable[..., Coroutine[t.Any, t.Any, t.Any]]]): The callbacks for this event. - metadata (dict[Callable[..., Coroutine[t.Any, t.Any, t.Any]], _EventCallbackMetadata]): The metadata for the callbacks for this event. - _proto (t.Optional[inspect.Signature]): The prototype of this event. - This will define what signature all of the callbacks will have. - _error_handler (Callable[..., Coroutine[t.Any, t.Any, t.Any]]): The error handler of this event. - The error handler will be run whenever an event dispatched raises an error. - Defaults to the error handler from the parent dispatcher. - """ - - def __init__(self, name: str, parent: Dispatcher) -> None: - self.name: str = name - self.parent: Dispatcher = parent - self.callbacks: list[CoroFunc] = [] - self.metadata: dict[CoroFunc, _EventCallbackMetadata] = {} - self._proto: t.Optional[inspect.Signature] = None - self._error_handler: CoroFunc = self.parent.error_handler - - # setters/decorators - - def set_proto( - self, - proto_func: t.Union[Func[t.Any], staticmethod[..., t.Any]], - *, - force_parent: bool = False, - ) -> None: - """Sets the prototype for this event. - - Args: - proto_func (Callable[..., t.Any]): The prototype for this event. - force_parent (bool): Whether or not this callback contains a self parameter. Defaults to ``False``. - """ - is_static = isinstance(proto_func, staticmethod) - if is_static: - proto_func = proto_func.__func__ - - if not self._proto: - sig = inspect.signature(proto_func) - if force_parent and not is_static: - new_params = list(sig.parameters.values()) - new_params.pop(0) - sig = sig.replace(parameters=new_params) - self._proto = sig - - _log.debug("Registered new event prototype under event %s", self.name) - else: - raise ValueError(f"Event prototype for event {self.name} has already been set!") - - @t.overload - def proto(self, func: CoroFunc, *, force_parent: bool = ...) -> Event: - pass - - @t.overload - def proto( - self, func: None = ..., *, force_parent: bool = ... - ) -> Callable[[Func[t.Any]], Event]: - pass - - def proto( - self, - func: t.Optional[t.Union[Func[t.Any], staticmethod[..., t.Any]]] = None, - *, - force_parent: bool = False, - ) -> t.Union[Event, Callable[[Func[t.Any]], Event]]: - """A decorator to set the prototype of this event. - - Args: - func (t.Optional[Callable[..., t.Any]]): The prototype to pass into this decorator. Defaults to ``None``. - force_parent (bool): Whether or not this callback contains a self parameter. Defaults to ``False``. - - Returns: - Either this event object or a wrapper function that acts as the actual decorator. - This depends on if the ``func`` arg was passed in. - """ - - def wrapper(func: t.Union[Func[t.Any], staticmethod[..., t.Any]]): - self.set_proto(func, force_parent=force_parent) - return self - - if func: - return wrapper(func) - return wrapper - - def set_error_handler(self, func: CoroFunc) -> None: - """Overrides the error handler of this event. - - Args: - func (Callable[..., Coroutine[t.Any, t.Any, t.Any]]): The new error handler for this event. - """ - if not asyncio.iscoroutinefunction(func): - raise TypeError("Callback provided is not a coroutine.") - - orig_handler_sig = inspect.signature(self._error_handler) - new_handler_sig = inspect.signature(func) - - if len(orig_handler_sig.parameters) != len(new_handler_sig.parameters): - raise TypeError( - "Overloaded error handler does not have the same parameters as original error handler." - ) - - self._error_handler = func - _log.debug("Registered new error handler under event %s", self.name) - - def error_handler(self) -> Callable[[Func[t.Any]], Event]: - """A decorator to override the error handler of this event. - - Returns: - A wrapper function that acts as the actual decorator. - """ - - def wrapper(func: CoroFunc): - self.set_error_handler(func) - return self - - return wrapper - - def add_callback( - self, func: CoroFunc, *, one_shot: bool = False, force_parent: bool = False - ) -> None: - """Adds a new callback to this event. - - Args: - func (Callable[..., Coroutine[t.Any, t.Any, t.Any]]): The callback to add to this event. - one_shot (bool): Whether or not the callback should be a one shot (which means the callback will be removed after running). Defaults to False. - force_parent (bool): Whether or not this callback contains a self parameter. Defaults to False. - """ - if not self._proto: - self.set_proto(func, force_parent=force_parent) - # this is to prevent static type checkers from inferring that self._proto is - # still None after setting it indirectly via a different function - # (it should never go here tho because exceptions stop the flow of this code - # and it should be set if we don't reach t.Any exceptions) - if not self._proto: - return - - if not asyncio.iscoroutinefunction(func): - raise TypeError("Callback provided is not a coroutine.") - - callback_sig = inspect.signature(func) - if force_parent: - new_params = list(callback_sig.parameters.values()) - new_params.pop(0) - callback_sig = callback_sig.replace(parameters=new_params) - - if len(self._proto.parameters) != len(callback_sig.parameters): - raise TypeError( - "Event callback parameters do not match up with the event prototype parameters." - ) - - metadat = _EventCallbackMetadata(one_shot) - self.metadata[func] = metadat - self.callbacks.append(func) - - _log.debug("Registered new event callback under event %s", self.name) - - def remove_callback(self, index: int) -> None: - """Removes a callback located at a certain index. - - Args: - index (int): The index where the callback is located. - """ - if len(self.callbacks) - 1 < index: - raise ValueError(f"Event {self.name} has less callbacks than the index provided!") - - del self.callbacks[index] - _log.debug("Removed event callback with index %d under event %s", index, self.name) - - @t.overload - def callback(self, func: CoroFunc, *, one_shot: bool = ..., force_parent: bool = ...) -> Event: - pass - - @t.overload - def callback( - self, func: None = ..., *, one_shot: bool = ..., force_parent: bool = ... - ) -> Callable[[Func[t.Any]], Event]: - pass - - def callback( - self, - func: t.Optional[CoroFunc] = None, - *, - one_shot: bool = False, - force_parent: bool = False, - ) -> t.Union[Event, Callable[[Func[t.Any]], Event]]: - """A decorator to add a callback to this event. - - Args: - func (t.Optional[Callable[..., Coroutine[t.Any, t.Any, t.Any]]]): The function to pass into this decorator. Defaults to None. - one_shot (bool): Whether or not the callback should be a one shot (which means the callback will be removed after running). Defaults to False. - force_parent (bool): Whether or not this callback contains a self parameter. Defaults to False. - - Returns: - Either this event object or a wrapper function that acts as the actual decorator. - This depends on if the ``func`` arg was passed in. - """ - - def wrapper(func: CoroFunc): - self.add_callback(func, one_shot=one_shot, force_parent=force_parent) - return self - - if func: - return wrapper(func) - return wrapper + """Represents a dispatcher event. An event class contains information about an event for use in listeners.""" - # dispatch + __slots__ = () - async def _run(self, coro: CoroFunc, *args: t.Any, **kwargs: t.Any) -> None: - try: - await coro(*args, **kwargs) - except asyncio.CancelledError: - pass - except Exception as e: - try: - await self._error_handler(e) - except asyncio.CancelledError: - pass + __dispatches: tuple[type[Event], ...] - def _schedule_task( - self, - coro: CoroFunc, - index: t.Optional[int], - *args: t.Any, - **kwargs: t.Any, - ) -> asyncio.Task[t.Any]: - task_name = f"DisCatCore Event:{self.name}" - if index: - task_name += f" Index:{index}" - task_name = task_name.rstrip() + def __init_subclass__(cls) -> None: + super().__init_subclass__() - wrapped = self._run(coro, *args, **kwargs) - return asyncio.create_task(wrapped, name=task_name) + cls.__dispatches = tuple(base for base in cls.__mro__ if issubclass(base, Event)) - def dispatch(self, *args: t.Any, **kwargs: t.Any) -> None: - """Runs all event callbacks with arguments. + @classproperty + @classmethod + def dispatches(cls): + return cls.__dispatches - Args: - *args (t.Any): Arguments to pass into the event callbacks. - **kwargs (t.Any): Keyword arguments to pass into the event callbacks. - """ - for i, callback in enumerate(self.callbacks): - metadata = self.metadata.get(callback, _EventCallbackMetadata()) - _log.debug("Running event callback under event %s with index %s", self.name, i) - self._schedule_task(callback, i, *args, **kwargs) +@attr.define(kw_only=True) +class ExceptionEvent(Event, t.Generic[EventT]): + """An event that is dispatched whenever a dispatched event raises an exception.""" - if metadata.one_shot: - _log.debug("Removing event callback under event %s with index %s", self.name, i) - self.remove_callback(i) + exception: BaseException + failed_event: EventT + failed_listener: ListenerCallback[EventT] diff --git a/discatcore/utils/functools.py b/discatcore/utils/functools.py new file mode 100644 index 0000000..07a9829 --- /dev/null +++ b/discatcore/utils/functools.py @@ -0,0 +1,26 @@ +# SPDX-License-Identifier: MIT + +import typing as t + +from typing_extensions import Self + +__all__ = ("classproperty",) + +T = t.TypeVar("T") +ClsT = t.TypeVar("ClsT") + + +class classproperty(t.Generic[ClsT, T]): + def __init__(self, fget: t.Callable[[ClsT], T], /) -> None: + self.fget: "classmethod[ClsT, ..., T]" + self.getter(fget) + + def getter(self, fget: t.Callable[[ClsT], T], /) -> Self: + if not isinstance(fget, classmethod): + raise ValueError(f"Callable {fget.__name__} is not a classmethod!") + + self.fget = fget + return self + + def __get__(self, obj: t.Optional[t.Any], type: t.Optional[ClsT]) -> T: + return self.fget.__func__(type) diff --git a/discatcore/utils/json.py b/discatcore/utils/json.py index 0235e67..5ca5dfc 100644 --- a/discatcore/utils/json.py +++ b/discatcore/utils/json.py @@ -10,7 +10,12 @@ except ImportError: import json -__all__ = ("dumps", "loads") +__all__ = ("JSONObject", "dumps", "loads") + + +JSONObject = t.Union[ + str, int, float, bool, None, t.Sequence["JSONObject"], t.Mapping[str, "JSONObject"] +] def dumps(obj: t.Any) -> str: diff --git a/pyproject.toml b/pyproject.toml index f4a3169..51eea46 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,6 +47,7 @@ classifiers = [ [tool.poetry.dependencies] python = ">=3.9" aiohttp = ">=3.6.0,<3.9.0" +attrs = "^22.1" discord-typings = {git = "https://github.com/Bluenix2/discord-typings.git"} [tool.poetry.urls]