diff --git a/.isort.cfg b/.isort.cfg new file mode 100644 index 0000000..4c0cb09 --- /dev/null +++ b/.isort.cfg @@ -0,0 +1,5 @@ +# SPDX-FileCopyrightText: 2023 VladimĂ­r Kotal +# +# SPDX-License-Identifier: Unlicense +[settings] +profile = black diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index fa3d605..a76c7f4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,6 +3,11 @@ # SPDX-License-Identifier: Unlicense repos: +- repo: https://github.com/python/black + rev: 22.3.0 + hooks: + - id: black + exclude: "^tests/bad_python.py$" - repo: https://github.com/pycqa/pylint rev: v2.15.5 hooks: @@ -16,11 +21,6 @@ repos: name: lint (code) types: [python] exclude: "^(docs/|examples/|setup.py$|tests/bad_python.py$)" -- repo: https://github.com/python/black - rev: 22.3.0 - hooks: - - id: black - exclude: "^tests/bad_python.py$" - repo: https://github.com/fsfe/reuse-tool rev: v0.14.0 hooks: diff --git a/README.rst b/README.rst index 5a23414..118cb39 100644 --- a/README.rst +++ b/README.rst @@ -89,24 +89,32 @@ To get help, just type the command:: A tool to manage and update libraries on a CircuitPython device. Options: - --verbose Comprehensive logging is sent to stdout. - --version Show the version and exit. - --path DIRECTORY Path to CircuitPython directory. Overrides automatic - path detection. - --help Show this message and exit. - -r --requirement Supports requirements.txt tracking of library - requirements with freeze and install commands. + --verbose Comprehensive logging is sent to stdout. + --path DIRECTORY Path to CircuitPython directory. Overrides automatic + path detection. + --host TEXT Hostname or IP address of a device. Overrides automatic + path detection. + --password TEXT Password to use for authentication when --host is used. + --board-id TEXT Manual Board ID of the CircuitPython device. If provided + in combination with --cpy-version, it overrides the + detected board ID. + --cpy-version TEXT Manual CircuitPython version. If provided in combination + with --board-id, it overrides the detected CPy version. + --version Show the version and exit. + --help Show this message and exit. Commands: - freeze Output details of all the modules found on the connected... - install Installs .mpy version of named module(s) onto the device. - install --py Installs .py version of named module(s). - list Lists all out of date modules found on the connected... - show Show the long list of all available modules in the bundle. - show Search the names in the modules in the bundle for a match. - uninstall Uninstall a named module(s) from the connected device. - update Update modules on the device. Use --all to automatically update - all modules. + bundle-add Add bundles to the local bundles list, by "user/repo"... + bundle-remove Remove one or more bundles from the local bundles list. + bundle-show Show the list of bundles, default and local, with URL,... + freeze Output details of all the modules found on the connected... + install Install a named module(s) onto the device. + list Lists all out of date modules found on the connected... + show Show a list of available modules in the bundle. + uninstall Uninstall a named module(s) from the connected device. + update Update modules on the device. Use --all to automatically + update all modules without Major Version warnings. + To automatically install all modules imported by ``code.py``, @@ -221,6 +229,12 @@ The ``--version`` flag will tell you the current version of the $ circup --version CircUp, A CircuitPython module updater. Version 0.0.1 + +To use circup via the `Web Workflow `_. on devices that support it. Use the ``--host`` and ``--password`` arguments before your circup command.:: + + $ circup --host 192.168.1.119 --password s3cr3t install adafruit_hid + $ circup --host cpy-9573b2.local --password s3cr3t install adafruit_hid + That's it! diff --git a/circup/__init__.py b/circup/__init__.py index 166dfe1..12fc2d3 100644 --- a/circup/__init__.py +++ b/circup/__init__.py @@ -5,17 +5,21 @@ CircUp -- a utility to manage and update libraries on a CircuitPython device. """ - import ctypes import glob import json import logging +import time +from logging.handlers import RotatingFileHandler import os import re import shutil -from subprocess import check_output +import socket import sys +import tempfile import zipfile +from subprocess import check_output +from urllib.parse import urlparse, urljoin import appdirs import click @@ -23,15 +27,20 @@ import pkg_resources import requests import toml -from semver import VersionInfo import update_checker +from requests.auth import HTTPBasicAuth +from semver import VersionInfo +from circup.shared import DATA_DIR, BAD_FILE_FORMAT, extract_metadata, _get_modules_file +from circup.backends import WebBackend, DiskBackend + +#: The version of CircuitPython found on the connected device. +CPY_VERSION = "" # Useful constants. #: Flag to indicate if the command is being run in verbose mode. VERBOSE = False -#: The location of data files used by circup (following OS conventions). -DATA_DIR = appdirs.user_data_dir(appname="circup", appauthor="adafruit") + #: The path to the JSON file containing the metadata about the bundles. BUNDLE_CONFIG_FILE = pkg_resources.resource_filename( "circup", "config/bundle_config.json" @@ -46,6 +55,7 @@ LOG_DIR = appdirs.user_log_dir(appname="circup", appauthor="adafruit") #: The location of the log file for the utility. LOGFILE = os.path.join(LOG_DIR, "circup.log") + #: The libraries (and blank lines) which don't go on devices NOT_MCU_LIBRARIES = [ "", @@ -56,14 +66,12 @@ "circuitpython_typing", "pyserial", ] -#: The version of CircuitPython found on the connected device. -CPY_VERSION = "" + #: Module formats list (and the other form used in github files) PLATFORMS = {"py": "py", "8mpy": "8.x-mpy", "9mpy": "9.x-mpy"} #: Commands that do not require an attached board BOARDLESS_COMMANDS = ["show", "bundle-add", "bundle-remove", "bundle-show"] -#: Version identifier for a bad MPY file format -BAD_FILE_FORMAT = "Invalid" + #: Timeout for requests calls like get() REQUESTS_TIMEOUT = 30 @@ -77,7 +85,7 @@ # Setup logging. logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) -logfile_handler = logging.FileHandler(LOGFILE) +logfile_handler = RotatingFileHandler(LOGFILE, maxBytes=10_000_000, backupCount=0) log_formatter = logging.Formatter( "%(asctime)s %(levelname)s: %(message)s", datefmt="%m/%d/%Y %H:%M:%S" ) @@ -230,7 +238,15 @@ class Module: # pylint: disable=too-many-arguments def __init__( - self, path, repo, device_version, bundle_version, mpy, bundle, compatibility + self, + name, + backend, + repo, + device_version, + bundle_version, + mpy, + bundle, + compatibility, ): """ The ``self.file`` and ``self.name`` attributes are constructed from @@ -238,8 +254,8 @@ def __init__( resulting self.file value will be None, and the name will be the basename of the directory path. - :param str path: The path to the module on the connected - CIRCUITPYTHON device. + :param str name: The file name of the module. + :param Backend backend: The backend that the module is on. :param str repo: The URL of the Git repository for this module. :param str device_version: The semver value for the version on device. :param str bundle_version: The semver value for the version in bundle. @@ -247,15 +263,31 @@ def __init__( :param Bundle bundle: Bundle object where the module is located. :param (str,str) compatibility: Min and max versions of CP compatible with the mpy. """ - self.path = path - if os.path.isfile(self.path): - # Single file module. - self.file = os.path.basename(path) - self.name = self.file.replace(".py", "").replace(".mpy", "") - else: - # Directory based module. + self.name = name + self.backend = backend + self.path = ( + urljoin(backend.library_path, name, allow_fragments=False) + if isinstance(backend, WebBackend) + else os.path.join(backend.library_path, name) + ) + + url = urlparse(self.path, allow_fragments=False) + + if ( + url.path.endswith("/") + if isinstance(backend, WebBackend) + else self.path.endswith(os.sep) + ): self.file = None - self.name = os.path.basename(os.path.dirname(self.path)) + self.name = self.path.split( + "/" if isinstance(backend, WebBackend) else os.sep + )[-2] + else: + self.file = os.path.basename(url.path) + self.name = ( + os.path.basename(url.path).replace(".py", "").replace(".mpy", "") + ) + self.repo = repo self.device_version = device_version self.bundle_version = bundle_version @@ -371,22 +403,6 @@ def row(self): update_reason = "Minor Version" return (self.name, loc, rem, update_reason) - def update(self): - """ - Delete the module on the device, then copy the module from the bundle - back onto the device. - - The caller is expected to handle any exceptions raised. - """ - if os.path.isdir(self.path): - # Delete and copy the directory. - shutil.rmtree(self.path, ignore_errors=True) - shutil.copytree(self.bundle_path, self.path) - else: - # Delete and copy file. - os.remove(self.path) - shutil.copyfile(self.bundle_path, self.path) - def __repr__(self): """ Helps with log files. @@ -497,89 +513,6 @@ def ensure_latest_bundle(bundle): logger.info("Current bundle up to date %s.", tag) -def extract_metadata(path): - # pylint: disable=too-many-locals,too-many-branches - """ - Given an file path, return a dictionary containing metadata extracted from - dunder attributes found therein. Works with both .py and .mpy files. - - For Python source files, such metadata assignments should be simple and - single-line. For example:: - - __version__ = "1.1.4" - __repo__ = "https://github.com/adafruit/SomeLibrary.git" - - For byte compiled .mpy files, a brute force / backtrack approach is used - to find the __version__ number in the file -- see comments in the - code for the implementation details. - - :param str path: The path to the file containing the metadata. - :return: The dunder based metadata found in the file, as a dictionary. - """ - result = {} - logger.info("%s", path) - if path.endswith(".py"): - result["mpy"] = False - with open(path, "r", encoding="utf-8") as source_file: - content = source_file.read() - #: The regex used to extract ``__version__`` and ``__repo__`` assignments. - dunder_key_val = r"""(__\w+__)(?:\s*:\s*\w+)?\s*=\s*(?:['"]|\(\s)(.+)['"]""" - for match in re.findall(dunder_key_val, content): - result[match[0]] = str(match[1]) - if result: - logger.info("Extracted metadata: %s", result) - elif path.endswith(".mpy"): - find_by_regexp_match = False - result["mpy"] = True - with open(path, "rb") as mpy_file: - content = mpy_file.read() - # Track the MPY version number - mpy_version = content[0:2] - compatibility = None - loc = -1 - # Find the start location of the __version__ - if mpy_version == b"M\x03": - # One byte for the length of "__version__" - loc = content.find(b"__version__") - 1 - compatibility = (None, "7.0.0-alpha.1") - elif mpy_version == b"C\x05": - # Two bytes for the length of "__version__" in mpy version 5 - loc = content.find(b"__version__") - 2 - compatibility = ("7.0.0-alpha.1", "8.99.99") - elif mpy_version == b"C\x06": - # Two bytes in mpy version 6 - find_by_regexp_match = True - compatibility = ("9.0.0-alpha.1", None) - if find_by_regexp_match: - # Too hard to find the version positionally. - # Find the first thing that looks like an x.y.z version number. - match = re.search(rb"([\d]+\.[\d]+\.[\d]+)\x00", content) - if match: - result["__version__"] = match.group(1).decode("utf-8") - elif loc > -1: - # Backtrack until a byte value of the offset is reached. - offset = 1 - while offset < loc: - val = int(content[loc - offset]) - if mpy_version == b"C\x05": - val = val // 2 - if val == offset - 1: # Off by one..! - # Found version, extract the number given boundaries. - start = loc - offset + 1 # No need for prepended length. - end = loc # Up to the start of the __version__. - version = content[start:end] # Slice the version number. - # Create a string version as metadata in the result. - result["__version__"] = version.decode("utf-8") - break # Nothing more to do. - offset += 1 # ...and again but backtrack by one. - if compatibility: - result["compatibility"] = compatibility - else: - # not a valid MPY file - result["__version__"] = BAD_FILE_FORMAT - return result - - def find_device(): """ Return the location on the filesystem for the connected CircuitPython device. @@ -648,43 +581,50 @@ def get_volume_name(disk_name): return device_dir -def find_modules(device_path, bundles_list): +def find_modules(backend, bundles_list): """ Extracts metadata from the connected device and available bundles and returns this as a list of Module instances representing the modules on the device. - :param str device_path: The path to the connected board. - :param Bundle bundles_list: List of supported bundles as Bundle objects. + :param Backend backend: Backend with the device connection. + :param List[Bundle] bundles_list: List of supported bundles as Bundle objects. :return: A list of Module instances describing the current state of the modules on the connected device. """ # pylint: disable=broad-except,too-many-locals try: - device_modules = get_device_versions(device_path) + device_modules = backend.get_device_versions() bundle_modules = get_bundle_versions(bundles_list) result = [] - for name, device_metadata in device_modules.items(): - if name in bundle_modules: + for key, device_metadata in device_modules.items(): + + if key in bundle_modules: path = device_metadata["path"] - bundle_metadata = bundle_modules[name] + bundle_metadata = bundle_modules[key] repo = bundle_metadata.get("__repo__") bundle = bundle_metadata.get("bundle") device_version = device_metadata.get("__version__") bundle_version = bundle_metadata.get("__version__") mpy = device_metadata["mpy"] compatibility = device_metadata.get("compatibility", (None, None)) - result.append( - Module( - path, - repo, - device_version, - bundle_version, - mpy, - bundle, - compatibility, - ) + module_name = ( + path.split(os.sep)[-1] + if not path.endswith(os.sep) + else path[:-1].split(os.sep)[-1] + os.sep + ) + + m = Module( + module_name, + backend, + repo, + device_version, + bundle_version, + mpy, + bundle, + compatibility, ) + result.append(m) return result except Exception as ex: # If it's not possible to get the device and bundle metadata, bail out @@ -739,7 +679,7 @@ def get_bundle_versions(bundles_list, avoid_download=False): of the library bundle. Uses the Python version (rather than the compiled version) of the library modules. - :param Bundle bundles_list: List of supported bundles as Bundle objects. + :param List[Bundle] bundles_list: List of supported bundles as Bundle objects. :param bool avoid_download: if True, download the bundle only if missing. :return: A dictionary of metadata about the modules available in the library bundle. @@ -749,7 +689,7 @@ def get_bundle_versions(bundles_list, avoid_download=False): if not avoid_download or not os.path.isdir(bundle.lib_dir("py")): ensure_latest_bundle(bundle) path = bundle.lib_dir("py") - path_modules = get_modules(path) + path_modules = _get_modules_file(path, logger) for name, module in path_modules.items(): module["bundle"] = bundle if name not in all_the_modules: # here we decide the order of priority @@ -807,43 +747,6 @@ def get_bundles_list(): return bundles_list -def get_circuitpython_version(device_path): - """ - Returns the version number of CircuitPython running on the board connected - via ``device_path``, along with the board ID. This is obtained from the - ``boot_out.txt`` file on the device, whose first line will start with - something like this:: - - Adafruit CircuitPython 4.1.0 on 2019-08-02; - - While the second line is:: - - Board ID:raspberry_pi_pico - - :param str device_path: The path to the connected board. - :return: A tuple with the version string for CircuitPython and the board ID string. - """ - try: - with open( - os.path.join(device_path, "boot_out.txt"), "r", encoding="utf-8" - ) as boot: - version_line = boot.readline() - circuit_python = version_line.split(";")[0].split(" ")[-3] - board_line = boot.readline() - if board_line.startswith("Board ID:"): - board_id = board_line[9:].strip() - else: - board_id = "" - except FileNotFoundError: - click.secho( - "Missing file boot_out.txt on the device: wrong path or drive corrupted.", - fg="red", - ) - logger.error("boot_out.txt not found.") - sys.exit(1) - return (circuit_python, board_id) - - def get_circup_version(): """Return the version of circup that is running. If not available, return None. @@ -864,7 +767,8 @@ def get_circup_version(): def get_dependencies(*requested_libraries, mod_names, to_install=()): """ - Return a list of other CircuitPython libraries + Return a list of other CircuitPython libraries required by the given list + of libraries :param tuple requested_libraries: The libraries to search for dependencies :param object mod_names: All the modules metadata from bundle @@ -957,17 +861,6 @@ def get_circup_dependencies(bundle, library): return tuple() -def get_device_versions(device_path): - """ - Returns a dictionary of metadata from modules on the connected device. - - :param str device_path: Path to the device volume. - :return: A dictionary of metadata about the modules available on the - connected device. - """ - return get_modules(os.path.join(device_path, "lib")) - - def get_latest_release_from_url(url): """ Find the tag name of the latest release by using HTTP HEAD and decoding the redirect. @@ -986,138 +879,6 @@ def get_latest_release_from_url(url): return tag -def get_modules(path): - """ - Get a dictionary containing metadata about all the Python modules found in - the referenced path. - - :param str path: The directory in which to find modules. - :return: A dictionary containing metadata about the found modules. - """ - result = {} - if not path: - return result - single_file_py_mods = glob.glob(os.path.join(path, "*.py")) - single_file_mpy_mods = glob.glob(os.path.join(path, "*.mpy")) - package_dir_mods = [ - d - for d in glob.glob(os.path.join(path, "*", "")) - if not os.path.basename(os.path.normpath(d)).startswith(".") - ] - single_file_mods = single_file_py_mods + single_file_mpy_mods - for sfm in [f for f in single_file_mods if not os.path.basename(f).startswith(".")]: - metadata = extract_metadata(sfm) - metadata["path"] = sfm - result[os.path.basename(sfm).replace(".py", "").replace(".mpy", "")] = metadata - for package_path in package_dir_mods: - name = os.path.basename(os.path.dirname(package_path)) - py_files = glob.glob(os.path.join(package_path, "**/*.py"), recursive=True) - mpy_files = glob.glob(os.path.join(package_path, "**/*.mpy"), recursive=True) - all_files = py_files + mpy_files - # default value - result[name] = {"path": package_path, "mpy": bool(mpy_files)} - # explore all the submodules to detect bad ones - for source in [f for f in all_files if not os.path.basename(f).startswith(".")]: - metadata = extract_metadata(source) - if "__version__" in metadata: - metadata["path"] = package_path - result[name] = metadata - # break now if any of the submodules has a bad format - if metadata["__version__"] == BAD_FILE_FORMAT: - break - return result - - -# pylint: disable=too-many-locals,too-many-branches -def install_module( - device_path, device_modules, name, pyext, mod_names -): # pragma: no cover - """ - Finds a connected device and installs a given module name if it - is available in the current module bundle and is not already - installed on the device. - TODO: There is currently no check for the version. - - :param str device_path: The path to the connected board. - :param list(dict) device_modules: List of module metadata from the device. - :param str name: Name of module to install - :param bool pyext: Boolean to specify if the module should be installed from - source or from a pre-compiled module - :param mod_names: Dictionary of metadata from modules that can be generated - with get_bundle_versions() - """ - if not name: - click.echo("No module name(s) provided.") - elif name in mod_names: - library_path = os.path.join(device_path, "lib") - if not os.path.exists(library_path): # pragma: no cover - os.makedirs(library_path) - metadata = mod_names[name] - bundle = metadata["bundle"] - # Grab device modules to check if module already installed - if name in device_modules: - click.echo("'{}' is already installed.".format(name)) - return - if pyext: - # Use Python source for module. - source_path = metadata["path"] # Path to Python source version. - if os.path.isdir(source_path): - target = os.path.basename(os.path.dirname(source_path)) - target_path = os.path.join(library_path, target) - # Copy the directory. - shutil.copytree(source_path, target_path) - else: - target = os.path.basename(source_path) - target_path = os.path.join(library_path, target) - # Copy file. - shutil.copyfile(source_path, target_path) - else: - # Use pre-compiled mpy modules. - module_name = os.path.basename(metadata["path"]).replace(".py", ".mpy") - if not module_name: - # Must be a directory based module. - module_name = os.path.basename(os.path.dirname(metadata["path"])) - major_version = CPY_VERSION.split(".")[0] - bundle_platform = "{}mpy".format(major_version) - bundle_path = os.path.join(bundle.lib_dir(bundle_platform), module_name) - if os.path.isdir(bundle_path): - target_path = os.path.join(library_path, module_name) - # Copy the directory. - shutil.copytree(bundle_path, target_path) - elif os.path.isfile(bundle_path): - target = os.path.basename(bundle_path) - target_path = os.path.join(library_path, target) - # Copy file. - shutil.copyfile(bundle_path, target_path) - else: - raise IOError("Cannot find compiled version of module.") - click.echo("Installed '{}'.".format(name)) - else: - click.echo("Unknown module named, '{}'.".format(name)) - - -# pylint: enable=too-many-locals,too-many-branches - - -def libraries_from_imports(code_py, mod_names): - """ - Parse the given code.py file and return the imported libraries - - :param str code_py: Full path of the code.py file - :return: sequence of library names - """ - # pylint: disable=broad-except - try: - found_imports = findimports.find_imports(code_py) - except Exception as ex: # broad exception because anything could go wrong - logger.exception(ex) - click.secho('Unable to read the auto file: "{}"'.format(str(ex)), fg="red") - sys.exit(2) - # pylint: enable=broad-except - imports = [info.name.split(".", 1)[0] for info in found_imports] - return [r for r in imports if r in mod_names] - - def libraries_from_requirements(requirements): """ Clean up supplied requirements.txt and turn into tuple of CP libraries @@ -1188,6 +949,25 @@ def tags_data_save_tag(key, tag): json.dump(tags_data, data) +def libraries_from_code_py(code_py, mod_names): + """ + Parse the given code.py file and return the imported libraries + + :param str code_py: Full path of the code.py file + :return: sequence of library names + """ + # pylint: disable=broad-except + try: + found_imports = findimports.find_imports(code_py) + except Exception as ex: # broad exception because anything could go wrong + logger.exception(ex) + click.secho('Unable to read the auto file: "{}"'.format(str(ex)), fg="red") + sys.exit(2) + # pylint: enable=broad-except + imports = [info.name.split(".", 1)[0] for info in found_imports] + return [r for r in imports if r in mod_names] + + # ----------- CLI command definitions ----------- # # The following functions have IO side effects (for instance they emit to @@ -1206,6 +986,18 @@ def tags_data_save_tag(key, tag): type=click.Path(exists=True, file_okay=False), help="Path to CircuitPython directory. Overrides automatic path detection.", ) +@click.option( + "--host", + help="Hostname or IP address of a device. Overrides automatic path detection.", +) +@click.option( + "--password", help="Password to use for authentication when --host is used." +) +@click.option( + "--timeout", + default=30, + help="Specify the timeout in seconds for any network operations.", +) @click.option( "--board-id", default=None, @@ -1223,11 +1015,46 @@ def tags_data_save_tag(key, tag): message="%(prog)s, A CircuitPython module updater. Version %(version)s", ) @click.pass_context -def main(ctx, verbose, path, board_id, cpy_version): # pragma: no cover +def main( # pylint: disable=too-many-locals + ctx, verbose, path, host, password, timeout, board_id, cpy_version +): # pragma: no cover """ A tool to manage and update libraries on a CircuitPython device. """ + # pylint: disable=too-many-arguments,too-many-branches,too-many-statements,too-many-locals ctx.ensure_object(dict) + global REQUESTS_TIMEOUT + ctx.obj["TIMEOUT"] = REQUESTS_TIMEOUT = timeout + device_path = get_device_path(host, password, path) + + using_webworkflow = "host" in ctx.params.keys() and ctx.params["host"] is not None + + if using_webworkflow: + if host == "circuitpython.local": + click.echo("Checking versions.json on circuitpython.local to find hostname") + versions_resp = requests.get( + "http://circuitpython.local/cp/version.json", timeout=timeout + ) + host = f'{versions_resp.json()["hostname"]}.local' + click.echo(f"Using hostname: {host}") + device_path = device_path.replace("circuitpython.local", host) + try: + ctx.obj["backend"] = WebBackend( + host=host, password=password, logger=logger, timeout=timeout + ) + except ValueError as e: + click.secho(e, fg="red") + time.sleep(0.3) + sys.exit(1) + except RuntimeError as e: + click.secho(e, fg="red") + sys.exit(1) + else: + try: + ctx.obj["backend"] = DiskBackend(device_path, logger) + except ValueError as e: + print(e) + if verbose: # Configure additional logging to stdout. global VERBOSE @@ -1248,21 +1075,18 @@ def main(ctx, verbose, path, board_id, cpy_version): # pragma: no cover # stop early if the command is boardless if ctx.invoked_subcommand in BOARDLESS_COMMANDS: return - if path: - device_path = path - else: - device_path = find_device() + ctx.obj["DEVICE_PATH"] = device_path latest_version = get_latest_release_from_url( "https://github.com/adafruit/circuitpython/releases/latest" ) global CPY_VERSION - if device_path is None: + if device_path is None or not ctx.obj["backend"].is_device_present(): click.secho("Could not find a connected CircuitPython device.", fg="red") sys.exit(1) else: CPY_VERSION, board_id = ( - get_circuitpython_version(device_path) + ctx.obj["backend"].get_circuitpython_version() if board_id is None or cpy_version is None else (cpy_version, board_id) ) @@ -1289,6 +1113,23 @@ def main(ctx, verbose, path, board_id, cpy_version): # pragma: no cover logger.warning(ex) +def get_device_path(host, password, path): + """ + :param host Hostname or IP address. + :param password REST API password. + :param path File system path. + :return device URL or None if the device cannot be found. + """ + if path: + device_path = path + elif host: + # pylint: enable=no-member + device_path = f"http://:{password}@" + host + else: + device_path = find_device() + return device_path + + @main.command() @click.option("-r", "--requirement", is_flag=True) @click.pass_context @@ -1298,7 +1139,7 @@ def freeze(ctx, requirement): # pragma: no cover device. Option -r saves output to requirements.txt file """ logger.info("Freeze") - modules = find_modules(ctx.obj["DEVICE_PATH"], get_bundles_list()) + modules = find_modules(ctx.obj["backend"], get_bundles_list()) if modules: output = [] for module in modules: @@ -1331,7 +1172,7 @@ def list_cli(ctx): # pragma: no cover modules = [ m.row - for m in find_modules(ctx.obj["DEVICE_PATH"], get_bundles_list()) + for m in find_modules(ctx.obj["backend"], get_bundles_list()) if m.outofdate ] if modules: @@ -1393,6 +1234,7 @@ def install(ctx, modules, pyext, requirement, auto, auto_file): # pragma: no co can be installed at once by providing more than one module name, each separated by a space. """ + # TODO: Ensure there's enough space on the device available_modules = get_bundle_versions(get_bundles_list()) mod_names = {} @@ -1405,25 +1247,39 @@ def install(ctx, modules, pyext, requirement, auto, auto_file): # pragma: no co elif auto or auto_file: if auto_file is None: auto_file = "code.py" + print(f"Auto file: {auto_file}") # pass a local file with "./" or "../" - is_relative = auto_file.split(os.sep)[0] in [os.path.curdir, os.path.pardir] + is_relative = not isinstance(ctx.obj["backend"], WebBackend) or auto_file.split( + os.sep + )[0] in [os.path.curdir, os.path.pardir] if not os.path.isabs(auto_file) and not is_relative: - auto_file = os.path.join(ctx.obj["DEVICE_PATH"], auto_file or "code.py") - if not os.path.isfile(auto_file): - click.secho(f"Auto file not found: {auto_file}", fg="red") - sys.exit(1) - requested_installs = libraries_from_imports(auto_file, mod_names) + auto_file = ctx.obj["backend"].get_file_path(auto_file or "code.py") + + auto_file_path = ctx.obj["backend"].get_auto_file_path(auto_file) + print(f"Auto file path: {auto_file_path}") + if not os.path.isfile(auto_file_path): + # fell through to here when run from random folder on windows - ask backend. + new_auto_file = ctx.obj["backend"].get_file_path(auto_file) + if os.path.isfile(new_auto_file): + auto_file = new_auto_file + auto_file_path = ctx.obj["backend"].get_auto_file_path(auto_file) + print(f"Auto file path: {auto_file_path}") + else: + click.secho(f"Auto file not found: {auto_file}", fg="red") + sys.exit(1) + + requested_installs = libraries_from_code_py(auto_file_path, mod_names) else: requested_installs = modules requested_installs = sorted(set(requested_installs)) click.echo(f"Searching for dependencies for: {requested_installs}") to_install = get_dependencies(requested_installs, mod_names=mod_names) - device_modules = get_device_versions(ctx.obj["DEVICE_PATH"]) + device_modules = ctx.obj["backend"].get_device_versions() if to_install is not None: to_install = sorted(to_install) click.echo(f"Ready to install: {to_install}\n") for library in to_install: - install_module( + ctx.obj["backend"].install_module( ctx.obj["DEVICE_PATH"], device_modules, library, pyext, mod_names ) @@ -1461,29 +1317,22 @@ def uninstall(ctx, module): # pragma: no cover can be uninstalled at once by providing more than one module name, each separated by a space. """ + device_path = ctx.obj["DEVICE_PATH"] + print(f"Uninstalling {module} from {device_path}") for name in module: - device_modules = get_device_versions(ctx.obj["DEVICE_PATH"]) + device_modules = ctx.obj["backend"].get_device_versions() name = name.lower() mod_names = {} for module_item, metadata in device_modules.items(): mod_names[module_item.replace(".py", "").lower()] = metadata if name in mod_names: - library_path = os.path.join(ctx.obj["DEVICE_PATH"], "lib") metadata = mod_names[name] module_path = metadata["path"] - if os.path.isdir(module_path): - target = os.path.basename(os.path.dirname(module_path)) - target_path = os.path.join(library_path, target) - # Remove the directory. - shutil.rmtree(target_path) - else: - target = os.path.basename(module_path) - target_path = os.path.join(library_path, target) - # Remove file - os.remove(target_path) + ctx.obj["backend"].uninstall(device_path, module_path) click.echo("Uninstalled '{}'.".format(name)) else: click.echo("Module '{}' not found on device.".format(name)) + continue # pylint: disable=too-many-branches @@ -1510,9 +1359,7 @@ def update(ctx, update_all): # pragma: no cover logger.info("Update") # Grab out of date modules. modules = [ - m - for m in find_modules(ctx.obj["DEVICE_PATH"], get_bundles_list()) - if m.outofdate + m for m in find_modules(ctx.obj["backend"], get_bundles_list()) if m.outofdate ] if modules: click.echo("Found {} module[s] needing update.".format(len(modules))) @@ -1564,7 +1411,7 @@ def update(ctx, update_all): # pragma: no cover if update_flag: # pylint: disable=broad-except try: - module.update() + ctx.obj["backend"].update(module) click.echo("Updated {}".format(module.name)) except Exception as ex: logger.exception(ex) diff --git a/circup/backends.py b/circup/backends.py new file mode 100644 index 0000000..754cbac --- /dev/null +++ b/circup/backends.py @@ -0,0 +1,829 @@ +# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, written for Adafruit Industries +# SPDX-FileCopyrightText: 2023 Tim Cocks, written for Adafruit Industries +# +# SPDX-License-Identifier: MIT +""" +Backend classes that represent interfaces to physical devices. +""" +import os +import shutil +import sys +import socket +import tempfile +from urllib.parse import urlparse, urljoin +import click +import requests +from requests.adapters import HTTPAdapter +from requests.auth import HTTPBasicAuth + + +from circup.shared import DATA_DIR, BAD_FILE_FORMAT, extract_metadata, _get_modules_file + +#: The location to store a local copy of code.py for use with --auto and +# web workflow +LOCAL_CODE_PY_COPY = os.path.join(DATA_DIR, "code.tmp.py") + + +class Backend: + """ + Backend parent class to be extended for workflow specific + implementations + """ + + def __init__(self, logger): + self.device_location = None + self.LIB_DIR_PATH = None + self.logger = logger + + def get_circuitpython_version(self): + """ + Must be overridden by subclass for implementation! + + Returns the version number of CircuitPython running on the board connected + via ``device_url``, along with the board ID. + + :param str device_location: http based device URL or local file path. + :return: A tuple with the version string for CircuitPython and the board ID string. + """ + raise NotImplementedError + + def _get_modules(self, device_lib_path): + """ + To be overridden by subclass + """ + raise NotImplementedError + + def get_modules(self, device_url): + """ + Get a dictionary containing metadata about all the Python modules found in + the referenced path. + + :param str device_url: URL to be used to find modules. + :return: A dictionary containing metadata about the found modules. + """ + return self._get_modules(device_url) + + def get_device_versions(self): + """ + Returns a dictionary of metadata from modules on the connected device. + + :param str device_url: URL for the device. + :return: A dictionary of metadata about the modules available on the + connected device. + """ + return self.get_modules(os.path.join(self.device_location, self.LIB_DIR_PATH)) + + def _create_library_directory(self, device_path, library_path): + """ + To be overridden by subclass + """ + raise NotImplementedError + + def _install_module_py(self, metadata): + """ + To be overridden by subclass + """ + raise NotImplementedError + + def _install_module_mpy(self, bundle, metadata): + """ + To be overridden by subclass + """ + raise NotImplementedError + + # pylint: disable=too-many-locals,too-many-branches,too-many-arguments,too-many-nested-blocks + def install_module( + self, device_path, device_modules, name, pyext, mod_names + ): # pragma: no cover + """ + Finds a connected device and installs a given module name if it + is available in the current module bundle and is not already + installed on the device. + TODO: There is currently no check for the version. + + :param str device_path: The path to the connected board. + :param list(dict) device_modules: List of module metadata from the device. + :param str name: Name of module to install + :param bool pyext: Boolean to specify if the module should be installed from + source or from a pre-compiled module + :param mod_names: Dictionary of metadata from modules that can be generated + with get_bundle_versions() + """ + if not name: + click.echo("No module name(s) provided.") + elif name in mod_names: + # Grab device modules to check if module already installed + if name in device_modules: + click.echo("'{}' is already installed.".format(name)) + return + + library_path = ( + os.path.join(device_path, self.LIB_DIR_PATH) + if not isinstance(self, WebBackend) + else urljoin(device_path, self.LIB_DIR_PATH) + ) + metadata = mod_names[name] + bundle = metadata["bundle"] + bundle.size = os.path.getsize(metadata["path"]) + if os.path.isdir(metadata["path"]): + # pylint: disable=unused-variable + for dirpath, dirnames, filenames in os.walk(metadata["path"]): + for f in filenames: + fp = os.path.join(dirpath, f) + try: + if not os.path.islink(fp): # Ignore symbolic links + bundle.size += os.path.getsize(fp) + else: + self.logger.warning( + f"Skipping symbolic link in space calculation: {fp}" + ) + except OSError as e: + self.logger.error( + f"Error: {e} - Skipping file in space calculation: {fp}" + ) + + if self.get_free_space() < bundle.size: + self.logger.error( + f"Aborted installing module {name} - " + f"not enough free space ({bundle.size} < {self.get_free_space()})" + ) + click.secho( + f"Aborted installing module {name} - " + f"not enough free space ({bundle.size} < {self.get_free_space()})", + fg="red", + ) + return + + # Create the library directory first. + self._create_library_directory(device_path, library_path) + + if pyext: + # Use Python source for module. + self._install_module_py(metadata) + else: + # Use pre-compiled mpy modules. + self._install_module_mpy(bundle, metadata) + click.echo("Installed '{}'.".format(name)) + else: + click.echo("Unknown module named, '{}'.".format(name)) + + # def libraries_from_imports(self, code_py, mod_names): + # """ + # To be overridden by subclass + # """ + # raise NotImplementedError + + def uninstall(self, device_path, module_path): + """ + To be overridden by subclass + """ + raise NotImplementedError + + def update(self, module): + """ + To be overridden by subclass + """ + raise NotImplementedError + + def get_file_path(self, filename): + """ + To be overridden by subclass + """ + raise NotImplementedError + + def get_free_space(self): + """ + To be overridden by subclass + """ + raise NotImplementedError + + def is_device_present(self): + """ + To be overriden by subclass + """ + raise NotImplementedError + + @staticmethod + def parse_boot_out_file(boot_out_contents): + """ + Parse the contents of boot_out.txt + Returns: circuitpython version and board id + """ + lines = boot_out_contents.split("\n") + version_line = lines[0] + circuit_python = version_line.split(";")[0].split(" ")[-3] + board_line = lines[1] + if board_line.startswith("Board ID:"): + board_id = board_line[9:].strip() + else: + board_id = "" + return circuit_python, board_id + + +def _writeable_error(): + click.secho( + "CircuitPython Web Workflow Device not writable\n - " + "Remount storage as writable to device (not PC)", + fg="red", + ) + sys.exit(1) + + +class WebBackend(Backend): + """ + Backend for interacting with a device via Web Workflow + """ + + def __init__(self, host, password, logger, timeout=10): + super().__init__(logger) + if password is None: + raise ValueError("--host needs --password") + + # pylint: disable=no-member + # verify hostname/address + try: + socket.getaddrinfo(host, 80, proto=socket.IPPROTO_TCP) + except socket.gaierror as exc: + raise RuntimeError( + "Invalid host: {}.".format(host) + " You should remove the 'http://'" + if "http://" in host or "https://" in host + else "Could not find or connect to specified device" + ) from exc + + self.LIB_DIR_PATH = "fs/lib/" + self.host = host + self.password = password + self.device_location = f"http://:{self.password}@{self.host}" + + self.session = requests.Session() + self.session.mount(self.device_location, HTTPAdapter(max_retries=5)) + self.library_path = self.device_location + "/" + self.LIB_DIR_PATH + self.timeout = timeout + + def install_file_http(self, source): + """ + Install file to device using web workflow. + :param source source file. + """ + file_name = source.split(os.path.sep) + file_name = file_name[-2] if file_name[-1] == "" else file_name[-1] + target = self.device_location + "/" + self.LIB_DIR_PATH + file_name + + auth = HTTPBasicAuth("", self.password) + + with open(source, "rb") as fp: + r = self.session.put(target, fp.read(), auth=auth, timeout=self.timeout) + if r.status_code == 409: + _writeable_error() + r.raise_for_status() + + def install_dir_http(self, source): + """ + Install directory to device using web workflow. + :param source source directory. + """ + mod_name = source.split(os.path.sep) + mod_name = mod_name[-2] if mod_name[-1] == "" else mod_name[-1] + target = self.device_location + "/" + self.LIB_DIR_PATH + mod_name + target = target + "/" if target[:-1] != "/" else target + url = urlparse(target) + auth = HTTPBasicAuth("", url.password) + + # Create the top level directory. + with self.session.put(target, auth=auth, timeout=self.timeout) as r: + if r.status_code == 409: + _writeable_error() + r.raise_for_status() + + # Traverse the directory structure and create the directories/files. + for root, dirs, files in os.walk(source): + rel_path = os.path.relpath(root, source) + if rel_path == ".": + rel_path = "" + for name in dirs: + path_to_create = ( + urljoin( + urljoin(target, rel_path + "/", allow_fragments=False), + name, + allow_fragments=False, + ) + if rel_path != "" + else urljoin(target, name, allow_fragments=False) + ) + path_to_create = ( + path_to_create + "/" + if path_to_create[:-1] != "/" + else path_to_create + ) + + with self.session.put( + path_to_create, auth=auth, timeout=self.timeout + ) as r: + if r.status_code == 409: + _writeable_error() + r.raise_for_status() + for name in files: + with open(os.path.join(root, name), "rb") as fp: + path_to_create = ( + urljoin( + urljoin(target, rel_path + "/", allow_fragments=False), + name, + allow_fragments=False, + ) + if rel_path != "" + else urljoin(target, name, allow_fragments=False) + ) + with self.session.put( + path_to_create, fp.read(), auth=auth, timeout=self.timeout + ) as r: + if r.status_code == 409: + _writeable_error() + r.raise_for_status() + + def get_circuitpython_version(self): + """ + Returns the version number of CircuitPython running on the board connected + via ``device_path``, along with the board ID. This is obtained using + RESTful API from the /cp/version.json URL. + + :return: A tuple with the version string for CircuitPython and the board ID string. + """ + # pylint: disable=arguments-renamed + with self.session.get( + self.device_location + "/cp/version.json", timeout=self.timeout + ) as r: + # pylint: disable=no-member + if r.status_code != requests.codes.ok: + click.secho( + f" Unable to get version from {self.device_location}: {r.status_code}", + fg="red", + ) + sys.exit(1) + # pylint: enable=no-member + ver_json = r.json() + return ver_json.get("version"), ver_json.get("board_id") + + def _get_modules(self, device_lib_path): + return self._get_modules_http(device_lib_path) + + def _get_modules_http(self, url): + """ + Get a dictionary containing metadata about all the Python modules found using + the referenced URL. + + :param str url: URL for the modules. + :return: A dictionary containing metadata about the found modules. + """ + result = {} + u = urlparse(url) + auth = HTTPBasicAuth("", u.password) + with self.session.get( + url, auth=auth, headers={"Accept": "application/json"}, timeout=self.timeout + ) as r: + r.raise_for_status() + + directory_mods = [] + single_file_mods = [] + + for entry in r.json()["files"]: + + entry_name = entry.get("name") + if entry.get("directory"): + directory_mods.append(entry_name) + else: + if entry_name.endswith(".py") or entry_name.endswith(".mpy"): + single_file_mods.append(entry_name) + + self._get_modules_http_single_mods(auth, result, single_file_mods, url) + self._get_modules_http_dir_mods(auth, directory_mods, result, url) + + return result + + def _get_modules_http_dir_mods(self, auth, directory_mods, result, url): + # pylint: disable=too-many-locals + """ + Builds result dictionary with keys containing module names and values containing a + dictionary with metadata bout the module like version, compatibility, mpy or not etc. + + :param auth HTTP authentication. + :param directory_mods list of modules. + :param result dictionary for the result. + :param url: URL of the device. + """ + for dm in directory_mods: + if str(urlparse(dm).scheme).lower() not in ("http", "https"): + dm_url = url + dm + "/" + else: + dm_url = dm + + with self.session.get( + dm_url, + auth=auth, + headers={"Accept": "application/json"}, + timeout=self.timeout, + ) as r: + r.raise_for_status() + mpy = False + + for entry in r.json()["files"]: + entry_name = entry.get("name") + if not entry.get("directory") and ( + entry_name.endswith(".py") or entry_name.endswith(".mpy") + ): + if entry_name.endswith(".mpy"): + mpy = True + + with self.session.get( + dm_url + entry_name, auth=auth, timeout=self.timeout + ) as rr: + rr.raise_for_status() + idx = entry_name.rfind(".") + with tempfile.NamedTemporaryFile( + prefix=entry_name[:idx] + "-", + suffix=entry_name[idx:], + delete=False, + ) as fp: + fp.write(rr.content) + tmp_name = fp.name + metadata = extract_metadata(tmp_name, self.logger) + os.remove(tmp_name) + if "__version__" in metadata: + metadata["path"] = dm_url + result[dm] = metadata + # break now if any of the submodules has a bad format + if metadata["__version__"] == BAD_FILE_FORMAT: + break + + if result.get(dm) is None: + result[dm] = {"path": dm_url, "mpy": mpy} + + def _get_modules_http_single_mods(self, auth, result, single_file_mods, url): + """ + :param auth HTTP authentication. + :param single_file_mods list of modules. + :param result dictionary for the result. + :param url: URL of the device. + """ + for sfm in single_file_mods: + if str(urlparse(sfm).scheme).lower() not in ("http", "https"): + sfm_url = url + sfm + else: + sfm_url = sfm + with self.session.get(sfm_url, auth=auth, timeout=self.timeout) as r: + r.raise_for_status() + idx = sfm.rfind(".") + with tempfile.NamedTemporaryFile( + prefix=sfm[:idx] + "-", suffix=sfm[idx:], delete=False + ) as fp: + fp.write(r.content) + tmp_name = fp.name + metadata = extract_metadata(tmp_name, self.logger) + os.remove(tmp_name) + metadata["path"] = sfm_url + result[sfm[:idx]] = metadata + + def _create_library_directory(self, device_path, library_path): + url = urlparse(device_path) + auth = HTTPBasicAuth("", url.password) + with self.session.put(library_path, auth=auth, timeout=self.timeout) as r: + if r.status_code == 409: + _writeable_error() + r.raise_for_status() + + def _install_module_mpy(self, bundle, metadata): + """ + :param bundle library bundle. + :param library_path library path + :param metadata dictionary. + """ + module_name = os.path.basename(metadata["path"]).replace(".py", ".mpy") + if not module_name: + # Must be a directory based module. + module_name = os.path.basename(os.path.dirname(metadata["path"])) + major_version = self.get_circuitpython_version()[0].split(".")[0] + bundle_platform = "{}mpy".format(major_version) + bundle_path = os.path.join(bundle.lib_dir(bundle_platform), module_name) + if os.path.isdir(bundle_path): + + self.install_dir_http(bundle_path) + + elif os.path.isfile(bundle_path): + self.install_file_http(bundle_path) + + else: + raise IOError("Cannot find compiled version of module.") + + # pylint: enable=too-many-locals,too-many-branches + def _install_module_py(self, metadata): + """ + :param library_path library path + :param metadata dictionary. + """ + + source_path = metadata["path"] # Path to Python source version. + if os.path.isdir(source_path): + self.install_dir_http(source_path) + + else: + self.install_file_http(source_path) + + def get_auto_file_path(self, auto_file_path): + """ + Make a local temp copy of the --auto file from the device. + Returns the path to the local copy. + """ + url = auto_file_path + auth = HTTPBasicAuth("", self.password) + with self.session.get(url, auth=auth, timeout=self.timeout) as r: + r.raise_for_status() + with open(LOCAL_CODE_PY_COPY, "w", encoding="utf-8") as f: + f.write(r.text) + return LOCAL_CODE_PY_COPY + + def uninstall(self, device_path, module_path): + """ + Uninstall given module on device using REST API. + """ + url = urlparse(device_path) + auth = HTTPBasicAuth("", url.password) + with self.session.delete(module_path, auth=auth, timeout=self.timeout) as r: + if r.status_code == 409: + _writeable_error() + r.raise_for_status() + + def update(self, module): + """ + Delete the module on the device, then copy the module from the bundle + back onto the device. + + The caller is expected to handle any exceptions raised. + """ + self._update_http(module) + + def _update_http(self, module): + """ + Update the module using web workflow. + """ + if module.file: + # Copy the file (will overwrite). + self.install_file_http(module.bundle_path) + else: + # Delete the directory (recursive) first. + url = urlparse(module.path) + auth = HTTPBasicAuth("", url.password) + with self.session.delete(module.path, auth=auth, timeout=self.timeout) as r: + if r.status_code == 409: + _writeable_error() + r.raise_for_status() + self.install_dir_http(module.bundle_path) + + def get_file_path(self, filename): + """ + retuns the full path on the device to a given file name. + """ + return urljoin( + urljoin(self.device_location, "fs/", allow_fragments=False), + filename, + allow_fragments=False, + ) + + def is_device_present(self): + """ + returns True if the device is currently connected + """ + try: + _ = self.session.get(f"{self.device_location}/cp/version.json") + return True + except requests.exceptions.ConnectionError: + return False + + def get_device_versions(self): + """ + Returns a dictionary of metadata from modules on the connected device. + + :param str device_url: URL for the device. + :return: A dictionary of metadata about the modules available on the + connected device. + """ + return self.get_modules(urljoin(self.device_location, self.LIB_DIR_PATH)) + + def get_free_space(self): + """ + Returns the free space on the device in bytes. + """ + auth = HTTPBasicAuth("", self.password) + with self.session.get( + urljoin(self.device_location, "fs/"), + auth=auth, + headers={"Accept": "application/json"}, + timeout=self.timeout, + ) as r: + r.raise_for_status() + if r.json().get("free") is None: + self.logger.error("Unable to get free block count from device.") + click.secho("Unable to get free block count from device.", fg="red") + elif r.json().get("block_size") is None: + self.logger.error("Unable to get block size from device.") + click.secho("Unable to get block size from device.", fg="red") + elif r.json().get("writable") is None or r.json().get("writable") is False: + self.logger.error( + "CircuitPython Web Workflow Device not writable\n - " + "Remount storage as writable to device (not PC)" + ) + click.secho( + "CircuitPython Web Workflow Device not writable\n - " + "Remount storage as writable to device (not PC)", + fg="red", + ) + else: + return r.json()["free"] * r.json()["block_size"] # bytes + sys.exit(1) + + +class DiskBackend(Backend): + """ + Backend for interacting with a device via USB Workflow + + :param String device_location: Path to the device + :param logger: logger to use for outputting messages + :param String boot_out: Optional mock contents of a boot_out.txt file + to use for version information. + """ + + def __init__(self, device_location, logger, boot_out=None): + if device_location is None: + raise ValueError( + "Auto locating USB Disk based device failed. " + "Please specify --path argument or ensure your device " + "is connected and mounted under the name CIRCUITPY." + ) + super().__init__(logger) + self.LIB_DIR_PATH = "lib" + self.device_location = device_location + self.library_path = os.path.join(self.device_location, self.LIB_DIR_PATH) + self.version_info = None + if boot_out is not None: + self.version_info = self.parse_boot_out_file(boot_out) + + def get_circuitpython_version(self): + """ + Returns the version number of CircuitPython running on the board connected + via ``device_path``, along with the board ID. This is obtained from the + ``boot_out.txt`` file on the device, whose first line will start with + something like this:: + + Adafruit CircuitPython 4.1.0 on 2019-08-02; + + While the second line is:: + + Board ID:raspberry_pi_pico + + :return: A tuple with the version string for CircuitPython and the board ID string. + """ + if not self.version_info: + try: + with open( + os.path.join(self.device_location, "boot_out.txt"), + "r", + encoding="utf-8", + ) as boot: + boot_out_contents = boot.read() + circuit_python, board_id = self.parse_boot_out_file( + boot_out_contents + ) + except FileNotFoundError: + click.secho( + "Missing file boot_out.txt on the device: wrong path or drive corrupted.", + fg="red", + ) + self.logger.error("boot_out.txt not found.") + sys.exit(1) + return circuit_python, board_id + + return self.version_info + + def _get_modules(self, device_lib_path): + """ + Get a dictionary containing metadata about all the Python modules found in + the referenced path. + + :param str device_lib_path: URL to be used to find modules. + :return: A dictionary containing metadata about the found modules. + """ + return _get_modules_file(device_lib_path, self.logger) + + def _create_library_directory(self, device_path, library_path): + if not os.path.exists(library_path): # pragma: no cover + os.makedirs(library_path) + + def _install_module_mpy(self, bundle, metadata): + """ + :param bundle library bundle. + :param library_path library path + :param metadata dictionary. + """ + module_name = os.path.basename(metadata["path"]).replace(".py", ".mpy") + if not module_name: + # Must be a directory based module. + module_name = os.path.basename(os.path.dirname(metadata["path"])) + + major_version = self.get_circuitpython_version()[0].split(".")[0] + bundle_platform = "{}mpy".format(major_version) + bundle_path = os.path.join(bundle.lib_dir(bundle_platform), module_name) + if os.path.isdir(bundle_path): + target_path = os.path.join(self.library_path, module_name) + # Copy the directory. + shutil.copytree(bundle_path, target_path) + elif os.path.isfile(bundle_path): + + target = os.path.basename(bundle_path) + + target_path = os.path.join(self.library_path, target) + + # Copy file. + shutil.copyfile(bundle_path, target_path) + else: + raise IOError("Cannot find compiled version of module.") + + # pylint: enable=too-many-locals,too-many-branches + def _install_module_py(self, metadata): + """ + :param library_path library path + :param metadata dictionary. + """ + + source_path = metadata["path"] # Path to Python source version. + if os.path.isdir(source_path): + target = os.path.basename(os.path.dirname(source_path)) + target_path = os.path.join(self.library_path, target) + # Copy the directory. + shutil.copytree(source_path, target_path) + else: + target = os.path.basename(source_path) + target_path = os.path.join(self.library_path, target) + # Copy file. + shutil.copyfile(source_path, target_path) + + def get_auto_file_path(self, auto_file_path): + """ + Returns the path on the device to the file to be read for --auto. + """ + return auto_file_path + + def uninstall(self, device_path, module_path): + """ + Uninstall module using local file system. + """ + library_path = os.path.join(device_path, "lib") + if os.path.isdir(module_path): + target = os.path.basename(os.path.dirname(module_path)) + target_path = os.path.join(library_path, target) + # Remove the directory. + shutil.rmtree(target_path) + else: + target = os.path.basename(module_path) + target_path = os.path.join(library_path, target) + # Remove file + os.remove(target_path) + + def update(self, module): + """ + Delete the module on the device, then copy the module from the bundle + back onto the device. + + The caller is expected to handle any exceptions raised. + """ + self._update_file(module) + + def _update_file(self, module): + """ + Update the module using file system. + """ + if os.path.isdir(module.path): + # Delete and copy the directory. + shutil.rmtree(module.path, ignore_errors=True) + shutil.copytree(module.bundle_path, module.path) + else: + # Delete and copy file. + os.remove(module.path) + shutil.copyfile(module.bundle_path, module.path) + + def get_file_path(self, filename): + """ + returns the full path on the device to a given file name. + """ + return os.path.join(self.device_location, filename) + + def is_device_present(self): + """ + returns True if the device is currently connected + """ + return os.path.exists(self.device_location) + + def get_free_space(self): + """ + Returns the free space on the device in bytes. + """ + # pylint: disable=unused-variable + _, total, free = shutil.disk_usage(self.device_location) + return free diff --git a/circup/shared.py b/circup/shared.py new file mode 100644 index 0000000..eb45430 --- /dev/null +++ b/circup/shared.py @@ -0,0 +1,144 @@ +# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, written for Adafruit Industries +# SPDX-FileCopyrightText: 2023 Tim Cocks, written for Adafruit Industries +# +# SPDX-License-Identifier: MIT +""" +Utilities that are shared and used by both click CLI command functions +and Backend class functions. +""" +import glob +import os +import re + +import appdirs + +#: Version identifier for a bad MPY file format +BAD_FILE_FORMAT = "Invalid" + +#: The location of data files used by circup (following OS conventions). +DATA_DIR = appdirs.user_data_dir(appname="circup", appauthor="adafruit") + + +def _get_modules_file(path, logger): + """ + Get a dictionary containing metadata about all the Python modules found in + the referenced file system path. + + :param str path: The directory in which to find modules. + :return: A dictionary containing metadata about the found modules. + """ + result = {} + if not path: + return result + single_file_py_mods = glob.glob(os.path.join(path, "*.py")) + single_file_mpy_mods = glob.glob(os.path.join(path, "*.mpy")) + package_dir_mods = [ + d + for d in glob.glob(os.path.join(path, "*", "")) + if not os.path.basename(os.path.normpath(d)).startswith(".") + ] + single_file_mods = single_file_py_mods + single_file_mpy_mods + for sfm in [f for f in single_file_mods if not os.path.basename(f).startswith(".")]: + metadata = extract_metadata(sfm, logger) + metadata["path"] = sfm + result[os.path.basename(sfm).replace(".py", "").replace(".mpy", "")] = metadata + for package_path in package_dir_mods: + name = os.path.basename(os.path.dirname(package_path)) + py_files = glob.glob(os.path.join(package_path, "**/*.py"), recursive=True) + mpy_files = glob.glob(os.path.join(package_path, "**/*.mpy"), recursive=True) + all_files = py_files + mpy_files + # default value + result[name] = {"path": package_path, "mpy": bool(mpy_files)} + # explore all the submodules to detect bad ones + for source in [f for f in all_files if not os.path.basename(f).startswith(".")]: + metadata = extract_metadata(source, logger) + if "__version__" in metadata: + metadata["path"] = package_path + result[name] = metadata + # break now if any of the submodules has a bad format + if metadata["__version__"] == BAD_FILE_FORMAT: + break + return result + + +def extract_metadata(path, logger): + # pylint: disable=too-many-locals,too-many-branches + """ + Given a file path, return a dictionary containing metadata extracted from + dunder attributes found therein. Works with both .py and .mpy files. + + For Python source files, such metadata assignments should be simple and + single-line. For example:: + + __version__ = "1.1.4" + __repo__ = "https://github.com/adafruit/SomeLibrary.git" + + For byte compiled .mpy files, a brute force / backtrack approach is used + to find the __version__ number in the file -- see comments in the + code for the implementation details. + + :param str path: The path to the file containing the metadata. + :return: The dunder based metadata found in the file, as a dictionary. + """ + result = {} + logger.info("%s", path) + if path.endswith(".py"): + result["mpy"] = False + with open(path, "r", encoding="utf-8") as source_file: + content = source_file.read() + #: The regex used to extract ``__version__`` and ``__repo__`` assignments. + dunder_key_val = r"""(__\w+__)(?:\s*:\s*\w+)?\s*=\s*(?:['"]|\(\s)(.+)['"]""" + for match in re.findall(dunder_key_val, content): + result[match[0]] = str(match[1]) + if result: + logger.info("Extracted metadata: %s", result) + elif path.endswith(".mpy"): + find_by_regexp_match = False + result["mpy"] = True + with open(path, "rb") as mpy_file: + content = mpy_file.read() + # Track the MPY version number + mpy_version = content[0:2] + compatibility = None + loc = -1 + # Find the start location of the __version__ + if mpy_version == b"M\x03": + # One byte for the length of "__version__" + loc = content.find(b"__version__") - 1 + compatibility = (None, "7.0.0-alpha.1") + elif mpy_version == b"C\x05": + # Two bytes for the length of "__version__" in mpy version 5 + loc = content.find(b"__version__") - 2 + compatibility = ("7.0.0-alpha.1", "8.99.99") + elif mpy_version == b"C\x06": + # Two bytes in mpy version 6 + find_by_regexp_match = True + compatibility = ("9.0.0-alpha.1", None) + if find_by_regexp_match: + # Too hard to find the version positionally. + # Find the first thing that looks like an x.y.z version number. + match = re.search(rb"([\d]+\.[\d]+\.[\d]+)\x00", content) + if match: + result["__version__"] = match.group(1).decode("utf-8") + elif loc > -1: + # Backtrack until a byte value of the offset is reached. + offset = 1 + while offset < loc: + val = int(content[loc - offset]) + if mpy_version == b"C\x05": + val = val // 2 + if val == offset - 1: # Off by one..! + # Found version, extract the number given boundaries. + start = loc - offset + 1 # No need for prepended length. + end = loc # Up to the start of the __version__. + version = content[start:end] # Slice the version number. + # Create a string version as metadata in the result. + result["__version__"] = version.decode("utf-8") + break # Nothing more to do. + offset += 1 # ...and again but backtrack by one. + if compatibility: + result["compatibility"] = compatibility + else: + # not a valid MPY file + result["__version__"] = BAD_FILE_FORMAT + return result diff --git a/tests/mock_device/boot_out.txt b/tests/mock_device/boot_out.txt new file mode 100644 index 0000000..43d58ee --- /dev/null +++ b/tests/mock_device/boot_out.txt @@ -0,0 +1,3 @@ +Adafruit CircuitPython 4.1.0 on 2019-08-02; Adafruit CircuitPlayground Express with samd21g18 +Board ID:this_is_a_board +UID:AAAABBBBCCCC diff --git a/tests/mock_device/boot_out.txt.license b/tests/mock_device/boot_out.txt.license new file mode 100644 index 0000000..7207a5d --- /dev/null +++ b/tests/mock_device/boot_out.txt.license @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2023 Tim Cocks, written for Adafruit Industries +# +# SPDX-License-Identifier: MIT diff --git a/tests/test_circup.py b/tests/test_circup.py index db1924d..73672cb 100644 --- a/tests/test_circup.py +++ b/tests/test_circup.py @@ -31,14 +31,12 @@ import json import pathlib from unittest import mock - - from click.testing import CliRunner import pytest import requests import circup - +from circup import DiskBackend TEST_BUNDLE_CONFIG_JSON = "tests/test_bundle_config.json" with open(TEST_BUNDLE_CONFIG_JSON, "rb") as tbc: @@ -211,18 +209,29 @@ def test_Module_init_file_module(): Ensure the Module instance is set up as expected and logged, as if for a single file Python module. """ - path = os.path.join("foo", "bar", "baz", "local_module.py") + name = "local_module.py" + path = os.path.join("mock_device", "lib", name) repo = "https://github.com/adafruit/SomeLibrary.git" device_version = "1.2.3" bundle_version = "3.2.1" + with mock.patch("circup.logger.info") as mock_logger, mock.patch( "circup.os.path.isfile", return_value=True ), mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch( - "circup.Bundle.lib_dir", return_value="tests" + "circup.Bundle.lib_dir", + return_value="tests", ): + backend = DiskBackend("mock_device", mock_logger) bundle = circup.Bundle(TEST_BUNDLE_NAME) m = circup.Module( - path, repo, device_version, bundle_version, False, bundle, (None, None) + name, + backend, + repo, + device_version, + bundle_version, + False, + bundle, + (None, None), ) mock_logger.assert_called_once_with(m) assert m.path == path @@ -240,7 +249,8 @@ def test_Module_init_directory_module(): Ensure the Module instance is set up as expected and logged, as if for a directory based Python module. """ - path = os.path.join("foo", "bar", "dir_module", "") + name = "dir_module/" + path = os.path.join("mock_device", "lib", f"{name}", "") repo = "https://github.com/adafruit/SomeLibrary.git" device_version = "1.2.3" bundle_version = "3.2.1" @@ -252,9 +262,17 @@ def test_Module_init_directory_module(): ), mock.patch( "circup.Bundle.lib_dir", return_value="tests" ): + backend = DiskBackend("mock_device", mock_logger) bundle = circup.Bundle(TEST_BUNDLE_NAME) m = circup.Module( - path, repo, device_version, bundle_version, mpy, bundle, (None, None) + name, + backend, + repo, + device_version, + bundle_version, + mpy, + bundle, + (None, None), ) mock_logger.assert_called_once_with(m) assert m.path == path @@ -274,15 +292,23 @@ def test_Module_outofdate(): out of date. """ bundle = circup.Bundle(TEST_BUNDLE_NAME) - path = os.path.join("foo", "bar", "baz", "module.py") + name = "module.py" repo = "https://github.com/adafruit/SomeLibrary.git" - m1 = circup.Module(path, repo, "1.2.3", "3.2.1", False, bundle, (None, None)) - m2 = circup.Module(path, repo, "1.2.3", "1.2.3", False, bundle, (None, None)) - # shouldn't happen! - m3 = circup.Module(path, repo, "3.2.1", "1.2.3", False, bundle, (None, None)) - assert m1.outofdate is True - assert m2.outofdate is False - assert m3.outofdate is False + with mock.patch("circup.logger.info") as mock_logger: + backend = DiskBackend("mock_device", mock_logger) + m1 = circup.Module( + name, backend, repo, "1.2.3", "3.2.1", False, bundle, (None, None) + ) + m2 = circup.Module( + name, backend, repo, "1.2.3", "1.2.3", False, bundle, (None, None) + ) + # shouldn't happen! + m3 = circup.Module( + name, backend, repo, "3.2.1", "1.2.3", False, bundle, (None, None) + ) + assert m1.outofdate is True + assert m2.outofdate is False + assert m3.outofdate is False def test_Module_outofdate_bad_versions(): @@ -292,14 +318,25 @@ def test_Module_outofdate_bad_versions(): this problem). Such a problem should be logged. """ bundle = circup.Bundle(TEST_BUNDLE_NAME) - path = os.path.join("foo", "bar", "baz", "module.py") + name = "module.py" + repo = "https://github.com/adafruit/SomeLibrary.git" device_version = "hello" bundle_version = "3.2.1" - m = circup.Module( - path, repo, device_version, bundle_version, False, bundle, (None, None) - ) + with mock.patch("circup.logger.warning") as mock_logger: + backend = DiskBackend("mock_device", mock_logger) + m = circup.Module( + name, + backend, + repo, + device_version, + bundle_version, + False, + bundle, + (None, None), + ) + assert m.outofdate is True assert mock_logger.call_count == 2 @@ -310,16 +347,28 @@ def test_Module_mpy_mismatch(): boolean value to correctly indicate if the referenced module is, in fact, out of date. """ - path = os.path.join("foo", "bar", "baz", "module.mpy") + name = "module.py" repo = "https://github.com/adafruit/SomeLibrary.git" - with mock.patch("circup.CPY_VERSION", "8.0.0"): + with mock.patch("circup.CPY_VERSION", "8.0.0"), mock.patch( + "circup.logger.warning" + ) as mock_logger: + backend = DiskBackend("mock_device", mock_logger) bundle = circup.Bundle(TEST_BUNDLE_NAME) - m1 = circup.Module(path, repo, "1.2.3", "1.2.3", True, bundle, (None, None)) + m1 = circup.Module( + name, backend, repo, "1.2.3", "1.2.3", True, bundle, (None, None) + ) m2 = circup.Module( - path, repo, "1.2.3", "1.2.3", True, bundle, ("7.0.0-alpha.1", "8.99.99") + name, + backend, + repo, + "1.2.3", + "1.2.3", + True, + bundle, + ("7.0.0-alpha.1", "8.99.99"), ) m3 = circup.Module( - path, repo, "1.2.3", "1.2.3", True, bundle, (None, "7.0.0-alpha.1") + name, backend, repo, "1.2.3", "1.2.3", True, bundle, (None, "7.0.0-alpha.1") ) with mock.patch("circup.CPY_VERSION", "6.2.0"): assert m1.mpy_mismatch is False @@ -345,14 +394,23 @@ def test_Module_major_update_bad_versions(): Such a problem should be logged. """ bundle = circup.Bundle(TEST_BUNDLE_NAME) - path = os.path.join("foo", "bar", "baz", "module.py") + name = "module.py" repo = "https://github.com/adafruit/SomeLibrary.git" device_version = "1.2.3" bundle_version = "version-3" - m = circup.Module( - path, repo, device_version, bundle_version, False, bundle, (None, None) - ) + with mock.patch("circup.logger.warning") as mock_logger: + backend = DiskBackend("mock_device", mock_logger) + m = circup.Module( + name, + backend, + repo, + device_version, + bundle_version, + False, + bundle, + (None, None), + ) assert m.major_update is True assert mock_logger.call_count == 2 @@ -363,16 +421,23 @@ def test_Module_row(): a table of version-related results. """ bundle = circup.Bundle(TEST_BUNDLE_NAME) - path = os.path.join("foo", "bar", "baz", "module.py") + name = "module.py" repo = "https://github.com/adafruit/SomeLibrary.git" with mock.patch("circup.os.path.isfile", return_value=True), mock.patch( "circup.CPY_VERSION", "8.0.0" - ): - m = circup.Module(path, repo, "1.2.3", None, False, bundle, (None, None)) + ), mock.patch("circup.logger.warning") as mock_logger: + backend = DiskBackend("mock_device", mock_logger) + m = circup.Module( + name, backend, repo, "1.2.3", None, False, bundle, (None, None) + ) assert m.row == ("module", "1.2.3", "unknown", "Major Version") - m = circup.Module(path, repo, "1.2.3", "1.3.4", False, bundle, (None, None)) + m = circup.Module( + name, backend, repo, "1.2.3", "1.3.4", False, bundle, (None, None) + ) assert m.row == ("module", "1.2.3", "1.3.4", "Minor Version") - m = circup.Module(path, repo, "1.2.3", "1.2.3", True, bundle, ("9.0.0", None)) + m = circup.Module( + name, backend, repo, "1.2.3", "1.2.3", True, bundle, ("9.0.0", None) + ) assert m.row == ("module", "1.2.3", "1.2.3", "MPY Format") @@ -382,17 +447,25 @@ def test_Module_update_dir(): update the module on the connected device. """ bundle = circup.Bundle(TEST_BUNDLE_NAME) - path = os.path.join("foo", "bar", "baz", "module.py") + name = "adafruit_waveform" repo = "https://github.com/adafruit/SomeLibrary.git" device_version = "1.2.3" bundle_version = None - m = circup.Module( - path, repo, device_version, bundle_version, False, bundle, (None, None) - ) - with mock.patch("circup.shutil") as mock_shutil, mock.patch( + with mock.patch("circup.backends.shutil") as mock_shutil, mock.patch( "circup.os.path.isdir", return_value=True - ): - m.update() + ), mock.patch("circup.logger.warning") as mock_logger: + backend = DiskBackend("mock_device", mock_logger) + m = circup.Module( + name, + backend, + repo, + device_version, + bundle_version, + False, + bundle, + (None, None), + ) + backend.update(m) mock_shutil.rmtree.assert_called_once_with(m.path, ignore_errors=True) mock_shutil.copytree.assert_called_once_with(m.bundle_path, m.path) @@ -403,17 +476,31 @@ def test_Module_update_file(): update the module on the connected device. """ bundle = circup.Bundle(TEST_BUNDLE_NAME) - path = os.path.join("foo", "bar", "baz", "module.py") + name = "colorsys.py" + # path = os.path.join("foo", "bar", "baz", "module.py") repo = "https://github.com/adafruit/SomeLibrary.git" device_version = "1.2.3" bundle_version = None - m = circup.Module( - path, repo, device_version, bundle_version, False, bundle, (None, None) - ) - with mock.patch("circup.shutil") as mock_shutil, mock.patch( + + with mock.patch("circup.backends.shutil") as mock_shutil, mock.patch( "circup.os.remove" - ) as mock_remove, mock.patch("circup.os.path.isdir", return_value=False): - m.update() + ) as mock_remove, mock.patch( + "circup.os.path.isdir", return_value=False + ), mock.patch( + "circup.logger.warning" + ) as mock_logger: + backend = circup.DiskBackend("mock_device", mock_logger) + m = circup.Module( + name, + backend, + repo, + device_version, + bundle_version, + False, + bundle, + (None, None), + ) + backend.update(m) mock_remove.assert_called_once_with(m.path) mock_shutil.copyfile.assert_called_once_with(m.bundle_path, m.path) @@ -422,16 +509,27 @@ def test_Module_repr(): """ Ensure the repr(dict) is returned (helps when logging). """ - path = os.path.join("foo", "bar", "baz", "local_module.py") + name = "local_module.py" + path = os.path.join("mock_device", "lib", f"{name}") repo = "https://github.com/adafruit/SomeLibrary.git" device_version = "1.2.3" bundle_version = "3.2.1" with mock.patch("circup.os.path.isfile", return_value=True), mock.patch( "circup.CPY_VERSION", "4.1.2" - ), mock.patch("circup.Bundle.lib_dir", return_value="tests"): + ), mock.patch("circup.Bundle.lib_dir", return_value="tests"), mock.patch( + "circup.logger.warning" + ) as mock_logger: bundle = circup.Bundle(TEST_BUNDLE_NAME) + backend = circup.DiskBackend("mock_device", mock_logger) m = circup.Module( - path, repo, device_version, bundle_version, False, bundle, (None, None) + name, + backend, + repo, + device_version, + bundle_version, + False, + bundle, + (None, None), ) assert repr(m) == repr( { @@ -563,8 +661,10 @@ def test_extract_metadata_python(): 'print("Hello, world!")\n' ) path = "foo.py" - with mock.patch("builtins.open", mock.mock_open(read_data=code)) as mock_open: - result = circup.extract_metadata(path) + with mock.patch( + "builtins.open", mock.mock_open(read_data=code) + ) as mock_open, mock.patch("circup.logger.warning") as mock_logger: + result = circup.extract_metadata(path, mock_logger) mock_open.assert_called_once_with(path, "r", encoding="utf-8") assert len(result) == 3 assert result["__version__"] == "1.1.4" @@ -578,10 +678,11 @@ def test_extract_metadata_byte_code_v6(): Ensure the __version__ is correctly extracted from the bytecode ".mpy" file generated from Circuitpython < 7. Version in test_module is 0.9.2 """ - result = circup.extract_metadata("tests/test_module.mpy") - assert result["__version__"] == "0.9.2" - assert result["mpy"] is True - assert result["compatibility"] == (None, "7.0.0-alpha.1") + with mock.patch("circup.logger.warning") as mock_logger: + result = circup.extract_metadata("tests/test_module.mpy", mock_logger) + assert result["__version__"] == "0.9.2" + assert result["mpy"] is True + assert result["compatibility"] == (None, "7.0.0-alpha.1") def test_extract_metadata_byte_code_v7(): @@ -589,10 +690,11 @@ def test_extract_metadata_byte_code_v7(): Ensure the __version__ is correctly extracted from the bytecode ".mpy" file generated from Circuitpython >= 7. Version in local_module_cp7 is 1.2.3 """ - result = circup.extract_metadata("tests/local_module_cp7.mpy") - assert result["__version__"] == "1.2.3" - assert result["mpy"] is True - assert result["compatibility"] == ("7.0.0-alpha.1", "8.99.99") + with mock.patch("circup.logger.warning") as mock_logger: + result = circup.extract_metadata("tests/local_module_cp7.mpy", mock_logger) + assert result["__version__"] == "1.2.3" + assert result["mpy"] is True + assert result["compatibility"] == ("7.0.0-alpha.1", "8.99.99") def test_find_modules(): @@ -604,18 +706,23 @@ def test_find_modules(): device_modules = json.load(f) with open("tests/bundle.json", "rb") as f: bundle_modules = json.load(f) + with mock.patch( - "circup.get_device_versions", return_value=device_modules + "circup.DiskBackend.get_device_versions", return_value=device_modules ), mock.patch( "circup.get_bundle_versions", return_value=bundle_modules ), mock.patch( "circup.os.path.isfile", return_value=True - ): + ), mock.patch( + "circup.logger.warning" + ) as mock_logger: + backend = DiskBackend("mock_device", mock_logger) bundle = circup.Bundle(TEST_BUNDLE_NAME) bundles_list = [bundle] for module in bundle_modules: bundle_modules[module]["bundle"] = bundle - result = circup.find_modules("", bundles_list) + + result = circup.find_modules(backend, bundles_list) assert len(result) == 1 assert result[0].name == "adafruit_74hc595" assert ( @@ -630,13 +737,16 @@ def test_find_modules_goes_bang(): and the utility exists with an error code of 1. """ with mock.patch( - "circup.get_device_versions", side_effect=Exception("BANG!") + "circup.DiskBackend.get_device_versions", side_effect=Exception("BANG!") ), mock.patch("circup.click") as mock_click, mock.patch( "circup.sys.exit" - ) as mock_exit: + ) as mock_exit, mock.patch( + "circup.logger.warning" + ) as mock_logger: bundle = circup.Bundle(TEST_BUNDLE_NAME) bundles_list = [bundle] - circup.find_modules("", bundles_list) + backend = DiskBackend("mock_devcie", mock_logger) + circup.find_modules(backend, bundles_list) assert mock_click.echo.call_count == 1 mock_exit.assert_called_once_with(1) @@ -647,19 +757,21 @@ def test_get_bundle_versions(): Ensure ensure_latest_bundle is called even if lib_dir exists. """ with mock.patch("circup.ensure_latest_bundle") as mock_elb, mock.patch( - "circup.get_modules", return_value={"ok": {"name": "ok"}} + "circup._get_modules_file", return_value={"ok": {"name": "ok"}} ) as mock_gm, mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch( "circup.Bundle.lib_dir", return_value="foo/bar/lib" ), mock.patch( "circup.os.path.isdir", return_value=True - ): + ), mock.patch( + "circup.logger" + ) as mock_logger: bundle = circup.Bundle(TEST_BUNDLE_NAME) bundles_list = [bundle] assert circup.get_bundle_versions(bundles_list) == { "ok": {"name": "ok", "bundle": bundle} } mock_elb.assert_called_once_with(bundle) - mock_gm.assert_called_once_with("foo/bar/lib") + mock_gm.assert_called_once_with("foo/bar/lib", mock_logger) def test_get_bundle_versions_avoid_download(): @@ -668,10 +780,12 @@ def test_get_bundle_versions_avoid_download(): Testing both cases: lib_dir exists and lib_dir doesn't exists. """ with mock.patch("circup.ensure_latest_bundle") as mock_elb, mock.patch( - "circup.get_modules", return_value={"ok": {"name": "ok"}} + "circup._get_modules_file", return_value={"ok": {"name": "ok"}} ) as mock_gm, mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch( "circup.Bundle.lib_dir", return_value="foo/bar/lib" - ): + ), mock.patch( + "circup.logger" + ) as mock_logger: bundle = circup.Bundle(TEST_BUNDLE_NAME) bundles_list = [bundle] with mock.patch("circup.os.path.isdir", return_value=True): @@ -679,13 +793,13 @@ def test_get_bundle_versions_avoid_download(): "ok": {"name": "ok", "bundle": bundle} } assert mock_elb.call_count == 0 - mock_gm.assert_called_once_with("foo/bar/lib") + mock_gm.assert_called_once_with("foo/bar/lib", mock_logger) with mock.patch("circup.os.path.isdir", return_value=False): assert circup.get_bundle_versions(bundles_list, avoid_download=True) == { "ok": {"name": "ok", "bundle": bundle} } mock_elb.assert_called_once_with(bundle) - mock_gm.assert_called_with("foo/bar/lib") + mock_gm.assert_called_with("foo/bar/lib", mock_logger) def test_get_circuitpython_version(): @@ -693,36 +807,24 @@ def test_get_circuitpython_version(): Given valid content of a boot_out.txt file on a connected device, return the version number of CircuitPython running on the board. """ - device_path = "device" - data_no_id = ( - "Adafruit CircuitPython 4.1.0 on 2019-08-02; " - "Adafruit CircuitPlayground Express with samd21g18" - ) - with mock.patch("builtins.open", mock.mock_open(read_data=data_no_id)) as mock_open: - assert circup.get_circuitpython_version(device_path) == ("4.1.0", "") - mock_open.assert_called_once_with( - os.path.join(device_path, "boot_out.txt"), "r", encoding="utf-8" - ) - data_with_id = data_no_id + "\r\n" "Board ID:this_is_a_board" - with mock.patch( - "builtins.open", mock.mock_open(read_data=data_with_id) - ) as mock_open: - assert circup.get_circuitpython_version(device_path) == ( + with mock.patch("circup.logger.warning") as mock_logger: + backend = DiskBackend("tests/mock_device", mock_logger) + assert backend.get_circuitpython_version() == ( "4.1.0", "this_is_a_board", ) - mock_open.assert_called_once_with( - os.path.join(device_path, "boot_out.txt"), "r", encoding="utf-8" - ) def test_get_device_versions(): """ Ensure get_modules is called with the path for the attached device. """ - with mock.patch("circup.get_modules", return_value="ok") as mock_gm: - assert circup.get_device_versions("TESTDIR") == "ok" - mock_gm.assert_called_once_with(os.path.join("TESTDIR", "lib")) + with mock.patch( + "circup.DiskBackend.get_modules", return_value="ok" + ) as mock_gm, mock.patch("circup.logger.warning") as mock_logger: + backend = circup.DiskBackend("mock_device", mock_logger) + assert backend.get_device_versions() == "ok" + mock_gm.assert_called_once_with(os.path.join("mock_device", "lib")) def test_get_modules_empty_path(): @@ -730,7 +832,9 @@ def test_get_modules_empty_path(): Sometimes a path to a device or bundle may be empty. Ensure, if this is the case, an empty dictionary is returned. """ - assert circup.get_modules("") == {} + with mock.patch("circup.logger.warning") as mock_logger: + backend = circup.DiskBackend("tests/mock_device", mock_logger) + assert backend.get_modules("") == {} def test_get_modules_that_are_files(): @@ -743,8 +847,11 @@ def test_get_modules_that_are_files(): os.path.join("tests", "local_module.py"), os.path.join("tests", ".hidden_module.py"), ] - with mock.patch("circup.glob.glob", side_effect=[mods, [], []]): - result = circup.get_modules(path) + with mock.patch("circup.glob.glob", side_effect=[mods, [], []]), mock.patch( + "circup.logger.warning" + ) as mock_logger: + backend = circup.DiskBackend("mock_device", mock_logger) + result = backend.get_modules(path) assert len(result) == 1 # Hidden files are ignored. assert "local_module" in result assert result["local_module"]["path"] == os.path.join( @@ -766,8 +873,11 @@ def test_get_modules_that_are_directories(): os.path.join("tests", ".hidden_dir", ""), ] mod_files = ["tests/dir_module/my_module.py", "tests/dir_module/__init__.py"] - with mock.patch("circup.glob.glob", side_effect=[[], [], mods, mod_files, []]): - result = circup.get_modules(path) + with mock.patch( + "circup.glob.glob", side_effect=[[], [], mods, mod_files, []] + ), mock.patch("circup.logger.warning") as mock_logger: + backend = circup.DiskBackend("mock_device", mock_logger) + result = backend.get_modules(path) assert len(result) == 1 assert "dir_module" in result assert result["dir_module"]["path"] == os.path.join("tests", "dir_module", "") @@ -784,8 +894,11 @@ def test_get_modules_that_are_directories_with_no_metadata(): path = "tests" # mocked away in function. mods = [os.path.join("tests", "bad_module", "")] mod_files = ["tests/bad_module/my_module.py", "tests/bad_module/__init__.py"] - with mock.patch("circup.glob.glob", side_effect=[[], [], mods, mod_files, []]): - result = circup.get_modules(path) + with mock.patch( + "circup.glob.glob", side_effect=[[], [], mods, mod_files, []] + ), mock.patch("circup.logger.warning") as mock_logger: + backend = circup.DiskBackend("mock_device", mock_logger) + result = backend.get_modules(path) assert len(result) == 1 assert "bad_module" in result assert result["bad_module"]["path"] == os.path.join("tests", "bad_module", "") @@ -1019,8 +1132,8 @@ def test_libraries_from_imports(): "adafruit_touchscreen", ] test_file = str(pathlib.Path(__file__).parent / "import_styles.py") - result = circup.libraries_from_imports(test_file, mod_names) - print(result) + + result = circup.libraries_from_code_py(test_file, mod_names) assert result == [ "adafruit_bus_device", "adafruit_button", @@ -1033,6 +1146,16 @@ def test_libraries_from_imports_bad(): """Ensure that we catch an import error""" TEST_BUNDLE_MODULES = {"one.py": {}, "two.py": {}, "three.py": {}} runner = CliRunner() + with mock.patch("circup.get_bundle_versions", return_value=TEST_BUNDLE_MODULES): - result = runner.invoke(circup.install, ["--auto-file", "./tests/bad_python.py"]) + result = runner.invoke( + circup.main, + [ + "--path", + "./tests/mock_device/", + "install", + "--auto-file", + "./tests/bad_python.py", + ], + ) assert result.exit_code == 2