-
-
Notifications
You must be signed in to change notification settings - Fork 172
/
compilation.py
140 lines (100 loc) · 3.92 KB
/
compilation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# -*- coding: utf-8 -*-
"""
jishaku.repl.compilation
~~~~~~~~~~~~~~~~~~~~~~~~
Constants, functions and classes related to classifying, compiling and executing Python code.
:copyright: (c) 2020 Devon (Gorialis) R
:license: MIT, see LICENSE for more details.
"""
import ast
import asyncio
import inspect
import import_expression
from jishaku.functools import AsyncSender
from jishaku.repl.scope import Scope
from jishaku.repl.walkers import KeywordTransformer
# Source template for the coroutine that hosts user-submitted REPL code.
#
# The template is formatted in two stages:
#   1. Here, ``{0}`` is replaced with ``import_expression.constants.IMPORTER``
#      (used as the alias for ``importlib.import_module``), while the escaped
#      ``{{0}}`` collapses to a literal ``{0}``.
#   2. ``wrap_code`` later fills that remaining ``{0}`` with the coroutine's
#      parameter list.
#
# The final ``try`` block is the splice point: ``wrap_code`` appends the user's
# statements to its body, and the ``finally`` clause copies the coroutine's
# locals into the executor scope's globals whether or not the user code raised.
#
# The indentation inside the literal is significant: it is parsed as Python,
# so the function body and the try/except/finally suites must be nested.
CORO_CODE = """
async def _repl_coroutine({{0}}):
    import asyncio
    from importlib import import_module as {0}
    import aiohttp
    import discord
    from discord.ext import commands
    try:
        import jishaku
    except ImportError:
        jishaku = None # keep working even if in panic recovery mode
    try:
        pass
    finally:
        _async_executor.scope.globals.update(locals())
""".format(import_expression.constants.IMPORTER)
def wrap_code(code: str, args: str = '') -> ast.Module:
    """
    Build the module AST that defines ``_repl_coroutine`` around ``code``.

    The user's statements are appended to the body of the final ``try`` block
    of the ``CORO_CODE`` template. If the last statement of that block is a
    bare expression that is not already a ``yield``, it is replaced by a
    ``yield`` of the same value so that the result is surfaced to the caller.

    Both the template and the user code are parsed with ``import_expression``
    rather than plain ``ast``.

    :param code: Python source text to embed.
    :param args: Comma-separated parameter list substituted into the
        coroutine's signature.
    :return: The assembled, not yet compiled, module AST.
    """
    parsed_input = import_expression.parse(code, mode='exec')
    module = import_expression.parse(CORO_CODE.format(args), mode='exec')

    # The template's last top-level statement is the ``async def``.
    coroutine_def = module.body[-1]
    assert isinstance(coroutine_def, ast.AsyncFunctionDef)

    # The last statement of the coroutine is the ``try`` that hosts user code.
    host_try = coroutine_def.body[-1]
    assert isinstance(host_try, ast.Try)

    host_try.body.extend(parsed_input.body)
    ast.fix_missing_locations(module)

    # KeywordTransformer is defined elsewhere in the project; it is applied to
    # the children of the try block only (generic_visit, not visit).
    KeywordTransformer().generic_visit(host_try)

    tail = host_try.body[-1]

    # Only a trailing bare expression that is not already a yield is rewritten.
    if isinstance(tail, ast.Expr) and not isinstance(tail.value, ast.Yield):
        # copy_location returns the node it was given, so the yield and the
        # statement wrapping it can be built and positioned in one step each.
        yielded = ast.copy_location(ast.Yield(tail.value), tail)
        host_try.body[-1] = ast.copy_location(ast.Expr(yielded), tail)

    return module
class AsyncCodeExecutor:  # pylint: disable=too-few-public-methods
    """
    Asynchronously iterable runner for a snippet of Python source.

    The snippet is wrapped into an async function or async generator at
    construction time; iterating the executor runs it and produces each
    value it yields (or its single awaited result).

    Example
    -------

    .. code:: python3

        total = 0

        # prints 1, 2 and 3
        async for x in AsyncCodeExecutor('yield 1; yield 2; yield 3'):
            total += x
            print(x)

        # prints 6
        print(total)
    """

    __slots__ = ('args', 'arg_names', 'code', 'loop', 'scope')

    def __init__(self, code: str, scope: Scope = None, arg_dict: dict = None, loop: asyncio.BaseEventLoop = None):
        # The executor itself is always the first positional argument, bound
        # to the name ``_async_executor`` inside the generated coroutine.
        self.args = [self]
        self.arg_names = ['_async_executor']

        # Extra named values are passed in the order arg_dict iterates.
        for name, value in (arg_dict or {}).items():
            self.arg_names.append(name)
            self.args.append(value)

        # Parsed module AST; compilation is deferred until iteration starts.
        self.code = wrap_code(code, args=', '.join(self.arg_names))
        self.scope = scope if scope else Scope()
        # Stored for callers; not otherwise used by the methods of this class.
        self.loop = loop if loop else asyncio.get_event_loop()

    def __aiter__(self):
        # Running the compiled module defines ``_repl_coroutine`` in the scope.
        compiled = compile(self.code, '<repl>', 'exec')
        exec(compiled, self.scope.globals, self.scope.locals)  # pylint: disable=exec-used

        # Prefer the definition in locals, falling back to globals.
        func_def = self.scope.locals.get('_repl_coroutine') or self.scope.globals['_repl_coroutine']

        return self.traverse(func_def)

    async def traverse(self, func):
        """
        Run ``func`` with the stored arguments and yield what it produces.

        A plain coroutine function is awaited and its result yielded once.
        An async generator function is driven through ``AsyncSender``, and
        each value received from the consumer of this generator is handed
        to the ``send`` callable paired with that result.

        This method is private; iterate the executor instead of calling it.
        """
        if not inspect.isasyncgenfunction(func):
            yield await func(*self.args)
            return

        async for send, result in AsyncSender(func(*self.args)):
            # Presumably relays the sent value into the wrapped generator;
            # AsyncSender is defined elsewhere, so confirm there.
            send((yield result))