Skip to content

Commit

Permalink
Split monitoring router and hub into separate files
Browse files Browse the repository at this point in the history
This is ongoing work to split up monitoring.py into more
topical pieces - see also PRs #2468, #2439
  • Loading branch information
benclifford committed Mar 19, 2024
1 parent dc521d0 commit d0c073b
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 190 deletions.
192 changes: 2 additions & 190 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from __future__ import annotations

import os
import socket
import time
import pickle
import logging
import typeguard
import zmq
Expand All @@ -22,8 +20,9 @@

from parsl.serialize import deserialize

from parsl.monitoring.router import router_starter
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.types import AddressedMonitoringMessage, TaggedMonitoringMessage
from parsl.monitoring.types import AddressedMonitoringMessage
from typing import cast, Any, Callable, Dict, Optional, Sequence, Tuple, Union, TYPE_CHECKING

_db_manager_excepts: Optional[Exception]
Expand Down Expand Up @@ -325,190 +324,3 @@ def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]
logger.exception(f"Exception processing {filename} - probably will be retried next iteration")

time.sleep(1) # whats a good time for this poll?


class MonitoringRouter:

def __init__(self,
*,
hub_address: str,
hub_port: Optional[int] = None,
hub_port_range: Tuple[int, int] = (55050, 56000),

monitoring_hub_address: str = "127.0.0.1",
logdir: str = ".",
run_id: str,
logging_level: int = logging.INFO,
atexit_timeout: int = 3 # in seconds
):
""" Initializes a monitoring configuration class.
Parameters
----------
hub_address : str
The ip address at which the workers will be able to reach the Hub.
hub_port : int
The specific port at which workers will be able to reach the Hub via UDP. Default: None
hub_port_range : tuple(int, int)
The MonitoringHub picks ports at random from the range which will be used by Hub.
This is overridden when the hub_port option is set. Default: (55050, 56000)
logdir : str
Parsl log directory paths. Logs and temp files go here. Default: '.'
logging_level : int
Logging level as defined in the logging module. Default: logging.INFO
atexit_timeout : float, optional
The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
"""
os.makedirs(logdir, exist_ok=True)
self.logger = set_file_logger("{}/monitoring_router.log".format(logdir),
name="monitoring_router",
level=logging_level)
self.logger.debug("Monitoring router starting")

self.hub_address = hub_address
self.atexit_timeout = atexit_timeout
self.run_id = run_id

self.loop_freq = 10.0 # milliseconds

# Initialize the UDP socket
self.sock = socket.socket(socket.AF_INET,
socket.SOCK_DGRAM,
socket.IPPROTO_UDP)

# We are trying to bind to all interfaces with 0.0.0.0
if not hub_port:
self.sock.bind(('0.0.0.0', 0))
self.hub_port = self.sock.getsockname()[1]
else:
self.hub_port = hub_port
try:
self.sock.bind(('0.0.0.0', self.hub_port))
except Exception as e:
raise RuntimeError(f"Could not bind to hub_port {hub_port} because: {e}")
self.sock.settimeout(self.loop_freq / 1000)
self.logger.info("Initialized the UDP socket on 0.0.0.0:{}".format(self.hub_port))

self._context = zmq.Context()
self.ic_channel = self._context.socket(zmq.DEALER)
self.ic_channel.setsockopt(zmq.LINGER, 0)
self.ic_channel.set_hwm(0)
self.ic_channel.RCVTIMEO = int(self.loop_freq) # in milliseconds
self.logger.debug("hub_address: {}. hub_port_range {}".format(hub_address, hub_port_range))
self.ic_port = self.ic_channel.bind_to_random_port("tcp://*",
min_port=hub_port_range[0],
max_port=hub_port_range[1])

def start(self,
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
resource_msgs: "queue.Queue[AddressedMonitoringMessage]") -> None:
try:
router_keep_going = True
while router_keep_going:
try:
data, addr = self.sock.recvfrom(2048)
resource_msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg))
resource_msgs.put((resource_msg, addr))
except socket.timeout:
pass

try:
dfk_loop_start = time.time()
while time.time() - dfk_loop_start < 1.0: # TODO make configurable
# note that nothing checks that msg really is of the annotated type
msg: TaggedMonitoringMessage
msg = self.ic_channel.recv_pyobj()

assert isinstance(msg, tuple), "IC Channel expects only tuples, got {}".format(msg)
assert len(msg) >= 1, "IC Channel expects tuples of length at least 1, got {}".format(msg)
assert len(msg) == 2, "IC Channel expects message tuples of exactly length 2, got {}".format(msg)

msg_0: AddressedMonitoringMessage
msg_0 = (msg, 0)

if msg[0] == MessageType.NODE_INFO:
msg[1]['run_id'] = self.run_id
node_msgs.put(msg_0)
elif msg[0] == MessageType.RESOURCE_INFO:
resource_msgs.put(msg_0)
elif msg[0] == MessageType.BLOCK_INFO:
block_msgs.put(msg_0)
elif msg[0] == MessageType.TASK_INFO:
priority_msgs.put(msg_0)
elif msg[0] == MessageType.WORKFLOW_INFO:
priority_msgs.put(msg_0)
if 'exit_now' in msg[1] and msg[1]['exit_now']:
router_keep_going = False
else:
# There is a type: ignore here because if msg[0]
# is of the correct type, this code is unreachable,
# but there is no verification that the message
# received from ic_channel.recv_pyobj() is actually
# of that type.
self.logger.error("Discarding message " # type: ignore[unreachable]
f"from interchange with unknown type {msg[0].value}")
except zmq.Again:
pass
except Exception:
# This will catch malformed messages. What happens if the
# channel is broken in such a way that it always raises
# an exception? Looping on this would maybe be the wrong
# thing to do.
self.logger.warning("Failure processing a ZMQ message", exc_info=True)

self.logger.info("Monitoring router draining")
last_msg_received_time = time.time()
while time.time() - last_msg_received_time < self.atexit_timeout:
try:
data, addr = self.sock.recvfrom(2048)
msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
resource_msgs.put((msg, addr))
last_msg_received_time = time.time()
except socket.timeout:
pass

self.logger.info("Monitoring router finishing normally")
finally:
self.logger.info("Monitoring router finished")


@wrap_with_logs
def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
exception_q: "queue.Queue[Tuple[str, str]]",
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",

hub_address: str,
hub_port: Optional[int],
hub_port_range: Tuple[int, int],

logdir: str,
logging_level: int,
run_id: str) -> None:
setproctitle("parsl: monitoring router")
try:
router = MonitoringRouter(hub_address=hub_address,
hub_port=hub_port,
hub_port_range=hub_port_range,
logdir=logdir,
logging_level=logging_level,
run_id=run_id)
except Exception as e:
logger.error("MonitoringRouter construction failed.", exc_info=True)
comm_q.put(f"Monitoring router construction failed: {e}")
else:
comm_q.put((router.hub_port, router.ic_port))

router.logger.info("Starting MonitoringRouter in router_starter")
try:
router.start(priority_msgs, node_msgs, block_msgs, resource_msgs)
except Exception as e:
router.logger.exception("router.start exception")
exception_q.put(('Hub', str(e)))

0 comments on commit d0c073b

Please sign in to comment.