From 5ae38193036d3cff49cbc53ee65af4ae105963a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 16 Mar 2026 15:31:26 +0100 Subject: [PATCH] Use more-itertools.peekable instead of our own implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- pyproject.toml | 1 + .../sdk/timeseries/formulas/_lexer.py | 27 ++++++---- .../sdk/timeseries/formulas/_parser.py | 31 ++++++----- .../sdk/timeseries/formulas/_peekable.py | 53 ------------------- 4 files changed, 37 insertions(+), 75 deletions(-) delete mode 100644 src/frequenz/sdk/timeseries/formulas/_peekable.py diff --git a/pyproject.toml b/pyproject.toml index c9292f64d..4b7835460 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ "typing_extensions >= 4.14.1, < 5", "marshmallow >= 3.19.0, < 5", "marshmallow_dataclass >= 8.7.1, < 9", + "more-itertools >= 10.8.0, < 11", ] dynamic = ["version"] diff --git a/src/frequenz/sdk/timeseries/formulas/_lexer.py b/src/frequenz/sdk/timeseries/formulas/_lexer.py index eca4f880d..1607137db 100644 --- a/src/frequenz/sdk/timeseries/formulas/_lexer.py +++ b/src/frequenz/sdk/timeseries/formulas/_lexer.py @@ -8,11 +8,11 @@ from collections.abc import Iterator +from more_itertools import peekable from typing_extensions import override from . import _token from ._exceptions import FormulaSyntaxError -from ._peekable import Peekable class Lexer(Iterator[_token.Token]): @@ -25,33 +25,40 @@ def __init__(self, formula: str): formula: The formula string to lex. """ self._formula: str = formula - self._iter: Peekable[tuple[int, str]] = Peekable(enumerate(iter(formula))) + self._iter: peekable[tuple[int, str]] = peekable(enumerate(iter(formula))) + + def _peek_char(self) -> tuple[int, str] | None: + """Return the next position and character, or None if _iter is at its end.""" + try: + return self._iter.peek() + except StopIteration: + return None def _read_integer(self) -> str: num_str = "" - peek = self._iter.peek() + peek = self._peek_char() while peek is not None and peek[1].isdigit(): _, char = next(self._iter) num_str += char - peek = self._iter.peek() + peek = self._peek_char() return num_str def _read_number(self) -> str: num_str = "" - peek = self._iter.peek() + peek = self._peek_char() while peek is not None and (peek[1].isdigit() or peek[1] == "."): _, char = next(self._iter) num_str += char - peek = self._iter.peek() + peek = self._peek_char() return num_str def _read_symbol(self) -> str: word_str = "" - peek = self._iter.peek() + peek = self._peek_char() while peek is not None and peek[1].isalnum(): _, char = next(self._iter) word_str += char - peek = self._iter.peek() + peek = self._peek_char() return word_str @override @@ -62,10 +69,10 @@ def __iter__(self) -> Lexer: @override def __next__(self) -> _token.Token: # pylint: disable=too-many-branches """Return the next token from the formula string.""" - peek = self._iter.peek() + peek = self._peek_char() while peek is not None and peek[1].isspace(): _ = next(self._iter) - peek = self._iter.peek() + peek = self._peek_char() if peek is None: raise StopIteration diff --git a/src/frequenz/sdk/timeseries/formulas/_parser.py b/src/frequenz/sdk/timeseries/formulas/_parser.py index 0e7ab46ad..f2cefd65c 100644 --- a/src/frequenz/sdk/timeseries/formulas/_parser.py +++ b/src/frequenz/sdk/timeseries/formulas/_parser.py @@ -12,6 +12,7 @@ from frequenz.channels import Receiver from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Quantity +from more_itertools import peekable from frequenz.sdk.timeseries import Sample from frequenz.sdk.timeseries._base_types import QuantityT @@ -22,7 +23,6 @@ from ._formula import Formula from ._functions import FunCall, Function from ._lexer import Lexer -from ._peekable import Peekable from ._resampled_stream_fetcher import ResampledStreamFetcher _logger = logging.getLogger(__name__) @@ -67,10 +67,17 @@ def __init__( """Initialize the parser.""" self._name: str = name self._formula: str = formula - self._lexer: Peekable[_token.Token] = Peekable(Lexer(formula)) + self._lexer: peekable[_token.Token] = peekable(Lexer(formula)) self._telemetry_fetcher: ResampledStreamFetcher = telemetry_fetcher self._create_method: Callable[[float], QuantityT] = create_method + def _peek_next_token(self) -> _token.Token | None: + """Get the next token from the lexer, or None if there is None.""" + try: + return self._lexer.peek() + except StopIteration: + return None + def _parse_term(self) -> AstNode[QuantityT] | None: """Parse a term. @@ -81,7 +88,7 @@ def _parse_term(self) -> AstNode[QuantityT] | None: if factor is None: return None - token: _token.Token | None = self._lexer.peek() + token: _token.Token | None = self._peek_next_token() while token is not None and isinstance(token, (_token.Plus, _token.Minus)): token = next(self._lexer) next_factor = self._parse_factor() @@ -98,7 +105,7 @@ def _parse_term(self) -> AstNode[QuantityT] | None: elif isinstance(token, _token.Minus): factor = _ast.Sub(left=factor, right=next_factor) - token = self._lexer.peek() + token = self._peek_next_token() return factor @@ -113,7 +120,7 @@ def _parse_factor(self) -> AstNode[QuantityT] | None: if unary is None: return None - token: _token.Token | None = self._lexer.peek() + token: _token.Token | None = self._peek_next_token() while token is not None and isinstance(token, (_token.Mul, _token.Div)): token = next(self._lexer) next_unary = self._parse_unary() @@ -129,7 +136,7 @@ def _parse_factor(self) -> AstNode[QuantityT] | None: elif isinstance(token, _token.Div): unary = _ast.Div(left=unary, right=next_unary) - token = self._lexer.peek() + token = self._peek_next_token() return unary @@ -139,7 +146,7 @@ def _parse_unary(self) -> AstNode[QuantityT] | None: A unary is any expression that does not contain any binary operators outside of parentheses. """ - token: _token.Token | None = self._lexer.peek() + token: _token.Token | None = self._peek_next_token() if token is not None and isinstance(token, _token.Minus): token = next(self._lexer) primary: AstNode[QuantityT] | None = self._parse_primary() @@ -167,7 +174,7 @@ def _parse_bracketed(self) -> AstNode[QuantityT] | None: message="Expected expression", ) - token: _token.Token | None = self._lexer.peek() + token: _token.Token | None = self._peek_next_token() if token is None or not isinstance(token, _token.CloseParen): raise FormulaSyntaxError( formula=self._formula, @@ -194,7 +201,7 @@ def _parse_function_call(self) -> AstNode[QuantityT] | None: params: list[AstNode[QuantityT]] = [] - token: _token.Token | None = self._lexer.peek() + token: _token.Token | None = self._peek_next_token() if token is None or not isinstance(token, _token.OpenParen): raise FormulaSyntaxError( formula=self._formula, @@ -213,7 +220,7 @@ def _parse_function_call(self) -> AstNode[QuantityT] | None: ) params.append(param) - token = self._lexer.peek() + token = self._peek_next_token() if token is None: raise FormulaSyntaxError( formula=self._formula, @@ -245,7 +252,7 @@ def _parse_primary(self) -> AstNode[QuantityT] | None: - A function call - A bracketed expression """ - token: _token.Token | None = self._lexer.peek() + token: _token.Token | None = self._peek_next_token() if token is None: return None @@ -286,7 +293,7 @@ def parse(self) -> Formula[QuantityT]: message="Empty formula", ) # There should not be any tokens left - token = self._lexer.peek() + token = self._peek_next_token() if token is not None: raise FormulaSyntaxError( formula=self._formula, diff --git a/src/frequenz/sdk/timeseries/formulas/_peekable.py b/src/frequenz/sdk/timeseries/formulas/_peekable.py deleted file mode 100644 index 4617a523d..000000000 --- a/src/frequenz/sdk/timeseries/formulas/_peekable.py +++ /dev/null @@ -1,53 +0,0 @@ -# License: MIT -# Copyright © 2025 Frequenz Energy-as-a-Service GmbH - -"""A peekable iterator implementation.""" - -from __future__ import annotations - -from collections.abc import Iterator -from typing import Generic, TypeVar - -from typing_extensions import override - -_T = TypeVar("_T") - - -class Peekable(Generic[_T], Iterator[_T]): - """Create a Peekable iterator from an existing iterator.""" - - def __init__(self, iterator: Iterator[_T]): - """Initialize this instance. - - Args: - iterator: The underlying iterator to wrap. - """ - self._iterator: Iterator[_T] = iterator - self._buffer: _T | None = None - - @override - def __iter__(self) -> Peekable[_T]: - """Return the iterator itself.""" - return self - - @override - def __next__(self) -> _T: - """Return the next item from the iterator.""" - if self._buffer is not None: - item = self._buffer - self._buffer = None - return item - return next(self._iterator) - - def peek(self) -> _T | None: - """Return the next item without advancing the iterator. - - Returns: - The next item, or `None` if the iterator is exhausted. - """ - if self._buffer is None: - try: - self._buffer = next(self._iterator) - except StopIteration: - return None - return self._buffer