Skip to content

Commit ae7af7d

Browse files
authored
Add lots of simple type annotations throughout parsl/dataflow (#2414)
This comes from the benc-mypy branch. The benc-mypy branch contains some more complicated situations, which are not merged by this PR.
1 parent 36f0407 commit ae7af7d

File tree

4 files changed

+30
-27
lines changed

4 files changed

+30
-27
lines changed

parsl/dataflow/dflow.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ def _create_task_log_info(self, task_record):
244244
task_log_info['task_joins'] = None
245245
return task_log_info
246246

247-
def _count_deps(self, depends):
247+
def _count_deps(self, depends: Sequence[Future]) -> int:
248248
"""Count the number of unresolved futures in the list depends.
249249
"""
250250
count = 0
@@ -256,7 +256,7 @@ def _count_deps(self, depends):
256256
return count
257257

258258
@property
259-
def config(self):
259+
def config(self) -> Config:
260260
"""Returns the fully initialized config that the DFK is actively using.
261261
262262
Returns:
@@ -483,14 +483,14 @@ def _unwrap_remote_exception_wrapper(future: Future) -> Any:
483483
result.reraise()
484484
return result
485485

486-
def wipe_task(self, task_id):
486+
def wipe_task(self, task_id: int) -> None:
487487
""" Remove task with task_id from the internal tasks table
488488
"""
489489
if self.config.garbage_collect:
490490
del self.tasks[task_id]
491491

492492
@staticmethod
493-
def check_staging_inhibited(kwargs):
493+
def check_staging_inhibited(kwargs: Dict[str, Any]) -> bool:
494494
return kwargs.get('_parsl_staging_inhibit', False)
495495

496496
def launch_if_ready(self, task_record: TaskRecord) -> None:
@@ -642,7 +642,7 @@ def launch_task(self, task_record: TaskRecord, executable: Callable, *args: Tupl
642642

643643
return exec_fu
644644

645-
def _add_input_deps(self, executor, args, kwargs, func):
645+
def _add_input_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, Any], func: Callable) -> Tuple[Sequence[Any], Dict[str, Any], Callable]:
646646
"""Look for inputs of the app that are files. Give the data manager
647647
the opportunity to replace a file with a data future for that file,
648648
for example wrapping the result of a staging action.
@@ -672,7 +672,7 @@ def _add_input_deps(self, executor, args, kwargs, func):
672672

673673
return tuple(newargs), kwargs, func
674674

675-
def _add_output_deps(self, executor, args, kwargs, app_fut, func):
675+
def _add_output_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, Any], app_fut: AppFuture, func: Callable) -> Callable:
676676
logger.debug("Adding output dependencies")
677677
outputs = kwargs.get('outputs', [])
678678
app_fut._outputs = []
@@ -720,7 +720,7 @@ def _gather_all_deps(self, args: Sequence[Any], kwargs: Dict[str, Any]) -> List[
720720
"""
721721
depends: List[Future] = []
722722

723-
def check_dep(d):
723+
def check_dep(d: Any) -> None:
724724
if isinstance(d, Future):
725725
depends.extend([d])
726726

@@ -953,7 +953,7 @@ def submit(self, func, app_args, executors='all', cache=False, ignore_for_cache=
953953

954954
for d in depends:
955955

956-
def callback_adapter(dep_fut):
956+
def callback_adapter(dep_fut: Future) -> None:
957957
self.launch_if_ready(task_def)
958958

959959
try:
@@ -972,7 +972,7 @@ def callback_adapter(dep_fut):
972972
# and a drain function might look like this.
973973
# If tasks have their states changed, this won't work properly
974974
# but we can validate that...
975-
def log_task_states(self):
975+
def log_task_states(self) -> None:
976976
logger.info("Summary of tasks in DFK:")
977977

978978
with self.task_state_counts_lock:
@@ -1221,7 +1221,7 @@ def checkpoint(self, tasks: Optional[Sequence[TaskRecord]] = None) -> str:
12211221

12221222
return checkpoint_dir
12231223

1224-
def _load_checkpoints(self, checkpointDirs):
1224+
def _load_checkpoints(self, checkpointDirs: Sequence[str]) -> Dict[str, Future[Any]]:
12251225
"""Load a checkpoint file into a lookup table.
12261226
12271227
The data being loaded from the pickle file mostly contains input
@@ -1248,7 +1248,7 @@ def _load_checkpoints(self, checkpointDirs):
12481248
try:
12491249
data = pickle.load(f)
12501250
# Copy and hash only the input attributes
1251-
memo_fu = Future()
1251+
memo_fu: Future = Future()
12521252
assert data['exception'] is None
12531253
memo_fu.set_result(data['result'])
12541254
memo_lookup_table[data['hash']] = memo_fu

parsl/dataflow/flow_control.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
import threading
33
import time
44

5+
from typing import Sequence
6+
7+
from parsl.executors.base import ParslExecutor
58
from parsl.dataflow.task_status_poller import TaskStatusPoller
69

710
logger = logging.getLogger(__name__)
@@ -67,7 +70,7 @@ def __init__(self, dfk, *args, threshold=20, interval=5):
6770
self._thread.daemon = True
6871
self._thread.start()
6972

70-
def _wake_up_timer(self, kill_event):
73+
def _wake_up_timer(self, kill_event: threading.Event) -> None:
7174
"""Internal. This is the function that the thread will execute.
7275
waits on an event so that the thread can make a quick exit when close() is called
7376
@@ -95,7 +98,7 @@ def notify(self, event_id):
9598
logger.debug("Eventcount >= threshold")
9699
self.make_callback()
97100

98-
def make_callback(self):
101+
def make_callback(self) -> None:
99102
"""Makes the callback and resets the timer.
100103
"""
101104
self._wake_up_time = time.time() + self.interval
@@ -105,10 +108,10 @@ def make_callback(self):
105108
logger.error("Flow control callback threw an exception - logging and proceeding anyway", exc_info=True)
106109
self._event_buffer = []
107110

108-
def add_executors(self, executors):
111+
def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
109112
self.task_status_poller.add_executors(executors)
110113

111-
def close(self):
114+
def close(self) -> None:
112115
"""Merge the threads and terminate."""
113116
self._kill_event.set()
114117
self._thread.join()
@@ -161,7 +164,7 @@ def __init__(self, callback, *args, interval=5, name=None):
161164
self._thread.daemon = True
162165
self._thread.start()
163166

164-
def _wake_up_timer(self, kill_event):
167+
def _wake_up_timer(self, kill_event: threading.Event) -> None:
165168
"""Internal. This is the function that the thread will execute.
166169
waits on an event so that the thread can make a quick exit when close() is called
167170
@@ -185,13 +188,13 @@ def _wake_up_timer(self, kill_event):
185188
else:
186189
print("Sleeping a bit more")
187190

188-
def make_callback(self):
191+
def make_callback(self) -> None:
189192
"""Makes the callback and resets the timer.
190193
"""
191194
self._wake_up_time = time.time() + self.interval
192195
self.callback(*self.cb_args)
193196

194-
def close(self):
197+
def close(self) -> None:
195198
"""Merge the threads and terminate.
196199
"""
197200
self._kill_event.set()

parsl/dataflow/strategy.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,18 +130,18 @@ def add_executors(self, executors):
130130
for executor in executors:
131131
self.executors[executor.label] = {'idle_since': None, 'config': executor.label}
132132

133-
def _strategy_noop(self, status: List[ExecutorStatus], tasks):
133+
def _strategy_noop(self, status: List[ExecutorStatus], tasks: List[int]) -> None:
134134
"""Do nothing.
135135
136136
Args:
137137
- tasks (task_ids): Not used here.
138138
"""
139139
logger.debug("strategy_noop: doing nothing")
140140

141-
def _strategy_simple(self, status_list, tasks):
141+
def _strategy_simple(self, status_list, tasks: List[int]) -> None:
142142
self._general_strategy(status_list, tasks, strategy_type='simple')
143143

144-
def _strategy_htex_auto_scale(self, status_list, tasks):
144+
def _strategy_htex_auto_scale(self, status_list, tasks: List[int]) -> None:
145145
"""HTEX specific auto scaling strategy
146146
147147
This strategy works only for HTEX. This strategy will scale out by

parsl/dataflow/task_status_poller.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ def __init__(self, executor: ParslExecutor, dfk: "parsl.dataflow.dflow.DataFlowK
3636
self.hub_channel.connect("tcp://{}:{}".format(hub_address, hub_port))
3737
logger.info("Monitoring enabled on task status poller")
3838

39-
def _should_poll(self, now: float):
39+
def _should_poll(self, now: float) -> bool:
4040
return now >= self._last_poll_time + self._interval
4141

42-
def poll(self, now: float):
42+
def poll(self, now: float) -> None:
4343
if self._should_poll(now):
4444
previous_status = self._status
4545
self._status = self._executor.status()
@@ -53,7 +53,7 @@ def poll(self, now: float):
5353
if delta_status:
5454
self.send_monitoring_info(delta_status)
5555

56-
def send_monitoring_info(self, status=None):
56+
def send_monitoring_info(self, status: Dict):
5757
# Send monitoring info for HTEX when monitoring enabled
5858
if self.monitoring_enabled:
5959
msg = self._executor.create_monitoring_info(status)
@@ -95,7 +95,7 @@ def scale_out(self, n):
9595
self._status.update(new_status)
9696
return block_ids
9797

98-
def __repr__(self):
98+
def __repr__(self) -> str:
9999
return self._status.__repr__()
100100

101101

@@ -111,12 +111,12 @@ def poll(self, tasks=None):
111111
self._error_handler.run(self._poll_items)
112112
self._strategy.strategize(self._poll_items, tasks)
113113

114-
def _update_state(self):
114+
def _update_state(self) -> None:
115115
now = time.time()
116116
for item in self._poll_items:
117117
item.poll(now)
118118

119-
def add_executors(self, executors: Sequence[ParslExecutor]):
119+
def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
120120
for executor in executors:
121121
if executor.status_polling_interval > 0:
122122
logger.debug("Adding executor {}".format(executor.label))

0 commit comments

Comments
 (0)