Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ While the new TargetCategory class supports subtypes, only reading them is curre
* `TargetIds(ComponentIds(1), ComponentIds(2), ComponentIds(3))`
* `TargetCategories` can be used to specify one or more target categories:
* `TargetCategories(ComponentCategory.BATTERY, ComponentCategory.INVERTER)`
* Dispatch ids and microgrid ids are no longer simple `int` types but are now wrapped in `DispatchId` and `MicrogridId` classes, respectively. This allows for better type safety and clarity in the codebase.

## New Features

Expand Down
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ classifiers = [
]
requires-python = ">= 3.11, < 4"
dependencies = [
"typing-extensions >= 4.6.1, < 5",
"typing-extensions >= 4.13.0, < 5",
"frequenz-api-dispatch == 1.0.0-rc2",
"frequenz-client-base >= 0.8.0, < 0.12.0",
"frequenz-client-common >= 0.1.0, < 0.4.0",
"frequenz-client-common >= 0.3.2, < 0.4.0",
"frequenz-core >= 1.0.2, < 2.0.0",
"grpcio >= 1.70.0, < 2",
"python-dateutil >= 2.8.2, < 3.0",
]
Expand Down
19 changes: 12 additions & 7 deletions src/frequenz/client/dispatch/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from prompt_toolkit.patch_stdout import patch_stdout
from prompt_toolkit.shortcuts import CompleteStyle

from frequenz.client.common.microgrid import MicrogridId

from ._cli_types import (
FuzzyDateTime,
FuzzyIntRange,
Expand All @@ -27,7 +29,7 @@
)
from ._client import DispatchApiClient
from .recurrence import EndCriteria, Frequency, RecurrenceRule, Weekday
from .types import Dispatch, DispatchEvent
from .types import Dispatch, DispatchEvent, DispatchId


def format_datetime(dt: datetime | None) -> str:
Expand Down Expand Up @@ -260,7 +262,7 @@ async def list_(ctx: click.Context, /, **filters: Any) -> None:
@cli.command("stream")
@click.pass_context
@click.argument("microgrid-id", required=True, type=int)
async def stream(ctx: click.Context, microgrid_id: int) -> None:
async def stream(ctx: click.Context, microgrid_id: MicrogridId) -> None:
"""Stream dispatches."""
event_stream: Receiver[DispatchEvent] = ctx.obj["client"].stream(
microgrid_id=microgrid_id
Expand Down Expand Up @@ -452,8 +454,8 @@ async def create(
async def update(
ctx: click.Context,
/,
microgrid_id: int,
dispatch_id: int,
microgrid_id: MicrogridId,
dispatch_id: DispatchId,
**new_fields: dict[str, Any],
) -> None:
"""Update a dispatch."""
Expand Down Expand Up @@ -499,14 +501,17 @@ def skip_field(value: Any) -> bool:
@click.argument("microgrid-id", required=True, type=int)
@click.argument("dispatch_ids", type=int, nargs=-1) # Allow multiple IDs
@click.pass_context
async def get(ctx: click.Context, microgrid_id: int, dispatch_ids: List[int]) -> None:
async def get(
ctx: click.Context, microgrid_id: MicrogridId, dispatch_ids: List[int]
) -> None:
"""Get one or multiple dispatches."""
num_failed = 0

for dispatch_id in dispatch_ids:
try:
dispatch = await ctx.obj["client"].get(
microgrid_id=microgrid_id, dispatch_id=dispatch_id
microgrid_id=microgrid_id,
dispatch_id=DispatchId(dispatch_id),
)
if ctx.obj["raw"]:
click.echo(pformat(dispatch, compact=True))
Expand Down Expand Up @@ -537,7 +542,7 @@ async def repl(
@click.argument("dispatch_ids", type=FuzzyIntRange(), nargs=-1) # Allow multiple IDs
@click.pass_context
async def delete(
ctx: click.Context, microgrid_id: int, dispatch_ids: list[list[int]]
ctx: click.Context, microgrid_id: MicrogridId, dispatch_ids: list[list[int]]
) -> None:
"""Delete multiple dispatches.

Expand Down
37 changes: 22 additions & 15 deletions src/frequenz/client/dispatch/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@
from frequenz.client.base.exception import ClientNotConnected
from frequenz.client.base.retry import LinearBackoff
from frequenz.client.base.streaming import GrpcStreamBroadcaster
from frequenz.client.common.microgrid import MicrogridId

from ._internal_types import DispatchCreateRequest
from .recurrence import RecurrenceRule
from .types import (
Dispatch,
DispatchEvent,
DispatchId,
TargetComponents,
_target_components_to_protobuf,
)
Expand Down Expand Up @@ -83,7 +85,8 @@ def __init__(
)
self._metadata = (("key", key),)
self._streams: dict[
int, GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]
MicrogridId,
GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent],
] = {}
"""A dictionary of streamers, keyed by microgrid_id."""

Expand Down Expand Up @@ -114,7 +117,7 @@ def stub(self) -> dispatch_pb2_grpc.MicrogridDispatchServiceAsyncStub:
# pylint: disable=too-many-arguments, too-many-locals
async def list(
self,
microgrid_id: int,
microgrid_id: MicrogridId,
*,
target_components: Iterator[TargetComponents] = iter(()),
start_from: datetime | None = None,
Expand All @@ -138,7 +141,7 @@ async def list(
key="key",
server_url="grpc://dispatch.url.goes.here.example.com"
)
async for page in client.list(microgrid_id=1):
async for page in client.list(microgrid_id=MicrogridId(1)):
for dispatch in page:
print(dispatch)
```
Expand Down Expand Up @@ -185,7 +188,7 @@ def to_interval(
)

request = ListMicrogridDispatchesRequest(
microgrid_id=microgrid_id,
microgrid_id=int(microgrid_id),
filter=filters,
pagination_params=(
PaginationParams(page_size=page_size) if page_size else None
Expand All @@ -211,7 +214,7 @@ def to_interval(
else:
break

def stream(self, microgrid_id: int) -> channels.Receiver[DispatchEvent]:
def stream(self, microgrid_id: MicrogridId) -> channels.Receiver[DispatchEvent]:
"""Receive a stream of dispatch events.

This function returns a receiver channel that can be used to receive
Expand All @@ -238,15 +241,15 @@ def stream(self, microgrid_id: int) -> channels.Receiver[DispatchEvent]:
return self._get_stream(microgrid_id).new_receiver()

def _get_stream(
self, microgrid_id: int
self, microgrid_id: MicrogridId
) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]:
"""Get an instance to the streaming helper."""
broadcaster = self._streams.get(microgrid_id)
if broadcaster is not None and not broadcaster.is_running:
del self._streams[microgrid_id]
broadcaster = None
if broadcaster is None:
request = StreamMicrogridDispatchesRequest(microgrid_id=microgrid_id)
request = StreamMicrogridDispatchesRequest(microgrid_id=int(microgrid_id))
broadcaster = GrpcStreamBroadcaster(
stream_name="StreamMicrogridDispatches",
stream_method=lambda: cast(
Expand All @@ -266,7 +269,7 @@ def _get_stream(

async def create( # pylint: disable=too-many-positional-arguments
self,
microgrid_id: int,
microgrid_id: MicrogridId,
type: str, # pylint: disable=redefined-builtin
start_time: datetime | Literal["NOW"],
duration: timedelta | None,
Expand Down Expand Up @@ -334,8 +337,8 @@ async def create( # pylint: disable=too-many-positional-arguments
async def update(
self,
*,
microgrid_id: int,
dispatch_id: int,
microgrid_id: MicrogridId,
dispatch_id: DispatchId,
new_fields: dict[str, Any],
) -> Dispatch:
"""Update a dispatch.
Expand All @@ -359,7 +362,7 @@ async def update(
ValueError: If updating `type` or `dry_run`.
"""
msg = UpdateMicrogridDispatchRequest(
dispatch_id=dispatch_id, microgrid_id=microgrid_id
dispatch_id=int(dispatch_id), microgrid_id=int(microgrid_id)
)

for key, val in new_fields.items():
Expand Down Expand Up @@ -423,7 +426,9 @@ async def update(

return Dispatch.from_protobuf(response.dispatch)

async def get(self, *, microgrid_id: int, dispatch_id: int) -> Dispatch:
async def get(
self, *, microgrid_id: MicrogridId, dispatch_id: DispatchId
) -> Dispatch:
"""Get a dispatch.

Args:
Expand All @@ -434,7 +439,7 @@ async def get(self, *, microgrid_id: int, dispatch_id: int) -> Dispatch:
Dispatch: The dispatch.
"""
request = GetMicrogridDispatchRequest(
dispatch_id=dispatch_id, microgrid_id=microgrid_id
dispatch_id=int(dispatch_id), microgrid_id=int(microgrid_id)
)
response = await cast(
Awaitable[GetMicrogridDispatchResponse],
Expand All @@ -444,15 +449,17 @@ async def get(self, *, microgrid_id: int, dispatch_id: int) -> Dispatch:
)
return Dispatch.from_protobuf(response.dispatch)

async def delete(self, *, microgrid_id: int, dispatch_id: int) -> None:
async def delete(
self, *, microgrid_id: MicrogridId, dispatch_id: DispatchId
) -> None:
"""Delete a dispatch.

Args:
microgrid_id: The microgrid_id to delete the dispatch for.
dispatch_id: The dispatch_id to delete.
"""
request = DeleteMicrogridDispatchRequest(
dispatch_id=dispatch_id, microgrid_id=microgrid_id
dispatch_id=int(dispatch_id), microgrid_id=int(microgrid_id)
)
await cast(
Awaitable[None],
Expand Down
7 changes: 4 additions & 3 deletions src/frequenz/client/dispatch/_internal_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from google.protobuf.timestamp_pb2 import Timestamp

from frequenz.client.base.conversion import to_datetime, to_timestamp
from frequenz.client.common.microgrid import MicrogridId

from .recurrence import RecurrenceRule
from .types import (
Expand All @@ -36,7 +37,7 @@
class DispatchCreateRequest:
"""Request to create a new dispatch."""

microgrid_id: int
microgrid_id: MicrogridId
"""The identifier of the microgrid to which this dispatch belongs."""

type: str
Expand Down Expand Up @@ -93,7 +94,7 @@ def from_protobuf(
)

return DispatchCreateRequest(
microgrid_id=pb_object.microgrid_id,
microgrid_id=MicrogridId(pb_object.microgrid_id),
type=pb_object.dispatch_data.type,
start_time=(
"NOW"
Expand All @@ -118,7 +119,7 @@ def to_protobuf(self) -> PBDispatchCreateRequest:
payload.update(self.payload)

return PBDispatchCreateRequest(
microgrid_id=self.microgrid_id,
microgrid_id=int(self.microgrid_id),
dispatch_data=DispatchData(
type=self.type,
start_time=(
Expand Down
Loading
Loading