Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies = [
"typing_extensions >= 4.14.1, < 5",
"marshmallow >= 3.19.0, < 5",
"marshmallow_dataclass >= 8.7.1, < 9",
"more-itertools >= 10.8.0, < 11",
]
dynamic = ["version"]

Expand Down
27 changes: 17 additions & 10 deletions src/frequenz/sdk/timeseries/formulas/_lexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@

from collections.abc import Iterator

from more_itertools import peekable
from typing_extensions import override

from . import _token
from ._exceptions import FormulaSyntaxError
from ._peekable import Peekable


class Lexer(Iterator[_token.Token]):
Expand All @@ -25,33 +25,40 @@ def __init__(self, formula: str):
formula: The formula string to lex.
"""
self._formula: str = formula
self._iter: Peekable[tuple[int, str]] = Peekable(enumerate(iter(formula)))
self._iter: peekable[tuple[int, str]] = peekable(enumerate(iter(formula)))

def _peek_char(self) -> tuple[int, str] | None:
"""Return the next position and character, or None if _iter is at its end."""
try:
return self._iter.peek()
except StopIteration:
return None

def _read_integer(self) -> str:
num_str = ""
peek = self._iter.peek()
peek = self._peek_char()
while peek is not None and peek[1].isdigit():
_, char = next(self._iter)
num_str += char
peek = self._iter.peek()
peek = self._peek_char()
return num_str

def _read_number(self) -> str:
num_str = ""
peek = self._iter.peek()
peek = self._peek_char()
while peek is not None and (peek[1].isdigit() or peek[1] == "."):
_, char = next(self._iter)
num_str += char
peek = self._iter.peek()
peek = self._peek_char()
return num_str

def _read_symbol(self) -> str:
word_str = ""
peek = self._iter.peek()
peek = self._peek_char()
while peek is not None and peek[1].isalnum():
_, char = next(self._iter)
word_str += char
peek = self._iter.peek()
peek = self._peek_char()
return word_str

@override
Expand All @@ -62,10 +69,10 @@ def __iter__(self) -> Lexer:
@override
def __next__(self) -> _token.Token: # pylint: disable=too-many-branches
"""Return the next token from the formula string."""
peek = self._iter.peek()
peek = self._peek_char()
while peek is not None and peek[1].isspace():
_ = next(self._iter)
peek = self._iter.peek()
peek = self._peek_char()

if peek is None:
raise StopIteration
Expand Down
31 changes: 19 additions & 12 deletions src/frequenz/sdk/timeseries/formulas/_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from frequenz.channels import Receiver
from frequenz.client.common.microgrid.components import ComponentId
from frequenz.quantities import Quantity
from more_itertools import peekable

from frequenz.sdk.timeseries import Sample
from frequenz.sdk.timeseries._base_types import QuantityT
Expand All @@ -22,7 +23,6 @@
from ._formula import Formula
from ._functions import FunCall, Function
from ._lexer import Lexer
from ._peekable import Peekable
from ._resampled_stream_fetcher import ResampledStreamFetcher

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -67,10 +67,17 @@ def __init__(
"""Initialize the parser."""
self._name: str = name
self._formula: str = formula
self._lexer: Peekable[_token.Token] = Peekable(Lexer(formula))
self._lexer: peekable[_token.Token] = peekable(Lexer(formula))
self._telemetry_fetcher: ResampledStreamFetcher = telemetry_fetcher
self._create_method: Callable[[float], QuantityT] = create_method

def _peek_next_token(self) -> _token.Token | None:
"""Get the next token from the lexer, or None if there is None."""
try:
return self._lexer.peek()
except StopIteration:
return None

def _parse_term(self) -> AstNode[QuantityT] | None:
"""Parse a term.

Expand All @@ -81,7 +88,7 @@ def _parse_term(self) -> AstNode[QuantityT] | None:
if factor is None:
return None

token: _token.Token | None = self._lexer.peek()
token: _token.Token | None = self._peek_next_token()
while token is not None and isinstance(token, (_token.Plus, _token.Minus)):
token = next(self._lexer)
next_factor = self._parse_factor()
Expand All @@ -98,7 +105,7 @@ def _parse_term(self) -> AstNode[QuantityT] | None:
elif isinstance(token, _token.Minus):
factor = _ast.Sub(left=factor, right=next_factor)

token = self._lexer.peek()
token = self._peek_next_token()

return factor

Expand All @@ -113,7 +120,7 @@ def _parse_factor(self) -> AstNode[QuantityT] | None:
if unary is None:
return None

token: _token.Token | None = self._lexer.peek()
token: _token.Token | None = self._peek_next_token()
while token is not None and isinstance(token, (_token.Mul, _token.Div)):
token = next(self._lexer)
next_unary = self._parse_unary()
Expand All @@ -129,7 +136,7 @@ def _parse_factor(self) -> AstNode[QuantityT] | None:
elif isinstance(token, _token.Div):
unary = _ast.Div(left=unary, right=next_unary)

token = self._lexer.peek()
token = self._peek_next_token()

return unary

Expand All @@ -139,7 +146,7 @@ def _parse_unary(self) -> AstNode[QuantityT] | None:
A unary is any expression that does not contain any binary
operators outside of parentheses.
"""
token: _token.Token | None = self._lexer.peek()
token: _token.Token | None = self._peek_next_token()
if token is not None and isinstance(token, _token.Minus):
token = next(self._lexer)
primary: AstNode[QuantityT] | None = self._parse_primary()
Expand Down Expand Up @@ -167,7 +174,7 @@ def _parse_bracketed(self) -> AstNode[QuantityT] | None:
message="Expected expression",
)

token: _token.Token | None = self._lexer.peek()
token: _token.Token | None = self._peek_next_token()
if token is None or not isinstance(token, _token.CloseParen):
raise FormulaSyntaxError(
formula=self._formula,
Expand All @@ -194,7 +201,7 @@ def _parse_function_call(self) -> AstNode[QuantityT] | None:

params: list[AstNode[QuantityT]] = []

token: _token.Token | None = self._lexer.peek()
token: _token.Token | None = self._peek_next_token()
if token is None or not isinstance(token, _token.OpenParen):
raise FormulaSyntaxError(
formula=self._formula,
Expand All @@ -213,7 +220,7 @@ def _parse_function_call(self) -> AstNode[QuantityT] | None:
)
params.append(param)

token = self._lexer.peek()
token = self._peek_next_token()
if token is None:
raise FormulaSyntaxError(
formula=self._formula,
Expand Down Expand Up @@ -245,7 +252,7 @@ def _parse_primary(self) -> AstNode[QuantityT] | None:
- A function call
- A bracketed expression
"""
token: _token.Token | None = self._lexer.peek()
token: _token.Token | None = self._peek_next_token()
if token is None:
return None

Expand Down Expand Up @@ -286,7 +293,7 @@ def parse(self) -> Formula[QuantityT]:
message="Empty formula",
)
# There should not be any tokens left
token = self._lexer.peek()
token = self._peek_next_token()
if token is not None:
raise FormulaSyntaxError(
formula=self._formula,
Expand Down
53 changes: 0 additions & 53 deletions src/frequenz/sdk/timeseries/formulas/_peekable.py

This file was deleted.

Loading