From d6378b5bcd794174a7505e970de818461694b918 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 14 Apr 2023 11:02:49 +0000 Subject: [PATCH 1/3] Traverse configuration hierarchy to report more usage information This PR introduces code to traverse the configuration object (in a similar manner to the RepresentationMixin style of logging the supplied configuration object) with the intention of giving each object a chance to report its own usage information. This PR modifies the HighThroughputExecutor to use this API to report richer usage information: a specific usage query is to ask about use of the enable_mpi_mode parameter and this modification supports that. Beware that this reports on configuration of components, and does not report any further usage unless those components are so augmented using the new API: for example, configurations by default will include three staging providers, even though I believe it is extremely rare that either the FTP or HTTP staging providers are actually used to stage data. (It is hopefully a straightforward change to add a UsageInformation implementation to report if those classes are actually used to stage data in any run). To support UsageInformation instances which report on usage during a run, the component tree is traversed both for the start message and the end message, and may result in different information in each message. 
An example start message looks like this: (pretty-formatted) {'correlator': 'f7595d08-7b94-49bc-b3d7-1ea7532b2f51', 'parsl_v': '1.3.0-dev', 'python_v': '3.12.2', 'platform.system': 'Linux', 'start': 1710150467, 'components': ['parsl.config.Config', {'c': 'parsl.executors.high_throughput.executor.HighThroughputExecutor', 'mpi': False}, 'parsl.providers.local.local.LocalProvider', 'parsl.channels.local.local.LocalChannel', 'parsl.launchers.launchers.SingleNodeLauncher', 'parsl.data_provider.ftp.FTPInTaskStaging', 'parsl.data_provider.http.HTTPInTaskStaging', 'parsl.data_provider.file_noop.NoOpFileStaging', 'parsl.monitoring.monitoring.MonitoringHub']} --- parsl/executors/high_throughput/executor.py | 6 +- parsl/usage_tracking/api.py | 66 +++++++++++++++++++++ parsl/usage_tracking/usage.py | 50 +++++++++------- 3 files changed, 100 insertions(+), 22 deletions(-) create mode 100644 parsl/usage_tracking/api.py diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 422ef1b50b..dbff2a6703 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -14,6 +14,7 @@ import warnings import parsl.launchers +from parsl.usage_tracking.api import UsageInformation from parsl.serialize import pack_res_spec_apply_message, deserialize from parsl.serialize.errors import SerializationError, DeserializationError from parsl.app.errors import RemoteExceptionWrapper @@ -61,7 +62,7 @@ "--available-accelerators {accelerators}") -class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin): +class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageInformation): """Executor designed for cluster-scale The HighThroughputExecutor system has the following components: @@ -824,3 +825,6 @@ def shutdown(self, timeout: float = 10.0): self.interchange_proc.kill() logger.info("Finished HighThroughputExecutor shutdown attempt") + + def get_usage_information(self): + 
return {"mpi": self.enable_mpi_mode} diff --git a/parsl/usage_tracking/api.py b/parsl/usage_tracking/api.py new file mode 100644 index 0000000000..24ba040678 --- /dev/null +++ b/parsl/usage_tracking/api.py @@ -0,0 +1,66 @@ +from parsl.utils import RepresentationMixin + +from abc import abstractmethod +from functools import singledispatch +from typing import Any, List, Sequence + + +# Traverse the configuration hierarchy, returning a JSON component +# for each one. Configuration components which implement +# RepresentationMixin will be in the right form for inspecting +# object attributes. Configuration components which are lists or tuples +# are traversed in sequence. Other types default to reporting no +# usage information. + +@singledispatch +def get_parsl_usage(obj) -> List[Any]: + return [] + + +@get_parsl_usage.register +def get_parsl_usage_representation_mixin(obj: RepresentationMixin): + t = type(obj) + qualified_name = t.__module__ + "." + t.__name__ + + # me can contain anything that can be rendered as JSON + me: List[Any] = [] + + if isinstance(obj, UsageInformation): + # report rich usage information for this component + attrs = {'c': qualified_name} + attrs.update(obj.get_usage_information()) + me = [attrs] + else: + # report the class name of this component + me = [qualified_name] + + # unwrap typeguard-style wrapping + init: Any = type(obj).__init__ + if hasattr(init, '__wrapped__'): + init = init.__wrapped__ + + import inspect + + argspec = inspect.getfullargspec(init) + + for arg in argspec.args[1:]: # skip first arg, self + arg_value = getattr(obj, arg) + d = get_parsl_usage(arg_value) + me += d + + return me + + +@get_parsl_usage.register(list) +@get_parsl_usage.register(tuple) +def get_parsl_usage_sequence(obj: Sequence): + result = [] + for v in obj: + result += get_parsl_usage(v) + return result + + +class UsageInformation: + @abstractmethod + def get_usage_information(self) -> dict: + pass diff --git a/parsl/usage_tracking/usage.py 
b/parsl/usage_tracking/usage.py index d13731f478..fed77e6ec2 100644 --- a/parsl/usage_tracking/usage.py +++ b/parsl/usage_tracking/usage.py @@ -7,6 +7,7 @@ import sys import platform +from parsl.usage_tracking.api import get_parsl_usage from parsl.utils import setproctitle from parsl.multiprocessing import ForkProcess from parsl.dataflow.states import States @@ -17,6 +18,13 @@ from typing import Callable from typing_extensions import ParamSpec +# protocol version byte: when (for example) compression parameters are changed +# that cannot be inferred from the compressed message itself, this version +# ID needs to imply those parameters. + +# Earlier protocol versions: b'{' - the original pure-JSON protocol pre-March 2024 +PROTOCOL_VERSION = b'1' + P = ParamSpec("P") @@ -32,7 +40,7 @@ def run(*args, **kwargs): @async_process -def udp_messenger(domain_name: str, UDP_PORT: int, sock_timeout: int, message: str) -> None: +def udp_messenger(domain_name: str, UDP_PORT: int, sock_timeout: int, message: bytes) -> None: """Send UDP messages to usage tracker asynchronously This multiprocessing based messenger was written to overcome the limitations @@ -46,16 +54,11 @@ def udp_messenger(domain_name: str, UDP_PORT: int, sock_timeout: int, message: s setproctitle("parsl: Usage tracking") try: - encoded_message = bytes(message, "utf-8") - UDP_IP = socket.gethostbyname(domain_name) - if UDP_PORT is None: - raise Exception("UDP_PORT is None") - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP sock.settimeout(sock_timeout) - sock.sendto(encoded_message, (UDP_IP, UDP_PORT)) + sock.sendto(message, (UDP_IP, UDP_PORT)) sock.close() except socket.timeout: @@ -102,7 +105,7 @@ def __init__(self, dfk, port=50077, self.procs = [] self.dfk = dfk self.config = self.dfk.config - self.uuid = str(uuid.uuid4()) + self.correlator_uuid = str(uuid.uuid4()) self.parsl_version = PARSL_VERSION self.python_version = "{}.{}.{}".format(sys.version_info.major, sys.version_info.minor, @@ -131,22 
+134,23 @@ def check_tracking_enabled(self): return track - def construct_start_message(self) -> str: + def construct_start_message(self) -> bytes: """Collect preliminary run info at the start of the DFK. Returns : - Message dict dumped as json string, ready for UDP """ - message = {'uuid': self.uuid, + message = {'correlator': self.correlator_uuid, 'parsl_v': self.parsl_version, 'python_v': self.python_version, - 'os': platform.system(), - 'os_v': platform.release(), - 'start': time.time()} + 'platform.system': platform.system(), + 'start': int(time.time()), + 'components': get_parsl_usage(self.dfk._config)} + logger.debug(f"Usage tracking start message: {message}") - return json.dumps(message) + return self.encode_message(message) - def construct_end_message(self) -> str: + def construct_end_message(self) -> bytes: """Collect the final run information at the time of DFK cleanup. Returns: @@ -158,16 +162,20 @@ def construct_end_message(self) -> str: app_fails = self.dfk.task_state_counts[States.failed] + self.dfk.task_state_counts[States.dep_fail] - message = {'uuid': self.uuid, - 'end': time.time(), + message = {'correlator': self.correlator_uuid, + 'end': int(time.time()), 't_apps': app_count, 'sites': site_count, - 'failed': app_fails - } + 'failed': app_fails, + 'components': get_parsl_usage(self.dfk._config)} + logger.debug(f"Usage tracking end message (unencoded): {message}") + + return self.encode_message(message) - return json.dumps(message) + def encode_message(self, obj): + return PROTOCOL_VERSION + json.dumps(obj).encode('utf-8') - def send_UDP_message(self, message: str) -> None: + def send_UDP_message(self, message: bytes) -> None: """Send UDP message.""" if self.tracking_enabled: try: From 8beeba13be9e94874bb0e4577e95bd88c502e784 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 11 Mar 2024 10:14:21 +0000 Subject: [PATCH 2/3] move site count away from being a hard-coded parameter to being usageinfo of Config object --- parsl/config.py | 6 +++++- 
parsl/usage_tracking/usage.py | 3 --- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/parsl/config.py b/parsl/config.py index 4f3b7be23a..a06cb1ac44 100644 --- a/parsl/config.py +++ b/parsl/config.py @@ -10,11 +10,12 @@ from parsl.errors import ConfigurationError from parsl.dataflow.taskrecord import TaskRecord from parsl.monitoring import MonitoringHub +from parsl.usage_tracking.api import UsageInformation logger = logging.getLogger(__name__) -class Config(RepresentationMixin): +class Config(RepresentationMixin, UsageInformation): """ Specification of Parsl configuration options. @@ -140,3 +141,6 @@ def _validate_executors(self) -> None: if len(duplicates) > 0: raise ConfigurationError('Executors must have unique labels ({})'.format( ', '.join(['label={}'.format(repr(d)) for d in duplicates]))) + + def get_usage_information(self): + return {"executors_len": len(self.executors)} diff --git a/parsl/usage_tracking/usage.py b/parsl/usage_tracking/usage.py index fed77e6ec2..2920a600fa 100644 --- a/parsl/usage_tracking/usage.py +++ b/parsl/usage_tracking/usage.py @@ -158,14 +158,11 @@ def construct_end_message(self) -> bytes: """ app_count = self.dfk.task_count - site_count = len(self.dfk.config.executors) - app_fails = self.dfk.task_state_counts[States.failed] + self.dfk.task_state_counts[States.dep_fail] message = {'correlator': self.correlator_uuid, 'end': int(time.time()), 't_apps': app_count, - 'sites': site_count, 'failed': app_fails, 'components': get_parsl_usage(self.dfk._config)} logger.debug(f"Usage tracking end message (unencoded): {message}") From f21d88e5ca2e6626008a2bed8cd46ff522599b94 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 11 Mar 2024 10:06:38 +0000 Subject: [PATCH 3/3] abstract away from requiring a DFK to support non-DFK users of parsl --- parsl/usage_tracking/usage.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/parsl/usage_tracking/usage.py b/parsl/usage_tracking/usage.py index 
2920a600fa..22986b1402 100644 --- a/parsl/usage_tracking/usage.py +++ b/parsl/usage_tracking/usage.py @@ -160,11 +160,16 @@ def construct_end_message(self) -> bytes: app_fails = self.dfk.task_state_counts[States.failed] + self.dfk.task_state_counts[States.dep_fail] + # the DFK is tangled into this code as a god-object, so it is + # handled separately from the usual traversal code, but presenting + # the same protocol-level report. + dfk_component = {'c': type(self.dfk).__module__ + "." + type(self.dfk).__name__, + 'app_count': app_count, + 'app_fails': app_fails} + message = {'correlator': self.correlator_uuid, 'end': int(time.time()), - 't_apps': app_count, - 'failed': app_fails, - 'components': get_parsl_usage(self.dfk._config)} + 'components': [dfk_component] + get_parsl_usage(self.dfk._config)} logger.debug(f"Usage tracking end message (unencoded): {message}") return self.encode_message(message)