diff --git a/.gitignore b/.gitignore index d9005f2..99a867e 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,9 @@ __pycache__/ # C extensions *.so +#Pycharm +.idea/ + # Distribution / packaging .Python build/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 369ddc0..fae9c1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,22 @@ # Changelog -All important changes to this vrfy will be documented here. +All important changes to vrfy will be documented here. + +## [0.4.0] + +### Added +- Added command line help menu "vrfy -h". + +### Changed +- Code refactor: Separate CLI and core functionality, introduce data type for result post-processing, and code simplification. +- Moved cli from vrfy into own class. +- Cli option: vrfy /path/of/master /path/of/backup no longer supported. +- Renamed "clone" to "backup" for improved clarity. +- Changed direction verification option -c to -b. +- [+] and [-] cli indicators are now based on status of backup directory + +### Fixed +- Fixed bug where directories may be dropped in directory verification. ## [0.3.1] diff --git a/README.md b/README.md index 911584f..3d14ad6 100644 --- a/README.md +++ b/README.md @@ -16,19 +16,15 @@ Install **vrfy** using pip: pip install vrfy --user ``` -## Usage +## Usage the CLI ### 1. Verifing that two directories are identical -Verifing that the contents of '/path/of/clone' are identical to those of '/path/of/master'. For example, '/path/of/master' might be a local backup, whereas '/path/of/clone' might be loaded from cloud storage. +Verifing that the contents of '/path/of/backup' are identical to those of '/path/of/master'. For example, '/path/of/master' might be a local backup, whereas '/path/of/backup' might be loaded from cloud storage. ```bash -vrfy /path/of/master /path/of/clone +vrfy -m /path/of/master -b /path/of/backup -r ``` -or +or (mismatched checksums are printed to console) ```bash -vrfy -m /path/of/master -c /path/of/clone -r -``` -or (checksums are printed to console) -```bash -vrfy -m /path/of/master -c /path/of/clone -r -p +vrfy -m /path/of/master -b /path/of/backup -r -p ``` ### 2. Storing checksums for future verification @@ -50,7 +46,7 @@ Using option **-r** (recursive) all sub-directories are verified as well: ```bash vrfy -r -v /path/of/data ``` -Using option **-p** (print) all checksums are printed to console for further inspection: +Using option **-p** (print) all mismatched checksums are printed to console for further inspection: ```bash vrfy -p -r -v /path/of/data ``` @@ -70,9 +66,47 @@ Where "expectedChecksum" can be one of the following: ### 4. Other CLI options Display version of vrfy: ```bash -vrfy -version +vrfy --version ``` Display checksum for given file: ```bash vrfy -p -f /path/of/file/filename ``` +Display help: +```bash +vrfy -h +``` + +## Using the python package +### Getting started +```python +from vrfy.vrfy import vrfy +vf = vrfy() + +# Get version string +versionStr = vf.GetVersion() + +# Verify a single file to an expected checksum +Result = vf.VerifyFile("/path/and/fileName.xyz", "expectedChecksum") + +# Verify contents of a directory to stored checksums +Result = VerifyFilesAgainstChecksums("path/to/directory") + +# Create/store checksum file for later file verification +Result = WriteChecksumFile("path/to/directory") + +# Verify that files within master and backup directories are identical +Result = VerifyFiles("path/to/directory/master", "path/to/directory/backup") +``` +where +```python +class Result: + self.Result: bool # Determines whether operation was successful (True) or not (False). + self.Path: str # Path the result object corresponds to. + self.PathError: bool # True: Path is a valid directory, False otherwise. + self.MissingFiles: list # Missing files in (backup) directory that are included in master directory / checksum list. + self.AdditionalFiles: list # Additional files in (backup) directory that are NOT included in master directory / checksum list. + self.ChecksumMismatch: list # List of files with mismachting checksums. + self.MasterChecksums: dict # Dictionary of files within master directory and their checksums. + self.BackupChecksums: dict # Dictionary of files within backup directory and their checksums. +``` \ No newline at end of file diff --git a/setup.py b/setup.py index b5f439a..700b419 100644 --- a/setup.py +++ b/setup.py @@ -7,24 +7,21 @@ long_description = file.read() setup( - name = 'vrfy', - version = '0.3.1', - description = 'Verify with VRFY: Ensure the integrity of your file copies, hash by hash!', - long_description = long_description, + name='vrfy', + version='0.4.0', + description='Verify with VRFY: Ensure the integrity of your file copies, hash by hash!', + long_description=long_description, long_description_content_type='text/markdown', - py_modules = ["vrfy"], - package_dir = {'': 'vrfy'}, + packages=['vrfy', 'vrfy.cli'], - author="BumblebeeMan (Dennis Koerner)", - author_email="dennis@bumblebeeman.dev", + author="BumblebeeMan (Dennis Koerner)", + author_email="dennis@bumblebeeman.dev", url="https://github.com/BumblebeeMan/vrfy", - #install_requires=["requests >= 2.30.0", "psutil >= 5.9.0"], - entry_points={ 'console_scripts': [ - 'vrfy = vrfy:main', + 'vrfy = vrfy.cli.vrfyCli:main', ], }, @@ -48,5 +45,6 @@ "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", - ] + "Programming Language :: Python :: 3.12", + ] ) diff --git a/vrfy/__init__.py b/vrfy/__init__.py index b672699..8b13789 100644 --- a/vrfy/__init__.py +++ b/vrfy/__init__.py @@ -1,3 +1 @@ -#!/usr/bin/env python3 -from vrfy import vrfy diff --git a/vrfy/cli/__init__.py b/vrfy/cli/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/vrfy/cli/__init__.py @@ -0,0 +1 @@ + diff --git a/vrfy/cli/vrfyCli.py b/vrfy/cli/vrfyCli.py new file mode 100644 index 0000000..d2e7f71 --- /dev/null +++ b/vrfy/cli/vrfyCli.py @@ -0,0 +1,245 @@ +#!/usr/bin/env python3 +from vrfy.vrfy import vrfy +import sys +import os +import argparse +from argparse import RawTextHelpFormatter +import pathlib +from inspect import signature + + +class vrfyCli: + def __init__(self): + pass + + def parseArgumentsAndExecute(self, arguments: list) -> int: + """ + Decodes program arguments and executes required methods. + + Parameters: + arguments (List[str]): List of arguments provided by user. + + Returns: + int: 0, when all execution steps resulted in PASS, else 1. + """ + # decode options from argument list + parser = argparse.ArgumentParser( + description="Verify with VRFY: Ensure the integrity of your file copies, hash by hash!", + formatter_class=RawTextHelpFormatter) + parser.add_argument("-ver", "--version", action="store_true", help="Print version string") + parser.add_argument("-r", "--recursive", action="store_true", help="Recursive operation") + parser.add_argument("-p", "--print", action="store_true", help="Print mismatched checksums") + + filevrfy = parser.add_argument_group('File verification', + 'Verify a single file against an expected') + filevrfy.add_argument("-f", "--file", type=argparse.FileType('rb'), help="File to verify") + filevrfy.add_argument("-cs", "--checksum", type=str, + help="Checksum string or sums.csv/*.sha256-file") + + csvrfy = parser.add_argument_group('Checksum verification', 'Verify files against stored checksums.' + '\nError indicators:' + '\n\t[+]: Additional files in directory that are missing in checksum list.' + '\n\t[-]: Files that are included in checksum list, but missing in directory.' + '\n\t[MISMATCH]: Checksums mismatch.') + mcsvrfy = csvrfy.add_mutually_exclusive_group() # required=True) + mcsvrfy.add_argument("-v", "--verify", type=pathlib.Path, dest='VERIFY_PATH', + help="Path to files for verification") + mcsvrfy.add_argument("-c", "--create", type=pathlib.Path, dest='CREATE_PATH', + help="Path to files to create checksums for") + + dirvrfy = parser.add_argument_group('Directory verification', 'Verify files against a known good master copy.' + '\nError indicators:' + '\n\t[+]: Additional files/directories in backup that are missing in master. ' + '\n\t[-]: Files/directories that are included in master, but missing in backup.' + '\n\t[MISMATCH]: Checksums mismatch.') + dirvrfy.add_argument("-m", "--master", type=pathlib.Path, dest='MASTER_PATH', help="Path to master directory") + dirvrfy.add_argument("-b", "--backup", type=pathlib.Path, dest='BACKUP_PATH', + help="Path to backup directory") + + args = parser.parse_args() + + # mutually exclude directory and file verification mode + f = (args.file is not None or args.checksum is not None) + d = (args.MASTER_PATH is not None or args.BACKUP_PATH is not None) + vp = (args.VERIFY_PATH is not None) + if (f + d + vp) > 1: + print("ERROR: Verification modes can NOT be mixed.") + return 1 + + # check parameters for file verification mode + if (args.MASTER_PATH is not None and args.BACKUP_PATH is None) or ( + args.MASTER_PATH is None and args.BACKUP_PATH is not None): + print("ERROR: Parameter missing for 'directory verification'.") + return 1 + + self.OPTION_RECURSIVE = args.recursive + self.OPTION_PRINT = args.print + + vf = vrfy() + # execute decoded options + executionResult = False + + # cli option: vrfy -version + if args.version: + print("vrfy version: " + str(vf.GetVersion())) + + # cli option: vrfy + elif len(arguments) == 0: + # no arguments are provided -> verify checksums of files within current working directory + self.OPTION_VERIFY_CSV = True + self.OPTION_RECURSIVE = True + # verify sums + print("Verifying current working directory against checksums:") + executionResult = self.__walker__(os.getcwd(), os.getcwd(), vf.VerifyFilesAgainstChecksums) + self.__printOverallResult__(executionResult) + + # cli option: vrfy -m <> -c <> + elif args.MASTER_PATH is not None and args.BACKUP_PATH is not None: + if os.path.isdir(args.MASTER_PATH) and os.path.isdir(args.BACKUP_PATH): + print("Verifying directories:") + print("Master: " + str(args.MASTER_PATH)) + print("Backup: " + str(args.BACKUP_PATH)) + self.OPTION_RECURSIVE = True + executionResult = self.__walker__(str(args.MASTER_PATH), str(args.BACKUP_PATH), vf.VerifyFiles) + self.__printOverallResult__(executionResult) + else: + print("ERROR: Unvalid argument " + str(args.MASTER_PATH) + " or " + str(args.BACKUP_PATH)) + + # cli option: vrfy -f <> -cs <> OR vrfy -p -f <> OR vrfy -p -f <> -cs <> + elif args.file is not None: + if os.path.isfile(args.file): + if args.checksum is not None: + res = vf.VerifyFile(args.file.name, args.checksum) + else: + res = vf.VerifyFile(args.file.name, "") + executionResult = res.Result + path, filename = os.path.split(args.file.name) + calcChecksum = res.MasterChecksums[filename] + if self.OPTION_PRINT: + print(calcChecksum + " " + str(filename)) + else: + self.__printOverallResult__(executionResult) + else: + print("ERROR: Unvalid argument " + str(args.file)) + + # cli option: vrfy -c <> + elif args.CREATE_PATH is not None: + if os.path.isdir(args.CREATE_PATH): + # create sums + print("Creating checksums for files:") + executionResult = self.__walker__(str(args.CREATE_PATH), str(args.CREATE_PATH), vf.WriteChecksumFile) + self.__printOverallResult__(executionResult) + else: + print("ERROR: Unvalid argument " + str(args.CREATE_PATH)) + + # cli option: vrfy -v <> + elif args.VERIFY_PATH is not None: + if os.path.isdir(args.VERIFY_PATH): + # verify sums + print("Verifying files against checksums:") + executionResult = self.__walker__(str(args.VERIFY_PATH), str(args.VERIFY_PATH), + vf.VerifyFilesAgainstChecksums) + self.__printOverallResult__(executionResult) + else: + print("ERROR: Unvalid argument " + str(args.VERIFY_PATH)) + + else: + print("No valid argument setting found!") + return 1 + if executionResult: + return 0 + else: + return 1 + + def __walker__(self, pathMaster: str, pathBackup: str, func) -> bool: + """ + Verifies the contents of directory "pathMaster" against the included checksums in sums.csv. + + Parameters: + pathMaster (str): Path to the master directory whose contents are considered valid and unchanged, serving as + a baseline for comparison. . + pathBackup (str): Path to backup directory whose files shall get verified against the master copy. + func (callable) -- Function that gets executed on the respective folder. + + Returns: + bool: True, when all executions of >>func<< returned PASS, else False. + """ + # check if received strings are valid paths + # create lists of files and directories that are included in current path + filesM = filesC = dictsM = dictsC = [] + if os.path.isdir(pathMaster): + filesM = [entryA for entryA in os.listdir(pathMaster) if + not os.path.isdir(os.path.join(pathMaster, entryA))] + dictsM = [entryDir for entryDir in os.listdir(pathMaster) if + os.path.isdir(os.path.join(pathMaster, entryDir))] + if os.path.isdir(pathBackup): + filesC = [entryB for entryB in os.listdir(pathBackup) if not os.path.isdir(os.path.join(pathBackup, entryB))] + dictsC = [entryDir for entryDir in os.listdir(pathBackup) if + os.path.isdir(os.path.join(pathBackup, entryDir))] + + combinedDicts = list(set(dictsM + dictsC)) + + if os.path.isdir(pathMaster) and os.path.isdir(pathBackup): + print(pathMaster, end=" : ", flush=True) + # execute requested operation + numParam = len(signature(func).parameters) + if numParam == 2: + resultObject = func(pathMaster, pathBackup) + elif numParam == 1: + resultObject = func(pathBackup) + else: + return False + else: + if os.path.isdir(pathMaster): + print("[-] " + pathMaster, end=" : ", flush=True) + #resultObject = vrfy.Result(False, pathMaster, additionalMaster=filesM, missingMaster=filesC) + resultObject = func(pathMaster, pathBackup) + if os.path.isdir(pathBackup): + print("[+] " + pathBackup, end=" : ", flush=True) + #resultObject = vrfy.Result(False, pathBackup, additionalMaster=filesM, missingMaster=filesC) + resultObject = func(pathMaster, pathBackup) + resultVerify = resultObject.Result + # ResultList.append(resultObject) + self.__printResult__(resultObject) + + # jump into child directories, if recursive operation is requested + if self.OPTION_RECURSIVE: + for nextFolder in combinedDicts: + resultVerify = self.__walker__(os.path.join(pathMaster, nextFolder), + os.path.join(pathBackup, nextFolder), func) & resultVerify + + return resultVerify + + def __printResult__(self, result) -> None: + if result.Result: + print("PASS") + else: + print("FAILED!!!") + for file in result.ChecksumMismatch: + print("[MISMATCH] " + str(file)) + if self.OPTION_PRINT: + print("- Master: " + result.MasterChecksums[file]) + print("- Backup: " + result.BackupChecksums[file]) + for file in result.AdditionalFiles: + print("[+] " + str(file)) + if self.OPTION_PRINT: + print("- cs: " + result.BackupChecksums[file]) + for file in result.MissingFiles: + print("[-] " + str(file)) + if self.OPTION_PRINT: + print("- cs: " + result.MasterChecksums[file]) + + def __printOverallResult__(self, res: bool): + if res: + print("Overall: PASS") + else: + print("Overall: FAIL") + + +def main(): + verify = vrfyCli() + sys.exit(verify.parseArgumentsAndExecute(sys.argv[1:])) + + +if __name__ == "__main__": + main() diff --git a/vrfy/vrfy.py b/vrfy/vrfy.py index f194cc1..dfd74d8 100755 --- a/vrfy/vrfy.py +++ b/vrfy/vrfy.py @@ -1,535 +1,385 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python3 +import os +import hashlib + + class vrfy: - import os - import subprocess - - VERSION_STR = "0.3.1" - - #options - GLOBAL_VERBOSITY = None - CREATE_CSV = "-c" - VERIFY_CSV = "-v" - RECURSIVE = "-r" - VERSION = "-version" - PRINT = "-p" - FILE = "-f" - CHECKSUM = "-cs" - MERGE = "-merge" - MASTER_DIR = "-m" - CLONE_DIR = "-c" - MERGE_MASTER_TO_CLONE = "-MergeMasterToClone" - MERGE_CLONE_TO_MASTER = "-MergeCloneToMaster" - MERGE_MIRRORED = "-MergeMirrored" - OPTION_RECURSIVE = False - OPTION_CREATE_CSV = False - OPTION_VERIFY_CSV = False - OPTION_VERIFY_VERSION = False - OPTION_PRINT = False - OPTION_FILE = -1 - OPTION_CHECKSUM = -1 - OPTION_MASTER_DIR = -1 - OPTION_CLONE_DIR = -1 - OPTION_MERGE = "" - + class Result: + def __init__(self, result, path, pathError = False, missingFiles=[], additionalFiles=[], ChecksumMismatch=[], + masterChecksums=dict(), backupChecksums=dict()): + self.Result = result + self.Path = path + self.PathError = pathError + self.MissingFiles = missingFiles + self.AdditionalFiles = additionalFiles + self.ChecksumMismatch = ChecksumMismatch + self.MasterChecksums = masterChecksums + self.BackupChecksums = backupChecksums + + + VERSION_STR = "0.4.0" HASH_ERROR = "ERROR" - LINE_BREAK = "-----------------------------------------" - CHAPTER_BREAK = "=========================================" - - + def __init__(self): pass - - - def parseArgumentsAndExecute(self, arguments): + + def GetVersion(self) -> str: + """ + Returns version string. + + Returns: + str: Returns version string. + """ + return self.VERSION_STR + + def VerifyFile(self, filePath: str, expectedChecksum: str = "") -> Result: """ - Decodes programm arguments and executes required methods. - + Verifies the contents of file "filePath" against the checksum provided with "expectedChecksum". + Parameters: - arguments (List[str]): List of arguments provided by user. - + filePath (str): Path + name to the file that should be verified. + expectedChecksum (str): Checksum value or path + file name of *.sha256-/sums.csv file. + Returns: - int: 0, when all execution steps resulted in PASS, else 1. + result, calculatedChecksum (bool, str): Tuple. Result of verification and calculated sum of "filePath". """ - # find options - directories = [] - for index in range(0, len(arguments)): - if arguments[index] == self.RECURSIVE: - self.OPTION_RECURSIVE = True - if arguments[index] == self.CREATE_CSV: - self.OPTION_CREATE_CSV = True - if arguments[index] == self.VERIFY_CSV: - self.OPTION_VERIFY_CSV = True - if self.os.path.isdir(arguments[index]): - directories.append(arguments[index]) - if arguments[index] == self.VERSION: - self.OPTION_VERIFY_VERSION = True - if arguments[index] == self.PRINT: - self.OPTION_PRINT = True - if arguments[index] == self.CHECKSUM: - if (len(arguments)-2) <= index + 1: # index + 1 shall be second to last or earlier - self.OPTION_CHECKSUM = index + 1 - else: - print("Error: Option '" + str(self.CHECKSUM) + "' provided, but '" + str(self.CHECKSUM)+ "' is not followed by checksum to test.") - if arguments[index] == self.FILE: - if self.os.path.isfile(arguments[index + 1]): - self.OPTION_FILE = index + 1 - else: - print("Error: Option '" + str(self.FILE) + "' provided, but '" + str(arguments[index + 1]) + "' is no file.") - if arguments[index] == self.MASTER_DIR: - if self.os.path.isdir(arguments[index + 1]): - self.OPTION_MASTER_DIR = index + 1 - else: - print("Error: Option '" + str(self.MASTER_DIR) + "' provided, but '" + str(arguments[index + 1]) + "' is no directory.") - if arguments[index] == self.CLONE_DIR: - if self.os.path.isdir(arguments[index + 1]): - self.OPTION_CLONE_DIR = index + 1 - else: - print("Error: Option '" + str(self.CLONE_DIR) + "' provided, but '" + str(arguments[index + 1]) + "' is no directory.") - if arguments[index] == self.MERGE_MASTER_TO_CLONE or arguments[index] == self.MERGE_CLONE_TO_MASTER or arguments[index] == self.MERGE_MIRRORED: - self.OPTION_MERGE = arguments[index] - - executionResult = False - # cli option: vrfy -version - if self.OPTION_VERIFY_VERSION == True: - print("vrfy version: " + str(self.VERSION_STR)) - # cli option: vrfy - elif len(arguments) == 0: - # no arguments are provided -> verify checksums of files within current working directory - import os - directories.append(os.getcwd()) - self.OPTION_VERIFY_CSV = True - self.OPTION_RECURSIVE = True - # verify sums - print("Verifying current working directory against checksums:") - executionResult = self.walker(directories[0], directories[0], self.verifySums) - self.__printResults__(executionResult) - # cli option: vrfy <> <> - elif (len(arguments) == 2 and self.os.path.isdir(arguments[0]) and self.os.path.isdir(arguments[1])): - # only two paths are provided -> first: golden master / second: clone/copy to be verified - print("Verifying directories:") - print("Master: " + str(arguments[0])) - print("Clone: " + str(arguments[1])) - self.OPTION_RECURSIVE = True - executionResult = self.walker(arguments[0], arguments[1], self.verifyFiles) - self.__printResults__(executionResult) - # cli option: vrfy -m <> -c <> - elif self.OPTION_MASTER_DIR >= 0 and self.OPTION_MASTER_DIR <= len(arguments) - 1 and self.OPTION_CLONE_DIR >= 0 and self.OPTION_CLONE_DIR <= len(arguments) - 1: - print("Verifying directories:") - print("Master: " + str(arguments[self.OPTION_MASTER_DIR])) - print("Clone: " + str(arguments[self.OPTION_CLONE_DIR])) - self.OPTION_RECURSIVE = True - executionResult = self.walker(arguments[self.OPTION_MASTER_DIR], arguments[self.OPTION_CLONE_DIR], self.verifyFiles) - self.__printResults__(executionResult) - # cli option: vrfy -f <> -cs <> OR vrfy -p -f <> OR vrfy -p -f <> -cs <> - elif self.OPTION_FILE >= 0 and self.OPTION_FILE <= len(arguments) - 1: - calcChecksum = self.calcChecksum(arguments[self.OPTION_FILE]) - if self.OPTION_PRINT == True: - name, extension = self.os.path.splitext(self.os.path.basename(arguments[self.OPTION_FILE])) - print(str(calcChecksum) + " " + str(name) + str(extension)) - executionResult = True - if self.OPTION_CHECKSUM >= 0 and self.OPTION_CHECKSUM <= len(arguments) - 1: - givenChecksum = arguments[self.OPTION_CHECKSUM] - if self.os.path.isfile(arguments[self.OPTION_CHECKSUM]): - try: - path, filename = self.os.path.split(arguments[self.OPTION_FILE]) - sumsDict = self.getChecksumsFromFile(arguments[self.OPTION_CHECKSUM]) - givenChecksum = sumsDict[filename] - except: - givenChecksum = "" - executionResult = False - print("Error: Option '" + str(self.CHECKSUM) + "' provided, but no checksum found in file" + str(arguments[self.OPTION_CHECKSUM]) + ".") - if calcChecksum == givenChecksum: - executionResult = True - else: - executionResult = False - self.__printResults__(executionResult) - # cli option: vrfy -c <> - elif self.OPTION_CREATE_CSV == True: - if len(directories) == 1: - # create sums - print("Creating checksums for files:") - executionResult = self.walker(directories[0], directories[0], self.createSums) - self.__printResults__(executionResult) - else: - print("Error: Option '" + str(self.VERIFY_CSV) + "' provided, but '" + str(len(directories)) + "' directories are given (required: 1).") - # cli option: vrfy -v <> - elif self.OPTION_VERIFY_CSV == True: - if len(directories) == 1: - # verify sums - print("Verifying files against checksums:") - executionResult = self.walker(directories[0], directories[0], self.verifySums) - self.__printResults__(executionResult) + # start file verification, when filepath is valid + if os.path.isfile(filePath): + masterHashDict = dict() + expHashDict = dict() + checksumErrors = [] + + resultVerify = True + expectation = str(expectedChecksum) + calcChecksum = self.__calcChecksum__(filePath) + path, filename = os.path.split(filePath) + # no hex checksum provided, read sums.csv / *.sha256sums-file + if os.path.isfile(expectedChecksum): + try: + sumsDict = self.__getChecksumsFromFile__(expectedChecksum) + expectation = sumsDict[filename] + except Exception: + resultVerify = False + # store calculated and expected checksum + masterHashDict[filename] = calcChecksum + expHashDict[filename] = expectation + # verify file checksum + if calcChecksum == expectation: + pass else: - print("Error: Option '" + str(self.VERIFY_CSV) + "' provided, but '" + str(len(directories)) + "' directories are given (required: 1).") - else: - print("No valid argument setting found!") - return 1 - if executionResult == True: - return 0 + resultVerify = False + checksumErrors.append(filename) + + return self.Result(result=resultVerify, path=path, ChecksumMismatch=checksumErrors, + masterChecksums=masterHashDict, backupChecksums=expHashDict) else: - return 1 - - def __printResults__(self, res): - if res: - print("Overall: PASS") + # stop execution, since filepath is NOT valid + return self.Result(result=False, path=filePath, pathError=True) + + def VerifyFilesAgainstChecksums(self, path: str) -> Result: + """ + Verifies the contents of directory >>path<< against the included checksums in sums.csv. + + Parameters: + path (str): Path to directory whose files shall get verified. + + Returns: + vrfy.Result: Results result object of type vrfy.Result. + """ + # start file verification, when path is valid + if os.path.isdir(path): + # determine available files in path -> list of file names + files = [entryB for entryB in os.listdir(path) if + not os.path.isdir(os.path.join(path, entryB))] + + resultVerify = True + fileHashDict = dict() + + # check if checksum file is available, if not abort execution + if "sums.csv" not in files and len(files) > 0: + # calculate has values for additional files + for file in files: + fileHashDict[file] = self.__calcChecksum__(os.path.join(path, file)) + return self.Result(result=False, path=path, additionalFiles=files, + masterChecksums=fileHashDict) + + if len(files) > 0: + # do not try to verify "sums.csv" as it will not be included in "sums.csv" + if "sums.csv" in files: + # save hashvalues for later analysis + fileHashDict["sums.csv"] = self.__calcChecksum__(os.path.join(path, "sums.csv")) + files.remove("sums.csv") + + # read checksum file and get dictionary + sumsDict = self.__readSumsCsvFile__(path) + + checksumErrors = [] + # iterate through all files and compare their checksum with those stored in sums.csv + for file in sumsDict.keys(): + if file in files: + checksumCalc = self.__calcChecksum__(os.path.join(path, file)) + checksumSaved = sumsDict[file] + # save hashvalues for later analysis + fileHashDict[file] = checksumCalc + # verify calculated checksum against the one stored in sums.csv + if checksumCalc == checksumSaved: + pass + else: + # checksum mismatch -> set error flag and add file name to list of mismatched files + checksumErrors.append(file) + resultVerify = False + else: + # file is included in sums.csv, but not in directory + resultVerify = False + + # verify that all files in current working directory are included in sums.csv, and vice versa + # set error flag if check failed + missingItemsInSumsCSV = [i for i in files if i not in sumsDict.keys()] + additionalItemsInSumsCSV = [i for i in sumsDict.keys() if i not in files] + if len(missingItemsInSumsCSV) != 0 or len(additionalItemsInSumsCSV) != 0: + resultVerify = False + + # calculate has values for additional files + for file in missingItemsInSumsCSV: + fileHashDict[file] = self.__calcChecksum__(os.path.join(path, file)) + + return self.Result(result=resultVerify, path=path, missingFiles=additionalItemsInSumsCSV, + additionalFiles=missingItemsInSumsCSV, ChecksumMismatch=checksumErrors, + masterChecksums=fileHashDict, backupChecksums=sumsDict) + else: + # return with True, in case no files needed to be verifed + return self.Result(result=True, path=path) else: - print("Overall: FAIL") - - - def calcChecksum(self, filePath): + # stop execution, since path is NOT valid + return self.Result(result=True, path=path, pathError=True) + + def WriteChecksumFile(self, path: str) -> Result: """ - Calculates and returns file hash for >>filePath<<. + Creates file sums.csv with checksums for files in >>path<<. Parameters: - filePath (str): Path and name of the file that shall get hashed. - + path (str): Path to directory whose files shall get hashed and checksums stored in sums.csv. + Returns: - str: Hash digest. + vrfy.Result: Results result object of type vrfy.Result. """ - import hashlib - sha256_hash = hashlib.sha256() - try: - with open(filePath, 'rb') as file: - while True: - block = file.read(8192) - if not block: - break - sha256_hash.update(block) - except: - #print("ERROR: Unable to calculate SHA256 hash.") - return self.HASH_ERROR - return sha256_hash.hexdigest() - - - def verifyFiles(self, pathMaster, filesMaster, pathClone, filesClone): + # start checksum creation, when path is valid + if os.path.isdir(path): + # determine available files in path -> list of file names + files = [entryB for entryB in os.listdir(path) if + not os.path.isdir(os.path.join(path, entryB))] + # create sums.csv, if directory contains files + result = True + hashErrors = [] + if len(files) > 0: + try: + with open(os.path.join(path, "sums.csv"), "w") as f: + # do not hash "sums.csv" itself + if "sums.csv" in files: + files.remove("sums.csv") + # iterate through all files of current directory and add their names and checksums to "sums.csv" + for file in files: + hash_digest = self.__calcChecksum__(os.path.join(path, file)) + # only add files without checksum errors or set error flag, when checksum calculation + # failed + if hash_digest != self.HASH_ERROR: + f.write(str(file) + ";" + str(hash_digest) + "\n") + else: + result = False + hashErrors.append(str(file)) + except OSError: + return self.Result(result=False, path=path, ChecksumMismatch=hashErrors) + return self.Result(result=result, path=path, ChecksumMismatch=hashErrors) + else: + # stop execution, since path is NOT valid + return self.Result(result=False, path=path, pathError=True) + + def VerifyFiles(self, pathMaster: str, pathBackup: str) -> Result: """ - Verifies the contents of directory "pathMaster" against the contents of "pathClone" based on the respective file checksums. - + Verifies the contents of directory "pathMaster" against the contents of "pathBackup" based on the respective + file checksums. + Parameters: - pathMaster (str): Path to the master directory whose contents are considered valid and unchanged, serving as a baseline for comparison. - filesMaster (List[str]): Names of all files that are included in the directory >>pathMaster<<. - pathClone (str): Path to clone directory whose files shall get verified against the master copy. - filesClone (List[str]): Names of all files that are included in the directory >>pathClone<<. - + pathMaster (str): Path to the master directory whose contents are considered valid and unchanged, serving as + a baseline for comparison. + pathBackup (str): Path to backup directory whose files shall get verified against the master copy. + Returns: - bool: True, when contents of directory "pathMaster" are verified successfully against "pathClone". - False, when at least one file mismatches. + vrfy.Result: Results result object of type vrfy.Result. """ + # stop execution if both path are invalid + if not os.path.isdir(pathMaster) and not os.path.isdir(pathBackup): + return self.Result(result=False, path=pathMaster, pathError=True) + result = True - # start file verification, when files are included in directory - if len(filesMaster) > 0: - # print current working directory - if self.OPTION_PRINT == True: - print(str(pathMaster) + " : ") - else: - print(str(pathMaster), end=" : ", flush=True) - - # search for missing or additional files in master and clone, and trigger error condition - missingItemsInPathClone = [i for i in filesMaster if i not in filesClone] - additionalItemsInPathClone = [i for i in filesClone if i not in filesMaster] - if len(missingItemsInPathClone) != 0: - result = False - if len(additionalItemsInPathClone) != 0: + masterHashDict = dict() + backupHashDict = dict() + missingItemsInPathBackup = [] + additionalItemsInPathBackup = [] + checksumErrors = [] + + # stop execution, if backup path is invalid + if os.path.isdir(pathMaster) and not os.path.isdir(pathBackup): + filesMaster = [entryB for entryB in os.listdir(pathMaster) if + not os.path.isdir(os.path.join(pathMaster, entryB))] + for file in filesMaster: + masterFilePath = os.path.join(pathMaster, file) + checksumMaster = self.__calcChecksum__(masterFilePath) + masterHashDict[file] = checksumMaster + return self.Result(result=False, path=pathMaster, pathError=True, missingFiles=filesMaster, + additionalFiles=[], ChecksumMismatch=[], + masterChecksums=masterHashDict, backupChecksums=dict()) + + # stop execution, if master path is invalid + if os.path.isdir(pathBackup) and not os.path.isdir(pathMaster): + filesBackup = [entryB for entryB in os.listdir(pathBackup) if + not os.path.isdir(os.path.join(pathBackup, entryB))] + for file in filesBackup: + backupFilePath = os.path.join(pathBackup, file) + checksumBackup = self.__calcChecksum__(backupFilePath) + backupHashDict[file] = checksumBackup + return self.Result(result=False, path=pathMaster, pathError=True, missingFiles=[], + additionalFiles=filesBackup, ChecksumMismatch=[], + masterChecksums=dict(), backupChecksums=backupHashDict) + + # below: both paths are valid + # determine available files in master and backup paths -> lists of file names + filesMaster = [entryB for entryB in os.listdir(pathMaster) if + not os.path.isdir(os.path.join(pathMaster, entryB))] + filesBackup = [entryB for entryB in os.listdir(pathBackup) if + not os.path.isdir(os.path.join(pathBackup, entryB))] + + # start file verification, when files are included in master directory + # iterate through all files of master directory and verify their checksums to those of the backup directory + # not executed, when no entires in filesMaster list + for fileNameMaster in filesMaster: + # master file is not included on backup directory -> set error flag + if fileNameMaster not in filesBackup: result = False - - # iterate through all files of master directory and verify their checksums to those of the clone directory - checksumErrors = [] - for fileNameMaster in filesMaster: - # master file is not included on clone directory -> trigger error condition - if fileNameMaster not in filesClone: - result = False - else: - # calculate checksums for master and clone file - masterFilePath = self.os.path.join(pathMaster, fileNameMaster) - cloneFilePath = self.os.path.join(pathClone, fileNameMaster) - checksumMaster = self.calcChecksum(masterFilePath) - checksumClone = self.calcChecksum(cloneFilePath) - # print checksums if requested by user - if self.OPTION_PRINT == True: - print("File: " + fileNameMaster) - print("Master: " + checksumMaster) - print("Clone: " + checksumClone) - # verify that both match and both are NOT "HASH_ERROR" - if checksumClone == checksumMaster and (checksumMaster != self.HASH_ERROR): - if self.OPTION_PRINT == True: - print("Check: PASS") - print(self.LINE_BREAK) - else: - # checksum mismatch -> trigger error condition and add file name to list of mismatched files - if self.OPTION_PRINT == True: - print("Check: FAIL") - print(self.LINE_BREAK) - checksumErrors.append(str(fileNameMaster)) - result = False - - if self.OPTION_PRINT == True: - print("" + str(pathMaster), end=" : ", flush=True) - # append result to printed working directory - if result == True: - print("PASS") else: - print("FAILED!!!") - # print failed items - for file in checksumErrors: - print("[MISMATCH] " + str(file)) - for file in missingItemsInPathClone: - print("[+] " + str(file)) - for file in additionalItemsInPathClone: - print("[-] " + str(file)) - if self.OPTION_PRINT == True: - print(self.CHAPTER_BREAK) - return result - - - def createSums(self, pathMaster, filesMaster, pathClone, filesClone): + # calculate checksums for master and backup file + masterFilePath = os.path.join(pathMaster, fileNameMaster) + backupFilePath = os.path.join(pathBackup, fileNameMaster) + checksumMaster = self.__calcChecksum__(masterFilePath) + checksumBackup = self.__calcChecksum__(backupFilePath) + # save hashvalues for later analysis + masterHashDict[fileNameMaster] = checksumMaster + backupHashDict[fileNameMaster] = checksumBackup + # verify that both match and both are NOT "HASH_ERROR" + if checksumBackup == checksumMaster and (checksumMaster != self.HASH_ERROR): + pass + else: + # checksum mismatch -> set error flag and add file name to list of mismatched files + checksumErrors.append(str(fileNameMaster)) + result = False + + # search for missing or additional files in master and backup, and set error flag + missingItemsInPathBackup = [i for i in filesMaster if i not in filesBackup] + additionalItemsInPathBackup = [i for i in filesBackup if i not in filesMaster] + if len(missingItemsInPathBackup) != 0 or len(additionalItemsInPathBackup) != 0: + result = False + + # calculate checksums for additional and missing files + for missingBackup in missingItemsInPathBackup: + masterFilePath = os.path.join(pathMaster, missingBackup) + checksumMaster = self.__calcChecksum__(masterFilePath) + masterHashDict[missingBackup] = checksumMaster + for additionalBackup in additionalItemsInPathBackup: + backupFilePath = os.path.join(pathBackup, additionalBackup) + checksumBackup = self.__calcChecksum__(backupFilePath) + backupHashDict[additionalBackup] = checksumBackup + + return self.Result(result=result, path=pathMaster, missingFiles=missingItemsInPathBackup, + additionalFiles=additionalItemsInPathBackup, ChecksumMismatch=checksumErrors, + masterChecksums=masterHashDict, backupChecksums=backupHashDict) + + def __calcChecksum__(self, filePath: str) -> str: """ - Creates file sums.csv with checksums for files [filesMaster] in >>pathMaster<<. - + Calculates and returns file hash for >>filePath<<. + Parameters: - pathMaster (str): Path to directory whose files shall get hashed and checksums stored in sums.csv. - filesMaster (List[str]): Names of all files that are included in the directory >>pathMaster<<. - pathClone (str): NOT USED! Included for compatibility reasons only. - filesClone (List[str]): NOT USED! Included for compatibility reasons only. - + filePath (str): Path and name of the file that shall get hashed. + Returns: - bool: True, when file sums.csv is created successfully, else False. + str: Hash digest. """ - #create sums.csv, if directory contains files - result = True - if len(filesMaster) > 0: - # print current working directory - print("" + str(pathMaster), end=" : ", flush=True) - try: - f = open(self.os.path.join(pathMaster, "sums.csv"), "w") - # do not hash "sums.csv" itself - if "sums.csv" in filesMaster: - filesMaster.remove("sums.csv") - # iterate through all files of current directory and add their names and checksums to "sums.csv" - for file in filesMaster: - hash_digest = str(self.calcChecksum(self.os.path.join(pathMaster, file))) - # only add files without checksum errors and trigger error condition, when checksum calculation failed - if hash_digest != self.HASH_ERROR: - f.write(str(file) + ";" + str(hash_digest) + "\n") - else: - result = False - f.close() - print("PASS") - except: - print("FAILED!!!") - return False - return result - + sha256_hash = hashlib.sha256() + try: + with open(filePath, 'rb') as f: + while True: + block = f.read(8192) + if not block: + break + sha256_hash.update(block) + except OSError: + # print("ERROR: Unable to calculate SHA256 hash.") + return self.HASH_ERROR + return str(sha256_hash.hexdigest()) - def getChecksumsFromFile(self, filePathName): + def __getChecksumsFromFile__(self, filePathName: str) -> dict: """ Reads and decodes checksums from file and returns a filename / hash digest dictionary. Note: To be used, when undefined whether *.sha256sum or sums.csv was provided by user. Parameters: filePathName (str): Path + file name to directory where *.sha256sum/sums.csv-file is located. - + Returns: dict: dict[filename] = hash digest. """ - path, filename = self.os.path.split(filePathName) - name, extension = self.os.path.splitext(self.os.path.basename(filePathName)) + path, filename = os.path.split(filePathName) + name, extension = os.path.splitext(os.path.basename(filePathName)) if extension == ".sha256sum": - return self.readSha256SumFile(path, filename) + return self.__readSha256SumFile__(path, filename) elif filename == "sums.csv": - return self.readSumsCsvFile(path) + return self.__readSumsCsvFile__(path) else: return dict() - - - def readSha256SumFile(self, filePath, fileName): + + def __readSha256SumFile__(self, filePath: str, fileName: str) -> dict: """ Reads and decodes *.sha256sum-files and returns a filename / hash digest dictionary. Parameters: filePath (str): Path to directory where *.sha256sum-file is located. fileName (str): Name of *.sha256sum-file with file extension. - + Returns: dict: dict[filename] = hash digest. """ sumsDict = dict() # read and decode sums.csv into dictionary sumsDict[<>] = <> try: - f = open(self.os.path.join(filePath, fileName), "r") - for line in f.readlines(): - entry = line.replace("\n","").split(" ") - sumsDict[entry[1]] = entry[0] - f.close() - except: + with open(os.path.join(filePath, fileName), "r") as f: + for line in f.readlines(): + entry = line.replace("\n", "").split(" ") + sumsDict[entry[1]] = entry[0] + except OSError: # except file errors, and close verification with FAIL (i.e. "False" result) - print("\n>>> ERROR: No *.sha256sum-file found!") - return sumsDict - - - def readSumsCsvFile(self, filePath): + return dict() + return sumsDict + + def __readSumsCsvFile__(self, filePath: str) -> dict: """ Reads and decodes sums.csv-files and returns a filename / hash digest dictionary. Parameters: filePath (str): Path to directory where sums.csv-file is located. - + Returns: dict: dict[filename] = hash digest. """ # read and decode sums.csv into dictionary sumsDict[<>] = <> sumsDict = dict() try: - f = open(self.os.path.join(filePath, "sums.csv"), "r") - for line in f.readlines(): - entry = line.replace("\n","").split(";") - # compatibility layer for legacy sums.csv, where hash digest started with "b'" and ended with "'" - if entry[1][:2] == "b'" and entry[1][-1] == "'": - entry[1] = entry[1][2:-1] - sumsDict[entry[0]] = entry[1] - f.close() - except: + with open(os.path.join(filePath, "sums.csv"), "r") as f: + for line in f.readlines(): + entry = line.replace("\n", "").split(";") + # compatibility layer for legacy sums.csv, where hash digest started with "b'" and ended with "'" + if entry[1][:2] == "b'" and entry[1][-1] == "'": + entry[1] = entry[1][2:-1] + sumsDict[entry[0]] = entry[1] + except OSError: # except file errors, and close verification with FAIL (i.e. "False" result) - print("\n>>> ERROR: No sums.csv found!") - return False + return dict() return sumsDict - - - def verifySums(self, pathMaster, filesMaster, pathClone, filesClone): - """ - Verifies the contents of directory "pathMaster" against the included checksums in sums.csv. - - Parameters: - pathMaster (str): Path to directory whose files shall get verified. - filesMaster (List[str]): Names of all files that are included in the directory >>pathMaster<<. - pathClone (str): NOT USED! Included for compatibility reasons only. - filesClone (List[str]): NOT USED! Included for compatibility reasons only. - - Returns: - bool: True, when contents of directory are verified successfully against sums.csv, else False. - """ - # check if checksum file is available, if not abort execution - if "sums.csv" not in filesMaster and len(filesMaster) > 0: - print(str(pathMaster) + " : FAIL") - print("ERROR: Unable to find checksum file!") - return False - - if len(filesMaster) > 0: - # print current working directory without line ending - if self.OPTION_PRINT == True: - print(str(pathMaster) + " : ") - else: - print(str(pathMaster), end=" : ", flush=True) - - # do not try to verify "sums.csv" as it will not be included in "sums.csv" - if "sums.csv" in filesMaster: - filesMaster.remove("sums.csv") - - # read checksum file and get dictionary - sumsDict = self.readSumsCsvFile(pathMaster) - - resultVerify = True - - # verify that all files in current working directory are included in sums.csv, and vice versa - # trigger error condition if check failed - missingItemsInSumsCSV = [i for i in filesMaster if i not in sumsDict.keys()] - additionalItemsInSumsCSV = [i for i in sumsDict.keys() if i not in filesMaster] - if len(missingItemsInSumsCSV) != 0: - resultVerify = False - if len(additionalItemsInSumsCSV) != 0: - resultVerify = False - - checksumErrors = [] - # iterate through all files and compare their checksum with those stored in sums.csv - for file in sumsDict.keys(): - checksumCalc = str(self.calcChecksum(self.os.path.join(pathMaster, file))) - checksumSaved = sumsDict[file] - # print checksums if requested by user - if self.OPTION_PRINT == True: - print("File: " + file) - print("Calculated: " + checksumCalc) - print("Saved: " + checksumSaved) - # verify calculated checksum against the one stored in sums.csv - if checksumCalc != checksumSaved: - # checksum mismatch -> trigger error condition and add file name to list of mismatched files - checksumErrors.append(file) - resultVerify = False - if self.OPTION_PRINT == True: - print("Check: FAIL") - print(self.LINE_BREAK) - elif checksumCalc == checksumSaved: - if self.OPTION_PRINT == True: - print("Check: PASS") - print(self.LINE_BREAK) - else: - # shall never be executed - resultVerify = False - print("EXECUTION ERROR") - import sys - sys.exit(1) - - if self.OPTION_PRINT == True: - print("" + str(pathMaster), end=" : ", flush=True) - # append result to printed working directory - if resultVerify == True: - print("PASS") - else: - print("FAILED!!!") - # print failed items - for file in checksumErrors: - print("[MISMATCH] " + str(file)) - for file in missingItemsInSumsCSV: - print("[+] " + str(file)) - for file in additionalItemsInSumsCSV: - print("[-] " + str(file)) - - if self.OPTION_PRINT == True: - print(self.CHAPTER_BREAK) - - return resultVerify - - # return with True, in case no files needed to be verifed - return True - - - def walker(self, pathMaster, pathClone, func): - """ - Verifies the contents of directory "pathMaster" against the included checksums in sums.csv. - - Parameters: - pathMaster (str): Path to the master directory whose contents are considered valid and unchanged, serving as a baseline for comparison. . - pathClone (str): Path to clone directory whose files shall get verified against the master copy. - func (callable) -- Function that gets executed on the respective folder. - - Returns: - bool: True, when all executions of >>func<< returned PASS, else False. - """ - # check if received strings are valid paths - if self.os.path.isdir(pathMaster) and self.os.path.isdir(pathClone): - # create lists of files and directories that are included in current path - filesM = [entryA for entryA in self.os.listdir(pathMaster) if not self.os.path.isdir(self.os.path.join(pathMaster, entryA))] - filesC = [entryB for entryB in self.os.listdir(pathClone) if not self.os.path.isdir(self.os.path.join(pathClone, entryB))] - dictsM = [entryDir for entryDir in self.os.listdir(pathMaster) if self.os.path.isdir(self.os.path.join(pathMaster, entryDir))] - - # execute requested operation - resultVerify = func(pathMaster, filesM, pathClone, filesC) - - # jump into child directories, if recursive operation is requested - if self.OPTION_RECURSIVE == True: - for nextFolder in dictsM: - resultVerify = self.walker(self.os.path.join(pathMaster, nextFolder), self.os.path.join(pathClone, nextFolder), func) & resultVerify - - return resultVerify - else: - # terminate, since paths are not pointing to valid directories - return False - - -def main(): - import sys - verify = vrfy() - sys.exit(verify.parseArgumentsAndExecute(sys.argv[1:])) - -if __name__ == "__main__": - main() - -