diff --git a/winpython/piptree.py b/winpython/piptree.py index b47e165f..abb1a082 100644 --- a/winpython/piptree.py +++ b/winpython/piptree.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """ -This script provides functionality to inspect and display package dependencies -in a Python environment, including both downward and upward dependencies. +Enhanced script to inspect and display Python package dependencies, +supporting both downward and upward dependency trees. Requires Python 3.8+ due to importlib.metadata. """ @@ -10,52 +10,32 @@ import re import platform import os +import logging +from functools import lru_cache from collections import OrderedDict from typing import Dict, List, Optional, Tuple, Union from pip._vendor.packaging.markers import Marker from importlib.metadata import Distribution, distributions from pathlib import Path -def normalize(name: str) -> str: - """ - Normalize package name according to PEP 503. +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) - This function converts a package name to its canonical form by replacing - any sequence of dashes, underscores, or dots with a single dash and - converting the result to lowercase. - - :param name: The package name to normalize - :return: The normalized package name - """ - return re.sub(r"[-_.]+", "-", name).lower() +class PipDataError(Exception): + """Custom exception for PipData related errors.""" + pass def sum_up(text: str, max_length: int = 144, stop_at: str = ". ") -> str: - """ - Summarize text to fit within max_length characters, ending at the last complete sentence if possible. - - This function attempts to create a summary of the given text that fits within - the specified maximum length. It tries to end the summary at a complete sentence - if possible. - - :param text: The text to summarize - :param max_length: Maximum length for summary - :param stop_at: String to stop summarization at - :return: Summarized text - """ + """Summarize text to fit within max_length, ending at last complete sentence.""" summary = (text + os.linesep).splitlines()[0] - if len(summary) > max_length and len(stop_at) > 1: - summary = (summary + stop_at).split(stop_at)[0] - if len(summary) > max_length: - summary = summary[:max_length] - return summary + if len(summary) <= max_length: + return summary + if stop_at and stop_at in summary[:max_length]: + return summary[:summary.rfind(stop_at, 0, max_length)] + stop_at.rstrip() + return summary[:max_length].rstrip() class PipData: - """ - Wrapper around Distribution.discover() or Distribution.distributions() to manage package metadata. - - This class provides methods to inspect and display package dependencies - in a Python environment, including both downward and upward dependencies. - """ + """Manages package metadata and dependency relationships in a Python environment.""" def __init__(self, target: Optional[str] = None): """ @@ -66,25 +46,21 @@ def __init__(self, target: Optional[str] = None): self.distro: Dict[str, Dict] = {} self.raw: Dict[str, Dict] = {} self.environment = self._get_environment() - - search_path = target or sys.executable - packages = self._get_packages(search_path) - - for package in packages: - self._process_package(package) - - # On a second pass, complement dependencies in reverse mode with 'wanted-per': - self._populate_reverse_dependencies() + try: + packages = self._get_packages(target or sys.executable) + self._process_packages(packages) + self._populate_reverse_dependencies() + except Exception as e: + raise PipDataError(f"Failed to initialize package data: {str(e)}") from e + + @staticmethod + @lru_cache(maxsize=None) + def normalize(name: str) -> str: + """Normalize package name per PEP 503.""" + return re.sub(r"[-_.]+", "-", name).lower() def _get_environment(self) -> Dict[str, str]: - """ - Collect environment details for dependency evaluation. - - This method gathers information about the system and Python environment, - which is used to evaluate package dependencies. - - :return: Dictionary containing system and Python environment information - """ + """Collect system and Python environment details.""" return { "implementation_name": sys.implementation.name, "implementation_version": f"{sys.implementation.version.major}.{sys.implementation.version.minor}.{sys.implementation.version.micro}", @@ -100,60 +76,37 @@ def _get_environment(self) -> Dict[str, str]: } def _get_packages(self, search_path: str) -> List[Distribution]: - """ - Get the list of packages based on the search path. - - This method retrieves the list of installed packages in the specified - search path. If the search path is the current executable, it uses - Distribution.discover(). Otherwise, it uses distributions() with the - specified path. - - :param search_path: Path to search for packages - :return: List of Distribution objects - """ + """Retrieve installed packages from the specified path.""" if sys.executable == search_path: return Distribution.discover() else: return distributions(path=[str(Path(search_path).parent / 'lib' / 'site-packages')]) - def _process_package(self, package: Distribution) -> None: - """ - Process package metadata and store it in the distro dictionary. - - This method extracts metadata from a Distribution object and stores it - in the distro dictionary. It also initializes the reverse dependencies - and provided extras for the package. - - :param package: The Distribution object to process - """ - meta = package.metadata - name = meta['Name'] - version = package.version - key = normalize(name) - self.raw[key] = meta - - self.distro[key] = { - "name": name, - "version": version, - "summary": meta.get("Summary", ""), - "requires_dist": self._get_requires(package), - "reverse_dependencies": [], - "description": meta.get("Description", ""), - "provides": self._get_provides(package), - "provided": {'': None} # Placeholder for extras provided by this package - } + def _process_packages(self, packages: List[Distribution]) -> None: + """Process packages metadata and store them in the distro dictionary.""" + for package in packages: + try: + meta = package.metadata + name = meta.get('Name') + if not name: + continue + key = self.normalize(name) + self.raw[key] = meta + self.distro[key] = { + "name": name, + "version": package.version, + "summary": meta.get("Summary", ""), + "requires_dist": self._get_requires(package), + "reverse_dependencies": [], + "description": meta.get("Description", ""), + "provides": self._get_provides(package), + "provided": {'': None} # Placeholder for extras provided by this package + } + except Exception as e: + logger.warning(f"Failed to process package {name}: {str(e)}", exc_info=True) def _get_requires(self, package: Distribution) -> List[Dict[str, str]]: - """ - Extract and normalize requirements for a package. - - This method parses the requirements of a package and normalizes them - into a list of dictionaries. Each dictionary contains the required - package key, version, extra, and marker (if any). - - :param package: The Distribution object to extract requirements from - :return: List of dictionaries containing normalized requirements - """ + """Extract and normalize requirements for a package.""" requires = [] replacements = str.maketrans({" ": " ", "[": "", "]": "", "'": "", '"': ""}) further_replacements = [ @@ -166,8 +119,8 @@ def _get_requires(self, package: Distribution) -> List[Dict[str, str]]: if package.requires: for req in package.requires: req_nameextra, req_marker = (req + ";").split(";")[:2] - req_nameextra = normalize(re.split(r" |;|==|!|>|<", req_nameextra + ";")[0]) - req_key = normalize((req_nameextra + "[").split("[")[0]) + req_nameextra = self.normalize(re.split(r" |;|==|!|>|<", req_nameextra + ";")[0]) + req_key = self.normalize((req_nameextra + "[").split("[")[0]) req_key_extra = req_nameextra[len(req_key) + 1:].split("]")[0] req_version = req[len(req_nameextra):].translate(replacements) @@ -185,16 +138,7 @@ def _get_requires(self, package: Distribution) -> List[Dict[str, str]]: return requires def _get_provides(self, package: Distribution) -> Dict[str, None]: - """ - Get the list of extras provided by this package. - - This method extracts the extras provided by a package from its requirements. - It returns a dictionary where the keys are the provided extras and the values - are None. - - :param package: The Distribution object to extract provided extras from - :return: Dictionary containing provided extras - """ + """Extract provided extras from package requirements.""" provides = {'': None} if package.requires: for req in package.requires: @@ -205,83 +149,62 @@ def _get_provides(self, package: Distribution) -> Dict[str, None]: return provides def _populate_reverse_dependencies(self) -> None: - """ - Add reverse dependencies to each package. - - This method populates the reverse dependencies for each package in the - distro dictionary. It iterates over the requirements of each package - and adds the package as a reverse dependency to the required packages. - """ - for package in self.distro: - for requirement in self.distro[package]["requires_dist"]: - if requirement["req_key"] in self.distro: - want_add = { - "req_key": package, - "req_version": requirement["req_version"], - "req_extra": requirement["req_extra"], - } - if "req_marker" in requirement: - want_add["req_marker"] = requirement["req_marker"] - if 'extra == ' in requirement["req_marker"]: + """Populate reverse dependencies.""" + for pkg_key, pkg_data in self.distro.items(): + for req in pkg_data["requires_dist"]: + target_key = req["req_key"] + if target_key in self.distro: + rev_dep = {"req_key": pkg_key, "req_version": req["req_version"], "req_extra": req["req_extra"]} + if "req_marker" in req: + rev_dep["req_marker"] = req["req_marker"] + if 'extra == ' in req["req_marker"]: remove_list = {ord("'"): None, ord('"'): None} - self.distro[requirement["req_key"]]["provided"][requirement["req_marker"].split('extra == ')[1].translate(remove_list)] = None - self.distro[requirement["req_key"]]["reverse_dependencies"].append(want_add) + self.distro[target_key]["provided"][req["req_marker"].split('extra == ')[1].translate(remove_list)] = None + self.distro[target_key]["reverse_dependencies"].append(rev_dep) def _get_dependency_tree(self, package_name: str, extra: str = "", version_req: str = "", depth: int = 20, path: Optional[List[str]] = None, verbose: bool = False, upward: bool = False) -> List[List[str]]: - """ - Recursive function to build dependency tree. - - This method builds a dependency tree for the specified package. It can - build the tree for downward dependencies (default) or upward dependencies - (if upward is True). The tree is built recursively up to the specified - depth. - - :param package_name: The name of the package to build the tree for - :param extra: The extra to include in the dependency tree - :param version_req: The version requirement for the package - :param depth: The maximum depth of the dependency tree - :param path: The current path in the dependency tree (used for cycle detection) - :param verbose: Whether to include verbose output in the tree - :param upward: Whether to build the tree for upward dependencies - :return: List of lists containing the dependency tree - """ + """Recursive function to build dependency tree.""" path = path or [] extras = extra.split(",") - package_key = normalize(package_name) + pkg_key = self.normalize(package_name) ret_all = [] - if package_key + "[" + extra + "]" in path: - print("cycle!", "->".join(path + [package_key + "[" + extra + "]"])) - return [] # Return empty list to avoid further recursion + full_name = f"{package_name}[{extra}]" if extra else package_name + if full_name in path: + logger.warning(f"Cycle detected: {' -> '.join(path + [full_name])}") + return [] - package_data = self.distro.get(package_key) - if package_data and len(path) <= depth: + pkg_data = self.distro[pkg_key] + if pkg_data and len(path) <= depth: for extra in extras: environment = {"extra": extra, **self.environment} - summary = f' {package_data["summary"]}' if verbose else '' + summary = f' {pkg_data["summary"]}' if verbose else '' base_name = f'{package_name}[{extra}]' if extra else package_name - ret = [f'{base_name}=={package_data["version"]} {version_req}{summary}'] + ret = [f'{base_name}=={pkg_data["version"]} {version_req}{summary}'] - dependencies = package_data["requires_dist"] if not upward else package_data["reverse_dependencies"] + dependencies = pkg_data["requires_dist"] if not upward else pkg_data["reverse_dependencies"] for dependency in dependencies: if dependency["req_key"] in self.distro: next_path = path + [base_name] if upward: up_req = (dependency.get("req_marker", "").split('extra == ')+[""])[1].strip("'\"") - # avoids circular links on dask[array] if dependency["req_key"] in self.distro and dependency["req_key"]+"["+up_req+"]" not in path: # upward dependancy taken if: # - if extra "" demanded, and no marker from upward package: like pandas[] ==> numpy # - or the extra is in the upward package, like pandas[test] ==> pytest, for 'test' extra # - or an extra "array" is demanded, and indeed in the req_extra list: array,dataframe,diagnostics,distributer - if (not dependency.get("req_marker") and extra =="" - ) or ("req_marker" in dependency and extra==up_req and dependency["req_key"]!=package_key and Marker(dependency["req_marker"]).evaluate(environment=environment) - ) or ("req_marker" in dependency and extra!="" and extra+',' in dependency["req_extra"]+',' and Marker(dependency["req_marker"]).evaluate(environment=environment|{"extra": up_req}) - ): + if (not dependency.get("req_marker") and extra == "") or \ + ("req_marker" in dependency and extra == up_req and \ + dependency["req_key"] != pkg_key and \ + Marker(dependency["req_marker"]).evaluate(environment=environment)) or \ + ("req_marker" in dependency and extra != "" and \ + extra + ',' in dependency["req_extra"] + ',' and \ + Marker(dependency["req_marker"]).evaluate(environment=environment | {"extra": up_req})): + # IA risk error: # dask[array] go upwards as dask[dataframe], so {"extra": up_req} , not {"extra": extra} ret += self._get_dependency_tree( dependency["req_key"], - up_req, # dask[array] going upwards continues as dask[dataframe] + up_req, f"[requires: {package_name}" + (f"[{dependency['req_extra']}]" if dependency["req_extra"] != "" else "") + f'{dependency["req_version"]}]', @@ -305,23 +228,9 @@ def _get_dependency_tree(self, package_name: str, extra: str = "", version_req: return ret_all def down(self, pp: str = "", extra: str = "", depth: int = 20, indent: int = 5, version_req: str = "", verbose: bool = False) -> str: - """ - Print the downward requirements for the package or all packages. - - This method prints the downward dependencies for the specified package - or all packages if pp is ".". It uses the _get_dependency_tree method - to build the dependency tree and formats the output as a JSON string. - - :param pp: The package name or "." to print dependencies for all packages - :param extra: The extra to include in the dependency tree - :param depth: The maximum depth of the dependency tree - :param indent: The indentation level for the JSON output - :param version_req: The version requirement for the package - :param verbose: Whether to include verbose output in the tree - :return: JSON string containing the downward dependencies - """ + """Generate downward dependency tree as formatted string.""" if pp == ".": - results = [self.down(one_pp, extra, depth, indent, version_req, verbose=verbose) for one_pp in sorted(self.distro)] + results = [self.down(p, extra, depth, indent, version_req, verbose=verbose) for p in sorted(self.distro)] return '\n'.join(filter(None, results)) if extra == ".": @@ -329,39 +238,25 @@ def down(self, pp: str = "", extra: str = "", depth: int = 20, indent: int = 5, results = [self.down(pp, one_extra, depth, indent, version_req, verbose=verbose) for one_extra in sorted(self.distro[pp]["provides"])] return '\n'.join(filter(None, results)) - return "" # Handle cases where extra is "." and package_name is not found. + return "" if pp not in self.distro: - return "" # Handle cases where package_name is not found. + return "" rawtext = json.dumps(self._get_dependency_tree(pp, extra, version_req, depth, verbose=verbose), indent=indent) lines = [l for l in rawtext.split("\n") if len(l.strip()) > 2] return "\n".join(lines).replace('"', "") def up(self, pp: str, extra: str = "", depth: int = 20, indent: int = 5, version_req: str = "", verbose: bool = False) -> str: - """ - Print the upward needs for the package. - - This method prints the upward dependencies for the specified package. - It uses the _get_dependency_tree method to build the dependency tree - and formats the output as a JSON string. - - :param pp: The package name - :param extra: The extra to include in the dependency tree - :param depth: The maximum depth of the dependency tree - :param indent: The indentation level for the JSON output - :param version_req: The version requirement for the package - :param verbose: Whether to include verbose output in the tree - :return: JSON string containing the upward dependencies - """ + """Generate upward dependency tree as formatted string.""" if pp == ".": - results = [self.up(one_pp, extra, depth, indent, version_req, verbose) for one_pp in sorted(self.distro)] + results = [self.up(p, extra, depth, indent, version_req, verbose) for p in sorted(self.distro)] return '\n'.join(filter(None, results)) if extra == ".": if pp in self.distro: extras = set(self.distro[pp]["provided"]).union(set(self.distro[pp]["provides"])) - results = [self.up(pp, one_extra, depth, indent, version_req, verbose=verbose) for one_extra in sorted(extras)] + results = [self.up(pp, e, depth, indent, version_req, verbose=verbose) for e in sorted(extras)] return '\n'.join(filter(None, results)) return "" @@ -373,42 +268,24 @@ def up(self, pp: str, extra: str = "", depth: int = 20, indent: int = 5, version return "\n".join(filter(None, lines)).replace('"', "") def description(self, pp: str) -> None: - """ - Return description of the package. - - This method prints the description of the specified package. - - :param pp: The package name - """ + """Return package description or None if not found.""" if pp in self.distro: return print("\n".join(self.distro[pp]["description"].split(r"\n"))) def summary(self, pp: str) -> str: - """ - Return summary of the package. - - This method returns the summary of the specified package. - - :param pp: The package name - :return: The summary of the package - """ + """Return package summary or empty string if not found.""" if pp in self.distro: return self.distro[pp]["summary"] return "" def pip_list(self, full: bool = False, max_length: int = 144) -> List[Tuple[str, Union[str, Tuple[str, str]]]]: - """ - List installed packages similar to pip list. - - This method lists the installed packages in a format similar to the - output of the `pip list` command. If full is True, it includes the - package version and summary. + """List installed packages with optional details. :param full: Whether to include the package version and summary :param max_length: The maximum length for the summary :return: List of tuples containing package information """ + pkgs = sorted(self.distro.items()) if full: - return [(p, self.distro[p]["version"], sum_up(self.distro[p]["summary"], max_length)) for p in sorted(self.distro)] - else: - return [(p, sum_up(self.distro[p]["version"], max_length)) for p in sorted(self.distro)] + return [(p, d["version"], sum_up(d["summary"], max_length)) for p, d in pkgs] + return [(p, d["version"]) for p, d in pkgs]