-
Notifications
You must be signed in to change notification settings - Fork 0
/
run_command.py
177 lines (142 loc) · 5.92 KB
/
run_command.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import subprocess
import special_commands
import requests
import json
from config import bot_token, users_directories, main_path, admins_chat_id_list, chatid_users, user_running_bot, reply_to_messages
import re
import os
import pickle
import re
from variables import pidlist
# this function clears ansi escape codes from text
def escape_ansi(line):
# find all escape codes with regex
ansi_escape =re.compile(r'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]')
# clear them all
return ansi_escape.sub('', line)
# process commands
def run_command(command, chat_id, message_id):
# text = updates.json()['result'][0]['message']['text']
message = {}
# change shell directory to users directory
os.chdir(users_directories[chat_id])
# try processing message and sending output
try:
# pick up first word of the message
first_keyword = re.split(r' |\t|\n', command, 1)[0]
# if first word of message is one of the special commands:
if first_keyword in special_commands.special_commands:
# remove first word from the command text
try:
cmd = re.split(r' |\t|\n', command, 1)[1]
# if command exists of only one chunk and there is no arguments
except IndexError:
cmd = ''
# run special function for the command and get output
out = special_commands.special_commands[first_keyword](cmd, chat_id)
# if command is not one of the specials
else:
if user_running_bot == 'root':
username = chatid_users[chat_id]
# run the command using subprocess and get output
proc = subprocess.Popen(command.split(' '), stdout=subprocess.PIPE)
pid = proc.pid
pidlist.append((pid, command, proc, chat_id))
out = proc.communicate()[0]
pidlist.remove((pid, command, proc, chat_id))
else:
# run the command using subprocess and get output
proc = subprocess.Popen(command.split(' '), stdout=subprocess.PIPE)
pid = proc.pid
pidlist.append((pid, command, proc, chat_id))
out = proc.communicate()[0]
pidlist.remove((pid, command, proc, chat_id))
# remove ansi escape codes from command output
# if out is str
if isinstance(out, str):
# just remove ansi escape characters
printable_out_text = escape_ansi(out)
# if it is not str
else:
# first encode it into str, then remove ansi escape characters
printable_out_text = escape_ansi(str(out, encoding='utf-8'))
if len(printable_out_text) > 0:
text = '```\n' + printable_out_text + '\n```'
else:
text = '*done*'
# make a message to send to telegram
message = {
# chat id should be id of the one who had requested
"chat_id":chat_id,
# text is command output in monospace format
"text": text,
# set parse mode to markdown so that text can be in monospace
"parse_mode":"MarkdownV2",
}
if reply_to_messages == True:
message.update({"reply_to_message_id":message_id,})
# if there was a permission error while running the command
except PermissionError:
# send proper Error message
message = {
"chat_id":chat_id,
"text":'permissionError',
}
if reply_to_messages == True:
message.update({"reply_to_message_id":message_id,})
# if there was a file not found error while running the command
except FileNotFoundError:
# send proper error message
message = {
"chat_id":chat_id,
"text":'FileNotFound',
}
if reply_to_messages == True:
message.update({"reply_to_message_id":message_id,})
# if there was any other error while running the command
except subprocess.CalledProcessError as error_text:
# send the error message got from command to telegram
message = {
"chat_id":chat_id,
"text":f'ERROR: {error_text}',
}
if reply_to_messages == True:
message.update({"reply_to_message_id":message_id,})
# if there was any other Error
except:
# do not make a fuss!
pass
# send the message
if len(message.keys()) > 0:
requests.post(f'https://api.telegram.org/bot{bot_token}/sendMessage', data=message)
def check_document(document, chat_id, message_id):
# make message to get more details from telegram
message = {
"file_id":document['file_id']
}
# ask telegram for file path of document
responce = requests.post(f'https://api.telegram.org/bot{bot_token}/getFile', data=message)
# get file path from responce
file_path = responce.json()['result']['file_path']
# get file name
file_name = document['file_name']
# change directory into robot dir
os.chdir(main_path)
# now we have to store (file_name, file_path, chat_id) somewhere.
# we are going to make a dictionary and pickle.dump it
try:
users_file_paths = pickle.load(open('users_file_paths.pickle', 'rb'))
except FileNotFoundError:
users_file_paths = {id : ('', '') for id in admins_chat_id_list}
users_file_paths[chat_id] = (file_name, file_path)
pickle.dump(users_file_paths, open('users_file_paths.pickle', 'wb'))
del users_file_paths
# text message to send status to user
message = {
"chat_id":chat_id,
"text":'OK! I see your file.\nif you want to store it, send /receive command.',
}
if reply_to_messages == True:
message.update({"reply_to_message_id":message_id,})
# tell user that file is seen
requests.post(f'https://api.telegram.org/bot{bot_token}/sendMessage', data=message)