-
Notifications
You must be signed in to change notification settings - Fork 25
/
configuration_utils.py
355 lines (280 loc) · 13.6 KB
/
configuration_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
"""
Namer Configuration readers/verifier
"""
import json
import os
import io
import random
import re
import tempfile
import shutil
from importlib import resources
from typing import Dict, List, Optional, Callable, Pattern, Any, Tuple
from configupdater import ConfigUpdater
from datetime import timedelta
from pathlib import Path
from loguru import logger
from requests_cache import BACKEND_CLASSES, BaseCache, CachedSession
from namer.configuration import NamerConfig
from namer.database import abbreviations
from namer.name_formatter import PartialFormatter
from namer.ffmpeg import FFMpeg
def __verify_naming_config(config: NamerConfig, formatter: PartialFormatter) -> bool:
"""
Verifies the contents of your config file. Returns False if configuration failed.
"""
success = True
if not config.enable_metadataapi_genres and not config.default_genre:
logger.error("Since enable_metadataapi_genres is not True, you must specify a default_genre")
success = False
success = __verify_name_string(formatter, "inplace_name", config.inplace_name) and success
return success
def __verify_watchdog_config(config: NamerConfig, formatter: PartialFormatter) -> bool:
"""
Verifies the contents of your config file. Returns False if configuration failed.
"""
success = True
if not config.enable_metadataapi_genres and not config.default_genre:
logger.error("Since enable_metadataapi_genres is not True, you must specify a default_genre")
success = False
success = __verify_dir(config, "watch_dir") and success
success = __verify_dir(config, "work_dir") and success
success = __verify_dir(config, "failed_dir") and success
success = __verify_dir(config, "dest_dir") and success
success = __verify_name_string(formatter, "new_relative_path_name", config.new_relative_path_name) and success
return success
def __verify_dir(config: NamerConfig, name: str) -> bool:
"""
verify a config directory exist. return false if verification fails
"""
file_name = getattr(config, name) if hasattr(config, name) else None
if file_name and not file_name.is_dir():
logger.error("Configured directory {}: {} is not a directory or not accessible", name, file_name)
return False
return True
def __verify_name_string(formatter: PartialFormatter, name: str, name_string: str) -> bool:
"""
Verify the name format string.
"""
values = dict(zip(formatter.supported_keys, formatter.supported_keys))
try:
formatter.format(name_string, values)
return True
except KeyError as key_error:
logger.error("Configuration {} is not a valid file name format, please check {}", name, name_string)
logger.error("Error message: {}", key_error)
return False
def __verify_ffmpeg() -> bool:
versions = FFMpeg().ffmpeg_version()
for tool, version in versions.items():
if not version:
logger.error(f'No {tool} found, please install {tool}')
else:
logger.info(f'{tool} version {version} found')
return None not in versions.values()
def verify_configuration(config: NamerConfig, formatter: PartialFormatter) -> bool:
"""
Can verify a NamerConfig with a formatter
"""
success = __verify_naming_config(config, formatter)
success = __verify_watchdog_config(config, formatter) and success
success: bool = __verify_ffmpeg() and success
return success
# Read and write .ini files utils below
def get_str(updater: ConfigUpdater, section: str, key: str) -> Optional[str]:
"""
Read a string from an ini file if the config exists, else return None if the config does not
exist in file.
"""
if updater.has_option(section, key):
output = updater.get(section, key)
return str(output.value)
else:
return None
# Ini file string converters, to and from NamerConfig type
def to_bool(value: Optional[str]) -> Optional[bool]:
return value.lower() == "true" if value else None
def from_bool(value: Optional[bool]) -> str:
return str(value) if value is not None else ""
def to_str_list_lower(value: Optional[str]) -> List[str]:
return [x.strip().lower() for x in value.lower().split(',')] if value else []
def from_str_list_lower(value: Optional[List[str]]) -> str:
return ", ".join(value) if value else ""
def to_int(value: Optional[str]) -> Optional[int]:
return int(value) if value is not None else None
def from_int(value: Optional[int]) -> str:
return str(value) if value is not None else ""
def to_path(value: Optional[str]) -> Optional[Path]:
return Path(value).resolve() if value else None
def from_path(value: Optional[Path]) -> str:
return str(value) if value else ""
def to_regex_list(value: Optional[str]) -> List[Pattern]:
return [re.compile(x.strip()) for x in value.split(',')] if value else []
def from_regex_list(value: Optional[List[Pattern]]) -> str:
return ", ".join([x.pattern for x in value]) if value else ""
def to_site_abreviation(site_abbreviations: Optional[str]) -> Dict[Pattern, str]:
abbreviations_db = abbreviations.copy()
if site_abbreviations:
data = json.loads(site_abbreviations)
abbreviations_db.update(data)
new_abbreviation: Dict[Pattern, str] = {}
for abbreviation, full in abbreviations_db.items():
key = re.compile(fr'^{abbreviation}[ .-]+', re.IGNORECASE)
new_abbreviation[key] = f'{full} '
return new_abbreviation
def from_site_abbreviation(site_abbreviations: Optional[Dict[Pattern, str]]) -> str:
out: Dict[str, str] = {x.pattern[1:-6]: y[0:-1] for (x, y) in site_abbreviations.items()} if site_abbreviations else {}
wrapper = io.StringIO("")
json.dump(out, wrapper)
return wrapper.getvalue()
def to_pattern(value: Optional[str]) -> Optional[Pattern]:
return re.compile(value, re.IGNORECASE) if value else None
def from_pattern(value: Optional[Pattern]) -> str:
return value.pattern if value else ""
def to_site_list(value: Optional[str]) -> List[str]:
return [re.sub(r"[^a-z0-9]", "", x.strip().lower()) for x in value.split(",")] if value else []
def set_str(updater: ConfigUpdater, section: str, key: str, value: str) -> None:
updater[section][key].value = value
def set_comma_list(updater: ConfigUpdater, section: str, key: str, value: List[str]) -> None:
updater[section][key].value = ", ".join(value)
def set_int(updater: ConfigUpdater, section: str, key: str, value: int) -> None:
updater[section][key].value = str(value)
def set_boolean(updater: ConfigUpdater, section: str, key: str, value: bool) -> None:
updater[section][key] = str(value)
field_info: Dict[str, Tuple[str, Optional[Callable[[Optional[str]], Any]], Optional[Callable[[Any], str]]]] = {
"porndb_token": ("namer", None, None),
"name_parser": ("namer", None, None),
"inplace_name": ("namer", None, None),
"prefer_dir_name_if_available": ("namer", to_bool, from_bool),
"min_file_size": ("namer", to_int, from_int),
"write_namer_log": ("namer", to_bool, from_bool),
"write_namer_failed_log": ("namer", to_bool, from_bool),
"target_extensions": ("namer", to_str_list_lower, from_str_list_lower),
"update_permissions_ownership": ("namer", to_bool, from_bool),
"set_dir_permissions": ("namer", to_int, from_int),
"set_file_permissions": ("namer", to_int, from_int),
"set_uid": ("namer", to_int, from_int),
"set_gid": ("namer", to_int, from_int),
"trailer_location": ("namer", None, None),
"sites_with_no_date_info": ("namer", to_str_list_lower, from_str_list_lower),
"movie_data_preferred": ("namer", to_str_list_lower, from_str_list_lower),
"vr_studios": ("namer", to_str_list_lower, from_str_list_lower),
"vr_tags": ("namer", to_str_list_lower, from_str_list_lower),
"site_abbreviations": ("namer", to_site_abreviation, from_site_abbreviation),
"max_performer_names": ("namer", to_int, from_int),
"use_requests_cache": ("namer", to_bool, from_bool),
"requests_cache_expire_minutes": ("namer", to_int, from_int),
"override_tpdb_address": ("namer", None, None),
"plex_hack": ("namer", to_bool, from_bool),
"search_phash": ("Phash", to_bool, from_bool),
# "require_match_phash_top": ("Phash", to_int, from_int),
# "send_phash_of_matches_to_tpdb": ("Phash", to_bool, from_bool),
"write_nfo": ("metadata", to_bool, from_bool),
"enabled_tagging": ("metadata", to_bool, from_bool),
"enabled_poster": ("metadata", to_bool, from_bool),
"enable_metadataapi_genres": ("metadata", to_bool, from_bool),
"default_genre": ("metadata", None, None),
"language": ("metadata", None, None),
"preserve_duplicates": ("duplicates", to_bool, from_bool),
"max_desired_resolutions": ("duplicates", to_int, from_int),
"desired_codec": ("duplicates", to_str_list_lower, from_str_list_lower),
"ignored_dir_regex": ("watchdog", to_pattern, from_pattern),
"del_other_files": ("watchdog", to_bool, from_bool),
"extra_sleep_time": ("watchdog", to_int, from_int),
"new_relative_path_name": ("watchdog", None, None),
"watch_dir": ("watchdog", to_path, from_path),
"work_dir": ("watchdog", to_path, from_path),
"failed_dir": ("watchdog", to_path, from_path),
"dest_dir": ("watchdog", to_path, from_path),
"retry_time": ("watchdog", None, None),
"web": ("watchdog", to_bool, from_bool),
"port": ("watchdog", to_int, from_int),
"host": ("watchdog", None, None),
"web_root": ("watchdog", None, None),
"allow_delete_files": ("watchdog", to_bool, from_bool),
"add_max_percent_column": ("watchdog", to_bool, from_bool),
"debug": ("watchdog", to_bool, from_bool),
"diagnose_errors": ("watchdog", to_bool, from_bool),
}
"""
A mapping from NamerConfig field to ini file section - the ini property name and the field name
must be identical. The conversion of string too and from functions are also provided here allowing
the conversion of types from NamerConfig to and from strings. If the converters are not set then the
the string is unaltered.
"""
def to_ini(config: NamerConfig) -> str:
updater = config.config_updater
for name in field_info.keys():
info = field_info.get(name)
if info:
section = info[0]
if section:
value = getattr(config, name)
convert: Optional[Callable[[Any], str]] = info[2]
if convert:
updater.get(section, name).value = convert(value)
else:
updater.get(section, name).value = value
return str(updater)
def from_config(config: ConfigUpdater, namer_config: NamerConfig) -> NamerConfig:
"""
Given a config parser pointed at a namer.cfg file, return a NamerConfig with the file's parameters.
"""
keys = field_info.keys()
for name in keys:
info = field_info.get(name)
if info and info[0]:
new_value = get_str(config, info[0], name)
if new_value or not hasattr(namer_config, name):
type_converter_lambda: Optional[Callable[[Optional[str]], Any]] = info[1]
if type_converter_lambda:
setattr(namer_config, name, type_converter_lambda(new_value))
else:
setattr(namer_config, name, new_value)
if not hasattr(namer_config, "retry_time") or namer_config.retry_time is None:
setattr(namer_config, "retry_time", f"03:{random.randint(0, 59):0>2}")
# create a CachedSession objects for request caching.
if namer_config.enabled_requests_cache:
cache_file = Path(tempfile.gettempdir()) / "namer_cache"
sqlite_supported = issubclass(BACKEND_CLASSES['sqlite'], BaseCache)
backend = 'sqlite' if sqlite_supported else 'filesystem'
expire_time = timedelta(minutes=namer_config.requests_cache_expire_minutes)
namer_config.cache_session = CachedSession(str(cache_file), backend=backend, expire_after=expire_time, ignored_parameters=["Authorization"])
return namer_config
def resource_file_to_str(package: str, file_name: str) -> str:
config_str = ""
if hasattr(resources, 'files'):
config_str = resources.files(package).joinpath(file_name).read_text()
elif hasattr(resources, 'read_text'):
config_str = resources.read_text(package, file_name)
return config_str
def copy_resource_to_file(package: str, file_name: str, output: Path) -> bool:
if hasattr(resources, 'files'):
with resources.files(package).joinpath(file_name).open("rb") as bin, open(output, mode="+bw") as out:
shutil.copyfileobj(bin, out)
return True
elif hasattr(resources, 'read_text'):
with resources.open_binary(package, file_name) as bin, open(output, mode="+bw") as out:
shutil.copyfileobj(bin, out)
return True
return False
def default_config(user_set: Optional[Path] = None) -> NamerConfig:
"""
Attempts reading various locations to fine a namer.cfg file.
"""
config = ConfigUpdater()
config_str = resource_file_to_str("namer", "namer.cfg.default")
config.read_string(config_str)
namer_config = from_config(config, NamerConfig())
namer_config.config_updater = config
user_config = ConfigUpdater()
config_loc = os.environ.get("NAMER_CONFIG")
if user_set and Path(user_set).is_file():
user_config.read(user_set)
elif config_loc and Path(config_loc).exists():
user_config.read(config_loc)
elif (Path.home() / ".namer.cfg").exists():
user_config.read(str(Path.home() / ".namer.cfg"))
elif Path("./namer.cfg").exists():
user_config.read(".namer.cfg")
return from_config(user_config, namer_config)