Skip to content

Commit

Permalink
Rename hub_port in executors to clarify it is for ZMQ (#3439)
Browse files Browse the repository at this point in the history
This follows a rename in PR #3266 which focused on making the same change
inside parsl/monitoring/

A final use of hub_port is left in place in the MonitoringHub constructor
because it is user-facing - future work on monitoring radio plugins might
change this significantly (or not) so I am leaving it as is for now.
  • Loading branch information
benclifford committed May 17, 2024
1 parent bdb9e63 commit f210753
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 14 deletions.
2 changes: 1 addition & 1 deletion parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,7 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
executor.run_id = self.run_id
executor.run_dir = self.run_dir
executor.hub_address = self.hub_address
executor.hub_port = self.hub_zmq_port
executor.hub_zmq_port = self.hub_zmq_port
if self.monitoring:
executor.monitoring_radio = self.monitoring.radio
if hasattr(executor, 'provider'):
Expand Down
14 changes: 7 additions & 7 deletions parsl/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ def __init__(
self,
*,
hub_address: Optional[str] = None,
hub_port: Optional[int] = None,
hub_zmq_port: Optional[int] = None,
monitoring_radio: Optional[MonitoringRadio] = None,
run_dir: str = ".",
run_id: Optional[str] = None,
):
self.hub_address = hub_address
self.hub_port = hub_port
self.hub_zmq_port = hub_zmq_port
self.monitoring_radio = monitoring_radio
self.run_dir = os.path.abspath(run_dir)
self.run_id = run_id
Expand Down Expand Up @@ -136,14 +136,14 @@ def hub_address(self, value: Optional[str]) -> None:
self._hub_address = value

@property
def hub_port(self) -> Optional[int]:
def hub_zmq_port(self) -> Optional[int]:
"""Port to the Hub for monitoring.
"""
return self._hub_port
return self._hub_zmq_port

@hub_port.setter
def hub_port(self, value: Optional[int]) -> None:
self._hub_port = value
@hub_zmq_port.setter
def hub_zmq_port(self, value: Optional[int]) -> None:
self._hub_zmq_port = value

@property
def monitoring_radio(self) -> Optional[MonitoringRadio]:
Expand Down
2 changes: 1 addition & 1 deletion parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ def _start_local_interchange_process(self):
"worker_ports": self.worker_ports,
"worker_port_range": self.worker_port_range,
"hub_address": self.hub_address,
"hub_port": self.hub_port,
"hub_zmq_port": self.hub_zmq_port,
"logdir": self.logdir,
"heartbeat_threshold": self.heartbeat_threshold,
"poll_period": self.poll_period,
Expand Down
10 changes: 5 additions & 5 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def __init__(self,
worker_ports: Optional[Tuple[int, int]] = None,
worker_port_range: Tuple[int, int] = (54000, 55000),
hub_address: Optional[str] = None,
hub_port: Optional[int] = None,
hub_zmq_port: Optional[int] = None,
heartbeat_threshold: int = 60,
logdir: str = ".",
logging_level: int = logging.INFO,
Expand Down Expand Up @@ -105,7 +105,7 @@ def __init__(self,
The IP address at which the interchange can send info about managers to when monitoring is enabled.
Default: None (meaning monitoring disabled)
hub_port : str
hub_zmq_port : str
The port at which the interchange can send info about managers to when monitoring is enabled.
Default: None (meaning monitoring disabled)
Expand Down Expand Up @@ -151,7 +151,7 @@ def __init__(self,
logger.info("Connected to client")

self.hub_address = hub_address
self.hub_port = hub_port
self.hub_zmq_port = hub_zmq_port

self.pending_task_queue: queue.Queue[Any] = queue.Queue(maxsize=10 ** 6)
self.count = 0
Expand Down Expand Up @@ -244,12 +244,12 @@ def task_puller(self) -> NoReturn:
logger.debug(f"Fetched {task_counter} tasks so far")

def _create_monitoring_channel(self) -> Optional[zmq.Socket]:
if self.hub_address and self.hub_port:
if self.hub_address and self.hub_zmq_port:
logger.info("Connecting to MonitoringHub")
# This is a one-off because monitoring is unencrypted
hub_channel = zmq.Context().socket(zmq.DEALER)
hub_channel.set_hwm(0)
hub_channel.connect("tcp://{}:{}".format(self.hub_address, self.hub_port))
hub_channel.connect("tcp://{}:{}".format(self.hub_address, self.hub_zmq_port))
logger.info("Connected to MonitoringHub")
return hub_channel
else:
Expand Down

0 comments on commit f210753

Please sign in to comment.