From 3f78d627ecf995383952cd756348fd4e01b40247 Mon Sep 17 00:00:00 2001 From: Fabio Caccamo Date: Mon, 2 Jan 2023 12:34:02 +0100 Subject: [PATCH] Allow support to `pathlib.Path` path arguments. #14 --- fsutil/__init__.py | 92 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 91 insertions(+), 1 deletion(-) diff --git a/fsutil/__init__.py b/fsutil/__init__.py index 589de7e..961b072 100644 --- a/fsutil/__init__.py +++ b/fsutil/__init__.py @@ -3,6 +3,7 @@ import hashlib import json import os +import pathlib import shutil import tempfile import uuid @@ -126,10 +127,20 @@ def _require_requests_installed(): ) +def _get_path(path): + if isinstance(path, str): + return path + elif isinstance(path, pathlib.Path): + return str(path) + else: + return path + + def assert_dir(path): """ Raise an OSError if the given path doesn't exist or it is not a directory. """ + path = _get_path(path) if not is_dir(path): raise OSError(f"Invalid directory path: {path}") @@ -138,6 +149,7 @@ def assert_exists(path): """ Raise an OSError if the given path doesn't exist. """ + path = _get_path(path) if not exists(path): raise OSError(f"Invalid item path: {path}") @@ -146,6 +158,7 @@ def assert_file(path): """ Raise an OSError if the given path doesn't exist or it is not a file. """ + path = _get_path(path) if not is_file(path): raise OSError(f"Invalid file path: {path}") @@ -154,6 +167,7 @@ def assert_not_dir(path): """ Raise an OSError if the given path is an existing directory. """ + path = _get_path(path) if is_dir(path): raise OSError(f"Invalid path, directory already exists: {path}") @@ -162,6 +176,7 @@ def assert_not_exists(path): """ Raise an OSError if the given path already exists. """ + path = _get_path(path) if exists(path): raise OSError(f"Invalid path, item already exists: {path}") @@ -170,11 +185,13 @@ def assert_not_file(path): """ Raise an OSError if the given path is an existing file. """ + path = _get_path(path) if is_file(path): raise OSError(f"Invalid path, file already exists: {path}") def _clean_dir_empty_dirs(path): + path = _get_path(path) for basepath, dirnames, _ in os.walk(path, topdown=False): for dirname in dirnames: dirpath = os.path.join(basepath, dirname) @@ -183,6 +200,7 @@ def _clean_dir_empty_dirs(path): def _clean_dir_empty_files(path): + path = _get_path(path) for basepath, _, filenames in os.walk(path, topdown=False): for filename in filenames: filepath = os.path.join(basepath, filename) @@ -194,6 +212,7 @@ def clean_dir(path, dirs=True, files=True): """ Clean a directory by removing empty directories and/or empty files. """ + path = _get_path(path) assert_dir(path) if files: _clean_dir_empty_files(path) @@ -239,6 +258,8 @@ def copy_dir(path, dest, overwrite=False, **kwargs): More informations about kwargs supported options here: https://docs.python.org/3/library/shutil.html#shutil.copytree """ + path = _get_path(path) + dest = _get_path(dest) assert_dir(path) dirname = os.path.basename(os.path.normpath(path)) dest = os.path.join(dest, dirname) @@ -254,6 +275,8 @@ def copy_dir_content(path, dest, **kwargs): More informations about kwargs supported options here: https://docs.python.org/3/library/shutil.html#shutil.copytree """ + path = _get_path(path) + dest = _get_path(dest) assert_dir(path) assert_not_file(dest) make_dirs(dest) @@ -268,6 +291,8 @@ def copy_file(path, dest, overwrite=False, **kwargs): More informations about kwargs supported options here: https://docs.python.org/3/library/shutil.html#shutil.copy2 """ + path = _get_path(path) + dest = _get_path(dest) assert_file(path) assert_not_dir(dest) if not overwrite: @@ -281,6 +306,7 @@ def create_dir(path, overwrite=False): Create directory at the given path. If overwrite is not allowed and path exists, an OSError is raised. """ + path = _get_path(path) assert_not_file(path) if not overwrite: assert_not_exists(path) @@ -292,6 +318,7 @@ def create_file(path, content="", overwrite=False): Create file with the specified content at the given path. If overwrite is not allowed and path exists, an OSError is raised. """ + path = _get_path(path) assert_not_dir(path) if not overwrite: assert_not_exists(path) @@ -305,12 +332,14 @@ def create_zip_file( Create zip file at path compressing directories/files listed in content_paths. If overwrite is allowed and dest zip already exists, it will be overwritten. """ + path = _get_path(path) assert_not_dir(path) if not overwrite: assert_not_exists(path) make_dirs_for_file(path) def _write_content_to_zip_file(file, path, basedir=""): + path = _get_path(path) assert_exists(path) if is_file(path): filename = get_filename(path) @@ -375,6 +404,7 @@ def download_file(url, dirpath=None, filename=None, chunk_size=8192, **kwargs): _require_requests_installed() # https://stackoverflow.com/a/16696317/2096218 dirpath = dirpath or tempfile.gettempdir() + dirpath = _get_path(dirpath) filename = filename or get_filename(url) or "download" filepath = join_path(dirpath, filename) assert_not_dir(dirpath) @@ -393,6 +423,7 @@ def exists(path): """ Check if a directory of a file exists at the given path. """ + path = _get_path(path) return os.path.exists(path) @@ -402,6 +433,8 @@ def extract_zip_file(path, dest, autodelete=False, content_paths=None): If autodelete, the archive will be deleted after extraction. If content_paths list is defined, only listed items will be extracted, otherwise all. """ + path = _get_path(path) + dest = _get_path(dest) assert_file(path) assert_not_file(dest) make_dirs(dest) @@ -429,6 +462,7 @@ def get_dir_creation_date(path): """ Get the directory creation date. """ + path = _get_path(path) assert_dir(path) creation_timestamp = os.path.getctime(path) creation_date = datetime.fromtimestamp(creation_timestamp) @@ -439,6 +473,7 @@ def get_dir_creation_date_formatted(path, format="%Y-%m-%d %H:%M:%S"): """ Get the directory creation date formatted using the given format. """ + path = _get_path(path) date = get_dir_creation_date(path) return date.strftime(format) @@ -448,6 +483,7 @@ def get_dir_hash(path, func="md5"): Get the hash of the directory at the given path using the specified algorithm function (md5 by default). """ + path = _get_path(path) assert_dir(path) hash = hashlib.new(func) files = search_files(path) @@ -463,6 +499,7 @@ def get_dir_last_modified_date(path): """ Get the directory last modification date. """ + path = _get_path(path) assert_dir(path) last_modified_timestamp = os.path.getmtime(path) for basepath, dirnames, filenames in os.walk(path): @@ -484,6 +521,7 @@ def get_dir_last_modified_date_formatted(path, format="%Y-%m-%d %H:%M:%S"): """ Get the directory last modification date formatted using the given format. """ + path = _get_path(path) date = get_dir_last_modified_date(path) return date.strftime(format) @@ -492,6 +530,7 @@ def get_dir_size(path): """ Get the directory size in bytes. """ + path = _get_path(path) assert_dir(path) size = 0 for basepath, _, filenames in os.walk(path): @@ -515,6 +554,7 @@ def get_file_basename(path): """ Get the file basename from the given path/url. """ + path = _get_path(path) basename, _ = split_filename(path) return basename @@ -523,6 +563,7 @@ def get_file_creation_date(path): """ Get the file creation date. """ + path = _get_path(path) assert_file(path) creation_timestamp = os.path.getctime(path) creation_date = datetime.fromtimestamp(creation_timestamp) @@ -533,6 +574,7 @@ def get_file_creation_date_formatted(path, format="%Y-%m-%d %H:%M:%S"): """ Get the file creation date formatted using the given format. """ + path = _get_path(path) date = get_file_creation_date(path) return date.strftime(format) @@ -541,6 +583,7 @@ def get_file_extension(path): """ Get the file extension from the given path/url. """ + path = _get_path(path) _, extension = split_filename(path) return extension @@ -550,6 +593,7 @@ def get_file_hash(path, func="md5"): Get the hash of the file at the given path using the specified algorithm function (md5 by default). """ + path = _get_path(path) assert_file(path) hash = hashlib.new(func) with open(path, "rb") as file: @@ -563,6 +607,7 @@ def get_file_last_modified_date(path): """ Get the file last modification date. """ + path = _get_path(path) assert_file(path) last_modified_timestamp = os.path.getmtime(path) last_modified_date = datetime.fromtimestamp(last_modified_timestamp) @@ -573,6 +618,7 @@ def get_file_last_modified_date_formatted(path, format="%Y-%m-%d %H:%M:%S"): """ Get the file last modification date formatted using the given format. """ + path = _get_path(path) date = get_file_last_modified_date(path) return date.strftime(format) @@ -581,6 +627,7 @@ def get_file_size(path): """ Get the directory size in bytes. """ + path = _get_path(path) assert_file(path) # size = os.stat(path).st_size size = os.path.getsize(path) @@ -591,6 +638,7 @@ def get_file_size_formatted(path): """ Get the directory size formatted using the right unit suffix. """ + path = _get_path(path) size = get_file_size(path) size_formatted = convert_size_bytes_to_string(size) return size_formatted @@ -600,6 +648,7 @@ def get_filename(path): """ Get the filename from the given path/url. """ + path = _get_path(path) filepath = urlsplit(path).path filename = os.path.basename(filepath) return filename @@ -609,6 +658,7 @@ def get_parent_dir(path, levels=1): """ Get the parent directory for the given path going up N levels. """ + path = _get_path(path) return join_path(path, *([os.pardir] * max(1, levels))) @@ -616,6 +666,7 @@ def get_unique_name(path, prefix="", suffix="", extension="", separator="-"): """ Gets a unique name for a directory/file ath the given directory path. """ + path = _get_path(path) assert_dir(path) name = "" while True: @@ -638,6 +689,7 @@ def is_dir(path): """ Determine whether the specified path represents an existing directory. """ + path = _get_path(path) return os.path.isdir(path) @@ -645,6 +697,7 @@ def is_empty(path): """ Determine whether the specified path represents an empty directory or an empty file. """ + path = _get_path(path) assert_exists(path) if is_dir(path): return is_empty_dir(path) @@ -655,6 +708,7 @@ def is_empty_dir(path): """ Determine whether the specified path represents an empty directory. """ + path = _get_path(path) assert_dir(path) return len(os.listdir(path)) == 0 @@ -663,6 +717,7 @@ def is_empty_file(path): """ Determine whether the specified path represents an empty file. """ + path = _get_path(path) return get_file_size(path) == 0 @@ -670,6 +725,7 @@ def is_file(path): """ Determine whether the specified path represents an existing file. """ + path = _get_path(path) return os.path.isfile(path) @@ -687,6 +743,7 @@ def join_filepath(dirpath, filename): """ Create a filepath joining the directory path and the filename. """ + dirpath = _get_path(dirpath) return join_path(dirpath, filename) @@ -696,10 +753,11 @@ def join_path(path, *paths): If path is __file__ (or a .py file), the resulting path will be relative to the directory path of the module in which it's used. """ + path = _get_path(path) basepath = path if get_file_extension(path) in ["py", "pyc", "pyo"]: basepath = os.path.dirname(os.path.realpath(path)) - paths = [path.lstrip(os.sep) for path in paths] + paths = [_get_path(path.lstrip(os.sep)) for path in paths] return os.path.normpath(os.path.join(basepath, *paths)) @@ -707,6 +765,7 @@ def list_dirs(path): """ List all directories contained at the given directory path. """ + path = _get_path(path) return _filter_paths(path, os.listdir(path), predicate=is_dir) @@ -714,6 +773,7 @@ def list_files(path): """ List all files contained at the given directory path. """ + path = _get_path(path) return _filter_paths(path, os.listdir(path), predicate=is_file) @@ -722,6 +782,7 @@ def make_dirs(path): Create the directories needed to ensure that the given path exists. If a file already exists at the given path an OSError is raised. """ + path = _get_path(path) if is_dir(path): return assert_not_file(path) @@ -733,6 +794,7 @@ def make_dirs_for_file(path): Create the directories needed to ensure that the given path exists. If a directory already exists at the given path an OSError is raised. """ + path = _get_path(path) if is_file(path): return assert_not_dir(path) @@ -747,6 +809,8 @@ def move_dir(path, dest, overwrite=False, **kwargs): More informations about kwargs supported options here: https://docs.python.org/3/library/shutil.html#shutil.move """ + path = _get_path(path) + dest = _get_path(dest) assert_dir(path) assert_not_file(dest) if not overwrite: @@ -762,6 +826,8 @@ def move_file(path, dest, overwrite=False, **kwargs): More informations about kwargs supported options here: https://docs.python.org/3/library/shutil.html#shutil.move """ + path = _get_path(path) + dest = _get_path(dest) assert_file(path) assert_not_file(dest) dest = os.path.join(dest, get_filename(path)) @@ -776,6 +842,7 @@ def read_file(path, encoding="utf-8"): """ Read the content of the file at the given path using the specified encoding. """ + path = _get_path(path) assert_file(path) content = "" with open(path, encoding=encoding) as file: @@ -806,6 +873,7 @@ def read_file_json( """ Read and decode a json encoded file at the given path. """ + path = _get_path(path) content = read_file(path) data = json.loads( content, @@ -820,6 +888,7 @@ def read_file_json( def _read_file_lines_in_range(path, line_start=0, line_end=-1, encoding="utf-8"): + path = _get_path(path) line_start_negative = line_start < 0 line_end_negative = line_end < 0 if line_start_negative or line_end_negative: @@ -847,6 +916,7 @@ def read_file_lines( It is possible to specify the line indexes (negative indexes too), very useful especially when reading large files. """ + path = _get_path(path) assert_file(path) if line_start == 0 and line_end == -1: content = read_file(path, encoding=encoding) @@ -868,6 +938,7 @@ def read_file_lines_count(path): """ Read file lines count. """ + path = _get_path(path) assert_file(path) lines_count = 0 with open(path, "rb") as file: @@ -883,6 +954,7 @@ def remove_dir(path, **kwargs): More informations about kwargs supported options here: https://docs.python.org/3/library/shutil.html#shutil.rmtree """ + path = _get_path(path) if not exists(path): return False assert_dir(path) @@ -894,6 +966,7 @@ def remove_dir_content(path): """ Removes all directory content (both sub-directories and files). """ + path = _get_path(path) assert_dir(path) remove_dirs(*list_dirs(path)) remove_files(*list_files(path)) @@ -912,6 +985,7 @@ def remove_file(path): Remove a file at the given path. If the file is removed with success returns True, otherwise False. """ + path = _get_path(path) if not exists(path): return False assert_file(path) @@ -932,6 +1006,7 @@ def rename_dir(path, name): Rename a directory with the given name. If a directory or a file with the given name already exists, an OSError is raised. """ + path = _get_path(path) assert_dir(path) comps = list(os.path.split(path)) comps[-1] = name @@ -945,6 +1020,7 @@ def rename_file(path, name): Rename a file with the given name. If a directory or a file with the given name already exists, an OSError is raised. """ + path = _get_path(path) assert_file(path) dirpath, filename = split_filepath(path) dest = join_filepath(dirpath, name) @@ -956,6 +1032,7 @@ def rename_file_basename(path, basename): """ Rename a file basename with the given basename. """ + path = _get_path(path) extension = get_file_extension(path) filename = join_filename(basename, extension) rename_file(path, filename) @@ -965,6 +1042,7 @@ def rename_file_extension(path, extension): """ Rename a file extension with the given extension. """ + path = _get_path(path) basename = get_file_basename(path) filename = join_filename(basename, extension) rename_file(path, filename) @@ -976,6 +1054,8 @@ def replace_dir(path, src, autodelete=False): If autodelete, the src directory will be removed at the end of the operation. Optimized for large files. """ + path = _get_path(path) + src = _get_path(src) assert_not_file(path) assert_dir(src) @@ -1009,6 +1089,8 @@ def replace_file(path, src, autodelete=False): If autodelete, the src file will be removed at the end of the operation. Optimized for large files. """ + path = _get_path(path) + src = _get_path(src) assert_not_dir(path) assert_file(src) if path == src: @@ -1040,6 +1122,7 @@ def _search_paths(path, pattern): """ Search all paths relative to path matching the given pattern. """ + path = _get_path(path) assert_dir(path) pathname = os.path.join(path, pattern) paths = glob.glob(pathname, recursive=True) @@ -1050,6 +1133,7 @@ def search_dirs(path, pattern="**/*"): """ Search for directories at path matching the given pattern. """ + path = _get_path(path) return _filter_paths(path, _search_paths(path, pattern), predicate=is_dir) @@ -1057,6 +1141,7 @@ def search_files(path, pattern="**/*.*"): """ Search for files at path matching the given pattern. """ + path = _get_path(path) return _filter_paths(path, _search_paths(path, pattern), predicate=is_file) @@ -1064,6 +1149,7 @@ def split_filename(path): """ Split a filename and returns its basename and extension. """ + path = _get_path(path) filename = get_filename(path) basename, extension = os.path.splitext(filename) extension = extension.replace(".", "").strip() @@ -1074,6 +1160,7 @@ def split_filepath(path): """ Split a filepath and returns its directory-path and filename. """ + path = _get_path(path) dirpath = os.path.dirname(path) filename = get_filename(path) return (dirpath, filename) @@ -1083,6 +1170,7 @@ def split_path(path): """ Split a path and returns its path-names. """ + path = _get_path(path) head, tail = os.path.split(path) names = head.split(os.sep) + [tail] names = list(filter(None, names)) @@ -1093,6 +1181,7 @@ def write_file(path, content, append=False, encoding="utf-8"): """ Write file with the specified content at the given path. """ + path = _get_path(path) assert_not_dir(path) make_dirs_for_file(path) mode = "a" if append else "w" @@ -1116,6 +1205,7 @@ def write_file_json( """ Write a json file at the given path with the specified data encoded in json format. """ + path = _get_path(path) def default_encoder(obj): if isinstance(obj, datetime):