-
-
Notifications
You must be signed in to change notification settings - Fork 119
/
up_helper.py
231 lines (215 loc) · 8.53 KB
/
up_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# Copyright (c) 2023 EDM115
import os
import pathlib
import re
import shutil
import subprocess
from asyncio import sleep
from time import time
from pyrogram.errors import FloodWait
from config import Config
from unzipper import LOGGER
from unzipper import unzipperbot
from unzipper.helpers.database import get_upload_mode
from unzipper.helpers.unzip_help import extentions_list
from unzipper.helpers.unzip_help import progress_for_pyrogram
from unzipper.modules.bot_data import Messages
from unzipper.modules.ext_script.custom_thumbnail import thumb_exists
# To get video duration and thumbnail
async def run_shell_cmds(command):
run = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
)
shell_output = run.stdout.read()[:-1].decode("utf-8")
return shell_output
# Get file size
async def get_size(doc_f):
try:
fsize = os.stat(doc_f).st_size
return fsize
except:
return 0
# Send file to a user
async def send_file(unzip_bot, c_id, doc_f, query, full_path, log_msg, split):
try:
ul_mode = await get_upload_mode(c_id)
if split:
fname = doc_f
fext = doc_f.split("/")[-1].split(".")[-1].casefold()
else:
fname = os.path.basename(doc_f)
fext = ((pathlib.Path(os.path.abspath(doc_f)).suffix).casefold().replace(".", ""))
thumbornot = await thumb_exists(c_id)
upmsg = await unzip_bot.send_message(c_id, "`Processing… ⏳`\n\nUploading videos as media without thumbnail set will fail. Sorry, will be fixed later 😥")
if ul_mode == "media" and fext in extentions_list["audio"]:
if thumbornot:
thumb_image = Config.THUMB_LOCATION + "/" + str(c_id) + ".jpg"
await unzip_bot.send_audio(
chat_id=c_id,
audio=doc_f,
caption=Messages.EXT_CAPTION.format(fname),
thumb=thumb_image,
progress=progress_for_pyrogram,
progress_args=(
f"**Trying to upload {fname}… Please wait** \n",
upmsg,
time(),
),
)
else:
await unzip_bot.send_audio(
chat_id=c_id,
audio=doc_f,
caption=Messages.EXT_CAPTION.format(fname),
progress=progress_for_pyrogram,
progress_args=(
f"**Trying to upload {fname}… Please wait** \n",
upmsg,
time(),
),
)
elif ul_mode == "media" and fext in extentions_list["photo"]:
# impossible to use a thumb here :(
await unzip_bot.send_photo(
chat_id=c_id,
photo=doc_f,
caption=Messages.EXT_CAPTION.format(fname),
progress=progress_for_pyrogram,
progress_args=(
f"**Trying to upload {fname}… Please wait** \n",
upmsg,
time(),
),
)
elif ul_mode == "media" and fext in extentions_list["video"]:
vid_duration = await run_shell_cmds(
f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 {doc_f}"
)
LOGGER.warning(vid_duration)
if thumbornot:
thumb_image = Config.THUMB_LOCATION + "/" + str(c_id) + ".jpg"
await unzip_bot.send_video(
chat_id=c_id,
video=doc_f,
caption=Messages.EXT_CAPTION.format(fname),
duration=int(vid_duration) if vid_duration.isnumeric() else 0,
thumb=thumb_image,
progress=progress_for_pyrogram,
progress_args=(
f"**Trying to upload {fname}… Please wait** \n",
upmsg,
time(),
),
)
else:
thmb_pth = (
f"{Config.THUMB_LOCATION}/thumbnail_{os.path.basename(doc_f)}.jpg"
)
if os.path.exists(thmb_pth):
os.remove(thmb_pth)
try:
thumb = await run_shell_cmds(
f"ffmpeg -ss 00:00:01.00 -i {doc_f} -vf 'scale=320:320:force_original_aspect_ratio=decrease' -vframes 1 {thmb_pth}"
)
LOGGER.warning(thumb)
except Exception as e:
LOGGER.warning(e)
thumb = Config.BOT_THUMB
await unzip_bot.send_video(
chat_id=c_id,
video=doc_f,
caption=Messages.EXT_CAPTION.format(fname),
duration=int(vid_duration) if vid_duration.isnumeric() else 0,
thumb=str(thumb),
progress=progress_for_pyrogram,
progress_args=(
f"**Trying to upload {fname}… Please wait** \n",
upmsg,
time(),
),
)
os.remove(thmb_pth)
else:
if thumbornot:
thumb_image = Config.THUMB_LOCATION + "/" + str(c_id) + ".jpg"
await unzip_bot.send_document(
chat_id=c_id,
document=doc_f,
thumb=thumb_image,
caption=Messages.EXT_CAPTION.format(fname),
progress=progress_for_pyrogram,
progress_args=(
f"**Trying to upload {fname}… Please wait** \n",
upmsg,
time(),
),
)
else:
await unzip_bot.send_document(
chat_id=c_id,
document=doc_f,
caption=Messages.EXT_CAPTION.format(fname),
progress=progress_for_pyrogram,
progress_args=(
f"**Trying to upload {fname}… Please wait** \n",
upmsg,
time(),
),
)
await upmsg.delete()
os.remove(doc_f)
except FloodWait as f:
await sleep(f.value)
return await send_file(unzip_bot, c_id, doc_f, query, full_path, log_msg, split)
except FileNotFoundError:
return await query.answer("Sorry ! I can't find that file 💀", show_alert=True)
except BaseException as e:
LOGGER.warning(e)
shutil.rmtree(full_path)
async def send_url_logs(unzip_bot, c_id, doc_f, source):
try:
u_file_size = os.stat(doc_f).st_size
if Config.TG_MAX_SIZE < int(u_file_size):
return await unzip_bot.send_message(
chat_id=c_id, text="URL file is too large to send in telegram 😥"
)
fname = os.path.basename(doc_f)
await unzip_bot.send_document(
chat_id=c_id,
document=doc_f,
caption=Messages.LOG_CAPTION.format(fname, source),
)
except FloodWait as f:
await sleep(f.value)
return send_url_logs(unzip_bot, c_id, doc_f, source)
except FileNotFoundError:
await unzip_bot.send_message(
chat_id=Config.LOGS_CHANNEL,
text="Archive has gone from servers before uploading 😥",
)
except BaseException:
pass
async def merge_splitted_archives(user_id, path):
cmd = f"cd {path} && cat * > MERGED_{user_id}.zip"
await run_shell_cmds(cmd)
# Function to remove basic markdown characters from a string
async def rm_mark_chars(text: str):
return re.sub("[*`_]", "", text)
# Function to answer queries
async def answer_query(
query, message_text: str, answer_only: bool = False, unzip_client=None, buttons=None
):
try:
if answer_only:
await query.answer(await rm_mark_chars(message_text), show_alert=True)
else:
await query.message.edit(message_text, reply_markup=buttons)
except:
if unzip_client:
await unzip_client.send_message(
chat_id=query.message.chat.id, text=message_text, reply_markup=buttons
)
else:
await unzipperbot.send_message(
chat_id=query.message.chat.id, text=message_text, reply_markup=buttons
)