Skip to content

Commit

Permalink
Added command invocation logic which handles the process of invoking …
Browse files Browse the repository at this point in the history
…a command. The process can be broken down and is similar to the following scenario:

Recieve a Message, Trigger Handle Commands, Create Context, Check for a valid prefix, Strip Prefix and Parse message content into a dict view, Check for a valid command from the dict view, run all checks, run cooldowns, parse arguments, rune before invoke hooks, run the command callback, rune after command hooks. If anything fails it is propagated into event_command_error.

This push is the first alpha push of command invocation and needs deep testing.
  • Loading branch information
EvieePy committed Jun 21, 2019
1 parent 00d73d2 commit b1c723a
Show file tree
Hide file tree
Showing 2 changed files with 276 additions and 4 deletions.
276 changes: 274 additions & 2 deletions twitchio/ext/commands/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@
"""

import asyncio
import inspect
import itertools
import sys
import traceback
from typing import Union
from twitchio.channel import Channel
from twitchio.client import Client
from twitchio.websocket import WSConnection
from .core import Command, Context
from .errors import *
from .stringparser import StringParser


class Bot(Client):
Expand All @@ -41,14 +46,247 @@ def __init__(self, irc_token: str, *, nick: str, prefix: Union[str, list, tuple]
self._connection = WSConnection(bot=self, token=irc_token, nick=nick.lower(), loop=self.loop,
initial_channels=initial_channels)

self._preix = prefix
self._prefix = prefix
self._nick = nick.lower()

self._commands = {}
self._command_aliases = {}
self._events = {}
self._cogs = {}

def add_command(self):
self._checks = []

def __init__commands__(self):
commands = inspect.getmembers(self)

for _, obj in commands:
if not isinstance(obj, Command):
continue

obj._instance = self

try:
self.add_command(obj)
except TwitchCommandError:
traceback.print_exc()
continue

async def __get_prefixes__(self, message):
ret = self._prefix

if callable(self._prefix):
if inspect.iscoroutinefunction(self._prefix):
ret = await self._prefix(self, message)
else:
ret = self._prefix(self, message)

if not isinstance(ret, (list, tuple, set, str)):
raise TypeError(f'Prefix must be of either class <list, tuple, set, str> not <{type(ret)}>')

return ret

async def get_prefix(self, message):
# TODO Docs
prefixes = await self.__get_prefixes__(message)

if not isinstance(prefixes, str):
for prefix in prefixes:
if message.content.startswith(prefix):
return prefix

return prefixes

def add_command(self, command: Command):
# TODO Docs
if not isinstance(command, Command):
raise TypeError('Commands passed must be a subclass of Command.')
elif command.name in self.commands:
raise TwitchCommandError(f'Failed to load command <{command.name}>, a command with that name already exists.')
elif not inspect.iscoroutinefunction(command._callback):
raise TwitchCommandError(f'Failed to load command <{command.name}>. Commands must be coroutines.')

self.commands[command.name] = command

if not command.aliases:
return

for alias in command.aliases:
if alias in self.commands:
del self.commands[command.name]
raise TwitchCommandError(
f'Failed to load command <{command.name}>, a command with that name/alias already exists.')

self._command_aliases[alias] = command.name

async def get_context(self, message, *, cls=None):
# TODO Docs
if not cls:
cls = Context

prefix = await self.get_prefix(message)

context = cls(message=message, prefix=prefix)
return context

async def handle_commands(self, message):
context = await self.get_context(message)

await self.invoke(context)

async def invoke(self, context):
# TODO Docs
if not context.prefix:
return

content = context.message.content[len(context.prefix)::].lstrip(' ') # Strip prefix and remainder whitespace
parsed = StringParser().process_string(content) # Return the string as a dict view

try:
command = parsed.pop(0)
except KeyError:
return # No command was found

try:
command = self._command_aliases[command]
except KeyError:
pass

if command in self.commands:
command = self.commands[command]
else:
context.command = None
return await self.event_command_error(context, CommandNotFound(f'Command <{command}> was not found.'))

context.command = command
instance = command._instance

check_result = await self.handle_checks(context)

if check_result is not True:
return await self.event_command_error(context, check_result)

limited = self._run_cooldowns(context)

if limited:
return await self.event_command_error(context, limited[0])

try:
context.args, context.kwargs = command.parse_args(instance, parsed)

await self.global_before_invoke(context)

if context.command._before_invoke:
await context.command._before_invoke(instance, context)

if instance:
await context.command._callback(instance, context, *context.args, **context.kwargs)
else:
await context.command._callback(context, *context.args, **context.kwargs)

except Exception as e:
if context.command.event_error:
await content.command.on_error(instance, context, e)

await self.event_command_error(context, e)

try:
# Invoke our after command hooks...
if context.command._after_invoke:
await context.command._after_invoke(context)
await self.global_after_invoke(context)
except Exception as e:
await self.event_command_error(context, e)

def _run_cooldowns(self, context):
try:
buckets = context.command._cooldowns[0].get_buckets(context)
except IndexError:
return None

expired = []

try:
for bucket in buckets:
bucket.update_bucket(context)
except CommandOnCooldown as e:
expired.append(e)

return expired

async def handle_checks(self, context):
# TODO Docs
command = context.command

if not command.no_global_checks:
checks = [predicate for predicate in itertools.chain(self._checks, command._checks)]
else:
checks = command._checks

if not checks:
return True

try:
for predicate in checks:
if inspect.isawaitable(predicate):
result = await predicate(context)
else:
result = predicate(context)

if result is False:
raise CheckFailure(f'The check <{predicate}> for command <{command.name}> failed.')

return True
except Exception as e:
return e

async def global_before_invoke(self, ctx):
"""|coro|
Method which is called before any command is about to be invoked.
This method is useful for setting up things before command invocation. E.g Database connections or
retrieving tokens for use in the command.
Parameters
------------
ctx:
The context used for command invocation.
Examples
----------
.. code:: py
async def global_before_invoke(self, ctx):
# Make a database query for example to retrieve a specific token.
token = db_query()
ctx.token = token
async def my_command(self, ctx):
data = await self.create_clip(ctx.token, ...)
Note
------
The global_before_invoke is called before any other command specific hooks.
"""
pass

async def global_after_invoke(self, ctx):
"""|coro|
Method which is called after any command is invoked regardless if it failed or not.
This method is useful for cleaning up things after command invocation. E.g Database connections.
Parameters
------------
ctx:
The context used for command invocation.
Note
------
The global_after_invoke is called only after the command successfully invokes.
"""
pass

def add_event(self):
Expand Down Expand Up @@ -274,6 +512,40 @@ async def event_raw_data(data):
"""
pass

def command(self, *, name: str=None, aliases: Union[list, tuple]=None, cls=Command, no_global_checks=False):
"""Decorator which registers a command with the bot.
Commands must be a coroutine.
Parameters
------------
name: str [Optional]
The name of the command. By default if this is not supplied, the function name will be used.
aliases: Optional[Union[list, tuple]]
The command aliases. This must be a list or tuple.
cls: class [Optional]
The custom command class to override the default class. This must be similar to :class:`.Command`.
no_global_checks : Optional[bool]
Whether or not the command should abide by global checks. Defaults to False, which checks global checks.
Raises
--------
TypeError
cls is not type class.
"""

if not inspect.isclass(cls):
raise TypeError(f'cls must be of type <class> not <{type(cls)}>')

def decorator(func):
cmd_name = name or func.__name__

cmd = cls(name=cmd_name, func=func, aliases=aliases, instance=None)
self.add_command(cmd)

return cmd
return decorator

def run(self):
try:
self.loop.create_task(self._connection._connect())
Expand Down
4 changes: 2 additions & 2 deletions twitchio/ext/commands/cooldowns.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ def update_bucket(self, ctx):
now = time.time()

self._tokens = self.get_tokens(now)

if self._tokens == 0:
self._window = now

if self._tokens == self._rate:
retry = self._per - (now - self._window)
return CommandOnCooldown(command=ctx.command, retry_after=retry)
raise CommandOnCooldown(command=ctx.command, retry_after=retry)

self._tokens += 1

Expand Down

0 comments on commit b1c723a

Please sign in to comment.