-
-
Notifications
You must be signed in to change notification settings - Fork 42
/
qbittorrent.py
executable file
·518 lines (493 loc) · 25.5 KB
/
qbittorrent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
"""Qbittorrent Module"""
import os
import sys
from functools import cache
from qbittorrentapi import Client
from qbittorrentapi import LoginFailed
from qbittorrentapi import NotFound404Error
from qbittorrentapi import TrackerStatus
from qbittorrentapi import Version
from modules import util
from modules.util import Failed
from modules.util import TorrentMessages
from modules.util import list_in_text
logger = util.logger
class Qbt:
"""
Qbittorrent Class
"""
SUPPORTED_VERSION = Version.latest_supported_app_version()
MIN_SUPPORTED_VERSION = "v4.3.0"
TORRENT_DICT_COMMANDS = ["recheck", "cross_seed", "rem_unregistered", "tag_tracker_error", "tag_nohardlinks", "share_limits"]
def __init__(self, config, params):
self.config = config
self.host = params["host"]
self.username = params["username"]
self.password = params["password"]
logger.secret(self.username)
logger.secret(self.password)
logger.debug(f"Host: {self.host}")
ex = ""
try:
self.client = Client(
host=self.host,
username=self.username,
password=self.password,
VERIFY_WEBUI_CERTIFICATE=False,
REQUESTS_ARGS={"timeout": (45, 60)},
)
self.client.auth_log_in()
self.current_version = self.client.app.version
logger.debug(f"qBittorrent: {self.current_version}")
logger.debug(f"qBittorrent Web API: {self.client.app.web_api_version}")
logger.debug(f"qbit_manage supported versions: {self.MIN_SUPPORTED_VERSION} - {self.SUPPORTED_VERSION}")
if self.current_version < self.MIN_SUPPORTED_VERSION:
ex = (
f"Qbittorrent Error: qbit_manage is only compatible with {self.MIN_SUPPORTED_VERSION} or higher. "
f"You are currently on {self.current_version}."
+ "\n"
+ f"Please upgrade your qBittorrent version to {self.MIN_SUPPORTED_VERSION} or higher to use qbit_manage."
)
elif not Version.is_app_version_supported(self.current_version):
ex = (
f"Qbittorrent Error: qbit_manage is only compatible with {self.SUPPORTED_VERSION} or lower. "
f"You are currently on {self.current_version}."
+ "\n"
+ f"Please downgrade your qBittorrent version to {self.SUPPORTED_VERSION} to use qbit_manage."
)
if ex:
if self.config.commands["skip_qb_version_check"]:
ex += "\n[BYPASS]: Continuing because qBittorrent version check is bypassed... Please do not ask for support!"
logger.print_line(ex, "WARN")
else:
self.config.notify(ex, "Qbittorrent")
logger.print_line(ex, "CRITICAL")
sys.exit(1)
logger.info("Qbt Connection Successful")
except LoginFailed:
ex = "Qbittorrent Error: Failed to login. Invalid username/password."
self.config.notify(ex, "Qbittorrent")
raise Failed(ex)
except Exception as exc:
self.config.notify(exc, "Qbittorrent")
raise Failed(exc)
logger.separator("Getting Torrent List", space=False, border=False)
self.torrent_list = self.get_torrents({"sort": "added_on"})
self.torrentfiles = {} # a map of torrent files to track cross-seeds
self.global_max_ratio_enabled = self.client.app.preferences.max_ratio_enabled
self.global_max_ratio = self.client.app.preferences.max_ratio
self.global_max_seeding_time_enabled = self.client.app.preferences.max_seeding_time_enabled
self.global_max_seeding_time = self.client.app.preferences.max_seeding_time
if any(config.commands.get(command, False) for command in self.TORRENT_DICT_COMMANDS):
# Get an updated torrent dictionary information of the torrents
self.get_torrent_info()
else:
self.torrentinfo = None
self.torrentissue = None
self.torrentvalid = None
self.get_tags = cache(self.get_tags)
self.get_category = cache(self.get_category)
self.get_category_save_paths = cache(self.get_category_save_paths)
def get_torrent_info(self):
"""
Will create a 2D Dictionary with the torrent name as the key
self.torrentinfo = {'TorrentName1' : {'Category':'TV', 'save_path':'/data/torrents/TV', 'msg':'[]'...},
'TorrentName2' : {'Category':'Movies', 'save_path':'/data/torrents/Movies'}, 'msg':'[]'...}
List of dictionary key definitions
Category = Returns category of the torrent (str)
save_path = Returns the save path of the torrent (str)
msg = Returns a list of torrent messages by name (list of str)
status = Returns the list of status numbers of the torrent by name
(0: Tracker is disabled (used for DHT, PeX, and LSD),
1: Tracker has not been contacted yet,
2: Tracker has been contacted and is working,
3: Tracker is updating,
4: Tracker has been contacted, but it is not working (or doesn't send proper replies)
is_complete = Returns the state of torrent
(Returns True if at least one of the torrent with the State is categorized as Complete.)
"""
self.torrentinfo = {}
self.torrentissue = [] # list of unregistered torrent objects
self.torrentvalid = [] # list of working torrents
t_obj_list = [] # list of all torrent objects
settings = self.config.settings
logger.separator("Checking Settings", space=False, border=False)
if settings["force_auto_tmm"]:
logger.print_line(
"force_auto_tmm set to True. Will force Auto Torrent Management for all torrents.", self.config.loglevel
)
logger.separator("Gathering Torrent Information", space=True, border=True)
for torrent in self.torrent_list:
is_complete = False
msg = None
status = None
working_tracker = None
issue = {"potential": False}
if torrent.auto_tmm is False and settings["force_auto_tmm"] and torrent.category != "" and not self.config.dry_run:
torrent.set_auto_management(True)
try:
torrent_name = torrent.name
torrent_hash = torrent.hash
torrent_is_complete = torrent.state_enum.is_complete
save_path = torrent.save_path
category = torrent.category
torrent_trackers = torrent.trackers
self.add_torrent_files(torrent_hash, torrent.files, save_path)
except Exception as ex:
self.config.notify(ex, "Get Torrent Info", False)
logger.warning(ex)
if torrent_name in self.torrentinfo:
t_obj_list.append(torrent)
msg_list = self.torrentinfo[torrent_name]["msg"]
status_list = self.torrentinfo[torrent_name]["status"]
is_complete = True if self.torrentinfo[torrent_name]["is_complete"] is True else torrent_is_complete
else:
t_obj_list = [torrent]
msg_list = []
status_list = []
is_complete = torrent_is_complete
for trk in torrent_trackers:
if trk.url.split(":")[0] in ["http", "https", "udp", "ws", "wss"]:
status = trk.status
msg = trk.msg.upper()
if TrackerStatus(trk.status) == TrackerStatus.WORKING:
working_tracker = True
break
# Add any potential unregistered torrents to a list
if TrackerStatus(trk.status) == TrackerStatus.NOT_WORKING and not list_in_text(
msg, TorrentMessages.EXCEPTIONS_MSGS
):
issue["potential"] = True
issue["msg"] = msg
issue["status"] = status
if working_tracker:
status = 2
msg = ""
self.torrentvalid.append(torrent)
elif issue["potential"]:
status = issue["status"]
msg = issue["msg"]
self.torrentissue.append(torrent)
if msg is not None:
msg_list.append(msg)
if status is not None:
status_list.append(status)
torrentattr = {
"torrents": t_obj_list,
"Category": category,
"save_path": save_path,
"msg": msg_list,
"status": status_list,
"is_complete": is_complete,
}
self.torrentinfo[torrent_name] = torrentattr
def add_torrent_files(self, torrent_hash, torrent_files, save_path):
"""Process torrent files by adding the hash to the appropriate torrent_files list.
Example structure:
torrent_files = {
"folder1/file1.txt": {"original": torrent_hash1, "cross_seed": ["torrent_hash2", "torrent_hash3"]},
"folder1/file2.txt": {"original": torrent_hash1, "cross_seed": ["torrent_hash2"]},
"folder2/file1.txt": {"original": torrent_hash2, "cross_seed": []},
}
"""
for file in torrent_files:
full_path = os.path.join(save_path, file.name)
if full_path not in self.torrentfiles:
self.torrentfiles[full_path] = {"original": torrent_hash, "cross_seed": []}
else:
self.torrentfiles[full_path]["cross_seed"].append(torrent_hash)
def is_cross_seed(self, torrent):
"""Check if the torrent is a cross seed if it has one or more files that are cross seeded."""
t_hash = torrent.hash
t_name = torrent.name
if torrent.downloaded != 0:
logger.trace(f"Torrent: {t_name} [Hash: {t_hash}] is not a cross seeded torrent. Download is > 0.")
return False
cross_seed = True
for file in torrent.files:
full_path = os.path.join(torrent.save_path, file.name)
if self.torrentfiles[full_path]["original"] == t_hash or t_hash not in self.torrentfiles[full_path]["cross_seed"]:
logger.trace(f"File: [{full_path}] is found in Torrent: {t_name} [Hash: {t_hash}] as the original torrent")
cross_seed = False
break
elif self.torrentfiles[full_path]["original"] is None:
cross_seed = False
break
logger.trace(f"Torrent: {t_name} [Hash: {t_hash}] {'is' if cross_seed else 'is not'} a cross seed torrent.")
return cross_seed
def has_cross_seed(self, torrent):
"""Check if the torrent has a cross seed"""
cross_seed = False
t_hash = torrent.hash
t_name = torrent.name
for file in torrent.files:
full_path = os.path.join(torrent.save_path, file.name)
if len(self.torrentfiles[full_path]["cross_seed"]) > 0:
logger.trace(f"{full_path} has cross seeds: {self.torrentfiles[full_path]['cross_seed']}")
cross_seed = True
break
logger.trace(f"Torrent: {t_name} [Hash: {t_hash}] {'has' if cross_seed else 'has no'} cross seeds.")
return cross_seed
def remove_torrent_files(self, torrent):
"""Update the torrent_files list after a torrent is deleted"""
torrent_hash = torrent.hash
for file in torrent.files:
full_path = os.path.join(torrent.save_path, file.name)
if self.torrentfiles[full_path]["original"] == torrent_hash:
if len(self.torrentfiles[full_path]["cross_seed"]) > 0:
self.torrentfiles[full_path]["original"] = self.torrentfiles[full_path]["cross_seed"].pop(0)
logger.trace(f"Updated {full_path} original to {self.torrentfiles[full_path]['original']}")
else:
self.torrentfiles[full_path]["original"] = None
else:
if torrent_hash in self.torrentfiles[full_path]["cross_seed"]:
self.torrentfiles[full_path]["cross_seed"].remove(torrent_hash)
logger.trace(f"Removed {torrent_hash} from {full_path} cross seeds")
logger.trace(f"{full_path} original: {self.torrentfiles[full_path]['original']}")
logger.trace(f"{full_path} cross seeds: {self.torrentfiles[full_path]['cross_seed']}")
def get_torrents(self, params):
"""Get torrents from qBittorrent"""
return self.client.torrents.info(**params)
def get_tracker_urls(self, trackers):
"""Get tracker urls from torrent"""
return tuple(x.url for x in trackers if x.url.startswith(("http", "udp", "ws")))
def get_tags(self, urls):
"""Get tags from config file based on keyword"""
urls = list(urls)
tracker = {}
tracker["tag"] = None
tracker["cat"] = None
tracker["notifiarr"] = None
tracker["url"] = None
tracker_other_tag = self.config.util.check_for_attribute(
self.config.data, "tag", parent="tracker", subparent="other", default_is_none=True, var_type="list", save=False
)
try:
tracker["url"] = util.trunc_val(urls[0], os.sep)
except IndexError as e:
tracker["url"] = None
if not urls:
urls = []
if not tracker_other_tag:
tracker_other_tag = ["other"]
tracker["url"] = "No http URL found"
else:
logger.debug(f"Tracker Url:{urls}")
logger.debug(e)
if "tracker" in self.config.data and self.config.data["tracker"] is not None:
tag_values = self.config.data["tracker"]
for tag_url, tag_details in tag_values.items():
for url in urls:
if tag_url in url:
if tracker["url"] is None:
default_tag = tracker_other_tag
else:
try:
tracker["url"] = util.trunc_val(url, os.sep)
default_tag = tracker["url"].split(os.sep)[2].split(":")[0]
except IndexError as e:
logger.debug(f"Tracker Url:{url}")
logger.debug(e)
tracker["tag"] = self.config.util.check_for_attribute(
self.config.data, "tag", parent="tracker", subparent=tag_url, default=tag_url, var_type="list"
)
tracker["cat"] = self.config.util.check_for_attribute(
self.config.data,
"cat",
parent="tracker",
subparent=tag_url,
default_is_none=True,
var_type="str",
save=False,
do_print=False,
)
if tracker["tag"] == [tag_url]:
self.config.data["tracker"][tag_url]["tag"] = [tag_url]
if isinstance(tracker["tag"], str):
tracker["tag"] = [tracker["tag"]]
tracker["notifiarr"] = self.config.util.check_for_attribute(
self.config.data,
"notifiarr",
parent="tracker",
subparent=tag_url,
default_is_none=True,
do_print=False,
save=False,
)
return tracker
if tracker_other_tag:
tracker["tag"] = tracker_other_tag
tracker["notifiarr"] = self.config.util.check_for_attribute(
self.config.data,
"notifiarr",
parent="tracker",
subparent="other",
default_is_none=True,
do_print=False,
save=False,
)
return tracker
if tracker["url"]:
logger.trace(f"tracker url: {tracker['url']}")
if tracker_other_tag:
default_tag = tracker_other_tag
else:
default_tag = tracker["url"].split(os.sep)[2].split(":")[0]
tracker["tag"] = self.config.util.check_for_attribute(
self.config.data, "tag", parent="tracker", subparent=default_tag, default=default_tag, var_type="list"
)
if isinstance(tracker["tag"], str):
tracker["tag"] = [tracker["tag"]]
try:
self.config.data["tracker"][default_tag]["tag"] = [default_tag]
except Exception:
self.config.data["tracker"][default_tag] = {"tag": [default_tag]}
e = f'No tags matched for {tracker["url"]}. Please check your config.yml file. Setting tag to {default_tag}'
self.config.notify(e, "Tag", False)
logger.warning(e)
return tracker
def get_category(self, path):
"""Get category from config file based on path provided"""
category = []
path = os.path.join(path, "")
if "cat" in self.config.data and self.config.data["cat"] is not None:
cat_path = self.config.data["cat"]
for cat, save_path in cat_path.items():
if os.path.join(save_path, "") == path:
category.append(cat)
if not category:
default_cat = path.split(os.sep)[-2]
category = [default_cat]
self.config.util.check_for_attribute(self.config.data, default_cat, parent="cat", default=path)
self.config.data["cat"][str(default_cat)] = path
e = f"No categories matched for the save path {path}. Check your config.yml file. - Setting category to {default_cat}"
self.config.notify(e, "Category", False)
logger.warning(e)
return category
def get_category_save_paths(self):
"""Get all categories from qbitorrenta and return a list of save_paths"""
save_paths = set()
categories = self.client.torrent_categories.categories
for cat in categories:
save_path = categories[cat].savePath.replace(self.config.root_dir, self.config.remote_dir)
if save_path:
save_paths.add(save_path)
return list(save_paths)
def tor_delete_recycle(self, torrent, info):
"""Move torrent to recycle bin"""
try:
self.remove_torrent_files(torrent)
except ValueError:
logger.debug(f"Torrent {torrent.name} has already been removed from torrent files.")
tor_files = []
try:
info_hash = torrent.hash
save_path = torrent.save_path.replace(self.config.root_dir, self.config.remote_dir)
# Define torrent files/folders
for file in torrent.files:
tor_files.append(os.path.join(save_path, file.name))
except NotFound404Error:
return
if self.config.recyclebin["enabled"]:
if self.config.recyclebin["split_by_category"]:
recycle_path = os.path.join(save_path, os.path.basename(self.config.recycle_dir.rstrip(os.sep)))
else:
recycle_path = self.config.recycle_dir
# Create recycle bin if not exists
torrent_path = os.path.join(recycle_path, "torrents")
torrents_json_path = os.path.join(recycle_path, "torrents_json")
torrent_name = info["torrents"][0]
os.makedirs(recycle_path, exist_ok=True)
if self.config.recyclebin["save_torrents"]:
if os.path.isdir(torrent_path) is False:
os.makedirs(torrent_path)
if os.path.isdir(torrents_json_path) is False:
os.makedirs(torrents_json_path)
torrent_json_file = os.path.join(torrents_json_path, f"{torrent_name}.json")
torrent_json = util.load_json(torrent_json_file)
if not torrent_json:
logger.info(f"Saving Torrent JSON file to {torrent_json_file}")
torrent_json["torrent_name"] = torrent_name
torrent_json["category"] = info["torrent_category"]
else:
logger.info(f"Adding {info['torrent_tracker']} to existing {os.path.basename(torrent_json_file)}")
dot_torrent_files = []
for file in os.listdir(self.config.torrents_dir):
if file.startswith(info_hash):
dot_torrent_files.append(file)
try:
util.copy_files(os.path.join(self.config.torrents_dir, file), os.path.join(torrent_path, file))
except Exception as ex:
logger.stacktrace()
self.config.notify(ex, "Deleting Torrent", False)
logger.warning(f"RecycleBin Warning: {ex}")
if "tracker_torrent_files" in torrent_json:
tracker_torrent_files = torrent_json["tracker_torrent_files"]
else:
tracker_torrent_files = {}
tracker_torrent_files[info["torrent_tracker"]] = dot_torrent_files
if dot_torrent_files:
backup_str = "Backing up "
for idx, val in enumerate(dot_torrent_files):
if idx == 0:
backup_str += val
else:
backup_str += f" and {val.replace(info_hash, '')}"
backup_str += f" to {torrent_path}"
logger.info(backup_str)
torrent_json["tracker_torrent_files"] = tracker_torrent_files
if "files" not in torrent_json:
files_cleaned = [f.replace(self.config.remote_dir, "") for f in tor_files]
torrent_json["files"] = files_cleaned
if "deleted_contents" not in torrent_json:
torrent_json["deleted_contents"] = info["torrents_deleted_and_contents"]
else:
if torrent_json["deleted_contents"] is False and info["torrents_deleted_and_contents"] is True:
torrent_json["deleted_contents"] = info["torrents_deleted_and_contents"]
logger.debug("")
logger.debug(f"JSON: {torrent_json}")
util.save_json(torrent_json, torrent_json_file)
if info["torrents_deleted_and_contents"] is True:
logger.separator(f"Moving {len(tor_files)} files to RecycleBin", space=False, border=False, loglevel="DEBUG")
if len(tor_files) == 1:
logger.print_line(tor_files[0], "DEBUG")
else:
logger.print_line("\n".join(tor_files), "DEBUG")
logger.debug(
f"Moved {len(tor_files)} files to {recycle_path.replace(self.config.remote_dir, self.config.root_dir)}"
)
# Move files from torrent contents to Recycle bin
for file in tor_files:
src = file
dest = os.path.join(recycle_path, file.replace(self.config.remote_dir, ""))
# Move files and change date modified
try:
to_delete = util.move_files(src, dest, True)
except FileNotFoundError:
ex = logger.print_line(f"RecycleBin Warning - FileNotFound: No such file or directory: {src} ", "WARNING")
self.config.notify(ex, "Deleting Torrent", False)
# Add src file to orphan exclusion since sometimes deleting files are slow in certain environments
exclude_file = src.replace(self.config.remote_dir, self.config.root_dir)
if exclude_file not in self.config.orphaned["exclude_patterns"]:
self.config.orphaned["exclude_patterns"].append(exclude_file)
# Delete torrent and files
torrent.delete(delete_files=to_delete)
# Remove any empty directories
util.remove_empty_directories(save_path, self.get_category_save_paths())
else:
torrent.delete(delete_files=False)
else:
if info["torrents_deleted_and_contents"] is True:
for file in tor_files:
# Add src file to orphan exclusion since sometimes deleting files are slow in certain environments
exclude_file = file.replace(self.config.remote_dir, self.config.root_dir)
if exclude_file not in self.config.orphaned["exclude_patterns"]:
self.config.orphaned["exclude_patterns"].append(exclude_file)
torrent.delete(delete_files=True)
else:
torrent.delete(delete_files=False)
try:
if torrent in self.torrent_list:
self.torrent_list.remove(torrent)
except ValueError:
logger.debug(f"Torrent {torrent.name} has already been deleted from torrent list.")