diff --git a/README.md b/README.md index 341829a..afe1c7b 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,9 @@ Pyscript also provides a kernel that interfaces with the Jupyter front-ends (eg, lab and VSCode). That allows you to develop and test pyscript code interactively. Plus you can interact with much of HASS by looking at state variables, calling services etc. +Pyscript can also generate IDE stub modules by calling the `pyscript.generate_stubs` service. +See the “IDE Helpers” section of the docs for setup details. + ## Documentation Here is the [pyscript documentation](https://hacs-pyscript.readthedocs.io/en/stable). diff --git a/custom_components/pyscript/__init__.py b/custom_components/pyscript/__init__.py index dd47e39..60c0bc0 100644 --- a/custom_components/pyscript/__init__.py +++ b/custom_components/pyscript/__init__.py @@ -5,6 +5,7 @@ import json import logging import os +import shutil import time import traceback from typing import Any, Callable, Dict, List, Set, Union @@ -37,7 +38,9 @@ FOLDER, LOGGER_PATH, REQUIREMENTS_FILE, + SERVICE_GENERATE_STUBS, SERVICE_JUPYTER_KERNEL_START, + SERVICE_RESPONSE_ONLY, UNSUB_LISTENERS, WATCHDOG_TASK, ) @@ -49,6 +52,7 @@ from .mqtt import Mqtt from .requirements import install_requirements from .state import State, StateVal +from .stubs.generator import StubsGenerator from .trigger import TrigTime from .webhook import Webhook @@ -300,6 +304,47 @@ async def reload_scripts_handler(call: ServiceCall) -> None: hass.services.async_register(DOMAIN, SERVICE_RELOAD, reload_scripts_handler) + async def generate_stubs_service(call: ServiceCall) -> Dict[str, Any]: + """Generate pyscript IDE stub files.""" + + generator = StubsGenerator(hass) + generated_body = await generator.build() + stubs_path = os.path.join(hass.config.path(FOLDER), "modules", "stubs") + + def write_stubs(path) -> dict[str, Any]: + res: dict[str, Any] = {} + try: + os.makedirs(path, exist_ok=True) + + builtins_path = os.path.join(os.path.dirname(__file__), "stubs", "pyscript_builtins.py") + shutil.copy2(builtins_path, path) + + gen_path = os.path.join(path, "pyscript_generated.py") + with open(gen_path, "w", encoding="utf-8") as f: + f.write(generated_body) + res["status"] = "OK" + return res + except Exception as e: + _LOGGER.exception("Stubs generation failed: %s", e) + res["status"] = "Error" + res["exception"] = str(e) + res["message"] = "Check pyscript logs" + return res + + result = await hass.async_add_executor_job(write_stubs, stubs_path) + + if generator.ignored_identifiers: + result["ignored_identifiers"] = sorted(generator.ignored_identifiers) + + if result["status"] == "OK": + _LOGGER.info("Pyscript stubs generated to %s", stubs_path) + + return result + + hass.services.async_register( + DOMAIN, SERVICE_GENERATE_STUBS, generate_stubs_service, supports_response=SERVICE_RESPONSE_ONLY + ) + async def jupyter_kernel_start(call: ServiceCall) -> None: """Handle Jupyter kernel start call.""" _LOGGER.debug("service call to jupyter_kernel_start: %s", call.data) diff --git a/custom_components/pyscript/const.py b/custom_components/pyscript/const.py index 26b675c..5800f78 100644 --- a/custom_components/pyscript/const.py +++ b/custom_components/pyscript/const.py @@ -34,6 +34,7 @@ CONF_INSTALLED_PACKAGES = "_installed_packages" SERVICE_JUPYTER_KERNEL_START = "jupyter_kernel_start" +SERVICE_GENERATE_STUBS = "generate_stubs" LOGGER_PATH = "custom_components.pyscript" diff --git a/custom_components/pyscript/eval.py b/custom_components/pyscript/eval.py index 1c4f9a0..85bad74 100644 
--- a/custom_components/pyscript/eval.py
+++ b/custom_components/pyscript/eval.py
@@ -999,6 +999,14 @@ async def ast_importfrom(self, arg):
                     raise ModuleNotFoundError(f"module '{imp.name}' not found")
                 self.sym_table[imp.name if imp.asname is None else imp.asname] = mod
             return
+        if arg.module == "stubs" or arg.module.startswith("stubs."):
+            for imp in arg.names:
+                if imp.asname is not None:
+                    raise ModuleNotFoundError(
+                        f"from {arg.module} import {imp.name} *as {imp.asname}* not supported for stubs"
+                    )
+            _LOGGER.debug("Skipping stubs import %s", arg.module)
+            return
         mod, error_ctx = await self.global_ctx.module_import(arg.module, arg.level)
         if error_ctx:
             self.exception_obj = error_ctx.exception_obj
diff --git a/custom_components/pyscript/services.yaml b/custom_components/pyscript/services.yaml
index 0cf9a9f..8cad68c 100644
--- a/custom_components/pyscript/services.yaml
+++ b/custom_components/pyscript/services.yaml
@@ -105,3 +105,7 @@ jupyter_kernel_start:
       default: pyscript
       selector:
         text:
+
+generate_stubs:
+  name: Generate pyscript stubs
+  description: Build stub files that combine the builtin helpers with discovered entities and services.
diff --git a/custom_components/pyscript/stubs/generator.py b/custom_components/pyscript/stubs/generator.py
new file mode 100644
index 0000000..b7b2ae1
--- /dev/null
+++ b/custom_components/pyscript/stubs/generator.py
@@ -0,0 +1,406 @@
+"""AST-based stubs generator used to build IDE helper files."""
+
+from __future__ import annotations
+
+import ast
+from dataclasses import dataclass
+from enum import IntEnum, IntFlag, StrEnum
+import keyword
+import logging
+from typing import Any, Literal
+
+from custom_components.pyscript.stubs.pyscript_builtins import StateVal
+from homeassistant.core import HomeAssistant, split_entity_id
+from homeassistant.helpers import entity_registry as er
+from homeassistant.helpers.service import async_get_all_descriptions
+
+_LOGGER = logging.getLogger(__name__)
+
+_STATE_BASE_FIELDS = {attr for attr, value in StateVal.__annotations__.items()}
+_STATE_CLASS_SUFFIX = "_state"
+_STATE_CLASS = "StateVal"
+_DOCSTRING_INDENT = " " * 8
+
+SELECTOR_SIMPLE_TYPES = {
+    "boolean": "bool",
+    "color_rgb": "tuple[int, int, int]",
+    "color_temp": "int",
+    "config_entry": "str",
+    "date": "datetime",
+    "datetime": "datetime",
+    "entity": "str",
+    "icon": "str",
+    "object": "Any",
+    "state": "str",
+    "text": "str",
+    "time": "str",
+}
+
+
+@dataclass
+class _ServiceField:
+    """Describe a Home Assistant service field."""
+
+    name: str
+    required: bool
+    annotation: ast.expr
+    default: ast.expr | None
+    description: str | None
+
+
+class StubsGenerator:
+    """Build pyscript stub modules using the Python AST."""
+
+    def __init__(self, hass: HomeAssistant) -> None:
+        """Initialize stubs generator."""
+        self._hass = hass
+        self.ignored_identifiers: list[str] = []
+        self._classes: dict[str, ast.ClassDef] = {}
+        self._domain_attributes: dict[str, dict[str, set[str]]] = {}
+
+    async def build(self) -> str:
+        """Return the generated stub body."""
+
+        module_body: list[ast.stmt] = []
+
+        imports = {
+            "typing": ["Any", "Literal"],
+            "datetime": ["datetime"],
+            "pyscript_builtins": [_STATE_CLASS],
+        }
+
+        for module, import_names in imports.items():
+            module_body.append(
+                ast.ImportFrom(
+                    module=module,
+                    level=0,
+                    names=[ast.alias(name=imp, asname=None) for imp in import_names],
+                )
+            )
+
+        await self._build_entity_classes()
+        await self._build_services()
+
+        # Order: each _<domain>_state helper class sorts just before its domain class (leading "_" ignored).
+ sorted_domains = sorted( + self._classes.keys(), + key=lambda s: ( + s[1:].split("_", 1)[0] if s.startswith("_") else s, + 0 if s.startswith("_") else 1, + s, + ), + ) + + for domain_id in sorted_domains: + domain = self._classes[domain_id] + attributes = self._domain_attributes.get(domain_id) + if attributes is not None: + for attr, attr_types in sorted(attributes.items(), reverse=True): + ann = None + for attr_type in attr_types: + if attr_type is None: + continue + if ann is not None: + ann = ast.BinOp(left=ann, op=ast.BitOr(), right=self._name(attr_type)) + else: + ann = self._name(attr_type) + + if not ann: + ann = self._name("Any") + domain.body.insert( + 0, + ast.AnnAssign( + target=self._name(attr), + annotation=ann, + value=None, + simple=1, + ), + ) + + if len(domain.body) == 0: + # empty class body + domain.body.append(ast.Expr(value=ast.Constant(value=Ellipsis))) + + module_body.append(domain) + + module = ast.Module(body=module_body, type_ignores=[]) + ast.fix_missing_locations(module) + return ast.unparse(module) + + def _get_or_create_class(self, domain_id: str, base_class: str = None) -> ast.ClassDef: + cls = self._classes.get(domain_id) + if cls is None: + cls = ast.ClassDef( + name=domain_id, + bases=[] if base_class is None else [self._name(base_class)], + keywords=[], + body=[], + decorator_list=[], + ) + self._classes[domain_id] = cls + return cls + + def _collect_entity_atts(self, domain_id: str, entity_id: str) -> None: + state = self._hass.states.get(f"{domain_id}.{entity_id}") + if state is None: + return + # _LOGGER.debug(f"Collecting entity attributes for {domain_id}.{entity_id}: {state.attributes}") + entity_attributes = self._domain_attributes.setdefault(self._get_entity_class_name(domain_id), {}) + + for attr_key, attr_value in state.attributes.items(): + fqn = f"{domain_id}.{entity_id}.{attr_key}" + + if attr_key in _STATE_BASE_FIELDS: + continue + + if not self._is_identifier(attr_key, fqn): + continue + value_type = self._get_entity_attribute_type(fqn, attr_value) + + entity_attributes.setdefault(attr_key, set()).add(value_type) + + def _get_entity_class_name(self, domain_id: str) -> str: + return f"_{domain_id}{_STATE_CLASS_SUFFIX}" + + async def _build_entity_classes(self): + for entity in er.async_get(self._hass).entities.values(): + if entity.disabled: + continue + + domain_id, entity_id = split_entity_id(entity.entity_id) + + if not self._is_identifier(entity_id, entity.entity_id): + continue + + self._collect_entity_atts(domain_id, entity_id) + + self._get_or_create_class(domain_id).body.append( + ast.AnnAssign( + target=self._name(entity_id), + annotation=self._name(self._get_entity_class_name(domain_id)), + value=None, # ast.Constant(value=Ellipsis), + simple=1, + ) + ) + + self._get_or_create_class(self._get_entity_class_name(domain_id), _STATE_CLASS) + + async def _build_services(self): + def process_fields(fields: dict[str, Any]) -> list[_ServiceField]: + result: list[_ServiceField] = [] + for field_name, field in (fields.get("fields") or {}).items(): + if field_name == "advanced_fields": + result.extend(process_fields(field)) + continue + definition = self._describe_service_field(service_id, field_name, field) + if definition is not None: + result.append(definition) + return result + + descriptions = await async_get_all_descriptions(self._hass) + for domain_id, services in descriptions.items(): + + domain_class = self._get_or_create_class(domain_id) + for service_id, payload in services.items(): + if not self._is_identifier(service_id, 
f"{domain_id}.{service_id}"): + continue + + _LOGGER.debug("Building service %s.%s payload: %s", domain_id, service_id, payload) + + field_nodes = sorted( + process_fields(payload), + key=lambda x: not x.required, + ) + + has_target = "target" in payload + entity_class = self._get_entity_class_name(domain_id) + if has_target and entity_class in self._classes: + entity_service = await self._create_service_function( + service_id, field_nodes, payload, "entity" + ) + self._get_or_create_class(entity_class).body.append(entity_service) + + service = await self._create_service_function(service_id, field_nodes, payload, "service") + domain_class.body.append(service) + + async def _create_service_function( + self, + service_id: str, + field_nodes: list[_ServiceField], + payload: dict[str, Any], + def_type: Literal["entity", "service"] = "service", + ) -> ast.FunctionDef: + """Create a function definition describing the service signature.""" + + args: list[ast.arg] = [] + kwonlyargs: list[ast.arg] = [] + kw_defaults: list[ast.expr] = [] + decorator_list: list[ast.expr] = [] + + has_target = "target" in payload + + if def_type == "service": + decorator_list.append(self._name("staticmethod")) + + if has_target: + field_nodes = [ + _ServiceField( + name="entity_id", + annotation=self._name("str"), + required=True, + default=None, + description="Entity ID", + ) + ] + field_nodes + + elif def_type == "entity": + args.append(ast.arg(arg="self")) + + if def_type == "entity" and len(field_nodes) == 1: # simple calling with 1 arg service + args.append(ast.arg(arg=field_nodes[0].name, annotation=field_nodes[0].annotation)) + else: + for field in field_nodes: + kwonlyargs.append(ast.arg(arg=field.name, annotation=field.annotation)) + kw_defaults.append(field.default) + + if "response" in payload: + returns = ast.Subscript( + value=self._name("dict"), + slice=ast.Tuple(elts=[self._name("str"), self._name("Any")]), + ) + else: + returns = None + + body: list[ast.stmt] = [] + + docstring_value = self._build_docstring(payload.get("description"), field_nodes) + if docstring_value: + body.append(ast.Expr(value=ast.Constant(value=docstring_value))) + body.append(ast.Expr(value=ast.Constant(value=Ellipsis))) + + service_function = ast.FunctionDef( + name=service_id, + args=ast.arguments( + posonlyargs=[], + args=args, + vararg=None, + kwonlyargs=kwonlyargs, + kw_defaults=kw_defaults, + kwarg=None, + defaults=[], + ), + body=body, + decorator_list=decorator_list, + returns=returns, + ) + return service_function + + def _get_entity_attribute_type(self, fqn: str, value: Any) -> str | None: + if value is None: + return None + t = type(value) + if t in (bool, int, float, str, list, dict, set, tuple): + return t.__qualname__ + if t.__module__ == "datetime" and t.__qualname__ == "datetime": + return t.__qualname__ + if isinstance(value, StrEnum): + return "str" + if isinstance(value, (IntEnum, IntFlag)): + return "int" + _LOGGER.debug("Attribute %s type %s unknown, value: %s", fqn, t, value) + return None + + def _describe_service_field( + self, + service: str, + field_name: str, + field: dict[str, Any], + ) -> _ServiceField | None: + fqn = f"{service}({field_name})" + if not self._is_identifier(field_name, fqn): + return None + + try: + annotation = self._selector_annotation(field.get("selector")) + + is_required = field.get("required") is True + + default_expr = None + default_value = field.get("default") + if default_value is not None and isinstance(default_value, (int, float, str, bool)): + default_expr = 
ast.Constant(value=default_value)
+
+            if not is_required:
+                if default_expr is None:
+                    if annotation is not None:
+                        # add | None for optional fields without default value
+                        annotation = ast.BinOp(left=annotation, op=ast.BitOr(), right=ast.Constant(None))
+                    default_expr = ast.Constant(value=None)
+
+            description = field.get("description") or ""
+            if example := field.get("example"):
+                description += f" Example: {example}"
+
+            return _ServiceField(
+                name=field_name,
+                required=is_required,
+                annotation=annotation,
+                default=default_expr,
+                description=description,
+            )
+        except Exception:
+            _LOGGER.exception("Incorrect method description %s: %s", fqn, field)
+            return None
+
+    def _selector_annotation(self, selector: dict[str, Any] | None) -> ast.expr | None:
+        if not selector:
+            return None
+        for selector_id, selector_value in selector.items():
+            if selector_type := SELECTOR_SIMPLE_TYPES.get(selector_id):
+                return self._name(selector_type)
+            if selector_id == "number":
+                if selector_value == "any":
+                    return self._name("float")
+                if isinstance(selector_value, dict) and selector_value.get("mode") == "box":
+                    return self._name("float")
+                return self._name("int")
+            if selector_id == "select":
+                options = []
+                if isinstance(selector_value, dict):
+                    options = selector_value.get("options") or []
+                literals = [ast.Constant(value="")]
+                for option in options:
+                    value = option.get("value") if isinstance(option, dict) else option
+                    literals.append(ast.Constant(value=value))
+                return ast.Subscript(
+                    value=self._name("Literal"),
+                    slice=ast.Tuple(elts=literals),
+                )
+        _LOGGER.debug("Selector annotation unknown %s", selector)
+        return None
+
+    def _is_identifier(self, value: str, fqn: str) -> bool:
+        valid = value.isidentifier() and not keyword.iskeyword(value)
+        if not valid:
+            self.ignored_identifiers.append(fqn)
+            _LOGGER.debug("Invalid Python identifier %s (%s)", value, fqn)
+        return valid
+
+    def _name(self, identifier: str, ctx: ast.expr_context | None = None) -> ast.expr:
+        if ctx is None:
+            ctx = ast.Load()
+        return ast.Name(id=identifier, ctx=ctx)
+
+    def _build_docstring(self, description: str | None, fields: list[_ServiceField]) -> str | None:
+        docstring = description.strip() if description else ""
+        docstring = docstring.replace("\n", f"\n{_DOCSTRING_INDENT}")
+        first_arg = True
+        for field in fields:
+            if not field.description:
+                continue
+            if first_arg:
+                docstring += f"\n\n{_DOCSTRING_INDENT}Args:"
+                first_arg = False
+            f_desc = field.description.replace("\n", f"\n{_DOCSTRING_INDENT * 2}")
+            docstring += f"\n{_DOCSTRING_INDENT} {field.name}: {f_desc}"
+        return docstring
diff --git a/custom_components/pyscript/stubs/pyscript_builtins.py b/custom_components/pyscript/stubs/pyscript_builtins.py
new file mode 100644
index 0000000..aa1a294
--- /dev/null
+++ b/custom_components/pyscript/stubs/pyscript_builtins.py
@@ -0,0 +1,583 @@
+"""Helper stub that exposes pyscript's dynamic built-ins to static analyzers.
+
+The real implementations are injected by pyscript at runtime; only signatures
+and documentation live here.
+""" + +# pylint: disable=unnecessary-ellipsis, invalid-name, redefined-outer-name +from __future__ import annotations + +from asyncio import Task +from collections.abc import Callable +from datetime import datetime +from typing import Any, Literal + +from homeassistant.core import HomeAssistant + +hass: HomeAssistant + + +def service( + *service_name: str, supports_response: Literal["none", "only", "optional"] = "none" +) -> Callable[..., Any]: + """Register the wrapped function as a Home Assistant service. + + Args: + service_name: Optional ``DOMAIN.SERVICE`` aliases; defaults to ``pyscript.``. + supports_response: Advertised response mode (``"none"``, ``"only"``, or ``"optional"``). + """ + ... + + +def state_trigger( + *str_expr: str, + state_hold: int | float | None = None, + state_hold_false: int | float | None = None, + state_check_now: bool = False, + kwargs: dict | None = None, + watch: list[str] | set[str] | None = None, +) -> Callable[..., Any]: + """Trigger when any provided state expression evaluates truthy. + + Args: + str_expr: One or more state expressions (strings, lists, or sets) that are ORed together. + state_hold: Seconds the expression must stay true before firing; cancelled if it reverts. + state_hold_false: Seconds the expression must stay false before another trigger; ``0`` enforces edges. + state_check_now: Evaluate at registration time and fire immediately if the expression is true. + kwargs: Extra keywords injected into each call in addition to the standard trigger context. + watch: Explicit entities or attributes to monitor when autodetection from the expression is insufficient. + + Trigger kwargs include ``trigger_type="state"``, ``var_name``, ``value`` and ``old_value`` when available. + """ + ... + + +def state_active(str_expr: str) -> Callable[..., Any]: + """Restrict trigger execution to state-based condition. + + Args: + str_expr: Expression that must evaluate truthy for the trigger to run; ``.old`` values are available for state triggers. + """ + ... + + +def time_trigger(*time_spec: str | None, **kwargs) -> Callable[..., Any]: + """Schedule the function using time specifications. + + Args: + *time_spec: Time expressions such as ``startup``, ``shutdown``, ``once()``, ``period()``, or ``cron()``. + **kwargs: Optional trigger keywords merged into each invocation. + """ + ... + + +def task_unique(name: str, kill_me: bool = False) -> Callable[..., Any]: + """Ensure only one running instance of the decorated task. + + Args: + name: Identifier used to reclaim prior tasks that called ``task.unique`` or ``@task_unique``. + kill_me: Cancel the new run instead of the existing one when a conflict is found. + """ + ... + + +def event_trigger(*event_type: str, str_expr: str = None, **kwargs) -> Callable[..., Any]: + """Trigger when a Home Assistant event matches the criteria. + + Args: + event_type: Event types to subscribe to; multiple values act as aliases. + str_expr: Optional filter evaluated against the event payload and context variables. + kwargs: Extra keyword arguments merged into each call. + + Trigger kwargs include ``trigger_type="event"`` and the event data fields. + """ + ... + + +def time_active(*time_spec: str, hold_off: int | float | None = None) -> Callable[..., Any]: + """Restrict trigger execution to specific time windows. + + Args: + time_spec: ``range()`` or ``cron()`` expressions (optionally prefixed with ``not``) checked on each trigger. + hold_off: Seconds to suppress further triggers after a successful run. + + """ + ... 
+ + +def mqtt_trigger(topic: str, str_expr: str | None = None, **kwargs) -> Callable[..., Any]: + """Trigger when a subscribed MQTT message matches the specification. + + Args: + topic: MQTT topic to monitor; wildcards ``+`` and ``#`` are supported. + str_expr: Optional expression evaluated against ``payload``, ``payload_obj``, ``retain``, ``topic``, and ``qos``. + kwargs: Extra keyword arguments merged into each invocation. + """ + ... + + +def pyscript_compile() -> Callable[..., Any]: + """Compile the wrapped function into native (synchronous) Python. + + Compiled functions cannot use pyscript-only features but run at full CPython speed. + """ + ... + + +def pyscript_executor() -> Callable[..., Any]: + """Compile the wrapped function and run it transparently in ``task.executor``. + + Use it for blocking or I/O-bound code so each call runs in a background thread. + """ + ... + + +class log: + """Logging helpers that mirror Home Assistant's logging levels.""" + + @staticmethod + def debug(msg: Any, *args, **kwargs) -> None: + """Log a debug-level message scoped to the current pyscript context. + + Args: + msg: Message or format string to log. + """ + ... + + @staticmethod + def info(msg: Any, *args, **kwargs) -> None: + """Log an info-level message scoped to the current pyscript context. + + Args: + msg: Message or format string to log. + """ + ... + + @staticmethod + def warning(msg: Any, *args, **kwargs) -> None: + """Log a warning-level message scoped to the current pyscript context. + + Args: + msg: Message or format string to log. + """ + ... + + @staticmethod + def error(msg: Any, *args, **kwargs) -> None: + """Log an error-level message scoped to the current pyscript context. + + Args: + msg: Message or format string to log. + """ + ... + + +class state: + """Utility functions for accessing and managing Home Assistant state.""" + + @staticmethod + def delete(name: str) -> None: + """Remove an entity or attribute identified by ``name``. + + Args: + name: Fully qualified entity or entity attribute to delete (``DOMAIN.entity[.attr]``). + """ + ... + + @staticmethod + def exist(name: str) -> bool: + """Check whether a state variable or attribute exists. + + Args: + name: Fully qualified entity or entity attribute name. + + Returns: + bool: ``True`` if the entity or attribute is present, otherwise ``False``. + """ + ... + + @staticmethod + def get(name: str) -> Any: + """Return the current value for an entity or attribute. + + Args: + name: Fully qualified entity or entity attribute name. + + Returns: + Any: State value or attribute value; raises ``NameError``/``AttributeError`` if missing. + """ + ... + + @staticmethod + def getattr(name: str) -> dict[str, Any] | None: + """Return the attribute dictionary for an entity, if present. + + Args: + name: Entity id or attribute path that resolves to an entity. + + Returns: + dict[str, Any]: Attribute mapping, or ``None`` when the entity is unknown. + """ + ... + + @staticmethod + def names(domain: str | None = None) -> list[str]: + """List entity ids within an optional domain. + + Args: + domain: Domain prefix to filter by; returns every entity when omitted. + + Returns: + list[str]: Entity ids known to Home Assistant. + """ + ... + + @staticmethod + def persist( + entity_id: str, default_value: Any = None, default_attributes: dict[str, Any] | None = None + ) -> None: + """Persist a ``pyscript.`` entity across restarts with optional defaults. + + Args: + entity_id: Entity id that must live in the ``pyscript`` domain. 
+ default_value: Value to seed when the entity is missing. + default_attributes: Attribute dictionary to seed when attributes are absent. + """ + ... + + @staticmethod + def set( + entity_id: str, + value: Any = None, + new_attributes: dict[str, Any] | None = None, + **kwargs: Any, + ) -> None: + """Set an entity value and optionally update or replace attributes. + + Args: + entity_id: Fully qualified entity id to update. + value: New state value; omit to leave the current value unchanged. + new_attributes: Attribute dictionary that replaces existing attributes. + """ + ... + + @staticmethod + def setattr(name: str, value: Any) -> None: + """Assign a single attribute on the specified entity. + + Args: + name: Entity attribute path in ``DOMAIN.entity.attr`` form. + value: Attribute value to write. + """ + ... + + +class event: + """Helpers for interacting with Home Assistant's event bus.""" + + @staticmethod + def fire(event_type: str, **kwargs: Any) -> None: + """Send an event on the Home Assistant event bus. + + Args: + event_type: Name of the event to publish. + **kwargs: Event payload delivered as event data. + """ + ... + + +class task: + """Asynchronous task utilities built on top of ``asyncio``.""" + + @staticmethod + def create(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Task: + """Spawn a new pyscript task that executes ``func`` with the supplied arguments. + + Args: + func: Callable to execute in the new task. + *args: Positional arguments forwarded to ``func``. + **kwargs: Keyword arguments forwarded to ``func``. + + Returns: + Task: Newly created asyncio task. + """ + ... + + @staticmethod + def cancel(task_id: Task | None = None) -> None: + """Cancel a task, defaulting to the current task. + + Args: + task_id: Task returned by ``task.create``; cancels the current task when omitted. + """ + ... + + @staticmethod + def current_task() -> Task: + """Return the currently running pyscript task. + + Returns: + Task: Task representing the active pyscript coroutine. + """ + ... + + @staticmethod + def name2id(name: str | None = None) -> Task | dict[str, Task]: + """Resolve registered task names (from ``task.unique``) to task objects. + + Args: + name: Specific task name to resolve; return a mapping of all names when omitted. + + Returns: + Task | dict[str, Task]: Task matching the name, or mapping of all names to tasks. + """ + ... + + @staticmethod + def wait( + task_set: list[Task], + timeout: int | float | None = None, + return_when: Literal["ALL_COMPLETED", "FIRST_COMPLETED", "FIRST_EXCEPTION"] = "ALL_COMPLETED", + ) -> tuple[set[Task], set[Task]]: + """Wait for tasks using ``asyncio.wait`` semantics. + + Args: + task_set: List of asyncio tasks to monitor. + timeout: Seconds to wait before returning pending tasks; ``None`` waits forever. + return_when: Condition that ends the wait (see ``asyncio.wait``). + + Returns: + tuple[set[Task], set[Task]]: Two sets ``(done, pending)`` mirroring ``asyncio.wait``. + """ + ... + + @staticmethod + def add_done_callback( + task_id: Task, + func: Callable[..., Any], + *args: Any, + **kwargs: Any, + ) -> None: + """Register a callback that runs when the task completes. + + Args: + task_id: Task to monitor for completion. + func: Callback to invoke when the task finishes. + *args: Positional arguments forwarded to ``func``. + **kwargs: Keyword arguments forwarded to ``func``. + """ + ... + + @staticmethod + def remove_done_callback(task_id: Task, func: Callable[..., Any]) -> None: + """Remove a previously registered completion callback. 
+ + Args: + task_id: Task the callback was attached to. + func: Callback function that should be removed. + """ + ... + + @staticmethod + def executor(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: + """Run a blocking callable in a background thread and return its result. + + Args: + func: Synchronous callable to execute. + *args: Positional arguments forwarded to ``func``. + **kwargs: Keyword arguments forwarded to ``func``. + + Returns: + Any: Result returned by ``func``. + """ + ... + + @staticmethod + def sleep(seconds: int | float) -> None: + """Yield control for the given seconds without blocking the event loop. + + Args: + seconds: Duration to suspend execution; fractional values are allowed. + """ + ... + + @staticmethod + def unique(task_name: str, kill_me: bool = False) -> None: + """Assign a unique name to the current task, optionally killing peers. + + Args: + task_name: Identifier shared with ``task.name2id`` and other callers. + kill_me: Cancel the current task if another live task already claimed the name. + """ + ... + + @staticmethod + def wait_until( + state_trigger: str | list[str] | None = None, + time_trigger: str | list[str] | None = None, + event_trigger: str | list[str] | None = None, + mqtt_trigger: str | list[str] | None = None, + webhook_trigger: str | list[str] | None = None, + webhook_local_only: bool = True, + webhook_methods: list[str] = ("POST", "PUT"), + timeout: int | float | None = None, + state_check_now: bool = True, + state_hold: int | float | None = None, + state_hold_false: int | float | None = None, + ) -> dict[str, Any]: + """Block until any supplied trigger fires or a timeout occurs. + + Args: + state_trigger: State expressions matching ``@state_trigger`` semantics. + time_trigger: Time specifications matching ``@time_trigger`` semantics. + event_trigger: Event types or filters matching ``@event_trigger`` semantics. + mqtt_trigger: MQTT topics or filters matching ``@mqtt_trigger`` semantics. + webhook_trigger: Webhook ids matching ``@webhook_trigger`` semantics. + webhook_local_only: Limit webhooks to local network clients when ``True``. + webhook_methods: Allowed HTTP methods for webhook triggers. + timeout: Seconds to wait before returning ``trigger_type="timeout"``. + state_check_now: Evaluate state expressions immediately when ``True``. + state_hold: Seconds a state expression must remain true before returning. + state_hold_false: Seconds a state expression must remain false before it can trigger again. + + Returns: + dict[str, Any]: Trigger context mirroring decorator kwargs and always including ``trigger_type``. + """ + ... + + +class pyscript(Any): + """Runtime helpers for inspecting and switching pyscript global contexts.""" + + app_config: dict[str, Any] + + @staticmethod + def get_global_ctx() -> str: + """Return the name of the current pyscript global context. + + Returns: + str: Active global context identifier. + """ + ... + + @staticmethod + def set_global_ctx(new_ctx_name: str) -> None: + """Switch the active global context to ``new_ctx_name``. + + Args: + new_ctx_name: Name of an existing global context to activate. + """ + ... + + @staticmethod + def list_global_ctx() -> list[str]: + """Return available global context names, current first. + + Returns: + list[str]: Global context names ordered with the active context first. + """ + ... + + @staticmethod + def reload() -> None: + """Trigger a full pyscript reload, covering scripts, apps, and modules.""" + ... 
+ + +class StateVal: + """Representation of a Home Assistant entity state value.""" + + entity_id: str + friendly_name: str + device_class: str + icon: str + last_changed: datetime + last_updated: datetime + last_reported: datetime + + def as_float(self, default: Any = object()) -> float: + """Convert the state to ``float`` or return ``default`` on failure. + + Args: + default: Fallback value used when conversion raises an error or the value is empty. + + Returns: + float: Parsed float, or ``default`` when provided. + """ + ... + + def as_int(self, default: Any = object(), base: int = 10) -> int: + """Convert the state to ``int`` (using ``base``) or return ``default``. + + Args: + default: Fallback value used when conversion raises an error or the value is empty. + base: Numeric base to use when interpreting the value. + + Returns: + int: Parsed integer, or ``default`` when provided. + """ + ... + + def as_bool(self, default: Any = object()) -> bool: + """Interpret the state as ``bool`` or return ``default``. + + Args: + default: Fallback value used when conversion raises an error or the value is empty. + + Returns: + bool: Parsed boolean, or ``default`` when provided. + """ + ... + + def as_round( + self, + precision: int = 0, + method: Literal["common", "ceil", "floor", "half"] = "common", + default: Any = object(), + ) -> float: + """Convert the state to ``float`` and round it using the requested strategy. + + Args: + precision: Decimal places to keep after rounding. + method: Rounding strategy supported by ``homeassistant.helpers.template``. + default: Fallback value used when conversion fails. + + Returns: + float: Rounded floating-point value, or ``default`` when provided. + """ + ... + + def as_datetime(self, default: Any = object()) -> datetime: + """Parse the state into a timezone-aware ``datetime`` if possible. + + Args: + default: Fallback value used when parsing fails. + + Returns: + datetime: Parsed datetime, or ``default`` when provided. + """ + ... + + def is_unknown(self) -> bool: + """Return whether the entity reports the ``unknown`` sentinel. + + Returns: + bool: ``True`` if the state equals ``unknown``. + """ + ... + + def is_unavailable(self) -> bool: + """Return whether the entity reports the ``unavailable`` sentinel. + + Returns: + bool: ``True`` if the state equals ``unavailable``. + """ + ... + + def has_value(self) -> bool: + """Return whether the entity has a concrete (non-empty) value. + + Returns: + bool: ``True`` if a non-empty state value is available. + """ + ... diff --git a/docs/reference.rst b/docs/reference.rst index 9e38b57..005ba84 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -140,10 +140,33 @@ scripts or apps that depend on those modules will be reloaded. Importing modules ``/pyscript/modules`` are not restricted if ``allow_all_imports`` is ``False``. Typically common functions or features would be implemented in a module or package, and then imported and used by scripts in ``/pyscript`` or applications in ``/pyscript/apps``. +Imports whose top-level package is ``stubs`` (i.e., anything below ``modules/stubs``) are ignored at +runtime, which allows you to store IDE helper modules there without affecting execution. Even if you can't directly call one function from another script file, HASS state variables are global and services can be called from any script file. +IDE Helpers +----------- + +Pyscript can generate IDE stubs describing dynamic helpers, discovered entities, and services in +your installation. 
Call the ``pyscript.generate_stubs`` service from Developer Tools -> Actions to +build them. The generator copies the bundled ``pyscript_builtins.py`` and produces a +``pyscript_generated.py`` module under ``/pyscript/modules/stubs``. Re-run the service whenever +you add new Pyscript services or entities to keep the metadata up to date. + +At runtime the interpreter ignores ``import`` statements whose top-level package is ``stubs``. That +lets you safely add ``from stubs.pyscript_builtins import ...`` lines for type hints without affecting +execution. Point your IDE at ``/pyscript/modules`` (e.g., mark it as a source root or add it to +the workspace) and the helpers will be picked up for code completion, inspections, and navigation. + +- PyCharm / IntelliJ: right-click the ``/pyscript/modules`` directory in the Project tool + window and choose *Mark Directory as -> Sources Root*. +- VS Code: open *Settings → Python Analysis: Extra Paths -> Add Item* -> ``pyscript/modules``. +- Studio Code Server (Home Assistant add-on): same as in VS Code; just install ``Pyright``. +- Other editors: ensure the Python path includes ``modules`` (for example by exporting + ``PYTHONPATH=/config/pyscript/modules``) before launching the IDE or language server. + Reloading Scripts ----------------- diff --git a/tests/test_stubs.py b/tests/test_stubs.py new file mode 100644 index 0000000..db7d881 --- /dev/null +++ b/tests/test_stubs.py @@ -0,0 +1,158 @@ +"""Tests for pyscript stub generation and stub imports.""" + +from __future__ import annotations + +from datetime import datetime as dt +from pathlib import Path +from types import SimpleNamespace +from typing import Any + +import pytest + +from custom_components.pyscript.const import DOMAIN, FOLDER, SERVICE_GENERATE_STUBS + +from tests.test_init import setup_script + + +@pytest.mark.asyncio +async def test_generate_stubs_service_writes_files(hass, caplog, monkeypatch): + """Ensure the generate_stubs service writes expected files into modules/stubs.""" + + # Set up pyscript so the service is registered. 
+ await setup_script( + hass, + notify_q=None, + now=dt(2024, 1, 1, 0, 0, 0), + source=""" +@service +def ready(): + pass +""", + script_name="/stub_service.py", + ) + + hass.states.async_set( + "light.lamp", + "on", + { + "valid_attr": 42, + "invalid attr": "ignored", + }, + ) + + dummy_registry = SimpleNamespace( + entities={ + "light.lamp": SimpleNamespace(entity_id="light.lamp", disabled=False), + } + ) + monkeypatch.setattr("custom_components.pyscript.stubs.generator.er.async_get", lambda _: dummy_registry) + + async def fake_service_descriptions(_hass) -> dict[str, dict[str, dict[str, Any]]]: + return { + "light": { + "blink": { + "description": "Blink the light once.", + "target": {"entity": {"domain": "light"}}, + "fields": { + "brightness": { + "required": True, + "selector": {"number": {}}, + "description": "Brightness.", + }, + "speed": { + "required": False, + "selector": {"select": {"options": ["slow", "fast"]}}, + "description": "Blink speed.", + }, + "invalid-field": {"required": True, "selector": {"boolean": None}}, + }, + "response": {"optional": True}, + } + } + } + + monkeypatch.setattr( + "custom_components.pyscript.stubs.generator.async_get_all_descriptions", fake_service_descriptions + ) + + stubs_dir = Path(hass.config.path(FOLDER)) / "modules" / "stubs" + builtins_target = stubs_dir / "pyscript_builtins.py" + generated_target = stubs_dir / "pyscript_generated.py" + + if stubs_dir.exists(): + # Clean up artifacts from previous runs to avoid false positives. + for child in stubs_dir.iterdir(): + child.unlink() + else: + stubs_dir.mkdir(parents=True, exist_ok=True) + + response: dict[str, Any] = await hass.services.async_call( + DOMAIN, + SERVICE_GENERATE_STUBS, + {}, + blocking=True, + return_response=True, + ) + + expected_ignored: list[str] = [ + "blink(invalid-field)", + "light.lamp.invalid attr", + ] + assert response["ignored_identifiers"] == sorted(expected_ignored) + assert response["status"] == "OK" + assert builtins_target.exists() + assert generated_target.exists() + + generated_content = generated_target.read_text(encoding="utf-8") + assert "class light" in generated_content + assert "class _light_state(StateVal)" in generated_content + assert "lamp: _light_state" in generated_content + assert "def blink(self, *, brightness: int, speed" in generated_content + assert "def blink(*, entity_id: str, brightness: int, speed:" in generated_content + assert "Blink the light once." in generated_content + assert "Literal" in generated_content + assert "'slow'" in generated_content + assert "'fast'" in generated_content + assert "-> dict[str, Any]" in generated_content + + original_builtins = ( + Path(__file__).resolve().parent.parent + / "custom_components" + / "pyscript" + / "stubs" + / "pyscript_builtins.py" + ) + assert builtins_target.read_text(encoding="utf-8") == original_builtins.read_text(encoding="utf-8") + + # Clean up generated files so other tests start with a blank slate. + generated_target.unlink() + builtins_target.unlink() + try: + stubs_dir.rmdir() + except OSError: + # Directory contains other content; leave it in place. 
+ pass + + +@pytest.mark.asyncio +async def test_stub_imports_are_ignored(hass, caplog): + """Verify importing from stubs.* does not raise even when the module is missing.""" + + await setup_script( + hass, + notify_q=None, + now=dt(2024, 2, 2, 0, 0, 0), + source=""" +from stubs import helper1 +from stubs.fake_module import helper2 +from stubs.fake_module.deep import helper3 + +@service +def stub_import_ready(): + log.info("stub import ready") +""", + script_name="/stub_import.py", + ) + + assert hass.services.has_service(DOMAIN, "stub_import_ready") + assert "ModuleNotFoundError" not in caplog.text
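
Illustration (not part of the patch): the sketch below shows how a pyscript script could use the stubs once `pyscript.generate_stubs` has been run, under the assumptions that the IDE has `/pyscript/modules` on its Python path (per the new "IDE Helpers" docs section) and that `pyscript_generated.py` exists. The entity `light.desk_lamp`, the trigger entity `binary_sensor.desk_motion`, and the `brightness` field are hypothetical; whatever names actually appear in the generated module depend entirely on your installation.

```python
# Hypothetical script, e.g. /config/pyscript/desk_lamp.py.
# The "stubs" imports are skipped by pyscript at runtime (see the eval.py change
# above); they exist only so the IDE can resolve the dynamic built-ins and the
# generated entity/service classes for completion and type checking.
from stubs.pyscript_builtins import log, service, state_trigger
from stubs.pyscript_generated import light  # generated class; contents vary per installation


@service
@state_trigger("binary_sensor.desk_motion == 'on'")  # hypothetical entity
def desk_lamp_on():
    """Turn the desk lamp on when motion is detected."""
    # light.desk_lamp is annotated as _light_state in the generated stubs, so
    # its attributes and per-entity service methods autocomplete in the editor,
    # while at runtime pyscript resolves these names to the real state objects.
    light.turn_on(entity_id="light.desk_lamp", brightness=180)
    log.info(f"desk lamp brightness is now {light.desk_lamp.brightness}")
```

Because the interpreter only skips imports whose top-level package is `stubs`, the same file works unchanged whether or not the stub modules have been generated yet.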