Type annotations & generate protobuf type stubs
MetRonnie authored and dwsutherland committed May 23, 2024
1 parent e118655 commit 63a6a37
Showing 8 changed files with 682 additions and 51 deletions.
2 changes: 1 addition & 1 deletion cylc/flow/data_messages.proto
@@ -25,7 +25,7 @@ syntax = "proto3";
* message modules.
*
* Command:
* $ protoc -I=./ --python_out=./ data_messages.proto && sed -i '1i# type: ignore' data_messages_pb2.py
* $ protoc -I=./ --python_out=./ --pyi_out=./ data_messages.proto
*
* Pre-compiled protoc binary may be downloaded from:
* https://github.com/protocolbuffers/protobuf/releases
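With the stub emitted alongside the generated module, imports from data_messages_pb2 no longer need a '# type: ignore' escape hatch and attribute access on messages is checked rather than treated as Any. A minimal sketch of the kind of code that now type-checks, assuming cylc-flow is installed; the field names used here are illustrative only:

    from cylc.flow.data_messages_pb2 import PbTaskProxy  # resolved via data_messages_pb2.pyi

    def describe(tproxy: PbTaskProxy) -> str:
        # Attribute access is validated against the generated stub.
        return f'{tproxy.name}: {tproxy.state}'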
1 change: 0 additions & 1 deletion cylc/flow/data_messages_pb2.py

Some generated files are not rendered by default.

630 changes: 630 additions & 0 deletions cylc/flow/data_messages_pb2.pyi

Large diffs are not rendered by default.
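The new data_messages_pb2.pyi is generated entirely by protoc's --pyi_out option, hence not rendered here. For orientation, a hand-written sketch of the general shape such stubs take; the message and field names below are illustrative, not the file's actual contents:

    from typing import ClassVar as _ClassVar, Optional as _Optional

    from google.protobuf import descriptor as _descriptor
    from google.protobuf import message as _message

    DESCRIPTOR: _descriptor.FileDescriptor

    class PbJob(_message.Message):
        __slots__ = ['stamp', 'id', 'submit_num']
        STAMP_FIELD_NUMBER: _ClassVar[int]
        ID_FIELD_NUMBER: _ClassVar[int]
        SUBMIT_NUM_FIELD_NUMBER: _ClassVar[int]
        stamp: str
        id: str
        submit_num: int
        def __init__(
            self,
            stamp: _Optional[str] = ...,
            id: _Optional[str] = ...,
            submit_num: _Optional[int] = ...,
        ) -> None: ...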

84 changes: 44 additions & 40 deletions cylc/flow/data_store_mgr.py
@@ -73,7 +73,7 @@

from cylc.flow import __version__ as CYLC_VERSION, LOG
from cylc.flow.cycling.loader import get_point
from cylc.flow.data_messages_pb2 import ( # type: ignore
from cylc.flow.data_messages_pb2 import (
PbEdge, PbEntireWorkflow, PbFamily, PbFamilyProxy, PbJob, PbTask,
PbTaskProxy, PbWorkflow, PbRuntime, AllDeltas, EDeltas, FDeltas,
FPDeltas, JDeltas, TDeltas, TPDeltas, WDeltas)
@@ -112,7 +112,7 @@

if TYPE_CHECKING:
from cylc.flow.cycling import PointBase

from cylc.flow.flow_mgr import FlowNums

EDGES = 'edges'
FAMILIES = 'families'
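The FlowNums import is added under TYPE_CHECKING, so it is evaluated by the type checker only and never at runtime, which is why the annotations that use it below are written as quoted forward references. A minimal, self-contained sketch of the pattern; the function is illustrative and not part of this diff:

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # Imported for annotations only; never executed at runtime.
        from cylc.flow.flow_mgr import FlowNums

    def count_flows(flow_nums: 'FlowNums') -> int:
        # The quoted annotation is resolved by mypy, not by the interpreter.
        return len(flow_nums)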
@@ -713,10 +713,10 @@ def generate_definition_elements(self):
def increment_graph_window(
self,
source_tokens: Tokens,
point,
flow_nums,
is_manual_submit=False,
itask=None
point: 'PointBase',
flow_nums: 'FlowNums',
is_manual_submit: bool = False,
itask: Optional['TaskProxy'] = None
) -> None:
"""Generate graph window about active task proxy to n-edge-distance.
@@ -731,16 +731,12 @@
assessed for pruning eligibility.
Args:
source_tokens (cylc.flow.id.Tokens)
point (PointBase)
flow_nums (set)
is_manual_submit (bool)
itask (cylc.flow.task_proxy.TaskProxy):
source_tokens
point
flow_nums
is_manual_submit
itask:
Active/Other task proxy, passed in with pool invocation.
Returns:
None
"""

# common references
@@ -1149,28 +1145,23 @@ def add_pool_node(self, name, point):
def generate_ghost_task(
self,
tokens: Tokens,
point,
flow_nums,
is_parent=False,
itask=None,
n_depth=0,
):
point: 'PointBase',
flow_nums: 'FlowNums',
is_parent: bool = False,
itask: Optional['TaskProxy'] = None,
n_depth: int = 0,
) -> None:
"""Create task-point element populated with static data.
Args:
source_tokens (cylc.flow.id.Tokens)
point (PointBase)
flow_nums (set)
is_parent (bool):
source_tokens
point
flow_nums
is_parent:
Used to determine whether to load DB state.
itask (cylc.flow.task_proxy.TaskProxy):
itask:
Update task-node from corresponding task proxy object.
n_depth (int): n-window graph edge distance.
Returns:
None
n_depth: n-window graph edge distance.
"""
tp_id = tokens.id
if (
@@ -1486,7 +1477,11 @@ def apply_task_proxy_db_history(self):

self.db_load_task_proxies.clear()

def _process_internal_task_proxy(self, itask, tproxy):
def _process_internal_task_proxy(
self,
itask: 'TaskProxy',
tproxy: PbTaskProxy,
):
"""Extract information from internal task proxy object."""

update_time = time()
@@ -1569,6 +1564,7 @@ def insert_job(self, name, cycle_point, status, job_conf):
cycle=str(cycle_point),
task=name,
)
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(tp_tokens)
if not tproxy:
return
@@ -1642,6 +1638,7 @@ def insert_db_job(self, row_idx, row):
cycle=point_string,
task=name,
)
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(tp_tokens)
if not tproxy:
return
@@ -1761,9 +1758,8 @@ def update_workflow_states(self):
self.apply_delta_checksum()
self.publish_deltas = self.get_publish_deltas()

def window_resize_rewalk(self):
def window_resize_rewalk(self) -> None:
"""Re-create data-store n-window on resize."""
tokens: Tokens
# Gather pre-resize window nodes
if not self.all_n_window_nodes:
self.all_n_window_nodes = set().union(*(
@@ -1777,7 +1773,8 @@ def window_resize_rewalk(self):
self.n_window_node_walks.clear()
for tp_id in self.all_task_pool:
tokens = Tokens(tp_id)
tp_id, tproxy = self.store_node_fetcher(tokens)
tproxy: PbTaskProxy
_, tproxy = self.store_node_fetcher(tokens)
self.increment_graph_window(
tokens,
get_point(tokens['cycle']),
@@ -2279,6 +2276,7 @@ def delta_task_state(self, itask):
objects from the workflow task pool.
"""
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
return
@@ -2331,7 +2329,7 @@ def delta_task_held(
task=name,
cycle=str(cycle),
)

tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(tokens)
if not tproxy:
return
@@ -2351,6 +2349,7 @@ def delta_task_queued(self, itask: TaskProxy) -> None:
objects from the workflow task pool.
"""
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
return
@@ -2370,6 +2369,7 @@ def delta_task_runahead(self, itask: TaskProxy) -> None:
objects from the workflow task pool.
"""
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
return
@@ -2393,6 +2393,7 @@ def delta_task_output(
objects from the workflow task pool.
"""
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
return
@@ -2419,6 +2420,7 @@ def delta_task_outputs(self, itask: TaskProxy) -> None:
objects from the workflow task pool.
"""
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
return
@@ -2444,6 +2446,7 @@ def delta_task_prerequisite(self, itask: TaskProxy) -> None:
objects from the workflow task pool.
"""
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
return
@@ -2472,13 +2475,14 @@ def delta_task_ext_trigger(
"""Create delta for change in task proxy external_trigger.
Args:
itask (cylc.flow.task_proxy.TaskProxy):
itask:
Update task-node from corresponding task proxy
objects from the workflow task pool.
trig (str): Trigger ID.
message (str): Trigger message.
trig: Trigger ID.
message: Trigger message.
"""
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
return
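A pattern repeated throughout this file: tproxy is declared as Optional[PbTaskProxy] before the tuple unpacking, and the early return on a falsy value then narrows it to PbTaskProxy for the rest of the method. A stand-alone sketch of the idea, using a hypothetical untyped fetcher in place of store_node_fetcher:

    from typing import Dict, Optional

    from cylc.flow.data_messages_pb2 import PbTaskProxy

    def fetch_node(store, tp_id):
        # Hypothetical, untyped stand-in for store_node_fetcher.
        return tp_id, store.get(tp_id)

    def delta_example(store: Dict[str, PbTaskProxy], tp_id: str) -> None:
        # The fetcher is untyped, so without this declaration mypy would
        # treat tproxy as Any; annotating it restores checking below.
        tproxy: Optional[PbTaskProxy]
        _, tproxy = fetch_node(store, tp_id)
        if not tproxy:
            return
        # The early return narrows Optional[PbTaskProxy] to PbTaskProxy here.
        print(tproxy.name)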
4 changes: 2 additions & 2 deletions cylc/flow/network/server.py
@@ -36,15 +36,15 @@
from cylc.flow.network.resolvers import Resolvers
from cylc.flow.network.schema import schema
from cylc.flow.data_store_mgr import DELTAS_MAP
from cylc.flow.data_messages_pb2 import PbEntireWorkflow # type: ignore
from cylc.flow.data_messages_pb2 import PbEntireWorkflow

if TYPE_CHECKING:
from cylc.flow.scheduler import Scheduler
from graphql.execution import ExecutionResult


# maps server methods to the protobuf message (for client/UIS import)
PB_METHOD_MAP = {
PB_METHOD_MAP: Dict[str, Any] = {
'pb_entire_workflow': PbEntireWorkflow,
'pb_data_elements': DELTAS_MAP
}
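PB_METHOD_MAP is annotated Dict[str, Any] because its values are heterogeneous: a message class for 'pb_entire_workflow' but a dict of delta classes (DELTAS_MAP) for 'pb_data_elements'. Roughly how a caller resolves an entry, paraphrasing the client code further down rather than reproducing it exactly:

    from typing import Any, Optional

    from cylc.flow.network.server import PB_METHOD_MAP

    def resolve_message_class(func: str, element_type: Optional[str] = None) -> Any:
        entry: Any = PB_METHOD_MAP[func]
        if element_type is not None:
            # 'pb_data_elements' maps to a dict keyed by element type.
            return entry[element_type]
        # 'pb_entire_workflow' maps straight to a message class.
        return entry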
2 changes: 1 addition & 1 deletion cylc/flow/prerequisite.py
@@ -22,7 +22,7 @@

from cylc.flow.cycling.loader import get_point
from cylc.flow.exceptions import TriggerExpressionError
from cylc.flow.data_messages_pb2 import ( # type: ignore
from cylc.flow.data_messages_pb2 import (
PbPrerequisite,
PbCondition,
)
6 changes: 4 additions & 2 deletions cylc/flow/scripts/client.py
@@ -27,7 +27,7 @@
from google.protobuf.json_format import MessageToDict
import json
import sys
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast

from cylc.flow.id_cli import parse_id
from cylc.flow.option_parsers import (
@@ -40,6 +40,7 @@

if TYPE_CHECKING:
from optparse import Values
from google.protobuf.message import Message


INTERNAL = True
@@ -76,11 +77,12 @@ def main(_, options: 'Values', workflow_id: str, func: str) -> None:
sys.stdin.close()
res = pclient(func, kwargs)
if func in PB_METHOD_MAP:
pb_msg: Message
if 'element_type' in kwargs:
pb_msg = PB_METHOD_MAP[func][kwargs['element_type']]()
else:
pb_msg = PB_METHOD_MAP[func]()
pb_msg.ParseFromString(res)
pb_msg.ParseFromString(cast('bytes', res))
res_msg: object = MessageToDict(pb_msg)
else:
res_msg = res
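The cast only informs the type checker: for these methods the client call already returns the serialised protobuf payload at runtime, but its declared return type is wider than bytes. The parse-and-convert step in isolation, with a locally built payload standing in for a scheduler response:

    from typing import cast

    from google.protobuf.json_format import MessageToDict

    from cylc.flow.data_messages_pb2 import PbEntireWorkflow

    # Stand-in for a serialised response whose static type is not 'bytes'.
    res: object = PbEntireWorkflow().SerializeToString()

    pb_msg = PbEntireWorkflow()
    pb_msg.ParseFromString(cast(bytes, res))
    print(MessageToDict(pb_msg))  # an empty message round-trips to {}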
4 changes: 0 additions & 4 deletions mypy.ini
@@ -19,7 +19,3 @@ exclude= cylc/flow/etc/tutorial/.*
# Suppress the following messages:
# By default the bodies of untyped functions are not checked, consider using --check-untyped-defs
disable_error_code = annotation-unchecked

# For some reason, couldn't exclude this with the exclude directive above
[mypy-cylc.flow.data_messages_pb2]
ignore_errors = True
