-
-
Notifications
You must be signed in to change notification settings - Fork 604
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: root dir index errors in mass add table and api, as well as refa… #8726
Changes from all commits
86bb764
7cefd2b
0d344a1
37eb0c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
from pathlib import Path | ||
from typing import Union | ||
|
||
from sickchill import logger, settings | ||
|
||
|
||
class RootDirectories: | ||
def __init__(self, root_directories_string: str = "", root_directories_list: Union[list, None] = None) -> None: | ||
self.default_root_index: Union[int, None] = None | ||
self.root_directories: list = [] | ||
|
||
self.parse(root_directories_string or settings.ROOT_DIRS, root_directories_list) | ||
logger.debug(f"parsed roots: {[str(directory) for directory in self]} from {settings.ROOT_DIRS}") | ||
if self.default: | ||
logger.debug(f"default root directory: {self[self.default]}") | ||
else: | ||
logger.debug(f"no default root directory set") | ||
|
||
def parse(self, root_directories_string: str = "", root_directories_list: Union[list, None] = None) -> None: | ||
split_setting = root_directories_list or root_directories_string.split("|") | ||
|
||
if len(split_setting) < 2: | ||
return | ||
|
||
self.default_root_index = int(split_setting[0]) | ||
for root_directory in split_setting[1:]: | ||
self.add(root_directory) | ||
|
||
def add(self, root_directory: Union[str, Path]) -> bool: | ||
root_directory = Path(root_directory).resolve() | ||
if not root_directory.is_dir(): | ||
logger.debug(f"tried to add a non-existent root directory: {root_directory}") | ||
return False | ||
|
||
if root_directory in self.root_directories: | ||
logger.debug(f"tried to add a root directory that is already added: {root_directory}") | ||
return False | ||
|
||
logger.debug(f"adding root directory {root_directory}") | ||
self.root_directories.append(root_directory) | ||
|
||
self.__check_default_index() | ||
|
||
return True | ||
|
||
def delete(self, root_directory: Union[str, Path]) -> bool: | ||
root_directory = Path(root_directory) | ||
if root_directory in self.root_directories: | ||
self.root_directories.remove(root_directory) | ||
self.__check_default_index() | ||
return True | ||
return False | ||
|
||
@property | ||
def default(self) -> int: | ||
return self.default_root_index | ||
|
||
@default.setter | ||
def default(self, value: Union[int, str]) -> None: | ||
value = int(value) | ||
self.default_root_index = value | ||
self.__check_default_index() | ||
|
||
def info(self) -> list: | ||
output = [] | ||
for root_dir in self: | ||
output.append({"valid": root_dir.is_dir(), "location": str(root_dir), "default": int(root_dir is self[self.default])}) | ||
|
||
return output or {} | ||
|
||
def __check_default_index(self): | ||
if not self.root_directories: | ||
self.default_root_index = None | ||
return | ||
|
||
if not self.default_root_index or self.default_root_index > len(self.root_directories): | ||
self.default_root_index = 1 | ||
|
||
def __str__(self) -> str: | ||
return "|".join(f"{item}" for item in [self.default_root_index] + self.root_directories) | ||
|
||
def __getitem__(self, item: int) -> Path: | ||
return self.root_directories[item - 1] | ||
|
||
def __setitem__(self, index: int, value: Path): | ||
self.root_directories[index - 1] = value | ||
self.__check_default_index() | ||
|
||
def __delitem__(self, key): | ||
self.root_directories.pop(key - 1) | ||
self.__check_default_index() | ||
|
||
def __contains__(self, item): | ||
return item in self.root_directories | ||
|
||
def __iter__(self): | ||
for item in self.root_directories: | ||
yield item | ||
|
||
def __len__(self): | ||
return len(self.root_directories) | ||
Comment on lines
+7
to
+101
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Overall, the class provides a solid foundation for managing root directories, but addressing these points could enhance its clarity, robustness, and maintainability. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
import os.path | ||
import platform | ||
import re | ||
from pathlib import Path | ||
from urllib import parse | ||
|
||
import rarfile | ||
|
@@ -300,7 +301,9 @@ def change_unpack_dir(unpack_dir): | |
return True | ||
|
||
if os.path.normpath(settings.UNPACK_DIR) != os.path.normpath(unpack_dir): | ||
if bool(settings.ROOT_DIRS) and any(helpers.is_subdirectory(unpack_dir, rd) for rd in settings.ROOT_DIRS.split("|")[1:]): | ||
if settings.ROOT_DIRS and any( | ||
Path(root_directory).resolve() in Path(unpack_dir).resolve().parents for root_directory in settings.ROOT_DIRS.split("|")[1:] | ||
): | ||
Comment on lines
+304
to
+306
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The use of Consider caching the resolved paths of |
||
# don't change if it's in any of the TV root directories | ||
logger.info("Unable to change unpack directory to a sub-directory of a TV root dir") | ||
return False | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -950,31 +950,6 @@ def has_hidden_attribute(filepath): | |
return False | ||
|
||
|
||
def real_path(path): | ||
""" | ||
Returns: | ||
the canonicalized absolute pathname. The resulting path will have no symbolic link, '/./' or '/../' components. | ||
""" | ||
return os.path.normpath(os.path.normcase(os.path.realpath(path))) | ||
|
||
|
||
def is_subdirectory(subdir_path, topdir_path): | ||
""" | ||
Returns true if a subdir_path is a subdirectory of topdir_path | ||
else otherwise. | ||
|
||
Parameters: | ||
subdir_path: The full path to the subdirectory | ||
topdir_path: The full path to the top directory to check subdir_path against | ||
""" | ||
topdir_path = real_path(topdir_path) | ||
subdir_path = real_path(subdir_path) | ||
|
||
# checks if the common prefix of both is equal to directory | ||
# e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b | ||
return os.path.commonprefix([subdir_path, topdir_path]) == topdir_path | ||
|
||
|
||
def set_up_anidb_connection(): | ||
"""Connect to anidb""" | ||
|
||
Comment on lines
950
to
955
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The file imports a significant number of modules, which is expected for a utility file like Consider reviewing the imports to ensure all are necessary and remove any that are unused.
The function Consider simplifying the directory creation logic by using - if not os.path.isdir(path):
- try:
- os.makedirs(path)
+ try:
+ os.makedirs(path, exist_ok=True)
The use of global variables (e.g., Consider refactoring the code to reduce reliance on global state, possibly by encapsulating related functionality within classes or passing state explicitly between functions where feasible. |
||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -36,30 +36,31 @@ def delete_folder(folder, check_empty=True): | |||||||||||||||||||||
return: True on success, False on failure | ||||||||||||||||||||||
""" | ||||||||||||||||||||||
|
||||||||||||||||||||||
folder = Path(folder).resolve() | ||||||||||||||||||||||
# check if it's a folder | ||||||||||||||||||||||
if not os.path.isdir(folder): | ||||||||||||||||||||||
if not folder.is_dir(): | ||||||||||||||||||||||
return False | ||||||||||||||||||||||
|
||||||||||||||||||||||
# check if it isn't TV_DOWNLOAD_DIR | ||||||||||||||||||||||
if settings.TV_DOWNLOAD_DIR and helpers.real_path(folder) == helpers.real_path(settings.TV_DOWNLOAD_DIR): | ||||||||||||||||||||||
if settings.TV_DOWNLOAD_DIR and str(Path(folder).resolve()) == str(Path(settings.TV_DOWNLOAD_DIR).resolve()): | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comparing paths using - if settings.TV_DOWNLOAD_DIR and str(Path(folder).resolve()) == str(Path(settings.TV_DOWNLOAD_DIR).resolve()):
+ if settings.TV_DOWNLOAD_DIR and Path(folder).resolve() == Path(settings.TV_DOWNLOAD_DIR).resolve(): Committable suggestion
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In pathlib, the slashes are not normalized according to the operating system until you call str There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @miigotu: Thank you for your input. Pathlib's design abstracts away the differences between operating systems, including how slashes are handled in file paths. When comparing |
||||||||||||||||||||||
return False | ||||||||||||||||||||||
|
||||||||||||||||||||||
# check if it's empty folder when wanted to be checked | ||||||||||||||||||||||
if check_empty: | ||||||||||||||||||||||
check_files = os.listdir(folder) | ||||||||||||||||||||||
if check_files: | ||||||||||||||||||||||
logger.info(f"Not deleting folder {folder} found the following files: {check_files}") | ||||||||||||||||||||||
found_files = [file for file in folder.iterdir()] | ||||||||||||||||||||||
if found_files: | ||||||||||||||||||||||
logger.info(f"Not deleting folder {folder} found the following files: {found_files}") | ||||||||||||||||||||||
return False | ||||||||||||||||||||||
|
||||||||||||||||||||||
try: | ||||||||||||||||||||||
logger.info(f"Deleting folder (if it's empty): {folder}") | ||||||||||||||||||||||
os.rmdir(folder) | ||||||||||||||||||||||
folder.rmdir() | ||||||||||||||||||||||
except (OSError, IOError) as error: | ||||||||||||||||||||||
logger.warning(f"Warning: unable to delete folder: {folder}: {error}") | ||||||||||||||||||||||
return False | ||||||||||||||||||||||
else: | ||||||||||||||||||||||
try: | ||||||||||||||||||||||
logger.info("Deleting folder: " + folder) | ||||||||||||||||||||||
logger.info(f"Deleting folder: {folder}") | ||||||||||||||||||||||
shutil.rmtree(folder) | ||||||||||||||||||||||
except (OSError, IOError) as error: | ||||||||||||||||||||||
logger.warning(f"Warning: unable to delete folder: {folder}: {error}") | ||||||||||||||||||||||
|
@@ -82,27 +83,28 @@ def delete_files(process_path, unwanted_files, result, force=False): | |||||||||||||||||||||
elif not result.result: | ||||||||||||||||||||||
return | ||||||||||||||||||||||
|
||||||||||||||||||||||
process_path = Path(process_path) | ||||||||||||||||||||||
# Delete all file not needed | ||||||||||||||||||||||
for cur_file in unwanted_files: | ||||||||||||||||||||||
cur_file_path = os.path.join(process_path, cur_file) | ||||||||||||||||||||||
if not os.path.isfile(cur_file_path): | ||||||||||||||||||||||
for current_file in unwanted_files: | ||||||||||||||||||||||
file_path = process_path / current_file | ||||||||||||||||||||||
if not file_path.is_file(): | ||||||||||||||||||||||
continue # Prevent error when a notwantedfiles is an associated files | ||||||||||||||||||||||
|
||||||||||||||||||||||
result.output += log_helper(f"Deleting file: {cur_file}", logger.DEBUG) | ||||||||||||||||||||||
result.output += log_helper(f"Deleting file: {current_file}", logger.DEBUG) | ||||||||||||||||||||||
|
||||||||||||||||||||||
# check first the read-only attribute | ||||||||||||||||||||||
file_attribute = os.stat(cur_file_path)[0] | ||||||||||||||||||||||
file_attribute = file_path.stat()[0] | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Retrieving file attributes using - file_attribute = file_path.stat()[0]
+ file_attribute = file_path.stat().st_mode Committable suggestion
Suggested change
|
||||||||||||||||||||||
if not file_attribute & stat.S_IWRITE: | ||||||||||||||||||||||
# File is read-only, so make it writeable | ||||||||||||||||||||||
result.output += log_helper(f"Changing ReadOnly Flag for file: {cur_file}", logger.DEBUG) | ||||||||||||||||||||||
result.output += log_helper(f"Changing ReadOnly Flag for file: {current_file}", logger.DEBUG) | ||||||||||||||||||||||
try: | ||||||||||||||||||||||
os.chmod(cur_file_path, stat.S_IWRITE) | ||||||||||||||||||||||
file_path.chmod(stat.S_IWRITE) | ||||||||||||||||||||||
except OSError as error: | ||||||||||||||||||||||
result.output += log_helper(f"Cannot change permissions of {cur_file_path}: {error}", logger.DEBUG) | ||||||||||||||||||||||
result.output += log_helper(f"Cannot change permissions of {current_file}: {error}", logger.DEBUG) | ||||||||||||||||||||||
try: | ||||||||||||||||||||||
os.remove(cur_file_path) | ||||||||||||||||||||||
file_path.unlink(True) | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The use of - file_path.unlink(True)
+ file_path.unlink() Committable suggestion
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The signature for unlink in pathlib.Path changed in python 3.8, adding a parameter miising_ok. It is now There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @miigotu: Thank you for the correction. You're right, starting with Python 3.8, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In python 3.8+ pathlib.Path.unlink takes a missong_ok:bool parameter There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @miigotu: Absolutely, you're correct. The |
||||||||||||||||||||||
except OSError as error: | ||||||||||||||||||||||
result.output += log_helper(f"Unable to delete file {cur_file}: {error}", logger.DEBUG) | ||||||||||||||||||||||
result.output += log_helper(f"Unable to delete file {current_file}: {error}", logger.DEBUG) | ||||||||||||||||||||||
|
||||||||||||||||||||||
|
||||||||||||||||||||||
def log_helper(message, level=logging.INFO): | ||||||||||||||||||||||
|
@@ -132,13 +134,17 @@ def process_dir(process_path, release_name=None, process_method=None, force=Fals | |||||||||||||||||||||
|
||||||||||||||||||||||
# if the client and SickChill are not on the same machine translate the directory into a network directory | ||||||||||||||||||||||
elif all( | ||||||||||||||||||||||
[settings.TV_DOWNLOAD_DIR, os.path.isdir(settings.TV_DOWNLOAD_DIR), os.path.normpath(process_path) == os.path.normpath(settings.TV_DOWNLOAD_DIR)] | ||||||||||||||||||||||
[ | ||||||||||||||||||||||
settings.TV_DOWNLOAD_DIR, | ||||||||||||||||||||||
Path(settings.TV_DOWNLOAD_DIR).is_dir(), | ||||||||||||||||||||||
str(Path(process_path).resolve()) == str(Path(settings.TV_DOWNLOAD_DIR).resolve()), | ||||||||||||||||||||||
] | ||||||||||||||||||||||
Comment on lines
+137
to
+141
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logic to check if the client and SickChill are on the same machine and to translate the directory into a network directory is thoughtful. However, the conversion of - process_path = os.path.join(settings.TV_DOWNLOAD_DIR, os.path.abspath(process_path).split(os.path.sep)[-1])
+ process_path = Path(settings.TV_DOWNLOAD_DIR) / Path(process_path).name Committable suggestion
Suggested change
|
||||||||||||||||||||||
): | ||||||||||||||||||||||
process_path = os.path.join(settings.TV_DOWNLOAD_DIR, os.path.abspath(process_path).split(os.path.sep)[-1]) | ||||||||||||||||||||||
result.output += log_helper(f"Trying to use folder: {process_path} ", logger.DEBUG) | ||||||||||||||||||||||
|
||||||||||||||||||||||
# if we didn't find a real dir then quit | ||||||||||||||||||||||
if not os.path.isdir(process_path): | ||||||||||||||||||||||
if not Path(process_path).is_dir(): | ||||||||||||||||||||||
result.output += log_helper( | ||||||||||||||||||||||
"Unable to figure out what folder to process. " | ||||||||||||||||||||||
"If your downloader and SickChill aren't on the same PC " | ||||||||||||||||||||||
|
@@ -162,6 +168,8 @@ def process_dir(process_path, release_name=None, process_method=None, force=Fals | |||||||||||||||||||||
result.output += log_helper(_("Processing {process_path}").format(process_path=process_path)) | ||||||||||||||||||||||
generator_to_use = os.walk(process_path, followlinks=settings.PROCESSOR_FOLLOW_SYMLINKS) | ||||||||||||||||||||||
|
||||||||||||||||||||||
rar_files = [] | ||||||||||||||||||||||
|
||||||||||||||||||||||
for current_directory, directory_names, filenames in generator_to_use: | ||||||||||||||||||||||
result.result = True | ||||||||||||||||||||||
|
||||||||||||||||||||||
|
@@ -273,7 +281,11 @@ def validate_dir(process_path, release_name, failed, result): | |||||||||||||||||||||
result.missed_files.append(f"{process_path} : Failed download") | ||||||||||||||||||||||
return False | ||||||||||||||||||||||
|
||||||||||||||||||||||
if settings.TV_DOWNLOAD_DIR and helpers.real_path(process_path) != helpers.real_path(settings.TV_DOWNLOAD_DIR) and helpers.is_hidden_folder(process_path): | ||||||||||||||||||||||
if ( | ||||||||||||||||||||||
settings.TV_DOWNLOAD_DIR | ||||||||||||||||||||||
and str(Path(process_path).resolve()) != str(Path(settings.TV_DOWNLOAD_DIR).resolve()) | ||||||||||||||||||||||
and helpers.is_hidden_folder(process_path) | ||||||||||||||||||||||
): | ||||||||||||||||||||||
result.output += log_helper(f"Ignoring hidden folder: {process_path}", logger.DEBUG) | ||||||||||||||||||||||
if not process_path.endswith("@eaDir"): | ||||||||||||||||||||||
result.missed_files.append(f"{process_path} : Hidden folder") | ||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
from typing import Union | ||
|
||
from .handler import ShowIndexer | ||
|
||
indexer = None | ||
indexer: Union[ShowIndexer, None] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic for adjusting file paths for downloads using
urljoin
andquote
fromurllib.parse
is correctly implemented. However, consider adding a comment explaining the purpose of this block, especially the logic for stripping the root directory from the filename and constructing the download URL. This will enhance maintainability and readability for future contributors.Additionally, ensure that
settings.ROOT_DIRS.split('|')[1:]
correctly handles all expected cases, including potential edge cases wheresettings.ROOT_DIRS
might not be formatted as expected or could be empty.+ # Adjust file paths for downloads by stripping the root directory and constructing the download URL for rootDir in settings.ROOT_DIRS.split('|')[1:]: if filename.startswith(rootDir): filename = filename.replace(rootDir, "").lstrip("/\\") filename = urljoin(settings.DOWNLOAD_URL, quote(filename))
Committable suggestion