generated from LightDestory/RepositoryTemplate
-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
plex_agent.py
260 lines (240 loc) · 8.9 KB
/
plex_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
import json
import logging
import pprint
from plexapi.server import PlexServer
from pathlib import Path
from threading import Event, Thread
from ..config import shared
class PlexAgent:
__plex_config: dict[str, str] = {}
__server: PlexServer = None
__save_cache: bool = False
__internal_paths: dict[str, tuple[str, str]] = {}
__notify_queue: list[tuple[str, str]] = list()
__supported_ext: list[str] = [
"3g2",
"3gp",
"amv",
"asf",
"ass",
"avi",
"drc",
"f4a",
"f4b",
"f4p",
"f4v",
"flac",
"flv",
"gif",
"gifv",
"idx",
"m2ts",
"m2v",
"m4p",
"m4v",
"m4v",
"mkv",
"mng",
"mov",
"mp2",
"mp3",
"mp4",
"mpe",
"mpeg",
"mpg",
"mpv",
"mxf",
"nsv",
"ogg",
"ogv",
"qt",
"rm",
"rmvb",
"roq",
"smi",
"srt",
"ssa",
"sub",
"svi",
"ts",
"vob",
"vtt",
"wmv",
"yuv",
"webm"
]
def is_cache_loaded(self) -> bool:
"""
Checks if the Plex configuration is set
:return bool: True if the Plex configuration is set, False otherwise
"""
return self.__plex_config != {}
def load_config_cache(self) -> None:
"""
Loads the Plex configuration from the cache
:return:
"""
try:
logging.info(f"Found Plex configuration from cache: {shared.cache_path}")
with open(shared.cache_path, "r") as cache_file:
self.__plex_config = json.load(cache_file)
except OSError as e:
logging.error(f"Could not load Plex configuration from cache: {e}")
exit(-1)
def __save_config_cache(self) -> None:
"""
Saves the Plex configuration to the cache
:return:
"""
try:
logging.info(f"Saving Plex configuration to cache: {shared.cache_path}")
if not shared.cache_path.parent.exists():
shared.cache_path.parent.mkdir(parents=True)
with open(shared.cache_path, "w") as cache_file:
json.dump(self.__plex_config, cache_file)
except OSError as e:
logging.error(f"Could not save Plex configuration to cache: {e}")
exit(-1)
def __eval_config(self) -> None:
"""
Returns the Plex configuration, it is ensured that host and token are available from cli or cache
:return:
"""
if shared.user_input.token and not self.__plex_config:
self.__plex_config['host'] = shared.user_input.host
self.__plex_config['token'] = shared.user_input.token
self.__save_cache = True
elif shared.user_input.token and self.__plex_config:
if self.__plex_config['host'] != shared.user_input.host \
or self.__plex_config['token'] != shared.user_input.token:
logging.warning("Plex host and/or token are different from the cached ones!")
while True:
answer: str = input("Do you want to overwrite the cached configuration? [y/N]: ").lower()
if answer == 'y':
self.__plex_config['host'] = shared.user_input.host
self.__plex_config['token'] = shared.user_input.token
self.__save_cache = True
break
elif answer == 'n':
break
def connect(self) -> None:
"""
Connects to the Plex server
:return:
"""
self.__eval_config()
try:
self.__server = PlexServer(self.__plex_config["host"], self.__plex_config["token"])
logging.info("Connected to Plex server")
logging.info(f"Plex version: {self.__server.version}")
self.__inspect_library()
num_detected_sections: int = len(self.__internal_paths)
if num_detected_sections == 0:
logging.error("No Plex sections detected, please check your configuration")
exit(-1)
logging.info(f"Found {num_detected_sections} Plex sections:\n{pprint.pformat(self.__internal_paths)}")
if self.__save_cache:
self.__save_config_cache()
except Exception as e:
logging.error(f"Unable to connect to Plex server:\n{e}")
exit(-1)
def __inspect_library(self) -> None:
"""
Loads the internal paths from the Plex server
:return:
"""
for section in self.__server.library.sections():
remote_path: str = section.locations[0]
self.__internal_paths[Path(remote_path).name] = (section.title, remote_path)
def is_plex_section(self, folder_name: str) -> bool:
"""
Checks if the given path is a Plex section
:param folder_name: The folder name to check
:return bool: True if the given folder_name is a Plex section, False otherwise
"""
return folder_name in self.__internal_paths.keys()
def __find_section_child_of(self, item: Path) -> Path | None:
"""
Finds the direct child of the Plex section of the given item
:param item: The item to find the section of
:return: The direct child of the Plex section of the given item
"""
while item.parent.name not in self.__internal_paths.keys():
if len(item.parents) == 0:
return None
item = item.parent
return item
def _scan(self, section: str, item: str) -> None:
"""
Scans the given item in the given section
:param section: The section to scan
:param item: The item to scan
:return:
"""
section_title, section_path = self.__internal_paths[section]
plex_section = self.__server.library.section(section_title)
if plex_section.refreshing:
if shared.user_input.daemon:
logging.warning(f"Plex section {section_title} is already refreshing, re-scheduling...")
self.__notify_queue.append((section, item))
else:
logging.warning(f"Plex section {section_title} is already refreshing, skipping...")
return
scan_path: Path = Path(f"{section_path}/{item}")
logging.info(f"Requesting Plex to scan remote path {str(scan_path)}")
if shared.user_input.dry_run:
logging.info(f"Skipping Plex scan due to dry-run")
else:
plex_section.update(str(scan_path))
def manual_scan(self, paths: set[Path]) -> None:
"""
Manually scans the given paths
:param paths: A list of paths to scan
:return:
"""
scan_sections: set[tuple[str, str]] = set()
for given_path in paths:
logging.info(f"Analyzing {given_path}")
if given_path.name in self.__internal_paths.keys():
scan_sections.add((given_path.name, ""))
else:
section_child: Path | None = self.__find_section_child_of(given_path)
if section_child is None:
logging.error(f"Could not find Plex section for {given_path}")
continue
else:
scan_sections.add((section_child.parent.name, section_child.name))
for (section, item) in scan_sections:
self._scan(section, item)
def parse_event(self, event) -> None:
"""
Parses the given event and adds it to the queue
:param event: The event to parse
:return:
"""
event_type: str = event.event_type
event_path: Path = Path(event.src_path) if event_type != 'moved' else Path(event.dest_path)
if event_path.name in self.__internal_paths.keys():
return
section_child: Path | None = self.__find_section_child_of(event_path)
if section_child is None:
logging.error(f"Could not find Plex section for {event_path}")
return
section_scan = (section_child.parent.name, section_child.name)
if section_scan not in self.__notify_queue:
logging.info(f"Adding to queue ({event_type}): {section_child.name}")
self.__notify_queue.append(section_scan)
def start_service(self) -> ():
"""
Start a thread to manage a queue of pending scans
:return callable: A function to stop the thread
"""
stopped = Event()
def loop():
while not stopped.wait(shared.user_input.interval):
if self.__notify_queue:
section, item = self.__notify_queue.pop(0)
self._scan(section, item)
Thread(target=loop).start()
return stopped.set
plex_agent_singleton = PlexAgent()