-
Notifications
You must be signed in to change notification settings - Fork 1
/
evaluators.py
145 lines (134 loc) · 4.76 KB
/
evaluators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import asyncio
import io
import time
import os
import sys
import traceback
from . import *
# Chat that the handlers in this module send command output to; read once
# from Config.LOGGER_ID when the module is imported.
lg_id = Config.LOGGER_ID
@bot.on(d3vil_cmd(pattern="exec(?: |$|\n)(.*)", command="exec"))
@bot.on(sudo_cmd(pattern="exec(?: |$|\n)(.*)", command="exec", allow_sudo=True))
async def _(event):
    """Run a shell command and post its output to the logger chat.

    Everything after the command word in the triggering message is passed to
    the system shell. Combined stdout and stderr are formatted as a terminal
    transcript and sent to ``lg_id``; the invoking chat only gets a short
    status notice.

    SECURITY: this executes arbitrary shell input. Access control is assumed
    to be enforced by the ``d3vil_cmd`` / ``sudo_cmd`` decorators -- verify
    before exposing this handler to anyone else.
    """
    if event.fwd_from:
        return
    # Drop the command word itself and keep the rest of the message verbatim.
    cmd = "".join(event.text.split(maxsplit=1)[1:])
    if not cmd:
        return await eod(event, "`What should i execute?..`")
    d3vilevent = await eor(event, "`Executing.....`")
    process = await asyncio.create_subprocess_shell(
        cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await process.communicate()
    # errors="replace" keeps binary / non-UTF-8 output from aborting the handler.
    result = (
        stdout.decode(errors="replace").strip()
        + stderr.decode(errors="replace").strip()
    )
    d3viluser = await event.client.get_me()
    curruser = d3viluser.username or "d3vilbot"
    # Mimic a shell prompt: "#" when running as root, "$" otherwise.
    prompt = "#" if os.geteuid() == 0 else "$"
    cresult = f"`{curruser}:~{prompt}` `{cmd}`\n`{result}`"
    # Update the status message created above, as the eval handler does.
    await eor(
        d3vilevent,
        "**Terminal Command Was Executed Successfully. Check LOGGER for Output.**",
    )
    await event.client.send_message(
        lg_id,
        f"#EXEC \n\nTerminal command was executed sucessfully.\n\n**Command :** `{cmd}`\n**Result :** \n{cresult}",
    )
@bot.on(d3vil_cmd(pattern="eval(?: |$|\n)(.*)", command="eval"))
@bot.on(sudo_cmd(pattern="eval(?: |$|\n)(.*)", command="eval", allow_sudo=True))
async def _(event):
    """Evaluate Python code from the message and post the result to the logger chat.

    The code is run through ``aexec``. While it runs, ``sys.stdout`` and
    ``sys.stderr`` are swapped for in-memory buffers. The reported result is
    the first non-empty value of: the traceback, captured stderr, captured
    stdout, or the literal ``"Success"``.

    SECURITY: this executes arbitrary Python. Access control is assumed to be
    enforced by the ``d3vil_cmd`` / ``sudo_cmd`` decorators -- verify before
    exposing this handler to anyone else.
    """
    if event.fwd_from:
        return
    # Drop the command word itself and keep the rest of the message verbatim.
    cmd = "".join(event.text.split(maxsplit=1)[1:])
    if not cmd:
        return await eod(event, "`What should i run ?..`")
    d3vilevent = await eor(event, "`Running ...`")

    # NOTE: sys.stdout / sys.stderr are process-wide, so anything printed by
    # other tasks while the snippet is awaiting lands in these buffers too.
    old_stdout, old_stderr = sys.stdout, sys.stderr
    redirected_output = sys.stdout = io.StringIO()
    redirected_error = sys.stderr = io.StringIO()
    exc = None
    try:
        await aexec(cmd, event)
    except Exception:
        exc = traceback.format_exc()
    finally:
        # Always restore the real streams, even when the snippet is cancelled
        # or raises a BaseException that the handler above does not catch.
        stdout = redirected_output.getvalue()
        stderr = redirected_error.getvalue()
        sys.stdout = old_stdout
        sys.stderr = old_stderr

    evaluation = exc or stderr or stdout or "Success"
    final_output = f"• Eval : \n`{cmd}` \n\n• Result : \n`{evaluation}` \n"
    await eor(d3vilevent, "**Eval Command Executed. Check out LOGGER for result.**")
    await event.client.send_message(
        lg_id,
        f"#EVAL \n\nEval command was executed sucessfully. \n\n{final_output}",
    )
async def aexec(code, smessatatus):
    """Compile ``code`` as the body of a coroutine, await it, and return its result.

    The snippet is wrapped in ``async def __aexec(...)`` so it may use
    ``await`` and ``return``. These names are available inside the snippet:

    * ``message`` / ``event`` -- the ``smessatatus`` object passed in
    * ``reply``   -- result of ``await event.get_reply_message()``
    * ``client``  -- ``message.client``
    * ``chat``    -- ``message.chat_id``
    * ``p``       -- helper that prints ``yaml_format(value)``

    Exceptions raised while compiling or running the snippet propagate to
    the caller.

    SECURITY: this runs arbitrary Python via ``exec``; only call it with
    input from trusted users.
    """
    message = event = smessatatus

    def p(_x):
        print(yaml_format(_x))

    reply = await event.get_reply_message()

    # Indent every line of the snippet so it forms the wrapper's body.
    body = "".join(f"\n {line}" for line in code.split("\n"))
    # Use an explicit namespace: relying on exec() writing into this
    # function's locals() and reading it back stops working on Python 3.13+,
    # where each locals() call in a function returns an independent snapshot.
    namespace = {}
    exec(
        "async def __aexec(message, event , reply, client, p, chat): " + body,
        globals(),
        namespace,
    )
    return await namespace["__aexec"](
        message, event, reply, message.client, p, message.chat_id
    )
@bot.on(d3vil_cmd(pattern="bash ?(.*)", outgoing=True))
@bot.on(sudo_cmd(pattern="bash ?(.*)", allow_sudo=True))
async def _(event):
    """Run a shell command and report its PID, stderr and stdout.

    If the report fits in a single message it is sent to the logger chat
    (``lg_id``) and the invoking chat gets a short notice. Otherwise the
    report is uploaded to the invoking chat as a text document and the
    triggering message is deleted.

    SECURITY: this executes arbitrary shell input. Access control is assumed
    to be enforced by the ``d3vil_cmd`` / ``sudo_cmd`` decorators -- verify
    before exposing this handler to anyone else.
    """
    if event.fwd_from:
        return
    cmd = event.pattern_match.group(1)
    # Attach any uploaded file to the replied-to message when there is one,
    # otherwise to the command message itself.
    reply_to_id = event.reply_to_msg_id or event.message.id
    process = await asyncio.create_subprocess_shell(
        cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await process.communicate()
    # errors="replace" keeps binary / non-UTF-8 output from aborting the handler.
    e = stderr.decode(errors="replace") or "No Error"
    o = stdout.decode(errors="replace")
    if not o:
        o = "**Tip**: \n`If you want to see the results of your code, I suggest printing them to stdout.`"
    else:
        o = "`\n".join(o.split("\n"))
    output = f"**QUERY:**\n__Command:__\n`{cmd}` \n__PID:__\n`{process.pid}`\n\n**stderr:** \n`{e}`\n**Output:**\n{o}"
    log_text = f"#BASH \n\n{output}"

    # Size limit used by the original check; applied to the text that is
    # actually sent so the "#BASH" prefix cannot push it over the limit.
    max_len = 4095
    if len(log_text) > max_len:
        with io.BytesIO(output.encode()) as out_file:
            out_file.name = "exec.text"
            await bot.send_file(
                event.chat_id,
                out_file,
                force_document=True,
                allow_cache=False,
                caption=cmd,
                reply_to=reply_to_id,
            )
        await event.delete()
        # The command message is gone and the text is too long to send, so
        # there is nothing left to edit or log.
        return

    await eor(event, "**Check out logger for result..**")
    await event.client.send_message(lg_id, log_text)
# Register help entries for this plugin under the name "evaluators".
# Each add_command call is given three strings: the command name, its
# argument placeholder and a description; the chain ends with .add().
CmdHelp("evaluators").add_command(
"eval", "<expr>", "Execute python script"
).add_command(
"exec", "<command>", "Execute a Terminal command on D3vilBot server and shows details"
).add_command(
"bash", "<query>", "Bash your codes on linux and gives the output in current chat"
).add()