-
Notifications
You must be signed in to change notification settings - Fork 102
/
eval_exec.py
152 lines (124 loc) · 3.9 KB
/
eval_exec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import asyncio
import io
import os
import sys
import traceback
from pyrogram import filters
from pyrogram.types import Message
from userbot import UserBot
from userbot.database import database
from userbot.helpers.PyroHelpers import ReplyCheck
@UserBot.on_message(
    filters.command("eval", ".") & filters.me & ~filters.forwarded & ~filters.via_bot
)
async def eval_func_init(bot, message):
    """Entry point for a newly received ``.eval`` command message.

    The decorator filter only lets through ``.eval`` commands that match
    ``filters.me`` and are neither forwarded nor sent via a bot. The actual
    work is delegated to ``evaluation_func``.
    """
    await evaluation_func(bot, message)
@UserBot.on_edited_message(
    filters.command("eval", ".") & filters.me & ~filters.forwarded & ~filters.via_bot
)
async def eval_func_edited(bot, message):
    """Entry point for an edited message that contains an ``.eval`` command.

    Uses the same filter as the new-message handler and delegates to
    ``evaluation_func`` so that editing a command re-runs it.
    """
    await evaluation_func(bot, message)
async def evaluation_func(bot: UserBot, message: Message):
    """Run the Python code that follows ``.eval`` and report its outcome.

    The code is executed through ``aexec`` with ``bot``, ``message``, the
    replied-to message (or ``None``) and ``database`` as its arguments, while
    ``sys.stdout`` / ``sys.stderr`` are temporarily redirected into in-memory
    buffers. The reported result is, in priority order: the formatted
    traceback, captured stderr, captured stdout, or the literal ``"Success"``.

    If the formatted report is longer than 4096 characters it is written to
    ``eval.txt`` and sent as a document; otherwise the "Processing ..." status
    message is edited to show the report.

    Args:
        bot: The client instance handed over by the message handler.
        message: The message that carries the ``.eval`` command.
    """
    # NOTE(review): pyrogram's command filter can also match on a caption, in
    # which case message.text is None -- fall back so we do not crash on it.
    raw_text = message.text or message.caption or ""

    # Split on any whitespace so that ".eval\n<code>" works as well as
    # ".eval <code>"; a bare ".eval" yields a single part.
    parts = raw_text.split(maxsplit=1)
    if len(parts) < 2:
        await message.reply_text("No code provided to evaluate.")
        return
    cmd = parts[1]

    status_message = await message.reply_text("Processing ...")

    captured_out = io.StringIO()
    captured_err = io.StringIO()
    saved_out, saved_err = sys.stdout, sys.stderr
    sys.stdout, sys.stderr = captured_out, captured_err

    exc = None
    try:
        reply = message.reply_to_message or None
        # SECURITY: this executes arbitrary Python taken from the message
        # text. It must only ever be reachable for the account owner.
        await aexec(cmd, bot, message, reply, database)
    except Exception:
        exc = traceback.format_exc()
    finally:
        # Always restore the real streams, even when the evaluated code
        # raises something that is not an Exception (e.g. cancellation);
        # otherwise every later print in the process would be swallowed.
        sys.stdout, sys.stderr = saved_out, saved_err

    stdout = captured_out.getvalue()
    stderr = captured_err.getvalue()

    evaluation = exc or stderr or stdout or "Success"

    final_output = "<b>Expression</b>:\n<code>{}</code>\n\n<b>Result</b>:\n<code>{}</code> \n".format(
        cmd, evaluation.strip()
    )

    if len(final_output) <= 4096:
        await status_message.edit(final_output)
        return

    # Too long for a single message: ship the report as a file instead.
    with open("eval.txt", "w", encoding="utf8") as out_file:
        out_file.write(final_output)
    try:
        await message.reply_document(
            "eval.txt",
            caption=cmd,
            disable_notification=True,
            reply_to_message_id=ReplyCheck(message),
        )
    finally:
        # Remove the temporary file even if the upload fails.
        os.remove("eval.txt")
    await status_message.delete()
async def aexec(code, b, m, r, d):
    """Wrap ``code`` in an async function, run it and return its result.

    Every line of ``code`` is indented by one space and placed in the body of
    ``async def __aexec(b, m, r, d)``, so the snippet may use ``await`` and
    ``return`` and can refer to its four arguments by those short names.

    Args:
        code: Python source to execute (may span multiple lines).
        b, m, r, d: Values passed straight through to the generated function.

    Returns:
        Whatever the generated coroutine returns (``None`` if the snippet has
        no ``return``).

    Raises:
        SyntaxError: If ``code`` does not form a valid function body.
        Exception: Anything raised by the executed snippet propagates.
    """
    # NOTE(review): this is a process-wide setting that is never restored. It
    # makes tracebacks formatted afterwards omit their stack frames, which
    # keeps the error text reported for a failing snippet short.
    sys.tracebacklimit = 0

    source = "async def __aexec(b, m, r, d): " + "".join(
        f"\n {line}" for line in code.split("\n")
    )

    # Execute into an explicit namespace instead of reading the definition
    # back through locals(): since Python 3.13 (PEP 667) exec() inside a
    # function works on an independent snapshot, so locals()["__aexec"] would
    # raise KeyError. Module globals are kept so snippets still see the
    # module's imports.
    namespace = {}
    # SECURITY: executes arbitrary source; callers must only pass trusted code.
    exec(source, globals(), namespace)
    return await namespace["__aexec"](b, m, r, d)
@UserBot.on_edited_message(
    filters.command("exec", ".") & filters.me & ~filters.forwarded & ~filters.via_bot
)
async def execution_func_edited(bot, message):
    """Entry point for an edited message that contains an ``.exec`` command.

    The decorator filter only lets through ``.exec`` commands that match
    ``filters.me`` and are neither forwarded nor sent via a bot. The work is
    delegated to ``execution``.
    """
    await execution(bot, message)
@UserBot.on_message(
    filters.command("exec", ".") & filters.me & ~filters.forwarded & ~filters.via_bot
)
async def execution_func(bot, message):
    """Entry point for a newly received ``.exec`` command message.

    Uses the same filter as the edited-message handler and delegates to
    ``execution``.
    """
    await execution(bot, message)
async def execution(bot: UserBot, message: Message):
    """Run the shell command that follows ``.exec`` and reply with its output.

    The command is started with ``asyncio.create_subprocess_shell`` and both
    stdout and stderr are collected. The reply lists the command, its output
    (or ``"No output"``) and its errors (or ``"No errors"``). If that report
    is longer than 4096 characters it is written to ``exec.text`` and sent as
    a document instead of a text message.

    Args:
        bot: The client instance handed over by the message handler.
        message: The message that carries the ``.exec`` command.
    """
    # NOTE(review): pyrogram's command filter can also match on a caption, in
    # which case message.text is None -- fall back so we do not crash on it.
    raw_text = message.text or message.caption or ""

    # Split on any whitespace; a bare ".exec" yields a single part.
    parts = raw_text.split(maxsplit=1)
    if len(parts) < 2:
        await message.reply_text("No command provided to execute.")
        return
    cmd = parts[1]

    # SECURITY: the message text is handed to the shell verbatim. This must
    # only ever be reachable for the account owner.
    process = await asyncio.create_subprocess_shell(
        cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await process.communicate()

    # Command output is not guaranteed to be valid UTF-8; replace undecodable
    # bytes rather than failing the whole command with UnicodeDecodeError.
    e = stderr.decode(errors="replace") or "No errors"
    o = stdout.decode(errors="replace") or "No output"

    OUTPUT = (
        f"<b>Command:</b>\n<code>{cmd}</code>\n\n"
        f"<b>Output</b>: \n<code>{o}</code>\n"
        f"<b>Errors</b>: \n<code>{e}</code>"
    )

    if len(OUTPUT) <= 4096:
        await message.reply_text(OUTPUT)
        return

    # Too long for a single message: ship the report as a file instead.
    with open("exec.text", "w+", encoding="utf8") as out_file:
        out_file.write(OUTPUT)
    try:
        await message.reply_document(
            document="exec.text",
            caption=cmd,
            disable_notification=True,
            reply_to_message_id=ReplyCheck(message),
        )
    finally:
        # Remove the temporary file even if the upload fails.
        os.remove("exec.text")