Skip to content

Commit

Permalink
Add command and cooldown decorators. Added a basic version of Command…
Browse files Browse the repository at this point in the history
… for testing.
  • Loading branch information
EvieePy committed Jun 21, 2019
1 parent b1c723a commit a1956cf
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 2 deletions.
5 changes: 4 additions & 1 deletion twitchio/ext/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
from .bot import Bot
from .bot import Bot
from .core import *
from .errors import *
from .cooldowns import *
221 changes: 220 additions & 1 deletion twitchio/ext/commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,223 @@
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
"""

import inspect
from typing import Union, Optional
from twitchio.abcs import Messageable
from .cooldowns import *
from .errors import *


__all__ = ('Command', 'command', 'Context', 'cooldown')


class Event:
pass


class Command:

def __init__(self, name: str, func, **attrs):
if not inspect.iscoroutinefunction(func):
raise TypeError('Command callback must be a coroutine.')

self._callback = func
self._checks = []
self._cooldowns = []
self._name = name

try:
self._checks.extend(func.__checks__)
except AttributeError:
pass

try:
self._cooldowns.extend(func.__cooldowns__)
except AttributeError:
pass

self.aliases = attrs.get('aliases', None)
sig = inspect.signature(func)
self.params = sig.parameters.copy()

self.event_error = None
self._before_invoke = None
self._after_invoke = None
self.no_global_checks = attrs.get('no_global_checks', False)

self._instance = None

for key, value in self.params.items():
if isinstance(value.annotation, str):
self.params[key] = value.replace(annotation=eval(value.annotation, func.__globals__))

@property
def name(self):
return self._name

def _convert_types(self, param, parsed):

converter = param.annotation
if converter is param.empty:
if param.default in (param.empty, None):
converter = str
else:
converter = type(param.default)

try:
argument = converter(parsed)
except Exception:
raise BadArgument(f'Invalid argument parsed at `{param.name}` in command `{self.name}`.'
f' Expected type {converter} got {type(parsed)}.')

return argument

def parse_args(self, instance, parsed):
iterator = iter(self.params.items())
index = 0
args = []
kwargs = {}

try:
next(iterator)
if instance:
next(iterator)
except StopIteration:
raise TwitchCommandError(f'self or ctx is a required argument which is missing.')

for _, param in iterator:
index += 1
if param.kind == param.POSITIONAL_OR_KEYWORD:
try:
argument = parsed.pop(index)
except (KeyError, IndexError):
if param.default is param.empty:
raise MissingRequiredArgument(param)
args.append(param.default)
else:
argument = self._convert_types(param, argument)
args.append(argument)

elif param.kind == param.KEYWORD_ONLY:
rest = ' '.join(parsed.values())
if rest.startswith(' '):
rest = rest.lstrip(' ')

if rest:
rest = self._convert_types(param, rest)
elif param.default is param.empty:
raise MissingRequiredArgument(param)
else:
rest = param.default

kwargs[param.name] = rest
parsed.clear()
break
elif param.VAR_POSITIONAL:
args.extend(parsed.values())
break

if parsed:
pass # TODO Raise Too Many Arguments.

return args, kwargs


class Context(Messageable):

__messageable_channel__ = True

def __init__(self, message, **attrs):
self.message = message
self.channel = message.channel
self.author = message.author

self.prefix = attrs.get('prefix')

self.command = attrs.get('command')
self.args = attrs.get('args')
self.kwargs = attrs.get('kwargs')

self._ws = self.channel._ws

def _fetch_channel(self):
return self.channel # Abstract method

def _fetch_websocket(self):
return self._ws # Abstract method

def _bot_is_mod(self):
cache = self._ws._cache[self.channel._name]
for user in cache:
if user.name == self.channel._bot.nick:
try:
mod = user.is_mod
except AttributeError:
return False

return mod

@property
def chatters(self) -> Optional[set]:
"""The channels current chatters."""
try:
users = self._ws._cache[self.channel._name]
except KeyError:
return None

return users

@property
def users(self) -> Optional[set]: # Alias to chatters
"""Alias to chatters."""
return self.chatters

def get_user(self, name: str):
"""Retrieve a user from the channels user cache.
Parameters
-----------
name: str
The user's name to try and retrieve.
Returns
--------
Union[:class:`twitchio.user.User`, :class:`twitchio.user.PartialUser`]
Could be a :class:`twitchio.user.PartialUser` depending on how the user joined the channel.
Returns None if no user was found.
"""
name = name.lower()

cache = self._ws._cache[self.channel._name]
for user in cache:
if user.name == name:
return user

return None


def command(*, name: str=None, aliases: Union[list, tuple]=None, cls=None, no_global_checks=False):
if cls and not inspect.isclass(cls):
raise TypeError(f'cls must be of type <class> not <{type(cls)}>')

cls = cls or Command

def decorator(func):
fname = name or func.__name__
cmd = cls(name=fname, func=func, aliases=aliases, no_global_checks=no_global_checks)

return cmd
return decorator


def cooldown(rate, per, bucket=Bucket.default):
def decorator(func):
if isinstance(func, Command):
func._cooldowns.append(Cooldown(rate, per, bucket))
else:
func.__cooldowns__ = [Cooldown(rate, per, bucket)]
return func
return decorator

0 comments on commit a1956cf

Please sign in to comment.